44
55open System
66open System.Collections .Concurrent
7- open System.Collections .Generic
87open System.Threading
98
10- /// The agent handles two kind of messages - the 'Start' message is sent
11- /// when the caller wants to start a new work item. The 'Finished' message
12- /// is sent (by the agent itself) when one work item is completed.
13- type LimitAgentMessage =
14- | Start of Async < unit >
15- | Finished
16-
17- /// A function that takes the limit - the maximal number of operations it
18- /// will run in parallel - and returns an agent that accepts new
19- /// tasks via the 'Start' message
20- let threadingLimitAgent limit ( ct : CancellationToken ) =
21- let act ( inbox : MailboxProcessor < LimitAgentMessage >) =
22- async {
23- // Keep number of items running & queue of items to run later
24- // NOTE: We keep an explicit queue, so that we can e.g. start dropping
25- // items if there are too many requests (or do something else)
26- // NOTE: The loop is only accessed from one thread at each time
27- // so we can just use non-thread-safe queue & mutation
28- let queue = Queue<_>()
29- let mutable count = 0
30-
31- while true do
32- let! msg = inbox.Receive()
33- // When we receive Start, add the work to the queue
34- // When we receive Finished, do count--
35- match msg with
36- | Start work -> queue.Enqueue( work)
37- | Finished -> count <- count + 1
38- // After something happened, we check if we can
39- // start a next task from the queue
40- if count < limit && queue.Count > 0 then
41- count <- count + 1
42- let work = queue.Dequeue()
43- // Start it in a thread pool (on background)
44- Async.Start(
45- async {
46- do ! work
47- inbox.Post( Finished)
48- }
49- )
50- }
51-
52- MailboxProcessor.Start( act, ct)
53-
54- // TODO Test this version
55- /// Untested version that uses MailboxProcessor.
56- /// See http://www.fssnip.net/nX/title/Limit-degree-of-parallelism-using-an-agent for implementation
57- let processInParallelUsingMailbox
58- ( firstItems : 'Item [])
59- ( work : 'Item -> Async < 'Item []>)
60- ( parallelism : int )
61- ( notify : int -> unit )
62- ( ct : CancellationToken )
63- : unit =
64- let processedCountLock = Object()
65- let mutable processedCount = 0
66- let agent = threadingLimitAgent parallelism ct
67-
68- let rec processItem item =
69- async {
70- let! toSchedule = work item
71-
72- let pc =
73- lock processedCountLock ( fun () ->
74- processedCount <- processedCount + 1
75- processedCount)
76-
77- notify pc
78- toSchedule |> Array.iter ( fun x -> agent.Post( Start( processItem x)))
79- }
80-
81- firstItems |> Array.iter ( fun x -> agent.Post( Start( processItem x)))
82-
839// TODO Could replace with MailboxProcessor+Tasks/Asyncs instead of BlockingCollection + Threads
8410// See http://www.fssnip.net/nX/title/Limit-degree-of-parallelism-using-an-agent
8511/// Process items in parallel, allow more work to be scheduled as a result of finished work,
@@ -88,51 +14,38 @@ let processInParallel
8814 ( firstItems : 'Item [])
8915 ( work : 'Item -> 'Item [])
9016 ( parallelism : int )
91- ( stop : int -> bool )
17+ ( shouldStop : int -> bool )
9218 ( ct : CancellationToken )
93- ( _itemToString )
19+ ( _itemToString : 'Item -> string )
9420 : unit =
9521 let bc = new BlockingCollection< 'Item>()
9622 firstItems |> Array.iter bc.Add
9723 let processedCountLock = Object()
9824 let mutable processedCount = 0
9925
10026 let processItem item =
101- // printfn $"Processing {itemToString item}"
27+ printfn $" Processing {_itemToString item}"
10228 let toSchedule = work item
10329
10430 let processedCount =
10531 lock processedCountLock ( fun () ->
10632 processedCount <- processedCount + 1
10733 processedCount)
108- // printfn $"ToSchedule {toSchedule.Length}"
109- toSchedule |> Array.iter ( fun next -> bc.Add( next))
34+ let toScheduleString =
35+ toSchedule
36+ |> Array.map _ itemToString
37+ |> fun names -> String.Join( " , " , names)
38+ printfn $" Scheduling {toSchedule.Length} items: {toScheduleString}"
39+ toSchedule |> Array.iter bc.Add
11040 processedCount
11141
11242 // TODO Could avoid workers with some semaphores
11343 let workerWork () : unit =
11444 for node in bc.GetConsumingEnumerable( ct) do
11545 if not ct.IsCancellationRequested then // improve
11646 let processedCount = processItem node
117-
118- if stop processedCount then
47+ if shouldStop processedCount then
11948 bc.CompleteAdding()
12049
121- Array.Parallel.map workerWork ( Array.init parallelism ( fun _ -> ())) |> ignore // use cancellation
122- ()
123-
124- let test () =
125- // Create an agent that can run at most 2 tasks in parallel
126- // and send 10 work items that take 1 second to the queue
127- use cts = new CancellationTokenSource()
128- let agent = threadingLimitAgent 2 cts.Token
129-
130- for i in 0 .. 10 do
131- agent.Post(
132- Start(
133- async {
134- do ! Async.Sleep( 1000 )
135- printfn $" Finished: %d {i}"
136- }
137- )
138- )
50+ // TODO Do we need to handle cancellation given that workers do it already?
51+ Array.Parallel.map workerWork ( Array.init parallelism ( fun _ -> ())) |> ignore
0 commit comments