Skip to content

Commit

Permalink
Add and use Async.AsTask
Browse files Browse the repository at this point in the history
  • Loading branch information
gusty committed Feb 12, 2024
1 parent 2091c9e commit 1166af9
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 52 deletions.
5 changes: 3 additions & 2 deletions src/FSharpPlus/Control/Comonad.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ open System.Runtime.InteropServices
open System.Threading.Tasks

open FSharpPlus
open FSharpPlus.Extensions
open FSharpPlus.Internals
#if !FABLE_COMPILER4

// Comonad class ----------------------------------------------------------

type Extract =
static member Extract (x: Async<'T> ) =
static member Extract (x: Async<'T>) =
#if FABLE_COMPILER_3 || FABLE_COMPILER_4
Async.RunSynchronously x
#else
Async.StartImmediateAsTask(x).Result
Async.AsTask(x).Result
#endif
static member Extract (x: Lazy<'T> ) = x.Value
static member Extract ((_: 'W, a: 'T) ) = a
Expand Down
7 changes: 4 additions & 3 deletions src/FSharpPlus/Control/Functor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ open Microsoft.FSharp.Quotations
open FSharpPlus.Internals
open FSharpPlus.Internals.Prelude
open FSharpPlus
open FSharpPlus.Extensions
open FSharpPlus.Data

#if (!FABLE_COMPILER || FABLE_COMPILER_3) && ! FABLE_COMPILER_4
Expand All @@ -21,7 +22,7 @@ type Iterate =
static member Iterate (x: Lazy<'T> , action) = action x.Value : unit
static member Iterate (x: seq<'T> , action) = Seq.iter action x
static member Iterate (x: option<'T> , action) = match x with Some x -> action x | _ -> ()
static member Iterate (x: voption<'T> , action) = match x with ValueSome x -> action x | _ -> ()
static member Iterate (x: voption<'T>, action) = match x with ValueSome x -> action x | _ -> ()
static member Iterate (x: list<'T> , action) = List.iter action x
static member Iterate ((_: 'W, a: 'T), action) = action a :unit
static member Iterate (x: 'T [] , action) = Array.iter action x
Expand All @@ -35,10 +36,10 @@ type Iterate =
for l = 0 to Array4D.length4 x - 1 do
action x.[i,j,k,l]
#endif
#if !FABLE_COMPILER
#if FABLE_COMPILER
static member Iterate (x: Async<'T> , action) = action (Async.RunSynchronously x) : unit
#else
static member Iterate (x: Async<'T> , action) = action (x |> Async.Ignore |> Async.StartImmediate) : unit
static member Iterate (x: Async<'T> , action: 'T -> unit) = (x |> Async.map action |> Async.AsTask).Wait ()
#endif
static member Iterate (x: Result<'T, 'E> , action) = match x with Ok x -> action x | _ -> ()
static member Iterate (x: Choice<'T, 'E> , action) = match x with Choice1Of2 x -> action x | _ -> ()
Expand Down
8 changes: 4 additions & 4 deletions src/FSharpPlus/Control/Traversable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type Traverse =
return seq {
use enum = t.GetEnumerator ()
while enum.MoveNext() do
yield Async.RunSynchronously (f enum.Current, cancellationToken = ct) }}
yield Async.AsTask(f enum.Current, cancellationToken = ct).Result }}
#endif

#if !FABLE_COMPILER
Expand All @@ -102,7 +102,7 @@ type Traverse =
return seq {
use enum = t.GetEnumerator ()
while enum.MoveNext() do
yield Async.RunSynchronously (f enum.Current, cancellationToken = ct) } |> NonEmptySeq.unsafeOfSeq }
yield Async.AsTask(f enum.Current, cancellationToken = ct).Result } |> NonEmptySeq.unsafeOfSeq }
#endif

static member Traverse (t: Id<'t>, f: 't -> option<'u>, [<Optional>]_output: option<Id<'u>>, [<Optional>]_impl: Traverse) =
Expand Down Expand Up @@ -332,7 +332,7 @@ type Gather =
return seq {
use enum = t.GetEnumerator ()
while enum.MoveNext() do
yield Async.RunSynchronously (f enum.Current, cancellationToken = ct) }}
yield Async.AsTask(f enum.Current, cancellationToken = ct).Result }}
#endif

#if !FABLE_COMPILER
Expand All @@ -345,7 +345,7 @@ type Gather =
return seq {
use enum = t.GetEnumerator ()
while enum.MoveNext() do
yield Async.RunSynchronously (f enum.Current, cancellationToken = ct) } |> NonEmptySeq.unsafeOfSeq }
yield Async.AsTask(f enum.Current, cancellationToken = ct).Result } |> NonEmptySeq.unsafeOfSeq }
#endif

