use std::{collections::HashSet, time::Duration};
use futures::{channel::mpsc, StreamExt};
use log::{debug, error, trace, warn};
use substrate_prometheus_endpoint::Registry;
use crate::{
network::GossipNetwork,
session::SessionBoundaryInfo,
sync::{
data::{
NetworkData, PreRequest, Request, ResponseItem, ResponseItems, State, VersionWrapper,
VersionedNetworkData,
},
forest::ExtensionRequest,
handler::{Action, DatabaseIO, Error as HandlerError, HandleStateAction, Handler},
message_limiter::{Error as MsgLimiterError, MsgLimiter},
metrics::{Event, Metrics},
task_queue::TaskQueue,
tasks::{Action as TaskAction, RequestTask},
ticker::Ticker,
Block, BlockId, BlockImport, ChainStatus, ChainStatusNotification, ChainStatusNotifier,
Finalizer, Justification, JustificationSubmissions, RequestBlocks, UnverifiedHeader,
UnverifiedHeaderFor, UnverifiedJustification, Verifier, LOG_TARGET,
},
SyncOracle,
};
const BROADCAST_COOLDOWN: Duration = Duration::from_millis(600);
const CHAIN_EXTENSION_COOLDOWN: Duration = Duration::from_millis(300);
const TICK_PERIOD: Duration = Duration::from_secs(5);
pub struct IO<B, J, N, CE, CS, F, BI>
where
J: Justification,
B: Block<UnverifiedHeader = UnverifiedHeaderFor<J>>,
N: GossipNetwork<VersionedNetworkData<B, J>>,
CE: ChainStatusNotifier<J::Header>,
CS: ChainStatus<B, J>,
F: Finalizer<J>,
BI: BlockImport<B>,
{
network: N,
chain_events: CE,
sync_oracle: SyncOracle,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
blocks_from_creator: mpsc::UnboundedReceiver<B>,
database_io: DatabaseIO<B, J, CS, F, BI>,
}
impl<B, J, N, CE, CS, F, BI> IO<B, J, N, CE, CS, F, BI>
where
J: Justification,
B: Block<UnverifiedHeader = UnverifiedHeaderFor<J>>,
N: GossipNetwork<VersionedNetworkData<B, J>>,
CE: ChainStatusNotifier<J::Header>,
CS: ChainStatus<B, J>,
F: Finalizer<J>,
BI: BlockImport<B>,
{
pub fn new(
database_io: DatabaseIO<B, J, CS, F, BI>,
network: N,
chain_events: CE,
sync_oracle: SyncOracle,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
blocks_from_creator: mpsc::UnboundedReceiver<B>,
) -> Self {
IO {
network,
chain_events,
sync_oracle,
additional_justifications_from_user,
blocks_from_creator,
database_io,
}
}
}
pub struct Service<B, J, N, CE, CS, V, F, BI>
where
J: Justification,
B: Block<UnverifiedHeader = UnverifiedHeaderFor<J>>,
N: GossipNetwork<VersionedNetworkData<B, J>>,
CE: ChainStatusNotifier<J::Header>,
CS: ChainStatus<B, J>,
V: Verifier<J>,
F: Finalizer<J>,
BI: BlockImport<B>,
{
network: VersionWrapper<B, J, N>,
handler: Handler<B, N::PeerId, J, CS, V, F, BI>,
tasks: TaskQueue<RequestTask>,
broadcast_ticker: Ticker,
chain_extension_ticker: Ticker,
chain_events: CE,
justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
block_requests_from_user: mpsc::UnboundedReceiver<BlockId>,
blocks_from_creator: mpsc::UnboundedReceiver<B>,
metrics: Metrics,
}
impl<J: Justification> JustificationSubmissions<J> for mpsc::UnboundedSender<J::Unverified> {
type Error = mpsc::TrySendError<J::Unverified>;
fn submit(&mut self, justification: J::Unverified) -> Result<(), Self::Error> {
self.unbounded_send(justification)
}
}
impl RequestBlocks for mpsc::UnboundedSender<BlockId> {
type Error = mpsc::TrySendError<BlockId>;
fn request_block(&self, block_id: BlockId) -> Result<(), Self::Error> {
self.unbounded_send(block_id)
}
}
impl<B, J, N, CE, CS, V, F, BI> Service<B, J, N, CE, CS, V, F, BI>
where
J: Justification,
B: Block<UnverifiedHeader = UnverifiedHeaderFor<J>>,
N: GossipNetwork<VersionedNetworkData<B, J>>,
CE: ChainStatusNotifier<J::Header>,
CS: ChainStatus<B, J>,
V: Verifier<J>,
F: Finalizer<J>,
BI: BlockImport<B>,
{
pub fn new(
verifier: V,
session_info: SessionBoundaryInfo,
io: IO<B, J, N, CE, CS, F, BI>,
metrics_registry: Option<Registry>,
) -> Result<
(
Self,
impl JustificationSubmissions<J> + Clone,
impl RequestBlocks,
),
HandlerError<B, J, CS, V, F>,
> {
let IO {
network,
chain_events,
sync_oracle,
additional_justifications_from_user,
blocks_from_creator,
database_io,
} = io;
let network = VersionWrapper::new(network);
let handler = Handler::new(database_io, verifier, sync_oracle, session_info)?;
let tasks = TaskQueue::new();
let broadcast_ticker = Ticker::new(TICK_PERIOD, BROADCAST_COOLDOWN);
let chain_extension_ticker = Ticker::new(TICK_PERIOD, CHAIN_EXTENSION_COOLDOWN);
let (justifications_for_sync, justifications_from_user) = mpsc::unbounded();
let (block_requests_for_sync, block_requests_from_user) = mpsc::unbounded();
let metrics = match Metrics::new(metrics_registry) {
Ok(metrics) => metrics,
Err(e) => {
warn!(target: LOG_TARGET, "Failed to create metrics: {}.", e);
Metrics::noop()
}
};
Ok((
Service {
network,
handler,
tasks,
broadcast_ticker,
chain_extension_ticker,
chain_events,
justifications_from_user,
additional_justifications_from_user,
blocks_from_creator,
block_requests_from_user,
metrics,
},
justifications_for_sync,
block_requests_for_sync,
))
}
fn request_block(&mut self, block_id: BlockId) {
debug!(
target: LOG_TARGET,
"Initiating a request for block {:?}.", block_id
);
self.tasks
.schedule_in(RequestTask::new(block_id), Duration::ZERO);
}
fn broadcast(&mut self) {
self.metrics.report_event(Event::Broadcast);
self.broadcast_ticker.reset();
let state = match self.handler.state() {
Ok(state) => state,
Err(e) => {
self.metrics.report_event_error(Event::Broadcast);
warn!(
target: LOG_TARGET,
"Failed to construct own knowledge state: {}.", e
);
return;
}
};
trace!(target: LOG_TARGET, "Broadcasting state: {:?}", state);
self.metrics
.update_top_finalized_block(state.top_justification().header().id().number());
self.metrics
.update_best_block(state.favourite_block().id().number());
let data = NetworkData::StateBroadcast(state);
if let Err(e) = self.network.broadcast(data) {
self.metrics.report_event_error(Event::Broadcast);
warn!(target: LOG_TARGET, "Error sending broadcast: {}.", e)
}
}
fn request_favourite_extension(&mut self, know_most: HashSet<N::PeerId>) {
self.metrics.report_event(Event::SendExtensionRequest);
let data = match self.handler.state() {
Ok(state) => NetworkData::ChainExtensionRequest(state),
Err(e) => {
self.metrics.report_event_error(Event::SendExtensionRequest);
warn!(
target: LOG_TARGET,
"Error producing state for chain extension request: {}.", e
);
return;
}
};
match self.network.send_to_random(data, know_most) {
Ok(()) => self.chain_extension_ticker.reset(),
Err(e) => {
self.metrics.report_event_error(Event::SendExtensionRequest);
warn!(
target: LOG_TARGET,
"Error sending chain extension request: {}.", e
);
}
}
}
fn request_chain_extension(&mut self, force: bool) {
use ExtensionRequest::*;
match self.handler.extension_request() {
FavouriteBlock { know_most } => self.request_favourite_extension(know_most),
HighestJustified {
id,
know_most,
branch_knowledge,
} => {
self.send_request(PreRequest::new(id, branch_knowledge, know_most));
self.chain_extension_ticker.reset();
}
Noop => {
if force {
self.request_favourite_extension(HashSet::new());
}
}
}
}
fn try_request_chain_extension(&mut self) {
if self.chain_extension_ticker.try_tick() {
self.request_chain_extension(false);
}
}
fn send_request(&mut self, pre_request: PreRequest<N::PeerId>) {
self.metrics.report_event(Event::SendRequest);
let state = match self.handler.state() {
Ok(state) => state,
Err(e) => {
self.metrics.report_event_error(Event::SendRequest);
warn!(
target: LOG_TARGET,
"Failed to construct own knowledge state: {}.", e
);
return;
}
};
let (request, peers) = pre_request.with_state(state);
trace!(target: LOG_TARGET, "Sending a request: {:?}", request);
let data = NetworkData::Request(request);
if let Err(e) = self.network.send_to_random(data, peers) {
self.metrics.report_event_error(Event::SendRequest);
warn!(target: LOG_TARGET, "Error sending request: {}.", e);
}
}
fn send_to(&mut self, data: NetworkData<B, J>, peer: N::PeerId) {
self.metrics.report_event(Event::SendTo);
trace!(
target: LOG_TARGET,
"Sending data {:?} to peer {:?}",
data,
peer
);
if let Err(e) = self.network.send_to(data, peer) {
self.metrics.report_event_error(Event::SendTo);
warn!(target: LOG_TARGET, "Error sending response: {}.", e);
}
}
fn handle_state(&mut self, state: State<J>, peer: N::PeerId) {
self.metrics.report_event(Event::HandleState);
use HandleStateAction::*;
trace!(
target: LOG_TARGET,
"Handling state {:?} received from {:?}.",
state,
peer
);
match self.handler.handle_state(state, peer.clone()) {
Ok(action) => match action {
Response(data) => self.send_to(data, peer),
ExtendChain => self.try_request_chain_extension(),
Noop => (),
},
Err(e) => {
self.metrics.report_event_error(Event::HandleState);
match e {
HandlerError::Verifier(e) => debug!(
target: LOG_TARGET,
"Could not verify data in sync state from {:?}: {}.", peer, e
),
e => warn!(
target: LOG_TARGET,
"Failed to handle sync state from {:?}: {}.", peer, e
),
}
}
}
}
fn handle_state_response(
&mut self,
justification: J::Unverified,
maybe_justification: Option<J::Unverified>,
peer: N::PeerId,
) {
trace!(
target: LOG_TARGET,
"Handling state response {:?} {:?} received from {:?}.",
justification,
maybe_justification,
peer
);
self.metrics.report_event(Event::HandleStateResponse);
let (new_info, maybe_error) =
self.handler
.handle_state_response(justification, maybe_justification, peer.clone());
match maybe_error {
Some(HandlerError::Verifier(e)) => debug!(
target: LOG_TARGET,
"Could not verify justification in sync state from {:?}: {}.", peer, e
),
Some(e) => warn!(
target: LOG_TARGET,
"Failed to handle sync state response from {:?}: {}.", peer, e
),
_ => {}
}
if new_info {
self.try_request_chain_extension();
}
}
fn handle_justification_from_user(&mut self, justification: J::Unverified) {
trace!(
target: LOG_TARGET,
"Handling a justification {:?} from user.",
justification,
);
self.metrics
.report_event(Event::HandleJustificationFromUser);
match self.handler.handle_justification_from_user(justification) {
Ok(true) => self.try_request_chain_extension(),
Ok(false) => {}
Err(e) => {
self.metrics
.report_event_error(Event::HandleJustificationFromUser);
match e {
HandlerError::Verifier(e) => debug!(
target: LOG_TARGET,
"Could not verify justification from user: {}", e
),
e => warn!(
target: LOG_TARGET,
"Failed to handle justification from user: {}", e
),
}
}
}
}
fn handle_request_response(&mut self, response_items: ResponseItems<B, J>, peer: N::PeerId) {
trace!(
target: LOG_TARGET,
"Handling request response from peer {:?}. Items: {:?}.",
peer,
response_items,
);
self.metrics.report_event(Event::HandleRequestResponse);
let (new_info, maybe_error) = self
.handler
.handle_request_response(response_items, peer.clone());
match maybe_error {
Some(HandlerError::Verifier(e)) => {
debug!(target: LOG_TARGET, "Could not verify data from user: {}", e)
}
Some(e) => warn!(
target: LOG_TARGET,
"Failed to handle sync request response from {:?}: {}.", peer, e
),
_ => {}
}
if new_info {
self.try_request_chain_extension();
}
}
fn send_big_response(
&mut self,
response_items: &[ResponseItem<B, J>],
peer: N::PeerId,
) -> Result<(), MsgLimiterError> {
let mut limiter = MsgLimiter::new(response_items);
while let Some(chunk) = limiter.next_largest_msg()? {
self.send_to(NetworkData::RequestResponse(chunk.to_vec()), peer.clone())
}
Ok(())
}
fn handle_request(&mut self, request: Request<J>, peer: N::PeerId) {
trace!(
target: LOG_TARGET,
"Handling a request {:?} from {:?}.",
request,
peer
);
self.metrics.report_event(Event::HandleRequest);
match self.handler.handle_request(request) {
Ok(Action::Response(response_items)) => {
if let Err(e) = self.send_big_response(&response_items, peer) {
error!(
target: LOG_TARGET,
"Error while sending request response: {}.", e
);
self.metrics.report_event_error(Event::HandleRequest);
}
}
Ok(Action::RequestBlock(id)) => self.request_block(id),
Err(e) => {
self.metrics.report_event_error(Event::HandleRequest);
match e {
HandlerError::Verifier(e) => debug!(
target: LOG_TARGET,
"Could not verify justification from user: {}", e
),
e => warn!(
target: LOG_TARGET,
"Error handling request from {:?}: {}.", peer, e
),
}
}
_ => {}
}
}
fn handle_task(&mut self, task: RequestTask) {
trace!(target: LOG_TARGET, "Handling task {}.", task);
if let TaskAction::Request(pre_request, (task, delay)) =
task.process(self.handler.interest_provider())
{
self.send_request(pre_request);
self.tasks.schedule_in(task, delay);
}
self.metrics.report_event(Event::HandleTask);
}
fn handle_chain_event(&mut self, event: ChainStatusNotification<J::Header>) {
use ChainStatusNotification::*;
match event {
BlockImported(header) => {
trace!(target: LOG_TARGET, "Handling a new imported block.");
self.metrics.report_event(Event::HandleBlockImported);
if let Err(e) = self.handler.block_imported(header) {
self.metrics.report_event_error(Event::HandleBlockImported);
error!(
target: LOG_TARGET,
"Error marking block as imported: {}.", e
)
}
}
BlockFinalized(_) => {
trace!(target: LOG_TARGET, "Handling a new finalized block.");
self.metrics.report_event(Event::HandleBlockFinalized);
}
}
if self.broadcast_ticker.try_tick() {
self.broadcast();
}
}
fn handle_internal_request(&mut self, id: BlockId) {
trace!(
target: LOG_TARGET,
"Handling an internal request for block {:?}.",
id,
);
self.metrics.report_event(Event::HandleInternalRequest);
match self.handler.handle_internal_request(&id) {
Ok(true) => self.request_block(id),
Ok(_) => debug!(target: LOG_TARGET, "Already requested block {:?}.", id),
Err(e) => {
self.metrics.report_event(Event::HandleInternalRequest);
match e {
HandlerError::Verifier(e) => debug!(
target: LOG_TARGET,
"Could not verify justification from user: {}", e
),
e => warn!(
target: LOG_TARGET,
"Error handling internal request for block {:?}: {}.", id, e
),
}
}
}
}
fn handle_chain_extension_request(&mut self, state: State<J>, peer: N::PeerId) {
self.metrics.report_event(Event::HandleExtensionRequest);
match self.handler.handle_chain_extension_request(state) {
Ok(Action::Response(response_items)) => {
if let Err(e) = self.send_big_response(&response_items, peer) {
error!(
target: LOG_TARGET,
"Error while sending chain extension request response: {}.", e
);
self.metrics
.report_event_error(Event::HandleExtensionRequest);
}
}
Ok(Action::RequestBlock(id)) => self.request_block(id),
Ok(Action::Noop) => {}
Err(e) => {
self.metrics
.report_event_error(Event::HandleExtensionRequest);
match e {
HandlerError::Verifier(e) => debug!(
target: LOG_TARGET,
"Could not verify justification from {:?}: {}", peer, e
),
e => warn!(
target: LOG_TARGET,
"Error handling chain extension request from {:?}: {}.", peer, e
),
}
}
}
}
fn handle_network_data(&mut self, data: NetworkData<B, J>, peer: N::PeerId) {
use NetworkData::*;
match data {
StateBroadcast(state) => self.handle_state(state, peer),
StateBroadcastResponse(justification, maybe_justification) => {
self.handle_state_response(justification, maybe_justification, peer)
}
Request(request) => {
let state = request.state().clone();
self.handle_request(request, peer.clone());
self.handle_state(state, peer);
}
RequestResponse(response_items) => self.handle_request_response(response_items, peer),
ChainExtensionRequest(state) => self.handle_chain_extension_request(state, peer),
}
}
fn handle_own_block(&mut self, block: B) {
let broadcast = self.handler.handle_own_block(block);
if let Err(e) = self
.network
.broadcast(NetworkData::RequestResponse(broadcast))
{
warn!(
target: LOG_TARGET,
"Error broadcasting newly created block: {}.", e
)
}
}
pub async fn run(mut self) {
loop {
tokio::select! {
maybe_data = self.network.next() => match maybe_data {
Ok((data, peer)) => self.handle_network_data(data, peer),
Err(e) => warn!(target: LOG_TARGET, "Error receiving data from network: {}.", e),
},
Some(task) = self.tasks.pop() => self.handle_task(task),
_ = self.broadcast_ticker.wait_and_tick() => self.broadcast(),
force = self.chain_extension_ticker.wait_and_tick() => self.request_chain_extension(force),
maybe_event = self.chain_events.next() => match maybe_event {
Ok(chain_event) => self.handle_chain_event(chain_event),
Err(e) => warn!(target: LOG_TARGET, "Error when receiving a chain event: {}.", e),
},
maybe_justification = self.justifications_from_user.next() => match maybe_justification {
Some(justification) => {
debug!(target: LOG_TARGET, "Received new justification from user: {:?}.", justification);
self.handle_justification_from_user(justification);
},
None => warn!(target: LOG_TARGET, "Channel with justifications from user closed."),
},
maybe_justification = self.additional_justifications_from_user.next() => match maybe_justification {
Some(justification) => {
debug!(target: LOG_TARGET, "Received new additional justification from user: {:?}.", justification);
self.handle_justification_from_user(justification);
},
None => warn!(target: LOG_TARGET, "Channel with additional justifications from user closed."),
},
maybe_block_id = self.block_requests_from_user.next() => match maybe_block_id {
Some(block_id) => {
debug!(target: LOG_TARGET, "Received new internal block request from user: {:?}.", block_id);
self.handle_internal_request(block_id)
},
None => warn!(target: LOG_TARGET, "Channel with internal block request from user closed."),
},
maybe_own_block = self.blocks_from_creator.next() => match maybe_own_block {
Some(block) => {
debug!(target: LOG_TARGET, "Received new own block: {:?}.", block.header().id());
self.handle_own_block(block)
},
None => warn!(target: LOG_TARGET, "Channel with own blocks closed."),
},
}
}
}
}