use std::{
    collections::{hash_map::Entry::Occupied, BTreeMap, HashMap, HashSet},
    default::Default,
    hash::Hash,
    num::NonZeroUsize,
    sync::Arc,
    time::{self, Duration},
};
use futures::{
    channel::{
        mpsc::{self, UnboundedSender},
        oneshot,
    },
    StreamExt,
};
use futures_timer::Delay;
use log::{debug, error, info, trace, warn};
use lru::LruCache;
use sc_client_api::{BlockchainEvents, HeaderBackend};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use crate::{
    phron_primitives::{BlockHash, BlockNumber},
    data_io::{
        chain_info::{CachedChainInfoProvider, ChainInfoProvider, SubstrateChainInfoProvider},
        legacy::{
            proposal::{PhronProposal, PendingProposalStatus, ProposalStatus},
            status_provider::get_proposal_status,
            PhronNetworkMessage,
        },
    },
    network::data::{
        component::{Network as ComponentNetwork, Receiver, SimpleNetwork},
        Network as DataNetwork,
    },
    party::manager::Runnable,
    sync::RequestBlocks,
    BlockId, SessionBoundaries,
};
type MessageId = u64;
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum ChainEvent {
    Imported(BlockId),
    Finalized(BlockNumber),
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct PendingProposalInfo {
    messages: HashSet<MessageId>,
    first_occurrence: time::SystemTime,
    status: ProposalStatus,
}
impl PendingProposalInfo {
    fn new(status: ProposalStatus) -> Self {
        PendingProposalInfo {
            messages: HashSet::new(),
            first_occurrence: time::SystemTime::now(),
            status,
        }
    }
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct PendingMessageInfo<M: PhronNetworkMessage> {
    message: M,
    pending_proposals: HashSet<PhronProposal>,
}
impl<M: PhronNetworkMessage> PendingMessageInfo<M> {
    fn new(message: M) -> Self {
        PendingMessageInfo {
            message,
            pending_proposals: HashSet::new(),
        }
    }
}
pub struct DataStoreConfig {
    pub max_triggers_pending: usize,
    pub max_proposals_pending: usize,
    pub max_messages_pending: usize,
    pub available_proposals_cache_capacity: NonZeroUsize,
    pub periodic_maintenance_interval: Duration,
    pub request_block_after: Duration,
}
impl Default for DataStoreConfig {
    fn default() -> DataStoreConfig {
        DataStoreConfig {
            max_triggers_pending: 80_000,
            max_proposals_pending: 80_000,
            max_messages_pending: 40_000,
            available_proposals_cache_capacity: NonZeroUsize::new(8000).unwrap(),
            periodic_maintenance_interval: Duration::from_secs(25),
            request_block_after: Duration::from_secs(20),
        }
    }
}
pub struct DataStore<B, C, RB, Message, R>
where
    B: BlockT<Hash = BlockHash>,
    B::Header: HeaderT<Number = BlockNumber>,
    C: HeaderBackend<B> + BlockchainEvents<B> + Send + Sync + 'static,
    RB: RequestBlocks + 'static,
    Message: PhronNetworkMessage
        + std::fmt::Debug
        + Send
        + Sync
        + Clone
        + parity_scale_codec::Codec
        + 'static,
    R: Receiver<Message>,
{
    next_free_id: MessageId,
    pending_proposals: HashMap<PhronProposal, PendingProposalInfo>,
    event_triggers: HashMap<ChainEvent, HashSet<PhronProposal>>,
    pending_messages: BTreeMap<MessageId, PendingMessageInfo<Message>>,
    chain_info_provider: CachedChainInfoProvider<SubstrateChainInfoProvider<B, C>>,
    available_proposals_cache: LruCache<PhronProposal, ProposalStatus>,
    num_triggers_registered_since_last_pruning: usize,
    highest_finalized_num: BlockNumber,
    session_boundaries: SessionBoundaries,
    client: Arc<C>,
    block_requester: RB,
    config: DataStoreConfig,
    messages_from_network: R,
    messages_for_phron: UnboundedSender<Message>,
}
impl<B, C, RB, Message, R> DataStore<B, C, RB, Message, R>
where
    B: BlockT<Hash = BlockHash>,
    B::Header: HeaderT<Number = BlockNumber>,
    C: HeaderBackend<B> + BlockchainEvents<B> + Send + Sync + 'static,
    RB: RequestBlocks + 'static,
    Message: PhronNetworkMessage
        + std::fmt::Debug
        + Send
        + Sync
        + Clone
        + parity_scale_codec::Codec
        + 'static,
    R: Receiver<Message>,
{
    pub fn new<N: ComponentNetwork<Message, R = R>>(
        session_boundaries: SessionBoundaries,
        client: Arc<C>,
        block_requester: RB,
        config: DataStoreConfig,
        component_network: N,
    ) -> (Self, impl DataNetwork<Message>) {
        let (messages_for_phron, messages_from_data_store) = mpsc::unbounded();
        let (messages_to_network, messages_from_network) = component_network.into();
        let status = client.info();
        let chain_info_provider = CachedChainInfoProvider::new(
            SubstrateChainInfoProvider::new(client.clone()),
            Default::default(),
        );
        let highest_finalized_num = status.finalized_number;
        (
            DataStore {
                next_free_id: 0,
                pending_proposals: HashMap::new(),
                event_triggers: HashMap::new(),
                pending_messages: BTreeMap::new(),
                chain_info_provider,
                available_proposals_cache: LruCache::new(config.available_proposals_cache_capacity),
                num_triggers_registered_since_last_pruning: 0,
                highest_finalized_num,
                session_boundaries,
                client,
                block_requester,
                config,
                messages_from_network,
                messages_for_phron,
            },
            SimpleNetwork::new(messages_from_data_store, messages_to_network),
        )
    }
    pub async fn run(&mut self, mut exit: oneshot::Receiver<()>) {
        let mut maintenance_clock = Delay::new(self.config.periodic_maintenance_interval);
        let mut import_stream = self.client.import_notification_stream();
        let mut finality_stream = self.client.finality_notification_stream();
        loop {
            self.prune_pending_messages();
            self.prune_triggers();
            tokio::select! {
                Some(message) = self.messages_from_network.next() => {
                    trace!(target: "phron-data-store", "Received message at Data Store {:?}", message);
                    self.on_message_received(message);
                }
                Some(block) = &mut import_stream.next() => {
                    trace!(target: "phron-data-store", "Block import notification at Data Store for block {:?}", block);
                    self.on_block_imported((block.header.hash(), *block.header.number()).into());
                },
                Some(block) = &mut finality_stream.next() => {
                    trace!(target: "phron-data-store", "Finalized block import notification at Data Store for block {:?}", block);
                    self.on_block_finalized((block.header.hash(), *block.header.number()).into());
                }
                _ = &mut maintenance_clock => {
                    self.run_maintenance();
                    maintenance_clock = Delay::new(self.config.periodic_maintenance_interval);
                }
                _ = &mut exit => {
                    debug!(target: "phron-data-store", "Data Store task received exit signal. Terminating.");
                    break;
                }
            }
        }
    }
    fn update_highest_finalized(&mut self) {
        let highest_finalized = self.chain_info_provider.get_highest_finalized();
        self.on_block_imported(highest_finalized.clone());
        self.on_block_finalized(highest_finalized);
    }
    fn run_maintenance(&mut self) {
        self.update_highest_finalized();
        let proposals_with_timestamps: Vec<_> = self
            .pending_proposals
            .iter()
            .map(|(proposal, info)| (proposal.clone(), info.first_occurrence))
            .collect();
        match proposals_with_timestamps.len() {
            0 => {
                trace!(target: "phron-data-store", "No pending proposals in data store during maintenance.");
            }
            1..=5 => {
                info!(target: "phron-data-store", "Data Store maintenance. Awaiting {:?} proposals: {:?}",proposals_with_timestamps.len(), proposals_with_timestamps);
            }
            _ => {
                info!(target: "phron-data-store", "Data Store maintenance. Awaiting {:?} proposals: (showing 5 initial only) {:?}",proposals_with_timestamps.len(), &proposals_with_timestamps[..5]);
            }
        }
        let now = time::SystemTime::now();
        for (proposal, first_occurrence) in proposals_with_timestamps {
            if self.bump_proposal(&proposal) {
                continue;
            }
            let time_waiting = match now.duration_since(first_occurrence) {
                Ok(tw) if tw >= self.config.request_block_after => tw,
                _ => continue,
            };
            let block = proposal.top_block();
            if !self.chain_info_provider.is_block_imported(&block) {
                debug!(target: "phron-data-store", "Requesting a block {:?} after it has been missing for {:?} secs.", block, time_waiting.as_secs());
                if let Err(e) = self.block_requester.request_block(block.clone()) {
                    warn!(target: "phron-data-store", "Error requesting block {:?}, {}.", block, e);
                }
                continue;
            }
            let bottom_block = proposal.bottom_block();
            let parent_hash = match self.chain_info_provider.get_parent_hash(&bottom_block) {
                Ok(ph) => ph,
                _ => {
                    warn!(target: "phron-data-store", "Expected the block below the proposal {:?} to be imported", proposal);
                    continue;
                }
            };
            let parent_num = bottom_block.number - 1;
            if let Ok(finalized_block) = self.chain_info_provider.get_finalized_at(parent_num) {
                if parent_hash != finalized_block.hash {
                    warn!(target: "phron-data-store", "The proposal {:?} is pending because the parent: \
                        {:?}, does not agree with the block finalized at this height: {:?}.", proposal, parent_hash, finalized_block);
                } else {
                    warn!(target: "phron-data-store", "The proposal {:?} is pending even though blocks \
                            have been imported and parent was finalized.", proposal);
                }
            } else {
                debug!(target: "phron-data-store", "Justification for block {:?} {:?} \
                        still not present after {:?} secs.", parent_num, parent_hash, time_waiting.as_secs());
            }
        }
    }
    fn register_block_import_trigger(&mut self, proposal: &PhronProposal, block: &BlockId) {
        self.num_triggers_registered_since_last_pruning += 1;
        self.event_triggers
            .entry(ChainEvent::Imported(block.clone()))
            .or_default()
            .insert(proposal.clone());
    }
    fn register_finality_trigger(&mut self, proposal: &PhronProposal, number: BlockNumber) {
        self.num_triggers_registered_since_last_pruning += 1;
        if number > self.highest_finalized_num {
            self.event_triggers
                .entry(ChainEvent::Finalized(number))
                .or_default()
                .insert(proposal.clone());
        }
    }
    fn register_next_finality_trigger(&mut self, proposal: &PhronProposal) {
        if self.highest_finalized_num < proposal.number_below_branch() {
            self.register_finality_trigger(proposal, proposal.number_below_branch());
        } else if self.highest_finalized_num < proposal.number_top_block() {
            self.register_finality_trigger(proposal, self.highest_finalized_num + 1);
        }
    }
    fn on_block_finalized(&mut self, block: BlockId) {
        if self.highest_finalized_num < block.number {
            let old_num = self.highest_finalized_num;
            let new_num = block.number;
            self.highest_finalized_num = new_num;
            let mut num = old_num + 1;
            while num <= new_num {
                if let Some(proposals_to_bump) =
                    self.event_triggers.remove(&ChainEvent::Finalized(num))
                {
                    for proposal in proposals_to_bump {
                        self.bump_proposal(&proposal);
                    }
                }
                num += 1;
            }
        }
    }
    fn on_block_imported(&mut self, block: BlockId) {
        if let Some(proposals_to_bump) = self.event_triggers.remove(&ChainEvent::Imported(block)) {
            for proposal in proposals_to_bump {
                self.bump_proposal(&proposal);
            }
        }
    }
    fn on_proposal_available(&mut self, proposal: &PhronProposal) {
        if let Some(proposal_info) = self.pending_proposals.remove(proposal) {
            for id in proposal_info.messages {
                self.remove_proposal_from_pending_message(proposal, id);
            }
        }
    }
    fn bump_proposal(&mut self, proposal: &PhronProposal) -> bool {
        let old_status = match self.pending_proposals.get(proposal) {
            None => {
                return false;
            }
            Some(info) => info.status.clone(),
        };
        let new_status = self.check_proposal_availability(proposal, Some(&old_status));
        self.pending_proposals.get_mut(proposal).unwrap().status = new_status.clone();
        use PendingProposalStatus::*;
        use ProposalStatus::*;
        match new_status {
            Pending(PendingTopBlock) => {
                self.register_next_finality_trigger(proposal);
                false
            }
            Pending(TopBlockImportedButIncorrectBranch) => {
                false
            }
            Pending(TopBlockImportedButNotFinalizedAncestor) => {
                self.register_next_finality_trigger(proposal);
                false
            }
            Finalize(_) | Ignore => {
                self.on_proposal_available(proposal);
                true
            }
        }
    }
    fn check_proposal_availability(
        &mut self,
        proposal: &PhronProposal,
        old_status: Option<&ProposalStatus>,
    ) -> ProposalStatus {
        if let Some(status) = self.available_proposals_cache.get(proposal) {
            return status.clone();
        }
        let status = get_proposal_status(&mut self.chain_info_provider, proposal, old_status);
        match status {
            ProposalStatus::Finalize(_) | ProposalStatus::Ignore => {
                self.available_proposals_cache
                    .put(proposal.clone(), status.clone());
            }
            _ => {}
        }
        status
    }
    fn add_message_proposal_dependency(
        &mut self,
        proposal: &PhronProposal,
        message_info: &mut PendingMessageInfo<Message>,
        id: MessageId,
    ) {
        if !self.pending_proposals.contains_key(proposal) {
            use PendingProposalStatus::*;
            use ProposalStatus::*;
            let status = self.check_proposal_availability(proposal, None);
            match &status {
                Pending(PendingTopBlock) => {
                    self.pending_proposals
                        .insert(proposal.clone(), PendingProposalInfo::new(status));
                    self.register_block_import_trigger(proposal, &proposal.top_block());
                    self.register_next_finality_trigger(proposal);
                }
                Pending(TopBlockImportedButIncorrectBranch) => {
                    self.pending_proposals
                        .insert(proposal.clone(), PendingProposalInfo::new(status));
                    self.register_next_finality_trigger(proposal);
                }
                Pending(TopBlockImportedButNotFinalizedAncestor) => {
                    self.pending_proposals
                        .insert(proposal.clone(), PendingProposalInfo::new(status));
                    self.register_next_finality_trigger(proposal);
                }
                Finalize(_) | Ignore => {
                    return;
                }
            }
        }
        let proposal_info = self
            .pending_proposals
            .get_mut(proposal)
            .expect("exists as checked above");
        proposal_info.messages.insert(id);
        message_info.pending_proposals.insert(proposal.clone());
    }
    fn on_message_dependencies_resolved(&self, message: Message) {
        trace!(target: "phron-data-store", "Sending message from DataStore {:?}", message);
        if let Err(e) = self.messages_for_phron.unbounded_send(message) {
            error!(target: "phron-data-store", "Unable to send a ready message from DataStore {}", e);
        }
    }
    fn assign_fresh_message_id(&mut self) -> MessageId {
        self.next_free_id += 1;
        self.next_free_id - 1
    }
    fn remove_proposal_from_pending_message(&mut self, proposal: &PhronProposal, id: MessageId) {
        let mut message_info = match self.pending_messages.remove(&id) {
            Some(message_info) => message_info,
            None => {
                warn!(target: "phron-data-store", "Message {:?} not found when resolving a proposal dependency {:?}.", id, proposal);
                return;
            }
        };
        message_info.pending_proposals.remove(proposal);
        if message_info.pending_proposals.is_empty() {
            self.on_message_dependencies_resolved(message_info.message);
        } else {
            self.pending_messages.insert(id, message_info);
        }
    }
    fn remove_message_id_from_pending_proposal(&mut self, proposal: &PhronProposal, id: MessageId) {
        if let Occupied(mut proposal_entry) = self.pending_proposals.entry(proposal.clone()) {
            let proposal_info = proposal_entry.get_mut();
            proposal_info.messages.remove(&id);
            if proposal_info.messages.is_empty() {
                proposal_entry.remove();
            }
        } else {
            warn!(target: "phron-data-store", "Proposal {:?} with id {:?} referenced in message does not exist", proposal, id);
        }
    }
    fn prune_single_message(&mut self) -> bool {
        let maybe_id = self.pending_messages.keys().next().cloned();
        if let Some(id) = maybe_id {
            if let Some(message_info) = self.pending_messages.remove(&id) {
                for proposal in message_info.pending_proposals {
                    self.remove_message_id_from_pending_proposal(&proposal, id);
                }
                true
            } else {
                warn!(
                    "Trying to prune a message whose id is not in pending messages {:?}",
                    id
                );
                false
            }
        } else {
            warn!(target: "phron-data-store", "Tried to prune a message but there are none pending.");
            false
        }
    }
    fn prune_pending_messages(&mut self) {
        while self.pending_messages.len() > self.config.max_messages_pending
            || self.pending_proposals.len() > self.config.max_proposals_pending
        {
            if !self.prune_single_message() {
                warn!(target: "phron-data-store", "Message pruning in DataStore failed. Moving on.");
                break;
            }
        }
    }
    fn prune_triggers(&mut self) {
        if self.num_triggers_registered_since_last_pruning > self.config.max_triggers_pending {
            let pending_proposals = &self.pending_proposals;
            self.event_triggers.retain(|_event, proposal_set| {
                proposal_set.retain(|proposal| pending_proposals.contains_key(proposal));
                !proposal_set.is_empty()
            });
            self.num_triggers_registered_since_last_pruning = 0;
        }
    }
    fn on_message_received(&mut self, message: Message) {
        let mut proposals = Vec::new();
        for data in message.included_data() {
            let unvalidated_proposal = data.head_proposal;
            match unvalidated_proposal.validate_bounds(&self.session_boundaries) {
                Ok(proposal) => proposals.push(proposal),
                Err(error) => {
                    warn!(target: "phron-data-store", "Message {:?} dropped as it contains \
                            proposal {:?} not within bounds ({:?}).", message, unvalidated_proposal, error);
                    return;
                }
            }
        }
        let mut message_info = PendingMessageInfo::new(message.clone());
        let message_id = self.assign_fresh_message_id();
        for proposal in proposals {
            self.add_message_proposal_dependency(&proposal, &mut message_info, message_id);
        }
        if message_info.pending_proposals.is_empty() {
            self.on_message_dependencies_resolved(message);
        } else {
            self.pending_messages.insert(message_id, message_info);
        }
    }
}
#[async_trait::async_trait]
impl<B, C, RB, Message, R> Runnable for DataStore<B, C, RB, Message, R>
where
    B: BlockT<Hash = BlockHash>,
    B::Header: HeaderT<Number = BlockNumber>,
    C: HeaderBackend<B> + BlockchainEvents<B> + Send + Sync + 'static,
    RB: RequestBlocks + 'static,
    Message: PhronNetworkMessage
        + std::fmt::Debug
        + Send
        + Sync
        + Clone
        + parity_scale_codec::Codec
        + 'static,
    R: Receiver<Message> + 'static,
{
    async fn run(mut self, exit: oneshot::Receiver<()>) {
        DataStore::run(&mut self, exit).await
    }
}