use std::{marker::PhantomData, sync::Arc, time::Duration};
use futures::channel::oneshot;
use log::{debug, warn};
use parking_lot::Mutex;
use sc_client_api::HeaderBackend;
use sp_consensus::SelectChain;
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, Zero},
SaturatedConversion,
};
use crate::{
phron_primitives::{BlockHash, BlockNumber},
data_io::legacy::{proposal::UnvalidatedPhronProposal, PhronData, MAX_DATA_BRANCH_LEN},
metrics::Checkpoint,
party::manager::Runnable,
BlockId, SessionBoundaries, TimingBlockMetrics,
};
/// Follows parent links from `header` until the ancestor with number `num` is reached.
///
/// Returns `header` unchanged when it is already at height `num`.
///
/// # Panics
///
/// - if the number of `header` is lower than `num`,
/// - if the client returns an error for a header query,
/// - if the client does not know the parent of a visited header.
pub fn reduce_header_to_num<B, C>(client: &C, header: B::Header, num: BlockNumber) -> B::Header
where
    B: BlockT,
    B::Header: HeaderT<Number = BlockNumber>,
    C: HeaderBackend<B>,
{
    assert!(
        *header.number() >= num,
        "Cannot reduce {header:?} to number {num:?}"
    );

    let mut ancestor = header;
    loop {
        // Stop as soon as we are no longer above the requested height.
        if *ancestor.number() <= num {
            return ancestor;
        }
        let parent_hash = *ancestor.parent_hash();
        ancestor = client
            .header(parent_hash)
            .expect("client must respond")
            .expect("parent hash is known by the client");
    }
}
/// Looks up the parent of `block` in the client.
///
/// Returns `None` when `block` has number zero, or when the client has no header stored
/// under `block.hash` (a warning is logged in the latter case). The parent's number is
/// derived as `block.number - 1`; it is not read from the stored header.
///
/// # Panics
///
/// Panics if the client returns an error for the header query.
pub fn get_parent<B, C>(client: &C, block: &BlockId) -> Option<BlockId>
where
    B: BlockT<Hash = BlockHash>,
    B::Header: HeaderT<Number = BlockNumber>,
    C: HeaderBackend<B>,
{
    // Number zero has no parent to report.
    if block.number.is_zero() {
        return None;
    }

    match client.header(block.hash).expect("client must respond") {
        Some(header) => {
            let parent_hash = *header.parent_hash();
            let parent_number = block.number - 1;
            Some((parent_hash, parent_number).into())
        }
        None => {
            warn!(target: "phron-data-store", "Trying to fetch the parent of an unknown block {:?}.", block);
            None
        }
    }
}
pub fn get_proposal<B, C>(
client: &C,
best_block: BlockId,
finalized_block: BlockId,
) -> Result<PhronData, ()>
where
B: BlockT<Hash = BlockHash>,
B::Header: HeaderT<Number = BlockNumber>,
C: HeaderBackend<B>,
{
let mut curr_block = best_block;
let mut branch = Vec::new();
while curr_block.number > finalized_block.number {
if curr_block.number - finalized_block.number
<= <BlockNumber>::saturated_from(MAX_DATA_BRANCH_LEN)
{
branch.push(curr_block.hash);
}
curr_block = get_parent(client, &curr_block).expect("block of num >= 1 must have a parent")
}
if curr_block.hash == finalized_block.hash {
let num_last = finalized_block.number + <BlockNumber>::saturated_from(branch.len());
branch.reverse();
Ok(PhronData {
head_proposal: UnvalidatedPhronProposal::new(branch, num_last),
})
} else {
warn!(target: "phron-data-store", "Error computing proposal. Conflicting blocks: {:?}, finalized {:?}", curr_block, finalized_block);
Err(())
}
}
/// Refresh interval used by `ChainTrackerConfig::default()`.
const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_millis(100);

/// Tunable parameters of a `ChainTracker`.
///
/// Derives the common value-type traits so a configuration can be logged, duplicated and
/// compared (e.g. in tests) without hand-written boilerplate.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChainTrackerConfig {
    /// Delay the tracker waits before each refresh of the chain state.
    pub refresh_interval: Duration,
}

