Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions quarkchain/cluster/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
GetWorkResponse,
SubmitWorkRequest,
SubmitWorkResponse,
AddMinorBlockHeaderListResponse,
)
from quarkchain.cluster.rpc import (
ConnectToSlavesRequest,
Expand Down Expand Up @@ -276,13 +277,14 @@ async def __sync_minor_blocks(self, minor_block_header_list):
raise RuntimeError("Unable to download minor blocks from root block")
if result.shard_stats:
self.master_server.update_shard_stats(result.shard_stats)
for k, v in result.block_coinbase_map.items():
self.root_state.add_validated_minor_block_hash(k, v.balance_map)

for m_header in minor_block_header_list:
if not self.root_state.db.contain_minor_block_by_hash(m_header.get_hash()):
raise RuntimeError(
"minor block is still unavailable in master after root block sync"
"minor block {} from {} is still unavailable in master after root block sync".format(
m_header.get_hash().hex(),
m_header.branch.to_str(),
)
)


Expand Down Expand Up @@ -590,12 +592,27 @@ async def handle_add_minor_block_header_request(self, req):
artificial_tx_config=self.master_server.get_artificial_tx_config(),
)

async def handle_add_minor_block_header_list_request(self, req):
check(len(req.minor_block_header_list) == len(req.coinbase_amount_map_list))
for minor_block_header, coinbase_amount_map in zip(req.minor_block_header_list, req.coinbase_amount_map_list):
self.master_server.root_state.add_validated_minor_block_hash(
minor_block_header.get_hash(), coinbase_amount_map.balance_map
)
Logger.info("adding {} mblock to db".format(minor_block_header.get_hash().hex()))
return AddMinorBlockHeaderListResponse(
error_code=0,
)


OP_RPC_MAP = {
ClusterOp.ADD_MINOR_BLOCK_HEADER_REQUEST: (
ClusterOp.ADD_MINOR_BLOCK_HEADER_RESPONSE,
SlaveConnection.handle_add_minor_block_header_request,
)
),
ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_REQUEST: (
ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_RESPONSE,
SlaveConnection.handle_add_minor_block_header_list_request,
),
}


Expand Down
30 changes: 30 additions & 0 deletions quarkchain/cluster/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,32 @@ def __init__(self, error_code, artificial_tx_config):
self.artificial_tx_config = artificial_tx_config


class AddMinorBlockHeaderListRequest(Serializable):
""" Notify master about a list of successfully added minor block.
Mostly used for minor block sync triggered by root block sync
"""
FIELDS = [
("minor_block_header_list", PrependedSizeListSerializer(4, MinorBlockHeader)),
("coinbase_amount_map_list", PrependedSizeListSerializer(4, TokenBalanceMap)),
]

def __init__(
self,
minor_block_header_list,
coinbase_amount_map_list
):
self.minor_block_header_list = minor_block_header_list
self.coinbase_amount_map_list = coinbase_amount_map_list


class AddMinorBlockHeaderListResponse(Serializable):
FIELDS = [
("error_code", uint32),
]

def __init__(self, error_code):
self.error_code = error_code

# slave -> slave


Expand Down Expand Up @@ -878,6 +904,8 @@ class ClusterOp:
GET_WORK_RESPONSE = 56 + CLUSTER_OP_BASE
SUBMIT_WORK_REQUEST = 57 + CLUSTER_OP_BASE
SUBMIT_WORK_RESPONSE = 58 + CLUSTER_OP_BASE
ADD_MINOR_BLOCK_HEADER_LIST_REQUEST = 59 + CLUSTER_OP_BASE
ADD_MINOR_BLOCK_HEADER_LIST_RESPONSE = 60 + CLUSTER_OP_BASE


CLUSTER_OP_SERIALIZER_MAP = {
Expand Down Expand Up @@ -938,4 +966,6 @@ class ClusterOp:
ClusterOp.GET_WORK_RESPONSE: GetWorkResponse,
ClusterOp.SUBMIT_WORK_REQUEST: SubmitWorkRequest,
ClusterOp.SUBMIT_WORK_RESPONSE: SubmitWorkResponse,
ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_REQUEST: AddMinorBlockHeaderListRequest,
ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_RESPONSE: AddMinorBlockHeaderListResponse,
}
129 changes: 95 additions & 34 deletions quarkchain/cluster/shard.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
from quarkchain.p2p.utils import RESERVED_CLUSTER_PEER_ID


class BlockCommitStatus:
UNCOMMITTED = 0 # The other slaves and the master may not have the block info
COMMITING = 1 # The block info is propagating to other slaves and the master
COMMITTED = 2 # The other slaves and the master have received the block info


class PeerShardConnection(VirtualConnection):
""" A virtual connection between local shard and remote shard
"""
Expand Down Expand Up @@ -621,43 +627,62 @@ async def handle_new_block(self, block):
self.broadcast_new_block(block)
await self.add_block(block)

