use std::{
collections::{hash_map::Entry::Occupied, BTreeMap, HashMap, HashSet},
default::Default,
hash::Hash,
num::NonZeroUsize,
sync::Arc,
time::{self, Duration},
};
use futures::{
channel::{
mpsc::{self, UnboundedSender},
oneshot,
},
StreamExt,
};
use futures_timer::Delay;
use log::{debug, error, info, trace, warn};
use lru::LruCache;
use sc_client_api::{BlockchainEvents, HeaderBackend};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use crate::{
phron_primitives::{BlockHash, BlockNumber},
data_io::{
chain_info::{CachedChainInfoProvider, ChainInfoProvider, SubstrateChainInfoProvider},
legacy::{
proposal::{PhronProposal, PendingProposalStatus, ProposalStatus},
status_provider::get_proposal_status,
PhronNetworkMessage,
},
},
network::data::{
component::{Network as ComponentNetwork, Receiver, SimpleNetwork},
Network as DataNetwork,
},
party::manager::Runnable,
sync::RequestBlocks,
BlockId, SessionBoundaries,
};
/// Sequential identifier the data store assigns to each message it starts tracking.
type MessageId = u64;
/// A chain event that proposals can be registered on; when the event is observed, the
/// registered proposals have their availability re-checked.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum ChainEvent {
/// The given block has been imported.
Imported(BlockId),
/// A block with the given number has been finalized.
Finalized(BlockNumber),
}
/// Bookkeeping for a proposal that at least one received message is still waiting on.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct PendingProposalInfo {
    /// Ids of the pending messages that depend on this proposal.
    messages: HashSet<MessageId>,
    /// Wall-clock time at which this entry was created.
    first_occurrence: time::SystemTime,
    /// Status recorded by the most recent availability check.
    status: ProposalStatus,
}

impl PendingProposalInfo {
    /// Creates an entry with the given `status`, no dependent messages, and the current
    /// system time as its first occurrence.
    fn new(status: ProposalStatus) -> Self {
        let first_occurrence = time::SystemTime::now();
        Self {
            messages: HashSet::new(),
            first_occurrence,
            status,
        }
    }
}
/// A received message held back until all proposals it depends on become available.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct PendingMessageInfo<M: PhronNetworkMessage> {
    /// The held-back message itself.
    message: M,
    /// Proposals this message is still waiting on.
    pending_proposals: HashSet<PhronProposal>,
}

impl<M: PhronNetworkMessage> PendingMessageInfo<M> {
    /// Wraps `message` with an initially empty set of pending proposals.
    fn new(message: M) -> Self {
        let pending_proposals = HashSet::new();
        Self {
            message,
            pending_proposals,
        }
    }
}
/// Tunable limits and timings of the data store.
pub struct DataStoreConfig {
    /// Triggers are pruned once more than this many have been registered since the last pruning.
    pub max_triggers_pending: usize,
    /// Pending messages are pruned while more than this many proposals are pending.
    pub max_proposals_pending: usize,
    /// Pending messages are pruned while more than this many messages are pending.
    pub max_messages_pending: usize,
    /// Capacity of the LRU cache of proposals whose final status is already known.
    pub available_proposals_cache_capacity: NonZeroUsize,
    /// Time between two consecutive maintenance passes.
    pub periodic_maintenance_interval: Duration,
    /// How long a proposal must have been pending before maintenance requests its
    /// missing top block.
    pub request_block_after: Duration,
}

