Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions src/FSharp.Core/mailbox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.AsyncBuilderImpl
open Microsoft.FSharp.Control.AsyncPrimitives
open Microsoft.FSharp.Collections

module AsyncHelpers =

Expand Down Expand Up @@ -370,7 +369,10 @@ type MailboxProcessor<'Msg>(body, ?cancellationToken) =
member _.UnsafeMessageQueueContents = mailbox.UnsafeContents
#endif

member x.Start() =
/// Mark the agent as started if not started and return the asynchronous computation to be started.
/// If the agent has already been started, then throw an exception. Because the agent is internally
/// marked as started in this method, be sure to subsequently start the agent after calling this method.
member private x.PrepareToStart() =
if started then
raise (new InvalidOperationException(SR.GetString(SR.mailboxProcessorAlreadyStarted)))
else
Expand All @@ -379,15 +381,20 @@ type MailboxProcessor<'Msg>(body, ?cancellationToken) =
// Protect the execution and send errors to the event.
// Note that exception stack traces are lost in this design - in an extended design
// the event could propagate an ExceptionDispatchInfo instead of an Exception.
let p =
async {
try
do! body x
with exn ->
errorEvent.Trigger exn
}
async {
try
do! body x
with exn ->
errorEvent.Trigger exn
}

member x.Start() =
let p = x.PrepareToStart()
Async.Start(computation = p, cancellationToken = cancellationToken)

Async.Start(computation = p, cancellationToken = cancellationToken)
member x.StartImmediate() =
let p = x.PrepareToStart()
Async.StartImmediate(computation = p, cancellationToken = cancellationToken)

member _.Post message =
mailbox.Post message
Expand Down Expand Up @@ -498,3 +505,10 @@ type MailboxProcessor<'Msg>(body, ?cancellationToken) =

mailboxProcessor.Start()
mailboxProcessor

static member StartImmediate(body, ?cancellationToken) =
let mailboxProcessor =
new MailboxProcessor<'Msg>(body, ?cancellationToken = cancellationToken)