impl Default for ChainTrackerConfig {
    /// Returns a configuration whose `refresh_interval` is `DEFAULT_REFRESH_INTERVAL`.
    fn default() -> Self {
        Self {
            refresh_interval: DEFAULT_REFRESH_INTERVAL,
        }
    }
}
/// The pair of chain positions that `ChainTracker::update_data` last worked from.
///
/// It is compared against the current positions so that an unchanged chain state does not
/// trigger another proposal computation.
#[derive(PartialEq, Eq, Clone, Debug)]
struct ChainInfo {
/// Best block (restricted to the session) that was passed to `update_data`.
best_block_in_session: BlockId,
/// Finalized block reported by the client during that `update_data` call.
highest_finalized: BlockId,
}
/// Background task state that keeps a data proposal up to date for a single session.
///
/// On every refresh it asks `select_chain` for the best header, restricts it to the session
/// boundaries, and stores the resulting proposal in `data_to_propose`.
pub struct ChainTracker<B, SC, C>
where
B: BlockT<Hash = BlockHash>,
B::Header: HeaderT<Number = BlockNumber>,
C: HeaderBackend<B> + 'static,
SC: SelectChain<B> + 'static,
{
/// Queried for the header of the current best chain.
select_chain: SC,
/// Used for header lookups and for reading the finalized block.
client: Arc<C>,
/// Slot shared with the `DataProvider` returned from `new`; holds the latest proposal.
data_to_propose: Arc<Mutex<Option<PhronData>>>,
/// First and last block of the session this tracker works for.
session_boundaries: SessionBoundaries,
/// Chain positions seen by the previous proposal update, `None` before the first one.
prev_chain_info: Option<ChainInfo>,
/// Supplies the refresh interval of the `run` loop.
config: ChainTrackerConfig,
/// Ties the block type `B` to the struct without storing a value of it.
_phantom: PhantomData<B>,
}
impl<B, SC, C> ChainTracker<B, SC, C>
where
B: BlockT<Hash = BlockHash>,
B::Header: HeaderT<Number = BlockNumber>,
C: HeaderBackend<B> + 'static,
SC: SelectChain<B> + 'static,
{
pub fn new(
select_chain: SC,
client: Arc<C>,
session_boundaries: SessionBoundaries,
config: ChainTrackerConfig,
metrics: TimingBlockMetrics,
) -> (Self, DataProvider) {
let data_to_propose = Arc::new(Mutex::new(None));
(
ChainTracker {
select_chain,
client,
data_to_propose: data_to_propose.clone(),
session_boundaries,
prev_chain_info: None,
config,
_phantom: PhantomData,
},
DataProvider {
data_to_propose,
metrics,
},
)
}
fn update_data(&mut self, best_block_in_session: &BlockId) {
let client_info = self.client.info();
let finalized_block: BlockId =
(client_info.finalized_hash, client_info.finalized_number).into();
if finalized_block.number >= self.session_boundaries.last_block() {
*self.data_to_propose.lock() = None;
return;
}
if let Some(prev) = &self.prev_chain_info {
if prev.best_block_in_session == *best_block_in_session
&& prev.highest_finalized == finalized_block
{
return;
}
}
self.prev_chain_info = Some(ChainInfo {
best_block_in_session: best_block_in_session.clone(),
highest_finalized: finalized_block.clone(),
});
if best_block_in_session.number == finalized_block.number {
*self.data_to_propose.lock() = None;
return;
}
if best_block_in_session.number < finalized_block.number {
warn!(target: "phron-data-store", "Error updating data. best_block {:?} is lower than finalized {:?}.", best_block_in_session, finalized_block);
return;
}
if let Ok(proposal) = get_proposal(
&*self.client,
best_block_in_session.clone(),
finalized_block,
) {
*self.data_to_propose.lock() = Some(proposal);
}
}
async fn get_best_header(&self) -> B::Header {
self.select_chain.best_chain().await.expect("No best chain")
}
async fn get_best_block_in_session(&self, prev_best_block: Option<BlockId>) -> Option<BlockId> {
let new_best_header = self.get_best_header().await;
if new_best_header.number() < &self.session_boundaries.first_block() {
return None;
}
let last_block = self.session_boundaries.last_block();
let new_best_block = (new_best_header.hash(), *new_best_header.number()).into();
if new_best_header.number() <= &last_block {
Some(new_best_block)
} else {
match prev_best_block {
None => {
let reduced_header =
reduce_header_to_num(&*self.client, new_best_header, last_block);
Some((reduced_header.hash(), *reduced_header.number()).into())
}
Some(prev) => {
if prev.number < last_block {
let reduced_header =
reduce_header_to_num(&*self.client, new_best_header, last_block);
Some((reduced_header.hash(), *reduced_header.number()).into())
} else {
let reduced_header = reduce_header_to_num(
&*self.client,
new_best_header.clone(),
prev.number,
);
if reduced_header.hash() != prev.hash {
let reduced_header =
reduce_header_to_num(&*self.client, new_best_header, last_block);
Some((reduced_header.hash(), *reduced_header.number()).into())
} else {
Some(prev)
}
}
}
}
}
}
pub async fn run(mut self, mut exit: oneshot::Receiver<()>) {
let mut best_block_in_session: Option<BlockId> = None;
loop {
let delay = futures_timer::Delay::new(self.config.refresh_interval);
tokio::select! {
_ = delay => {
best_block_in_session = self.get_best_block_in_session(best_block_in_session).await;
if let Some(best_block) = &best_block_in_session {
self.update_data(best_block);
}
}
_ = &mut exit => {
debug!(target: "phron-data-store", "Task for refreshing best chain received exit signal. Terminating.");
return;
}
}
}
}
}
#[async_trait::async_trait]
impl<B, SC, C> Runnable for ChainTracker<B, SC, C>
where
    B: BlockT<Hash = BlockHash>,
    B::Header: HeaderT<Number = BlockNumber>,
    C: HeaderBackend<B> + 'static,
    SC: SelectChain<B> + 'static,
{
    /// Forwards to the inherent `ChainTracker::run`, so the tracker can be driven through
    /// the `Runnable` trait.
    async fn run(mut self, exit: oneshot::Receiver<()>) {
        ChainTracker::<B, SC, C>::run(self, exit).await
    }
}
/// Consumer-side handle to the proposal slot filled by a `ChainTracker`.
///
/// Clones share the same underlying slot, since it is held behind an `Arc`.
#[derive(Clone)]
pub struct DataProvider {
/// Slot shared with the `ChainTracker` created by the same `ChainTracker::new` call.
data_to_propose: Arc<Mutex<Option<PhronData>>>,
/// Receives a `Checkpoint::Proposed` report whenever `get_data` returns a proposal.
metrics: TimingBlockMetrics,
}
impl DataProvider {
    /// Takes the currently stored proposal, leaving the shared slot empty.
    ///
    /// When a proposal is present, the last hash of its branch is reported to the metrics
    /// with `Checkpoint::Proposed` and the proposal is logged at debug level before being
    /// returned. Returns `None` when the slot is empty.
    ///
    /// # Panics
    ///
    /// Panics if the stored proposal has an empty branch.
    pub async fn get_data(&mut self) -> Option<PhronData> {
        // The lock guard is a temporary of this statement, so it is released before the
        // metrics call and the logging below.
        let proposal = self.data_to_propose.lock().take()?;

        let top_hash = *proposal.head_proposal.branch.last().unwrap();
        self.metrics.report_block_if_not_present(
            top_hash,
            std::time::Instant::now(),
            Checkpoint::Proposed,
        );
        debug!(target: "phron-data-store", "Outputting {:?} in get_data", proposal);

        Some(proposal)
    }
}
#[cfg(test)]
mod tests {
    use std::{future::Future, sync::Arc, time::Duration};

