use std::{collections::HashMap, fmt, iter, pin::Pin, sync::Arc};
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use log::{error, trace};
use sc_network::{
    multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, Multiaddr,
    NetworkEventStream as _, NetworkNotification, NetworkPeers, NetworkService,
    NotificationSenderT, PeerId, ProtocolName,
};
use sc_network_common::{
    sync::{SyncEvent, SyncEventStream},
    ExHashT,
};
use sc_network_sync::SyncingService;
use sp_runtime::traits::Block;
use tokio::select;
use crate::network::gossip::{Event, EventStream, NetworkSender, Protocol, RawNetwork};
/// Suffix appended to the chain prefix to build the authentication protocol name.
const AUTHENTICATION_PROTOCOL_NAME: &str = "/auth/0";
/// Legacy authentication protocol name; used verbatim (without the chain prefix) as a fallback name.
const LEGACY_AUTHENTICATION_PROTOCOL_NAME: &str = "/Phron/1";
/// Suffix appended to the chain prefix to build the block sync protocol name.
const BLOCK_SYNC_PROTOCOL_NAME: &str = "/sync/0";
/// Translates between gossip-level [`Protocol`] values and the [`ProtocolName`]s handed to the
/// substrate network service.
#[derive(Clone)]
pub struct ProtocolNaming {
    /// Primary name of the authentication protocol.
    authentication_name: ProtocolName,
    /// Additional names that are also recognized as the authentication protocol.
    authentication_fallback_names: Vec<ProtocolName>,
    /// Primary name of the block sync protocol.
    block_sync_name: ProtocolName,
    /// Reverse lookup from every known name (primary and fallback) to its protocol.
    protocols_by_name: HashMap<ProtocolName, Protocol>,
}
impl ProtocolNaming {
    pub fn new(chain_prefix: String) -> Self {
        let authentication_name: ProtocolName =
            format!("{chain_prefix}{AUTHENTICATION_PROTOCOL_NAME}").into();
        let mut protocols_by_name = HashMap::new();
        protocols_by_name.insert(authentication_name.clone(), Protocol::Authentication);
        let authentication_fallback_names: Vec<ProtocolName> =
            vec![LEGACY_AUTHENTICATION_PROTOCOL_NAME.into()];
        for protocol_name in &authentication_fallback_names {
            protocols_by_name.insert(protocol_name.clone(), Protocol::Authentication);
        }
        let block_sync_name: ProtocolName =
            format!("{chain_prefix}{BLOCK_SYNC_PROTOCOL_NAME}").into();
        protocols_by_name.insert(block_sync_name.clone(), Protocol::BlockSync);
        ProtocolNaming {
            authentication_name,
            authentication_fallback_names,
            block_sync_name,
            protocols_by_name,
        }
    }
    pub fn protocol_name(&self, protocol: &Protocol) -> ProtocolName {
        use Protocol::*;
        match protocol {
            Authentication => self.authentication_name.clone(),
            BlockSync => self.block_sync_name.clone(),
        }
    }
    pub fn fallback_protocol_names(&self, protocol: &Protocol) -> Vec<ProtocolName> {
        use Protocol::*;
        match protocol {
            Authentication => self.authentication_fallback_names.clone(),
            _ => Vec::new(),
        }
    }
    fn to_protocol(&self, protocol_name: &str) -> Option<Protocol> {
        self.protocols_by_name.get(protocol_name).copied()
    }
}
/// Errors that can occur while creating a notification sender or sending data through it.
#[derive(Debug)]
pub enum SenderError {
    /// The network service did not provide a notification sender for this peer and protocol.
    CannotCreateSender(PeerId, Protocol),
    /// Waiting for the notification sender to become ready failed.
    LostConnectionToPeer(PeerId),
    /// The sender became ready, but sending the data through it failed.
    LostConnectionToPeerReady(PeerId),
}
impl fmt::Display for SenderError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SenderError::CannotCreateSender(peer_id, protocol) => {
                write!(
                    f,
                    "Can not create sender to peer {peer_id:?} with protocol {protocol:?}"
                )
            }
            SenderError::LostConnectionToPeer(peer_id) => {
                write!(
                    f,
                    "Lost connection to peer {peer_id:?} while preparing sender"
                )
            }
            SenderError::LostConnectionToPeerReady(peer_id) => {
                write!(
                    f,
                    "Lost connection to peer {peer_id:?} after sender was ready"
                )
            }
        }
    }
}
// Empty impl: only the default `Error` methods are used, on top of the `Debug` and `Display` impls.
impl std::error::Error for SenderError {}
/// Sends data to a single peer through a substrate notification sender.
pub struct SubstrateNetworkSender {
    /// Notification sender obtained from the network service for one peer and protocol.
    notification_sender: Box<dyn NotificationSenderT>,
    /// The peer this sender targets; only used when reporting errors.
    peer_id: PeerId,
}
#[async_trait]
impl NetworkSender for SubstrateNetworkSender {
    type SenderError = SenderError;

