diff --git a/README.md b/README.md index 30899d71..ad45a894 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,8 @@ The following is the progress report: | | `distinctBy` | `dictinctBy` | `distinctByAsync` | | | ✅ [#2][] | `empty` | `empty` | | | | ✅ [#23][] | `exactlyOne` | `exactlyOne` | | | -| | `except` | `except` | | | +| ✅ [#83][] | `except` | `except` | | | +| ✅ [#83][] | | `exceptOfSeq` | | | | ✅ [#70][] | `exists` | `exists` | `existsAsync` | | | | `exists2` | `exists2` | | | | ✅ [#23][] | `filter` | `filter` | `filterAsync` | | @@ -611,4 +612,5 @@ module TaskSeq = [#76]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/76 [#81]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/81 [#82]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/82 +[#83]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/83 diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index d030c085..72f861a7 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -21,6 +21,7 @@ + diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Except.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Except.Tests.fs new file mode 100644 index 00000000..4a946df7 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Except.Tests.fs @@ -0,0 +1,141 @@ +module TaskSeq.Tests.Except + +open System +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharp.Control + +// +// TaskSeq.except +// TaskSeq.exceptOfSeq +// + + +module EmptySeq = + [)>] + let ``TaskSeq-except`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.except (Gen.getEmptyVariant variant) + |> verifyEmpty + + [)>] + let ``TaskSeq-exceptOfSeq`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.exceptOfSeq Seq.empty + |> verifyEmpty + + [)>] + let ``TaskSeq-except v2`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.except TaskSeq.empty + |> verifyEmpty + + [)>] + let ``TaskSeq-except v3`` variant = + TaskSeq.empty + |> TaskSeq.except (Gen.getEmptyVariant variant) + |> verifyEmpty + + [)>] + let ``TaskSeq-except no side effect in exclude seq if source seq is empty`` variant = + let mutable i = 0 + + let exclude = taskSeq { + i <- i + 1 + yield 12 + } + + TaskSeq.empty + |> TaskSeq.except exclude + |> verifyEmpty + |> Task.map (fun () -> i |> should equal 0) // exclude seq is only enumerated after first item in source + +module Immutable = + [)>] + let ``TaskSeq-except removes duplicates`` variant = + TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ] + |> TaskSeq.except (Gen.getSeqImmutable variant) + |> TaskSeq.toArrayAsync + |> Task.map (should equal [| 12; 13; 99 |]) + + [] + let ``TaskSeq-except removes duplicates with empty itemsToExcept`` () = + TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ] + |> TaskSeq.except TaskSeq.empty + |> TaskSeq.toArrayAsync + |> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |]) + + [)>] + let ``TaskSeq-except removes everything`` variant = + Gen.getSeqImmutable variant + |> TaskSeq.except (Gen.getSeqImmutable variant) + |> verifyEmpty + + [)>] + let ``TaskSeq-except removes everything with duplicates`` variant = + taskSeq { + yield! Gen.getSeqImmutable variant + yield! Gen.getSeqImmutable variant + yield! Gen.getSeqImmutable variant + yield! Gen.getSeqImmutable variant + } + |> TaskSeq.except (Gen.getSeqImmutable variant) + |> verifyEmpty + + [)>] + let ``TaskSeq-exceptOfSeq removes duplicates`` variant = + TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ] + |> TaskSeq.exceptOfSeq [ 1..10 ] + |> TaskSeq.toArrayAsync + |> Task.map (should equal [| 12; 13; 99 |]) + + [] + let ``TaskSeq-exceptOfSeq removes duplicates with empty itemsToExcept`` () = + TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ] + |> TaskSeq.exceptOfSeq Seq.empty + |> TaskSeq.toArrayAsync + |> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |]) + + [)>] + let ``TaskSeq-exceptOfSeq removes everything`` variant = + Gen.getSeqImmutable variant + |> TaskSeq.exceptOfSeq [ 1..10 ] + |> verifyEmpty + + [)>] + let ``TaskSeq-exceptOfSeq removes everything with duplicates`` variant = + taskSeq { + yield! Gen.getSeqImmutable variant + yield! Gen.getSeqImmutable variant + yield! Gen.getSeqImmutable variant + yield! Gen.getSeqImmutable variant + } + |> TaskSeq.exceptOfSeq [ 1..10 ] + |> verifyEmpty + +module SideEffects = + [)>] + let ``TaskSeq-except removes duplicates`` variant = + TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ] + |> TaskSeq.except (Gen.getSeqWithSideEffect variant) + |> TaskSeq.toArrayAsync + |> Task.map (should equal [| 12; 13; 99 |]) + + [)>] + let ``TaskSeq-except removes everything`` variant = + Gen.getSeqWithSideEffect variant + |> TaskSeq.except (Gen.getSeqWithSideEffect variant) + |> verifyEmpty + + [)>] + let ``TaskSeq-except removes everything with duplicates`` variant = + taskSeq { + yield! Gen.getSeqWithSideEffect variant + yield! Gen.getSeqWithSideEffect variant + yield! Gen.getSeqWithSideEffect variant + yield! Gen.getSeqWithSideEffect variant + } + |> TaskSeq.except (Gen.getSeqWithSideEffect variant) + |> verifyEmpty diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index e26ea68c..9fb7e314 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -56,29 +56,6 @@ module TaskSeq = e.DisposeAsync().AsTask().Wait() } - // FIXME: incomplete and incorrect code!!! TODO: still needed? - let toSeqOfTasks (source: taskSeq<'T>) = seq { - let e = source.GetAsyncEnumerator(CancellationToken()) - - // TODO: check this! - try - let mutable go = false - - while go do - yield task { - let! step = e.MoveNextAsync() - go <- step - - if step then - return e.Current - else - return Unchecked.defaultof<_> // FIXME! - } - - finally - e.DisposeAsync().AsTask().Wait() - } - let toArrayAsync source = Internal.toResizeArrayAsync source |> Task.map (fun a -> a.ToArray()) @@ -281,6 +258,8 @@ module TaskSeq = let tryFindAsync predicate source = Internal.tryFind (PredicateAsync predicate) source let tryFindIndex predicate source = Internal.tryFindIndex (Predicate predicate) source let tryFindIndexAsync predicate source = Internal.tryFindIndex (PredicateAsync predicate) source + let except itemsToExclude source = Internal.except itemsToExclude source + let exceptOfSeq itemsToExclude source = Internal.exceptOfSeq itemsToExclude source let exists predicate source = Internal.tryFind (Predicate predicate) source diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index 6c367526..dcbab450 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -500,6 +500,44 @@ module TaskSeq = /// Thrown when the input sequence is null. val existsAsync: predicate: ('T -> #Task) -> source: taskSeq<'T> -> Task + /// + /// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the + /// , using generic hash and equality comparisons to compare values. + /// + /// + /// + /// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as + /// the result sequence first gets awaited or iterated. As a result this function should not be used with + /// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input + /// sequence. + /// + /// + /// A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence. + /// A sequence whose elements that are not also in first will be returned. + /// A sequence that contains the set difference of the elements of two sequences. + /// + /// Thrown when either of the two input sequences is null. + val except<'T when 'T: equality> : itemsToExclude: taskSeq<'T> -> source: taskSeq<'T> -> taskSeq<'T> + + /// + /// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the + /// , using generic hash and equality comparisons to compare values. + /// + /// + /// + /// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as + /// the result sequence first gets awaited or iterated. As a result this function should not be used with + /// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input + /// sequence. + /// + /// + /// A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence. + /// A sequence whose elements that are not also in first will be returned. + /// A sequence that contains the set difference of the elements of two sequences. + /// + /// Thrown when either of the two input sequences is null. + val exceptOfSeq<'T when 'T: equality> : itemsToExclude: seq<'T> -> source: taskSeq<'T> -> taskSeq<'T> + /// /// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException /// if the sequences are or unequal length. diff --git a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs index f0bcec9d..7779429a 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs @@ -517,3 +517,103 @@ module internal TaskSeqInternal = | true -> yield item | false -> () } + // Consider turning using an F# version of this instead? + // https://github.com/i3arnon/ConcurrentHashSet + type ConcurrentHashSet<'T when 'T: equality>(ct) = + let _rwLock = new ReaderWriterLockSlim() + let hashSet = HashSet<'T>(Array.empty, HashIdentity.Structural) + + member _.Add item = + _rwLock.EnterWriteLock() + + try + hashSet.Add item + finally + _rwLock.ExitWriteLock() + + member _.AddMany items = + _rwLock.EnterWriteLock() + + try + for item in items do + hashSet.Add item |> ignore + + finally + _rwLock.ExitWriteLock() + + member _.AddManyAsync(source: taskSeq<'T>) = task { + use e = source.GetAsyncEnumerator(ct) + let mutable go = true + let! step = e.MoveNextAsync() + go <- step + + while go do + // NOTE: r/w lock cannot cross thread boundaries. Should we use SemaphoreSlim instead? + // or alternatively, something like this: https://github.com/StephenCleary/AsyncEx/blob/8a73d0467d40ca41f9f9cf827c7a35702243abb8/src/Nito.AsyncEx.Coordination/AsyncReaderWriterLock.cs#L16 + // not sure how they compare. + + _rwLock.EnterWriteLock() + + try + hashSet.Add e.Current |> ignore + finally + _rwLock.ExitWriteLock() + + let! step = e.MoveNextAsync() + go <- step + } + + interface IAsyncDisposable with + override _.DisposeAsync() = + if not (isNull _rwLock) then + _rwLock.Dispose() + + ValueTask.CompletedTask + + let except itemsToExclude (source: taskSeq<_>) = taskSeq { + use e = source.GetAsyncEnumerator(CancellationToken()) + let mutable go = true + let! step = e.MoveNextAsync() + go <- step + + if step then + // only create hashset by the time we actually start iterating + use hashSet = new ConcurrentHashSet<_>(CancellationToken()) + do! hashSet.AddManyAsync itemsToExclude + + while go do + let current = e.Current + + // if true, it was added, and therefore unique, so we return it + // if false, it existed, and therefore a duplicate, and we skip + if hashSet.Add current then + yield current + + let! step = e.MoveNextAsync() + go <- step + + } + + let exceptOfSeq itemsToExclude (source: taskSeq<_>) = taskSeq { + use e = source.GetAsyncEnumerator(CancellationToken()) + let mutable go = true + let! step = e.MoveNextAsync() + go <- step + + if step then + // only create hashset by the time we actually start iterating + use hashSet = new ConcurrentHashSet<_>(CancellationToken()) + do hashSet.AddMany itemsToExclude + + while go do + let current = e.Current + + // if true, it was added, and therefore unique, so we return it + // if false, it existed, and therefore a duplicate, and we skip + if hashSet.Add current then + yield current + + let! step = e.MoveNextAsync() + go <- step + + }