|  | 
|  | 1 | +// This file is part of Substrate. | 
|  | 2 | + | 
|  | 3 | +// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd. | 
|  | 4 | +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 | 
|  | 5 | + | 
|  | 6 | +// This program is free software: you can redistribute it and/or modify | 
|  | 7 | +// it under the terms of the GNU General Public License as published by | 
|  | 8 | +// the Free Software Foundation, either version 3 of the License, or | 
|  | 9 | +// (at your option) any later version. | 
|  | 10 | + | 
|  | 11 | +// This program is distributed in the hope that it will be useful, | 
|  | 12 | +// but WITHOUT ANY WARRANTY; without even the implied warranty of | 
|  | 13 | +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 
|  | 14 | +// GNU General Public License for more details. | 
|  | 15 | + | 
|  | 16 | +// You should have received a copy of the GNU General Public License | 
|  | 17 | +// along with this program. If not, see <https://www.gnu.org/licenses/>. | 
|  | 18 | + | 
|  | 19 | +//! Helper for sending rate-limited gossip messages. | 
|  | 20 | +//! | 
|  | 21 | +//! # Context | 
|  | 22 | +//! | 
|  | 23 | +//! The [`NetworkService`] struct provides a way to send notifications to a certain peer through | 
|  | 24 | +//! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't | 
|  | 25 | +//! expected to be used directly. | 
|  | 26 | +//! | 
|  | 27 | +//! The [`QueuedSender`] struct provided by this module is built on top of | 
|  | 28 | +//! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications. | 
|  | 29 | +//! | 
|  | 30 | +//! # Behaviour | 
|  | 31 | +//! | 
|  | 32 | +//! An instance of [`QueuedSender`] is specific to a certain combination of `PeerId` and | 
|  | 33 | +//! protocol name. It maintains a buffer of messages waiting to be sent out. The user of this API | 
|  | 34 | +//! is able to manipulate that queue, adding or removing obsolete messages. | 
|  | 35 | +//! | 
|  | 36 | +//! Creating a [`QueuedSender`] also returns a opaque `Future` whose responsibility it to | 
|  | 37 | +//! drain that queue and actually send the messages. If the substream with the given combination | 
|  | 38 | +//! of peer and protocol is closed, the queue is silently discarded. It is the role of the user | 
|  | 39 | +//! to track which peers we are connected to. | 
|  | 40 | +//! | 
|  | 41 | +//! In normal situations, messages sent through a [`QueuedSender`] will arrive in the same | 
|  | 42 | +//! order as they have been sent. | 
|  | 43 | +//! It is possible, in the situation of disconnects and reconnects, that messages arrive in a | 
|  | 44 | +//! different order. See also https://github.com/paritytech/substrate/issues/6756. | 
|  | 45 | +//! However, if multiple instances of [`QueuedSender`] exist for the same peer and protocol, or | 
|  | 46 | +//! if some other code uses the [`NetworkService`] to send notifications to this combination or | 
|  | 47 | +//! peer and protocol, then the notifications will be interleaved in an unpredictable way. | 
|  | 48 | +//! | 
|  | 49 | +
 | 