def __get_block_commit_status_by_hash(self, block_hash):
# If the block is committed, it means
# - All neighor shards/slaves receives x-shard tx list
# - The block header is sent to master
# then return immediately
if self.state.is_committed_by_hash(block_hash):
return BlockCommitStatus.COMMITTED, None

# Check if the block is being propagating to other slaves and the master
# Let's make sure all the shards and master got it before committing it
future = self.add_block_futures.get(block_hash, None)
if future is not None:
return BlockCommitStatus.COMMITTING, future
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oopsy, can you spot the bug here? @qizhou


return BlockCommitStatus.UNCOMMITTED, None

async def add_block(self, block):
""" Returns true if block is successfully added. False on any error.
called by 1. local miner (will not run if syncing) 2. SyncTask
"""

block_hash = block.header.get_hash()
commit_status, future = self.__get_block_commit_status_by_hash(block_hash)
if commit_status == BlockCommitStatus.COMMITTED:
return True
elif commit_status == BlockCommitStatus.COMMITING:
Logger.info(
"[{}] {} is being added ... waiting for it to finish".format(
block.header.branch.to_str(), block.header.height
)
)
await future
return True

check(commit_status == BlockCommitStatus.UNCOMMITTED)
# Validate and add the block
old_tip = self.state.header_tip
try:
xshard_list, coinbase_amount_map = self.state.add_block(block)
xshard_list, coinbase_amount_map = self.state.add_block(block, force=True)
except Exception as e:
Logger.error_exception()
return False

# only remove from pool if the block successfully added to state,
# this may cache failed blocks but prevents them being broadcasted more than needed
# TODO add ttl to blocks in new_block_pool
self.state.new_block_pool.pop(block.header.get_hash(), None)
self.state.new_block_pool.pop(block_hash, None)
# block has been added to local state, broadcast tip so that peers can sync if needed
try:
if old_tip != self.state.header_tip:
self.broadcast_new_tip()
except Exception:
Logger.warning_every_sec("broadcast tip failure", 1)

# block already existed in local shard state
# but might not have been propagated to other shards and master
# let's make sure all the shards and master got it before return
if xshard_list is None:
future = self.add_block_futures.get(block.header.get_hash(), None)
if future:
Logger.info(
"[{}] {} is being added ... waiting for it to finish".format(
block.header.branch.to_str(), block.header.height
)
)
await future
return True

self.add_block_futures[block.header.get_hash()] = self.loop.create_future()
# Add the block in future and wait
self.add_block_futures[block_hash] = self.loop.create_future()

prev_root_height = self.state.db.get_root_block_header_by_hash(
block.header.hash_prev_root_block
Expand All @@ -671,8 +696,13 @@ async def add_block(self, block):
self.state.get_shard_stats(),
)

self.add_block_futures[block.header.get_hash()].set_result(None)
del self.add_block_futures[block.header.get_hash()]
# Commit the block
self.state.commit_by_hash(block_hash)
Logger.debug("committed mblock {}".format(block_hash.hex()))

# Notify the rest
self.add_block_futures[block_hash].set_result(None)
del self.add_block_futures[block_hash]
return True

async def add_block_list_for_sync(self, block_list):
Expand All @@ -690,13 +720,39 @@ async def add_block_list_for_sync(self, block_list):

existing_add_block_futures = []
block_hash_to_x_shard_list = dict()
uncommitted_block_header_list = []
uncommitted_coinbase_amount_map_list = []
for block in block_list:
check(block.header.branch.get_full_shard_id() == self.full_shard_id)

block_hash = block.header.get_hash()
commit_status, future = self.__get_block_commit_status_by_hash(block_hash)
if commit_status == BlockCommitStatus.COMMITTED:
# Skip processing the block if it is already committed
Logger.warning(
"minor block to sync {} is already committed".format(
block_hash.hex(),
)
)
continue
elif commit_status == BlockCommitStatus.COMMITING:
# Check if the block is being propagating to other slaves and the master
# Let's make sure all the shards and master got it before committing it
Logger.info(
"[{}] {} is being added ... waiting for it to finish".format(
block.header.branch.to_str(), block.header.height
)
)
existing_add_block_futures.append(future)
continue

check(commit_status == BlockCommitStatus.UNCOMMITTED)
# Validate and add the block
try:
xshard_list, coinbase_amount_map = self.state.add_block(
block, skip_if_too_old=False
block,
skip_if_too_old=False,
force=True,
)
# coinbase_amount_map may be None if the block exists
# adding the block header one since the block is already validated.
Expand All @@ -705,28 +761,33 @@ async def add_block_list_for_sync(self, block_list):
Logger.error_exception()
return False, coinbase_amount_list

