@@ -12,103 +12,164 @@ trait Scheduler:
1212 def schedule (task : Runnable ): Unit = ???
1313
1414object Scheduler extends Scheduler :
15- given fromAsync (using async : Async ): Scheduler = async.client .scheduler
15+ given fromAsync (using async : Async ): Scheduler = async.runner .scheduler
1616end Scheduler
1717
18+ /** A context that allows one to suspend waiting for asynchronous data sources */
1819trait Async :
1920
20- /** Wait for completion of future `f`. This means:
21- * - ensure that computing `f` has started
22- * - wait for the completion and return the completed Try
23- */
24- def await [T ](f : Future [T ]): Try [T ]
21+ /** Wait for completion of async source `src` and return the result */
22+ def await [T ](src : Async .Source [T ]): T
2523
26- /** Wait for completion of the first of the futures `f1 `, `f2 `
27- * @return `Left(r1)` if `f1 ` completed first with `r1`
28- * `Right(r2)` if `f2 ` completed first with `r2`
24+ /** Wait for completion of the first of the sources `src1 `, `src2 `
25+ * @return `Left(r1)` if `src1 ` completed first with `r1`
26+ * `Right(r2)` if `src2 ` completed first with `r2`
2927 */
30- def awaitEither [T1 , T2 ](f1 : Future [T1 ], f2 : Future [T2 ]): Either [Try [ T1 ], Try [ T2 ] ]
28+ def awaitEither [T1 , T2 ](src1 : Async . Source [T1 ], src2 : Async . Source [T2 ]): Either [T1 , T2 ]
3129
32- /** The future computed by this async computation. */
33- def client : Future [ ? ]
30+ /** The runner underlying this async computation. */
31+ def runner : Async . Runner
3432
3533object Async :
34+
35+ /** The currently executing Async context */
3636 inline def current (using async : Async ): Async = async
37+
38+ /** An asynchronous data source. Sources can be persistent or ephemeral.
39+ * A persistent source will always return the same data to calls to `poll`
40+ * and pass the same data to calls of `handle`. An ephemeral source might pass new
41+ * data in every call. An example of a persistent source is `Future`. An
42+ * example of an ephemeral source is `Channel`.
43+ */
44+ trait Source [+ T ]:
45+
46+ /** Poll whether data is available
47+ * @return The data or None in an option. Depending on the nature of the
48+ * source, data might be returned only once in a poll. E.g. if
49+ * the source is a channel, a Some result might skip to the next
50+ * entry.
51+ */
52+ def poll : Option [T ]
53+
54+ /** When data is available, pass it to function `k`.
55+ */
56+ def handleWith (k : T => Unit ): Unit
57+
58+ end Source
59+
60+ /** A thread-like entity that can be cancelled */
61+ trait Runner :
62+
63+ /** The scheduler on which this computation is running */
64+ def scheduler : Scheduler
65+
66+ /** Cancel computation for this runner and all its children */
67+ def cancel (): Unit
68+
69+ /** Add a given child to this runner */
70+ def addChild (child : Runner ): Unit
71+ end Runner
72+
3773end Async
3874
39- class Future [+ T ](body : Async ?=> T )(using val scheduler : Scheduler ):
75+
76+ class Future [+ T ](body : Async ?=> T )(using val scheduler : Scheduler )
77+ extends Async .Source [Try [T ]], Async .Runner :
4078 import Future .{Status , Cancellation }, Status .*
4179
4280 @ volatile private var status : Status = Started
4381 private var result : Try [T ] = uninitialized
4482 private var waiting : ListBuffer [Try [T ] => Unit ] = ListBuffer ()
45- private var children : mutable.Set [Future [? ]] = mutable.Set ()
46-
47- private def addWaiting (k : Try [T ] => Unit ): Unit = synchronized :
48- if status == Completed then k(result)
49- else waiting += k
83+ private var children : mutable.Set [Async .Runner ] = mutable.Set ()
5084
5185 private def currentWaiting (): List [Try [T ] => Unit ] = synchronized :
5286 val ws = waiting.toList
5387 waiting.clear()
5488 ws
5589
56- private def currentChildren (): List [Future [ ? ] ] = synchronized :
90+ private def currentChildren (): List [Async . Runner ] = synchronized :
5791 val cs = children.toList
5892 children.clear()
5993 cs
6094
61- private def checkCancellation (): Unit =
62- if status == Cancelled then throw Cancellation ()
95+ def poll : Option [Try [T ]] = status match
96+ case Started => None
97+ case Completed => Some (result)
98+ case Cancelled => Some (Failure (Cancellation ()))
99+
100+ def handleWith (k : Try [T ] => Unit ): Unit = synchronized :
101+ if status == Completed then k(result)
102+ else waiting += k
103+
104+ /** Eventually stop computation of this future and fail with
105+ * a `Cancellation` exception. Also cancel all linked children.
106+ */
107+ def cancel (): Unit = synchronized :
108+ if status != Completed && status != Cancelled then
109+ status = Cancelled
110+ for f <- currentChildren() do f.cancel()
111+
112+ def addChild (child : Async .Runner ): Unit = synchronized :
113+ if status == Completed then child.cancel()
114+ else children += this
115+
116+ /** Links the future as a child to the current async client.
117+ * This means the future will be cancelled when the async client
118+ * completes.
119+ */
120+ def linked (using async : Async ): this .type = synchronized :
121+ if status != Completed then async.runner.addChild(this )
122+ this
63123
64124 /** Wait for this future to be completed, return its value in case of success,
65125 * or rethrow exception in case of failure.
66126 */
67127 def value (using async : Async ): T = async.await(this ).get
68128
129+ /** Block thread until future is completed and return result
130+ * N.B. This should be parameterized with a timeout.
131+ */
132+ def force (): T =
133+ while status != Completed do wait()
134+ result.get
135+
69136 // a handler for Async
70137 private def async (body : Async ?=> Unit ): Unit =
71138 boundary [Unit ]:
72139 given Async with
73-
74- private def resultOption [T ](f : Future [T ]): Option [Try [T ]] = f.status match
75- case Started =>
76- None
77- case Completed =>
78- Some (f.result)
79- case Cancelled =>
80- Some (Failure (Cancellation ()))
140+ private def checkCancellation (): Unit =
141+ if status == Cancelled then throw Cancellation ()
81142
82143 private inline def cancelChecked [T ](op : => T ): T =
83144 checkCancellation()
84145 val res = op
85146 checkCancellation()
86147 res
87148
88- def await [T ](f : Future [T ]): Try [ T ] =
149+ def await [T ](src : Async . Source [T ]): T =
89150 cancelChecked :
90- resultOption(f) .getOrElse:
91- suspend[Try [ T ] , Unit ]: s =>
92- f.addWaiting : result =>
151+ src.poll .getOrElse:
152+ suspend[T , Unit ]: k =>
153+ src.handleWith : result =>
93154 scheduler.schedule: () =>
94- s .resume(result)
155+ k .resume(result)
95156
96- def awaitEither [T1 , T2 ](f1 : Future [T1 ], f2 : Future [T2 ]): Either [Try [ T1 ], Try [ T2 ] ] =
157+ def awaitEither [T1 , T2 ](src1 : Async . Source [T1 ], src2 : Async . Source [T2 ]): Either [T1 , T2 ] =
97158 cancelChecked :
98- resultOption(f1) .map(Left (_)).getOrElse:
99- resultOption(f2) .map(Right (_)).getOrElse:
100- suspend[Either [Try [ T1 ], Try [ T2 ]] , Unit ]: s =>
159+ src1.poll .map(Left (_)).getOrElse:
160+ src2.poll .map(Right (_)).getOrElse:
161+ suspend[Either [T1 , T2 ], Unit ]: k =>
101162 var found = AtomicBoolean ()
102- f1.addWaiting : result =>
163+ src1.handleWith : result =>
103164 if ! found.getAndSet(true ) then
104165 scheduler.schedule: () =>
105- s .resume(Left (result))
106- f2.addWaiting : result =>
166+ k .resume(Left (result))
167+ src2.handleWith : result =>
107168 if ! found.getAndSet(true ) then
108169 scheduler.schedule: () =>
109- s .resume(Right (result))
170+ k .resume(Right (result))
110171
111- def client = Future .this
172+ def runner = Future .this
112173 end given
113174
114175 body
@@ -121,36 +182,9 @@ class Future[+T](body: Async ?=> T)(using val scheduler: Scheduler):
121182 catch case ex : Exception => Failure (ex)
122183 status = Completed
123184 for task <- currentWaiting() do task(result)
124- cancelChildren ()
185+ cancel ()
125186 notifyAll()
126187
127- /** Links the future as a child to the current async client.
128- * This means the future will be cancelled when the async client
129- * completes.
130- */
131- def linked (using async : Async ): this .type = synchronized :
132- if status != Completed then
133- async.client.children += this
134- this
135-
136- private def cancelChildren (): Unit =
137- for f <- currentChildren() do f.cancel()
138-
139- /** Eventually stop computation of this future and fail with
140- * a `Cancellation` exception. Also cancel all linked children.
141- */
142- def cancel (): Unit = synchronized :
143- if status != Completed && status != Cancelled then
144- status = Cancelled
145- cancelChildren()
146-
147- /** Block thread until future is completed and return result
148- * N.B. This should be parameterized with a timeout.
149- */
150- def force (): T =
151- while status != Completed do wait()
152- result.get
153-
154188 scheduler.schedule(() => complete())
155189end Future
156190
@@ -187,6 +221,30 @@ class Task[+T](val body: Async ?=> T):
187221 case Right (_ : Failure [? ]) => f1.value
188222end Task
189223
224+ /** An unbounded channel */
225+ class Channel [T ] extends Async .Source [T ]:
226+ private val pending = ListBuffer [T ]()
227+ private val waiting = ListBuffer [T => Unit ]()
228+ def send (x : T ): Unit = synchronized :
229+ if waiting.isEmpty then pending += x
230+ else
231+ val k = waiting.head
232+ waiting.dropInPlace(1 )
233+ k(x)
234+ def poll : Option [T ] = synchronized :
235+ if pending.isEmpty then None
236+ else
237+ val x = pending.head
238+ pending.dropInPlace(1 )
239+ Some (x)
240+ def handleWith (k : T => Unit ): Unit = synchronized :
241+ if pending.isEmpty then waiting += k
242+ else
243+ val x = pending.head
244+ pending.dropInPlace(1 )
245+ k(x)
246+ end Channel
247+
190248def Test (x : Future [Int ], xs : List [Future [Int ]])(using Scheduler ): Future [Int ] =
191249 Future :
192250 x.value + xs.map(_.value).sum
0 commit comments