use std::{
collections::{HashMap, HashSet},
fmt::Debug,
time::Duration,
};
use futures::channel::mpsc;
use log::{debug, info};
use crate::{
abft::Recipient,
crypto::{AuthorityPen, AuthorityVerifier},
network::{
address_cache::{ValidatorAddressCacheUpdater, ValidatorAddressingInfo},
session::{
data::DataInSession, Authentication, Connections, Discovery, DiscoveryMessage,
SessionHandler, SessionHandlerError,
},
AddressingInformation, Data, NetworkIdentity, PeerId,
},
NodeIndex, SessionId,
};
#[derive(Debug, PartialEq, Eq)]
pub enum ConnectionCommand<A: AddressingInformation> {
AddReserved(HashSet<A>),
DelReserved(HashSet<A::PeerId>),
}
pub type AddressedData<D, P> = (D, P);
struct Session<D: Data, A: AddressingInformation> {
handler: SessionHandler<A>,
discovery: Discovery<A>,
data_for_user: Option<mpsc::UnboundedSender<D>>,
}
#[derive(Clone)]
pub struct PreValidatorSession {
pub session_id: SessionId,
pub verifier: AuthorityVerifier,
pub node_id: NodeIndex,
pub pen: AuthorityPen,
}
#[derive(Clone)]
pub struct PreNonvalidatorSession {
pub session_id: SessionId,
pub verifier: AuthorityVerifier,
}
pub struct ManagerActions<A: AddressingInformation> {
pub maybe_command: Option<ConnectionCommand<A>>,
pub maybe_message: Option<Authentication<A>>,
}
impl<A: AddressingInformation> ManagerActions<A> {
fn noop() -> Self {
ManagerActions {
maybe_command: None,
maybe_message: None,
}
}
}
pub struct Manager<NI: NetworkIdentity, D: Data, VCU: ValidatorAddressCacheUpdater> {
network_identity: NI,
connections: Connections<NI::PeerId>,
sessions: HashMap<SessionId, Session<D, NI::AddressingInformation>>,
validator_address_cache_updater: VCU,
discovery_cooldown: Duration,
}
#[derive(Debug, PartialEq, Eq)]
pub enum SendError {
UserSend,
NoSession,
}
impl<NI: NetworkIdentity, D: Data, VCU: ValidatorAddressCacheUpdater> Manager<NI, D, VCU> {
pub fn new(
network_identity: NI,
validator_address_cache_updater: VCU,
discovery_cooldown: Duration,
) -> Self {
Manager {
network_identity,
connections: Connections::new(),
sessions: HashMap::new(),
validator_address_cache_updater,
discovery_cooldown,
}
}
fn delete_reserved(
to_remove: HashSet<NI::PeerId>,
) -> Option<ConnectionCommand<NI::AddressingInformation>> {
match to_remove.is_empty() {
true => None,
false => Some(ConnectionCommand::DelReserved(to_remove)),
}
}
pub fn finish_session(
&mut self,
session_id: SessionId,
) -> ManagerActions<NI::AddressingInformation> {
self.sessions.remove(&session_id);
ManagerActions {
maybe_command: Self::delete_reserved(self.connections.remove_session(session_id)),
maybe_message: None,
}
}
fn discover_authorities(
&mut self,
session_id: &SessionId,
) -> Option<Authentication<NI::AddressingInformation>> {
self.sessions.get_mut(session_id).and_then(
|Session {
handler, discovery, ..
}| { discovery.discover_authorities(handler) },
)
}
pub fn discovery(&mut self) -> Vec<Authentication<NI::AddressingInformation>> {
let sessions: Vec<_> = self.sessions.keys().cloned().collect();
sessions
.iter()
.flat_map(|session_id| self.discover_authorities(session_id))
.collect()
}
fn start_validator_session(
&mut self,
pre_session: PreValidatorSession,
address: NI::AddressingInformation,
) -> (
Option<Authentication<NI::AddressingInformation>>,
mpsc::UnboundedReceiver<D>,
) {
let PreValidatorSession {
session_id,
verifier,
node_id,
pen,
} = pre_session;
let handler = SessionHandler::new(Some((node_id, pen)), verifier, session_id, address);
let discovery = Discovery::new(self.discovery_cooldown);
let (data_for_user, data_from_network) = mpsc::unbounded();
let data_for_user = Some(data_for_user);
self.sessions.insert(
session_id,
Session {
handler,
discovery,
data_for_user,
},
);
(self.discover_authorities(&session_id), data_from_network)
}
pub fn update_validator_session(
&mut self,
pre_session: PreValidatorSession,
) -> Result<
(
ManagerActions<NI::AddressingInformation>,
mpsc::UnboundedReceiver<D>,
),
SessionHandlerError,
> {
let address = self.network_identity.identity();
println!("address: {:?}", address);
println!("pre_session: {:?}", pre_session.session_id);
let session = match self.sessions.get_mut(&pre_session.session_id) {
Some(session) => session,
None => {
let (maybe_message, data_from_network) =
self.start_validator_session(pre_session, address);
return Ok((
ManagerActions {
maybe_command: None,
maybe_message,
},
data_from_network,
));
}
};
let PreValidatorSession {
session_id,
verifier,
node_id,
pen,
} = pre_session;
self.validator_address_cache_updater.update(
node_id,
ValidatorAddressingInfo {
session: session_id,
network_level_address: address.address(),
validator_network_peer_id: address.peer_id().to_string(),
},
);
let peers_to_stay = session
.handler
.update(Some((node_id, pen)), verifier, address)?
.iter()
.map(|address| address.peer_id())
.collect();
let maybe_command = Self::delete_reserved(
self.connections
.remove_session(session_id)
.difference(&peers_to_stay)
.cloned()
.collect(),
);
let (data_for_user, data_from_network) = mpsc::unbounded();
session.data_for_user = Some(data_for_user);
self.connections.add_peers(session_id, peers_to_stay);
Ok((
ManagerActions {
maybe_command,
maybe_message: self.discover_authorities(&session_id),
},
data_from_network,
))
}
fn start_nonvalidator_session(
&mut self,
pre_session: PreNonvalidatorSession,
address: NI::AddressingInformation,
) {
let PreNonvalidatorSession {
session_id,
verifier,
} = pre_session;
let handler = SessionHandler::new(None, verifier, session_id, address);
let discovery = Discovery::new(self.discovery_cooldown);
self.sessions.insert(
session_id,
Session {
handler,
discovery,
data_for_user: None,
},
);
}
pub fn update_nonvalidator_session(
&mut self,
pre_session: PreNonvalidatorSession,
) -> Result<ManagerActions<NI::AddressingInformation>, SessionHandlerError> {
let address = self.network_identity.identity();
match self.sessions.get_mut(&pre_session.session_id) {
Some(session) => {
session
.handler
.update(None, pre_session.verifier, address)?;
}
None => {
self.start_nonvalidator_session(pre_session, address);
}
};
Ok(ManagerActions::noop())
}
pub fn on_user_message(
&self,
data: D,
session_id: SessionId,
recipient: Recipient,
) -> Vec<AddressedData<DataInSession<D>, NI::PeerId>> {
if let Some(handler) = self
.sessions
.get(&session_id)
.map(|session| &session.handler)
{
let to_send = DataInSession { data, session_id };
match recipient {
Recipient::Everyone => (0..handler.node_count().0)
.map(NodeIndex)
.flat_map(|node_id| handler.peer_id(&node_id))
.map(|peer_id| (to_send.clone(), peer_id))
.collect(),
Recipient::Node(node_id) => handler
.peer_id(&node_id)
.into_iter()
.map(|peer_id| (to_send.clone(), peer_id))
.collect(),
}
} else {
Vec::new()
}
}
pub fn on_discovery_message(
&mut self,
message: DiscoveryMessage<NI::AddressingInformation>,
) -> ManagerActions<NI::AddressingInformation> {
let session_id = message.session_id();
let creator = message.0.creator();
match self.sessions.get_mut(&session_id) {
Some(Session {
handler, discovery, ..
}) => {
let (maybe_address, maybe_message) =
discovery.handle_authentication(message, handler);
let mut maybe_command = None;
if let Some(address) = maybe_address {
self.validator_address_cache_updater.update(
creator,
ValidatorAddressingInfo {
session: session_id,
network_level_address: address.address(),
validator_network_peer_id: address.peer_id().to_string(),
},
);
if handler.is_validator() {
debug!(target: "phron-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, address);
self.connections.add_peers(session_id, [address.peer_id()]);
maybe_command = Some(ConnectionCommand::AddReserved([address].into()));
}
}
ManagerActions {
maybe_command,
maybe_message,
}
}
None => {
debug!(target: "phron-network", "Received message from unknown session: {:?}", message);
ManagerActions::noop()
}
}
}
pub fn send_session_data(&self, session_id: &SessionId, data: D) -> Result<(), SendError> {
match self
.sessions
.get(session_id)
.and_then(|session| session.data_for_user.as_ref())
{
Some(data_for_user) => data_for_user
.unbounded_send(data)
.map_err(|_| SendError::UserSend),
None => Err(SendError::NoSession),
}
}
pub fn status_report(&self) {
let mut status = String::from("Connection Manager status report: ");
let mut authenticated: Vec<_> = self
.sessions
.iter()
.filter(|(_, session)| session.handler.authentication().is_some())
.map(|(session_id, session)| {
let mut peers = session
.handler
.peers()
.into_iter()
.map(|(node_id, peer_id)| (node_id.0, peer_id))
.collect::<Vec<_>>();
peers.sort_by(|x, y| x.0.cmp(&y.0));
(session_id.0, session.handler.node_count().0, peers)
})
.collect();
authenticated.sort_by(|x, y| x.0.cmp(&y.0));
if !authenticated.is_empty() {
let authenticated_status = authenticated
.iter()
.map(|(session_id, node_count, peers)| {
let peer_ids = peers
.iter()
.map(|(node_id, peer_id)| {
format!("{:?}: {}", node_id, peer_id.to_short_string())
})
.collect::<Vec<_>>()
.join(", ");
format!(
"{:?}: {}/{} {{{}}}",
session_id,
peers.len() + 1,
node_count,
peer_ids
)
})
.collect::<Vec<_>>()
.join(", ");
status.push_str(&format!(
"authenticated authorities: {authenticated_status}; "
));
}
let mut missing: Vec<_> = self
.sessions
.iter()
.filter(|(_, session)| session.handler.authentication().is_some())
.map(|(session_id, session)| {
(
session_id.0,
session
.handler
.missing_nodes()
.iter()
.map(|id| id.0)
.collect::<Vec<_>>(),
)
})
.filter(|(_, missing)| !missing.is_empty())
.collect();
missing.sort_by(|x, y| x.0.cmp(&y.0));
if !missing.is_empty() {
let missing_status = missing
.iter()
.map(|(session_id, missing)| format!("{session_id:?}: {missing:?}"))
.collect::<Vec<_>>()
.join(", ");
status.push_str(&format!("missing authorities: {missing_status}; "));
}
if !authenticated.is_empty() || !missing.is_empty() {
info!(target: "phron-network", "{}", status);
}
}
}
#[cfg(test)]
mod tests {
use std::{iter, time::Duration};
use futures::StreamExt;
use network_clique::mock::{random_address, MockAddressingInformation};
use super::{
ConnectionCommand, Manager, ManagerActions, PreNonvalidatorSession, PreValidatorSession,
SendError,
};
use crate::{
network::{
address_cache::{test::noop_updater, ValidatorAddressCacheUpdater},
mock::crypto_basics,
session::data::DataInSession,
},
Recipient, SessionId,
};
const NUM_NODES: usize = 7;
const DISCOVERY_PERIOD: Duration = Duration::from_secs(60);
fn build() -> Manager<MockAddressingInformation, i32, impl ValidatorAddressCacheUpdater> {
Manager::new(random_address(), noop_updater(), DISCOVERY_PERIOD)
}
#[test]
fn starts_nonvalidator_session() {
let mut manager = build();
let (_, verifier) = crypto_basics(NUM_NODES);
let session_id = SessionId(43);
let ManagerActions {
maybe_command,
maybe_message,
} = manager
.update_nonvalidator_session(PreNonvalidatorSession {
session_id,
verifier,
})
.unwrap();
assert!(maybe_command.is_none());
assert!(maybe_message.is_none());
assert_eq!(
manager.send_session_data(&session_id, -43),
Err(SendError::NoSession)
);
}
#[test]
fn starts_validator_session() {
let mut manager = build();
let (validator_data, verifier) = crypto_basics(NUM_NODES);
let (node_id, pen) = validator_data[0].clone();
let session_id = SessionId(43);
let (
ManagerActions {
maybe_command,
maybe_message,
},
_data_from_network,
) = manager
.update_validator_session(PreValidatorSession {
session_id,
verifier,
node_id,
pen,
})
.unwrap();
assert!(maybe_command.is_none());
assert!(maybe_message.is_some());
assert_eq!(manager.send_session_data(&session_id, -43), Ok(()));
}
#[tokio::test]
async fn stops_session() {
let mut manager = build();
let (validator_data, verifier) = crypto_basics(NUM_NODES);
let (node_id, pen) = validator_data[0].clone();
let session_id = SessionId(43);
let (
ManagerActions {
maybe_command,
maybe_message,
},
mut data_from_network,
) = manager
.update_validator_session(PreValidatorSession {
session_id,
verifier,
node_id,
pen,
})
.unwrap();
assert!(maybe_command.is_none());
assert!(maybe_message.is_some());
assert_eq!(manager.send_session_data(&session_id, -43), Ok(()));
assert_eq!(data_from_network.next().await, Some(-43));
let ManagerActions {
maybe_command,
maybe_message,
} = manager.finish_session(session_id);
assert!(maybe_command.is_none());
assert!(maybe_message.is_none());
assert_eq!(
manager.send_session_data(&session_id, -43),
Err(SendError::NoSession)
);
assert!(data_from_network.next().await.is_none());
}
#[test]
fn handles_broadcast() {
let mut manager = build();
let (validator_data, verifier) = crypto_basics(NUM_NODES);
let (node_id, pen) = validator_data[0].clone();
let session_id = SessionId(43);
manager
.update_validator_session(PreValidatorSession {
session_id,
verifier: verifier.clone(),
node_id,
pen,
})
.unwrap();
let mut other_manager = build();
let (node_id, pen) = validator_data[1].clone();
let (ManagerActions { maybe_message, .. }, _) = other_manager
.update_validator_session(PreValidatorSession {
session_id,
verifier,
node_id,
pen,
})
.unwrap();
let message = maybe_message.expect("there should be a discovery message");
let (address, message) = (message.0.address(), message);
let ManagerActions {
maybe_command,
maybe_message,
} = manager.on_discovery_message(message);
assert_eq!(
maybe_command,
Some(ConnectionCommand::AddReserved(
iter::once(address).collect()
))
);
assert!(maybe_message.is_some());
}
#[test]
fn sends_user_data() {
let mut manager = build();
let (validator_data, verifier) = crypto_basics(NUM_NODES);
let (node_id, pen) = validator_data[0].clone();
let session_id = SessionId(43);
manager
.update_validator_session(PreValidatorSession {
session_id,
verifier: verifier.clone(),
node_id,
pen,
})
.unwrap();
let mut other_manager = build();
let (node_id, pen) = validator_data[1].clone();
let (ManagerActions { maybe_message, .. }, _) = other_manager
.update_validator_session(PreValidatorSession {
session_id,
verifier,
node_id,
pen,
})
.unwrap();
let message = maybe_message.expect("there should be a discovery message");
manager.on_discovery_message(message);
let messages = manager.on_user_message(2137, session_id, Recipient::Everyone);
assert_eq!(messages.len(), 1);
let (network_data, _) = &messages[0];
assert_eq!(
network_data,
&DataInSession {
data: 2137,
session_id
}
);
}
}