# block already existed in local shard state
# but might not have been propagated to other shards and master
# let's make sure all the shards and master got it before return
if xshard_list is None:
future = self.add_block_futures.get(block_hash, None)
if future:
existing_add_block_futures.append(future)
else:
prev_root_height = self.state.db.get_root_block_header_by_hash(
block.header.hash_prev_root_block
).height
block_hash_to_x_shard_list[block_hash] = (xshard_list, prev_root_height)
self.add_block_futures[block_hash] = self.loop.create_future()
prev_root_height = self.state.db.get_root_block_header_by_hash(
block.header.hash_prev_root_block
).height
block_hash_to_x_shard_list[block_hash] = (xshard_list, prev_root_height)
self.add_block_futures[block_hash] = self.loop.create_future()
uncommitted_block_header_list.append(block.header)
uncommitted_coinbase_amount_map_list.append(block.header.coinbase_amount_map)

await self.slave.batch_broadcast_xshard_tx_list(
block_hash_to_x_shard_list, block_list[0].header.branch
)
check(len(uncommitted_coinbase_amount_map_list) == len(uncommitted_block_header_list))
await self.slave.send_minor_block_header_list_to_master(
uncommitted_block_header_list,
uncommitted_coinbase_amount_map_list,
)

# Commit all blocks and notify all rest add block operations
for block_header in uncommitted_block_header_list:
block_hash = block_header.get_hash()
self.state.commit_by_hash(block_hash)
Logger.debug("committed mblock {}".format(block_hash.hex()))

for block_hash in block_hash_to_x_shard_list.keys():
self.add_block_futures[block_hash].set_result(None)
del self.add_block_futures[block_hash]

# Wait for the other add block operations
await asyncio.gather(*existing_add_block_futures)

return True, coinbase_amount_list
Expand Down
6 changes: 6 additions & 0 deletions quarkchain/cluster/shard_db_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,12 @@ def get_block_count_by_height(self, height):
""" Return the total number of blocks with the given height"""
return len(self.height_to_minor_block_hashes.setdefault(height, set()))

def is_minor_block_committed_by_hash(self, h):
return self.db.get(b"commit_" + h) is not None

def commit_minor_block_by_hash(self, h):
self.put(b"commit_" + h, b"")

# ------------------------- Transaction db operations --------------------------------
def put_transaction_index(self, tx, block_height, index):
tx_hash = tx.get_hash()
Expand Down
17 changes: 14 additions & 3 deletions quarkchain/cluster/shard_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,11 +816,16 @@ def __remove_transactions_from_block(self, block):
self.tx_queue = self.tx_queue.diff(evm_tx_list)

def add_block(
self, block, skip_if_too_old=True, gas_limit=None, xshard_gas_limit=None
self,
block,
skip_if_too_old=True,
gas_limit=None,
xshard_gas_limit=None,
force=False,
):
""" Add a block to local db. Perform validate and update tip accordingly
gas_limit and xshard_gas_limit are used for testing only.
Returns None if block is already added.
Returns None if block is already added (if force is False).
Returns a list of CrossShardTransactionDeposit from block.
Additionally, returns a map of reward token balances for this block
Raises on any error.
Expand All @@ -847,7 +852,7 @@ def add_block(
)

block_hash = block.header.get_hash()
if self.db.contain_minor_block_by_hash(block_hash):
if not force and self.db.contain_minor_block_by_hash(block_hash):
return None, None

evm_tx_included = []
Expand Down Expand Up @@ -1749,3 +1754,9 @@ def _get_posw_coinbase_blockcnt(
length = self.shard_config.POSW_CONFIG.WINDOW_SIZE
coinbase_addrs = self.__get_coinbase_addresses_until_block(header_hash, length)
return Counter(coinbase_addrs)

def is_committed_by_hash(self, h):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prefer letting the caller directly access shard_state.db and call is_minor_block_committed_by_hash / commit_minor_block_by_hash directly. didn't see many benefits over additional wrapping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an abstraction, the client of state_shard should not access db directly - i.e., we should probably rename db to __db to enforce that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. however currently shard_state.db is directly accessed in many other places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is bad, and we should fix that....

return self.db.is_minor_block_committed_by_hash(h)

def commit_by_hash(self, h):
self.db.commit_minor_block_by_hash(h)
15 changes: 15 additions & 0 deletions quarkchain/cluster/slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
GetWorkResponse,
SubmitWorkRequest,
SubmitWorkResponse,
AddMinorBlockHeaderListRequest,
)
from quarkchain.cluster.rpc import (
AddRootBlockResponse,
Expand Down Expand Up @@ -965,6 +966,20 @@ async def send_minor_block_header_to_master(
check(resp.error_code == 0)
self.artificial_tx_config = resp.artificial_tx_config

async def send_minor_block_header_list_to_master(
self,
minor_block_header_list,
coinbase_amount_map_list,
):
request = AddMinorBlockHeaderListRequest(
minor_block_header_list,
coinbase_amount_map_list
)
_, resp, _ = await self.master.write_rpc_request(
ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_REQUEST, request
)
check(resp.error_code == 0)

def __get_branch_to_add_xshard_tx_list_request(
self, block_hash, xshard_tx_list, prev_root_height
):
Expand Down