#!meta {"kernelInfo":{"defaultKernelName":"spiral","items":[{"aliases":[],"name":"spiral"}]}} #!markdown # AsyncSeq (Polyglot) #!fsharp #!import ../../lib/fsharp/Notebooks.dib #!import ../../lib/fsharp/Testing.dib #!fsharp #r @"../../../../../../../.nuget/packages/fsharp.control.asyncseq/3.2.1/lib/netstandard2.1/FSharp.Control.AsyncSeq.dll" #r @"../../../../../../../.nuget/packages/system.reactive/6.0.1-preview.1/lib/net6.0/System.Reactive.dll" #r @"../../../../../../../.nuget/packages/system.reactive.linq/6.0.1-preview.1/lib/netstandard2.0/System.Reactive.Linq.dll" #!fsharp #!import ../../lib/fsharp/Common.fs #!import ../../lib/fsharp/Async.fs #!fsharp #if !INTERACTIVE open Lib #endif #!fsharp open Common #!markdown ## subscribeEvent #!fsharp let inline subscribeEvent (event: IEvent<'H, 'A>) map = let observable = System.Reactive.Linq.Observable.FromEventPattern<'H, 'A>(event.AddHandler, event.RemoveHandler) System.Reactive.Linq.Observable.Select (observable, fun event -> map event.EventArgs) |> FSharp.Control.AsyncSeq.ofObservableBuffered #!fsharp //// test type TestEvent () as self = member val Calls = [] with get, set member val Event = Event () with get member _.AddCall text = self.Calls <- self.Calls @ [ text ] member _.EventInterface = { new IEvent with member _.AddHandler handler = self.AddCall "AddHandler" self.Event.Publish.AddHandler handler member _.RemoveHandler handler = self.AddCall "RemoveHandler" self.Event.Publish.RemoveHandler handler member _.Subscribe observer = self.AddCall "IObservable.Subscribe" let disposable = self.Event.Publish.Subscribe observer new_disposable (fun () -> self.AddCall "IObservable.Dispose" disposable.Dispose () ) } member _.Subscribe () = subscribeEvent self.EventInterface (fun args -> let result = args.GetException () |> SpiralSm.format_exception self.AddCall $"TestEvent.Subscribe({result})" result ) member _.Iter subscription = subscription |> FSharp.Control.AsyncSeq.iteriAsync (fun i error -> async { self.AddCall $"TestEvent.Iter({i}: {error})" }) member _.WaitCall text = async { while self.Calls |> List.last <> text do do! Async.SwitchToThreadPool () } #!fsharp //// test let testEvent = TestEvent () async { testEvent.AddCall "1" let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild do! testEvent.WaitCall "AddHandler" testEvent.AddCall "2" do! child testEvent.AddCall "3" } |> Async.runWithTimeout 300 |> _assertEqual None testEvent.Calls |> Seq.toList |> _assertEqual [ "1"; "AddHandler"; "2"; "RemoveHandler" ] #!fsharp //// test let testEvent = TestEvent () async { testEvent.AddCall "1" let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild do! testEvent.WaitCall "AddHandler" testEvent.AddCall "2" use _ = testEvent.EventInterface.Subscribe (fun args -> testEvent.AddCall $"testEvent.EventInterface.Subscribe({args})" ) testEvent.AddCall "3" do! child testEvent.AddCall "4" } |> Async.runWithTimeout 300 |> _assertEqual None testEvent.Calls |> _assertEqual [ "1"; "AddHandler"; "2"; "IObservable.Subscribe"; "3"; "RemoveHandler"; "IObservable.Dispose" ] #!fsharp //// test let testEvent = TestEvent () async { testEvent.AddCall "1" let! child = testEvent.Subscribe () |> testEvent.Iter |> Async.StartChild do! testEvent.WaitCall "AddHandler" testEvent.AddCall "2" use _ = testEvent.EventInterface.Subscribe (fun args -> async { do! testEvent.WaitCall "TestEvent.Iter(0: System.Exception: error)" testEvent.AddCall $"testEvent.EventInterface.Subscribe({args.GetException () |> SpiralSm.format_exception})" } |> Async.RunSynchronously ) testEvent.AddCall "3" testEvent.Event.Trigger (null, ErrorEventArgs (Exception "error")) testEvent.AddCall "4" do! child testEvent.AddCall "5" } |> Async.runWithTimeout 300 |> _assertEqual None testEvent.Calls |> _assertEqual [ "1" "AddHandler" "2" "IObservable.Subscribe" "3" "TestEvent.Subscribe(System.Exception: error)" "TestEvent.Iter(0: System.Exception: error)" "testEvent.EventInterface.Subscribe(System.Exception: error)" "4" "RemoveHandler" "IObservable.Dispose" ] #!markdown ## subscribeToken #!fsharp let subscribeToken (token : System.Threading.CancellationToken) = let tcs = new System.Threading.Tasks.TaskCompletionSource () System.Action tcs.SetResult |> token.Register |> ignore let start = System.DateTime.Now.Ticks FSharp.Control.AsyncSeq.unfoldAsync (fun () -> async { do! tcs.Task |> Async.AwaitTask return Some (System.DateTime.Now.Ticks - start, ()) }) () #!fsharp //// test let cts = new System.Threading.CancellationTokenSource () async { let! child = cts.Token |> subscribeToken |> FSharp.Control.AsyncSeq.tryFirst |> Async.StartChild do! Async.Sleep 100 cts.Cancel () return! child } |> Async.RunSynchronously |> Option.get |> fun x -> x > 900000 |> _assertEqual true