    use futures::channel::oneshot;
    use tokio::time::sleep;

    // NOTE(review): `ChainTracker` and `ChainTrackerConfig` are imported through
    // `crate::data_io::data_provider`, while the code in this file imports its own
    // dependencies from `data_io::legacy` — confirm that this path resolves to the types
    // defined in this file.
    use crate::{
        data_io::{
            data_provider::{ChainTracker, ChainTrackerConfig},
            DataProvider, MAX_DATA_BRANCH_LEN,
        },
        testing::{
            client_chain_builder::ClientChainBuilder,
            mocks::{phron_data_from_blocks, TestClientBuilder, TestClientBuilderExt},
        },
        SessionBoundaryInfo, SessionId, SessionPeriod, TimingBlockMetrics,
    };

    /// Length of the session the tracker under test works for.
    const SESSION_LEN: u32 = 100;
    /// Refresh interval of the tracker under test; kept short so the tests run quickly.
    const REFRESH_INTERVAL: Duration = Duration::from_millis(5);

    /// Builds a chain tracker for session 0 on a fresh test client.
    ///
    /// Returns the (not yet started) tracker task, the sender that stops it, a chain builder
    /// operating on the same client, and the data provider paired with the tracker.
    fn prepare_chain_tracker_test() -> (
        impl Future<Output = ()>,
        oneshot::Sender<()>,
        ClientChainBuilder,
        DataProvider,
    ) {
        let (client, select_chain) = TestClientBuilder::new().build_with_longest_chain();
        let client = Arc::new(client);

        let chain_builder =
            ClientChainBuilder::new(client.clone(), Arc::new(TestClientBuilder::new().build()));
        let session_boundaries = SessionBoundaryInfo::new(SessionPeriod(SESSION_LEN))
            .boundaries_for_session(SessionId(0));

        let (chain_tracker, data_provider) = ChainTracker::new(
            select_chain,
            client,
            session_boundaries,
            ChainTrackerConfig {
                refresh_interval: REFRESH_INTERVAL,
            },
            TimingBlockMetrics::noop(),
        );

        let (exit_tx, exit_rx) = oneshot::channel();
        let tracker_task = async move {
            chain_tracker.run(exit_rx).await;
        };

        (tracker_task, exit_tx, chain_builder, data_provider)
    }

