diff --git a/quarkchain/cluster/master.py b/quarkchain/cluster/master.py index b703af127..ca3d9983d 100644 --- a/quarkchain/cluster/master.py +++ b/quarkchain/cluster/master.py @@ -50,6 +50,7 @@ GetWorkResponse, SubmitWorkRequest, SubmitWorkResponse, + AddMinorBlockHeaderListResponse, ) from quarkchain.cluster.rpc import ( ConnectToSlavesRequest, @@ -276,13 +277,14 @@ async def __sync_minor_blocks(self, minor_block_header_list): raise RuntimeError("Unable to download minor blocks from root block") if result.shard_stats: self.master_server.update_shard_stats(result.shard_stats) - for k, v in result.block_coinbase_map.items(): - self.root_state.add_validated_minor_block_hash(k, v.balance_map) for m_header in minor_block_header_list: if not self.root_state.db.contain_minor_block_by_hash(m_header.get_hash()): raise RuntimeError( - "minor block is still unavailable in master after root block sync" + "minor block {} from {} is still unavailable in master after root block sync".format( + m_header.get_hash().hex(), + m_header.branch.to_str(), + ) ) @@ -590,12 +592,27 @@ async def handle_add_minor_block_header_request(self, req): artificial_tx_config=self.master_server.get_artificial_tx_config(), ) + async def handle_add_minor_block_header_list_request(self, req): + check(len(req.minor_block_header_list) == len(req.coinbase_amount_map_list)) + for minor_block_header, coinbase_amount_map in zip(req.minor_block_header_list, req.coinbase_amount_map_list): + self.master_server.root_state.add_validated_minor_block_hash( + minor_block_header.get_hash(), coinbase_amount_map.balance_map + ) + Logger.info("adding {} mblock to db".format(minor_block_header.get_hash().hex())) + return AddMinorBlockHeaderListResponse( + error_code=0, + ) + OP_RPC_MAP = { ClusterOp.ADD_MINOR_BLOCK_HEADER_REQUEST: ( ClusterOp.ADD_MINOR_BLOCK_HEADER_RESPONSE, SlaveConnection.handle_add_minor_block_header_request, - ) + ), + ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_REQUEST: ( + ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_RESPONSE, + SlaveConnection.handle_add_minor_block_header_list_request, + ), } diff --git a/quarkchain/cluster/rpc.py b/quarkchain/cluster/rpc.py index 246b1d1a9..ec9c310ca 100644 --- a/quarkchain/cluster/rpc.py +++ b/quarkchain/cluster/rpc.py @@ -615,6 +615,32 @@ def __init__(self, error_code, artificial_tx_config): self.artificial_tx_config = artificial_tx_config +class AddMinorBlockHeaderListRequest(Serializable): + """ Notify master about a list of successfully added minor block. + Mostly used for minor block sync triggered by root block sync + """ + FIELDS = [ + ("minor_block_header_list", PrependedSizeListSerializer(4, MinorBlockHeader)), + ("coinbase_amount_map_list", PrependedSizeListSerializer(4, TokenBalanceMap)), + ] + + def __init__( + self, + minor_block_header_list, + coinbase_amount_map_list + ): + self.minor_block_header_list = minor_block_header_list + self.coinbase_amount_map_list = coinbase_amount_map_list + + +class AddMinorBlockHeaderListResponse(Serializable): + FIELDS = [ + ("error_code", uint32), + ] + + def __init__(self, error_code): + self.error_code = error_code + # slave -> slave @@ -878,6 +904,8 @@ class ClusterOp: GET_WORK_RESPONSE = 56 + CLUSTER_OP_BASE SUBMIT_WORK_REQUEST = 57 + CLUSTER_OP_BASE SUBMIT_WORK_RESPONSE = 58 + CLUSTER_OP_BASE + ADD_MINOR_BLOCK_HEADER_LIST_REQUEST = 59 + CLUSTER_OP_BASE + ADD_MINOR_BLOCK_HEADER_LIST_RESPONSE = 60 + CLUSTER_OP_BASE CLUSTER_OP_SERIALIZER_MAP = { @@ -938,4 +966,6 @@ class ClusterOp: ClusterOp.GET_WORK_RESPONSE: GetWorkResponse, ClusterOp.SUBMIT_WORK_REQUEST: SubmitWorkRequest, ClusterOp.SUBMIT_WORK_RESPONSE: SubmitWorkResponse, + ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_REQUEST: AddMinorBlockHeaderListRequest, + ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_RESPONSE: AddMinorBlockHeaderListResponse, } diff --git a/quarkchain/cluster/shard.py b/quarkchain/cluster/shard.py index 861f46e32..9071a3de4 100644 --- a/quarkchain/cluster/shard.py +++ b/quarkchain/cluster/shard.py @@ -38,6 +38,12 @@ from quarkchain.p2p.utils import RESERVED_CLUSTER_PEER_ID +class BlockCommitStatus: + UNCOMMITTED = 0 # The other slaves and the master may not have the block info + COMMITING = 1 # The block info is propagating to other slaves and the master + COMMITTED = 2 # The other slaves and the master have received the block info + + class PeerShardConnection(VirtualConnection): """ A virtual connection between local shard and remote shard """ @@ -621,13 +627,45 @@ async def handle_new_block(self, block): self.broadcast_new_block(block) await self.add_block(block) + def __get_block_commit_status_by_hash(self, block_hash): + # If the block is committed, it means + # - All neighor shards/slaves receives x-shard tx list + # - The block header is sent to master + # then return immediately + if self.state.is_committed_by_hash(block_hash): + return BlockCommitStatus.COMMITTED, None + + # Check if the block is being propagating to other slaves and the master + # Let's make sure all the shards and master got it before committing it + future = self.add_block_futures.get(block_hash, None) + if future is not None: + return BlockCommitStatus.COMMITTING, future + + return BlockCommitStatus.UNCOMMITTED, None + async def add_block(self, block): """ Returns true if block is successfully added. False on any error. called by 1. local miner (will not run if syncing) 2. SyncTask """ + + block_hash = block.header.get_hash() + commit_status, future = self.__get_block_commit_status_by_hash(block_hash) + if commit_status == BlockCommitStatus.COMMITTED: + return True + elif commit_status == BlockCommitStatus.COMMITING: + Logger.info( + "[{}] {} is being added ... waiting for it to finish".format( + block.header.branch.to_str(), block.header.height + ) + ) + await future + return True + + check(commit_status == BlockCommitStatus.UNCOMMITTED) + # Validate and add the block old_tip = self.state.header_tip try: - xshard_list, coinbase_amount_map = self.state.add_block(block) + xshard_list, coinbase_amount_map = self.state.add_block(block, force=True) except Exception as e: Logger.error_exception() return False @@ -635,7 +673,7 @@ async def add_block(self, block): # only remove from pool if the block successfully added to state, # this may cache failed blocks but prevents them being broadcasted more than needed # TODO add ttl to blocks in new_block_pool - self.state.new_block_pool.pop(block.header.get_hash(), None) + self.state.new_block_pool.pop(block_hash, None) # block has been added to local state, broadcast tip so that peers can sync if needed try: if old_tip != self.state.header_tip: @@ -643,21 +681,8 @@ async def add_block(self, block): except Exception: Logger.warning_every_sec("broadcast tip failure", 1) - # block already existed in local shard state - # but might not have been propagated to other shards and master - # let's make sure all the shards and master got it before return - if xshard_list is None: - future = self.add_block_futures.get(block.header.get_hash(), None) - if future: - Logger.info( - "[{}] {} is being added ... waiting for it to finish".format( - block.header.branch.to_str(), block.header.height - ) - ) - await future - return True - - self.add_block_futures[block.header.get_hash()] = self.loop.create_future() + # Add the block in future and wait + self.add_block_futures[block_hash] = self.loop.create_future() prev_root_height = self.state.db.get_root_block_header_by_hash( block.header.hash_prev_root_block @@ -671,8 +696,13 @@ async def add_block(self, block): self.state.get_shard_stats(), ) - self.add_block_futures[block.header.get_hash()].set_result(None) - del self.add_block_futures[block.header.get_hash()] + # Commit the block + self.state.commit_by_hash(block_hash) + Logger.debug("committed mblock {}".format(block_hash.hex())) + + # Notify the rest + self.add_block_futures[block_hash].set_result(None) + del self.add_block_futures[block_hash] return True async def add_block_list_for_sync(self, block_list): @@ -690,13 +720,39 @@ async def add_block_list_for_sync(self, block_list): existing_add_block_futures = [] block_hash_to_x_shard_list = dict() + uncommitted_block_header_list = [] + uncommitted_coinbase_amount_map_list = [] for block in block_list: check(block.header.branch.get_full_shard_id() == self.full_shard_id) block_hash = block.header.get_hash() + commit_status, future = self.__get_block_commit_status_by_hash(block_hash) + if commit_status == BlockCommitStatus.COMMITTED: + # Skip processing the block if it is already committed + Logger.warning( + "minor block to sync {} is already committed".format( + block_hash.hex(), + ) + ) + continue + elif commit_status == BlockCommitStatus.COMMITING: + # Check if the block is being propagating to other slaves and the master + # Let's make sure all the shards and master got it before committing it + Logger.info( + "[{}] {} is being added ... waiting for it to finish".format( + block.header.branch.to_str(), block.header.height + ) + ) + existing_add_block_futures.append(future) + continue + + check(commit_status == BlockCommitStatus.UNCOMMITTED) + # Validate and add the block try: xshard_list, coinbase_amount_map = self.state.add_block( - block, skip_if_too_old=False + block, + skip_if_too_old=False, + force=True, ) # coinbase_amount_map may be None if the block exists # adding the block header one since the block is already validated. @@ -705,28 +761,33 @@ async def add_block_list_for_sync(self, block_list): Logger.error_exception() return False, coinbase_amount_list - # block already existed in local shard state - # but might not have been propagated to other shards and master - # let's make sure all the shards and master got it before return - if xshard_list is None: - future = self.add_block_futures.get(block_hash, None) - if future: - existing_add_block_futures.append(future) - else: - prev_root_height = self.state.db.get_root_block_header_by_hash( - block.header.hash_prev_root_block - ).height - block_hash_to_x_shard_list[block_hash] = (xshard_list, prev_root_height) - self.add_block_futures[block_hash] = self.loop.create_future() + prev_root_height = self.state.db.get_root_block_header_by_hash( + block.header.hash_prev_root_block + ).height + block_hash_to_x_shard_list[block_hash] = (xshard_list, prev_root_height) + self.add_block_futures[block_hash] = self.loop.create_future() + uncommitted_block_header_list.append(block.header) + uncommitted_coinbase_amount_map_list.append(block.header.coinbase_amount_map) await self.slave.batch_broadcast_xshard_tx_list( block_hash_to_x_shard_list, block_list[0].header.branch ) + check(len(uncommitted_coinbase_amount_map_list) == len(uncommitted_block_header_list)) + await self.slave.send_minor_block_header_list_to_master( + uncommitted_block_header_list, + uncommitted_coinbase_amount_map_list, + ) + + # Commit all blocks and notify all rest add block operations + for block_header in uncommitted_block_header_list: + block_hash = block_header.get_hash() + self.state.commit_by_hash(block_hash) + Logger.debug("committed mblock {}".format(block_hash.hex())) - for block_hash in block_hash_to_x_shard_list.keys(): self.add_block_futures[block_hash].set_result(None) del self.add_block_futures[block_hash] + # Wait for the other add block operations await asyncio.gather(*existing_add_block_futures) return True, coinbase_amount_list diff --git a/quarkchain/cluster/shard_db_operator.py b/quarkchain/cluster/shard_db_operator.py index 98532e12f..1de7b143b 100644 --- a/quarkchain/cluster/shard_db_operator.py +++ b/quarkchain/cluster/shard_db_operator.py @@ -347,6 +347,12 @@ def get_block_count_by_height(self, height): """ Return the total number of blocks with the given height""" return len(self.height_to_minor_block_hashes.setdefault(height, set())) + def is_minor_block_committed_by_hash(self, h): + return self.db.get(b"commit_" + h) is not None + + def commit_minor_block_by_hash(self, h): + self.put(b"commit_" + h, b"") + # ------------------------- Transaction db operations -------------------------------- def put_transaction_index(self, tx, block_height, index): tx_hash = tx.get_hash() diff --git a/quarkchain/cluster/shard_state.py b/quarkchain/cluster/shard_state.py index b0cfa689a..30e1d9439 100644 --- a/quarkchain/cluster/shard_state.py +++ b/quarkchain/cluster/shard_state.py @@ -816,11 +816,16 @@ def __remove_transactions_from_block(self, block): self.tx_queue = self.tx_queue.diff(evm_tx_list) def add_block( - self, block, skip_if_too_old=True, gas_limit=None, xshard_gas_limit=None + self, + block, + skip_if_too_old=True, + gas_limit=None, + xshard_gas_limit=None, + force=False, ): """ Add a block to local db. Perform validate and update tip accordingly gas_limit and xshard_gas_limit are used for testing only. - Returns None if block is already added. + Returns None if block is already added (if force is False). Returns a list of CrossShardTransactionDeposit from block. Additionally, returns a map of reward token balances for this block Raises on any error. @@ -847,7 +852,7 @@ def add_block( ) block_hash = block.header.get_hash() - if self.db.contain_minor_block_by_hash(block_hash): + if not force and self.db.contain_minor_block_by_hash(block_hash): return None, None evm_tx_included = [] @@ -1749,3 +1754,9 @@ def _get_posw_coinbase_blockcnt( length = self.shard_config.POSW_CONFIG.WINDOW_SIZE coinbase_addrs = self.__get_coinbase_addresses_until_block(header_hash, length) return Counter(coinbase_addrs) + + def is_committed_by_hash(self, h): + return self.db.is_minor_block_committed_by_hash(h) + + def commit_by_hash(self, h): + self.db.commit_minor_block_by_hash(h) diff --git a/quarkchain/cluster/slave.py b/quarkchain/cluster/slave.py index 52880c1aa..830e5177a 100644 --- a/quarkchain/cluster/slave.py +++ b/quarkchain/cluster/slave.py @@ -31,6 +31,7 @@ GetWorkResponse, SubmitWorkRequest, SubmitWorkResponse, + AddMinorBlockHeaderListRequest, ) from quarkchain.cluster.rpc import ( AddRootBlockResponse, @@ -965,6 +966,20 @@ async def send_minor_block_header_to_master( check(resp.error_code == 0) self.artificial_tx_config = resp.artificial_tx_config + async def send_minor_block_header_list_to_master( + self, + minor_block_header_list, + coinbase_amount_map_list, + ): + request = AddMinorBlockHeaderListRequest( + minor_block_header_list, + coinbase_amount_map_list + ) + _, resp, _ = await self.master.write_rpc_request( + ClusterOp.ADD_MINOR_BLOCK_HEADER_LIST_REQUEST, request + ) + check(resp.error_code == 0) + def __get_branch_to_add_xshard_tx_list_request( self, block_hash, xshard_tx_list, prev_root_height ):