use std::{
fmt::{Debug, Display},
hash::Hash,
pin::Pin,
};
use parity_scale_codec::Codec;
use tokio::io::{AsyncRead, AsyncWrite};
mod crypto;
mod incoming;
mod io;
mod manager;
pub mod metrics;
pub mod mock;
mod outgoing;
mod protocols;
mod rate_limiting;
mod service;
#[cfg(test)]
mod testing;
pub use crypto::{PublicKey, SecretKey};
pub use rate_limiting::{RateLimitingDialer, RateLimitingListener};
pub use service::{Service, SpawnHandleT};
const LOG_TARGET: &str = "network-clique";
pub trait Data: Clone + Codec + Send + Sync + 'static {}
impl<D: Clone + Codec + Send + Sync + 'static> Data for D {}
pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send {
fn to_short_string(&self) -> String {
let id = format!("{self}");
if id.len() <= 12 {
return id;
}
let prefix: String = id.chars().take(4).collect();
let suffix: String = id.chars().skip(id.len().saturating_sub(8)).collect();
format!("{}…{}", &prefix, &suffix)
}
}
pub trait AddressingInformation: Debug + Hash + Codec + Clone + Eq + Send + Sync + 'static {
type PeerId: PeerId;
fn peer_id(&self) -> Self::PeerId;
fn verify(&self) -> bool;
fn address(&self) -> String;
}
pub trait NetworkIdentity {
type PeerId: PeerId;
type AddressingInformation: AddressingInformation<PeerId = Self::PeerId>;
fn identity(&self) -> Self::AddressingInformation;
}
#[async_trait::async_trait]
pub trait Network<PK: PublicKey, A: Data, D: Data>: Send + 'static {
fn add_connection(&mut self, peer: PK, address: A);
fn remove_connection(&mut self, peer: PK);
fn send(&self, data: D, recipient: PK);
async fn next(&mut self) -> Option<D>;
}
pub type PeerAddressInfo = String;
pub trait ConnectionInfo {
fn peer_address_info(&self) -> PeerAddressInfo;
}
pub trait Splittable: AsyncWrite + AsyncRead + ConnectionInfo + Unpin + Send {
type Sender: AsyncWrite + ConnectionInfo + Unpin + Send;
type Receiver: AsyncRead + ConnectionInfo + Unpin + Send;
fn split(self) -> (Self::Sender, Self::Receiver);
}
#[async_trait::async_trait]
pub trait Dialer<A: Data>: Clone + Send + 'static {
type Connection: Splittable;
type Error: Display + Send;
async fn connect(&mut self, address: A) -> Result<Self::Connection, Self::Error>;
}
use log::info;
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream,
};
#[async_trait::async_trait]
pub trait Listener {
type Connection: Splittable + 'static;
type Error: Display;
async fn accept(&mut self) -> Result<Self::Connection, Self::Error>;
}
impl ConnectionInfo for TcpStream {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => format!("unknown address: {e}"),
}
}
}
impl ConnectionInfo for OwnedWriteHalf {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => e.to_string(),
}
}
}
impl ConnectionInfo for OwnedReadHalf {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => e.to_string(),
}
}
}
impl Splittable for TcpStream {
type Sender = OwnedWriteHalf;
type Receiver = OwnedReadHalf;
fn split(self) -> (Self::Sender, Self::Receiver) {
let (receiver, sender) = self.into_split();
(sender, receiver)
}
}
#[async_trait::async_trait]
impl Listener for TcpListener {
type Connection = TcpStream;
type Error = std::io::Error;
async fn accept(&mut self) -> Result<Self::Connection, Self::Error> {
let stream = TcpListener::accept(self).await.map(|(stream, _)| stream)?;
if stream.set_linger(None).is_err() {
info!(target: LOG_TARGET, "stream.set_linger(None) failed.");
};
Ok(stream)
}
}
pub struct Splitted<I, O>(I, O);
impl<I: AsyncRead + Unpin, O: Unpin> AsyncRead for Splitted<I, O> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl<I: Unpin, O: AsyncWrite + Unpin> AsyncWrite for Splitted<I, O> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.1).poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.1).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.1).poll_shutdown(cx)
}
}
impl<I, O: ConnectionInfo> ConnectionInfo for Splitted<I, O> {
fn peer_address_info(&self) -> PeerAddressInfo {
self.1.peer_address_info()
}
}
impl<
I: AsyncRead + ConnectionInfo + Unpin + Send,
O: AsyncWrite + ConnectionInfo + Unpin + Send,
> Splittable for Splitted<I, O>
{
type Sender = O;
type Receiver = I;
fn split(self) -> (Self::Sender, Self::Receiver) {
(self.1, self.0)
}
}