use std::{
collections::{HashMap, HashSet},
fmt::{Display, Error as FmtError, Formatter},
};
use futures::channel::mpsc;
use crate::{metrics::Metrics, Data, PeerId, PublicKey};
mod direction;
use direction::DirectedPeers;
/// Reasons why [`Manager::send_to`] can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum SendError {
    /// A sender is stored for the peer, but pushing data into it failed.
    ConnectionClosed,
    /// No sender is stored for the requested peer.
    PeerNotFound,
}

impl Display for SendError {
    /// Writes a short, fixed, human-readable description of the error.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let description = match self {
            Self::ConnectionClosed => "worker dead",
            Self::PeerNotFound => "peer not found",
        };
        f.write_str(description)
    }
}
/// Outcome of [`Manager::add_connection`].
#[derive(Debug, PartialEq, Eq)]
pub enum AddResult {
/// The manager is not interested in this peer; the connection was not stored.
Uninterested,
/// The connection was stored and no connection for this peer was stored before.
Added,
/// The connection was stored in place of one already stored for this peer.
Replaced,
}
/// Snapshot of the wanted peers of a [`Manager`], split by direction and by whether an
/// active connection to them existed when the snapshot was taken.
///
/// Obtained through [`Manager::status_report`]; the `Display` implementation renders it as
/// human-readable text.
pub struct ManagerStatus<PK: PublicKey + PeerId> {
/// Peers reported by `wanted.outgoing_peers()` that had an active connection.
outgoing_peers: HashSet<PK>,
/// Peers reported by `wanted.outgoing_peers()` that had no active connection.
missing_outgoing: HashSet<PK>,
/// Peers reported by `wanted.incoming_peers()` that had an active connection.
incoming_peers: HashSet<PK>,
/// Peers reported by `wanted.incoming_peers()` that had no active connection.
missing_incoming: HashSet<PK>,
}
impl<PK: PublicKey + PeerId> ManagerStatus<PK> {
    /// Takes a snapshot of `manager`: every wanted peer is sorted by direction (incoming or
    /// outgoing) and by whether it currently has an active connection.
    fn new<A: Data, D: Data>(manager: &Manager<PK, A, D>) -> Self {
        let mut incoming_peers = HashSet::new();
        let mut missing_incoming = HashSet::new();
        for peer in manager.wanted.incoming_peers() {
            if manager.active_connection(peer) {
                incoming_peers.insert(peer.clone());
            } else {
                missing_incoming.insert(peer.clone());
            }
        }

        let mut outgoing_peers = HashSet::new();
        let mut missing_outgoing = HashSet::new();
        for peer in manager.wanted.outgoing_peers() {
            if manager.active_connection(peer) {
                outgoing_peers.insert(peer.clone());
            } else {
                missing_outgoing.insert(peer.clone());
            }
        }

        ManagerStatus {
            incoming_peers,
            missing_incoming,
            outgoing_peers,
            missing_outgoing,
        }
    }

    /// Number of peers we expect incoming connections from, connected or not.
    ///
    /// Uses saturating arithmetic so that producing a status report has no panic path.
    fn wanted_incoming(&self) -> usize {
        self.incoming_peers
            .len()
            .saturating_add(self.missing_incoming.len())
    }

    /// Number of peers we attempt outgoing connections to, connected or not.
    ///
    /// Uses saturating arithmetic so that producing a status report has no panic path.
    fn wanted_outgoing(&self) -> usize {
        self.outgoing_peers
            .len()
            .saturating_add(self.missing_outgoing.len())
    }
}
/// Renders the short string form of every peer id in `set`, separated by `", "`.
///
/// The order follows the iteration order of the `HashSet`; an empty set gives an empty string.
fn pretty_peer_id_set<PK: PublicKey + PeerId>(set: &HashSet<PK>) -> String {
    let mut short_ids = Vec::with_capacity(set.len());
    for peer_id in set {
        short_ids.push(peer_id.to_short_string());
    }
    short_ids.join(", ")
}
impl<PK: PublicKey + PeerId> Display for ManagerStatus<PK> {
    /// Writes a summary of the snapshot as a sequence of `"; "`-terminated fragments:
    /// first the incoming side, then the outgoing side. When no connections are wanted at
    /// all, a single fragment saying so is written instead.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
        let wanted_incoming = self.wanted_incoming();
        let wanted_outgoing = self.wanted_outgoing();
        let total_wanted = wanted_incoming + wanted_outgoing;
        if total_wanted == 0 {
            return write!(f, "not maintaining any connections; ");
        }

