Chapter 4: Message/Transaction Collection
In Chapter 3: Consensus Management, we saw how the replicas in ResilientDB use a process like PBFT (Pre-Prepare, Prepare, Commit) to agree on the order of transactions. This involves a lot of messages flying back and forth between replicas for each transaction!
Imagine dozens of transactions being processed concurrently. For transaction #51, Replica A needs to collect Prepare votes. For transaction #52, it might be collecting Commit votes. For transaction #53, it just received the Pre-Prepare proposal. How does a replica keep track of all these different messages for all these different transactions without getting confused?
Welcome to Chapter 4! We’ll explore the components responsible for organizing this potential chaos: Message/Transaction Collection, primarily handled by TransactionCollector and MessageManager.
The Challenge: Organizing Consensus Messages
Think about the PBFT process from Chapter 3:
- A Pre-Prepare message arrives for sequence number N.
- Multiple Prepare messages arrive for sequence number N (hopefully agreeing on the same proposal).
- Multiple Commit messages arrive for sequence number N.
Each replica needs to:
- Group all messages related to the same sequence number N.
- Count how many Prepare messages it has received for N.
- Count how many Commit messages it has received for N.
- Determine when enough messages (2f+1) of a certain type have arrived to move to the next stage (e.g., from Prepared to Committed).
- Finally, trigger the execution of the transaction for sequence N once it’s fully confirmed (Committed).
Doing this efficiently for potentially thousands of transactions simultaneously requires a dedicated system.
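To make the 2f+1 threshold concrete, here is a minimal sketch of the arithmetic. It assumes the standard PBFT sizing of n >= 3f + 1 replicas; the helper names MaxFaulty, QuorumSize, and HasQuorum are hypothetical and are not part of the ResilientDB API.

```cpp
#include <cstdint>

// Hypothetical helpers for illustration only (assumes n >= 1).
// Largest number of faulty replicas f that n replicas can tolerate,
// given the PBFT requirement n >= 3f + 1.
constexpr uint32_t MaxFaulty(uint32_t n) { return (n - 1) / 3; }

// Matching votes needed before moving to the next stage: 2f + 1.
constexpr uint32_t QuorumSize(uint32_t n) { return 2 * MaxFaulty(n) + 1; }

// True once 'votes' matching messages have been counted.
constexpr bool HasQuorum(uint32_t votes, uint32_t n) {
  return votes >= QuorumSize(n);
}
```

With 4 replicas, f is 1, so 3 matching Prepare (or Commit) messages are enough to advance; with 7 replicas, f is 2 and the threshold is 5.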
Meet the Organizers: TransactionCollector and MessageManager
ResilientDB uses two main components for this task:
- TransactionCollector: The Project Manager for a Single Transaction.
  - Job: Manages the lifecycle of one specific transaction sequence number.
  - Analogy: Imagine a project manager assigned to build one specific feature (e.g., “Add Login Button”). They collect all approvals (Prepare/Commit messages) related only to that feature from different team members (replicas).
  - Function: Stores the initial proposal (Pre-Prepare), collects incoming Prepare and Commit messages for its sequence number, counts them, tracks the consensus state (e.g., None -> ReadyPrepare -> ReadyCommit -> ReadyExecute -> Executed), and holds onto the messages as potential proof.
- MessageManager: The Department Head Overseeing All Projects.
  - Job: Manages the entire collection of TransactionCollector instances.
  - Analogy: This is like the head of the project management department. When a new approval form (consensus message) comes in, the department head looks at which project (sequence number) it belongs to and forwards it to the correct project manager (TransactionCollector). They also coordinate the start of new projects (assigning sequence numbers) and signal when a project is truly finished and ready for launch (triggering execution).
  - Function: Receives incoming consensus messages from the Consensus Manager, finds the appropriate TransactionCollector for the message’s sequence number (often using a helper like LockFreeCollectorPool), passes the message to that collector, and interfaces with the Transaction Execution layer to commit finalized transactions. It also assigns new sequence numbers for client requests.
Together, MessageManager routes the incoming messages, and TransactionCollector does the detailed work of counting and state tracking for each individual transaction sequence.
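This division of labor can be pictured with a toy model. The following is a minimal sketch, not ResilientDB code: ToyMessage, ToyCollector, and ToyManager are hypothetical names, and a std::map stands in for the real collector pool.

```cpp
#include <cstdint>
#include <map>

// Hypothetical message kinds for this sketch only.
enum class MsgType { kPrePrepare, kPrepare, kCommit };

struct ToyMessage {
  uint64_t seq;  // sequence number the message belongs to
  MsgType type;  // consensus phase of the message
};

// Counts messages for exactly one sequence number.
class ToyCollector {
 public:
  void Add(const ToyMessage& msg) { ++counts_[msg.type]; }

  int Count(MsgType type) const {
    auto it = counts_.find(type);
    return it == counts_.end() ? 0 : it->second;
  }

 private:
  std::map<MsgType, int> counts_;
};

// Routes each message to the collector that owns its sequence number.
class ToyManager {
 public:
  void AddConsensusMsg(const ToyMessage& msg) {
    collectors_[msg.seq].Add(msg);  // creates the collector on first use
  }

  int Count(uint64_t seq, MsgType type) const {
    auto it = collectors_.find(seq);
    return it == collectors_.end() ? 0 : it->second.Count(type);
  }

 private:
  std::map<uint64_t, ToyCollector> collectors_;  // stand-in for the pool
};
```

Messages for sequence 51 and sequence 52 end up in separate collectors, so their counts never mix, which is the property the manager/collector split is meant to provide.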
How It Works: Following a Transaction’s Messages
Let’s trace how messages for a specific transaction (say, sequence number N=100) are handled:
Scenario: Replica A is processing transaction N=100.
- Pre-Prepare Arrives:
  - The PRE-PREPARE message for seq=100 arrives at Replica A via the Network Communication layer.
  - It’s passed to the Consensus Manager.
  - The ConsensusManager calls MessageManager::AddConsensusMsg with the message.
- MessageManager Finds the Collector:
  - MessageManager needs the TransactionCollector responsible for seq=100. It likely uses a helper, LockFreeCollectorPool.
  - LockFreeCollectorPool::GetCollector(100) calculates an index (e.g., 100 % pool_size) and returns a pointer to the correct TransactionCollector instance.

      // Simplified from platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
      TransactionCollector* LockFreeCollectorPool::GetCollector(uint64_t seq) {
        // 'mask_' is related to the pool size (a power of 2)
        // This efficiently calculates 'seq % pool_size'
        uint32_t idx = seq & mask_;
        // 'collector_' is a vector of TransactionCollector unique_ptrs
        return collector_[idx].get();
      }

  - This code quickly finds the right collector using bitwise math, which is faster than division.
- TransactionCollector Adds Pre-Prepare:
  - MessageManager calls TransactionCollector::AddRequest on the collector for seq=100, passing the PRE-PREPARE message.
  - TransactionCollector stores this as the “main request” for seq=100.
  - It updates its internal state, potentially moving from None to READY_PREPARE.

      // Simplified from platform/consensus/ordering/pbft/transaction_collector.cpp
      int TransactionCollector::AddRequest(
          std::unique_ptr<Request> request, const SignatureInfo& signature,
          bool is_main_request, /* ... callback function ... */) {
        if (/* request invalid or already committed */) return -2;
        if (seq_ != request->seq()) return -2;  // Check sequence number match

        if (is_main_request) {  // This is the PRE-PREPARE
          // Store the main request atomically (simplified here)
          main_request_ = std::move(request);
          main_request_signature_ = signature;
          // Update status via callback or directly
          MaybeUpdateStatus(Request::TYPE_PRE_PREPARE, 1);
          return 0;
        } else {
          // ... Handle PREPARE / COMMIT (see below) ...
        }
        return 0;  // Default success
      }

  - This simplified code checks sequence numbers and stores the main proposal.
- Prepare Messages Arrive:
  - Several PREPARE messages for seq=100 arrive from other replicas.
  - Each goes through MessageManager::AddConsensusMsg -> LockFreeCollectorPool::GetCollector(100) -> TransactionCollector::AddRequest.
  - Inside AddRequest (the else branch now):
    - The TransactionCollector increments a counter for PREPARE messages received for the specific hash of the proposal. It often uses a bitset to track which replicas sent a Prepare.
    - It calls a helper function (or a callback passed by MessageManager) to check if the state should change.

      // Simplified continuation of TransactionCollector::AddRequest
      else {  // Handle PREPARE or COMMIT
        std::string hash = request->hash();
        int type = request->type();
        int sender_id = request->sender_id();

        // Record that this sender sent this type of message for this hash
        // (Using a map and bitset, simplified logic here)
        received_messages_[type][hash].insert(sender_id);
        int current_count = received_messages_[type][hash].size();

        // Store the message itself for potential proof
        if (type == Request::TYPE_PREPARE) {
          StorePrepareProof(request, signature);  // Simplified function call
        }

        // Check if enough messages arrived to change state
        MaybeUpdateStatus(type, current_count);

        // If status became READY_EXECUTE, trigger commit
        if (status_.load() == TransactionStatue::READY_EXECUTE) {
          Commit();
          return 1;  // Indicate commit triggered
        }
        return 0;
      }
- TransactionCollector Updates State (Prepare -> Commit):
  - The MaybeUpdateStatus function (or callback) checks: “Is the current status READY_PREPARE AND have we received enough (>= 2f+1) PREPARE messages?”
  - If yes, it atomically updates the status_ from READY_PREPARE to READY_COMMIT.
  - The TransactionCollector is now considered “Prepared” for seq=100.
- Commit Messages Arrive:
  - COMMIT messages for seq=100 arrive and are routed to the same TransactionCollector.
  - AddRequest counts them similarly.
- TransactionCollector Updates State (Commit -> Execute):
  - MaybeUpdateStatus checks: “Is the current status READY_COMMIT AND have we received enough (>= 2f+1) COMMIT messages?”
  - If yes, it atomically updates the status_ to READY_EXECUTE.
- TransactionCollector Triggers Execution:
  - Detecting the transition to READY_EXECUTE, the AddRequest logic calls TransactionCollector::Commit().
  - Commit() performs final checks and then calls the TransactionExecutor (provided during setup) to actually execute the transaction associated with the main request.

      // Simplified from platform/consensus/ordering/pbft/transaction_collector.cpp
      int TransactionCollector::Commit() {
        // Atomically change status from READY_EXECUTE to EXECUTED
        TransactionStatue expected = TransactionStatue::READY_EXECUTE;
        if (!status_.compare_exchange_strong(expected,
                                             TransactionStatue::EXECUTED)) {
          return -2;  // Already executed or wrong state
        }
        // Mark as committed internally
        is_committed_ = true;

        // If an executor was provided and we have the main request...
        if (executor_ && main_request_) {
          // Tell the executor to commit this request
          executor_->Commit(std::move(main_request_));
        }
        return 0;  // Success
      }

  - This function ensures the transaction is executed only once by atomically changing the state and then passes the request to the Transaction Execution layer.
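The counting and state changes traced in the steps above can be sketched as a small, self-contained class. This is a single-threaded illustration under stated assumptions: VoteTracker, Status, and Vote are hypothetical names, a std::set replaces the bitset mentioned earlier, and the real TransactionCollector uses atomics and callbacks that are left out here.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>

// Hypothetical status values mirroring the states named in the text.
enum class Status { kNone, kReadyPrepare, kReadyCommit, kReadyExecute };
enum class Vote { kPrepare, kCommit };

// Single-threaded sketch of one collector's counting logic.
class VoteTracker {
 public:
  explicit VoteTracker(uint32_t f) : quorum_(2 * f + 1) {}

  // Models the arrival of the PRE-PREPARE (the "main request").
  void OnPrePrepare() {
    if (status_ == Status::kNone) status_ = Status::kReadyPrepare;
  }

  // Records one PREPARE or COMMIT vote. A repeated vote from the same
  // sender is ignored because std::set stores each sender id once.
  void OnVote(Vote type, const std::string& hash, uint32_t sender_id) {
    std::set<uint32_t>& senders = received_[type][hash];
    senders.insert(sender_id);
    MaybeUpdateStatus(type, senders.size());
  }

  Status status() const { return status_; }

 private:
  // Advances one stage only when the vote type matches the current status
  // and at least 2f + 1 distinct senders have voted for the same hash.
  void MaybeUpdateStatus(Vote type, std::size_t count) {
    if (count < quorum_) return;
    if (type == Vote::kPrepare && status_ == Status::kReadyPrepare) {
      status_ = Status::kReadyCommit;
    } else if (type == Vote::kCommit && status_ == Status::kReadyCommit) {
      status_ = Status::kReadyExecute;
    }
  }

  std::size_t quorum_;
  Status status_ = Status::kNone;
  std::map<Vote, std::map<std::string, std::set<uint32_t>>> received_;
};
```

With f = 1, three distinct senders must send PREPARE for the same hash before the status reaches kReadyCommit, and three distinct COMMIT senders are then needed for kReadyExecute. One limitation of this sketch: COMMIT votes that arrive before the Prepared stage are counted, but the threshold is only re-checked when the next COMMIT arrives.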
Visualization:
This diagram shows how MessageManager acts as the router, TransactionCollector does the counting and state management for a specific sequence number, and finally triggers the TransactionExecutor.
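The "execute only once" guarantee in the final step rests on std::atomic's compare_exchange_strong, which succeeds for exactly one caller when several attempt the same transition. The sketch below isolates that idea; OnceCommitter and its counter are hypothetical stand-ins for the collector and the executor call.

```cpp
#include <atomic>

// Hypothetical two-state status for this sketch only.
enum class Status { kReadyExecute, kExecuted };

// Stand-in for a collector whose commit step must run exactly once.
class OnceCommitter {
 public:
  // Returns 0 for the single caller that wins the transition,
  // and -2 for every later (or losing) caller.
  int Commit() {
    Status expected = Status::kReadyExecute;
    if (!status_.compare_exchange_strong(expected, Status::kExecuted)) {
      return -2;  // status was not kReadyExecute: already executed
    }
    executions_.fetch_add(1);  // stands in for the executor call
    return 0;
  }

  int executions() const { return executions_.load(); }

 private:
  std::atomic<Status> status_{Status::kReadyExecute};
  std::atomic<int> executions_{0};
};
```

Calling Commit() twice returns 0 and then -2, and the execution counter stays at 1. Because the check and the update happen in one atomic operation, the same holds if the two calls come from different threads.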
Under the Hood: MessageManager and LockFreeCollectorPool
The MessageManager is initialized with the system configuration, a TransactionManager (for execution logic), a CheckPointManager, and system info. Crucially, it creates the TransactionExecutor and the LockFreeCollectorPool.
// Simplified from platform/consensus/ordering/pbft/message_manager.cpp
MessageManager::MessageManager(
const ResDBConfig& config,
std::unique_ptr<TransactionManager> transaction_manager,
/* ... other managers ... */)
: config_(config),
// ... other initializations ...
transaction_executor_(std::make_unique<TransactionExecutor>(
config, /* callback for responses */, /* ... */)),
collector_pool_(std::make_unique<LockFreeCollectorPool>(
"txn_collector_pool", config.GetMaxProcessTxn(), // Pool name and size
transaction_executor_.get(), // Pass executor to collectors
config.GetConfigData().enable_viewchange())) // Config flag
{
// ... setup callbacks, like telling pool to update when seq advances ...
LOG(INFO) << "MessageManager initialized.";
}

The LockFreeCollectorPool creates a vector of TransactionCollector objects. The size is a power of two, allowing for efficient indexing using bitwise AND (& mask_). It uses a clever trick: the vector holds twice the conceptual capacity. When sequence number seq is finalized, the pool proactively re-initializes the collector in the “twin” slot, at index (seq + capacity) % (2 * capacity), so that it is ready for sequence number seq + capacity. For a sequence number and its twin to resolve to different slots, the mask has to span the whole vector, that is, 2 * capacity - 1. This avoids needing locks when switching collectors between sequence numbers.

// Simplified from platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
LockFreeCollectorPool::LockFreeCollectorPool(const std::string& name,
                                             uint32_t size,  // Max concurrent txns
                                             TransactionExecutor* executor,
                                             bool enable_viewchange)
    : capacity_(/* Calculate next power of 2 >= size*2 */),
      mask_(capacity_ * 2 - 1),  // Spans all 2 * capacity_ slots, for efficient modulo
      executor_(executor) {
  // Create 2 * capacity collectors!
  collector_.resize(capacity_ * 2);
  for (size_t i = 0; i < (capacity_ * 2); ++i) {
    // Initialize each collector for its future sequence number
    collector_[i] = std::make_unique<TransactionCollector>(i, executor, /*...*/);
  }
}

// Called when 'seq' is committed, prepares the slot for 'seq + capacity'
void LockFreeCollectorPool::Update(uint64_t seq) {
  uint32_t current_idx = seq & mask_;
  uint32_t next_idx = current_idx ^ capacity_;  // The 'twin' slot
  uint64_t future_seq = seq + capacity_;
  // Re-create the collector in the twin slot for the future sequence number
  collector_[next_idx] = std::make_unique<TransactionCollector>(
      future_seq, executor_, /*...*/);
}

This pooling mechanism allows MessageManager to quickly find the right TransactionCollector for any given sequence number within its processing window.
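The twin-slot idea is easier to check with a runnable toy. The sketch below is an illustration under assumptions, not the ResilientDB implementation: ToyCollector and ToyPool are hypothetical, the capacity passed in must already be a power of two, and the mask is assumed to span the full vector (2 * capacity - 1) so that a sequence number and its twin land in different slots.

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for TransactionCollector: it only remembers its seq.
struct ToyCollector {
  explicit ToyCollector(uint64_t s) : seq(s) {}
  uint64_t seq;
};

// Sketch of the twin-slot pool; 'capacity' must be a power of two.
class ToyPool {
 public:
  explicit ToyPool(uint32_t capacity)
      : capacity_(capacity), mask_(capacity * 2 - 1) {
    // Slots 0 .. 2 * capacity - 1 start out owning those sequence numbers.
    for (uint32_t i = 0; i < capacity_ * 2; ++i) {
      slots_.push_back(std::make_unique<ToyCollector>(i));
    }
  }

  // seq & mask_ equals seq % (2 * capacity_) because the size is a power of two.
  ToyCollector* Get(uint64_t seq) { return slots_[seq & mask_].get(); }

  // After 'seq' is finalized, rebuild the twin slot for seq + capacity_.
  // XOR with capacity_ flips one bit, which adds capacity_ modulo 2 * capacity_.
  void Update(uint64_t seq) {
    uint32_t idx = static_cast<uint32_t>(seq & mask_);
    slots_[idx ^ capacity_] = std::make_unique<ToyCollector>(seq + capacity_);
  }

 private:
  uint32_t capacity_;
  uint64_t mask_;
  std::vector<std::unique_ptr<ToyCollector>> slots_;
};
```

With a capacity of 4, finalizing sequence 4 rebuilds slot 0 for sequence 8, so a later Get(8) finds a fresh collector while the collector for sequence 4 is left untouched in slot 4.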
Conclusion
You’ve now explored the vital organizational layer of ResilientDB’s consensus process!
- We learned that during consensus, many messages (PRE-PREPARE, PREPARE, COMMIT) are exchanged for each transaction.
- The TransactionCollector acts like a dedicated project manager for a single transaction sequence number. It gathers related messages, counts them, tracks the consensus state (ReadyPrepare, ReadyCommit, ReadyExecute), and holds proof.
- The MessageManager acts as the department head, overseeing all TransactionCollectors. It routes incoming consensus messages to the correct collector (using LockFreeCollectorPool) and triggers the final execution step.
- This system efficiently manages the flow of messages, ensuring that transactions proceed through the consensus stages correctly and are executed only when fully agreed upon.
Now that we understand how transactions are proposed (Chapter 1), communicated (Chapter 2), agreed upon (Chapter 3), and collected (Chapter 4), what happens when a transaction is finally marked READY_EXECUTE? How is the actual work (like changing data in the database) performed?
That’s the focus of our next chapter!
Next: Chapter 5: Transaction Execution
Generated by AI Codebase Knowledge Builder