From ba531443738634c1fdf4200f3fe6be22f4dc7e40 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 11 Sep 2019 12:17:33 +0900 Subject: [PATCH 1/3] Add Commit message in Tendermint Before this commit, Tendermint extension only requests current height and view's votes. Handling only the current view's message makes code simple. However, there may be a commit before the current view. This creates a liveness problem. After this commit, a node requests a Commit message if some of its peer's height is higher than the node. The commit message is not related to the node's current view. The Commit message fixes the liveness problem. --- core/src/consensus/tendermint/message.rs | 89 ++++++++ core/src/consensus/tendermint/network.rs | 37 ++++ core/src/consensus/tendermint/worker.rs | 255 +++++++++++++++++++++++ 3 files changed, 381 insertions(+) diff --git a/core/src/consensus/tendermint/message.rs b/core/src/consensus/tendermint/message.rs index a0dd9b65ad..6e24147fc3 100644 --- a/core/src/consensus/tendermint/message.rs +++ b/core/src/consensus/tendermint/message.rs @@ -79,6 +79,8 @@ const MESSAGE_ID_PROPOSAL_BLOCK: u8 = 0x02; const MESSAGE_ID_STEP_STATE: u8 = 0x03; const MESSAGE_ID_REQUEST_MESSAGE: u8 = 0x04; const MESSAGE_ID_REQUEST_PROPOSAL: u8 = 0x05; +const MESSAGE_ID_REQUEST_COMMIT: u8 = 0x06; +const MESSAGE_ID_COMMIT: u8 = 0x07; #[derive(Debug, PartialEq)] pub enum TendermintMessage { @@ -102,6 +104,13 @@ pub enum TendermintMessage { height: Height, view: View, }, + RequestCommit { + height: Height, + }, + Commit { + block: Bytes, + votes: Vec, + }, } impl Encodable for TendermintMessage { @@ -160,6 +169,22 @@ impl Encodable for TendermintMessage { s.append(height); s.append(view); } + TendermintMessage::RequestCommit { + height, + } => { + s.begin_list(2); + s.append(&MESSAGE_ID_REQUEST_COMMIT); + s.append(height); + } + TendermintMessage::Commit { + block, + votes, + } => { + s.begin_list(3); + s.append(&MESSAGE_ID_COMMIT); + s.append(block); + s.append_list(votes); + } } } } @@ -253,6 +278,34 @@ impl Decodable for TendermintMessage { view, } } + MESSAGE_ID_REQUEST_COMMIT => { + let item_count = rlp.item_count()?; + if item_count != 2 { + return Err(DecoderError::RlpIncorrectListLen { + got: item_count, + expected: 2, + }) + } + let height = rlp.at(1)?.as_val()?; + TendermintMessage::RequestCommit { + height, + } + } + MESSAGE_ID_COMMIT => { + let item_count = rlp.item_count()?; + if item_count != 3 { + return Err(DecoderError::RlpIncorrectListLen { + got: item_count, + expected: 3, + }) + } + let block = rlp.at(1)?.as_val()?; + let votes = rlp.at(2)?.as_list()?; + TendermintMessage::Commit { + block, + votes, + } + } _ => return Err(DecoderError::Custom("Unknown message id detected")), }) } @@ -408,6 +461,42 @@ mod tests { }); } + #[test] + fn encode_and_decode_tendermint_message_6() { + rlp_encode_and_decode_test!(TendermintMessage::RequestCommit { + height: 3, + }); + } + + #[test] + fn encode_and_decode_tendermint_message_7() { + rlp_encode_and_decode_test!(TendermintMessage::Commit { + block: vec![1u8, 2u8], + votes: vec![ + ConsensusMessage { + signature: SchnorrSignature::random(), + signer_index: 0x1234, + on: VoteOn { + step: VoteStep::new(2, 3, Step::Commit), + block_hash: Some(H256::from( + "07feab4c39250abf60b77d7589a5b61fdf409bd837e936376381d19db1e1f050" + )), + }, + }, + ConsensusMessage { + signature: SchnorrSignature::random(), + signer_index: 0x1235, + on: VoteOn { + step: VoteStep::new(2, 3, Step::Commit), + block_hash: Some(H256::from( + "07feab4c39250abf60b77d7589a5b61fdf409bd837e936376381d19db1e1f050" + )), + }, + } + ] + }); + } + #[test] fn encode_and_decode_consensus_message_1() { let message = ConsensusMessage::default(); diff --git a/core/src/consensus/tendermint/network.rs b/core/src/consensus/tendermint/network.rs index 9d90702b28..05175fd636 100644 --- a/core/src/consensus/tendermint/network.rs +++ b/core/src/consensus/tendermint/network.rs @@ -352,6 +352,43 @@ impl NetworkExtension for TendermintExtension { self.send_votes(token, votes); } } + Ok(TendermintMessage::RequestCommit { + height, + }) => { + ctrace!(ENGINE, "Received RequestCommit for {} from {:?}", height, token); + let (result, receiver) = crossbeam::bounded(1); + self.inner + .send(worker::Event::RequestCommit { + height, + result, + }) + .unwrap(); + + if let Ok(message) = receiver.recv() { + ctrace!(ENGINE, "Send commit for {} to {:?}", height, token); + self.api.send(token, Arc::new(message)); + } + } + Ok(TendermintMessage::Commit { + block, + votes, + }) => { + ctrace!(ENGINE, "Received Commit from {:?}", token); + let (result, receiver) = crossbeam::bounded(1); + self.inner + .send(worker::Event::GetCommit { + block: block.clone(), + votes, + result, + }) + .unwrap(); + + if let Some(c) = receiver.recv().unwrap() { + if let Err(e) = c.import_block(block) { + cinfo!(ENGINE, "Failed to import committed block {:?}", e); + } + } + } _ => cinfo!(ENGINE, "Invalid message from peer {}", token), } } diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index d54d34b67e..f9bb09a858 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -163,6 +163,15 @@ pub enum Event { requested: BitSet, result: crossbeam::Sender, }, + RequestCommit { + height: Height, + result: crossbeam::Sender, + }, + GetCommit { + block: Bytes, + votes: Vec, + result: crossbeam::Sender>>, + }, } impl Worker { @@ -342,6 +351,20 @@ impl Worker { }) => { inner.get_all_votes_and_authors(&vote_step, &requested, result); } + Ok(Event::RequestCommit { + height, + result + }) => { + inner.on_request_commit_message(height, result); + } + Ok(Event::GetCommit { + block, + votes, + result + }) => { + let client = inner.on_commit_message(block, votes); + result.send(client).unwrap(); + } Err(crossbeam::RecvError) => { cerror!(ENGINE, "The event channel for tendermint thread had been closed."); break @@ -1587,6 +1610,25 @@ impl Worker { result.send(message).unwrap(); } + fn send_request_commit(&self, token: &NodeId, height: Height, result: &crossbeam::Sender) { + ctrace!(ENGINE, "Request commit {} to {:?}", height, token); + let message = TendermintMessage::RequestCommit { + height, + } + .rlp_bytes() + .into_vec(); + result.send(message).unwrap(); + } + + fn send_commit(&self, block: encoded::Block, votes: Vec, result: &crossbeam::Sender) { + let message = TendermintMessage::Commit { + block: block.into_inner(), + votes, + }; + + result.send(message.rlp_bytes().into_vec()).unwrap(); + } + fn on_proposal_message( &mut self, signature: SchnorrSignature, @@ -1736,6 +1778,10 @@ impl Worker { return } + if self.height < peer_vote_step.height && !self.step.is_commit() { + self.send_request_commit(token, self.height, &result); + } + let peer_has_proposal = (self.view == peer_vote_step.view && peer_proposal.is_some()) || self.view < peer_vote_step.view || self.height < peer_vote_step.height; @@ -1842,6 +1888,215 @@ impl Worker { } } } + + fn on_request_commit_message(&self, height: Height, result: crossbeam::Sender) { + if height >= self.height { + return + } + + if height == self.height - 1 { + let block = self.client().block(&height.into()).expect("Parent block should exist"); + let block_hash = block.hash(); + let seal = block.seal(); + let view = TendermintSealView::new(&seal).consensus_view().expect("Block is already verified and imported"); + + let votes = self + .votes + .get_all_votes_in_round(&VoteStep { + height, + view, + step: Step::Precommit, + }) + .into_iter() + .filter(|vote| vote.on.block_hash == Some(block_hash)) + .collect(); + + self.send_commit(block, votes, &result); + } else if height < self.height - 1 { + let block = self.client().block(&height.into()).expect("Parent block should exist"); + let child_block = self.client().block(&(height + 1).into()).expect("Parent block should exist"); + let child_block_header_seal = child_block.header().seal(); + let child_block_seal_view = TendermintSealView::new(&child_block_header_seal); + let view = child_block_seal_view.previous_block_view().expect("Verified block"); + let on = VoteOn { + step: VoteStep::new(height, view, Step::Precommit), + block_hash: Some(block.hash()), + }; + let mut votes = Vec::new(); + for (index, signature) in child_block_seal_view.signatures().expect("The block is verified") { + let message = ConsensusMessage { + signature, + signer_index: index, + on: on.clone(), + }; + votes.push(message); + } + + self.send_commit(block, votes, &result); + } + } + + #[allow(clippy::cognitive_complexity)] + fn on_commit_message(&mut self, block: Bytes, votes: Vec) -> Option> { + if self.step.is_commit() { + return None + } + let block_hash = { + let block_view = BlockView::new(&block); + block_view.hash() + }; + + if votes.is_empty() { + cwarn!(ENGINE, "Invalid commit message received: precommits are empty",); + return None + } + + let first_vote = &votes[0]; + let commit_vote_on = first_vote.on.clone(); + let commit_height = first_vote.height(); + let commit_view = first_vote.on.step.view; + let commit_block_hash = match &first_vote.on.block_hash { + Some(block_hash) => *block_hash, + None => { + cwarn!(ENGINE, "Invalid commit message-{} received: precommit nil", commit_height); + return None + } + }; + + if commit_block_hash != block_hash { + cwarn!( + ENGINE, + "Invalid commit message-{} received: block_hash {} is different from precommit's block_hash {}", + commit_height, + block_hash, + commit_block_hash, + ); + return None + } + + if commit_height < self.height { + cdebug!( + ENGINE, + "Received commit message is old. Current height is {} but commit messages is for height {}", + self.height, + commit_height, + ); + return None + } else if commit_height > self.height { + cwarn!( + ENGINE, + "Invalid commit message received: precommit on height {} but current height is {}", + commit_height, + self.height + ); + return None + } + + let prev_block_hash = self + .client() + .block_header(&(self.height - 1).into()) + .expect("self.height - 1 == the best block number") + .hash(); + + if commit_vote_on.step.step != Step::Precommit { + cwarn!( + ENGINE, + "Invalid commit message-{} received: vote is not precommit but {:?}", + commit_height, + commit_vote_on.step.step + ); + return None + } + + let mut vote_bitset = BitSet::new(); + + for vote in &votes { + let signer_index = vote.signer_index; + + if vote.on != commit_vote_on { + cwarn!( + ENGINE, + "Invalid commit message received: One precommit on {:?}, other precommit on {:?}", + commit_vote_on, + vote.on + ); + return None + } + + if signer_index >= self.validators.count(&prev_block_hash) { + cwarn!( + ENGINE, + "Invalid commit message-{} received: invalid signer index {}", + commit_height, + signer_index + ); + return None + } + + let sender_public = self.validators.get(&prev_block_hash, signer_index); + + match vote.verify(&sender_public) { + Err(err) => { + cwarn!( + ENGINE, + "Invalid commit message-{} received: invalid signature signer_index: {} address: {} internal error: {:?}", + commit_height, + signer_index, + public_to_address(&sender_public), + err + ); + return None + } + Ok(false) => { + cwarn!( + ENGINE, + "Invalid commit message-{} received: invalid signature signer_index: {} address: {}", + commit_height, + signer_index, + public_to_address(&sender_public) + ); + return None + } + Ok(true) => {} + } + vote_bitset.set(signer_index); + } + + if let Err(err) = self.validators.check_enough_votes(&prev_block_hash, &vote_bitset) { + cwarn!(ENGINE, "Invalid commit message-{} received: check_enough_votes failed: {:?}", commit_height, err); + return None + } + + cdebug!(ENGINE, "Commit message-{} is verified", commit_height); + for vote in votes { + if !self.votes.is_old_or_known(&vote) { + self.votes.vote(vote); + } + } + + // Since we don't have proposal vote, set proposal = None + self.proposal = Proposal::None; + self.view = commit_view; + self.votes_received = vote_bitset; + self.last_two_thirds_majority = TwoThirdsMajority::Empty; + + self.move_to_step( + TendermintState::Commit { + block_hash, + view: commit_view, + }, + false, + ); + + if self.client().block(&BlockId::Hash(block_hash)).is_some() { + cdebug!(ENGINE, "Committed block is already imported {}", block_hash); + None + } else { + cdebug!(ENGINE, "Committed block is not imported yet {}", block_hash); + let c = self.client.upgrade()?; + Some(c) + } + } } fn calculate_score(height: Height, view: View) -> U256 { From f798c5ac3dfb46dec946a6b865468538964b1459 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 11 Sep 2019 12:19:32 +0900 Subject: [PATCH 2/3] Do not request proposal to peers if the current state is Commit --- core/src/consensus/tendermint/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index f9bb09a858..4eb97a938c 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -492,7 +492,7 @@ impl Worker { } pub fn need_proposal(&self) -> bool { - self.proposal.is_none() + self.proposal.is_none() && !self.step.is_commit() } pub fn get_all_votes_and_authors( From ddb887a36ce6537db993e828c4a0bd1c042d7c45 Mon Sep 17 00:00:00 2001 From: Juhyung Park Date: Wed, 11 Sep 2019 12:20:03 +0900 Subject: [PATCH 3/3] Make step transition log verbose --- core/src/consensus/tendermint/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 4eb97a938c..c353ac3a15 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -635,7 +635,7 @@ impl Worker { #[allow(clippy::cognitive_complexity)] fn move_to_step(&mut self, state: TendermintState, is_restoring: bool) { - ctrace!(ENGINE, "Transition to {:?} triggered.", state); + ctrace!(ENGINE, "Transition to {:?} triggered from {:?}.", state, self.step); let prev_step = mem::replace(&mut self.step, state.clone()); if !is_restoring { self.backup();