use phronesis_runtime::{self};
use core_primitives::{Block, Balance, Nonce, AccountId};
pub use phronesis_runtime::RuntimeApi as PhronRuntimeApi;
use phron_finality::{
run_validator_node, PhronBlockImport, PhronConfig, BlockImporter, Justification, JustificationTranslator,
MillisecsPerBlock, Protocol, ProtocolNaming, RateLimiterConfig, RedirectingBlockImport, SessionPeriod,
SubstrateChainStatus, TimingBlockMetrics, TracingBlockImport, ValidatorAddressCache, SyncOracle,
};
use sc_client_api::BlockBackend;
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
pub use sc_executor::NativeElseWasmExecutor;
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
use std::{sync::{Arc, Mutex}, collections::BTreeMap};
use fc_rpc_core::types::{FeeHistoryCache, FeeHistoryCacheLimit};
use futures::channel::mpsc;
use sc_consensus::ImportQueue;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use core_primitives::PhronSessionApi;
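// Host functions exposed to the runtime executor; the benchmarking set is only
// included when the `runtime-benchmarks` feature is enabled.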
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = frame_benchmarking::benchmarking::HostFunctions;
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = ();
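/// Native executor dispatch for the Phronesis runtime.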
pub struct ExecutorDispatch;
impl sc_executor::NativeExecutionDispatch for ExecutorDispatch {
type ExtendHostFunctions = HostFunctions;
fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
phronesis_runtime::api::dispatch(method, data)
}
fn native_version() -> sc_executor::NativeVersion {
phronesis_runtime::native_version()
}
}
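/// The collection of runtime APIs the node expects the Phronesis runtime to provide.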
pub trait RuntimeApiCollection: PhronSessionApi<Block>
+ fp_rpc::ConvertTransactionRuntimeApi<Block>
+ fp_rpc::EthereumRuntimeRPCApi<Block>
+ pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi<Block, Balance>
+ sp_api::ApiExt<Block>
+ sp_api::Metadata<Block>
+ sp_block_builder::BlockBuilder<Block>
+ sp_offchain::OffchainWorkerApi<Block>
+ sp_session::SessionKeys<Block>
+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>
+ substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>
+ sp_consensus_aura::AuraApi<Block, <<sp_consensus_aura::sr25519::AuthorityId as sp_runtime::app_crypto::AppCrypto>::Pair as sp_core::Pair>::Public>
{}
impl<Api> RuntimeApiCollection for Api where
Api: PhronSessionApi<Block>
+ fp_rpc::ConvertTransactionRuntimeApi<Block>
+ fp_rpc::EthereumRuntimeRPCApi<Block>
+ pallet_transaction_payment_rpc_runtime_api::TransactionPaymentApi<Block, Balance>
+ sp_api::ApiExt<Block>
+ sp_api::Metadata<Block>
+ sp_block_builder::BlockBuilder<Block>
+ sp_offchain::OffchainWorkerApi<Block>
+ sp_session::SessionKeys<Block>
+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>
+ substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>
+ sp_consensus_aura::AuraApi<Block, <<sp_consensus_aura::sr25519::AuthorityId as sp_runtime::app_crypto::AppCrypto>::Pair as sp_core::Pair>::Public>
{}
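/// Full client type used by the node.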
pub type FullClient =
sc_service::TFullClient<Block, PhronRuntimeApi, NativeElseWasmExecutor<ExecutorDispatch>>;
type FullBackend = sc_service::TFullBackend<Block>;
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
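/// Assembles the partial node components: client, backend, transaction pool, the Aura
/// import queue wired through the phron block import, and the Frontier-related caches.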
#[allow(clippy::type_complexity)]
pub fn new_partial(
config: &Configuration,
eth_api_options: &crate::evm_tracing_types::EvmTracingConfig,
) -> Result<
sc_service::PartialComponents<
FullClient,
FullBackend,
FullSelectChain,
sc_consensus::DefaultImportQueue<Block>,
sc_transaction_pool::FullPool<Block, FullClient>,
(
fc_db::Backend<Block>,
Option<fc_rpc_core::types::FilterPool>,
Option<Telemetry>,
mpsc::UnboundedSender<Justification>,
mpsc::UnboundedReceiver<Justification>,
(FeeHistoryCache, FeeHistoryCacheLimit),
),
>,
sc_service::Error,
>
{
let telemetry = config
.telemetry_endpoints
.clone()
.filter(|x| !x.is_empty())
.map(|endpoints| -> Result<_, sc_telemetry::Error> {
let worker = TelemetryWorker::new(16)?;
let telemetry = worker.handle().new_telemetry(endpoints);
Ok((worker, telemetry))
})
.transpose()?;
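// Executor that runs the native runtime when possible and falls back to Wasm.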
let executor = sc_service::new_native_or_wasm_executor(config);
let (client, backend, keystore_container, task_manager) =
sc_service::new_full_parts::<Block, PhronRuntimeApi, _>(
config,
telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
executor,
)?;
let client = Arc::new(client);
let telemetry = telemetry.map(|(worker, telemetry)| {
task_manager.spawn_handle().spawn("telemetry", None, worker.run());
telemetry
});
let select_chain = sc_consensus::LongestChain::new(backend.clone());
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_essential_handle(),
client.clone(),
);
// Cache used to answer Ethereum fee-history queries; keep data for up to 2048 recent blocks.
let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
let fee_history_cache_limit: FeeHistoryCacheLimit = 2048;
let fee_history = (fee_history_cache, fee_history_cache_limit);
let filter_pool = Some(Arc::new(Mutex::new(BTreeMap::new())));
let frontier_backend = crate::evm_tracing_types::frontier_backend(config, client.clone(), eth_api_options.clone())?;
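// Justifications produced by block import (or submitted via RPC) are forwarded over
// this channel to the phron-finality component.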
let (justification_tx, justification_rx) = mpsc::unbounded();
let metrics = TimingBlockMetrics::new(config.prometheus_registry()).unwrap_or_else(|e| {
log::warn!("Failed to create metrics for block import: {}", e);
TimingBlockMetrics::noop()
});
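// Block import pipeline: the client import is wrapped in a timing layer and then in
// the phron block import, which forwards justifications to the finality component.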
let tracing_block_import = TracingBlockImport::new(client.clone(), metrics);
let justification_translator = JustificationTranslator::new(
SubstrateChainStatus::new(backend.clone())
.map_err(|e| ServiceError::Other(format!("Failed to create chain status: {}", e)))?,
);
let phron_block_import = PhronBlockImport::new(
tracing_block_import,
justification_tx.clone(),
justification_translator,
);
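// Standard Aura import queue built on top of the phron block import.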
let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
let import_queue =
sc_consensus_aura::import_queue::<AuraPair, _, _, _, _, _>(ImportQueueParams {
block_import: phron_block_import.clone(),
justification_import: Some(Box::new(phron_block_import)),
client: client.clone(),
create_inherent_data_providers: move |_, ()| async move {
let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
*timestamp,
slot_duration,
);
Ok((slot, timestamp))
},
spawner: &task_manager.spawn_essential_handle(),
registry: config.prometheus_registry(),
check_for_equivocation: Default::default(),
telemetry: telemetry.as_ref().map(|x| x.handle()),
compatibility_mode: Default::default(),
})?;
Ok(sc_service::PartialComponents {
client,
backend,
task_manager,
import_queue,
keystore_container,
select_chain,
transaction_pool,
other: (frontier_backend, filter_pool, telemetry, justification_tx, justification_rx, fee_history),
})
}
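/// Builds and starts a full validator node: Aura block authorship, the phron-finality
/// component, networking and the Ethereum-compatible RPC layer.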
pub fn new_full(
mut config: Configuration,
eth_api_options: &crate::evm_tracing_types::EvmTracingConfig,
phron_cli: crate::phron_cli::PhronCli,
hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, ServiceError>
{
let sc_service::PartialComponents {
client,
backend,
mut task_manager,
import_queue,
keystore_container,
select_chain,
transaction_pool,
other: (frontier_backend,
filter_pool,
mut telemetry,
justification_tx,
justification_rx,
fee_history),
} = new_partial(&config, eth_api_options)?;
if let Some(hwbench) = hwbench {
sc_sysinfo::print_hwbench(&hwbench);
if let Some(ref mut telemetry) = telemetry {
let telemetry_handle = telemetry.handle();
task_manager.spawn_handle().spawn(
"telemetry_hwbench",
None,
sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
);
}
}
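// Blocks authored locally by Aura are handed to the phron-finality component over `block_rx`.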
let (block_import, block_rx) = RedirectingBlockImport::new(client.clone());
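// Read the session period and block time from the runtime at the finalized head.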
let finalized = client.info().finalized_hash;
let session_period = SessionPeriod(client.runtime_api().session_period(finalized).expect("the runtime exposes the session period"));
let millisecs_per_block = MillisecsPerBlock(client.runtime_api().millisecs_per_block(finalized).expect("the runtime exposes the block time"));
let collect_extra_debugging_data = !phron_cli.no_collection_of_extra_debugging_data();
let chain_status = SubstrateChainStatus::new(backend.clone())
.map_err(|e| ServiceError::Other(format!("Failed to create chain status: {}", e)))?;
let import_queue_handle = BlockImporter::new(import_queue.service());
let prometheus_registry = config.prometheus_registry().cloned();
let force_authoring = config.force_authoring;
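// Use Ethereum-style subscription ids for the RPC server.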
config.rpc_id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));
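// Set up networking, the RPC layer and the shared service tasks.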
let (
_rpc_handlers,
network,
sync_service,
protocol_naming,
network_starter,
validator_address_cache,
sync_oracle
) = setup(
config,
backend,
chain_status.clone(),
&keystore_container,
import_queue,
transaction_pool.clone(),
&mut task_manager,
client.clone(),
&mut telemetry,
justification_tx,
collect_extra_debugging_data,
frontier_backend,
fee_history,
filter_pool,
)?;
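// Block authorship: Aura, proposing on top of the redirecting block import.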
let backoff_authoring_blocks: Option<()> = None;
let proposer_factory = sc_basic_authorship::ProposerFactory::new(
task_manager.spawn_handle(),
client.clone(),
transaction_pool,
prometheus_registry.as_ref(),
telemetry.as_ref().map(|x| x.handle()),
);
let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _, _>(
StartAuraParams {
slot_duration,
client: client.clone(),
select_chain: select_chain.clone(),
block_import,
proposer_factory,
create_inherent_data_providers: move |_, ()| async move {
let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
let slot =
sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
*timestamp,
slot_duration,
);
Ok((slot, timestamp))
},
force_authoring,
backoff_authoring_blocks,
keystore: keystore_container.keystore(),
sync_oracle: sync_oracle.clone(),
justification_sync_link: (),
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
max_block_proposal_slot_portion: None,
telemetry: telemetry.as_ref().map(|x| x.handle()),
compatibility_mode: Default::default(),
},
)?;
task_manager
.spawn_essential_handle()
.spawn_blocking("aura", None, aura);
if phron_cli.external_addresses().is_empty() {
panic!("Cannot run a validator node without external addresses, stopping!");
}
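// Per-connection bit-rate limit for the phron-finality network, taken from the CLI.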
let rate_limiter_config = RateLimiterConfig {
phron_bit_rate_per_connection: phron_cli
.phron_bit_rate_per_connection()
.try_into()
.unwrap_or(usize::MAX),
};
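// Assemble the configuration for the phron-finality component.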
let phron_config = PhronConfig {
network,
sync_network: sync_service,
client,
chain_status,
import_queue_handle,
select_chain,
spawn_handle: task_manager.spawn_handle().into(),
keystore: keystore_container.keystore(),
justification_rx,
block_rx,
metrics: TimingBlockMetrics::noop(),
registry: None,
session_period,
millisecs_per_block,
unit_creation_delay: phron_cli.unit_creation_delay(),
backup_saving_path: None,
external_addresses: phron_cli.external_addresses(),
validator_port: phron_cli.validator_port(),
protocol_naming,
rate_limiter_config,
sync_oracle,
validator_address_cache,
};
task_manager.spawn_essential_handle().spawn_blocking(
"phron-finality",
None,
run_validator_node(phron_config),
);
network_starter.start_network();
Ok(task_manager)
}
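/// Sets up networking, the phron-finality notification protocols, the Ethereum-compatible
/// RPC extensions and the common service tasks, returning the handles needed to start
/// consensus and finality.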
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
fn setup(
config: Configuration,
backend: Arc<FullBackend>,
chain_status: SubstrateChainStatus,
keystore_container: &sc_service::KeystoreContainer,
import_queue: sc_consensus::DefaultImportQueue<Block>,
transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient>>,
task_manager: &mut TaskManager,
client: Arc<FullClient>,
telemetry: &mut Option<Telemetry>,
import_justification_tx: mpsc::UnboundedSender<Justification>,
collect_extra_debugging_data: bool,
frontier_backend: fc_db::Backend<Block>,
fee_history: (FeeHistoryCache, FeeHistoryCacheLimit),
filter_pool: Option<fc_rpc_core::types::FilterPool>,
) -> Result<
(
sc_service::RpcHandlers,
Arc<sc_network::NetworkService<Block, core_primitives::BlockHash>>,
Arc<sc_network_sync::SyncingService<Block>>,
ProtocolNaming,
sc_service::NetworkStarter,
Option<ValidatorAddressCache>,
SyncOracle
),
ServiceError,
>
{
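// Protocol names are derived from the genesis hash and the optional fork id.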
let genesis_hash = client
.block_hash(0)
.ok()
.flatten()
.expect("the genesis block hash is available");
let chain_prefix = match config.chain_spec.fork_id() {
Some(fork_id) => format!("/{genesis_hash}/{fork_id}"),
None => format!("/{genesis_hash}"),
};
let protocol_naming = ProtocolNaming::new(chain_prefix);
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
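// Register the notification protocols used by phron-finality: validator authentication
// and block sync.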
net_config.add_notification_protocol(phron_finality::peers_set_config(
protocol_naming.clone(),
Protocol::Authentication,
));
net_config.add_notification_protocol(phron_finality::peers_set_config(
protocol_naming.clone(),
Protocol::BlockSync,
));
let sync_oracle = SyncOracle::new();
let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
net_config,
client: client.clone(),
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_params: None,
})?;
let validator_address_cache = match collect_extra_debugging_data {
true => Some(ValidatorAddressCache::new()),
false => None,
};
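// Notification sinks used by the Ethereum pub-sub RPC to learn about new blocks.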
let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
fc_mapping_sync::EthereumBlockNotification<Block>,
> = Default::default();
let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
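// Build the RPC extensions: the standard Substrate RPCs plus the Ethereum-compatible API.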
let rpc_extensions_builder = {
let client = client.clone();
let pool = transaction_pool.clone();
let network = network.clone();
let is_authority = config.role.is_authority();
let frontier_backend = frontier_backend;
let overrides = crate::rpc::overrides_handle(client.clone());
let prometheus_registry = config.prometheus_registry().cloned();
let (fee_history_cache, fee_history_cache_limit) = fee_history;
let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
task_manager.spawn_handle(),
overrides.clone(),
50,
50,
prometheus_registry.clone(),
));
let sync_service = sync_service.clone();
let sync_oracle = sync_oracle.clone();
let validator_address_cache = validator_address_cache.clone();
let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
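// Inherent data providers for `pending` Ethereum calls: simulate the next block by
// advancing the timestamp by one slot duration.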
let pending_create_inherent_data_providers = move |_, ()| async move {
let current = sp_timestamp::InherentDataProvider::from_system_time();
let next_slot = current.timestamp().as_millis() + slot_duration.as_millis();
let timestamp = sp_timestamp::InherentDataProvider::new(next_slot.into());
let slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
*timestamp,
slot_duration,
);
let dynamic_fee = fp_dynamic_fee::InherentDataProvider(sp_core::U256::from(1));
Ok((slot, timestamp, dynamic_fee))
};
let pubsub_notification_sinks = pubsub_notification_sinks;
let filter_pool = filter_pool;
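// Spawn the background tasks that back the Ethereum RPC and EVM tracing layer.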
let evm_spawn_params = crate::rpc::tracing::SpawnTaskParams {
task_manager,
client: client.clone(),
substrate_backend: backend.clone(),
frontier_backend: frontier_backend.clone(),
filter_pool: filter_pool.clone(),
overrides: overrides.clone(),
fee_history_cache: fee_history_cache.clone(),
fee_history_limit: fee_history_cache_limit,
sync: sync_service.clone(),
pubsub_notification_sinks: pubsub_notification_sinks.clone(),
prometheus_registry,
};
crate::rpc::tracing::spawn_evm_tracing(
evm_spawn_params,
);
Box::new(move |deny_unsafe, subscription_task_executor| {
let deps =
crate::rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
graph: pool.pool().clone(),
deny_unsafe,
is_authority,
network: network.clone(),
sync: sync_service.clone(),
frontier_backend: match frontier_backend.clone() {
fc_db::Backend::KeyValue(db) => Arc::new(db),
fc_db::Backend::Sql(db) => Arc::new(db),
},
filter_pool: filter_pool.clone(),
overrides: overrides.clone(),
fee_history_cache: fee_history_cache.clone(),
block_data_cache: block_data_cache.clone(),
fee_history_cache_limit,
forced_parent_hashes: None,
pending_create_inherent_data_providers,
import_justification_tx: import_justification_tx.clone(),
justification_translator: JustificationTranslator::new(chain_status.clone()),
sync_oracle: sync_oracle.clone(),
validator_address_cache: validator_address_cache.clone(),
};
crate::rpc::create_full::<_, _, _, _, _, _, crate::rpc::DefaultEthConfig<_, _>>(
deps,
pubsub_notification_sinks.clone(),
subscription_task_executor,
).map_err(Into::into)
})
};
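// Spawn the RPC servers and the remaining common service tasks.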
let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network: network.clone(),
client,
keystore: keystore_container.keystore(),
task_manager,
transaction_pool,
rpc_builder: rpc_extensions_builder,
backend,
system_rpc_tx,
tx_handler_controller,
sync_service: sync_service.clone(),
config,
telemetry: telemetry.as_mut(),
})?;
Ok((
rpc_handlers,
network,
sync_service,
protocol_naming,
network_starter,
validator_address_cache,
sync_oracle
))
}