1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use futures::channel::oneshot;
use log::{debug, trace, warn};

use crate::{
    party::{Handle, Task as PureTask},
    NodeIndex,
};

/// A wrapper for running the authority task within a specific session.
pub struct Task {
    task: PureTask,
    node_id: NodeIndex,
}

impl Task {
    /// Create a new authority task. The handle should be the handle to the actual task.
    pub fn new(handle: Handle, node_id: NodeIndex, exit: oneshot::Sender<()>) -> Self {
        Task {
            task: PureTask::new(handle, exit),
            node_id,
        }
    }

    /// Stop the authority task and wait for it to finish.
    pub async fn stop(self) -> Result<(), ()> {
        self.task.stop().await
    }

    /// If the authority task stops for any reason, this returns the associated NodeIndex, which
    /// can be used to restart the task.
    pub async fn stopped(&mut self) -> NodeIndex {
        if self.task.stopped().await.is_err() {
            debug!(target: "phron-party", "Authority task failed for {:?}", self.node_id);
        }
        self.node_id
    }
}

/// All the subtasks required to participate in a session as an authority.
pub struct Subtasks {
    exit: oneshot::Receiver<()>,
    member: PureTask,
    aggregator: PureTask,
    refresher: PureTask,
    data_store: PureTask,
}

impl Subtasks {
    /// Create the subtask collection by passing in all the tasks.
    pub fn new(
        exit: oneshot::Receiver<()>,
        member: PureTask,
        aggregator: PureTask,
        refresher: PureTask,
        data_store: PureTask,
    ) -> Self {
        Subtasks {
            exit,
            member,
            aggregator,
            refresher,
            data_store,
        }
    }

    async fn stop(self) -> Result<(), ()> {
        // both member and aggregator are implicitly using forwarder,
        // so we should force them to exit first to avoid any panics, i.e. `send on closed channel`
        debug!(target: "phron-party", "Started to stop all tasks");
        let mut result = Ok(());
        if self.member.stop().await.is_err() {
            warn!(target: "phron-party", "Member stopped with en error");
            result = Err(());
        }
        trace!(target: "phron-party", "Member stopped");
        if self.aggregator.stop().await.is_err() {
            warn!(target: "phron-party", "Aggregator stopped with en error");
            result = Err(());
        }
        trace!(target: "phron-party", "Aggregator stopped");
        if self.refresher.stop().await.is_err() {
            warn!(target: "phron-party", "Refresher stopped with en error");
            result = Err(());
        }
        trace!(target: "phron-party", "Refresher stopped");
        if self.data_store.stop().await.is_err() {
            warn!(target: "phron-party", "DataStore stopped with en error");
            result = Err(());
        }
        trace!(target: "phron-party", "DataStore stopped");
        result
    }

    /// Blocks until the task is done and returns true if it quit unexpectedly.
    pub async fn wait_completion(mut self) -> Result<(), ()> {
        let result = tokio::select! {
            _ = &mut self.exit => Ok(()),
            res = self.member.stopped() => { debug!(target: "phron-party", "Member stopped early"); res },
            res = self.aggregator.stopped() => { debug!(target: "phron-party", "Aggregator stopped early"); res },
            res = self.refresher.stopped() => { debug!(target: "phron-party", "Refresher stopped early"); res },
            res = self.data_store.stopped() => { debug!(target: "phron-party", "DataStore stopped early"); res },
        };
        let stop_result = self.stop().await;
        debug!(target: "phron-party", "Stopped all processes");
        result.and(stop_result)
    }
}