diff --git a/core/src/consensus/tendermint/message.rs b/core/src/consensus/tendermint/message.rs index a0dd9b65ad..6e24147fc3 100644 --- a/core/src/consensus/tendermint/message.rs +++ b/core/src/consensus/tendermint/message.rs @@ -79,6 +79,8 @@ const MESSAGE_ID_PROPOSAL_BLOCK: u8 = 0x02; const MESSAGE_ID_STEP_STATE: u8 = 0x03; const MESSAGE_ID_REQUEST_MESSAGE: u8 = 0x04; const MESSAGE_ID_REQUEST_PROPOSAL: u8 = 0x05; +const MESSAGE_ID_REQUEST_COMMIT: u8 = 0x06; +const MESSAGE_ID_COMMIT: u8 = 0x07; #[derive(Debug, PartialEq)] pub enum TendermintMessage { @@ -102,6 +104,13 @@ pub enum TendermintMessage { height: Height, view: View, }, + RequestCommit { + height: Height, + }, + Commit { + block: Bytes, + votes: Vec, + }, } impl Encodable for TendermintMessage { @@ -160,6 +169,22 @@ impl Encodable for TendermintMessage { s.append(height); s.append(view); } + TendermintMessage::RequestCommit { + height, + } => { + s.begin_list(2); + s.append(&MESSAGE_ID_REQUEST_COMMIT); + s.append(height); + } + TendermintMessage::Commit { + block, + votes, + } => { + s.begin_list(3); + s.append(&MESSAGE_ID_COMMIT); + s.append(block); + s.append_list(votes); + } } } } @@ -253,6 +278,34 @@ impl Decodable for TendermintMessage { view, } } + MESSAGE_ID_REQUEST_COMMIT => { + let item_count = rlp.item_count()?; + if item_count != 2 { + return Err(DecoderError::RlpIncorrectListLen { + got: item_count, + expected: 2, + }) + } + let height = rlp.at(1)?.as_val()?; + TendermintMessage::RequestCommit { + height, + } + } + MESSAGE_ID_COMMIT => { + let item_count = rlp.item_count()?; + if item_count != 3 { + return Err(DecoderError::RlpIncorrectListLen { + got: item_count, + expected: 3, + }) + } + let block = rlp.at(1)?.as_val()?; + let votes = rlp.at(2)?.as_list()?; + TendermintMessage::Commit { + block, + votes, + } + } _ => return Err(DecoderError::Custom("Unknown message id detected")), }) } @@ -408,6 +461,42 @@ mod tests { }); } + #[test] + fn encode_and_decode_tendermint_message_6() { + rlp_encode_and_decode_test!(TendermintMessage::RequestCommit { + height: 3, + }); + } + + #[test] + fn encode_and_decode_tendermint_message_7() { + rlp_encode_and_decode_test!(TendermintMessage::Commit { + block: vec![1u8, 2u8], + votes: vec![ + ConsensusMessage { + signature: SchnorrSignature::random(), + signer_index: 0x1234, + on: VoteOn { + step: VoteStep::new(2, 3, Step::Commit), + block_hash: Some(H256::from( + "07feab4c39250abf60b77d7589a5b61fdf409bd837e936376381d19db1e1f050" + )), + }, + }, + ConsensusMessage { + signature: SchnorrSignature::random(), + signer_index: 0x1235, + on: VoteOn { + step: VoteStep::new(2, 3, Step::Commit), + block_hash: Some(H256::from( + "07feab4c39250abf60b77d7589a5b61fdf409bd837e936376381d19db1e1f050" + )), + }, + } + ] + }); + } + #[test] fn encode_and_decode_consensus_message_1() { let message = ConsensusMessage::default(); diff --git a/core/src/consensus/tendermint/network.rs b/core/src/consensus/tendermint/network.rs index 9d90702b28..05175fd636 100644 --- a/core/src/consensus/tendermint/network.rs +++ b/core/src/consensus/tendermint/network.rs @@ -352,6 +352,43 @@ impl NetworkExtension for TendermintExtension { self.send_votes(token, votes); } } + Ok(TendermintMessage::RequestCommit { + height, + }) => { + ctrace!(ENGINE, "Received RequestCommit for {} from {:?}", height, token); + let (result, receiver) = crossbeam::bounded(1); + self.inner + .send(worker::Event::RequestCommit { + height, + result, + }) + .unwrap(); + + if let Ok(message) = receiver.recv() { + ctrace!(ENGINE, "Send commit for {} to {:?}", height, token); + self.api.send(token, Arc::new(message)); + } + } + Ok(TendermintMessage::Commit { + block, + votes, + }) => { + ctrace!(ENGINE, "Received Commit from {:?}", token); + let (result, receiver) = crossbeam::bounded(1); + self.inner + .send(worker::Event::GetCommit { + block: block.clone(), + votes, + result, + }) + .unwrap(); + + if let Some(c) = receiver.recv().unwrap() { + if let Err(e) = c.import_block(block) { + cinfo!(ENGINE, "Failed to import committed block {:?}", e); + } + } + } _ => cinfo!(ENGINE, "Invalid message from peer {}", token), } } diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index d54d34b67e..c353ac3a15 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -163,6 +163,15 @@ pub enum Event { requested: BitSet, result: crossbeam::Sender, }, + RequestCommit { + height: Height, + result: crossbeam::Sender, + }, + GetCommit { + block: Bytes, + votes: Vec, + result: crossbeam::Sender>>, + }, } impl Worker { @@ -342,6 +351,20 @@ impl Worker { }) => { inner.get_all_votes_and_authors(&vote_step, &requested, result); } + Ok(Event::RequestCommit { + height, + result + }) => { + inner.on_request_commit_message(height, result); + } + Ok(Event::GetCommit { + block, + votes, + result + }) => { + let client = inner.on_commit_message(block, votes); + result.send(client).unwrap(); + } Err(crossbeam::RecvError) => { cerror!(ENGINE, "The event channel for tendermint thread had been closed."); break @@ -469,7 +492,7 @@ impl Worker { } pub fn need_proposal(&self) -> bool { - self.proposal.is_none() + self.proposal.is_none() && !self.step.is_commit() } pub fn get_all_votes_and_authors( @@ -612,7 +635,7 @@ impl Worker { #[allow(clippy::cognitive_complexity)] fn move_to_step(&mut self, state: TendermintState, is_restoring: bool) { - ctrace!(ENGINE, "Transition to {:?} triggered.", state); + ctrace!(ENGINE, "Transition to {:?} triggered from {:?}.", state, self.step); let prev_step = mem::replace(&mut self.step, state.clone()); if !is_restoring { self.backup(); @@ -1587,6 +1610,25 @@ impl Worker { result.send(message).unwrap(); } + fn send_request_commit(&self, token: &NodeId, height: Height, result: &crossbeam::Sender) { + ctrace!(ENGINE, "Request commit {} to {:?}", height, token); + let message = TendermintMessage::RequestCommit { + height, + } + .rlp_bytes() + .into_vec(); + result.send(message).unwrap(); + } + + fn send_commit(&self, block: encoded::Block, votes: Vec, result: &crossbeam::Sender) { + let message = TendermintMessage::Commit { + block: block.into_inner(), + votes, + }; + + result.send(message.rlp_bytes().into_vec()).unwrap(); + } + fn on_proposal_message( &mut self, signature: SchnorrSignature, @@ -1736,6 +1778,10 @@ impl Worker { return } + if self.height < peer_vote_step.height && !self.step.is_commit() { + self.send_request_commit(token, self.height, &result); + } + let peer_has_proposal = (self.view == peer_vote_step.view && peer_proposal.is_some()) || self.view < peer_vote_step.view || self.height < peer_vote_step.height; @@ -1842,6 +1888,215 @@ impl Worker { } } } + + fn on_request_commit_message(&self, height: Height, result: crossbeam::Sender) { + if height >= self.height { + return + } + + if height == self.height - 1 { + let block = self.client().block(&height.into()).expect("Parent block should exist"); + let block_hash = block.hash(); + let seal = block.seal(); + let view = TendermintSealView::new(&seal).consensus_view().expect("Block is already verified and imported"); + + let votes = self + .votes + .get_all_votes_in_round(&VoteStep { + height, + view, + step: Step::Precommit, + }) + .into_iter() + .filter(|vote| vote.on.block_hash == Some(block_hash)) + .collect(); + + self.send_commit(block, votes, &result); + } else if height < self.height - 1 { + let block = self.client().block(&height.into()).expect("Parent block should exist"); + let child_block = self.client().block(&(height + 1).into()).expect("Parent block should exist"); + let child_block_header_seal = child_block.header().seal(); + let child_block_seal_view = TendermintSealView::new(&child_block_header_seal); + let view = child_block_seal_view.previous_block_view().expect("Verified block"); + let on = VoteOn { + step: VoteStep::new(height, view, Step::Precommit), + block_hash: Some(block.hash()), + }; + let mut votes = Vec::new(); + for (index, signature) in child_block_seal_view.signatures().expect("The block is verified") { + let message = ConsensusMessage { + signature, + signer_index: index, + on: on.clone(), + }; + votes.push(message); + } + + self.send_commit(block, votes, &result); + } + } + + #[allow(clippy::cognitive_complexity)] + fn on_commit_message(&mut self, block: Bytes, votes: Vec) -> Option> { + if self.step.is_commit() { + return None + } + let block_hash = { + let block_view = BlockView::new(&block); + block_view.hash() + }; + + if votes.is_empty() { + cwarn!(ENGINE, "Invalid commit message received: precommits are empty",); + return None + } + + let first_vote = &votes[0]; + let commit_vote_on = first_vote.on.clone(); + let commit_height = first_vote.height(); + let commit_view = first_vote.on.step.view; + let commit_block_hash = match &first_vote.on.block_hash { + Some(block_hash) => *block_hash, + None => { + cwarn!(ENGINE, "Invalid commit message-{} received: precommit nil", commit_height); + return None + } + }; + + if commit_block_hash != block_hash { + cwarn!( + ENGINE, + "Invalid commit message-{} received: block_hash {} is different from precommit's block_hash {}", + commit_height, + block_hash, + commit_block_hash, + ); + return None + } + + if commit_height < self.height { + cdebug!( + ENGINE, + "Received commit message is old. Current height is {} but commit messages is for height {}", + self.height, + commit_height, + ); + return None + } else if commit_height > self.height { + cwarn!( + ENGINE, + "Invalid commit message received: precommit on height {} but current height is {}", + commit_height, + self.height + ); + return None + } + + let prev_block_hash = self + .client() + .block_header(&(self.height - 1).into()) + .expect("self.height - 1 == the best block number") + .hash(); + + if commit_vote_on.step.step != Step::Precommit { + cwarn!( + ENGINE, + "Invalid commit message-{} received: vote is not precommit but {:?}", + commit_height, + commit_vote_on.step.step + ); + return None + } + + let mut vote_bitset = BitSet::new(); + + for vote in &votes { + let signer_index = vote.signer_index; + + if vote.on != commit_vote_on { + cwarn!( + ENGINE, + "Invalid commit message received: One precommit on {:?}, other precommit on {:?}", + commit_vote_on, + vote.on + ); + return None + } + + if signer_index >= self.validators.count(&prev_block_hash) { + cwarn!( + ENGINE, + "Invalid commit message-{} received: invalid signer index {}", + commit_height, + signer_index + ); + return None + } + + let sender_public = self.validators.get(&prev_block_hash, signer_index); + + match vote.verify(&sender_public) { + Err(err) => { + cwarn!( + ENGINE, + "Invalid commit message-{} received: invalid signature signer_index: {} address: {} internal error: {:?}", + commit_height, + signer_index, + public_to_address(&sender_public), + err + ); + return None + } + Ok(false) => { + cwarn!( + ENGINE, + "Invalid commit message-{} received: invalid signature signer_index: {} address: {}", + commit_height, + signer_index, + public_to_address(&sender_public) + ); + return None + } + Ok(true) => {} + } + vote_bitset.set(signer_index); + } + + if let Err(err) = self.validators.check_enough_votes(&prev_block_hash, &vote_bitset) { + cwarn!(ENGINE, "Invalid commit message-{} received: check_enough_votes failed: {:?}", commit_height, err); + return None + } + + cdebug!(ENGINE, "Commit message-{} is verified", commit_height); + for vote in votes { + if !self.votes.is_old_or_known(&vote) { + self.votes.vote(vote); + } + } + + // Since we don't have proposal vote, set proposal = None + self.proposal = Proposal::None; + self.view = commit_view; + self.votes_received = vote_bitset; + self.last_two_thirds_majority = TwoThirdsMajority::Empty; + + self.move_to_step( + TendermintState::Commit { + block_hash, + view: commit_view, + }, + false, + ); + + if self.client().block(&BlockId::Hash(block_hash)).is_some() { + cdebug!(ENGINE, "Committed block is already imported {}", block_hash); + None + } else { + cdebug!(ENGINE, "Committed block is not imported yet {}", block_hash); + let c = self.client.upgrade()?; + Some(c) + } + } } fn calculate_score(height: Height, view: View) -> U256 {