    /// Waits until the notification sender is ready and then sends `data` through it.
    ///
    /// # Errors
    ///
    /// * [`SenderError::LostConnectionToPeer`] if waiting for readiness fails.
    /// * [`SenderError::LostConnectionToPeerReady`] if the send itself fails.
    async fn send<'a>(
        &'a self,
        data: impl Into<Vec<u8>> + Send + Sync + 'static,
    ) -> Result<(), SenderError> {
        let peer_id = self.peer_id;
        let readiness = self.notification_sender.ready().await;
        readiness
            .map_err(|_| SenderError::LostConnectionToPeer(peer_id))?
            .send(data.into())
            .map_err(|_| SenderError::LostConnectionToPeerReady(peer_id))
    }
}
/// Combines the network and sync event streams into a single stream of gossip events.
pub struct NetworkEventStream<B: Block, H: ExHashT> {
    /// Events coming from the network service.
    stream: Pin<Box<dyn Stream<Item = SubstrateEvent> + Send>>,
    /// Peer connection events coming from the syncing service.
    sync_stream: Pin<Box<dyn Stream<Item = SyncEvent> + Send>>,
    /// Used to translate protocol names found in events into protocols and back.
    naming: ProtocolNaming,
    /// Network handle used to update the reserved peer sets on sync events.
    network: Arc<NetworkService<B, H>>,
}
#[async_trait]
impl<B: Block, H: ExHashT> EventStream<PeerId> for NetworkEventStream<B, H> {
    async fn next_event(&mut self) -> Option<Event<PeerId>> {
        use Event::*;
        use SubstrateEvent::*;
        use SyncEvent::*;
        loop {
            select! {
                Some(event) = self.stream.next() => {
                    match event {
                        NotificationStreamOpened {
                            remote, protocol, ..
                        } => match self.naming.to_protocol(protocol.as_ref()) {
                            Some(protocol) => return Some(StreamOpened(remote, protocol)),
                            None => continue,
                        },
                        NotificationStreamClosed { remote, protocol } => {
                            match self.naming.to_protocol(protocol.as_ref()) {
                                Some(protocol) => return Some(StreamClosed(remote, protocol)),
                                None => continue,
                            }
                        }
                        NotificationsReceived { messages, remote } => {
                            return Some(Messages(
                                remote,
                                messages
                                    .into_iter()
                                    .filter_map(|(protocol, data)| {
                                        self.naming
                                            .to_protocol(protocol.as_ref())
                                            .map(|protocol| (protocol, data))
                                    })
                                    .collect(),
                            ));
                        }
                        Dht(_) => continue,
                    }
                },
                Some(event) = self.sync_stream.next() => {
                    match event {
                        PeerConnected(remote) => {
                            let multiaddress: Multiaddr =
                                iter::once(MultiaddressProtocol::P2p(remote.into())).collect();
                            trace!(target: "phron-network", "Connected event from address {:?}", multiaddress);
                            if let Err(e) = self.network.add_peers_to_reserved_set(
                                self.naming.protocol_name(&Protocol::Authentication),
                                iter::once(multiaddress.clone()).collect(),
                            ) {
                                error!(target: "phron-network", "add_reserved failed for authentications: {}", e);
                            }
                            if let Err(e) = self.network.add_peers_to_reserved_set(
                                self.naming.protocol_name(&Protocol::BlockSync),
                                iter::once(multiaddress).collect(),
                            ) {
                                error!(target: "phron-network", "add_reserved failed for block sync: {}", e);
                            }
                            continue;
                        }
                        PeerDisconnected(remote) => {
                            trace!(target: "phron-network", "Disconnected event for peer {:?}", remote);
                            let addresses: Vec<_> = iter::once(remote).collect();
                            if let Err(e) = self.network.remove_peers_from_reserved_set(
                                self.naming.protocol_name(&Protocol::Authentication),
                                addresses.clone(),
                            ) {
                                log::warn!(target: "phron-network", "Error while removing peer from Protocol::Authentication reserved set: {}", e)
                            }
                            if let Err(e) = self.network.remove_peers_from_reserved_set(
                                self.naming.protocol_name(&Protocol::BlockSync),
                                addresses,
                            ) {
                                log::warn!(target: "phron-network", "Error while removing peer from Protocol::BlockSync reserved set: {}", e)
                            }
                            continue;
                        }
                    }
                },
                else => return None,
            }
        }
    }
}
/// A [`RawNetwork`] implementation backed by the substrate network and syncing services.
#[derive(Clone)]
pub struct SubstrateNetwork<B: Block, H: ExHashT> {
    /// Network service used for notification senders and network events.
    network: Arc<NetworkService<B, H>>,
    /// Syncing service used as the source of sync events.
    sync_network: Arc<SyncingService<B>>,
    /// Naming scheme used to translate protocols into protocol names.
    naming: ProtocolNaming,
}
impl<B: Block, H: ExHashT> SubstrateNetwork<B, H> {
    /// Bundles the network service, the syncing service and the protocol naming scheme.
    pub fn new(
        network: Arc<NetworkService<B, H>>,
        sync_network: Arc<SyncingService<B>>,
        naming: ProtocolNaming,
    ) -> Self {
        Self {
            network,
            sync_network,
            naming,
        }
    }
}
impl<B: Block, H: ExHashT> RawNetwork for SubstrateNetwork<B, H> {
    type SenderError = SenderError;
    type NetworkSender = SubstrateNetworkSender;
    type PeerId = PeerId;
    type EventStream = NetworkEventStream<B, H>;
    fn event_stream(&self) -> Self::EventStream {
        NetworkEventStream {
            stream: Box::pin(self.network.as_ref().event_stream("phron-network")),
            sync_stream: Box::pin(
                self.sync_network
                    .as_ref()
                    .event_stream("phron-syncing-network"),
            ),
            naming: self.naming.clone(),
            network: self.network.clone(),
        }
    }
    fn sender(
        &self,
        peer_id: Self::PeerId,
        protocol: Protocol,
    ) -> Result<Self::NetworkSender, Self::SenderError> {
        Ok(SubstrateNetworkSender {
            notification_sender: self
                .network
                .notification_sender(peer_id, self.naming.protocol_name(&protocol))
                .map_err(|_| SenderError::CannotCreateSender(peer_id, protocol))?,
            peer_id,
        })
    }
}