static member Gather (t: Id<'t>, f: 't -> option<'u>, [<Optional>]_output: option<Id<'u>>, [<Optional>]_impl: Gather) =
Expand Down
24 changes: 5 additions & 19 deletions src/FSharpPlus/Extensions/Async.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,6 @@ module Async =
open System
open System.Threading.Tasks

#if !FABLE_COMPILER
// Proper Async.StartImmediateAsTask implementation, without aggregate exception wrapping.
let private startImmediateAsTask (computation: Async<'T>, cancellationToken) : Task<'T> =
let ts = TaskCompletionSource<'T> ()
Async.StartWithContinuations (
computation,
ts.SetResult,
(function
| :? AggregateException as agg -> ts.SetException agg.InnerExceptions
| exn -> ts.SetException exn),
(fun _ -> ts.SetCanceled ()),
cancellationToken)
ts.Task
#endif
open FSharpPlus.Extensions

/// <summary>Creates an async workflow from another workflow 'x', mapping its result with 'f'.</summary>
Expand Down Expand Up @@ -61,8 +47,8 @@ module Async =
#else
let map2 mapper (async1: Async<'T1>) (async2: Async<'T2>) : Async<'U> = async {
let! ct = Async.CancellationToken
let t1 = startImmediateAsTask (async1, ct)
let t2 = startImmediateAsTask (async2, ct)
let t1 = Async.AsTask (async1, ct)
let t2 = Async.AsTask (async2, ct)
return! Async.Await (Task.map2 mapper t1 t2) }
#endif

