use std::{collections::HashMap, marker::PhantomData, ops::Deref, sync::Arc};
use futures::StreamExt;
use log::{debug, error, trace};
use sc_client_api::{Backend, FinalityNotification};
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_consensus_aura::AuraApi;
use sp_runtime::traits::{Block, Header};
use tokio::sync::{
    oneshot::{Receiver as OneShotReceiver, Sender as OneShotSender},
    RwLock,
};
use crate::{
    phron_primitives::{PhronSessionApi, AuraId, BlockHash, BlockNumber, SessionAuthorityData},
    runtime_api::RuntimeApi,
    session::SessionBoundaryInfo,
    ClientForPhron, SessionId, SessionPeriod,
};
const PRUNING_THRESHOLD: u32 = 10;
const LOG_TARGET: &str = "aleph-session-updater";
type SessionMap = HashMap<SessionId, SessionAuthorityData>;
type SessionSubscribers = HashMap<SessionId, Vec<OneShotSender<SessionAuthorityData>>>;
pub trait AuthorityProvider {
    fn authority_data(&self, block_number: BlockNumber) -> Option<SessionAuthorityData>;
    fn next_authority_data(&self, block_number: BlockNumber) -> Option<SessionAuthorityData>;
    fn aura_authorities(&self, block_number: BlockNumber) -> Option<Vec<AuraId>>;
    fn next_aura_authorities(&self, block_number: BlockNumber) -> Option<Vec<AuraId>>;
}
pub struct AuthorityProviderImpl<C, B, BE, RA>
where
    C: ClientForPhron<B, BE> + Send + Sync + 'static,
    C::Api: crate::phron_primitives::PhronSessionApi<B> + AuraApi<B, AuraId>,
    B: Block<Hash = BlockHash>,
    BE: Backend<B> + 'static,
    RA: RuntimeApi,
{
    client: Arc<C>,
    api: RA,
    _phantom: PhantomData<(B, BE)>,
}
impl<C, B, BE, RA> AuthorityProviderImpl<C, B, BE, RA>
where
    C: ClientForPhron<B, BE> + Send + Sync + 'static,
    C::Api: crate::phron_primitives::PhronSessionApi<B> + AuraApi<B, AuraId>,
    B: Block<Hash = BlockHash>,
    B::Header: Header<Number = BlockNumber>,
    BE: Backend<B> + 'static,
    RA: RuntimeApi,
{
    pub fn new(client: Arc<C>, api: RA) -> Self {
        Self {
            client,
            api,
            _phantom: PhantomData,
        }
    }
    fn block_hash(&self, block: BlockNumber) -> Option<BlockHash> {
        match self.client.block_hash(block) {
            Ok(r) => r,
            Err(e) => {
                error!(
                    target: LOG_TARGET,
                    "Error while retrieving hash for block #{}. {}", block, e
                );
                None
            }
        }
    }
}
impl<C, B, BE, RA> AuthorityProvider for AuthorityProviderImpl<C, B, BE, RA>
where
    C: ClientForPhron<B, BE> + Send + Sync + 'static,
    C::Api: PhronSessionApi<B> + AuraApi<B, AuraId>,
    B: Block<Hash = BlockHash>,
    B::Header: Header<Number = BlockNumber>,
    BE: Backend<B> + 'static,
    RA: RuntimeApi,
{
    fn aura_authorities(&self, block_number: BlockNumber) -> Option<Vec<AuraId>> {
        AuraApi::authorities(
            self.client.runtime_api().deref(),
            self.block_hash(block_number)?,
        )
        .ok()
    }
    fn next_aura_authorities(&self, block_number: BlockNumber) -> Option<Vec<AuraId>> {
        self.api
            .next_aura_authorities(self.block_hash(block_number)?)
            .ok()
    }
    fn authority_data(&self, block_number: BlockNumber) -> Option<SessionAuthorityData> {
        let block_hash = self.block_hash(block_number)?;
        match self.client.runtime_api().authority_data(block_hash) {
            Ok(data) => Some(data),
            Err(_) => PhronSessionApi::authorities(self.client.runtime_api().deref(), block_hash)
                .map(|authorities| SessionAuthorityData::new(authorities, None))
                .ok(),
        }
    }
    fn next_authority_data(&self, block_number: BlockNumber) -> Option<SessionAuthorityData> {
        let block_hash = self.block_hash(block_number)?;
        match self
            .client
            .runtime_api()
            .next_session_authority_data(block_hash)
            .map(|r| r.ok())
        {
            Ok(maybe_data) => maybe_data,
            Err(_) => self
                .client
                .runtime_api()
                .next_session_authorities(block_hash)
                .map(|r| {
                    r.map(|authorities| SessionAuthorityData::new(authorities, None))
                        .ok()
                })
                .ok()
                .flatten(),
        }
    }
}
#[async_trait::async_trait]
pub trait FinalityNotifier {
    async fn next(&mut self) -> Option<BlockNumber>;
    fn last_finalized(&self) -> BlockNumber;
}
pub struct FinalityNotifierImpl<C, B, BE>
where
    C: ClientForPhron<B, BE> + Send + Sync + 'static,
    C::Api: crate::phron_primitives::PhronSessionApi<B>,
    B: Block,
    B::Header: Header<Number = BlockNumber>,
    BE: Backend<B> + 'static,
{
    notification_stream: TracingUnboundedReceiver<FinalityNotification<B>>,
    client: Arc<C>,
    _phantom: PhantomData<(B, BE)>,
}
impl<C, B, BE> FinalityNotifierImpl<C, B, BE>
where
    C: ClientForPhron<B, BE> + Send + Sync + 'static,
    C::Api: crate::phron_primitives::PhronSessionApi<B>,
    B: Block,
    B::Header: Header<Number = BlockNumber>,
    BE: Backend<B> + 'static,
{
    pub fn new(client: Arc<C>) -> Self {
        Self {
            notification_stream: client.finality_notification_stream(),
            client,
            _phantom: PhantomData,
        }
    }
}
#[async_trait::async_trait]
impl<C, B, BE> FinalityNotifier for FinalityNotifierImpl<C, B, BE>
where
    C: ClientForPhron<B, BE> + Send + Sync + 'static,
    C::Api: crate::phron_primitives::PhronSessionApi<B>,
    B: Block,
    B::Header: Header<Number = BlockNumber>,
    BE: Backend<B> + 'static,
{
    async fn next(&mut self) -> Option<BlockNumber> {
        self.notification_stream
            .next()
            .await
            .map(|block| *block.header.number())
    }
    fn last_finalized(&self) -> BlockNumber {
        self.client.info().finalized_number
    }
}
#[derive(Clone, Debug)]
pub struct SharedSessionMap(Arc<RwLock<(SessionMap, SessionSubscribers)>>);
#[derive(Clone)]
pub struct ReadOnlySessionMap {
    inner: Arc<RwLock<(SessionMap, SessionSubscribers)>>,
}
impl SharedSessionMap {
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new((HashMap::new(), HashMap::new()))))
    }
    pub async fn update(
        &mut self,
        id: SessionId,
        authority_data: SessionAuthorityData,
    ) -> Option<SessionAuthorityData> {
        let mut guard = self.0.write().await;
        if let Some(senders) = guard.1.remove(&id) {
            for sender in senders {
                if let Err(e) = sender.send(authority_data.clone()) {
                    error!(
                        target: LOG_TARGET,
                        "Error while sending notification: {:?}", e
                    );
                }
            }
        }
        guard.0.insert(id, authority_data)
    }
    async fn prune_below(&mut self, id: SessionId) {
        let mut guard = self.0.write().await;
        guard.0.retain(|&s, _| s >= id);
        guard.1.retain(|&s, _| s >= id);
    }
    pub fn read_only(&self) -> ReadOnlySessionMap {
        ReadOnlySessionMap {
            inner: self.0.clone(),
        }
    }
}
impl ReadOnlySessionMap {
    pub async fn subscribe_to_insertion(
        &self,
        id: SessionId,
    ) -> OneShotReceiver<SessionAuthorityData> {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        let mut guard = self.inner.write().await;
        if let Some(authority_data) = guard.0.get(&id) {
            sender
                .send(authority_data.clone())
                .expect("we control both ends");
        } else {
            guard.1.entry(id).or_insert_with(Vec::new).push(sender);
        }
        receiver
    }
}
pub struct SessionMapUpdater<AP, FN>
where
    AP: AuthorityProvider,
    FN: FinalityNotifier,
{
    session_map: SharedSessionMap,
    authority_provider: AP,
    finality_notifier: FN,
    session_info: SessionBoundaryInfo,
}
impl<AP, FN> SessionMapUpdater<AP, FN>
where
    AP: AuthorityProvider,
    FN: FinalityNotifier,
{
    pub fn new(authority_provider: AP, finality_notifier: FN, period: SessionPeriod) -> Self {
        Self {
            session_map: SharedSessionMap::new(),
            authority_provider,
            finality_notifier,
            session_info: SessionBoundaryInfo::new(period),
        }
    }
    pub fn readonly_session_map(&self) -> ReadOnlySessionMap {
        self.session_map.read_only()
    }
    async fn handle_first_block_of_session(&mut self, session_id: SessionId) {
        let first_block = self.session_info.first_block_of_session(session_id);
        debug!(
            target: LOG_TARGET,
            "Handling first block #{:?} of session {:?}", first_block, session_id.0
        );
        if let Some(authority_data) = self.authority_provider.next_authority_data(first_block) {
            self.session_map
                .update(SessionId(session_id.0 + 1), authority_data)
                .await;
        } else {
            panic!("Authorities for next session {:?} must be available at first block #{:?} of current session", session_id.0, first_block);
        }
        if session_id.0 > PRUNING_THRESHOLD && session_id.0 % PRUNING_THRESHOLD == 0 {
            debug!(
                target: LOG_TARGET,
                "Pruning session map below session #{:?}",
                session_id.0 - PRUNING_THRESHOLD
            );
            self.session_map
                .prune_below(SessionId(session_id.0 - PRUNING_THRESHOLD))
                .await;
        }
    }
    fn authorities_for_session(&mut self, session_id: SessionId) -> Option<SessionAuthorityData> {
        let first_block = self.session_info.first_block_of_session(session_id);
        self.authority_provider.authority_data(first_block)
    }
    async fn catch_up(&mut self) -> SessionId {
        let last_finalized = self.finality_notifier.last_finalized();
        let current_session = self.session_info.session_id_from_block_num(last_finalized);
        let starting_session = SessionId(current_session.0.saturating_sub(PRUNING_THRESHOLD - 1));
        debug!(target: LOG_TARGET,
            "Last finalized is {:?}; Catching up with authorities starting from session {:?} up to next session {:?}",
            last_finalized, starting_session.0, current_session.0 + 1
        );
        for session in starting_session.0..current_session.0 {
            let id = SessionId(session);
            if let Some(authority_data) = self.authorities_for_session(id) {
                self.session_map.update(id, authority_data).await;
            } else {
                debug!(
                    target: LOG_TARGET,
                    "No authorities for session {:?} during catch-up. Most likely already pruned.",
                    id.0
                )
            }
        }
        match self.authorities_for_session(current_session) {
            Some(current_authority_data) => {
                self.session_map
                    .update(current_session, current_authority_data)
                    .await
            }
            None => panic!(
                "Authorities for current session {:?} must be available from the beginning",
                current_session.0
            ),
        };
        self.handle_first_block_of_session(current_session).await;
        current_session
    }
    pub async fn run(mut self) {
        let mut last_updated = self.catch_up().await;
        while let Some(last_finalized) = self.finality_notifier.next().await {
            trace!(
                target: LOG_TARGET,
                "got FinalityNotification about #{:?}",
                last_finalized
            );
            let session_id = self.session_info.session_id_from_block_num(last_finalized);
            if last_updated >= session_id {
                continue;
            }
            for session in (last_updated.0 + 1)..=session_id.0 {
                self.handle_first_block_of_session(SessionId(session)).await;
            }
            last_updated = session_id;
        }
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use futures_timer::Delay;
    use sc_utils::mpsc::tracing_unbounded;
    use tokio::sync::oneshot::error::TryRecvError;
    use super::*;
    use crate::{phron_primitives::BlockNumber, session::testing::authority_data};
    const FIRST_THRESHOLD: u32 = PRUNING_THRESHOLD + 1;
    const SECOND_THRESHOLD: u32 = 2 * PRUNING_THRESHOLD + 1;
    impl ReadOnlySessionMap {
        async fn get(&self, id: SessionId) -> Option<SessionAuthorityData> {
            self.inner.read().await.0.get(&id).cloned()
        }
    }
    struct MockProvider {
        pub session_map: HashMap<BlockNumber, SessionAuthorityData>,
        pub next_session_map: HashMap<BlockNumber, SessionAuthorityData>,
    }
    impl MockProvider {
        fn new() -> Self {
            Self {
                session_map: HashMap::new(),
                next_session_map: HashMap::new(),
            }
        }
        fn add_session(&mut self, session_id: BlockNumber) {
            self.session_map
                .insert(session_id, authority_data_for_session(session_id));
            self.next_session_map
                .insert(session_id, authority_data_for_session(session_id + 1));
        }
    }
    impl AuthorityProvider for MockProvider {
        fn authority_data(&self, block_number: BlockNumber) -> Option<SessionAuthorityData> {
            self.session_map.get(&block_number).cloned()
        }
        fn next_authority_data(&self, block_number: BlockNumber) -> Option<SessionAuthorityData> {
            self.next_session_map.get(&block_number).cloned()
        }
        fn aura_authorities(&self, _block_number: BlockNumber) -> Option<Vec<AuraId>> {
            None
        }
        fn next_aura_authorities(&self, _block_number: BlockNumber) -> Option<Vec<AuraId>> {
            None
        }
    }
    struct MockNotifier {
        pub last_finalized: BlockNumber,
        pub receiver: TracingUnboundedReceiver<BlockNumber>,
    }
    impl MockNotifier {
        fn new(receiver: TracingUnboundedReceiver<BlockNumber>) -> Self {
            Self {
                receiver,
                last_finalized: 0,
            }
        }
    }
    #[async_trait::async_trait]
    impl FinalityNotifier for MockNotifier {
        async fn next(&mut self) -> Option<BlockNumber> {
            self.receiver.next().await
        }
        fn last_finalized(&self) -> BlockNumber {
            self.last_finalized
        }
    }
    fn authority_data_for_session(session_id: u32) -> SessionAuthorityData {
        authority_data(session_id * 4, (session_id + 1) * 4)
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn genesis_catch_up() {
        let (_sender, receiver) = tracing_unbounded("test", 1_000);
        let mut mock_provider = MockProvider::new();
        let mock_notifier = MockNotifier::new(receiver);
        mock_provider.add_session(0);
        let updater = SessionMapUpdater::new(mock_provider, mock_notifier, SessionPeriod(1));
        let session_map = updater.readonly_session_map();
        let _handle = tokio::spawn(updater.run());
        Delay::new(Duration::from_millis(50)).await;
        assert_eq!(
            session_map.get(SessionId(0)).await,
            Some(authority_data(0, 4))
        );
        assert_eq!(
            session_map.get(SessionId(1)).await,
            Some(authority_data(4, 8))
        );
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn updates_session_map_on_notifications() {
        let (sender, receiver) = tracing_unbounded("test", 1_000);
        let mut mock_provider = MockProvider::new();
        let mock_notificator = MockNotifier::new(receiver);
        mock_provider.add_session(0);
        mock_provider.add_session(1);
        mock_provider.add_session(2);
        let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
        let session_map = updater.readonly_session_map();
        for n in 1..3 {
            sender.unbounded_send(n).unwrap();
        }
        let _handle = tokio::spawn(updater.run());
        Delay::new(Duration::from_millis(50)).await;
        assert_eq!(
            session_map.get(SessionId(0)).await,
            Some(authority_data(0, 4))
        );
        assert_eq!(
            session_map.get(SessionId(1)).await,
            Some(authority_data(4, 8))
        );
        assert_eq!(
            session_map.get(SessionId(2)).await,
            Some(authority_data(8, 12))
        );
        assert_eq!(
            session_map.get(SessionId(3)).await,
            Some(authority_data(12, 16))
        );
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn catch_up() {
        let (_sender, receiver) = tracing_unbounded("test", 1_000);
        let mut mock_provider = MockProvider::new();
        let mut mock_notificator = MockNotifier::new(receiver);
        mock_provider.add_session(0);
        mock_provider.add_session(1);
        mock_provider.add_session(2);
        mock_notificator.last_finalized = 2;
        let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
        let session_map = updater.readonly_session_map();
        let _handle = tokio::spawn(updater.run());
        Delay::new(Duration::from_millis(50)).await;
        assert_eq!(
            session_map.get(SessionId(0)).await,
            Some(authority_data_for_session(0))
        );
        assert_eq!(
            session_map.get(SessionId(1)).await,
            Some(authority_data_for_session(1))
        );
        assert_eq!(
            session_map.get(SessionId(2)).await,
            Some(authority_data_for_session(2))
        );
        assert_eq!(
            session_map.get(SessionId(3)).await,
            Some(authority_data_for_session(3))
        );
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn catch_up_old_sessions() {
        let (_sender, receiver) = tracing_unbounded("test", 1_000);
        let mut mock_provider = MockProvider::new();
        let mut mock_notificator = MockNotifier::new(receiver);
        for i in 0..SECOND_THRESHOLD {
            mock_provider.add_session(i);
        }
        mock_notificator.last_finalized = 20;
        let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
        let session_map = updater.readonly_session_map();
        let _handle = tokio::spawn(updater.run());
        Delay::new(Duration::from_millis(50)).await;
        for i in 0..FIRST_THRESHOLD {
            assert_eq!(
                session_map.get(SessionId(i)).await,
                None,
                "Session {i:?} should be pruned"
            );
        }
        for i in FIRST_THRESHOLD..SECOND_THRESHOLD {
            assert_eq!(
                session_map.get(SessionId(i)).await,
                Some(authority_data_for_session(i)),
                "Session {i:?} should not be pruned"
            );
        }
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn deals_with_database_pruned_authorities() {
        let (_sender, receiver) = tracing_unbounded("test", 1_000);
        let mut mock_provider = MockProvider::new();
        let mut mock_notificator = MockNotifier::new(receiver);
        mock_provider.add_session(5);
        mock_notificator.last_finalized = 5;
        let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
        let session_map = updater.readonly_session_map();
        let _handle = tokio::spawn(updater.run());
        Delay::new(Duration::from_millis(50)).await;
        for i in 0..5 {
            assert_eq!(
                session_map.get(SessionId(i)).await,
                None,
                "Session {i:?} should not be available"
            );
        }
        assert_eq!(
            session_map.get(SessionId(5)).await,
            Some(authority_data_for_session(5))
        );
        assert_eq!(
            session_map.get(SessionId(6)).await,
            Some(authority_data_for_session(6))
        );
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn prunes_old_sessions() {
        let (sender, receiver) = tracing_unbounded("test", 1_000);
        let mut mock_provider = MockProvider::new();
        let mock_notificator = MockNotifier::new(receiver);
        for i in 0..SECOND_THRESHOLD {
            mock_provider.add_session(i);
        }
        let updater = SessionMapUpdater::new(mock_provider, mock_notificator, SessionPeriod(1));
        let session_map = updater.readonly_session_map();
        let _handle = tokio::spawn(updater.run());
        for n in 1..FIRST_THRESHOLD {
            sender.unbounded_send(n).unwrap();
        }
        Delay::new(Duration::from_millis(50)).await;
        for i in 0..=FIRST_THRESHOLD {
            assert_eq!(
                session_map.get(SessionId(i)).await,
                Some(authority_data_for_session(i)),
                "Session {i:?} should be available"
            );
        }
        for i in (FIRST_THRESHOLD + 1)..=SECOND_THRESHOLD {
            assert_eq!(
                session_map.get(SessionId(i)).await,
                None,
                "Session {i:?} should not be avalable yet"
            );
        }
        for n in FIRST_THRESHOLD..SECOND_THRESHOLD {
            sender.unbounded_send(n).unwrap();
        }
        Delay::new(Duration::from_millis(50)).await;
        for i in 0..(FIRST_THRESHOLD - 1) {
            assert_eq!(
                session_map.get(SessionId(i)).await,
                None,
                "Session {i:?} should be pruned"
            );
        }
        for i in FIRST_THRESHOLD..=SECOND_THRESHOLD {
            assert_eq!(
                session_map.get(SessionId(i)).await,
                Some(authority_data_for_session(i)),
                "Session {i:?} should be avalable"
            );
        }
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn subscription_with_already_defined_session_works() {
        let mut shared = SharedSessionMap::new();
        let readonly = shared.read_only();
        let session = SessionId(0);
        shared.update(session, authority_data(0, 2)).await;
        let mut receiver = readonly.subscribe_to_insertion(session).await;
        assert_eq!(Ok(authority_data(0, 2)), receiver.try_recv());
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn notifies_on_insertion() {
        let mut shared = SharedSessionMap::new();
        let readonly = shared.read_only();
        let session = SessionId(0);
        let mut receiver = readonly.subscribe_to_insertion(session).await;
        assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());
        shared.update(session, authority_data(0, 2)).await;
        assert_eq!(Ok(authority_data(0, 2)), receiver.await);
    }
}