Skip to content

Commit daff298

Browse files
yiweichijonastheis
andauthored
feat: add flags to enable broadcast blocks and transactions to all peers (#1219)
* feat: add flag to enable broadcast blocks and transactions to all peers * fix: typo * feat: add broadcast to all cap * chore: auto version bump [bot] * fix: ci --------- Co-authored-by: Jonas Theis <[email protected]>
1 parent bb06ef9 commit daff298

File tree

7 files changed

+95
-57
lines changed

7 files changed

+95
-57
lines changed

cmd/geth/main.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,11 @@ var (
176176
utils.CircuitCapacityCheckWorkersFlag,
177177
utils.RollupVerifyEnabledFlag,
178178
utils.ShadowforkPeersFlag,
179-
utils.TxGossipBroadcastDisabledFlag,
180-
utils.TxGossipReceivingDisabledFlag,
181-
utils.TxGossipSequencerHTTPFlag,
179+
utils.GossipTxBroadcastDisabledFlag,
180+
utils.GossipTxReceivingDisabledFlag,
181+
utils.GossipSequencerHTTPFlag,
182+
utils.GossipBroadcastToAllEnabledFlag,
183+
utils.GossipBroadcastToAllCapFlag,
182184
utils.DASyncEnabledFlag,
183185
utils.DAMissingHeaderFieldsBaseURLFlag,
184186
utils.DABlockNativeAPIEndpointFlag,

cmd/geth/usage.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,11 @@ var AppHelpFlagGroups = []flags.FlagGroup{
250250
utils.DARecoveryProduceBlocksFlag,
251251
utils.CircuitCapacityCheckEnabledFlag,
252252
utils.CircuitCapacityCheckWorkersFlag,
253-
utils.TxGossipBroadcastDisabledFlag,
254-
utils.TxGossipReceivingDisabledFlag,
255-
utils.TxGossipSequencerHTTPFlag,
253+
utils.GossipTxBroadcastDisabledFlag,
254+
utils.GossipTxReceivingDisabledFlag,
255+
utils.GossipSequencerHTTPFlag,
256+
utils.GossipBroadcastToAllEnabledFlag,
257+
utils.GossipBroadcastToAllCapFlag,
256258
},
257259
},
258260
{

cmd/utils/flags.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -893,19 +893,28 @@ var (
893893
Usage: "peer ids of shadow fork peers",
894894
}
895895

896-
// Tx gossip settings
897-
TxGossipBroadcastDisabledFlag = cli.BoolFlag{
898-
Name: "gossip.disablebroadcast",
896+
// Gossip settings
897+
GossipTxBroadcastDisabledFlag = cli.BoolFlag{
898+
Name: "gossip.disabletxbroadcast",
899899
Usage: "Disable gossip broadcast transactions to other peers",
900900
}
901-
TxGossipReceivingDisabledFlag = cli.BoolFlag{
902-
Name: "gossip.disablereceiving",
901+
GossipTxReceivingDisabledFlag = cli.BoolFlag{
902+
Name: "gossip.disabletxreceiving",
903903
Usage: "Disable gossip receiving transactions from other peers",
904904
}
905-
TxGossipSequencerHTTPFlag = &cli.StringFlag{
905+
GossipSequencerHTTPFlag = &cli.StringFlag{
906906
Name: "gossip.sequencerhttp",
907907
Usage: "Sequencer mempool HTTP endpoint",
908908
}
909+
GossipBroadcastToAllEnabledFlag = cli.BoolFlag{
910+
Name: "gossip.enablebroadcasttoall",
911+
Usage: "Enable gossip broadcast blocks and transactions to all peers",
912+
}
913+
GossipBroadcastToAllCapFlag = cli.IntFlag{
914+
Name: "gossip.broadcasttoallcap",
915+
Usage: "Maximum number of peers for broadcasting blocks and transactions (effective only when gossip.enablebroadcasttoall is enabled)",
916+
Value: 30,
917+
}
909918

910919
// DA syncing settings
911920
DASyncEnabledFlag = cli.BoolFlag{
@@ -1819,17 +1828,22 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
18191828
cfg.ShadowForkPeerIDs = ctx.GlobalStringSlice(ShadowforkPeersFlag.Name)
18201829
log.Info("Shadow fork peers", "ids", cfg.ShadowForkPeerIDs)
18211830
}
1822-
if ctx.GlobalIsSet(TxGossipBroadcastDisabledFlag.Name) {
1823-
cfg.TxGossipBroadcastDisabled = ctx.GlobalBool(TxGossipBroadcastDisabledFlag.Name)
1824-
log.Info("Transaction gossip broadcast disabled", "disabled", cfg.TxGossipBroadcastDisabled)
1831+
if ctx.GlobalIsSet(GossipTxBroadcastDisabledFlag.Name) {
1832+
cfg.GossipTxBroadcastDisabled = ctx.GlobalBool(GossipTxBroadcastDisabledFlag.Name)
1833+
log.Info("Gossip transaction broadcast disabled", "disabled", cfg.GossipTxBroadcastDisabled)
1834+
}
1835+
if ctx.GlobalIsSet(GossipTxReceivingDisabledFlag.Name) {
1836+
cfg.GossipTxReceivingDisabled = ctx.GlobalBool(GossipTxReceivingDisabledFlag.Name)
1837+
log.Info("Gossip transaction receiving disabled", "disabled", cfg.GossipTxReceivingDisabled)
18251838
}
1826-
if ctx.GlobalIsSet(TxGossipReceivingDisabledFlag.Name) {
1827-
cfg.TxGossipReceivingDisabled = ctx.GlobalBool(TxGossipReceivingDisabledFlag.Name)
1828-
log.Info("Transaction gossip receiving disabled", "disabled", cfg.TxGossipReceivingDisabled)
1839+
if ctx.GlobalIsSet(GossipBroadcastToAllEnabledFlag.Name) {
1840+
cfg.GossipBroadcastToAllEnabled = ctx.GlobalBool(GossipBroadcastToAllEnabledFlag.Name)
1841+
cfg.GossipBroadcastToAllCap = ctx.GlobalInt(GossipBroadcastToAllCapFlag.Name)
1842+
log.Info("Gossip broadcast to all enabled", "enabled", cfg.GossipBroadcastToAllEnabled, "cap", cfg.GossipBroadcastToAllCap)
18291843
}
18301844
// Only configure sequencer http flag if we're running in verifier mode i.e. --mine is disabled.
1831-
if ctx.IsSet(TxGossipSequencerHTTPFlag.Name) && !ctx.IsSet(MiningEnabledFlag.Name) {
1832-
cfg.TxGossipSequencerHTTP = ctx.String(TxGossipSequencerHTTPFlag.Name)
1845+
if ctx.IsSet(GossipSequencerHTTPFlag.Name) && !ctx.IsSet(MiningEnabledFlag.Name) {
1846+
cfg.GossipSequencerHTTP = ctx.String(GossipSequencerHTTPFlag.Name)
18331847
}
18341848

18351849
// Cap the cache allowance and tune the garbage collector

eth/backend.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -285,18 +285,20 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client l1.Client) (*Ether
285285
checkpoint = params.TrustedCheckpoints[genesisHash]
286286
}
287287
if eth.handler, err = newHandler(&handlerConfig{
288-
Database: chainDb,
289-
Chain: eth.blockchain,
290-
TxPool: eth.txPool,
291-
Network: config.NetworkId,
292-
Sync: config.SyncMode,
293-
BloomCache: uint64(cacheLimit),
294-
EventMux: eth.eventMux,
295-
Checkpoint: checkpoint,
296-
Whitelist: config.Whitelist,
297-
ShadowForkPeerIDs: config.ShadowForkPeerIDs,
298-
DisableTxBroadcast: config.TxGossipBroadcastDisabled,
299-
DisableTxReceiving: config.TxGossipReceivingDisabled,
288+
Database: chainDb,
289+
Chain: eth.blockchain,
290+
TxPool: eth.txPool,
291+
Network: config.NetworkId,
292+
Sync: config.SyncMode,
293+
BloomCache: uint64(cacheLimit),
294+
EventMux: eth.eventMux,
295+
Checkpoint: checkpoint,
296+
Whitelist: config.Whitelist,
297+
ShadowForkPeerIDs: config.ShadowForkPeerIDs,
298+
DisableTxBroadcast: config.GossipTxBroadcastDisabled,
299+
DisableTxReceiving: config.GossipTxReceivingDisabled,
300+
EnableBroadcastToAll: config.GossipBroadcastToAllEnabled,
301+
BroadcastToAllCap: config.GossipBroadcastToAllCap,
300302
}); err != nil {
301303
return nil, err
302304
}
@@ -306,7 +308,7 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client l1.Client) (*Ether
306308
// Some of the extraData is used with Clique consensus (before EuclidV2). After EuclidV2 we use SystemContract consensus where this is overridden when creating a block.
307309
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
308310

309-
eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, config.TxGossipReceivingDisabled, eth, nil}
311+
eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, config.GossipTxReceivingDisabled, eth, nil}
310312
if eth.APIBackend.allowUnprotectedTxs {
311313
log.Info("Unprotected transactions allowed")
312314
}
@@ -317,9 +319,9 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client l1.Client) (*Ether
317319
gpoParams.DefaultBasePrice = new(big.Int).SetUint64(config.TxPool.PriceLimit)
318320
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
319321

320-
if config.TxGossipSequencerHTTP != "" {
322+
if config.GossipSequencerHTTP != "" {
321323
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
322-
client, err := rpc.DialContext(ctx, config.TxGossipSequencerHTTP)
324+
client, err := rpc.DialContext(ctx, config.GossipSequencerHTTP)
323325
cancel()
324326
if err != nil {
325327
return nil, fmt.Errorf("cannot initialize rollup sequencer client: %w", err)

eth/ethconfig/config.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,11 @@ type Config struct {
231231
// DA syncer options
232232
DA da_syncer.Config
233233

234-
TxGossipBroadcastDisabled bool
235-
TxGossipReceivingDisabled bool
236-
TxGossipSequencerHTTP string
234+
GossipTxBroadcastDisabled bool
235+
GossipTxReceivingDisabled bool
236+
GossipSequencerHTTP string
237+
GossipBroadcastToAllEnabled bool
238+
GossipBroadcastToAllCap int
237239
}
238240

239241
// CreateConsensusEngine creates a consensus engine for the given chain configuration.

eth/handler.go

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,11 @@ type handlerConfig struct {
9494
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
9595
ShadowForkPeerIDs []string // List of peer ids that take part in the shadow-fork
9696

97-
DisableTxBroadcast bool
98-
DisableTxReceiving bool
97+
// Gossip configs
98+
DisableTxBroadcast bool
99+
DisableTxReceiving bool
100+
EnableBroadcastToAll bool
101+
BroadcastToAllCap int
99102
}
100103

101104
type handler struct {
@@ -134,9 +137,11 @@ type handler struct {
134137
wg sync.WaitGroup
135138
peerWG sync.WaitGroup
136139

137-
shadowForkPeerIDs []string
138-
disableTxBroadcast bool
139-
disableTxReceiving bool
140+
shadowForkPeerIDs []string
141+
disableTxBroadcast bool
142+
disableTxReceiving bool
143+
enableBroadcastToAll bool
144+
broadcastToAllCap int
140145
}
141146

142147
// newHandler returns a handler for all Ethereum chain management protocol.
@@ -146,18 +151,20 @@ func newHandler(config *handlerConfig) (*handler, error) {
146151
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
147152
}
148153
h := &handler{
149-
networkID: config.Network,
150-
forkFilter: forkid.NewFilter(config.Chain),
151-
eventMux: config.EventMux,
152-
database: config.Database,
153-
txpool: config.TxPool,
154-
chain: config.Chain,
155-
peers: newPeerSet(),
156-
whitelist: config.Whitelist,
157-
quitSync: make(chan struct{}),
158-
shadowForkPeerIDs: config.ShadowForkPeerIDs,
159-
disableTxBroadcast: config.DisableTxBroadcast,
160-
disableTxReceiving: config.DisableTxReceiving,
154+
networkID: config.Network,
155+
forkFilter: forkid.NewFilter(config.Chain),
156+
eventMux: config.EventMux,
157+
database: config.Database,
158+
txpool: config.TxPool,
159+
chain: config.Chain,
160+
peers: newPeerSet(),
161+
whitelist: config.Whitelist,
162+
quitSync: make(chan struct{}),
163+
shadowForkPeerIDs: config.ShadowForkPeerIDs,
164+
disableTxBroadcast: config.DisableTxBroadcast,
165+
disableTxReceiving: config.DisableTxReceiving,
166+
enableBroadcastToAll: config.EnableBroadcastToAll,
167+
broadcastToAllCap: config.BroadcastToAllCap,
161168
}
162169
if config.Sync == downloader.FullSync {
163170
// The database seems empty as the current block is the genesis. Yet the fast
@@ -477,7 +484,12 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
477484
return
478485
}
479486
// Send the block to a subset of our peers
480-
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
487+
numDirect := int(math.Sqrt(float64(len(peers))))
488+
// If enableBroadcastToAll is true, broadcast blocks directly to all peers (capped at broadcastToAllCap).
489+
if h.enableBroadcastToAll {
490+
numDirect = min(h.broadcastToAllCap, len(peers))
491+
}
492+
transfer := peers[:numDirect]
481493
for _, peer := range transfer {
482494
peer.AsyncSendNewBlock(block, td)
483495
}
@@ -518,6 +530,10 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
518530
peers := onlyShadowForkPeers(h.shadowForkPeerIDs, h.peers.peersWithoutTransaction(tx.Hash()))
519531
// Send the tx unconditionally to a subset of our peers
520532
numDirect := int(math.Sqrt(float64(len(peers))))
533+
// If enableBroadcastToAll is true, broadcast transactions directly to all peers (capped at broadcastToAllCap).
534+
if h.enableBroadcastToAll {
535+
numDirect = min(h.broadcastToAllCap, len(peers))
536+
}
521537
for _, peer := range peers[:numDirect] {
522538
txset[peer] = append(txset[peer], tx.Hash())
523539
}

params/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
const (
2525
VersionMajor = 5 // Major version component of the current release
2626
VersionMinor = 8 // Minor version component of the current release
27-
VersionPatch = 67 // Patch version component of the current release
27+
VersionPatch = 68 // Patch version component of the current release
2828
VersionMeta = "mainnet" // Version metadata to append to the version string
2929
)
3030

0 commit comments

Comments
 (0)