From 26259a24e639dc887dc0eff90902dc2e1cbbc8cb Mon Sep 17 00:00:00 2001 From: Gabriel Scherer Date: Thu, 5 Jan 2023 17:22:38 +0100 Subject: [PATCH] Task: avoid double handler installation Explanation of the suspected issue: (This suspected issue was noticed by @clef-men; he convinced me that this looks credible and we wrote the proposed fix together.) The Task implementation seems to be slightly wasteful in that, if we understand correctly, it may install its effect handler twice. This occurs in the following scenario: - call Task.run, under which you - create a promise with `async` - `await` this promise - when `await` gives control back, two `step` handlers are on the stack Two handlers are on the stack because: - when the current task is paused by `await`, the continuation `k` contains the deep handler of the surrounding `step` call: ```ocaml let cont v (k, c) = Multi_channel.send c (Work (fun _ -> continue k v)) ``` invoking this Work function will thus reinstate the surrounding deep handler. - when this continuation is received by another worker, `step` is called again, installing a second handler ```ocaml let rec worker task_chan = match Multi_channel.recv task_chan with | Quit -> Multi_channel.clear_local_state task_chan | Work f -> step f (); worker task_chan ``` At this point, if we understand correctly, there are two `step` handlers on the call stack. Note that this does not grow to an unbounded number of nested handlers: on the next Wait effect, the inner handler either continues immediately (still 2 handlers) or pushes the current continuation to a Pending queue and returns, popping the two handlers. Once this continuation is invoked again under `step`, we are back to 2 handlers. 
Note that in some cases the current implementation does need the `step` call in `worker`, because the function argument of `Work f` does not systematically include a handler: ```ocaml let async pool f = let pd = get_pool_data pool in let p = Atomic.make (Pending []) in Multi_channel.send pd.task_chan (Work (fun _ -> do_task f p)); p ``` Explanation of our proposed fix: This PR proposes to fix the issue by enforcing the invariant that the Work function always includes its own handler for Task effects: for the Work functions that use `continue` and `discontinue` there is nothing to do; for the Work function of `async`, we call `step` inside the Work function itself. Another approach would be to use a shallow handler, and use `step` in the same way as the current code does (around each Work function), but this may be slightly slower -- we would be exactly encoding a deep handler using a shallow handler. (The current code reads like the authors had the shallow-handler semantics in mind.) Co-authored-by: Clement Allain --- lib/task.ml | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/lib/task.ml b/lib/task.ml index 3599925..30b9caf 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -4,7 +4,9 @@ open Effect.Deep type 'a task = unit -> 'a type message = - Work of (unit -> unit) +| Work of (unit -> unit) + (* Invariant: the Work function does not need to run under the 'step' handler, + it installs its own handler or re-invokes a deep-handler continuation. 
*) | Quit type task_chan = message Multi_channel.t @@ -48,12 +50,6 @@ let do_task (type a) (f : unit -> a) (p : a promise) : unit = | Pending l -> List.iter action l | _ -> failwith "Task.do_task: impossible, can only set result of task once" -let async pool f = - let pd = get_pool_data pool in - let p = Atomic.make (Pending []) in - Multi_channel.send pd.task_chan (Work (fun _ -> do_task f p)); - p - let await pool promise = let pd = get_pool_data pool in match Atomic.get promise with @@ -78,10 +74,16 @@ let step (type a) (f : a -> unit) (v : a) : unit = loop ()) | _ -> None } +let async pool f = + let pd = get_pool_data pool in + let p = Atomic.make (Pending []) in + Multi_channel.send pd.task_chan (Work (fun _ -> step (do_task f) p)); + p + let rec worker task_chan = match Multi_channel.recv task_chan with | Quit -> Multi_channel.clear_local_state task_chan - | Work f -> step f (); worker task_chan + | Work f -> f (); worker task_chan let run (type a) pool (f : unit -> a) : a = let pd = get_pool_data pool in @@ -93,7 +95,7 @@ let run (type a) pool (f : unit -> a) : a = begin try match Multi_channel.recv_poll pd.task_chan with - | Work f -> step f () + | Work f -> f () | Quit -> failwith "Task.run: tasks are active on pool" with Exit -> Domain.cpu_relax () end;