use std::sync::Arc;
use futures::{
channel::{mpsc, oneshot},
pin_mut, StreamExt,
};
use log::{debug, error, trace};
use sc_client_api::HeaderBackend;
use sp_runtime::traits::{Block, Header};
use tokio::time;
use crate::{
abft::SignatureSet,
aggregation::Aggregator,
phron_primitives::{BlockHash, BlockNumber},
crypto::Signature,
justification::PhronJustification,
metrics::Checkpoint,
network::data::Network,
party::{
manager::aggregator::AggregatorVersion::{Current, Legacy},
AuthoritySubtaskCommon, Task,
},
sync::{substrate::Justification, JustificationSubmissions, JustificationTranslator},
BlockId, CurrentRmcNetworkData, Keychain, LegacyRmcNetworkData, SessionBoundaries,
TimingBlockMetrics, STATUS_REPORT_INTERVAL,
};
/// Inputs and outputs wired into the aggregator task.
///
/// `JS` is the sink type that accepts the justifications produced here.
pub struct IO<JS: JustificationSubmissions<Justification> + Send + Sync + Clone> {
    /// Stream of blocks for which signature aggregation should be started.
    pub blocks_from_interpreter: mpsc::UnboundedReceiver<BlockId>,
    /// Sink to which translated justifications are submitted.
    pub justifications_for_chain: JS,
    /// Turns a `PhronJustification` plus its block id into a sync-layer `Justification`.
    pub justification_translator: JustificationTranslator,
}
/// Hands a newly received block over to the aggregator.
///
/// Reports the `Checkpoint::Ordered` timing metric for the block's hash (stamped with the
/// current instant) and then starts signature aggregation for that hash.
async fn process_new_block_data<CN, LN>(
    aggregator: &mut Aggregator<'_, CN, LN>,
    block: BlockId,
    metrics: &TimingBlockMetrics,
) where
    CN: Network<CurrentRmcNetworkData>,
    LN: Network<LegacyRmcNetworkData>,
{
    trace!(target: "phron-party", "Received unit {:?} in aggregator.", block);

    let block_hash = block.hash;
    let ordered_at = std::time::Instant::now();
    metrics.report_block(block_hash, ordered_at, Checkpoint::Ordered);

    aggregator.start_aggregation(block_hash).await;
}
fn process_hash<B, C, JS>(
hash: BlockHash,
multisignature: SignatureSet<Signature>,
justifications_for_chain: &mut JS,
justification_translator: &JustificationTranslator,
client: &Arc<C>,
) -> Result<(), ()>
where
B: Block<Hash = BlockHash>,
B::Header: Header<Number = BlockNumber>,
C: HeaderBackend<B> + Send + Sync + 'static,
JS: JustificationSubmissions<Justification> + Send + Sync + Clone,
{
let number = client.number(hash).unwrap().unwrap();
let justification = match justification_translator.translate(
PhronJustification::CommitteeMultisignature(multisignature),
BlockId::new(hash, number),
) {
Ok(justification) => justification,
Err(e) => {
error!(target: "phron-party", "Issue with translating justification from Aggregator to Sync Justification: {}.", e);
return Err(());
}
};
if let Err(e) = justifications_for_chain.submit(justification) {
error!(target: "phron-party", "Issue with sending justification from Aggregator to JustificationHandler {}.", e);
return Err(());
}
Ok(())
}
/// Drives the aggregator for a single session.
///
/// The loop multiplexes four event sources:
/// * new blocks from `io.blocks_from_interpreter` (only those with a number not greater
///   than `session_boundaries.last_block()`), each of which starts an aggregation,
/// * multisigned hashes produced by the aggregator, each of which is turned into a
///   justification and submitted via `process_hash`,
/// * a periodic ticker that triggers `aggregator.status_report()`,
/// * the `exit_rx` signal.
///
/// The loop ends when the multisigned-hash stream ends, when the exit signal fires, or
/// when the block stream has ended and the hash of the most recently started block has
/// been multisigned.
///
/// # Errors
///
/// Returns `Err(())` as soon as `process_hash` fails for any multisigned hash.
async fn run_aggregator<B, C, CN, LN, JS>(
    mut aggregator: Aggregator<'_, CN, LN>,
    io: IO<JS>,
    client: Arc<C>,
    session_boundaries: &SessionBoundaries,
    metrics: TimingBlockMetrics,
    mut exit_rx: oneshot::Receiver<()>,
) -> Result<(), ()>
where
    B: Block<Hash = BlockHash>,
    B::Header: Header<Number = BlockNumber>,
    JS: JustificationSubmissions<Justification> + Send + Sync + Clone,
    C: HeaderBackend<B> + Send + Sync + 'static,
    LN: Network<LegacyRmcNetworkData>,
    CN: Network<CurrentRmcNetworkData>,
{
    let IO {
        blocks_from_interpreter,
        mut justifications_for_chain,
        justification_translator,
    } = io;

    // Stop consuming blocks once one beyond the session's last block shows up.
    let blocks_from_interpreter = blocks_from_interpreter.take_while(|block| {
        let block_num = block.number;
        async move {
            if block_num == session_boundaries.last_block() {
                debug!(target: "phron-party", "Aggregator is processing last block in session.");
            }
            block_num <= session_boundaries.last_block()
        }
    });
    pin_mut!(blocks_from_interpreter);

    // Hash of the most recently started block; cleared once that hash gets multisigned.
    let mut hash_of_last_block = None;
    let mut no_more_blocks = false;
    let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL);

    loop {
        trace!(target: "phron-party", "Aggregator Loop started a next iteration");
        tokio::select! {
            // An exhausted stream resolves to `None` immediately on every poll, so the
            // branch is disabled once the end has been seen. Otherwise the loop would spin
            // while waiting for the last hash to be multisigned.
            maybe_block = blocks_from_interpreter.next(), if !no_more_blocks => {
                match maybe_block {
                    Some(block) => {
                        hash_of_last_block = Some(block.hash);
                        process_new_block_data::<CN, LN>(
                            &mut aggregator,
                            block,
                            &metrics
                        ).await;
                    }
                    None => {
                        debug!(target: "phron-party", "Blocks ended in aggregator.");
                        no_more_blocks = true;
                    }
                }
            }
            multisigned_hash = aggregator.next_multisigned_hash() => {
                match multisigned_hash {
                    Some((hash, multisignature)) => {
                        process_hash(
                            hash,
                            multisignature,
                            &mut justifications_for_chain,
                            &justification_translator,
                            &client,
                        )?;
                        if Some(hash) == hash_of_last_block {
                            hash_of_last_block = None;
                        }
                    }
                    None => {
                        debug!(target: "phron-party", "The stream of multisigned hashes has ended. Terminating.");
                        break;
                    }
                }
            }
            _ = status_ticker.tick() => {
                aggregator.status_report();
            },
            _ = &mut exit_rx => {
                debug!(target: "phron-party", "Aggregator received exit signal. Terminating.");
                break;
            }
        }

        if hash_of_last_block.is_none() && no_more_blocks {
            debug!(target: "phron-party", "Aggregator processed all provided blocks. Terminating.");
            break;
        }
    }

    debug!(target: "phron-party", "Aggregator finished its work.");
    Ok(())
}
/// Selects which aggregator constructor `task` uses, carrying the network object that
/// the chosen constructor receives.
pub enum AggregatorVersion<CN, LN> {
/// Payload is passed to `Aggregator::new_current`.
Current(CN),
/// Payload is passed to `Aggregator::new_legacy`.
Legacy(LN),
}
pub fn task<B, C, CN, LN, JS>(
subtask_common: AuthoritySubtaskCommon,
client: Arc<C>,
io: IO<JS>,
session_boundaries: SessionBoundaries,
metrics: TimingBlockMetrics,
multikeychain: Keychain,
version: AggregatorVersion<CN, LN>,
) -> Task
where
B: Block<Hash = BlockHash>,
B::Header: Header<Number = BlockNumber>,
JS: JustificationSubmissions<Justification> + Send + Sync + Clone + 'static,
C: HeaderBackend<B> + Send + Sync + 'static,
LN: Network<LegacyRmcNetworkData> + 'static,
CN: Network<CurrentRmcNetworkData> + 'static,
{
let AuthoritySubtaskCommon {
spawn_handle,
session_id,
} = subtask_common;
let (stop, exit) = oneshot::channel();
let task = {
async move {
let aggregator_io = match version {
Current(rmc_network) => Aggregator::new_current(&multikeychain, rmc_network),
Legacy(rmc_network) => Aggregator::new_legacy(&multikeychain, rmc_network),
};
debug!(target: "phron-party", "Running the aggregator task for {:?}", session_id);
let result = run_aggregator(
aggregator_io,
io,
client,
&session_boundaries,
metrics,
exit,
)
.await;
debug!(target: "phron-party", "Aggregator task stopped for {:?}", session_id);
result
}
};
let handle =
spawn_handle.spawn_essential_with_result("phron/consensus_session_aggregator", task);
Task::new(handle, stop)
}