    /// Sleeps for three refresh intervals.
    async fn sleep_enough() {
        sleep(REFRESH_INTERVAL * 3).await;
    }

    /// Asserts that the provider currently has nothing to propose.
    async fn assert_empty_proposal(data_provider: &mut DataProvider) {
        assert_eq!(
            data_provider.get_data().await,
            None,
            "Expected empty proposal"
        );
    }

    /// Spawns the tracker, runs `scenario` against it, then stops the tracker and checks
    /// that its task finished without panicking.
    async fn run_test<F, S>(scenario: S)
    where
        F: Future,
        S: FnOnce(ClientChainBuilder, DataProvider) -> F,
    {
        let (tracker_task, exit_tx, chain_builder, data_provider) = prepare_chain_tracker_test();
        let tracker_handle = tokio::spawn(tracker_task);

        scenario(chain_builder, data_provider).await;

        exit_tx.send(()).unwrap();
        tracker_handle
            .await
            .expect("Chain tracker did not terminate cleanly.");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn proposes_empty_and_nonempty_when_expected() {
        run_test(|mut chain_builder, mut data_provider| async move {
            // Nothing imported yet: there is nothing to propose.
            sleep_enough().await;
            assert_empty_proposal(&mut data_provider).await;

            let blocks = chain_builder
                .initialize_single_branch_and_import(2 * MAX_DATA_BRANCH_LEN)
                .await;
            sleep_enough().await;

            // Only the first MAX_DATA_BRANCH_LEN blocks fit into the proposal.
            let proposed = data_provider.get_data().await.unwrap();
            let expected = phron_data_from_blocks(blocks[..MAX_DATA_BRANCH_LEN].to_vec());
            assert_eq!(proposed, expected);
        })
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn proposal_changes_with_finalization() {
        run_test(|mut chain_builder, mut data_provider| async move {
            let blocks = chain_builder
                .initialize_single_branch_and_import(3 * MAX_DATA_BRANCH_LEN)
                .await;

            // After finalizing `blocks[height - 1]` the proposal must start at `blocks[height]`.
            for height in 1..(2 * MAX_DATA_BRANCH_LEN) {
                chain_builder.finalize_block(&blocks[height - 1].header.hash());
                sleep_enough().await;

                let proposed = data_provider.get_data().await.unwrap();
                let window_end = MAX_DATA_BRANCH_LEN + height;
                let expected = phron_data_from_blocks(blocks[height..window_end].to_vec());
                assert_eq!(proposed, expected);
            }

            // Everything finalized: nothing remains to be proposed.
            chain_builder.finalize_block(&blocks.last().unwrap().header.hash());
            sleep_enough().await;
            assert_empty_proposal(&mut data_provider).await;
        })
        .await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn returns_empty_proposal_above_session_end() {
        run_test(|mut chain_builder, mut data_provider| async move {
            let chain_len = (SESSION_LEN as usize) + 3 * MAX_DATA_BRANCH_LEN;
            let blocks = chain_builder
                .initialize_single_branch_and_import(chain_len)
                .await;
            sleep_enough().await;

            let proposed = data_provider.get_data().await.unwrap();
            let expected = phron_data_from_blocks(blocks[0..MAX_DATA_BRANCH_LEN].to_vec());
            assert_eq!(proposed, expected);

            // Finalizing past the end of the session must clear the proposal.
            chain_builder.finalize_block(&blocks.last().unwrap().header.hash());
            sleep_enough().await;
            assert_empty_proposal(&mut data_provider).await;
        })
        .await;
    }
}