Skip to content

feat: add custom exex for block pushing#186

Draft
tac0turtle wants to merge 7 commits intomainfrom
marko/ex-ex
Draft

feat: add custom exex for block pushing#186
tac0turtle wants to merge 7 commits intomainfrom
marko/ex-ex

Conversation

@tac0turtle
Copy link
Contributor

Description

This pr adds a push based streaming service in order to better integrate into atlas allowing blocks to be pushed instead of pulled via rpc

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Performance improvement
  • Refactoring

Related Issues

Fixes #(issue)

Checklist

  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published

Testing

Additional Notes

@coderabbitai
Copy link

coderabbitai bot commented Mar 21, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ec254a6a-bf79-4aa3-9d95-9ff83825db25

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch marko/ex-ex

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude
Copy link

claude bot commented Mar 21, 2026

Claude finished @tac0turtle's task —— View job


Code Review Complete

I've conducted a thorough review of this PR that adds a custom ExEx for block pushing integration with Atlas. Here's my comprehensive feedback:

🟢 Strengths

Clean Architecture & Design

  • Excellent separation of concerns: The exex-remote crate provides a clean wire contract that's decoupled from ev-reth's internal types
  • Versioned schema: Smart use of schema versioning (REMOTE_EXEX_SCHEMA_VERSION_V1) for future compatibility
  • Stable API: The wire module provides a stable re-export interface for consumers

Robust Implementation

  • Comprehensive type coverage: RemoteTransactionV1, RemoteBlockV1, and notification variants capture all necessary execution data
  • EV-specific features: Proper handling of fee_payer, batch calls, and raw_2718 transaction bytes
  • Good error handling: Well-structured CodecError with appropriate error variants

Strong Testing

  • Excellent codec tests (crates/exex-remote/src/codec.rs:58-168): Comprehensive roundtrip tests for all notification variants
  • CLI argument validation (crates/node/src/args.rs:43-68): Proper testing of remote ExEx configuration

🟡 Areas for Improvement

Code Quality Issues

  1. Large function in exex.rs (crates/node/src/exex.rs:100+): The main conversion function appears quite long and could benefit from being broken into smaller, more focused functions

  2. Missing error propagation validation:

// In crates/node/src/exex.rs:74-92
tokio::spawn(async move {
    loop {
        match notifications.recv().await {
            // Error handling looks correct but could use more explicit logging
            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                let status = Status::resource_exhausted(format!(
                    "remote exex subscriber lagged by {skipped} messages"
                ));
                let _ = tx.send(Err(status)).await; // Consider logging this failure
            }
        }
    }
});
  1. Magic constants: MAX_GRPC_MESSAGE_SIZE = 64 * 1024 * 1024 should be configurable or at least documented why 64MB was chosen

Performance Considerations

  1. Broadcast buffer management: The bounded broadcast buffer (default 1024) may be too small for high-throughput scenarios. Consider making this configurable based on expected block rate and subscriber count.

  2. Memory efficiency: The wire types clone entire transaction and receipt vectors. For large blocks, this could be memory-intensive.

Security & Robustness

  1. gRPC security: No authentication/authorization on the gRPC endpoint. Consider adding options for:

    • TLS encryption
    • Authentication tokens
    • IP allowlisting
  2. Resource limits: While there's a message size limit, there are no limits on:

    • Number of concurrent subscribers
    • Rate limiting per subscriber
    • Maximum connection duration

Documentation & Testing

  1. Missing integration tests: No end-to-end tests verifying the gRPC server and client integration
  2. Example improvements: The examples are good but could benefit from error handling demonstrations
  3. Performance benchmarks: No performance testing for large block scenarios

🔴 Critical Issues

Potential Data Race

In crates/node/src/exex.rs:84-86, if the broadcast sender is dropped while subscribers are active, they'll receive RecvError::Closed but the error handling may not be sufficient:

Err(broadcast::error::RecvError::Closed) => break, // Should this log the disconnection?

Type Safety Concerns

RemoteBlockV1::new() uses debug assertions for transaction/receipt length matching (crates/exex-remote/src/types.rs:221-225), but these are only active in debug builds:

debug_assert_eq!(
    transactions.len(),
    receipts.len(),
    "transactions and receipts must have matching lengths"
);

Consider making this a runtime check or using a builder pattern.

🛠️ Recommendations

High Priority

  1. Add runtime validation for transaction/receipt length matching instead of debug-only assertions
  2. Implement proper logging for subscriber disconnections and errors
  3. Add configuration options for security (TLS, auth) and resource limits

Medium Priority

  1. Break down large functions in exex.rs into smaller, testable units
  2. Add integration tests for the full gRPC pipeline
  3. Document performance characteristics and recommended buffer sizes

Low Priority

  1. Add benchmarks for large block scenarios
  2. Consider compression for large messages
  3. Add metrics/monitoring hooks for subscriber health

📋 Summary

This is a solid implementation that successfully achieves its goal of providing a push-based streaming service for Atlas integration. The architecture is well-designed with proper separation of concerns and good test coverage for core functionality.

The main concerns are around production readiness (security, resource limits, monitoring) and some potential robustness issues. However, these don't prevent the core functionality from working correctly.

Recommendation: ✅ Approve with suggestions - The implementation is sound and ready for integration, with the understanding that some production hardening may be needed before high-traffic deployment.


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant