11/// Parallel processing of a graph of work items with dependencies
22module ParallelTypeCheckingTests.GraphProcessing
33
4- open System.Collections .Concurrent
5- open System.Collections .Generic
64open System.Threading
7- open ParallelTypeCheckingTests.Parallel
85
9- /// Used for processing
106type NodeInfo < 'Item > =
117 {
128 Item: 'Item
@@ -15,38 +11,42 @@ type NodeInfo<'Item> =
1511 Dependants: 'Item []
1612 }
1713
18- // TODO Do not expose this type to other files
19- type Node < 'Item , 'Result > =
14+ type private PrivateNode < 'Item , 'Result > =
2015 {
2116 Info: NodeInfo < 'Item >
2217 mutable ProcessedDepsCount: int
2318 mutable Result: 'Result option
2419 }
25-
26- /// Basic concurrent set implemented using ConcurrentDictionary
27- type private ConcurrentSet < 'a >() =
28- let dict = ConcurrentDictionary< 'a, unit>()
29-
30- member this.Add ( item : 'a ): bool =
31- dict.TryAdd( item, ())
3220
21+ type ProcessedNode < 'Item , 'Result > =
22+ {
23+ Info: NodeInfo < 'Item >
24+ Result: 'Result
25+ }
26+
3327/// <summary>
3428/// A generic method to generate results for a graph of work items in parallel.
3529/// Processes leaves first, and after each node has been processed, schedules any now unblocked dependants.
3630/// Returns an array of (item, result) pairs, one per item, sorted by item.
31+ /// Uses ThreadPool to schedule work.
3732/// </summary>
3833/// <param name="graph">Graph of work items</param>
39- /// <param name="doWork">A function to generate results for a single item</param>
40- let processGraphSimple < 'Item , 'Result when 'Item : equality and 'Item : comparison >
34+ /// <param name="work">A function to generate results for a single item</param>
35+ /// <param name="ct">Cancellation token</param>
36+ /// <remarks>
37+ /// An alternative scheduling approach is to schedule N parallel tasks that process items from a BlockingCollection.
38+ /// My basic tests suggested it's faster, although confirming that would require more detailed testing.
39+ /// </remarks>
40+ let processGraph < 'Item , 'Result when 'Item : equality and 'Item : comparison >
4141 ( graph : Graph < 'Item >)
42- // TODO Avoid exposing mutable nodes to the caller
43- ( doWork : IReadOnlyDictionary < 'Item , Node < 'Item , 'Result >> -> Node < 'Item , 'Result > -> 'Result )
44- : ' Result[] // Results in order defined in 'graph'
42+ ( work : ( 'Item -> ProcessedNode < 'Item , 'Result >) -> NodeInfo < 'Item > -> 'Result )
43+ ( ct : CancellationToken )
44+ : ( 'Item * ' Result) [] // Individual item results
4545 =
4646 let transitiveDeps = graph |> Graph.transitiveOpt
4747 let dependants = graph |> Graph.reverse
4848
49- let makeNode ( item : 'Item ) : Node < 'Item , 'Result > =
49+ let makeNode ( item : 'Item ) : PrivateNode < 'Item , 'Result > =
5050 let info =
5151 let exists = graph.ContainsKey item
5252
@@ -80,40 +80,59 @@ let processGraphSimple<'Item, 'Result when 'Item: equality and 'Item: comparison
8080 |> Seq.filter ( fun n -> n.Info.Deps.Length = 0 )
8181 |> Seq.toArray
8282
83- printfn $" Node count: {nodes.Count}"
84- use cts = new CancellationTokenSource()
85-
86- let mutable processedCount = 0
8783 let waitHandle = new AutoResetEvent( false )
88- let rec post node =
89- Async.Start( async { work node}, cts.Token)
90- and work
91- ( node : Node < 'Item , 'Result >)
84+
85+ let getItemPublicNode item =
86+ let node = nodes[ item]
87+ {
88+ ProcessedNode.Info = node.Info
89+ ProcessedNode.Result =
90+ node.Result
91+ |> Option.defaultWith ( fun () -> failwith $" Results for item '{node.Info.Item}' are not yet available" )
92+ }
93+
94+ let incrementProcessedCount =
95+ let mutable processedCount = 0
96+ fun () ->
97+ if Interlocked.Increment(& processedCount) = nodes.Count then
98+ waitHandle.Set() |> ignore
99+
100+ let rec queueNode node =
101+ Async.Start( async { processNode node}, ct)
102+
103+ and processNode
104+ ( node : PrivateNode < 'Item , 'Result >)
92105 : unit =
93- let singleRes = doWork nodes node
106+ let info = node.Info
107+
108+ let singleRes = work getItemPublicNode info
94109 node.Result <- Some singleRes
110+
95111 let unblockedDependants =
96112 node.Info.Dependants
97113 |> lookupMany
98114 // For every dependant, increment its number of processed dependencies,
99- // and filter ones which now have all dependencies processed.
115+ // and keep only the dependants which now have all dependencies processed (but didn't before).
100116 |> Array.filter ( fun dependant ->
101117 // This counter can be incremented by multiple workers on different threads.
102118 let pdc = Interlocked.Increment(& dependant.ProcessedDepsCount)
103119 // Note: We must not re-read 'dependant.ProcessedDepsCount' here; only the value returned by Interlocked.Increment is compared, otherwise the same item could be returned multiple times.
104120 pdc = dependant.Info.Deps.Length)
105- unblockedDependants |> Array.iter post
106- if Interlocked.Increment (& processedCount ) = nodes.Count then
107- waitHandle.Set () |> ignore
121+
122+ unblockedDependants |> Array.iter queueNode
123+ incrementProcessedCount ()
108124
109- leaves |> Array.iter post
125+ leaves |> Array.iter queueNode
110126 // TODO Handle async exceptions
111127 // q.Error += ...
112128 waitHandle.WaitOne() |> ignore
113129
114130 nodes.Values
115131 |> Seq.map ( fun node ->
116- node.Result
117- |> Option.defaultWith ( fun () -> failwith $" Unexpected lack of result for item '{node.Info.Item}'" )
132+ let result =
133+ node.Result
134+ |> Option.defaultWith ( fun () -> failwith $" Unexpected lack of result for item '{node.Info.Item}'" )
135+ node.Info.Item, result
118136 )
119- |> Seq.toArray
137+ |> Seq.sortBy fst
138+ |> Seq.toArray
0 commit comments