use std::{
collections::{HashMap, HashSet},
fmt::{Debug, Display, Error as FmtError, Formatter},
future::Future,
hash::Hash,
time::Instant,
};
use futures::{channel::mpsc, StreamExt};
use log::{debug, error, info, trace, warn};
use network_clique::SpawnHandleT;
use rand::{seq::IteratorRandom, thread_rng};
use substrate_prometheus_endpoint::Registry;
use tokio::time;
/// Buffer size passed to `mpsc::channel` when creating the bounded queue that feeds each
/// per-peer sender task. When a queue is full, new messages for that peer are dropped
/// (`try_send` fails) rather than awaited.
const MAX_QUEUE_SIZE: usize = 16;
use crate::{
network::{
gossip::{
metrics::Metrics, Event, EventStream, Network, NetworkSender, Protocol, RawNetwork,
},
Data,
},
SpawnHandle, STATUS_REPORT_INTERVAL,
};
/// `log` target used by every log statement in this module.
const LOG_TARGET: &str = "phron-network";
/// A request issued by a user interface ([`ServiceInterface`]) to the gossip service.
enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
/// Send the data to exactly this peer.
Send(D, P),
/// Send the data to one randomly chosen peer from the set. If none of them is connected
/// for the protocol, the service falls back to a random connected peer.
SendToRandom(D, HashSet<P>),
/// Send the data to every peer currently connected for the protocol.
Broadcast(D),
}
/// Gossip network service multiplexing two user protocols (authentication and block sync)
/// over a single [`RawNetwork`].
///
/// It receives commands from both users, forwards decoded incoming messages to them, and owns
/// one bounded queue plus a spawned sender task per connected peer and protocol.
pub struct Service<N: RawNetwork, AD: Data, BSD: Data> {
// Underlying raw network; cloned into each spawned peer sender task.
network: N,
// Commands coming from the authentication user.
messages_from_authentication_user: mpsc::UnboundedReceiver<Command<AD, N::PeerId>>,
// Commands coming from the block sync user.
messages_from_block_sync_user: mpsc::UnboundedReceiver<Command<BSD, N::PeerId>>,
// Decoded authentication messages (with the sending peer) delivered to the user.
messages_for_authentication_user: mpsc::UnboundedSender<(AD, N::PeerId)>,
// Decoded block sync messages (with the sending peer) delivered to the user.
messages_for_block_sync_user: mpsc::UnboundedSender<(BSD, N::PeerId)>,
// Peers with an open authentication stream.
authentication_connected_peers: HashSet<N::PeerId>,
// Bounded queues feeding the per-peer authentication sender tasks.
authentication_peer_senders: HashMap<N::PeerId, mpsc::Sender<AD>>,
// Peers with an open block sync stream.
block_sync_connected_peers: HashSet<N::PeerId>,
// Bounded queues feeding the per-peer block sync sender tasks.
block_sync_peer_senders: HashMap<N::PeerId, mpsc::Sender<BSD>>,
// Used to spawn the per-peer sender tasks.
spawn_handle: SpawnHandle,
metrics: Metrics,
// Last time a "queue is full" message was logged per (peer, protocol); used to throttle
// that log to at most once per second.
timestamp_of_last_log_that_channel_is_full: HashMap<(N::PeerId, Protocol), Instant>,
}
/// User-side handle to the gossip [`Service`] for a single protocol; implements [`Network`].
struct ServiceInterface<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
// Decoded messages (with the sending peer) forwarded by the service.
messages_from_service: mpsc::UnboundedReceiver<(D, P)>,
// Commands sent to the service.
messages_for_service: mpsc::UnboundedSender<Command<D, P>>,
}
/// Errors returned by the user-facing gossip network interfaces.
#[derive(Debug)]
pub enum Error {
    /// The service side of the channel is gone, so no more data can be exchanged.
    ServiceStopped,
}

impl Display for Error {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        use Error::*;
        match self {
            ServiceStopped => {
                write!(f, "gossip network service stopped")
            }
        }
    }
}

