use std::{boxed::Box, pin::Pin};
use futures::channel::oneshot;
use log::{debug, warn};
use network_clique::SpawnHandleT;
use crate::{Future, SpawnHandle};
pub type Handle = Pin<Box<(dyn Future<Output = sc_service::Result<(), ()>> + Send + 'static)>>;
pub struct Task {
handle: Handle,
exit: oneshot::Sender<()>,
cached_result: Option<Result<(), ()>>,
}
impl Task {
pub fn new(handle: Handle, exit: oneshot::Sender<()>) -> Self {
Task {
handle,
exit,
cached_result: None,
}
}
pub async fn stop(self) -> Result<(), ()> {
if let Some(result) = self.cached_result {
return result;
}
if self.exit.send(()).is_err() {
warn!(target: "phron-party", "Failed to send exit signal to authority");
}
self.handle.await
}
pub async fn stopped(&mut self) -> Result<(), ()> {
if let Some(result) = self.cached_result {
return result;
}
let result = (&mut self.handle).await;
self.cached_result = Some(result);
result
}
}
#[derive(Clone)]
pub struct TaskCommon {
pub spawn_handle: SpawnHandle,
pub session_id: u32,
}
#[async_trait::async_trait]
pub trait Runnable: Send + 'static {
async fn run(self, exit: oneshot::Receiver<()>);
}
pub fn task<R: Runnable>(subtask_common: TaskCommon, runnable: R, name: &'static str) -> Task {
let TaskCommon {
spawn_handle,
session_id,
} = subtask_common;
let (stop, exit) = oneshot::channel();
let task = {
async move {
debug!(target: "phron-party", "Running the {} task for {:?}", name, session_id);
runnable.run(exit).await;
debug!(target: "phron-party", "The {} task stopped for {:?}", name, session_id);
}
};
let handle = spawn_handle.spawn_essential("phron/consensus_session_task", task);
Task::new(handle, stop)
}