impl Default for DataStoreConfig {
    /// Returns the stock configuration.
    fn default() -> Self {
        // 8000 is a non-zero literal, so this `unwrap` cannot fail.
        let available_proposals_cache_capacity = NonZeroUsize::new(8000).unwrap();
        let periodic_maintenance_interval = Duration::from_secs(25);
        let request_block_after = Duration::from_secs(20);
        Self {
            max_triggers_pending: 80_000,
            max_proposals_pending: 80_000,
            max_messages_pending: 40_000,
            available_proposals_cache_capacity,
            periodic_maintenance_interval,
            request_block_after,
        }
    }
}
/// Holds back network messages until every proposal they reference has a final status
/// (`Finalize` or `Ignore`), then forwards them.
///
/// Proposals that are not yet available are tracked in `pending_proposals` and re-checked
/// ("bumped") when a registered chain event occurs or during periodic maintenance.
pub struct DataStore<B, C, RB, Message, R>
where
B: BlockT<Hash = BlockHash>,
B::Header: HeaderT<Number = BlockNumber>,
C: HeaderBackend<B> + BlockchainEvents<B> + Send + Sync + 'static,
RB: RequestBlocks + 'static,
Message: PhronNetworkMessage
+ std::fmt::Debug
+ Send
+ Sync
+ Clone
+ parity_scale_codec::Codec
+ 'static,
R: Receiver<Message>,
{
/// Id that will be handed out to the next received message.
next_free_id: MessageId,
/// Proposals that some pending message is waiting on, with their bookkeeping data.
pending_proposals: HashMap<PhronProposal, PendingProposalInfo>,
/// For each chain event, the proposals to bump when that event is observed.
event_triggers: HashMap<ChainEvent, HashSet<PhronProposal>>,
/// Messages waiting on at least one pending proposal, ordered by id so that pruning
/// removes the lowest id first.
pending_messages: BTreeMap<MessageId, PendingMessageInfo<Message>>,
/// Source of chain information (imported/finalized blocks, parent hashes).
chain_info_provider: CachedChainInfoProvider<SubstrateChainInfoProvider<B, C>>,
/// LRU cache of proposals whose status was already determined to be `Finalize` or `Ignore`.
available_proposals_cache: LruCache<PhronProposal, ProposalStatus>,
/// Counter incremented on trigger registration calls; reset whenever triggers are pruned.
num_triggers_registered_since_last_pruning: usize,
/// Highest finalized block number this store has processed so far.
highest_finalized_num: BlockNumber,
/// Session boundaries used to validate the bounds of incoming proposals.
session_boundaries: SessionBoundaries,
/// Client providing the block import and finality notification streams.
client: Arc<C>,
/// Used during maintenance to request blocks that have been missing for too long.
block_requester: RB,
/// Limits and timings; see `DataStoreConfig`.
config: DataStoreConfig,
/// Incoming messages from the network.
messages_from_network: R,
/// Outgoing channel for messages whose proposal dependencies are all resolved.
messages_for_phron: UnboundedSender<Message>,
}
impl<B, C, RB, Message, R> DataStore<B, C, RB, Message, R>
where
B: BlockT<Hash = BlockHash>,
B::Header: HeaderT<Number = BlockNumber>,
C: HeaderBackend<B> + BlockchainEvents<B> + Send + Sync + 'static,
RB: RequestBlocks + 'static,
Message: PhronNetworkMessage
+ std::fmt::Debug
+ Send
+ Sync
+ Clone
+ parity_scale_codec::Codec
+ 'static,
R: Receiver<Message>,
{
/// Builds a data store on top of `component_network`.
///
/// Returns the store together with a network handle that receives the messages the store
/// forwards and sends through the sending half of `component_network`. The highest
/// finalized number is initialised from `client.info()`.
pub fn new<N: ComponentNetwork<Message, R = R>>(
    session_boundaries: SessionBoundaries,
    client: Arc<C>,
    block_requester: RB,
    config: DataStoreConfig,
    component_network: N,
) -> (Self, impl DataNetwork<Message>) {
    let (messages_for_phron, messages_from_data_store) = mpsc::unbounded();
    let (messages_to_network, messages_from_network) = component_network.into();

    let highest_finalized_num = client.info().finalized_number;
    let substrate_provider = SubstrateChainInfoProvider::new(client.clone());
    let chain_info_provider =
        CachedChainInfoProvider::new(substrate_provider, Default::default());
    let available_proposals_cache = LruCache::new(config.available_proposals_cache_capacity);

    let data_store = DataStore {
        next_free_id: 0,
        pending_proposals: HashMap::new(),
        event_triggers: HashMap::new(),
        pending_messages: BTreeMap::new(),
        chain_info_provider,
        available_proposals_cache,
        num_triggers_registered_since_last_pruning: 0,
        highest_finalized_num,
        session_boundaries,
        client,
        block_requester,
        config,
        messages_from_network,
        messages_for_phron,
    };
    let data_network = SimpleNetwork::new(messages_from_data_store, messages_to_network);
    (data_store, data_network)
}
/// Runs the data store's event loop until the `exit` receiver resolves (its result is ignored).
///
/// Every iteration first prunes pending messages and triggers, then handles one of: a message
/// from the network, a block import notification, a block finality notification, the
/// maintenance timer, or the exit signal.
pub async fn run(&mut self, mut exit: oneshot::Receiver<()>) {
let mut maintenance_clock = Delay::new(self.config.periodic_maintenance_interval);
let mut import_stream = self.client.import_notification_stream();
let mut finality_stream = self.client.finality_notification_stream();
loop {
// Enforce the configured limits before handling the next event.
self.prune_pending_messages();
self.prune_triggers();
tokio::select! {
Some(message) = self.messages_from_network.next() => {
trace!(target: "phron-data-store", "Received message at Data Store {:?}", message);
self.on_message_received(message);
}
Some(block) = &mut import_stream.next() => {
trace!(target: "phron-data-store", "Block import notification at Data Store for block {:?}", block);
self.on_block_imported((block.header.hash(), *block.header.number()).into());
},
Some(block) = &mut finality_stream.next() => {
trace!(target: "phron-data-store", "Finalized block import notification at Data Store for block {:?}", block);
self.on_block_finalized((block.header.hash(), *block.header.number()).into());
}
_ = &mut maintenance_clock => {
self.run_maintenance();
// The timer is re-armed with a fresh `Delay` after every maintenance pass.
maintenance_clock = Delay::new(self.config.periodic_maintenance_interval);
}
_ = &mut exit => {
debug!(target: "phron-data-store", "Data Store task received exit signal. Terminating.");
break;
}
}
}
}
/// Fetches the highest finalized block from the chain info provider and processes it as if
/// both its import and its finality notification had just arrived.
fn update_highest_finalized(&mut self) {
    let finalized = self.chain_info_provider.get_highest_finalized();
    let as_imported = finalized.clone();
    self.on_block_imported(as_imported);
    self.on_block_finalized(finalized);
}
fn run_maintenance(&mut self) {
self.update_highest_finalized();
let proposals_with_timestamps: Vec<_> = self
.pending_proposals
.iter()
.map(|(proposal, info)| (proposal.clone(), info.first_occurrence))
.collect();
match proposals_with_timestamps.len() {
0 => {
trace!(target: "phron-data-store", "No pending proposals in data store during maintenance.");
}
1..=5 => {
info!(target: "phron-data-store", "Data Store maintenance. Awaiting {:?} proposals: {:?}",proposals_with_timestamps.len(), proposals_with_timestamps);
}
_ => {
info!(target: "phron-data-store", "Data Store maintenance. Awaiting {:?} proposals: (showing 5 initial only) {:?}",proposals_with_timestamps.len(), &proposals_with_timestamps[..5]);
}
}
let now = time::SystemTime::now();
for (proposal, first_occurrence) in proposals_with_timestamps {
if self.bump_proposal(&proposal) {
continue;
}
let time_waiting = match now.duration_since(first_occurrence) {
Ok(tw) if tw >= self.config.request_block_after => tw,
_ => continue,
};
let block = proposal.top_block();
if !self.chain_info_provider.is_block_imported(&block) {
debug!(target: "phron-data-store", "Requesting a block {:?} after it has been missing for {:?} secs.", block, time_waiting.as_secs());
if let Err(e) = self.block_requester.request_block(block.clone()) {
warn!(target: "phron-data-store", "Error requesting block {:?}, {}.", block, e);
}
continue;
}
let bottom_block = proposal.bottom_block();
let parent_hash = match self.chain_info_provider.get_parent_hash(&bottom_block) {
Ok(ph) => ph,
_ => {
warn!(target: "phron-data-store", "Expected the block below the proposal {:?} to be imported", proposal);
continue;
}
};
let parent_num = bottom_block.number - 1;
if let Ok(finalized_block) = self.chain_info_provider.get_finalized_at(parent_num) {
if parent_hash != finalized_block.hash {
warn!(target: "phron-data-store", "The proposal {:?} is pending because the parent: \
{:?}, does not agree with the block finalized at this height: {:?}.", proposal, parent_hash, finalized_block);
} else {
warn!(target: "phron-data-store", "The proposal {:?} is pending even though blocks \
have been imported and parent was finalized.", proposal);
}
} else {
debug!(target: "phron-data-store", "Justification for block {:?} {:?} \
still not present after {:?} secs.", parent_num, parent_hash, time_waiting.as_secs());
}
}
}
/// Arranges for `proposal` to be bumped when `block` is imported.
fn register_block_import_trigger(&mut self, proposal: &PhronProposal, block: &BlockId) {
    self.num_triggers_registered_since_last_pruning += 1;
    let event = ChainEvent::Imported(block.clone());
    let waiting = self.event_triggers.entry(event).or_default();
    waiting.insert(proposal.clone());
}
/// Arranges for `proposal` to be bumped when block `number` is finalized.
///
/// The registration counter is incremented on every call, but no trigger is stored when
/// `number` is not above the highest finalized number already seen.
fn register_finality_trigger(&mut self, proposal: &PhronProposal, number: BlockNumber) {
    self.num_triggers_registered_since_last_pruning += 1;
    if number <= self.highest_finalized_num {
        return;
    }
    let event = ChainEvent::Finalized(number);
    let waiting = self.event_triggers.entry(event).or_default();
    waiting.insert(proposal.clone());
}
/// Registers the next finality event worth waiting for on behalf of `proposal`.
///
/// If the highest finalized number is below `number_below_branch()`, that number is the
/// target; otherwise, if it is still below `number_top_block()`, the very next block number
/// is the target. When neither holds, nothing is registered.
fn register_next_finality_trigger(&mut self, proposal: &PhronProposal) {
    let finalized = self.highest_finalized_num;
    let below_branch = proposal.number_below_branch();
    let target = if finalized < below_branch {
        below_branch
    } else if finalized < proposal.number_top_block() {
        finalized + 1
    } else {
        return;
    };
    self.register_finality_trigger(proposal, target);
}
/// Handles finalization of `block`.
///
/// When the block number exceeds the highest finalized number seen so far, the stored
/// maximum is advanced and the finality triggers of every newly covered block number are
/// fired in increasing order. Otherwise this is a no-op.
fn on_block_finalized(&mut self, block: BlockId) {
    let new_num = block.number;
    if new_num <= self.highest_finalized_num {
        return;
    }
    let mut num = self.highest_finalized_num + 1;
    // Advance the maximum before bumping, so triggers registered while bumping see it.
    self.highest_finalized_num = new_num;
    while num <= new_num {
        let triggered = self.event_triggers.remove(&ChainEvent::Finalized(num));
        for proposal in triggered.into_iter().flatten() {
            self.bump_proposal(&proposal);
        }
        num += 1;
    }
}
/// Handles import of `block`: removes its import trigger, if any, and bumps every proposal
/// that was registered on it.
fn on_block_imported(&mut self, block: BlockId) {
    let triggered = self.event_triggers.remove(&ChainEvent::Imported(block));
    for proposal in triggered.into_iter().flatten() {
        self.bump_proposal(&proposal);
    }
}
/// Stops tracking `proposal` and resolves it as a dependency of every message that was
/// waiting on it. Does nothing if the proposal is not pending.
fn on_proposal_available(&mut self, proposal: &PhronProposal) {
    let waiting_messages = match self.pending_proposals.remove(proposal) {
        Some(info) => info.messages,
        None => return,
    };
    for id in waiting_messages {
        self.remove_proposal_from_pending_message(proposal, id);
    }
}
/// Re-checks the availability of a pending `proposal` and stores its new status.
///
/// Returns `true` when the proposal reached a final status (`Finalize` or `Ignore`) and was
/// therefore resolved; returns `false` when it stays pending or was not pending at all.
fn bump_proposal(&mut self, proposal: &PhronProposal) -> bool {
    let old_status = if let Some(info) = self.pending_proposals.get(proposal) {
        info.status.clone()
    } else {
        return false;
    };
    let new_status = self.check_proposal_availability(proposal, Some(&old_status));
    // The entry was present above and the availability check does not touch `pending_proposals`.
    self.pending_proposals.get_mut(proposal).unwrap().status = new_status.clone();

    use PendingProposalStatus::*;
    use ProposalStatus::*;
    match new_status {
        Finalize(_) | Ignore => {
            self.on_proposal_available(proposal);
            true
        }
        // No new trigger is registered in this state; periodic maintenance still bumps it.
        Pending(TopBlockImportedButIncorrectBranch) => false,
        Pending(PendingTopBlock) | Pending(TopBlockImportedButNotFinalizedAncestor) => {
            self.register_next_finality_trigger(proposal);
            false
        }
    }
}
/// Determines the current status of `proposal`.
///
/// A cached final status is returned directly. Otherwise the status is computed by
/// `get_proposal_status` (given `old_status` as a hint) and, if it is final (`Finalize` or
/// `Ignore`), stored in the cache before being returned.
fn check_proposal_availability(
    &mut self,
    proposal: &PhronProposal,
    old_status: Option<&ProposalStatus>,
) -> ProposalStatus {
    if let Some(cached) = self.available_proposals_cache.get(proposal).cloned() {
        return cached;
    }
    let status = get_proposal_status(&mut self.chain_info_provider, proposal, old_status);
    let is_final = matches!(status, ProposalStatus::Finalize(_) | ProposalStatus::Ignore);
    if is_final {
        self.available_proposals_cache
            .put(proposal.clone(), status.clone());
    }
    status
}
/// Records that the message `id` (described by `message_info`) depends on `proposal`.
///
/// A proposal seen for the first time is checked for availability. If its status is already
/// final, no dependency is recorded. Otherwise it starts being tracked and triggers are
/// registered: an import trigger for the top block when that block is still missing, and the
/// next finality trigger in every pending case.
fn add_message_proposal_dependency(
    &mut self,
    proposal: &PhronProposal,
    message_info: &mut PendingMessageInfo<Message>,
    id: MessageId,
) {
    if !self.pending_proposals.contains_key(proposal) {
        use PendingProposalStatus::*;
        use ProposalStatus::*;
        let status = self.check_proposal_availability(proposal, None);
        let top_block_missing = match &status {
            Finalize(_) | Ignore => return,
            Pending(PendingTopBlock) => true,
            Pending(TopBlockImportedButIncorrectBranch)
            | Pending(TopBlockImportedButNotFinalizedAncestor) => false,
        };
        self.pending_proposals
            .insert(proposal.clone(), PendingProposalInfo::new(status));
        if top_block_missing {
            self.register_block_import_trigger(proposal, &proposal.top_block());
        }
        self.register_next_finality_trigger(proposal);
    }

    // Link the proposal and the message in both directions.
    self.pending_proposals
        .get_mut(proposal)
        .expect("exists as checked above")
        .messages
        .insert(id);
    message_info.pending_proposals.insert(proposal.clone());
}
fn on_message_dependencies_resolved(&self, message: Message) {
trace!(target: "phron-data-store", "Sending message from DataStore {:?}", message);
if let Err(e) = self.messages_for_phron.unbounded_send(message) {
error!(target: "phron-data-store", "Unable to send a ready message from DataStore {}", e);
}
}
/// Returns the next unused message id and advances the counter by one.
fn assign_fresh_message_id(&mut self) -> MessageId {
    let fresh_id = self.next_free_id;
    self.next_free_id += 1;
    fresh_id
}
/// Marks `proposal` as resolved for the pending message `id`.
///
/// When that was the message's last pending proposal, the message is removed from the
/// pending set and forwarded. An unknown `id` is logged and ignored.
fn remove_proposal_from_pending_message(&mut self, proposal: &PhronProposal, id: MessageId) {
    let all_resolved = match self.pending_messages.get_mut(&id) {
        Some(message_info) => {
            message_info.pending_proposals.remove(proposal);
            message_info.pending_proposals.is_empty()
        }
        None => {
            warn!(target: "phron-data-store", "Message {:?} not found when resolving a proposal dependency {:?}.", id, proposal);
            return;
        }
    };
    if all_resolved {
        if let Some(message_info) = self.pending_messages.remove(&id) {
            self.on_message_dependencies_resolved(message_info.message);
        }
    }
}
fn remove_message_id_from_pending_proposal(&mut self, proposal: &PhronProposal, id: MessageId) {
if let Occupied(mut proposal_entry) = self.pending_proposals.entry(proposal.clone()) {
let proposal_info = proposal_entry.get_mut();
proposal_info.messages.remove(&id);
if proposal_info.messages.is_empty() {
proposal_entry.remove();
}
} else {
warn!(target: "phron-data-store", "Proposal {:?} with id {:?} referenced in message does not exist", proposal, id);
}
}
/// Drops the pending message with the lowest id, together with its references in
/// `pending_proposals`.
///
/// Returns `true` if a message was pruned and `false` if there was nothing to prune.
fn prune_single_message(&mut self) -> bool {
    // `pending_messages` is a `BTreeMap`, so the first key is the lowest id.
    let lowest_id = match self.pending_messages.keys().next().copied() {
        Some(id) => id,
        None => {
            warn!(target: "phron-data-store", "Tried to prune a message but there are none pending.");
            return false;
        }
    };
    match self.pending_messages.remove(&lowest_id) {
        Some(message_info) => {
            for proposal in message_info.pending_proposals {
                self.remove_message_id_from_pending_proposal(&proposal, lowest_id);
            }
            true
        }
        None => {
            // Logged under the same target as every other data store message, so that
            // target-based filtering also catches this warning.
            warn!(
                target: "phron-data-store",
                "Trying to prune a message whose id is not in pending messages {:?}",
                lowest_id
            );
            false
        }
    }
}
/// Prunes pending messages one at a time until both the pending message count and the pending
/// proposal count are within their configured limits, or until pruning fails.
fn prune_pending_messages(&mut self) {
    loop {
        let too_many_messages = self.pending_messages.len() > self.config.max_messages_pending;
        let too_many_proposals =
            self.pending_proposals.len() > self.config.max_proposals_pending;
        if !(too_many_messages || too_many_proposals) {
            return;
        }
        if !self.prune_single_message() {
            warn!(target: "phron-data-store", "Message pruning in DataStore failed. Moving on.");
            return;
        }
    }
}
/// Once more triggers than `max_triggers_pending` have been registered since the last
/// pruning, drops every trigger entry that refers to a proposal which is no longer pending
/// (and every event left without proposals), then resets the counter.
fn prune_triggers(&mut self) {
    if self.num_triggers_registered_since_last_pruning <= self.config.max_triggers_pending {
        return;
    }
    let still_pending = &self.pending_proposals;
    self.event_triggers.retain(|_, proposals| {
        proposals.retain(|proposal| still_pending.contains_key(proposal));
        !proposals.is_empty()
    });
    self.num_triggers_registered_since_last_pruning = 0;
}
/// Handles a message received from the network.
///
/// Every proposal included in the message is validated against the session boundaries; if
/// any of them is out of bounds, the whole message is dropped. Otherwise the message gets a
/// fresh id and a dependency is recorded for each proposal that is not yet available. A
/// message with no pending dependencies is forwarded immediately, any other is stored until
/// its proposals are resolved.
fn on_message_received(&mut self, message: Message) {
    let mut proposals = Vec::new();
    for data in message.included_data() {
        let unvalidated_proposal = data.head_proposal;
        match unvalidated_proposal.validate_bounds(&self.session_boundaries) {
            Ok(proposal) => proposals.push(proposal),
            Err(error) => {
                warn!(target: "phron-data-store", "Message {:?} dropped as it contains \
                    proposal {:?} not within bounds ({:?}).", message, unvalidated_proposal, error);
                return;
            }
        }
    }

    let message_id = self.assign_fresh_message_id();
    // The message is moved rather than cloned: exactly one copy is ever needed, either to
    // forward right away or to keep in `pending_messages`.
    let mut message_info = PendingMessageInfo::new(message);
    for proposal in proposals {
        self.add_message_proposal_dependency(&proposal, &mut message_info, message_id);
    }

    if message_info.pending_proposals.is_empty() {
        self.on_message_dependencies_resolved(message_info.message);
    } else {
        self.pending_messages.insert(message_id, message_info);
    }
}
}
/// Lets an owned `DataStore` be driven as a `Runnable` task by delegating to its inherent
/// `run` loop.
#[async_trait::async_trait]
impl<B, C, RB, Message, R> Runnable for DataStore<B, C, RB, Message, R>
where
B: BlockT<Hash = BlockHash>,
B::Header: HeaderT<Number = BlockNumber>,
C: HeaderBackend<B> + BlockchainEvents<B> + Send + Sync + 'static,
RB: RequestBlocks + 'static,
Message: PhronNetworkMessage
+ std::fmt::Debug
+ Send
+ Sync
+ Clone
+ parity_scale_codec::Codec
+ 'static,
R: Receiver<Message> + 'static,
{
/// Consumes the data store and runs its event loop until `exit` resolves.
async fn run(mut self, exit: oneshot::Receiver<()>) {
// Called through the explicit `DataStore::run` path so that the inherent `&mut self`
// method is selected rather than this by-value trait method of the same name.
DataStore::run(&mut self, exit).await
}
}