        if wanted_incoming == 0 {
            write!(f, "not expecting any incoming connections; ")?;
        } else {
            write!(f, "expecting {wanted_incoming:?} incoming connections; ")?;
            // Only the incoming side gets an explicit warning when nothing is connected;
            // the outgoing side simply omits its `have` fragment.
            if self.incoming_peers.is_empty() {
                write!(f, "WARNING! No incoming peers even though we expected them, maybe connecting to us is impossible; ")?;
            } else {
                write!(
                    f,
                    "have - {:?} [{}]; ",
                    self.incoming_peers.len(),
                    pretty_peer_id_set(&self.incoming_peers),
                )?;
            }
            if !self.missing_incoming.is_empty() {
                write!(
                    f,
                    "missing - {:?} [{}]; ",
                    self.missing_incoming.len(),
                    pretty_peer_id_set(&self.missing_incoming),
                )?;
            }
        }

        if wanted_outgoing == 0 {
            write!(f, "not attempting any outgoing connections; ")?;
        } else {
            write!(f, "attempting {wanted_outgoing:?} outgoing connections; ")?;
            let outgoing_sections = [
                ("have", &self.outgoing_peers),
                ("missing", &self.missing_outgoing),
            ];
            for (section, peers) in outgoing_sections.iter() {
                if peers.is_empty() {
                    continue;
                }
                // The literals are kept verbatim per section so the output stays unchanged.
                if *section == "have" {
                    write!(
                        f,
                        "have - {:?} [{}]; ",
                        peers.len(),
                        pretty_peer_id_set(peers),
                    )?;
                } else {
                    write!(
                        f,
                        "missing - {:?} [{}]; ",
                        peers.len(),
                        pretty_peer_id_set(peers),
                    )?;
                }
            }
        }

