#!meta {"kernelInfo":{"defaultKernelName":"spiral","items":[{"aliases":[],"name":"spiral"}]}} #!markdown # Async (Polyglot) #!fsharp #!import ../../lib/fsharp/Notebooks.dib #!import ../../lib/fsharp/Testing.dib #!fsharp #!import ../../lib/fsharp/Common.fs #!fsharp #if !INTERACTIVE open Lib #endif #!fsharp open Common #!markdown ## choice #!fsharp let inline choice asyncs = async { let e = Event<_> () use cts = new System.Threading.CancellationTokenSource () let fn = asyncs |> Seq.map (fun a -> async { let! x = a e.Trigger x }) |> Async.Parallel |> Async.Ignore Async.Start (fn, cts.Token) let! result = Async.AwaitEvent e.Publish cts.Cancel () return result } #!markdown ## map #!fsharp let inline map fn a = async { let! x = a return fn x } #!markdown ## runWithTimeoutChoiceAsync #!fsharp let inline runWithTimeoutChoiceAsync (timeout : int) fn = let _locals () = $"timeout: {timeout} / {_locals ()}" let timeoutTask = async { do! Async.Sleep timeout trace Debug (fun () -> "runWithTimeoutChoiceAsync") _locals return None } let task = async { try let! result = fn return Some result with | :? System.AggregateException as ex when ex.InnerExceptions |> Seq.exists (function :? System.Threading.Tasks.TaskCanceledException -> true | _ -> false) -> trace Warning (fun () -> "runWithTimeoutChoiceAsync") (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}") return None | ex -> trace Critical (fun () -> "runWithTimeoutChoiceAsync") (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}") return None } [ timeoutTask; task ] |> choice #!fsharp let inline runWithTimeoutChoice timeout fn = fn |> runWithTimeoutChoiceAsync timeout |> Async.RunSynchronously #!fsharp //// test Async.Sleep 120 |> runWithTimeoutChoice 10 |> _assertEqual None #!fsharp //// test Async.Sleep 10 |> runWithTimeoutChoice 60 |> _assertEqual (Some ()) #!fsharp //// test async { raise (exn "error") } |> runWithTimeoutChoice 60 |> _assertEqual None #!markdown ## catch #!fsharp let inline catch a = a |> Async.Catch |> map (function | Choice1Of2 result -> Ok result | Choice2Of2 ex -> Error ex ) #!markdown ## runWithTimeoutAsync #!fsharp let inline runWithTimeoutAsync (timeout : int) fn = async { let _locals () = $"timeout: {timeout} / {_locals ()}" let! child = Async.StartChild (fn, timeout) return! child |> catch |> map (function | Ok result -> Some result | Error (:? System.TimeoutException as ex) -> trace Debug (fun () -> $"Async.runWithTimeoutAsync") _locals None | Error ex -> trace Critical (fun () -> $"Async.runWithTimeoutAsync** / ex: %A{ex}") _locals None ) } #!fsharp let inline runWithTimeout timeout fn = fn |> runWithTimeoutAsync timeout |> Async.RunSynchronously #!fsharp //// test Async.Sleep 60 |> runWithTimeout 10 |> _assertEqual None #!fsharp //// test Async.Sleep 10 |> runWithTimeout 60 |> _assertEqual (Some ()) #!fsharp //// test async { raise (exn "error") } |> runWithTimeout 60 |> _assertEqual None #!markdown ## runWithTimeoutStrict #!fsharp let inline runWithTimeoutStrict (timeout : int) fn = let _locals () = $"timeout: {timeout} / {_locals ()}" let timeoutTask = async { do! Async.Sleep timeout return None, _locals } let task = async { try return Async.RunSynchronously (fn, timeout) |> Some, _locals with | :? System.TimeoutException as ex -> let _locals () = $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}" return None, _locals | ex -> trace Critical (fun () -> "Async.runWithTimeoutStrict / async error") (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}") return raise ex } try [| timeoutTask; task |] |> Array.map Async.StartAsTask |> System.Threading.Tasks.Task.WhenAny |> fun task -> match task.Result.Result with | None, _locals -> trace Debug (fun () -> "runWithTimeoutStrict") _locals None | result, _ -> result with | :? System.AggregateException as ex when ex.InnerExceptions |> Seq.exists (function :? System.Threading.Tasks.TaskCanceledException -> true | _ -> false) -> trace Warning (fun () -> "Async.runWithTimeoutStrict") (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}") None | ex -> trace Critical (fun () -> "Async.runWithTimeoutStrict / task error") (fun () -> $"ex: {ex |> SpiralSm.format_exception} / {_locals ()}") None #!fsharp //// test Async.Sleep 60 |> runWithTimeoutStrict 10 |> _assertEqual None #!fsharp //// test Async.Sleep 10 |> runWithTimeoutStrict 60 |> _assertEqual (Some ()) #!fsharp //// test async { raise (exn "error") } |> runWithTimeoutStrict 60 |> _assertEqual None #!markdown ## awaitValueTask #!fsharp let inline awaitValueTaskUnit (task : System.Threading.Tasks.ValueTask) = task.AsTask () |> Async.AwaitTask let inline awaitValueTask (task : System.Threading.Tasks.ValueTask<_>) = task.AsTask () |> Async.AwaitTask #!markdown ## init #!fsharp let inline init x = async { return x } #!fsharp //// test init 1 |> Async.RunSynchronously |> _assertEqual 1 #!markdown ## withCancellationToken #!fsharp let inline withCancellationToken (ct : System.Threading.CancellationToken) fn = Async.StartImmediateAsTask (fn, ct) |> Async.AwaitTask #!fsharp //// test let cts = new System.Threading.CancellationTokenSource () async { let run = async { do! Async.Sleep 100 return 1 } let! child = run |> withCancellationToken cts.Token |> catch |> Async.StartChild do! Async.Sleep 50 cts.Cancel () return! child } |> Async.RunSynchronously |> Result.mapError _.Message |> _assertEqual (Error ("A task was canceled.")) #!markdown ## retryAsync #!fsharp let inline retryAsync retries fn = let rec loop retry lastError = async { try return! if retry <= retries then fn |> map Ok else lastError |> Error |> init with ex -> trace Debug (fun () -> $"Async.retryAsync / retry: {retry}/{retries} / ex: {ex |> SpiralSm.format_exception}") _locals do! Async.Sleep 30 return! loop (retry + 1) (ex |> SpiralSm.format_exception) } loop 1 "Async.retryAsync / invalid retries / retries: {retries}" #!fsharp //// test let retry_fn_test = ref 0 async { retry_fn_test.Value <- retry_fn_test.Value + 1 return retry_fn_test.Value } |> retryAsync 3 |> Async.RunSynchronously |> _assertEqual (Ok 1) #!fsharp //// test let retry_fn_test = ref 0 async { return if retry_fn_test.Value >= 2 then retry_fn_test.Value else retry_fn_test.Value <- retry_fn_test.Value + 1 failwith "test" } |> retryAsync 3 |> Async.RunSynchronously |> _assertEqual (Ok 2) #!markdown ## fold #!fsharp let fold folder state array = let rec loop acc i = async { if i < Array.length array then let! newAcc = folder acc array.[i] return! loop newAcc (i + 1) else return acc } loop state 0