use futures::channel::oneshot;
use log::{debug, trace, warn};
use crate::{
party::{Handle, Task as PureTask},
NodeIndex,
};
pub struct Task {
task: PureTask,
node_id: NodeIndex,
}
impl Task {
pub fn new(handle: Handle, node_id: NodeIndex, exit: oneshot::Sender<()>) -> Self {
Task {
task: PureTask::new(handle, exit),
node_id,
}
}
pub async fn stop(self) -> Result<(), ()> {
self.task.stop().await
}
pub async fn stopped(&mut self) -> NodeIndex {
if self.task.stopped().await.is_err() {
debug!(target: "phron-party", "Authority task failed for {:?}", self.node_id);
}
self.node_id
}
}
pub struct Subtasks {
exit: oneshot::Receiver<()>,
member: PureTask,
aggregator: PureTask,
refresher: PureTask,
data_store: PureTask,
}
impl Subtasks {
pub fn new(
exit: oneshot::Receiver<()>,
member: PureTask,
aggregator: PureTask,
refresher: PureTask,
data_store: PureTask,
) -> Self {
Subtasks {
exit,
member,
aggregator,
refresher,
data_store,
}
}
async fn stop(self) -> Result<(), ()> {
debug!(target: "phron-party", "Started to stop all tasks");
let mut result = Ok(());
if self.member.stop().await.is_err() {
warn!(target: "phron-party", "Member stopped with en error");
result = Err(());
}
trace!(target: "phron-party", "Member stopped");
if self.aggregator.stop().await.is_err() {
warn!(target: "phron-party", "Aggregator stopped with en error");
result = Err(());
}
trace!(target: "phron-party", "Aggregator stopped");
if self.refresher.stop().await.is_err() {
warn!(target: "phron-party", "Refresher stopped with en error");
result = Err(());
}
trace!(target: "phron-party", "Refresher stopped");
if self.data_store.stop().await.is_err() {
warn!(target: "phron-party", "DataStore stopped with en error");
result = Err(());
}
trace!(target: "phron-party", "DataStore stopped");
result
}
pub async fn wait_completion(mut self) -> Result<(), ()> {
let result = tokio::select! {
_ = &mut self.exit => Ok(()),
res = self.member.stopped() => { debug!(target: "phron-party", "Member stopped early"); res },
res = self.aggregator.stopped() => { debug!(target: "phron-party", "Aggregator stopped early"); res },
res = self.refresher.stopped() => { debug!(target: "phron-party", "Refresher stopped early"); res },
res = self.data_store.stopped() => { debug!(target: "phron-party", "DataStore stopped early"); res },
};
let stop_result = self.stop().await;
debug!(target: "phron-party", "Stopped all processes");
result.and(stop_result)
}
}