        Ok(())
    }
}
/// Keeps track of the peers we want to be connected to and of the data channels of the
/// connections that have been registered for them.
///
/// `PK` identifies peers, `A` is the address type stored per peer and `D` is the type of
/// data pushed to a connection.
pub struct Manager<PK: PublicKey + PeerId, A: Data, D: Data> {
/// Peers we are interested in, together with their addresses.
wanted: DirectedPeers<PK, A>,
/// Sender half of the data channel for every peer with a registered connection.
have: HashMap<PK, mpsc::UnboundedSender<D>>,
}
impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
pub fn new(own_id: PK, metrics: Metrics) -> Self {
Manager {
wanted: DirectedPeers::new(own_id, metrics),
have: HashMap::new(),
}
}
fn active_connection(&self, peer_id: &PK) -> bool {
self.have
.get(peer_id)
.map(|sender| !sender.is_closed())
.unwrap_or(false)
}
pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool {
self.wanted.add_peer(peer_id, address)
}
pub fn peer_address(&self, peer_id: &PK) -> Option<A> {
self.wanted.peer_address(peer_id)
}
pub fn add_connection(
&mut self,
peer_id: PK,
data_for_network: mpsc::UnboundedSender<D>,
) -> AddResult {
use AddResult::*;
if !self.wanted.interested(&peer_id) {
return Uninterested;
}
match self.have.insert(peer_id, data_for_network) {
Some(_) => Replaced,
None => Added,
}
}
pub fn remove_peer(&mut self, peer_id: &PK) {
self.wanted.remove_peer(peer_id);
self.have.remove(peer_id);
}
pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> {
self.have
.get(peer_id)
.ok_or(SendError::PeerNotFound)?
.unbounded_send(data)
.map_err(|_| SendError::ConnectionClosed)
}
pub fn status_report(&self) -> ManagerStatus<PK> {
ManagerStatus::new(self)
}
pub fn is_authorized(&self, public_key: &PK) -> bool {
self.wanted.interested(public_key)
}
}
// Unit tests for `Manager`, using the mock key type from `crate::mock`.
#[cfg(test)]
mod tests {
use futures::{channel::mpsc, StreamExt};
use super::{AddResult::*, Manager, SendError};
use crate::{
metrics::Metrics,
mock::{key, MockPublicKey},
};
type Data = String;
type Address = String;
// Adding and removing peers, and looking up their addresses.
#[test]
fn add_remove() {
let (own_id, _) = key();
let mut manager = Manager::<MockPublicKey, Address, Data>::new(own_id, Metrics::noop());
let (peer_id, _) = key();
let (peer_id_b, _) = key();
let address = String::from("43.43.43.43:43000");
let attempting_connections = manager.add_peer(peer_id.clone(), address.clone());
// Adding the same peer a second time is expected to return `false`.
assert!(!manager.add_peer(peer_id.clone(), address.clone()));
// The address is expected to be retrievable exactly when the first `add_peer` returned `true`.
match attempting_connections {
true => assert_eq!(manager.peer_address(&peer_id), Some(address)),
false => assert_eq!(manager.peer_address(&peer_id), None),
}
// A peer that was never added has no address.
assert_eq!(manager.peer_address(&peer_id_b), None);
manager.remove_peer(&peer_id);
assert_eq!(manager.peer_address(&peer_id), None);
// Removing an already removed peer, or one that was never added, must not panic.
manager.remove_peer(&peer_id);
manager.remove_peer(&peer_id_b);
}
// Registering connections on two managers and sending data through them.
#[tokio::test]
async fn send_receive() {
let (mut connecting_id, _) = key();
let mut connecting_manager =
Manager::<MockPublicKey, Address, Data>::new(connecting_id.clone(), Metrics::noop());
let (mut listening_id, _) = key();
let mut listening_manager =
Manager::<MockPublicKey, Address, Data>::new(listening_id.clone(), Metrics::noop());
let data = String::from("DATA");
let address = String::from("43.43.43.43:43000");
// Before the peer is added as wanted, its connection is rejected and sending fails.
let (tx, _rx) = mpsc::unbounded();
assert_eq!(
connecting_manager.add_connection(listening_id.clone(), tx),
Uninterested
);
assert_eq!(
connecting_manager.send_to(&listening_id, data.clone()),
Err(SendError::PeerNotFound)
);
// The manager whose `add_peer` returns `true` takes the `connecting_*` role; if the
// first guess was wrong, the ids and managers are swapped so the names match the roles.
if connecting_manager.add_peer(listening_id.clone(), address.clone()) {
assert!(!listening_manager.add_peer(connecting_id.clone(), address.clone()))
} else {
std::mem::swap(&mut connecting_id, &mut listening_id);
std::mem::swap(&mut connecting_manager, &mut listening_manager);
assert!(connecting_manager.add_peer(listening_id.clone(), address.clone()));
}
// The connecting side accepts a connection to the listening peer and can send over it.
let (tx, mut rx) = mpsc::unbounded();
assert_eq!(
connecting_manager.add_connection(listening_id.clone(), tx),
Added
);
assert!(connecting_manager
.send_to(&listening_id, data.clone())
.is_ok());
assert_eq!(data, rx.next().await.expect("should receive"));
// The listening side likewise accepts a connection from the connecting peer.
let (tx, mut rx) = mpsc::unbounded();
assert_eq!(
listening_manager.add_connection(connecting_id.clone(), tx),
Added
);
assert!(listening_manager
.send_to(&connecting_id, data.clone())
.is_ok());
assert_eq!(data, rx.next().await.expect("should receive"));
// Removing the peer discards the stored sender, so the receiver sees the end of the stream.
listening_manager.remove_peer(&connecting_id);
assert!(rx.next().await.is_none());
}
}