Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 25 additions & 46 deletions codechain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::io;
use std::net::SocketAddr;

use crate::config::Config;
use crate::rpc_apis;
use crpc::{
jsonrpc_core, start_http, start_ipc, start_ws, HttpServer, IpcServer, MetaIoHandler, Middleware, WsError, WsServer,
Expand All @@ -33,38 +33,27 @@ pub struct RpcHttpConfig {
}

pub fn rpc_http_start(
cfg: RpcHttpConfig,
enable_devel_api: bool,
deps: &rpc_apis::ApiDependencies,
server: MetaIoHandler<(), impl Middleware<()>>,
config: RpcHttpConfig,
) -> Result<HttpServer, String> {
let url = format!("{}:{}", cfg.interface, cfg.port);
let url = format!("{}:{}", config.interface, config.port);
let addr = url.parse().map_err(|_| format!("Invalid JSONRPC listen host/port given: {}", url))?;
let server = setup_http_rpc_server(&addr, cfg.cors.clone(), cfg.hosts.clone(), enable_devel_api, deps)?;
cinfo!(RPC, "RPC Listening on {}", url);
if let Some(hosts) = cfg.hosts {
cinfo!(RPC, "Allowed hosts are {:?}", hosts);
}
if let Some(cors) = cfg.cors {
cinfo!(RPC, "CORS domains are {:?}", cors);
}
Ok(server)
}

fn setup_http_rpc_server(
url: &SocketAddr,
cors_domains: Option<Vec<String>>,
allowed_hosts: Option<Vec<String>>,
enable_devel_api: bool,
deps: &rpc_apis::ApiDependencies,
) -> Result<HttpServer, String> {
let server = setup_rpc_server(enable_devel_api, deps);
let start_result = start_http(url, cors_domains, allowed_hosts, server);
let start_result = start_http(&addr, config.cors.clone(), config.hosts.clone(), server);
match start_result {
Err(ref err) if err.kind() == io::ErrorKind::AddrInUse => {
Err(format!("RPC address {} is already in use, make sure that another instance of a CodeChain node is not running or change the address using the --jsonrpc-port option.", url))
},
Err(e) => Err(format!("RPC error: {:?}", e)),
Ok(server) => Ok(server),
Ok(server) => {
cinfo!(RPC, "RPC Listening on {}", url);
if let Some(hosts) = config.hosts {
cinfo!(RPC, "Allowed hosts are {:?}", hosts);
}
if let Some(cors) = config.cors {
cinfo!(RPC, "CORS domains are {:?}", cors);
}
Ok(server)
},
}
}

Expand All @@ -74,19 +63,17 @@ pub struct RpcIpcConfig {
}

pub fn rpc_ipc_start(
cfg: &RpcIpcConfig,
enable_devel_api: bool,
deps: &rpc_apis::ApiDependencies,
server: MetaIoHandler<(), impl Middleware<()>>,
config: RpcIpcConfig,
) -> Result<IpcServer, String> {
let server = setup_rpc_server(enable_devel_api, deps);
let start_result = start_ipc(&cfg.socket_addr, server);
let start_result = start_ipc(&config.socket_addr, server);
match start_result {
Err(ref err) if err.kind() == io::ErrorKind::AddrInUse => {
Err(format!("IPC address {} is already in use, make sure that another instance of a Codechain node is not running or change the address using the --ipc-path options.", cfg.socket_addr))
Err(format!("IPC address {} is already in use, make sure that another instance of a Codechain node is not running or change the address using the --ipc-path options.", config.socket_addr))
},
Err(e) => Err(format!("IPC error: {:?}", e)),
Ok(server) => {
cinfo!(RPC, "IPC Listening on {}", cfg.socket_addr);
cinfo!(RPC, "IPC Listening on {}", config.socket_addr);
Ok(server)
},
}
Expand All @@ -99,15 +86,10 @@ pub struct RpcWsConfig {
pub max_connections: usize,
}

pub fn rpc_ws_start(
cfg: &RpcWsConfig,
enable_devel_api: bool,
deps: &rpc_apis::ApiDependencies,
) -> Result<WsServer, String> {
let server = setup_rpc_server(enable_devel_api, deps);
let url = format!("{}:{}", cfg.interface, cfg.port);
pub fn rpc_ws_start(server: MetaIoHandler<(), impl Middleware<()>>, config: RpcWsConfig) -> Result<WsServer, String> {
let url = format!("{}:{}", config.interface, config.port);
let addr = url.parse().map_err(|_| format!("Invalid WebSockets listen host/port given: {}", url))?;
let start_result = start_ws(&addr, server, cfg.max_connections);
let start_result = start_ws(&addr, server, config.max_connections);
match start_result {
Err(WsError::Io(ref err)) if err.kind() == io::ErrorKind::AddrInUse => {
Err(format!("WebSockets address {} is already in use, make sure that another instance of a Codechain node is not running or change the address using the --ws-port options.", addr))
Expand All @@ -120,12 +102,9 @@ pub fn rpc_ws_start(
}
}

fn setup_rpc_server(
enable_devel_api: bool,
deps: &rpc_apis::ApiDependencies,
) -> MetaIoHandler<(), impl Middleware<()>> {
pub fn setup_rpc_server(config: &Config, deps: &rpc_apis::ApiDependencies) -> MetaIoHandler<(), impl Middleware<()>> {
let mut handler = MetaIoHandler::with_middleware(LogMiddleware::new());
deps.extend_api(enable_devel_api, &mut handler);
deps.extend_api(config, &mut handler);
rpc_apis::setup_rpc(handler)
}

Expand Down
6 changes: 4 additions & 2 deletions codechain/rpc_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use cnetwork::{EventSender, NetworkControl};
use crpc::{MetaIoHandler, Middleware, Params, Value};
use csync::BlockSyncEvent;

use crate::config::Config;

pub struct ApiDependencies {
pub client: Arc<Client>,
pub miner: Arc<Miner>,
Expand All @@ -31,11 +33,11 @@ pub struct ApiDependencies {
}

impl ApiDependencies {
pub fn extend_api(&self, enable_devel_api: bool, handler: &mut MetaIoHandler<(), impl Middleware<()>>) {
pub fn extend_api(&self, config: &Config, handler: &mut MetaIoHandler<(), impl Middleware<()>>) {
use crpc::v1::*;
handler.extend_with(ChainClient::new(Arc::clone(&self.client)).to_delegate());
handler.extend_with(MempoolClient::new(Arc::clone(&self.client)).to_delegate());
if enable_devel_api {
if config.rpc.enable_devel_api {
handler.extend_with(
DevelClient::new(Arc::clone(&self.client), Arc::clone(&self.miner), self.block_sync.clone())
.to_delegate(),
Expand Down
64 changes: 36 additions & 28 deletions codechain/run_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::config::{self, load_config};
use crate::constants::{DEFAULT_DB_PATH, DEFAULT_KEYS_PATH};
use crate::dummy_network_service::DummyNetworkService;
use crate::json::PasswordFile;
use crate::rpc::{rpc_http_start, rpc_ipc_start, rpc_ws_start};
use crate::rpc::{rpc_http_start, rpc_ipc_start, rpc_ws_start, setup_rpc_server};
use crate::rpc_apis::ApiDependencies;

fn network_start(
Expand Down Expand Up @@ -316,36 +316,43 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
}
};

let rpc_apis_deps = ApiDependencies {
client: client.client(),
miner: Arc::clone(&miner),
network_control: Arc::clone(&network_service),
account_provider: ap,
block_sync: maybe_sync_sender,
};
let (rpc_server, ipc_server, ws_server) = {
let rpc_apis_deps = ApiDependencies {
client: client.client(),
miner: Arc::clone(&miner),
network_control: Arc::clone(&network_service),
account_provider: ap,
block_sync: maybe_sync_sender,
};

let rpc_server = {
if !config.rpc.disable.unwrap() {
let server = setup_rpc_server(&config, &rpc_apis_deps);
Some(rpc_http_start(server, config.rpc_http_config())?)
} else {
None
}
};

let rpc_server = {
if !config.rpc.disable.unwrap() {
Some(rpc_http_start(config.rpc_http_config(), config.rpc.enable_devel_api, &rpc_apis_deps)?)
} else {
None
}
};
let ipc_server = {
if !config.ipc.disable.unwrap() {
let server = setup_rpc_server(&config, &rpc_apis_deps);
Some(rpc_ipc_start(server, config.rpc_ipc_config())?)
} else {
None
}
};

let ipc_server = {
if !config.ipc.disable.unwrap() {
Some(rpc_ipc_start(&config.rpc_ipc_config(), config.rpc.enable_devel_api, &rpc_apis_deps)?)
} else {
None
}
};
let ws_server = {
if !config.ws.disable.unwrap() {
let server = setup_rpc_server(&config, &rpc_apis_deps);
Some(rpc_ws_start(server, config.rpc_ws_config())?)
} else {
None
}
};

let ws_server = {
if !config.ws.disable.unwrap() {
Some(rpc_ws_start(&config.rpc_ws_config(), config.rpc.enable_devel_api, &rpc_apis_deps)?)
} else {
None
}
(rpc_server, ipc_server, ws_server)
};

if (!config.stratum.disable.unwrap()) && (miner.engine_type() == EngineType::PoW) {
Expand All @@ -367,6 +374,7 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {

// drop the scheme to free up genesis state.
drop(scheme);
client.client().engine().complete_register();

cinfo!(TEST_SCRIPT, "Initialization complete");

Expand Down
5 changes: 5 additions & 0 deletions core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ impl<'x> OpenBlock<'x> {
self.block.header.set_seal(seal);
Ok(())
}

pub fn inner_mut(&mut self) -> &mut ExecutedBlock {
&mut self.block
}
}

/// Just like `OpenBlock`, except that we've applied `Engine::on_close_block`, finished up the non-seal header fields.
Expand Down Expand Up @@ -492,6 +496,7 @@ pub fn enact<C: ChainTimeInfo + EngineInfo + FindActionHandler + TermInfo>(
let mut b = OpenBlock::try_new(engine, db, parent, Address::default(), vec![])?;

b.populate_from(header);
engine.on_open_block(b.inner_mut())?;
b.push_transactions(transactions, client, parent.number(), parent.timestamp())?;

let term_common_params = client.term_common_params(BlockId::Hash(*header.parent_hash()));
Expand Down
7 changes: 7 additions & 0 deletions core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ pub trait ConsensusEngine: Sync + Send {
/// Stops any services that the may hold the Engine and makes it safe to drop.
fn stop(&self) {}

/// Block transformation functions, before the transactions.
fn on_open_block(&self, _block: &mut ExecutedBlock) -> Result<(), Error> {
Ok(())
}

/// Block transformation functions, after the transactions.
fn on_close_block(
&self,
Expand Down Expand Up @@ -261,6 +266,8 @@ pub trait ConsensusEngine: Sync + Send {

fn register_chain_notify(&self, _: &Client) {}

fn complete_register(&self) {}

fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash {
header.hash()
}
Expand Down
19 changes: 15 additions & 4 deletions core/src/consensus/tendermint/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ impl ConsensusEngine for Tendermint {

fn stop(&self) {}

/// Block transformation functions, before the transactions.
fn on_open_block(&self, block: &mut ExecutedBlock) -> Result<(), Error> {
let metadata = block.state().metadata()?.expect("Metadata must exist");
if block.header().number() == metadata.last_term_finished_block_num() + 1 {
// FIXME: on_term_open
}
Ok(())
}

fn on_close_block(
&self,
block: &mut ExecutedBlock,
Expand Down Expand Up @@ -269,10 +278,6 @@ impl ConsensusEngine for Tendermint {
let extension = service.register_extension(move |api| TendermintExtension::new(inner, timeouts, api));
let client = Arc::downgrade(&self.client().unwrap());
self.extension_initializer.send((extension, client)).unwrap();

let (result, receiver) = crossbeam::bounded(1);
self.inner.send(worker::Event::Restore(result)).unwrap();
receiver.recv().unwrap();
}

fn register_time_gap_config_to_worker(&self, time_gap_params: TimeGapParams) {
Expand All @@ -291,6 +296,12 @@ impl ConsensusEngine for Tendermint {
client.add_notify(Arc::downgrade(&self.chain_notify) as Weak<dyn ChainNotify>);
}

fn complete_register(&self) {
let (result, receiver) = crossbeam::bounded(1);
self.inner.send(worker::Event::Restore(result)).unwrap();
receiver.recv().unwrap();
}

fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash {
header.parent_hash()
}
Expand Down
1 change: 1 addition & 0 deletions core/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ impl Miner {
return Ok(None)
}
}
self.engine.on_open_block(open_block.inner_mut())?;

let mut invalid_transactions = Vec::new();

Expand Down
Loading