diff --git a/src/fsharp/FSharp.Core/control.fs b/src/fsharp/FSharp.Core/control.fs index a42d43986c4..6be535a316e 100644 --- a/src/fsharp/FSharp.Core/control.fs +++ b/src/fsharp/FSharp.Core/control.fs @@ -329,21 +329,13 @@ namespace Microsoft.FSharp.Control - // Reify exceptional results as exceptions + /// Reify exceptional results as exceptions let commit res = match res with | Ok res -> res | Error edi -> edi.ThrowAny() | Canceled exn -> raise exn - // Reify exceptional results as exceptionsJIT 64 doesn't always take tailcalls correctly - - let commitWithPossibleTimeout res = - match res with - | None -> raise (System.TimeoutException()) - | Some res -> commit res - - //---------------------------------- // PRIMITIVE ASYNC INVOCATION @@ -713,11 +705,15 @@ namespace Microsoft.FSharp.Control [] type ResultCell<'T>() = let mutable result = None + // The continuations for the result let mutable savedConts : list> = [] + // The WaitHandle event for the result. Only created if needed, and set to null when disposed. let mutable resEvent = null + let mutable disposed = false + // All writers of result are protected by lock on syncRoot. let syncRoot = new Object() @@ -752,13 +748,11 @@ namespace Microsoft.FSharp.Control interface IDisposable with member x.Dispose() = x.Close() // ; System.GC.SuppressFinalize(x) - member x.GrabResult() = match result with | Some res -> res | None -> failwith "Unexpected no result" - /// Record the result in the ResultCell. member x.RegisterResult (res:'T, reuseThread) = let grabbedConts = @@ -795,7 +789,10 @@ namespace Microsoft.FSharp.Control member x.ResultAvailable = result.IsSome - member x.AwaitResult = + /// Await the result of a result cell, without a direct timeout or direct + /// cancellation. That is, the underlying computation must fill the result + /// if cancellation or timeout occurs. + member x.AwaitResult_NoDirectCancelOrTimeout = unprotectedPrimitive(fun args -> // Check if a result is available synchronously let resOpt = @@ -860,10 +857,10 @@ namespace Microsoft.FSharp.Control // If timeout is provided, we govern the async by our own CTS, to cancel // when execution times out. Otherwise, the user-supplied token governs the async. match timeout with - | None -> token,None - | Some _ -> - let subSource = new LinkedSubSource(token) - subSource.Token, Some subSource + | None -> token, None + | Some _ -> + let subSource = new LinkedSubSource(token) + subSource.Token, Some subSource use resultCell = new ResultCell>() queueAsync @@ -1252,7 +1249,8 @@ namespace Microsoft.FSharp.Control aux.econt edi ) - static member AwaitWaitHandle(waitHandle:WaitHandle,?millisecondsTimeout:int) = + /// Wait for a wait handle. Both timeout and cancellation are supported + static member AwaitWaitHandle(waitHandle: WaitHandle, ?millisecondsTimeout:int) = let millisecondsTimeout = defaultArg millisecondsTimeout Threading.Timeout.Infinite if millisecondsTimeout = 0 then async.Delay(fun () -> @@ -1312,61 +1310,61 @@ namespace Microsoft.FSharp.Control return! Async.AwaitWaitHandle(iar.AsyncWaitHandle, ?millisecondsTimeout=millisecondsTimeout) } - /// Await the result of a result cell without a timeout - static member ReifyResult(result:AsyncImplResult<'T>) : Async<'T> = + /// Bind the result of a result cell, calling the appropriate continuation. + static member BindResult(result: AsyncImplResult<'T>) : Async<'T> = unprotectedPrimitive(fun ({ aux = aux } as args) -> (match result with | Ok v -> args.cont v | Error exn -> aux.econt exn | Canceled exn -> aux.ccont exn) ) - /// Await the result of a result cell without a timeout - static member AwaitAndReifyResult(resultCell:ResultCell>) : Async<'T> = + /// Await and use the result of a result cell. The resulting async doesn't support cancellation + /// or timeout directly, rather the underlying computation must fill the result if cancellation + /// or timeout occurs. + static member AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell: ResultCell>) : Async<'T> = async { - let! result = resultCell.AwaitResult - return! Async.ReifyResult(result) + let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout + return! Async.BindResult(result) } - - - /// Await the result of a result cell without a timeout - /// - /// Always resyncs to the synchronization context if needed, by virtue of it being built - /// from primitives which resync. - static member AsyncWaitAsyncWithTimeout(innerCTS : CancellationTokenSource, resultCell:ResultCell>,millisecondsTimeout) : Async<'T> = + /// Await the result of a result cell belonging to a child computation. The resulting async supports timeout and if + /// it happens the child computation will be cancelled. The resulting async doesn't support cancellation + /// directly, rather the underlying computation must fill the result if cancellation occurs. + static member AwaitAndBindChildResult(innerCTS: CancellationTokenSource, resultCell: ResultCell>, millisecondsTimeout) : Async<'T> = match millisecondsTimeout with | None | Some -1 -> - resultCell |> Async.AwaitAndReifyResult + resultCell |> Async.AwaitAndBindResult_NoDirectCancelOrTimeout | Some 0 -> async { if resultCell.ResultAvailable then return commit (resultCell.GrabResult()) else - return commitWithPossibleTimeout None } + return raise (System.TimeoutException()) } | _ -> async { try if resultCell.ResultAvailable then return commit (resultCell.GrabResult()) else - let! ok = Async.AwaitWaitHandle (resultCell.GetWaitHandle(),?millisecondsTimeout=millisecondsTimeout) + let! ok = Async.AwaitWaitHandle (resultCell.GetWaitHandle(), ?millisecondsTimeout=millisecondsTimeout) if ok then - return commitWithPossibleTimeout (Some (resultCell.GrabResult())) + return commit (resultCell.GrabResult()) else // timed out // issue cancellation signal innerCTS.Cancel() // wait for computation to quiesce let! _ = Async.AwaitWaitHandle (resultCell.GetWaitHandle()) - return commitWithPossibleTimeout None + return raise (System.TimeoutException()) finally resultCell.Close() } - static member FromBeginEnd(beginAction,endAction,?cancelAction): Async<'T> = + static member FromBeginEnd(beginAction, endAction, ?cancelAction): Async<'T> = async { let! cancellationToken = getCancellationToken() let resultCell = new ResultCell<_>() let once = Once() let registration : CancellationTokenRegistration = + let onCancel (_:obj) = // Call the cancellation routine match cancelAction with @@ -1381,7 +1379,9 @@ namespace Microsoft.FSharp.Control // If we get an exception from a cooperative cancellation function // we assume the operation has already completed. try cancel() with _ -> () + cancellationToken.Register(Action(onCancel), null) + let callback = new System.AsyncCallback(fun iar -> if not iar.CompletedSynchronously then @@ -1405,15 +1405,15 @@ namespace Microsoft.FSharp.Control // ResultCell allows a race and throws away whichever comes last. resultCell.RegisterResult(res,reuseThread=true) |> unfake else ()) - - let (iar:IAsyncResult) = beginAction (callback,(null:obj)) if iar.CompletedSynchronously then registration.Dispose() return endAction iar else - return! Async.AwaitAndReifyResult(resultCell) } + // Note: ok to use "NoDirectCancel" here because cancellation has been registered above + // Note: ok to use "NoDirectTimeout" here because no timeout parameter to this method + return! Async.AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell) } static member FromBeginEnd(arg,beginAction,endAction,?cancelAction): Async<'T> = @@ -1567,7 +1567,9 @@ namespace Microsoft.FSharp.Control event.AddHandler(del) // Return the async computation that allows us to await the result - return! Async.AwaitAndReifyResult(resultCell) } + // Note: ok to use "NoDirectCancel" here because cancellation has been registered above + // Note: ok to use "NoDirectTimeout" here because no timeout parameter to this method + return! Async.AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell) } type Async with static member Ignore (computation: Async<'T>) = bindA computation (fun _ -> doneA) @@ -1597,7 +1599,7 @@ namespace Microsoft.FSharp.Control computation |> unfake - return Async.AsyncWaitAsyncWithTimeout(innerCTS, resultCell,millisecondsTimeout) } + return Async.AwaitAndBindChildResult(innerCTS, resultCell, millisecondsTimeout) } static member SwitchToContext syncContext = async { match syncContext with @@ -1681,10 +1683,6 @@ namespace Microsoft.FSharp.Control Async.FromBeginEnd (buffer,offset,count,stream.BeginWrite,stream.EndWrite) #endif - type System.Threading.WaitHandle with - member waitHandle.AsyncWaitOne(?millisecondsTimeout:int) = // only used internally, not a public API - Async.AwaitWaitHandle(waitHandle,?millisecondsTimeout=millisecondsTimeout) - type IObservable<'Args> with [] // give the extension member a 'nice', unmangled compiled name, unique within this module @@ -1715,7 +1713,7 @@ namespace Microsoft.FSharp.Control | :? System.Net.WebException as webExn when webExn.Status = System.Net.WebExceptionStatus.RequestCanceled && !canceled -> - Async.ReifyResult(AsyncImplResult.Canceled (OperationCanceledException webExn.Message)) + Async.BindResult(AsyncImplResult.Canceled (OperationCanceledException webExn.Message)) | _ -> edi.ThrowAny()) @@ -1791,7 +1789,10 @@ namespace Microsoft.FSharp.Control ) start a1 Choice1Of2 start a2 Choice2Of2 - let! result = c.AwaitResult + // Note: It is ok to use "NoDirectCancel" here because the started computations use the same + // cancellation token and will register a cancelled result if cancellation occurs. + // Note: It is ok to use "NoDirectTimeout" here because there is no specific timeout log to this routine. + let! result = c.AwaitResult_NoDirectCancelOrTimeout return! reify result } let timeout msec cancellationToken = @@ -1805,7 +1806,10 @@ namespace Microsoft.FSharp.Control exceptionContinuation=ignore, cancellationContinuation=ignore, cancellationToken = cancellationToken) - c.AwaitResult + // Note: It is ok to use "NoDirectCancel" here because the started computations use the same + // cancellation token and will register a cancelled result if cancellation occurs. + // Note: It is ok to use "NoDirectTimeout" here because the child compuation above looks after the timeout. + c.AwaitResult_NoDirectCancelOrTimeout [] [] @@ -1854,7 +1858,7 @@ namespace Microsoft.FSharp.Control failwith "multiple waiting reader continuations for mailbox") let waitOneWithCancellation(timeout) = - ensurePulse().AsyncWaitOne(millisecondsTimeout=timeout) + Async.AwaitWaitHandle(ensurePulse(), millisecondsTimeout=timeout) let waitOne(timeout) = if timeout < 0 && not cancellationSupported then @@ -2125,36 +2129,34 @@ namespace Microsoft.FSharp.Control let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> // Note the ResultCell may have been disposed if the operation // timed out. In this case RegisterResult drops the result on the floor. - resultCell.RegisterResult(reply,reuseThread=false) |> unfake)) + resultCell.RegisterResult(reply, reuseThread=false) |> unfake)) mailbox.Post(msg) match timeout with - | Threading.Timeout.Infinite -> - async { let! result = resultCell.AwaitResult - return Some(result) - } + | Threading.Timeout.Infinite when not cancellationSupported -> + async { let! result = resultCell.AwaitResult_NoDirectCancelOrTimeout + return Some result } - | _ -> - async { use _disposeCell = resultCell - let! ok = resultCell.GetWaitHandle().AsyncWaitOne(millisecondsTimeout=timeout) - let res = (if ok then Some(resultCell.GrabResult()) else None) - return res } + | _ -> + async { use _disposeCell = resultCell + let! ok = Async.AwaitWaitHandle(resultCell.GetWaitHandle(), millisecondsTimeout=timeout) + let res = (if ok then Some(resultCell.GrabResult()) else None) + return res } member x.PostAndAsyncReply(buildMessage, ?timeout:int) = let timeout = defaultArg timeout defaultTimeout match timeout with - | Threading.Timeout.Infinite -> - // Nothing to dispose, no wait handles used - let resultCell = new ResultCell<_>() - let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> unfake)) - mailbox.Post(msg) - resultCell.AwaitResult - | _ -> - let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout) - async { let! res = asyncReply - match res with - | None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut))) - | Some res -> return res - } + | Threading.Timeout.Infinite when not cancellationSupported -> + // Nothing to dispose, no wait handles used + let resultCell = new ResultCell<_>() + let msg = buildMessage (new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply,reuseThread=false) |> unfake)) + mailbox.Post(msg) + resultCell.AwaitResult_NoDirectCancelOrTimeout + | _ -> + let asyncReply = x.PostAndTryAsyncReply(buildMessage,timeout=timeout) + async { let! res = asyncReply + match res with + | None -> return! raise (TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut))) + | Some res -> return res } member x.Receive(?timeout) = mailbox.Receive(timeout=defaultArg timeout defaultTimeout) member x.TryReceive(?timeout) = mailbox.TryReceive(timeout=defaultArg timeout defaultTimeout) diff --git a/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/Cancellation.fs b/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/Cancellation.fs index 54720b29d7c..47f241f2462 100644 --- a/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/Cancellation.fs +++ b/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/Cancellation.fs @@ -306,3 +306,5 @@ type CancellationType() = Assert.IsFalse((r1a <> r1a')) Assert.IsTrue((r1a <> r1b)) Assert.IsTrue((r1a <> r2)) + + diff --git a/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/MailboxProcessorType.fs b/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/MailboxProcessorType.fs index 74b0abaf291..de56a4b1455 100644 --- a/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/MailboxProcessorType.fs +++ b/tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/MailboxProcessorType.fs @@ -292,3 +292,43 @@ type MailboxProcessorType() = test() + [] + member this.PostAndAsyncReply_Cancellation() = + + use cancel = new CancellationTokenSource(500) + let mutable gotGood = false + let mutable gotBad = false + + let goodAsync = async { + try + for i in Seq.initInfinite (fun i -> i) do + if i % 10000000 = 0 then + printfn "good async working..." + finally + printfn "good async exited - that's what we want" + gotGood <- true + } + + let badAsync (mbox:MailboxProcessor>) = async { + try + printfn "bad async working..." + let! result = mbox.PostAndAsyncReply id // <- got stuck in here forever + printfn "%d" result + finally + printfn "bad async exited - that's what we want" // <- we never got here + gotBad <- true + } + + let mbox = MailboxProcessor.Start(fun inbox -> async { + let! (reply : AsyncReplyChannel) = inbox.Receive() + do! Async.Sleep 1000000 + reply.Reply (200) + }, cancel.Token) + + [goodAsync; badAsync mbox] + |> Async.Parallel + |> Async.Ignore + |> fun x -> Async.Start(x, cancel.Token) + System.Threading.Thread.Sleep(1000) // cancellation after 500 + if not gotGood || not gotBad then + failwith "Exected both good and bad async's to be cancelled afteMailbox should not fail!"