diff --git a/lib/task.ml b/lib/task.ml index 3599925..30b9caf 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -4,7 +4,9 @@ open Effect.Deep type 'a task = unit -> 'a type message = - Work of (unit -> unit) +| Work of (unit -> unit) + (* Invariant: the Work function does not need to run under the 'step' handler, + it installs its own handler or re-invokes a deep-handler continuation. *) | Quit type task_chan = message Multi_channel.t @@ -48,12 +50,6 @@ let do_task (type a) (f : unit -> a) (p : a promise) : unit = | Pending l -> List.iter action l | _ -> failwith "Task.do_task: impossible, can only set result of task once" -let async pool f = - let pd = get_pool_data pool in - let p = Atomic.make (Pending []) in - Multi_channel.send pd.task_chan (Work (fun _ -> do_task f p)); - p - let await pool promise = let pd = get_pool_data pool in match Atomic.get promise with @@ -78,10 +74,16 @@ let step (type a) (f : a -> unit) (v : a) : unit = loop ()) | _ -> None } +let async pool f = + let pd = get_pool_data pool in + let p = Atomic.make (Pending []) in + Multi_channel.send pd.task_chan (Work (fun _ -> step (do_task f) p)); + p + let rec worker task_chan = match Multi_channel.recv task_chan with | Quit -> Multi_channel.clear_local_state task_chan - | Work f -> step f (); worker task_chan + | Work f -> f (); worker task_chan let run (type a) pool (f : unit -> a) : a = let pd = get_pool_data pool in @@ -93,7 +95,7 @@ let run (type a) pool (f : unit -> a) : a = begin try match Multi_channel.recv_poll pd.task_chan with - | Work f -> step f () + | Work f -> f () | Quit -> failwith "Task.run: tasks are active on pool" with Exit -> Domain.cpu_relax () end;