mailboxProcessor.StartImmediate()
mailboxProcessor
19 changes: 19 additions & 0 deletions src/FSharp.Core/mailbox.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ type MailboxProcessor<'Msg> =
static member Start:
body: (MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>

/// <summary>Creates and starts an agent immediately on the current operating system thread. The <c>body</c>
/// function is used to generate the asynchronous computation executed by the agent.</summary>
///
/// <param name="body">The function to produce an asynchronous computation that will be executed
/// as the read loop for the MailboxProcessor when StartImmediately is called.</param>
/// <param name="cancellationToken">An optional cancellation token for the <c>body</c>.
/// Defaults to <c>Async.DefaultCancellationToken</c>.</param>
///
/// <returns>The created MailboxProcessor.</returns>
///
/// <example-tbd></example-tbd>
static member StartImmediate:
body: (MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>

/// <summary>Posts a message to the message queue of the MailboxProcessor, asynchronously.</summary>
///
/// <param name="message">The message to post.</param>
Expand Down Expand Up @@ -196,6 +210,11 @@ type MailboxProcessor<'Msg> =
/// <example-tbd></example-tbd>
member Start: unit -> unit

/// <summary>Starts the agent immediately on the current operating system thread.</summary>
///
/// <example-tbd></example-tbd>
member StartImmediate: unit -> unit

/// <summary>Raises a timeout exception if a message not received in this amount of time. By default
/// no timeout is used.</summary>
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,14 @@ Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Contro
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpAsync`1[T] Scan[T](Microsoft.FSharp.Core.FSharpFunc`2[TMsg,Microsoft.FSharp.Core.FSharpOption`1[Microsoft.FSharp.Control.FSharpAsync`1[T]]], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpHandler`1[System.Exception] Error
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg] Start(Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg],Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit]], Microsoft.FSharp.Core.FSharpOption`1[System.Threading.CancellationToken])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg] StartImmediate(Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg],Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit]], Microsoft.FSharp.Core.FSharpOption`1[System.Threading.CancellationToken])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Core.FSharpOption`1[TReply] TryPostAndReply[TReply](Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpAsyncReplyChannel`1[TReply],TMsg], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: TReply PostAndReply[TReply](Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpAsyncReplyChannel`1[TReply],TMsg], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void .ctor(Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg],Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit]], Microsoft.FSharp.Core.FSharpOption`1[System.Threading.CancellationToken])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void Dispose()
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void Post(TMsg)
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void Start()
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void StartImmediate()
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void add_Error(Microsoft.FSharp.Control.FSharpHandler`1[System.Exception])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void remove_Error(Microsoft.FSharp.Control.FSharpHandler`1[System.Exception])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void set_DefaultTimeout(Int32)
Expand Down Expand Up @@ -2539,4 +2541,4 @@ Microsoft.FSharp.Reflection.UnionCaseInfo: System.String Name
Microsoft.FSharp.Reflection.UnionCaseInfo: System.String ToString()
Microsoft.FSharp.Reflection.UnionCaseInfo: System.String get_Name()
Microsoft.FSharp.Reflection.UnionCaseInfo: System.Type DeclaringType
Microsoft.FSharp.Reflection.UnionCaseInfo: System.Type get_DeclaringType()
Microsoft.FSharp.Reflection.UnionCaseInfo: System.Type get_DeclaringType()
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,14 @@ Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Contro
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpAsync`1[T] Scan[T](Microsoft.FSharp.Core.FSharpFunc`2[TMsg,Microsoft.FSharp.Core.FSharpOption`1[Microsoft.FSharp.Control.FSharpAsync`1[T]]], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpHandler`1[System.Exception] Error
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg] Start(Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg],Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit]], Microsoft.FSharp.Core.FSharpOption`1[System.Threading.CancellationToken])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg] StartImmediate(Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg],Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit]], Microsoft.FSharp.Core.FSharpOption`1[System.Threading.CancellationToken])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Microsoft.FSharp.Core.FSharpOption`1[TReply] TryPostAndReply[TReply](Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpAsyncReplyChannel`1[TReply],TMsg], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: TReply PostAndReply[TReply](Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpAsyncReplyChannel`1[TReply],TMsg], Microsoft.FSharp.Core.FSharpOption`1[System.Int32])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void .ctor(Microsoft.FSharp.Core.FSharpFunc`2[Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg],Microsoft.FSharp.Control.FSharpAsync`1[Microsoft.FSharp.Core.Unit]], Microsoft.FSharp.Core.FSharpOption`1[System.Threading.CancellationToken])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void Dispose()
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void Post(TMsg)
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void Start()
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void StartImmediate()
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void add_Error(Microsoft.FSharp.Control.FSharpHandler`1[System.Exception])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void remove_Error(Microsoft.FSharp.Control.FSharpHandler`1[System.Exception])
Microsoft.FSharp.Control.FSharpMailboxProcessor`1[TMsg]: Void set_DefaultTimeout(Int32)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ type Message =
| Fetch of AsyncReplyChannel<int>
| Reset

/// Bundles thread information into a type for testing StartImmediate
type StartImmediateThreadInfo =
{ Id: int
Name: string }

/// MailboxProcessor message type for testing StartImmediate
type StartImmediateMessage =
| GetThreadInfo of AsyncReplyChannel<StartImmediateThreadInfo>

type MailboxProcessorType() =

let getSimpleMailbox() =
Expand Down Expand Up @@ -356,3 +365,46 @@ type MailboxProcessorType() =
System.Threading.Thread.Sleep(5000) // cancellation after 500 pause for 5 seconds
if not gotGood || not gotBad then
failwith <| sprintf "Exected both good and bad async's to be cancelled afteMailbox should not fail! gotGood: %A, gotBad: %A" gotGood gotBad

[<Fact>]
member this.StartImmediateStartsOnCurrentThread() =
/// Gets the current thread's ID and name
let getThreadInfo () =
let currentThread = Thread.CurrentThread

{ Id = currentThread.ManagedThreadId
Name = currentThread.Name }

// Although the ManagedThreadId should be unique, go ahead and set the calling thread
// name to something specific prior to starting the MailboxProcessor
Thread.CurrentThread.Name <- "This is the thread that starts the MailboxProcessor"

// Capture the ID and name of the calling thread
let callingThreadInfo = getThreadInfo ()

// Start a MailboxProcessor with StartImmediate and have it wait for a single message
// requesting the information of the thread that it is running on
let mailbox = MailboxProcessor<StartImmediateMessage>.StartImmediate(fun inbox -> async{
// Get the MailboxProcessor's thread immediately after starting it
let threadInfo = getThreadInfo ()

// Block until a single message is received
let! message = inbox.Receive()

// Because the let! forces the asynchronous call Receive, the MailboxProcessor
// is not guaranteed to be on the same thread that it started on afterwards.
// In other words, placing getThreadInfo here or afterward will cause this test
// to fail.

// Reply with the MailboxProcessor's thread information
match message with
| GetThreadInfo reply -> reply.Reply threadInfo
})

// Get the MailboxProcessor's thread information
let mailboxThreadInfo = mailbox.PostAndReply GetThreadInfo

// Compare the calling thread information to that of the MailboxProcessor.
// If StartImmediate worked correctly, the information should be identical since
// the threads should be the same.
Assert.Equal(callingThreadInfo, mailboxThreadInfo)