Expand All @@ -80,9 +66,9 @@ module Async =
#else
let map3 mapper (async1: Async<'T1>) (async2: Async<'T2>) (async3: Async<'T3>) : Async<'U> = async {
let! ct = Async.CancellationToken
let t1 = startImmediateAsTask (async1, ct)
let t2 = startImmediateAsTask (async2, ct)
let t3 = startImmediateAsTask (async3, ct)
let t1 = Async.AsTask (async1, ct)
let t2 = Async.AsTask (async2, ct)
let t3 = Async.AsTask (async3, ct)
return! Async.Await (Task.map3 mapper t1 t2 t3) }
#endif

Expand Down
5 changes: 1 addition & 4 deletions src/FSharpPlus/Extensions/AsyncEnumerable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ module AsyncEnumerable =
let toAsyncSeq (source: Collections.Generic.IAsyncEnumerable<_>) : SeqT<Async<_>, _> = monad.plus {
let! ct = SeqT.lift Async.CancellationToken
let e = source.GetAsyncEnumerator ct
use _ =
{ new IDisposable with
member _.Dispose () =
e.DisposeAsync().AsTask () |> Async.Await |> Async.RunSynchronously }
use _ = { new IDisposable with member _.Dispose () = e.DisposeAsync().AsTask().Wait () }

let mutable currentResult = true
while currentResult do
Expand Down
56 changes: 51 additions & 5 deletions src/FSharpPlus/Extensions/Extensions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,55 @@ module Extensions =

#if !FABLE_COMPILER

/// <summary>Runs an asynchronous computation, starting immediately on the current operating system
/// thread, but also returns the execution as <see cref="T:System.Threading.Tasks.Task`1"/>
/// This behaves exactly like Async.StartImmediateAsTask but without unexpected exceptions-wrapping.
/// </summary>
///
/// <remarks>If no cancellation token is provided then the default cancellation token is used.
/// You may prefer using this method if you want to achive a similar behviour to async await in C# as
/// async computation starts on the current thread with an ability to return a result.
/// </remarks>
///
/// <param name="computation">The asynchronous computation to execute.</param>
/// <param name="cancellationToken">The <c>CancellationToken</c> to associate with the computation.
/// The default is used if this parameter is not provided.</param>
///
/// <returns>A <see cref="T:System.Threading.Tasks.Task"/> that will be completed
/// in the corresponding state once the computation terminates (produces the result, throws exception or gets canceled)</returns>
///
/// <category index="0">FSharp.Core Extensions</category>
///
/// <example id="as-task-1">
/// <code lang="fsharp">
/// printfn "A"
///
/// let t =
/// async {
/// printfn "B"
/// do! Async.Sleep(1000)
/// printfn "C"
/// } |> Async.AsTask
///
/// printfn "D"
/// t.Wait()
/// printfn "E"
/// </code>
/// Prints "A", "B", "D" immediately, then "C", "E" in 1 second.
/// </example>
static member AsTask (computation: Async<'T>, ?cancellationToken) : Task<'T> =
let cancellationToken = defaultArg cancellationToken (new CancellationToken ())
let ts = TaskCompletionSource<'T> ()
Async.StartWithContinuations (
computation,
ts.SetResult,
(function
| :? AggregateException as agg -> ts.SetException agg.InnerExceptions
| exn -> ts.SetException exn),
(fun _ -> ts.SetCanceled ()),
cancellationToken)
ts.Task

// See https://github.com/fsharp/fslang-suggestions/issues/840

/// <summary>Return an asynchronous computation that will wait for the given task to complete and return
Expand Down Expand Up @@ -126,12 +175,9 @@ module Extensions =


/// Combine all asyncs in one, chaining them in sequence order.
static member Sequence (t:seq<Async<_>>) : Async<seq<_>> = async {
let startImmediateAsTask ct a =
Async.StartImmediateAsTask(a, ct).Result

static member Sequence (t: seq<Async<'T>>) : Async<seq<_>> = async {
let! ct = Async.CancellationToken
return t |> Seq.map (startImmediateAsTask ct) }
return Seq.map (fun t -> Async.AsTask(t, ct).Result) t }
#endif

/// Combine all asyncs in one, chaining them in sequence order.
Expand Down
17 changes: 2 additions & 15 deletions tests/FSharpPlus.Tests/Asyncs.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,12 @@ module Async =
exception TestException of string

type Async with
static member StartImmediateAsTaskCorrect (computation: Async<'T>, ?cancellationToken) : Task<'T> =
let cancellationToken = defaultArg cancellationToken (new CancellationToken ())
let ts = TaskCompletionSource<'T> ()
Async.StartWithContinuations (
computation,
ts.SetResult,
(function
| :? AggregateException as agg -> ts.SetException agg.InnerExceptions
| exn -> ts.SetException exn),
(fun _ -> ts.SetCanceled ()),
cancellationToken)
ts.Task

static member AsTaskAndWait computation =
let t = Async.StartImmediateAsTaskCorrect computation
let t = Async.AsTask computation
Task.WaitAny t |> ignore
t

static member WhenAll (source: Async<'T> seq) = source |> Seq.map (fun x -> Async.StartImmediateAsTaskCorrect x) |> Task.WhenAll |> Async.Await
static member WhenAll (source: Async<'T> seq) = source |> Seq.map (fun x -> Async.AsTask x) |> Task.WhenAll |> Async.Await


module AsyncTests =
Expand Down

0 comments on commit 1166af9

Please sign in to comment.