1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use std::{boxed::Box, pin::Pin};

use futures::channel::oneshot;
use log::{debug, warn};
use network_clique::SpawnHandleT;

use crate::{Future, SpawnHandle};

/// A single handle that can be waited on, as returned by spawning an essential task.
///
/// Concretely: a pinned, boxed, `Send + 'static` future that resolves to a `Result<(), ()>`.
/// Within this file, `task` obtains one from `spawn_essential` and stores it in a [`Task`].
pub type Handle = Pin<Box<(dyn Future<Output = sc_service::Result<(), ()>> + Send + 'static)>>;

/// A task that can be stopped or awaited until it stops itself.
///
/// Pairs the future of a spawned task with the sending half of the oneshot channel used to
/// ask that task to exit.
pub struct Task {
    // Future that resolves when the spawned task finishes; awaited by `stop` and `stopped`.
    handle: Handle,
    // Sending half of the exit channel; `stop` sends `()` on it to request termination.
    exit: oneshot::Sender<()>,
    // Result of `handle`, stored by `stopped` once it resolves so that later calls return it
    // without awaiting `handle` again.
    cached_result: Option<Result<(), ()>>,
}

impl Task {
    /// Wrap an already spawned task: `handle` resolves when the task ends and `exit` is the
    /// sender used to ask it to terminate. No result is cached initially.
    pub fn new(handle: Handle, exit: oneshot::Sender<()>) -> Self {
        Self {
            handle,
            exit,
            cached_result: None,
        }
    }

    /// Request the task to exit and wait for it to finish.
    ///
    /// If a previous call to [`Task::stopped`] already recorded the task's result, that result
    /// is returned right away and no exit signal is sent. Otherwise the exit signal is sent
    /// (a failure to deliver it is only logged as a warning) and the handle is awaited.
    pub async fn stop(self) -> Result<(), ()> {
        let Task {
            handle,
            exit,
            cached_result,
        } = self;
        match cached_result {
            // The task has already finished; just report how it ended.
            Some(outcome) => outcome,
            None => {
                let delivery = exit.send(());
                if delivery.is_err() {
                    warn!(target: "phron-party", "Failed to send exit signal to authority");
                }
                handle.await
            }
        }
    }

    /// Wait until the task finishes on its own, without sending any exit signal.
    ///
    /// The first result obtained is cached, so this may be called repeatedly: once the task
    /// has finished, every later call returns the cached result instead of polling the
    /// handle again.
    pub async fn stopped(&mut self) -> Result<(), ()> {
        match self.cached_result {
            Some(outcome) => outcome,
            None => {
                // Poll through a pinned reborrow so the handle stays owned by `self`.
                let outcome = self.handle.as_mut().await;
                self.cached_result = Some(outcome);
                outcome
            }
        }
    }
}

/// Common args for tasks.
///
/// Consumed by `task`, which uses the fields as described below.
#[derive(Clone)]
pub struct TaskCommon {
    /// Handle on which `task` calls `spawn_essential` to start the task.
    pub spawn_handle: SpawnHandle,
    /// Identifier of the session the task belongs to; in this file it is only used in the
    /// debug log messages emitted by `task`.
    pub session_id: u32,
}

/// A unit of work that `task` can spawn as an essential task.
///
/// Implementors must be `Send + 'static` so they can be moved into the spawned future.
#[async_trait::async_trait]
pub trait Runnable: Send + 'static {
    /// Consume the runnable and run it to completion.
    ///
    /// `exit` is the receiving half of the oneshot channel created in `task`; `Task::stop`
    /// sends `()` on the matching sender. Implementations are presumably expected to return
    /// once that signal arrives — confirm against the implementors.
    async fn run(self, exit: oneshot::Receiver<()>);
}

/// Spawns `runnable` as an essential task for a single session and returns a [`Task`] that
/// can stop it or wait for it.
///
/// A fresh oneshot channel is created: the receiver is handed to `runnable.run`, the sender
/// is kept in the returned [`Task`]. `name` and the session id from `subtask_common` are only
/// used in the debug messages logged right before and right after the runnable executes.
pub fn task<R: Runnable>(subtask_common: TaskCommon, runnable: R, name: &'static str) -> Task {
    let (exit_sender, exit_receiver) = oneshot::channel();
    let session_id = subtask_common.session_id;

    // The future that is actually spawned: the runnable wrapped in start/stop log lines.
    let session_future = async move {
        debug!(target: "phron-party", "Running the {} task for {:?}", name, session_id);
        runnable.run(exit_receiver).await;
        debug!(target: "phron-party", "The {} task stopped for {:?}", name, session_id);
    };

    Task::new(
        subtask_common
            .spawn_handle
            .spawn_essential("phron/consensus_session_task", session_future),
        exit_sender,
    )
}