Skip to content

Commit 1d4ba78

Browse files
authored
[fix]: add host-level logic for reentrant event handling (#381)
#### Terminology 1. `Sink`: public action handler type vended out to client code. these are the primary channel through which events from the 'outside world' are sent into a Workflow runtime. is a value type that wraps a (weak) reference to internal event handling infrastructure. 1. `ReusableSink`: internal action handler type that receives actions forwarded through from `Sink`s. it is a reference type that is weakly referenced by `Sink`s. 1. `EventPipe`: internal action handler type that implements an event handling state machine. it is a reference type that is owned by either a `ReusableSink` to propagate 'outside' events, or by a `SubtreeManager` to propagate outputs from child workflows. 1. `SubtreeManager`: type that coordinates interactions between a workflow node, its children, and the 'outside world'. #### Background the current event handling system in Workflow tracks event processing state locally – each node has a 'subtree manager' which is responsible for orchestrating interactions with both child workflow outputs and events from the 'outside world' sent to the corresponding node. each `SubtreeManager` coordinates a collection of 'event pipes' which handle event propagation; the 'outside world' has (indirect) access to them via 'sinks', and child workflow's communicate 'up' to their parents through `EventPipe` instances. these event pipes also encode the state machine transitions for event handling – if they're sent events in an invalid state they will generally halt the program. the `EventPipe` state machine consists of the following states: 1. `preparing`: the pipe has been created, but this state indicates a call to the corresponding node's `render()` method is still ongoing. 1. `pending`: the corresponding node's `render()` method is complete, but the full 'render pass' over the entire tree may not yet be. 1. `enabled`: the tree's render pass is finished, and the event pipe is valid to use. this state has a corresponding event handler closure that can be invoked when handling an event. 1. `invalid`: the event pipe is no longer valid to use. a node's event pipes are invalidated as one of the first parts of a render pass. the currently expected state transitions are: 1. <none> -> `preparing` - `preparing` is the initial state 1. `preparing` -> `pending` - after a node finishes `render()` 1. `pending` -> `enabled` - after the tree finishes a render pass 1. `enabled` -> `invalid` - when a node is about to be re-rendered[^1] [^1]: as an aside, it's reasonable to wonder if we may be allowing nodes to 'linger on' without invalidating their event handling infrastructure appropriately. what happens if a node is rendered in one pass and not rendered in the next? i think in most cases it just deallocates and so implicitly ends up ignoring any subsequent events, but... would probably be good to verify that and formalize what should be happening... the way current event pipe state machine works is: 1. during a render pass, every node gets new event pipes created, initially in the `preparing` state. any existing event pipes are set to the `invalid` state. 1. after a node is rendered, the event pipes that were created for that pass are moved to the `pending` state. 1. after a render pass is 'complete' and any Output and a new Rendering have been emitted, the nodes in the tree are walked and all event pipes are moved to the `enabled` state and hooked up with the right callbacks to invoke when they're sent events. some additional notes about the current implementation: 1. a number of invalid state transitions are banned outright and will cause a runtime trap if attempted. 1. the `ReusableSink` type contains logic to attempt to detect certain forms of reentrancy. specifically it will check if the event pipe state is `pending` and if it is, will enqueue the forwarding of the event into the future. 1. there is some limited reentrancy detection implemented when event pipes in the `invalid` state are reentrantly messaged. #### Issue the existing implementation seems to have generally worked reasonably well in practice, but there are cases when it falls down. in particular, two problematic cases that have been seen 'in the wild' are: 1. reentrant action emissions from synchronous side effects. perhaps the 'canonical' example of this is when an action is being processed that, during processing, leads to a change in `UIResponder` state (.e.g `resignFirstResponder()` is called), and some _other_ part of the Workflow tree responds to that change by emitting another action. 1. APIs that do manual `RunLoop` spinning, leading to reentrant action handling. one instance of this we've seen is when the UI layer responds to a new rendering by deriving an attributed string from HTML via some the `UIKit` extension methods it can end up spinning the main thread's run loop waiting for `WebKit` to do HTML processing, and if there are other Workflow events that have been enqueued they will cause the runtime to attempt to reentrantly process an event (generally leading to a `fatalError()`). as the existing implementation models the event handling state machine at the node level, it seems ill-equipped to deal with this problem holistically since the 'is the tree currently processing an event' bit of information is inherently a tree-level fact. we could try to augment the existing code with a state that represents 'the runtime is currently handling an event', but that seems somewhat awkward to fit into the existing model since a node would have to walk to the root and then update the whole tree with this new state[^2]. [^2]: except maybe _not_ the path from node to root? see, it seems kind of awkward... #### Proposed Solution in this PR, the approach taken to solve this is: - introduce a new `SinkEventHandler` type to track tree-level event processing state - plumb a new callback closure through the tree down to the `ReusableSink` instances (which are responsible to forwarding 'external' events into the `EventPipe` machinery) - the new callback takes two parameters: a closure to be immediately invoked if there is no event currently being handled, and a closure to be enqueued and called in the future if there is. - the callback implementation checks the current state and either invokes the 'run now' closure (adjusting processing state as appropriate) or enqueues the 'run later' closure. #### Alternatives i also drafted a more minimal change to address the new test case that i added which simulates the 'spinning the run loop during a render pass' problem in #379. instead of changing the plumbing and adding tree-level state tracking, it just changes how the enqueing logic in the `ReusableSink` implementation works. previously the enqueuing logic would defer handling and unconditionally forward the event through after the async block was processed. after the change, the method becomes fully recursive, so will check whether the original action should be enqueued again. while this approach requires changing less code, it is also less of a 'real' fix, as it won't solve cases in which someone, say, emits a second sink event targeting an ancestor node in the tree while a first sink event is being handled. --- #### Updates after initial feedback, made the following changes: - made the new event handling behavior conditional and added a `RuntimeConfig` value to opt into it - added a queue precondition check into the new `ReusableSink` event handling logic - removed the `initializing` state from `SinkEventHandler` in favor of just 'busy' and 'ready' - added a mechanism to explicitly enter the `busy` state while invoking a block so that the `WorkflowHost` can update the root node and ensure no event handling can synchronously occur during that process - added several new test cases (and minor refactoring of existing test utilities)
1 parent 15e8c02 commit 1d4ba78

File tree

7 files changed

+609
-68
lines changed

7 files changed

+609
-68
lines changed

Workflow/Sources/RuntimeConfiguration.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ extension Runtime {
8888

8989
/// Note: this doesn't control anything yet, but is here as a placeholder
9090
public var renderOnlyIfStateChanged: Bool = false
91+
92+
/// Whether action handling should be delegated to the `SinkEventHandler` type.
93+
/// This is expected to eventually be removed and become the default behavior.
94+
public var useSinkEventHandler: Bool = false
9195
}
9296

9397
struct BootstrappableConfiguration {

Workflow/Sources/SubtreeManager.swift

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,10 @@ extension WorkflowNode.SubtreeManager {
295295
func makeSink<Action: WorkflowAction>(
296296
of actionType: Action.Type
297297
) -> Sink<Action> where WorkflowType == Action.WorkflowType {
298-
let reusableSink = sinkStore.findOrCreate(actionType: Action.self)
298+
let reusableSink = sinkStore.findOrCreate(
299+
actionType: Action.self,
300+
onSinkEvent: hostContext.onSinkEvent
301+
)
299302

300303
let sink = Sink<Action> { [weak reusableSink] action in
301304
WorkflowLogger.logSinkEvent(ref: SignpostRef(), action: action)
@@ -334,7 +337,10 @@ extension WorkflowNode.SubtreeManager {
334337
self.usedSinks = [:]
335338
}
336339

337-
mutating func findOrCreate<Action: WorkflowAction>(actionType: Action.Type) -> ReusableSink<Action> {
340+
mutating func findOrCreate<Action: WorkflowAction>(
341+
actionType: Action.Type,
342+
onSinkEvent: OnSinkEvent?
343+
) -> ReusableSink<Action> {
338344
let key = ObjectIdentifier(actionType)
339345

340346
let reusableSink: ReusableSink<Action>
@@ -348,7 +354,7 @@ extension WorkflowNode.SubtreeManager {
348354
reusableSink = usedSink
349355
} else {
350356
// Create a new reusable sink.
351-
reusableSink = ReusableSink<Action>()
357+
reusableSink = ReusableSink<Action>(onSinkEvent: onSinkEvent)
352358
}
353359

354360
usedSinks[key] = reusableSink
@@ -359,15 +365,24 @@ extension WorkflowNode.SubtreeManager {
359365

360366
/// Type-erased base class for reusable sinks.
361367
fileprivate class AnyReusableSink {
368+
/// The callback to invoke when an event is to be handled.
369+
let onSinkEvent: OnSinkEvent?
362370
var eventPipe: EventPipe
363371

364-
init() {
372+
init(onSinkEvent: OnSinkEvent?) {
373+
self.onSinkEvent = onSinkEvent
365374
self.eventPipe = EventPipe()
366375
}
367376
}
368377

369378
fileprivate final class ReusableSink<Action: WorkflowAction>: AnyReusableSink where Action.WorkflowType == WorkflowType {
370379
func handle(action: Action) {
380+
if let onSinkEvent {
381+
handleWithSinkEventHandler(action: action, onSinkEvent: onSinkEvent)
382+
return
383+
}
384+
385+
// Prior logic
371386
let output = Output.update(
372387
action,
373388
source: .external,
@@ -384,6 +399,32 @@ extension WorkflowNode.SubtreeManager {
384399
}
385400
eventPipe.handle(event: output)
386401
}
402+
403+
private func handleWithSinkEventHandler(
404+
action: Action,
405+
onSinkEvent: OnSinkEvent
406+
) {
407+
// new `SinkEventHandler` logic
408+
dispatchPrecondition(condition: .onQueue(DispatchQueue.workflowExecution))
409+
410+
// If we can process now, forward through the `EventPipe`
411+
let immediatePerform: () -> Void = {
412+
let output = Output.update(
413+
action,
414+
source: .external,
415+
subtreeInvalidated: false // initial state
416+
)
417+
418+
self.eventPipe.handle(event: output)
419+
}
420+
421+
// Otherwise, try to recurse again in the future
422+
let deferredPerform: () -> Void = { [weak self] in
423+
self?.handle(action: action)
424+
}
425+
426+
onSinkEvent(immediatePerform, deferredPerform)
427+
}
387428
}
388429
}
389430

Workflow/Sources/WorkflowHost.swift

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
import Dispatch
1718
import ReactiveSwift
1819

1920
/// Defines a type that receives debug information about a running workflow hierarchy.
@@ -50,6 +51,8 @@ public final class WorkflowHost<WorkflowType: Workflow> {
5051
context.debugger
5152
}
5253

54+
let sinkEventHandler: SinkEventHandler
55+
5356
/// Initializes a new host with the given workflow at the root.
5457
///
5558
/// - Parameter workflow: The root workflow in the hierarchy
@@ -61,15 +64,22 @@ public final class WorkflowHost<WorkflowType: Workflow> {
6164
observers: [WorkflowObserver] = [],
6265
debugger: WorkflowDebugger? = nil
6366
) {
67+
self.sinkEventHandler = SinkEventHandler()
68+
defer { sinkEventHandler.state = .ready }
69+
6470
let observer = WorkflowObservation
6571
.sharedObserversInterceptor
6672
.workflowObservers(for: observers)
6773
.chained()
6874

75+
let config = Runtime.configuration
76+
let sinkEventCallback = config.useSinkEventHandler ? sinkEventHandler.makeOnSinkEventCallback() : nil
77+
6978
self.context = HostContext(
7079
observer: observer,
7180
debugger: debugger,
72-
runtimeConfig: Runtime.configuration
81+
runtimeConfig: config,
82+
onSinkEvent: sinkEventCallback
7383
)
7484

7585
self.rootNode = WorkflowNode(
@@ -91,6 +101,16 @@ public final class WorkflowHost<WorkflowType: Workflow> {
91101

92102
/// Update the input for the workflow. Will cause a render pass.
93103
public func update(workflow: WorkflowType) {
104+
if context.runtimeConfig.useSinkEventHandler {
105+
sinkEventHandler.withEventHandlingSuspended {
106+
updateRootNode(workflow: workflow)
107+
}
108+
} else {
109+
updateRootNode(workflow: workflow)
110+
}
111+
}
112+
113+
private func updateRootNode(workflow: WorkflowType) {
94114
rootNode.update(workflow: workflow)
95115

96116
// Treat the update as an "output" from the workflow originating from an external event to force a render pass.
@@ -158,14 +178,19 @@ struct HostContext {
158178
let debugger: WorkflowDebugger?
159179
let runtimeConfig: Runtime.Configuration
160180

181+
/// Event handler to be plumbed through the runtime down to the (reusable) Sinks.
182+
let onSinkEvent: OnSinkEvent?
183+
161184
init(
162185
observer: WorkflowObserver?,
163186
debugger: WorkflowDebugger?,
164-
runtimeConfig: Runtime.Configuration
187+
runtimeConfig: Runtime.Configuration,
188+
onSinkEvent: OnSinkEvent?
165189
) {
166190
self.observer = observer
167191
self.debugger = debugger
168192
self.runtimeConfig = runtimeConfig
193+
self.onSinkEvent = onSinkEvent
169194
}
170195
}
171196

@@ -176,3 +201,88 @@ extension HostContext {
176201
debugger != nil ? perform() : nil
177202
}
178203
}
204+
205+
// MARK: - SinkEventHandler
206+
207+
/// Callback signature for the internal `ReusableSink` types to invoke when
208+
/// they receive an event from the 'outside world'.
209+
/// - Parameter immediatePerform: The event handler to invoke if the event can be processed immediately.
210+
/// - Parameter deferredPerform: The event handler to invoke in the future if the event cannot currently be processed.
211+
typealias OnSinkEvent = (
212+
_ immediatePerform: () -> Void,
213+
_ deferredPerform: @escaping () -> Void
214+
) -> Void
215+
216+
/// Handles events from 'Sinks' such that runtime-level event handling state is appropriately
217+
/// managed, and attempts to perform reentrant action handling can be detected and dealt with.
218+
final class SinkEventHandler {
219+
enum State {
220+
/// Ready to handle an event.
221+
case ready
222+
223+
/// The event handler is busy. Usually this indicates another event is being
224+
/// processed, but it may also be set when some other condition prevents
225+
/// event handling (e.g. a `WorkflowHost` was told to update its root node).
226+
case busy
227+
}
228+
229+
fileprivate(set) var state: State
230+
231+
init(state: State = .busy) {
232+
self.state = state
233+
}
234+
235+
/// Synchronously performs or enqueues the specified event handlers based on the current
236+
/// event handler state.
237+
/// - Parameters:
238+
/// - immediate: The event handling action to perform immediately if possible.
239+
/// - deferred: The event handling action to enqueue if the event handler is already processing an event.
240+
func performOrEnqueueEvent(
241+
immediate: () -> Void,
242+
deferred: @escaping () -> Void
243+
) {
244+
switch state {
245+
case .ready:
246+
withEventHandlingSuspended(immediate)
247+
248+
case .busy:
249+
DispatchQueue.workflowExecution.async(execute: deferred)
250+
}
251+
}
252+
253+
/// Invokes the given closure with event handling explicitly set to the `busy` state, so
254+
/// any incoming events produced while executing the closure's body will be enqueued.
255+
/// - Parameter body: The closure to invoke.
256+
func withEventHandlingSuspended(_ body: () -> Void) {
257+
switch state {
258+
case .ready:
259+
state = .busy
260+
defer { state = .ready }
261+
body()
262+
263+
case .busy:
264+
body()
265+
}
266+
}
267+
268+
/// Creates the callback that should be invoked by Sinks to handle their event appropriately
269+
/// given the `SinkEventHandler`'s current state.
270+
/// - Returns: The callback that should be invoked.
271+
func makeOnSinkEventCallback() -> OnSinkEvent {
272+
// We may not actually need the weak ref, but it's more defensive to keep it.
273+
let onSinkEvent: OnSinkEvent = { [weak self] immediate, deferred in
274+
guard let self else {
275+
// We just drop the events here. Should we signal this somehow?
276+
// Maybe as a debug-only thing? Or is it just noise?
277+
return
278+
}
279+
280+
performOrEnqueueEvent(
281+
immediate: immediate,
282+
deferred: deferred
283+
)
284+
}
285+
286+
return onSinkEvent
287+
}
288+
}

0 commit comments

Comments
 (0)