// `Debug` + `Display` are already provided; implementing the standard error trait lets
// callers box this error (`Box<dyn std::error::Error>`) or wrap it in other error types.
impl std::error::Error for Error {}
#[async_trait::async_trait]
impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for ServiceInterface<D, P> {
type Error = Error;
type PeerId = P;
fn send_to(&mut self, data: D, peer_id: Self::PeerId) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Send(data, peer_id))
.map_err(|_| Error::ServiceStopped)
}
fn send_to_random(
&mut self,
data: D,
peer_ids: HashSet<Self::PeerId>,
) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::SendToRandom(data, peer_ids))
.map_err(|_| Error::ServiceStopped)
}
fn broadcast(&mut self, data: D) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Broadcast(data))
.map_err(|_| Error::ServiceStopped)
}
async fn next(&mut self) -> Result<(D, Self::PeerId), Self::Error> {
self.messages_from_service
.next()
.await
.ok_or(Error::ServiceStopped)
}
}
/// Reason why the service could not enqueue data for a peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SendError {
    /// There is no per-peer queue for this peer, i.e. no open stream for the protocol.
    MissingSender,
    /// The per-peer queue rejected the data (it is full or its receiver is gone).
    SendingFailed,
}
impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
    /// Creates the service together with its two user-facing interfaces.
    ///
    /// Returns `(service, authentication_network, block_sync_network)`. The interfaces talk to
    /// the service only through unbounded channels, which are serviced inside [`Service::run`].
    /// If metrics cannot be created from `metrics_registry`, a warning is logged and no-op
    /// metrics are used instead.
    pub fn new(
        network: N,
        spawn_handle: SpawnHandle,
        metrics_registry: Option<Registry>,
    ) -> (
        Self,
        impl Network<AD, Error = Error, PeerId = N::PeerId>,
        impl Network<BSD, Error = Error, PeerId = N::PeerId>,
    ) {
        // Service -> user: decoded messages received from peers.
        let (messages_for_authentication_user, messages_from_authentication_service) =
            mpsc::unbounded();
        let (messages_for_block_sync_user, messages_from_block_sync_service) = mpsc::unbounded();
        // User -> service: send / send-to-random / broadcast commands.
        let (messages_for_authentication_service, messages_from_authentication_user) =
            mpsc::unbounded();
        let (messages_for_block_sync_service, messages_from_block_sync_user) = mpsc::unbounded();
        let metrics = Metrics::new(metrics_registry).unwrap_or_else(|e| {
            warn!(target: LOG_TARGET, "Failed to create metrics: {e}.");
            Metrics::noop()
        });
        (
            Service {
                network,
                messages_from_authentication_user,
                messages_from_block_sync_user,
                messages_for_authentication_user,
                messages_for_block_sync_user,
                spawn_handle,
                metrics,
                authentication_connected_peers: HashSet::new(),
                authentication_peer_senders: HashMap::new(),
                block_sync_connected_peers: HashSet::new(),
                block_sync_peer_senders: HashMap::new(),
                timestamp_of_last_log_that_channel_is_full: HashMap::new(),
            },
            ServiceInterface {
                messages_from_service: messages_from_authentication_service,
                messages_for_service: messages_for_authentication_service,
            },
            ServiceInterface {
                messages_from_service: messages_from_block_sync_service,
                messages_for_service: messages_for_block_sync_service,
            },
        )
    }

    /// Returns the queue feeding the authentication sender task of `peer`, if the stream is open.
    fn get_authentication_sender(&mut self, peer: &N::PeerId) -> Option<&mut mpsc::Sender<AD>> {
        self.authentication_peer_senders.get_mut(peer)
    }

    /// Returns the queue feeding the block sync sender task of `peer`, if the stream is open.
    fn get_block_sync_sender(&mut self, peer: &N::PeerId) -> Option<&mut mpsc::Sender<BSD>> {
        self.block_sync_peer_senders.get_mut(peer)
    }

    /// Builds the task that drains `receiver` and sends each item, encoded, to `peer_id`
    /// over `protocol`.
    ///
    /// The network sender is created lazily and dropped after a failed send, so the next
    /// message retries with a fresh one. A message is dropped if creating the sender or
    /// sending it fails. The task ends once every `mpsc::Sender` for `receiver` is dropped.
    fn peer_sender<D: Data>(
        &self,
        peer_id: N::PeerId,
        mut receiver: mpsc::Receiver<D>,
        protocol: Protocol,
    ) -> impl Future<Output = ()> + Send + 'static {
        let network = self.network.clone();
        let metrics = self.metrics.clone();
        async move {
            let mut sender = None;
            while let Some(data) = receiver.next().await {
                metrics.report_message_popped_from_peer_sender_queue(protocol);
                let s = if let Some(s) = sender.as_mut() {
                    s
                } else {
                    match network.sender(peer_id.clone(), protocol) {
                        Ok(s) => sender.insert(s),
                        Err(e) => {
                            debug!(
                                target: LOG_TARGET,
                                "Failed creating sender. Dropping message: {}", e
                            );
                            continue;
                        }
                    }
                };
                let maybe_timer = metrics.start_sending_in(protocol);
                if let Err(e) = s.send(data.encode()).await {
                    debug!(
                        target: LOG_TARGET,
                        "Failed sending data to peer. Dropping sender and message: {}", e
                    );
                    // Force re-creation of the network sender for the next message.
                    sender = None;
                }
                if let Some(timer) = maybe_timer {
                    timer.observe_duration();
                }
            }
            debug!(
                target: LOG_TARGET,
                "Sender was dropped for peer {:?}. Peer sender exiting.", peer_id
            );
        }
    }

    /// Logs that the queue for `(peer, protocol)` is full, at most once per second per pair.
    fn possibly_log_that_channel_is_full(&mut self, peer: N::PeerId, protocol: Protocol) {
        let key = (peer, protocol);
        let recently_logged = match self.timestamp_of_last_log_that_channel_is_full.get(&key) {
            Some(last) => last.elapsed() < time::Duration::from_secs(1),
            None => false,
        };
        if recently_logged {
            return;
        }
        debug!(
            target: LOG_TARGET,
            "Failed sending data in {:?} protocol to peer {:?}, because peer_sender receiver is full",
            protocol,
            key.0
        );
        self.timestamp_of_last_log_that_channel_is_full
            .insert(key, Instant::now());
    }

    /// Enqueues `data` for the authentication sender task of `peer` without blocking.
    ///
    /// # Errors
    /// [`SendError::MissingSender`] if there is no open authentication stream to `peer`;
    /// [`SendError::SendingFailed`] if the queue is full or its task is gone.
    fn send_to_authentication_peer(&mut self, data: AD, peer: N::PeerId) -> Result<(), SendError> {
        let sender = self
            .get_authentication_sender(&peer)
            .ok_or(SendError::MissingSender)?;
        if let Err(e) = sender.try_send(data) {
            if e.is_full() {
                self.possibly_log_that_channel_is_full(peer.clone(), Protocol::Authentication);
            }
            if e.is_disconnected() {
                trace!(target: LOG_TARGET, "Failed sending data to peer because peer_sender receiver is dropped: {:?}", peer);
            }
            return Err(SendError::SendingFailed);
        }
        self.metrics
            .report_message_pushed_to_peer_sender_queue(Protocol::Authentication);
        Ok(())
    }

    /// Enqueues `data` for the block sync sender task of `peer` without blocking.
    ///
    /// # Errors
    /// [`SendError::MissingSender`] if there is no open block sync stream to `peer`;
    /// [`SendError::SendingFailed`] if the queue is full or its task is gone.
    fn send_to_block_sync_peer(&mut self, data: BSD, peer: N::PeerId) -> Result<(), SendError> {
        let sender = self
            .get_block_sync_sender(&peer)
            .ok_or(SendError::MissingSender)?;
        if let Err(e) = sender.try_send(data) {
            if e.is_full() {
                self.possibly_log_that_channel_is_full(peer.clone(), Protocol::BlockSync);
            }
            if e.is_disconnected() {
                trace!(target: LOG_TARGET, "Failed sending data to peer because peer_sender receiver is dropped: {:?}", peer);
            }
            return Err(SendError::SendingFailed);
        }
        self.metrics
            .report_message_pushed_to_peer_sender_queue(Protocol::BlockSync);
        Ok(())
    }

    /// Best-effort send of authentication data to `peer_id`; failures are only traced.
    fn send_authentication_data(&mut self, data: AD, peer_id: N::PeerId) {
        if let Err(e) = self.send_to_authentication_peer(data, peer_id.clone()) {
            trace!(
                target: LOG_TARGET,
                "Failed to send to peer {:?}, {:?}",
                peer_id,
                e
            );
        }
    }

    /// Best-effort send of block sync data to `peer_id`; failures are only traced.
    fn send_block_sync_data(&mut self, data: BSD, peer_id: N::PeerId) {
        if let Err(e) = self.send_to_block_sync_peer(data, peer_id.clone()) {
            trace!(
                target: LOG_TARGET,
                "Failed to send to peer {:?}, {:?}",
                peer_id,
                e
            );
        }
    }

    /// Returns the set of peers with an open stream for `protocol`.
    fn protocol_peers(&self, protocol: Protocol) -> &HashSet<N::PeerId> {
        match protocol {
            Protocol::Authentication => &self.authentication_connected_peers,
            Protocol::BlockSync => &self.block_sync_connected_peers,
        }
    }

    /// Picks a random peer from `peer_ids` that is connected for `protocol`.
    ///
    /// If none of `peer_ids` is connected, falls back to any random connected peer.
    /// Returns `None` only when no peer is connected for `protocol`.
    fn random_peer<'a>(
        &'a self,
        peer_ids: &'a HashSet<N::PeerId>,
        protocol: Protocol,
    ) -> Option<&'a N::PeerId> {
        let connected = self.protocol_peers(protocol);
        let mut rng = thread_rng();
        peer_ids
            .intersection(connected)
            .choose(&mut rng)
            .or_else(|| connected.iter().choose(&mut rng))
    }

    /// Sends authentication data to one random peer (see [`Self::random_peer`]).
    fn send_to_random_authentication(&mut self, data: AD, peer_ids: HashSet<N::PeerId>) {
        let peer_id = match self.random_peer(&peer_ids, Protocol::Authentication) {
            Some(peer_id) => peer_id.clone(),
            None => {
                trace!(
                    target: LOG_TARGET,
                    "Failed to send authentication message to random peer, no peers are available."
                );
                return;
            }
        };
        self.send_authentication_data(data, peer_id);
    }

    /// Sends block sync data to one random peer (see [`Self::random_peer`]).
    fn send_to_random_block_sync(&mut self, data: BSD, peer_ids: HashSet<N::PeerId>) {
        let peer_id = match self.random_peer(&peer_ids, Protocol::BlockSync) {
            Some(peer_id) => peer_id.clone(),
            None => {
                trace!(
                    target: LOG_TARGET,
                    "Failed to send block sync message to random peer, no peers are available."
                );
                return;
            }
        };
        self.send_block_sync_data(data, peer_id);
    }

    /// Sends authentication data to every peer connected for the authentication protocol.
    fn broadcast_authentication(&mut self, data: AD) {
        // Snapshot the peers: sending needs `&mut self`.
        let peers = self.protocol_peers(Protocol::Authentication).clone();
        for peer in peers {
            self.send_authentication_data(data.clone(), peer);
        }
    }

    /// Sends block sync data to every peer connected for the block sync protocol.
    fn broadcast_block_sync(&mut self, data: BSD) {
        // Snapshot the peers: sending needs `&mut self`.
        let peers = self.protocol_peers(Protocol::BlockSync).clone();
        for peer in peers {
            self.send_block_sync_data(data.clone(), peer);
        }
    }

    /// Applies one raw network event to the service state.
    ///
    /// - `StreamOpened`: marks the peer as connected and spawns its sender task.
    /// - `StreamClosed`: forgets the peer for that protocol. Dropping its queue lets the
    ///   sender task exit.
    /// - `Messages`: decodes each message and forwards it to the matching user. Undecodable
    ///   messages are logged and skipped.
    ///
    /// # Errors
    /// Returns `Err(())` when a decoded message cannot be forwarded because the user side is gone.
    fn handle_network_event(&mut self, event: Event<N::PeerId>) -> Result<(), ()> {
        use Event::*;
        match event {
            StreamOpened(peer, protocol) => {
                trace!(
                    target: LOG_TARGET,
                    "StreamOpened event for peer {:?} and the protocol {:?}.",
                    peer,
                    protocol
                );
                match protocol {
                    Protocol::Authentication => {
                        let (tx, rx) = mpsc::channel(MAX_QUEUE_SIZE);
                        self.authentication_connected_peers.insert(peer.clone());
                        self.authentication_peer_senders.insert(peer.clone(), tx);
                        self.spawn_handle.spawn(
                            "phron/network/authentication_peer_sender",
                            self.peer_sender(peer, rx, Protocol::Authentication),
                        );
                    }
                    Protocol::BlockSync => {
                        let (tx, rx) = mpsc::channel(MAX_QUEUE_SIZE);
                        self.block_sync_connected_peers.insert(peer.clone());
                        self.block_sync_peer_senders.insert(peer.clone(), tx);
                        self.spawn_handle.spawn(
                            "phron/network/sync_peer_sender",
                            self.peer_sender(peer, rx, Protocol::BlockSync),
                        );
                    }
                };
            }
            StreamClosed(peer, protocol) => {
                trace!(
                    target: LOG_TARGET,
                    "StreamClosed event for peer {:?} and protocol {:?}",
                    peer,
                    protocol
                );
                match protocol {
                    Protocol::Authentication => {
                        self.authentication_connected_peers.remove(&peer);
                        self.authentication_peer_senders.remove(&peer);
                    }
                    Protocol::BlockSync => {
                        self.block_sync_connected_peers.remove(&peer);
                        self.block_sync_peer_senders.remove(&peer);
                    }
                }
                // Drop the log-throttling entry too; otherwise the map keeps an entry for
                // every (peer, protocol) pair that ever had a full queue.
                self.timestamp_of_last_log_that_channel_is_full
                    .remove(&(peer, protocol));
            }
            Messages(peer_id, messages) => {
                for (protocol, data) in messages {
                    match protocol {
                        Protocol::Authentication => match AD::decode(&mut &data[..]) {
                            Ok(data) => self
                                .messages_for_authentication_user
                                .unbounded_send((data, peer_id.clone()))
                                .map_err(|_| ())?,
                            Err(e) => {
                                warn!(
                                    target: LOG_TARGET,
                                    "Error decoding authentication protocol message: {}", e
                                )
                            }
                        },
                        Protocol::BlockSync => match BSD::decode(&mut &data[..]) {
                            Ok(data) => self
                                .messages_for_block_sync_user
                                .unbounded_send((data, peer_id.clone()))
                                .map_err(|_| ())?,
                            Err(e) => {
                                warn!(
                                    target: LOG_TARGET,
                                    "Error decoding block sync protocol message: {}", e
                                )
                            }
                        },
                    };
                }
            }
        }
        Ok(())
    }

    /// Logs the number of connected peers for each protocol at `info` level.
    fn status_report(&self) {
        info!(
            target: LOG_TARGET,
            "Network status report: authentication connected peers - {:?}; block sync connected peers - {:?}; ",
            self.authentication_connected_peers.len(),
            self.block_sync_connected_peers.len()
        );
    }

    /// Runs the service until one of its inputs ends.
    ///
    /// Concurrently handles raw network events, commands from both users and a periodic
    /// status report. Returns, after logging an error, when the network event stream or a user
    /// command stream ends, or when a received message cannot be forwarded to its user.
    pub async fn run(mut self) {
        let mut events_from_network = self.network.event_stream();
        let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL);
        loop {
            tokio::select! {
                maybe_event = events_from_network.next_event() => match maybe_event {
                    Some(event) => if self.handle_network_event(event).is_err() {
                        error!(target: LOG_TARGET, "Cannot forward messages to user.");
                        return;
                    },
                    None => {
                        error!(target: LOG_TARGET, "Network event stream ended.");
                        return;
                    }
                },
                maybe_message = self.messages_from_authentication_user.next() => match maybe_message {
                    Some(Command::Broadcast(message)) => self.broadcast_authentication(message),
                    Some(Command::SendToRandom(message, peer_ids)) => self.send_to_random_authentication(message, peer_ids),
                    Some(Command::Send(message, peer_id)) => self.send_authentication_data(message, peer_id),
                    None => {
                        error!(target: LOG_TARGET, "Authentication user message stream ended.");
                        return;
                    }
                },
                maybe_message = self.messages_from_block_sync_user.next() => match maybe_message {
                    Some(Command::Broadcast(message)) => self.broadcast_block_sync(message),
                    Some(Command::SendToRandom(message, peer_ids)) => self.send_to_random_block_sync(message, peer_ids),
                    Some(Command::Send(message, peer_id)) => self.send_block_sync_data(message, peer_id),
                    None => {
                        error!(target: LOG_TARGET, "Block sync user message stream ended.");
                        return;
                    }
                },
                _ = status_ticker.tick() => {
                    self.status_report();
                },
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, iter};

    use futures::channel::oneshot;
    use network_clique::mock::{random_peer_id, MockPublicKey};
    use parity_scale_codec::Encode;
    use sc_service::TaskManager;
    use tokio::runtime::Handle;

    use super::{Error, SendError, Service};
    use crate::network::{
        gossip::{
            mock::{MockEvent, MockRawNetwork, MockSenderError},
            Network,
        },
        mock::MockData,
        Protocol,
    };

    /// Protocol exercised by every test in this module.
    const PROTOCOL: Protocol = Protocol::Authentication;

    /// Fixture bundling a [`Service`] under test with the mock network it is built on.
    pub struct TestData {
        /// Mock raw network; tests observe what the service sent through `send_message`.
        pub network: MockRawNetwork,
        /// Authentication-side interface returned by `Service::new`.
        gossip_network: Box<dyn Network<MockData, Error = Error, PeerId = MockPublicKey>>,
        /// The service itself; tests call its methods directly instead of `run`.
        pub service: Service<MockRawNetwork, MockData, MockData>,
        // Provides the spawn handle for peer sender tasks; held so it lives as long as the test.
        _task_manager: TaskManager,
        // Block sync interface; unused, held only so it is not dropped during the test.
        _other_network: Box<dyn Network<MockData, Error = Error, PeerId = MockPublicKey>>,
    }

    impl TestData {
        /// Builds a service over a fresh mock network. Must run inside a tokio runtime.
        fn prepare() -> Self {
            let task_manager = TaskManager::new(Handle::current(), None).unwrap();
            let (event_stream_oneshot_tx, _) = oneshot::channel();
            let network = MockRawNetwork::new(event_stream_oneshot_tx);
            let (service, gossip_network, other_network) =
                Service::new(network.clone(), task_manager.spawn_handle().into(), None);
            let gossip_network = Box::new(gossip_network);
            let other_network = Box::new(other_network);
            Self {
                network,
                service,
                gossip_network,
                _task_manager: task_manager,
                _other_network: other_network,
            }
        }

        /// Closes the mock network's channels at the end of a test.
        async fn cleanup(self) {
            self.network.close_channels().await;
        }
    }

    // Lets tests use `TestData` directly as the authentication-side `Network`.
    #[async_trait::async_trait]
    impl Network<MockData> for TestData {
        type Error = Error;
        type PeerId = MockPublicKey;

        fn send_to(&mut self, data: MockData, peer_id: Self::PeerId) -> Result<(), Self::Error> {
            self.gossip_network.send_to(data, peer_id)
        }

        fn send_to_random(
            &mut self,
            data: MockData,
            peer_ids: HashSet<Self::PeerId>,
        ) -> Result<(), Self::Error> {
            self.gossip_network.send_to_random(data, peer_ids)
        }

        fn broadcast(&mut self, data: MockData) -> Result<(), Self::Error> {
            self.gossip_network.broadcast(data)
        }

        async fn next(&mut self) -> Result<(MockData, Self::PeerId), Self::Error> {
            self.gossip_network.next().await
        }
    }

    /// Builds a distinct test message from `i`.
    fn message(i: u8) -> MockData {
        MockData::new(i.into(), 3)
    }

    #[tokio::test]
    async fn test_notification_stream_opened() {
        let mut test_data = TestData::prepare();
        let peer_ids: Vec<_> = (0..3).map(|_| random_peer_id()).collect();
        for peer_id in &peer_ids {
            test_data
                .service
                .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL))
                .expect("Should handle");
        }
        let message = message(1);
        test_data.service.broadcast_authentication(message.clone());
        let broadcasted_messages = HashSet::<_>::from_iter(
            test_data
                .network
                .send_message
                .take(peer_ids.len())
                .await
                .into_iter(),
        );
        let expected_messages = HashSet::from_iter(
            peer_ids
                .into_iter()
                .map(|peer_id| (message.clone().encode(), peer_id, PROTOCOL)),
        );
        assert_eq!(broadcasted_messages, expected_messages);
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_notification_stream_closed() {
        let mut test_data = TestData::prepare();
        let peer_ids: Vec<_> = (0..3).map(|_| random_peer_id()).collect();
        let opened_authorities_n = 2;
        for peer_id in &peer_ids {
            test_data
                .service
                .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL))
                .expect("Should handle");
        }
        // Close every stream except the first `opened_authorities_n`.
        for peer_id in peer_ids.iter().skip(opened_authorities_n) {
            test_data
                .service
                .handle_network_event(MockEvent::StreamClosed(peer_id.clone(), PROTOCOL))
                .expect("Should handle");
        }
        let message = message(1);
        test_data.service.broadcast_authentication(message.clone());
        let broadcasted_messages = HashSet::<_>::from_iter(
            test_data
                .network
                .send_message
                .take(opened_authorities_n)
                .await
                .into_iter(),
        );
        let expected_messages = HashSet::from_iter(
            peer_ids
                .into_iter()
                .take(opened_authorities_n)
                .map(|peer_id| (message.clone().encode(), peer_id, PROTOCOL)),
        );
        assert_eq!(broadcasted_messages, expected_messages);
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_create_sender_error() {
        let mut test_data = TestData::prepare();
        // The first sender creation fails, so the first message is dropped.
        test_data
            .network
            .create_sender_errors
            .lock()
            .push_back(MockSenderError);
        let peer_id = random_peer_id();
        let message_1 = message(1);
        let message_2 = message(4);
        test_data
            .service
            .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL))
            .expect("Should handle");
        test_data.service.broadcast_authentication(message_1);
        test_data
            .service
            .broadcast_authentication(message_2.clone());
        let expected = (message_2.encode(), peer_id, PROTOCOL);
        assert_eq!(
            test_data
                .network
                .send_message
                .next()
                .await
                .expect("Should receive message"),
            expected,
        );
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_send_error() {
        let mut test_data = TestData::prepare();
        // The first send fails, so the first message is dropped and the sender recreated.
        test_data
            .network
            .send_errors
            .lock()
            .push_back(MockSenderError);
        let peer_id = random_peer_id();
        let message_1 = message(1);
        let message_2 = message(4);
        test_data
            .service
            .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL))
            .expect("Should handle");
        test_data.service.broadcast_authentication(message_1);
        test_data
            .service
            .broadcast_authentication(message_2.clone());
        let expected = (message_2.encode(), peer_id, PROTOCOL);
        assert_eq!(
            test_data
                .network
                .send_message
                .next()
                .await
                .expect("Should receive message"),
            expected,
        );
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_notification_received() {
        let mut test_data = TestData::prepare();
        let message = message(1);
        let peer_id = random_peer_id();
        test_data
            .service
            .handle_network_event(MockEvent::Messages(
                peer_id.clone(),
                vec![(PROTOCOL, message.clone().encode().into())],
            ))
            .expect("Should handle");
        let (received_message, received_peer_id) =
            test_data.next().await.expect("Should receive message");
        assert_eq!(received_message, message);
        assert_eq!(received_peer_id, peer_id);
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_send_to_connected() {
        let mut test_data = TestData::prepare();
        let peer_id = random_peer_id();
        let message = message(1);
        test_data
            .service
            .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL))
            .expect("Should handle");
        test_data
            .service
            .send_to_authentication_peer(message.clone(), peer_id.clone())
            .expect("interface works");
        let expected = (message.encode(), peer_id, PROTOCOL);
        assert_eq!(
            test_data
                .network
                .send_message
                .next()
                .await
                .expect("Should receive message"),
            expected,
        );
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_no_send_to_disconnected() {
        let mut test_data = TestData::prepare();
        let peer_id = random_peer_id();
        let message = message(1);
        assert!(matches!(
            test_data
                .service
                .send_to_authentication_peer(message, peer_id),
            Err(SendError::MissingSender)
        ));
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_send_to_random_connected() {
        let mut test_data = TestData::prepare();
        let peer_id = random_peer_id();
        let message = message(1);
        test_data
            .service
            .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL))
            .expect("Should handle");
        test_data
            .service
            .send_to_random_authentication(message.clone(), iter::once(peer_id.clone()).collect());
        let expected = (message.encode(), peer_id, PROTOCOL);
        assert_eq!(
            test_data
                .network
                .send_message
                .next()
                .await
                .expect("Should receive message"),
            expected,
        );
        test_data.cleanup().await
    }

    #[tokio::test]
    async fn test_send_to_random_disconnected() {
        let mut test_data = TestData::prepare();
        let peer_id = random_peer_id();
        let other_peer_id = random_peer_id();
        let message = message(1);
        test_data
            .service
            .handle_network_event(MockEvent::StreamOpened(other_peer_id.clone(), PROTOCOL))
            .expect("Should handle");
        // `peer_id` is not connected, so the service falls back to the connected `other_peer_id`.
        test_data
            .service
            .send_to_random_authentication(message.clone(), iter::once(peer_id.clone()).collect());
        let expected = (message.encode(), other_peer_id, PROTOCOL);
        assert_eq!(
            test_data
                .network
                .send_message
                .next()
                .await
                .expect("Should receive message"),
            expected,
        );
        test_data.cleanup().await
    }
}