|  | 50 | +use crate::{ExHashT, NetworkService}; | 
|  | 51 | + | 
|  | 52 | +use async_std::sync::{Condvar, Mutex, MutexGuard}; | 
|  | 53 | +use futures::prelude::*; | 
|  | 54 | +use libp2p::PeerId; | 
|  | 55 | +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; | 
|  | 56 | +use std::{ | 
|  | 57 | +	collections::VecDeque, | 
|  | 58 | +	fmt, | 
|  | 59 | +	sync::{atomic, Arc}, | 
|  | 60 | +	time::Duration, | 
|  | 61 | +}; | 
|  | 62 | + | 
|  | 63 | +#[cfg(test)] | 
|  | 64 | +mod tests; | 
|  | 65 | + | 
|  | 66 | +/// Notifications sender for a specific combination of network service, peer, and protocol. | 
|  | 67 | +pub struct QueuedSender<M> { | 
|  | 68 | +	/// Shared between the front and the back task. | 
|  | 69 | +	shared: Arc<Shared<M>>, | 
|  | 70 | +} | 
|  | 71 | + | 
|  | 72 | +impl<M> QueuedSender<M> { | 
|  | 73 | +	/// Returns a new [`QueuedSender`] containing a queue of message for this specific | 
|  | 74 | +	/// combination of peer and protocol. | 
|  | 75 | +	/// | 
|  | 76 | +	/// In addition to the [`QueuedSender`], also returns a `Future` whose role is to drive | 
|  | 77 | +	/// the messages sending forward. | 
|  | 78 | +	pub fn new<B, H, F>( | 
|  | 79 | +		service: Arc<NetworkService<B, H>>, | 
|  | 80 | +		peer_id: PeerId, | 
|  | 81 | +		protocol: ConsensusEngineId, | 
|  | 82 | +		queue_size_limit: usize, | 
|  | 83 | +		messages_encode: F | 
|  | 84 | +	) -> (Self, impl Future<Output = ()> + Send + 'static) | 
|  | 85 | +	where | 
|  | 86 | +		M: Send + 'static, | 
|  | 87 | +		B: BlockT + 'static, | 
|  | 88 | +		H: ExHashT, | 
|  | 89 | +		F: Fn(M) -> Vec<u8> + Send + 'static, | 
|  | 90 | +	{ | 
|  | 91 | +		let shared = Arc::new(Shared { | 
|  | 92 | +			stop_task: atomic::AtomicBool::new(false), | 
|  | 93 | +			condvar: Condvar::new(), | 
|  | 94 | +			queue_size_limit, | 
|  | 95 | +			messages_queue: Mutex::new(VecDeque::with_capacity(queue_size_limit)), | 
|  | 96 | +		}); | 
|  | 97 | + | 
|  | 98 | +		let task = spawn_task( | 
|  | 99 | +			service, | 
|  | 100 | +			peer_id, | 
|  | 101 | +			protocol, | 
|  | 102 | +			shared.clone(), | 
|  | 103 | +			messages_encode | 
|  | 104 | +		); | 
|  | 105 | + | 
|  | 106 | +		(QueuedSender { shared }, task) | 
|  | 107 | +	} | 
|  | 108 | + | 
|  | 109 | +	/// Locks the queue of messages towards this peer. | 
|  | 110 | +	/// | 
|  | 111 | +	/// The returned `Future` is expected to be ready quite quickly. | 
|  | 112 | +	pub async fn lock_queue<'a>(&'a self) -> QueueGuard<'a, M> { | 
|  | 113 | +		QueueGuard { | 
|  | 114 | +			messages_queue: self.shared.messages_queue.lock().await, | 
|  | 115 | +			condvar: &self.shared.condvar, | 
|  | 116 | +			queue_size_limit: self.shared.queue_size_limit, | 
|  | 117 | +		} | 
|  | 118 | +	} | 
|  | 119 | + | 
|  | 120 | +	/// Pushes a message to the queue, or discards it if the queue is full. | 
|  | 121 | +	/// | 
|  | 122 | +	/// The returned `Future` is expected to be ready quite quickly. | 
|  | 123 | +	pub async fn queue_or_discard(&self, message: M) | 
|  | 124 | +	where | 
|  | 125 | +		M: Send + 'static | 
|  | 126 | +	{ | 
|  | 127 | +		self.lock_queue().await.push_or_discard(message); | 
|  | 128 | +	} | 
|  | 129 | +} | 
|  | 130 | + | 
|  | 131 | +impl<M> fmt::Debug for QueuedSender<M> { | 
|  | 132 | +	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | 
|  | 133 | +		f.debug_struct("QueuedSender").finish() | 
|  | 134 | +	} | 
|  | 135 | +} | 
|  | 136 | + | 
|  | 137 | +impl<M> Drop for QueuedSender<M> { | 
|  | 138 | +	fn drop(&mut self) { | 
|  | 139 | +		// The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, | 
|  | 140 | +		// then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` | 
|  | 141 | +		// being asynchronous, it can't reasonably be locked from within a destructor. | 
|  | 142 | +		// See also the corresponding code in the background task. | 
|  | 143 | +		self.shared.stop_task.store(true, atomic::Ordering::Release); | 
|  | 144 | +		self.shared.condvar.notify_all(); | 
|  | 145 | +	} | 
|  | 146 | +} | 
|  | 147 | + | 
|  | 148 | +/// Locked queue of messages to the given peer. | 
|  | 149 | +/// | 
|  | 150 | +/// As long as this struct exists, the background task is asleep and the owner of the [`QueueGuard`] | 
|  | 151 | +/// is in total control of the buffer. Messages can only ever be sent out after the [`QueueGuard`] | 
|  | 152 | +/// is dropped. | 
|  | 153 | +#[must_use] | 
|  | 154 | +pub struct QueueGuard<'a, M> { | 
|  | 155 | +	messages_queue: MutexGuard<'a, VecDeque<M>>, | 
|  | 156 | +	condvar: &'a Condvar, | 
|  | 157 | +	/// Same as [`Shared::queue_size_limit`]. | 
|  | 158 | +	queue_size_limit: usize, | 
|  | 159 | +} | 
|  | 160 | + | 
|  | 161 | +impl<'a, M: Send + 'static> QueueGuard<'a, M> { | 
|  | 162 | +	/// Pushes a message to the queue, or discards it if the queue is full. | 
|  | 163 | +	/// | 
|  | 164 | +	/// The message will only start being sent out after the [`QueueGuard`] is dropped. | 
|  | 165 | +	pub fn push_or_discard(&mut self, message: M) { | 
|  | 166 | +		if self.messages_queue.len() < self.queue_size_limit { | 
|  | 167 | +			self.messages_queue.push_back(message); | 
|  | 168 | +		} | 
|  | 169 | +	} | 
|  | 170 | + | 
|  | 171 | +	/// Calls `filter` for each message in the queue, and removes the ones for which `false` is | 
|  | 172 | +	/// returned. | 
|  | 173 | +	/// | 
|  | 174 | +	/// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be | 
|  | 175 | +	/// >           better) because the underlying implementation relies on `VecDeque::retain`. | 
|  | 176 | +	pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) { | 
|  | 177 | +		self.messages_queue.retain(filter); | 
|  | 178 | +	} | 
|  | 179 | +} | 
|  | 180 | + | 
|  | 181 | +impl<'a, M> Drop for QueueGuard<'a, M> { | 
|  | 182 | +	fn drop(&mut self) { | 
|  | 183 | +		// We notify the `Condvar` in the destructor in order to be able to push multiple | 
|  | 184 | +		// messages and wake up the background task only once afterwards. | 
|  | 185 | +		self.condvar.notify_one(); | 
|  | 186 | +	} | 
|  | 187 | +} | 
|  | 188 | + | 
|  | 189 | +#[derive(Debug)] | 
|  | 190 | +struct Shared<M> { | 
|  | 191 | +	/// Read by the background task after locking `locked`. If true, the task stops. | 
|  | 192 | +	stop_task: atomic::AtomicBool, | 
|  | 193 | +	/// Queue of messages waiting to be sent out. | 
|  | 194 | +	messages_queue: Mutex<VecDeque<M>>, | 
|  | 195 | +	/// Must be notified every time the content of `locked` changes. | 
|  | 196 | +	condvar: Condvar, | 
|  | 197 | +	/// Maximum number of elements in `messages_queue`. | 
|  | 198 | +	queue_size_limit: usize, | 
|  | 199 | +} | 
|  | 200 | + | 
|  | 201 | +async fn spawn_task<B: BlockT, H: ExHashT, M, F: Fn(M) -> Vec<u8>>( | 
|  | 202 | +	service: Arc<NetworkService<B, H>>, | 
|  | 203 | +	peer_id: PeerId, | 
|  | 204 | +	protocol: ConsensusEngineId, | 
|  | 205 | +	shared: Arc<Shared<M>>, | 
|  | 206 | +	messages_encode: F, | 
|  | 207 | +) { | 
|  | 208 | +	loop { | 
|  | 209 | +		let next_message = 'next_msg: loop { | 
|  | 210 | +			let mut queue = shared.messages_queue.lock().await; | 
|  | 211 | + | 
|  | 212 | +			loop { | 
|  | 213 | +				if shared.stop_task.load(atomic::Ordering::Acquire) { | 
|  | 214 | +					return; | 
|  | 215 | +				} | 
|  | 216 | + | 
|  | 217 | +				if let Some(msg) = queue.pop_front() { | 
|  | 218 | +					break 'next_msg msg; | 
|  | 219 | +				} | 
|  | 220 | + | 
|  | 221 | +				// It is possible that the destructor of `QueuedSender` sets `stop_task` to | 
|  | 222 | +				// true and notifies the `Condvar` after the background task loads `stop_task` | 
|  | 223 | +				// and before it calls `Condvar::wait`. | 
|  | 224 | +				// See also the corresponding comment in `QueuedSender::drop`. | 
|  | 225 | +				// For this reason, we use `wait_timeout`. In the worst case scenario, | 
|  | 226 | +				// `stop_task` will always be checked again after the timeout is reached. | 
|  | 227 | +				queue = shared.condvar.wait_timeout(queue, Duration::from_secs(10)).await.0; | 
|  | 228 | +			} | 
|  | 229 | +		}; | 
|  | 230 | + | 
|  | 231 | +		// Starting from below, we try to send the message. If an error happens when sending, | 
|  | 232 | +		// the only sane option we have is to silently discard the message. | 
|  | 233 | +		let sender = match service.notification_sender(peer_id.clone(), protocol) { | 
|  | 234 | +			Ok(s) => s, | 
|  | 235 | +			Err(_) => continue, | 
|  | 236 | +		}; | 
|  | 237 | + | 
|  | 238 | +		let ready = match sender.ready().await { | 
|  | 239 | +			Ok(r) => r, | 
|  | 240 | +			Err(_) => continue, | 
|  | 241 | +		}; | 
|  | 242 | + | 
|  | 243 | +		let _ = ready.send(messages_encode(next_message)); | 
|  | 244 | +	} | 
|  | 245 | +} | 
0 commit comments