use std::{default::Default, path::PathBuf, time::Duration};
use futures_timer::Delay;
use log::{debug, error, info, trace, warn};
use tokio::{task::spawn_blocking, time::sleep};
use crate::{
    party::{
        manager::{Handle, Task, TaskCommon as AuthoritySubtaskCommon},
        traits::{ChainState, NodeSessionManager},
    },
    session::SessionBoundaryInfo,
    session_map::ReadOnlySessionMap,
    SessionId, SyncOracle,
};
pub(crate) mod backup;
pub mod impls;
pub mod manager;
pub mod traits;
#[cfg(test)]
mod mocks;
/// Everything needed to build a `ConsensusParty`; consumed by `ConsensusParty::new`.
pub(crate) struct ConsensusPartyParams<CS, NSM> {
    /// Session map handle the party uses to subscribe to the authority data of a session.
    pub session_authorities: ReadOnlySessionMap,
    /// Source of the best and finalized block numbers.
    pub chain_state: CS,
    /// Consulted (`major_sync`) while the party catches up before its first session.
    pub sync_oracle: SyncOracle,
    /// Path handed to the `backup` module when removing/rotating session backups.
    pub backup_saving_path: Option<PathBuf>,
    /// Starts and stops the per-session work (authority tasks, non-validator sessions).
    pub session_manager: NSM,
    /// Converts between session ids and block numbers.
    pub session_info: SessionBoundaryInfo,
}
/// Runs the node's part of consecutive sessions, one after another (see `run`).
pub(crate) struct ConsensusParty<CS: ChainState, NSM: NodeSessionManager> {
    /// Used to subscribe to the insertion of authority data for a given session.
    session_authorities: ReadOnlySessionMap,
    /// Queried for the best and finalized block numbers.
    chain_state: CS,
    /// Queried for `major_sync()` while catching up on startup.
    sync_oracle: SyncOracle,
    /// Path passed to `backup::remove` / `backup::rotate`.
    backup_saving_path: Option<PathBuf>,
    /// Starts, early-starts and stops sessions and spawns authority tasks.
    session_manager: NSM,
    /// Maps sessions to their last block and block numbers to session ids.
    session_info: SessionBoundaryInfo,
}
/// Interval between consecutive polls of the finalized block number in the main loop of
/// `run_session`, used to detect that the current session has ended.
const SESSION_STATUS_CHECK_PERIOD: Duration = Duration::from_millis(1000);
impl<CS, NSM> ConsensusParty<CS, NSM>
where
    CS: ChainState,
    NSM: NodeSessionManager,
{
    /// Builds a party by taking ownership of every field of `params`.
    pub(crate) fn new(params: ConsensusPartyParams<CS, NSM>) -> Self {
        Self {
            session_authorities: params.session_authorities,
            chain_state: params.chain_state,
            sync_oracle: params.sync_oracle,
            backup_saving_path: params.backup_saving_path,
            session_manager: params.session_manager,
            session_info: params.session_info,
        }
    }
    /// Runs the party's part of a single session, returning once the session is over or skipped.
    ///
    /// Steps, in order:
    /// 1. Spawns a detached blocking task that calls `backup::remove` for the previous session id
    ///    (when there is one).
    /// 2. If the best block is already at or past the session's last block, polls the finalized
    ///    number up to 10 times (200 ms apart) and returns early once that last block is finalized.
    /// 3. Waits for the authority data of `session_id` and starts the session either as an
    ///    authority (after `backup::rotate`) or as a non-validator.
    /// 4. Loops until the last block of the session is finalized, meanwhile starting the next
    ///    session early once its authority data arrives and noticing if the authority task stops.
    /// 5. Stops the authority task (if still held) and asks the session manager to stop the
    ///    session.
    ///
    /// # Panics
    ///
    /// Panics if the subscription for the current session's authority data resolves to an error.
    async fn run_session(&mut self, session_id: SessionId) {
        let last_block = self.session_info.last_block_of_session(session_id);
        // `checked_sub` yields `None` for session 0, in which case there is nothing to remove.
        if let Some(previous_session_id) = session_id.0.checked_sub(1) {
            let backup_saving_path = self.backup_saving_path.clone();
            // The join handle is dropped, so the outcome of the removal is never inspected.
            spawn_blocking(move || backup::remove(backup_saving_path, previous_session_id));
        }
        // The chain may already be past this session; give finalization a short while
        // (10 checks, 200 ms apart) to confirm that before starting any work.
        if self.chain_state.best_block_number() >= last_block {
            for attempt in 0..10 {
                if attempt != 0 {
                    Delay::new(Duration::from_millis(200)).await;
                }
                let last_finalized_number = self.chain_state.finalized_number();
                if last_finalized_number >= last_block {
                    debug!(target: "phron-party", "Skipping session {:?} early because block {:?} is already finalized", session_id, last_finalized_number);
                    return;
                }
            }
        }
        // The first `.await` obtains the subscription, the second one waits for the
        // authority data itself to arrive.
        let authority_data = match self
            .session_authorities
            .subscribe_to_insertion(session_id)
            .await
            .await
        {
            Err(e) => panic!("Error while receiving the notification about current session {e:?}"),
            Ok(authority_data) => authority_data,
        };
        let authorities = authority_data.authorities();
        trace!(target: "phron-party", "Authority data for session {:?}: {:?}", session_id, authorities);
        // `Some(task)` when this node is among the session's authorities, `None` otherwise.
        let mut maybe_authority_task = if let Some(node_id) =
            self.session_manager.node_idx(authorities)
        {
            match backup::rotate(self.backup_saving_path.clone(), session_id.0) {
                Ok(backup) => {
                    debug!(target: "phron-party", "Running session {:?} as authority id {:?}", session_id, node_id);
                    Some(
                        self.session_manager
                            .spawn_authority_task_for_session(
                                session_id,
                                node_id,
                                backup,
                                authorities,
                            )
                            .await,
                    )
                }
                Err(err) => {
                    // Without a working backup the session is not run at all.
                    // NOTE(review): this is the only log in this file using the target
                    // "Phron-member" instead of "phron-party" — confirm this is intentional.
                    error!(
                        target: "Phron-member",
                        "Error setting up backup saving for session {:?}. Not running the session: {}",
                        session_id, err
                    );
                    return;
                }
            }
        } else {
            debug!(target: "phron-party", "Running session {:?} as non-authority", session_id);
            // A failure here is only logged; the session loop below still runs.
            if let Err(e) = self
                .session_manager
                .start_nonvalidator_session(session_id, authorities)
            {
                warn!(target: "phron-party", "Failed to start nonvalidator session{:?}: {}", session_id, e);
            }
            None
        };
        let mut check_session_status = Delay::new(SESSION_STATUS_CHECK_PERIOD);
        let next_session_id = SessionId(session_id.0 + 1);
        // `Some` while we are still waiting for the next session's authority data;
        // set to `None` once the next session has been started early.
        let mut start_next_session_network = Some(
            self.session_authorities
                .subscribe_to_insertion(next_session_id)
                .await,
        );
        loop {
            tokio::select! {
                // Periodic check: the session ends when its last block gets finalized.
                _ = &mut check_session_status => {
                    let last_finalized_number = self.chain_state.finalized_number();
                    if last_finalized_number >= last_block {
                        debug!(target: "phron-party", "Terminating session {:?}", session_id);
                        break;
                    }
                    check_session_status = Delay::new(SESSION_STATUS_CHECK_PERIOD);
                },
                // Next session's authority data. The async block yields `None` (which does not
                // match the `Some(..)` pattern) when there is no pending subscription or when
                // the subscription failed and had to be renewed.
                Some(next_session_authority_data) = async {
                    match &mut start_next_session_network {
                        Some(notification) => {
                            match notification.await {
                                Err(e) => {
                                    warn!(target: "phron-party", "Error with subscription {:?}", e);
                                    start_next_session_network = Some(self.session_authorities.subscribe_to_insertion(next_session_id).await);
                                    None
                                },
                                Ok(next_session_authority_data) => {
                                    Some(next_session_authority_data)
                                }
                            }
                        },
                        None => None,
                    }
                } => {
                    let next_session_authorities = next_session_authority_data.authorities();
                    // Start the next session ahead of time, as validator or non-validator
                    // depending on whether this node is among its authorities.
                    match self.session_manager.node_idx(next_session_authorities) {
                         Some(next_session_node_id) => if let Err(e) = self
                                .session_manager
                                .early_start_validator_session(
                                    next_session_id,
                                    next_session_node_id,
                                    next_session_authorities,
                                )
                            {
                                warn!(target: "phron-party", "Failed to early start validator session{:?}: {}", next_session_id, e);
                            }
                        None => {
                            if let Err(e) = self
                                .session_manager
                                .start_nonvalidator_session(next_session_id, next_session_authorities)
                            {
                                warn!(target: "phron-party", "Failed to early start nonvalidator session{:?}: {}", next_session_id, e);
                            }
                        }
                    }
                    // Done with the early start; this branch yields `None` from now on.
                    start_next_session_network = None;
                },
                // Authority task finished on its own; yields `None` when there is no task.
                Some(_) = async {
                    match maybe_authority_task.as_mut() {
                        Some(task) => Some(task.stopped().await),
                        None => None,
                    }
                } => {
                    warn!(target: "phron-party", "Authority task ended prematurely, giving up for this session.");
                    // Forget the task so it is neither polled again nor stopped after the loop.
                    maybe_authority_task = None;
                },
            }
        }
        // The session is over: stop the authority task if it is still held.
        if let Some(task) = maybe_authority_task {
            debug!(target: "phron-party", "Stopping the authority task.");
            if task.stop().await.is_err() {
                warn!(target: "phron-party", "Authority task did not stop silently");
            }
        }
        if let Err(e) = self.session_manager.stop_session(session_id) {
            warn!(target: "phron-party", "Session Manager failed to stop in session {:?}: {}", session_id, e)
        }
    }
    pub async fn run(mut self) {
        let starting_session = self.catch_up().await;
        for curr_id in starting_session.0.. {
            info!(target: "phron-party", "Running session {:?}.", curr_id);
            self.run_session(SessionId(curr_id)).await;
        }
    }
    /// Waits for the initial sync to settle and returns the session to start with.
    ///
    /// While the sync oracle reports a major sync, the finalized number is re-read every
    /// 500 ms; waiting stops as soon as the major sync ends or two consecutive reads return
    /// the same number. The result is the session containing the last finalized number read.
    async fn catch_up(&mut self) -> SessionId {
        let mut current = self.chain_state.finalized_number();
        let mut previous = None;
        loop {
            // The oracle is asked first; the comparison only happens during a major sync.
            if !self.sync_oracle.major_sync() || previous == Some(current) {
                break;
            }
            sleep(Duration::from_millis(500)).await;
            previous = Some(current);
            current = self.chain_state.finalized_number();
        }
        self.session_info.session_id_from_block_num(current)
    }
}
#[cfg(test)]
mod tests {
    use std::{
        collections::{HashMap, HashSet},
        sync::Arc,
        time::Duration,
    };
    use sp_runtime::testing::UintAuthorityId;
    use tokio::{task::JoinHandle, time::sleep};
    use crate::{
        phron_primitives::{AuthorityId, SessionAuthorityData},
        party::{
            mocks::{MockChainState, MockNodeSessionManager},
            ConsensusParty, ConsensusPartyParams, SESSION_STATUS_CHECK_PERIOD,
        },
        session::SessionBoundaryInfo,
        session_map::SharedSessionMap,
        SessionId, SessionPeriod, SyncOracle,
    };
    /// The concrete `ConsensusParty` under test, backed by the mocked chain state and the
    /// mocked session manager.
    type Party = ConsensusParty<Arc<MockChainState>, Arc<MockNodeSessionManager>>;
    /// Expected contents of the session sets recorded by `MockNodeSessionManager`;
    /// compared as sets (element order is irrelevant) in `PartyTest::assert_state`.
    struct PartyState {
        /// Compared against the mock's `validator_session_started`.
        validator_started: Vec<SessionId>,
        /// Compared against the mock's `session_early_started`.
        early_started: Vec<SessionId>,
        /// Compared against the mock's `session_stopped`.
        stopped: Vec<SessionId>,
        /// Compared against the mock's `nonvalidator_session_started`.
        non_validator_started: Vec<SessionId>,
    }
    /// Actions scheduled for a single block of a test run; see `PartyTest::handle_events`.
    #[derive(Default)]
    struct BlockEvents {
        /// Authorities to insert into the shared session map for the given session.
        session_authorities: Option<(SessionId, Vec<AuthorityId>)>,
        /// Outer `Some` means "change the node id"; the inner option is the value passed to
        /// the mock's `set_node_id`.
        id: Option<Option<AuthorityId>>,
        /// State to assert at this block (checked before the other two actions are applied).
        state_to_assert: Option<PartyState>,
    }
    /// Builder-style test harness that drives a mocked chain block by block and applies the
    /// events scheduled for each block.
    struct PartyTest {
        /// Next block number `run_for_n_blocks` will start from.
        current_block: u32,
        /// Handles to the mocks shared with the party under test.
        controller: MockController,
        /// Events scheduled per block number; an entry is removed when its block is reached.
        block_events: HashMap<u32, BlockEvents>,
        /// Join handle of the spawned party task, set by `run_party`.
        handle: Option<JoinHandle<()>>,
    }
    impl PartyTest {
        fn new(session_period: SessionPeriod) -> (Self, Party) {
            let (party, controller) = create_mocked_consensus_party(session_period);
            (
                Self {
                    current_block: 0,
                    controller,
                    block_events: Default::default(),
                    handle: None,
                },
                party,
            )
        }
        fn run_party(mut self, party: Party) -> Self {
            let party_handle = tokio::spawn(party.run());
            self.handle = Some(party_handle);
            self
        }
        fn assert_state(&self, expected_state: PartyState, block: u32) {
            let PartyState {
                validator_started,
                early_started,
                stopped,
                non_validator_started,
            } = expected_state;
            assert_eq!(
                *self
                    .controller
                    .node_session_manager
                    .validator_session_started
                    .lock()
                    .unwrap(),
                HashSet::from_iter(validator_started),
                "`validator_session_started` mismatch at block #{block}"
            );
            assert_eq!(
                *self
                    .controller
                    .node_session_manager
                    .session_early_started
                    .lock()
                    .unwrap(),
                HashSet::from_iter(early_started),
                "`session_early_started` mismatch at block #{block}"
            );
            assert_eq!(
                *self
                    .controller
                    .node_session_manager
                    .session_stopped
                    .lock()
                    .unwrap(),
                HashSet::from_iter(stopped),
                "`session_stopped` mismatch at block #{block}"
            );
            assert_eq!(
                *self
                    .controller
                    .node_session_manager
                    .nonvalidator_session_started
                    .lock()
                    .unwrap(),
                HashSet::from_iter(non_validator_started),
                "`nonvalidator_session_started` mismatch at block #{block}"
            );
        }
        /// Advances the mocked chain by `n` blocks, starting at `current_block`.
        ///
        /// For each block, the best and finalized numbers are both set to it and then the
        /// events scheduled for that block (if any) are handled.
        async fn run_for_n_blocks(mut self, n: u32) -> Self {
            let first_block = self.current_block;
            let end_block = first_block + n;
            for block in first_block..end_block {
                self.controller.chain_state_mock.set_best_block(block);
                self.controller.chain_state_mock.set_finalized_block(block);
                let scheduled = self.block_events.remove(&block);
                if let Some(events) = scheduled {
                    self.handle_events(events, block).await;
                }
            }
            self.current_block = end_block;
            self
        }
        /// Applies the events scheduled for `block`.
        ///
        /// The state assertion (if any) comes first, after sleeping 100 ms longer than
        /// `SESSION_STATUS_CHECK_PERIOD`; only then are the session authorities and the node
        /// id updated.
        async fn handle_events(&mut self, events: BlockEvents, block: u32) {
            if let Some(expected_state) = events.state_to_assert {
                let settle_millis = SESSION_STATUS_CHECK_PERIOD.as_millis() as u64 + 100;
                sleep(Duration::from_millis(settle_millis)).await;
                self.assert_state(expected_state, block);
            }
            if let Some((session, authorities)) = events.session_authorities {
                let authority_data = SessionAuthorityData::new(authorities, None);
                self.controller
                    .shared_session_map
                    .update(session, authority_data)
                    .await;
            }
            if let Some(node_id) = events.id {
                self.controller.node_session_manager.set_node_id(node_id);
            }
        }
        /// Schedules inserting `authorities` for `session` into the session map at `block`.
        fn set_authorities_for_session_at_block(
            mut self,
            block: u32,
            authorities: Vec<AuthorityId>,
            session: SessionId,
        ) -> Self {
            self.block_events
                .entry(block)
                .or_default()
                .session_authorities = Some((session, authorities));
            self
        }
        /// Schedules setting the mock session manager's node id to `id` at `block`.
        fn set_node_id_for_session_at_block(mut self, block: u32, id: Option<AuthorityId>) -> Self {
            self.block_events.entry(block).or_default().id = Some(id);
            self
        }
        /// Sets the mocked best and finalized block numbers right away and makes the harness
        /// continue from the block following `best_block`.
        async fn set_best_and_finalized_block(
            mut self,
            best_block: u32,
            finalized_block: u32,
        ) -> Self {
            {
                let chain = &self.controller.chain_state_mock;
                chain.set_best_block(best_block);
                chain.set_finalized_block(finalized_block);
            }
            self.current_block = best_block + 1;
            self
        }
        /// Schedules an assertion of `expected_state` for when `block` is reached.
        fn expect_session_states_at_block(
            mut self,
            block: u32,
            expected_state: PartyState,
        ) -> Self {
            self.block_events.entry(block).or_default().state_to_assert = Some(expected_state);
            self
        }
        /// Immediately (not tied to any block) updates the session map and/or the node id.
        ///
        /// The session map is updated first; a `None` argument leaves that part untouched.
        async fn set_now(
            self,
            session_authorities: Option<(SessionId, Vec<AuthorityId>)>,
            id: Option<Option<AuthorityId>>,
        ) -> Self {
            if let Some((session, authorities)) = session_authorities {
                let authority_data = SessionAuthorityData::new(authorities, None);
                self.controller
                    .shared_session_map
                    .update(session, authority_data)
                    .await;
            }
            if let Some(node_id) = id {
                self.controller.node_session_manager.set_node_id(node_id);
            }
            self
        }
    }
    /// Value wrapped in `SessionPeriod` by every test; also used as a block count when
    /// advancing the mocked chain.
    const SESSION_PERIOD: u32 = 30;
    /// Test-side handles to the mocks that were handed to the party under test.
    #[derive(Debug)]
    struct MockController {
        /// Writable session map; the party holds its read-only view.
        pub shared_session_map: SharedSessionMap,
        /// Same `Arc` as the party's chain state; used to set best/finalized blocks.
        pub chain_state_mock: Arc<MockChainState>,
        /// Same `Arc` as the party's session manager; used to set the node id and to inspect
        /// the recorded sessions.
        pub node_session_manager: Arc<MockNodeSessionManager>,
    }
    /// Builds a `ConsensusParty` wired to fresh mocks (no backup path) and returns it with a
    /// `MockController` that shares the same mocks.
    #[allow(clippy::type_complexity)]
    fn create_mocked_consensus_party(
        session_period: SessionPeriod,
    ) -> (
        ConsensusParty<Arc<MockChainState>, Arc<MockNodeSessionManager>>,
        MockController,
    ) {
        let shared_session_map = SharedSessionMap::new();
        let chain_state_mock = Arc::new(MockChainState::new());
        let node_session_manager = Arc::new(MockNodeSessionManager::new());
        let party = ConsensusParty::new(ConsensusPartyParams {
            session_authorities: shared_session_map.read_only(),
            chain_state: Arc::clone(&chain_state_mock),
            sync_oracle: SyncOracle::new(),
            backup_saving_path: None,
            session_manager: Arc::clone(&node_session_manager),
            session_info: SessionBoundaryInfo::new(session_period),
        });
        let controller = MockController {
            shared_session_map,
            chain_state_mock,
            node_session_manager,
        };
        (party, controller)
    }
    /// The node belongs to the authorities of sessions 0 and 1: at block 28 session 0 must be
    /// running as validator with session 1 early-started; at block 29 session 0 must be
    /// stopped and session 1 started as validator.
    #[tokio::test(flavor = "multi_thread")]
    async fn party_starts_session_for_node_in_authorities() {
        let (test, party) = PartyTest::new(SessionPeriod(SESSION_PERIOD));
        let authorities = (0..10)
            .map(|id| UintAuthorityId(id).to_public_key())
            .collect::<Vec<_>>();
        let first_session_running = PartyState {
            validator_started: vec![SessionId(0)],
            early_started: vec![SessionId(1)],
            stopped: vec![],
            non_validator_started: vec![],
        };
        let second_session_running = PartyState {
            validator_started: vec![SessionId(0), SessionId(1)],
            early_started: vec![SessionId(1)],
            stopped: vec![SessionId(0)],
            non_validator_started: vec![],
        };
        // Schedule the inputs first ...
        let test = test
            .set_authorities_for_session_at_block(0, authorities.clone(), SessionId(0))
            .set_authorities_for_session_at_block(25, authorities, SessionId(1))
            .set_node_id_for_session_at_block(0, Some(UintAuthorityId(0).to_public_key()));
        // ... then the expectations ...
        let test = test
            .expect_session_states_at_block(28, first_session_running)
            .expect_session_states_at_block(29, second_session_running);
        // ... and finally run one full session period.
        test.run_party(party).run_for_n_blocks(SESSION_PERIOD).await;
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn party_run_3_authorities_sessions() {
        let (test, party) = PartyTest::new(SessionPeriod(SESSION_PERIOD));
        let authorities: Vec<_> = (0..10)
            .map(|id| UintAuthorityId(id).to_public_key())
            .collect();
        let state_1 = PartyState {
            validator_started: vec![SessionId(0)],
            early_started: vec![SessionId(1)],
            stopped: vec![],
            non_validator_started: vec![],
        };
        let state_2 = PartyState {
            validator_started: vec![SessionId(0), SessionId(1)],
            early_started: vec![SessionId(1)],
            stopped: vec![SessionId(0)],
            non_validator_started: vec![],
        };
        let state_3 = PartyState {
            validator_started: vec![SessionId(0), SessionId(1), SessionId(2)],
            early_started: vec![SessionId(1), SessionId(2)],
            stopped: vec![SessionId(0), SessionId(1)],
            non_validator_started: vec![],
        };
        let state_4 = PartyState {
            validator_started: vec![SessionId(0), SessionId(1), SessionId(2), SessionId(3)],
            early_started: vec![SessionId(1), SessionId(2), SessionId(3)],
            stopped: vec![SessionId(0), SessionId(1), SessionId(2)],
            non_validator_started: vec![],
        };
        test.set_authorities_for_session_at_block(0, authorities.clone(), SessionId(0))
            .set_authorities_for_session_at_block(25, authorities.clone(), SessionId(1))
            .set_authorities_for_session_at_block(55, authorities.clone(), SessionId(2))
            .set_authorities_for_session_at_block(85, authorities, SessionId(3))
            .set_node_id_for_session_at_block(0, Some(UintAuthorityId(0).to_public_key()))
            .expect_session_states_at_block(28, state_1)
            .expect_session_states_at_block(29, state_2)
            .expect_session_states_at_block(59, state_3)
            .expect_session_states_at_block(89, state_4)
            .run_party(party)
            .run_for_n_blocks(3 * SESSION_PERIOD)
            .await;
    }
    #[tokio::test(flavor = "multi_thread")]
    async fn party_run_3_non_authorities_sessions() {
        let (test, party) = PartyTest::new(SessionPeriod(SESSION_PERIOD));
        let authorities: Vec<_> = (0..10)
            .map(|id| UintAuthorityId(id).to_public_key())
            .collect();
        let state_1 = PartyState {
            non_validator_started: vec![SessionId(0)],
            early_started: vec![],
            stopped: vec![],
            validator_started: vec![],
        };
        let state_2 = PartyState {
            non_validator_started: vec![SessionId(0), SessionId(1)],
            early_started: vec![],
            stopped: vec![SessionId(0)],
            validator_started: vec![],
        };
        let state_3 = PartyState {
            non_validator_started: vec![SessionId(0), SessionId(1), SessionId(2)],
            early_started: vec![],
            stopped: vec![SessionId(0), SessionId(1)],
            validator_started: vec![],
        };
        let state_4 = PartyState {
            non_validator_started: vec![SessionId(0), SessionId(1), SessionId(2), SessionId(3)],
            early_started: vec![],
            stopped: vec![SessionId(0), SessionId(1), SessionId(2)],
            validator_started: vec![],
        };
        test.set_authorities_for_session_at_block(0, authorities.clone(), SessionId(0))
            .set_authorities_for_session_at_block(25, authorities.clone(), SessionId(1))
            .set_authorities_for_session_at_block(55, authorities.clone(), SessionId(2))
            .set_authorities_for_session_at_block(85, authorities, SessionId(3))
            .set_node_id_for_session_at_block(0, None)
            .expect_session_states_at_block(24, state_1)
            .expect_session_states_at_block(29, state_2)
            .expect_session_states_at_block(59, state_3)
            .expect_session_states_at_block(89, state_4)
            .run_party(party)
            .run_for_n_blocks(3 * SESSION_PERIOD)
            .await;
    }
    /// Authorities for sessions 0 to 3 are known up front and the chain is already at block
    /// `2 * SESSION_PERIOD` (best and finalized) when the party starts: at block 61 only
    /// session 2 may have been started as validator, with session 3 early-started.
    #[tokio::test(flavor = "multi_thread")]
    async fn party_early_skips_past_sessions() {
        let (test, party) = PartyTest::new(SessionPeriod(SESSION_PERIOD));
        let authorities = (0..10)
            .map(|id| UintAuthorityId(id).to_public_key())
            .collect::<Vec<_>>();
        let expected_at_61 = PartyState {
            validator_started: vec![SessionId(2)],
            early_started: vec![SessionId(3)],
            non_validator_started: vec![],
            stopped: vec![],
        };
        // Session 0 also fixes the node id; the later sessions leave it untouched.
        let mut test = test
            .set_now(
                Some((SessionId(0), authorities.clone())),
                Some(Some(UintAuthorityId(0).to_public_key())),
            )
            .await;
        for session in 1..=3 {
            test = test
                .set_now(Some((SessionId(session), authorities.clone())), None)
                .await;
        }
        let already_reached = SESSION_PERIOD * 2;
        test.set_best_and_finalized_block(already_reached, already_reached)
            .await
            .run_party(party)
            .expect_session_states_at_block(61, expected_at_61)
            .run_for_n_blocks(1)
            .await;
    }
    /// The node is an authority in session 0 but is left out of session 1's authorities:
    /// session 1 must not be early-started and must be started as non-validator once
    /// session 0 is stopped.
    #[tokio::test(flavor = "multi_thread")]
    async fn party_dont_start_session_for_node_non_in_authorities() {
        let (test, party) = PartyTest::new(SessionPeriod(SESSION_PERIOD));
        let authorities = (0..10)
            .map(|id| UintAuthorityId(id).to_public_key())
            .collect::<Vec<_>>();
        // Session 1 uses everyone except the first authority, i.e. except this node.
        let authorities_without_node = authorities[1..].to_vec();
        let only_first_session = PartyState {
            validator_started: vec![SessionId(0)],
            early_started: vec![],
            non_validator_started: vec![],
            stopped: vec![],
        };
        let second_session_as_non_validator = PartyState {
            validator_started: vec![SessionId(0)],
            early_started: vec![],
            non_validator_started: vec![SessionId(1)],
            stopped: vec![SessionId(0)],
        };
        let test = test
            .set_authorities_for_session_at_block(0, authorities, SessionId(0))
            .set_authorities_for_session_at_block(25, authorities_without_node, SessionId(1))
            .set_node_id_for_session_at_block(0, Some(UintAuthorityId(0).to_public_key()));
        let test = test
            .expect_session_states_at_block(24, only_first_session)
            .expect_session_states_at_block(29, second_session_as_non_validator);
        test.run_party(party).run_for_n_blocks(SESSION_PERIOD).await;
    }
}