@@ -69,6 +69,12 @@ struct alignas(64) Packet {
   Payload payload;
 };
 
+// TODO: This should be configured by the server and passed in. The general
+// rule of thumb is that you should have at least as many ports as possible
+// concurrent work items on the GPU to mitigate the lack of forward
+// progress guarantees on the GPU.
+constexpr uint64_t default_port_count = 64;
+
 /// A common process used to synchronize communication between a client and a
 /// server. The process contains an inbox and an outbox used for signaling
 /// ownership of the shared buffer between both sides.
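Since the port count now multiplies the shared buffer, the server's allocation must cover `port_count` packets. A minimal sizing sketch, assuming the `align_up`, `Header`, `Buffer`, and `Packet` definitions from this header (the helper name is hypothetical, not part of the patch); it mirrors the per-packet stride that `get_packet()`, added below, uses:

```cpp
// Hypothetical sizing helper; the stride matches get_packet()'s offset math.
LIBC_INLINE uint64_t buffer_size(uint64_t port_count, uint32_t lane_size) {
  return port_count * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
                               alignof(Packet));
}
```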
@@ -96,22 +102,31 @@ template <bool InvertInbox> struct Process {
   LIBC_INLINE Process &operator=(const Process &) = default;
   LIBC_INLINE ~Process() = default;
 
+  uint64_t port_count;
   uint32_t lane_size;
   cpp::Atomic<uint32_t> *lock;
   cpp::Atomic<uint32_t> *inbox;
   cpp::Atomic<uint32_t> *outbox;
-  Packet *buffer;
+  Packet *packet;
 
   /// Initialize the communication channels.
-  LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
-                         void *outbox, void *buffer) {
-    *this = {
-        lane_size,
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
-        reinterpret_cast<Packet *>(buffer),
-    };
+  LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size, void *lock,
+                         void *inbox, void *outbox, void *packet) {
+    *this = {port_count,
+             lane_size,
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
+             reinterpret_cast<Packet *>(packet)};
+  }
+
+  /// The length of the packet is flexible because the server needs to look up
+  /// the lane size at runtime. This helper indexes at the proper offset.
+  LIBC_INLINE Packet &get_packet(uint64_t index) {
+    return *reinterpret_cast<Packet *>(
+        reinterpret_cast<uint8_t *>(packet) +
+        index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
+                         alignof(Packet)));
   }
 
   /// Inverting the bits loaded from the inbox in exactly one of the pair of
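To make the `get_packet()` stride arithmetic concrete, the standalone sketch below reproduces it with illustrative sizes. The numeric values are assumptions for the example only; the real ones come from the `Header`, `Buffer`, and `Packet` definitions in this header:

```cpp
#include <cstdint>
#include <cstdio>

// Stand-in for the align_up() used by get_packet(); rounds val up to the
// nearest multiple of align (align must be a power of two).
constexpr uint64_t align_up(uint64_t val, uint64_t align) {
  return (val + align - 1) & ~(align - 1);
}

int main() {
  constexpr uint64_t header_size = 16;  // sizeof(Header), assumed
  constexpr uint64_t buffer_size = 64;  // sizeof(Buffer), assumed
  constexpr uint64_t packet_align = 64; // alignof(Packet), assumed
  constexpr uint32_t lane_size = 32;    // one slot per lane in the warp

  // raw size = 16 + 32 * 64 = 2064; align_up(2064, 64) = 2112, so packet i
  // lives at byte offset i * 2112 from the base pointer.
  uint64_t stride =
      align_up(header_size + lane_size * buffer_size, packet_align);
  for (uint64_t index = 0; index < 4; ++index)
    printf("packet %llu at byte offset %llu\n", (unsigned long long)index,
           (unsigned long long)(index * stride));
}
```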
@@ -190,25 +205,25 @@ template <bool InvertInbox> struct Process {
 
   /// Invokes a function across every active buffer across the total lane size.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+      fn(&packet.payload.slot[gpu::get_lane_id()]);
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i]);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i]);
     }
   }
 
   /// Alternate version that also provides the index of the current lane.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+      fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i], i);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i], i);
     }
   }
 };
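A minimal model of the server-side (`else`) branch of `invoke_rpc()`: only slots whose bit is set in the packet's header mask are visited. This sketch assumes a host build where `gpu::get_lane_size()` evaluates to 1, so the loop steps through each slot in turn; the mask value is made up for illustration:

```cpp
#include <cstdint>
#include <cstdio>

int main() {
  uint32_t lane_size = 8;     // stand-in for the process lane size
  uint64_t mask = 0b10110101; // lanes that were active in the GPU warp
  // Mirrors: if (packet.header.mask & 1ul << i) fn(&packet.payload.slot[i]);
  for (uint32_t i = 0; i < lane_size; ++i)
    if (mask & 1ul << i)
      printf("invoking fn on slot %u\n", i);
}
```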
@@ -234,7 +249,7 @@ template <bool T> struct Port {
   template <typename A> LIBC_INLINE void recv_n(A alloc);
 
   LIBC_INLINE uint16_t get_opcode() const {
-    return process.buffer[index].header.opcode;
+    return process.get_packet(index).header.opcode;
   }
 
   LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -281,7 +296,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
   }
 
   // Apply the \p fill function to initialize the buffer and release the memory.
-  process.invoke_rpc(fill, index);
+  process.invoke_rpc(fill, process.get_packet(index));
   out = !out;
   atomic_thread_fence(cpp::MemoryOrder::RELEASE);
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -299,7 +314,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
   atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
 
   // Apply the \p use function to read the memory out of the buffer.
-  process.invoke_rpc(use, index);
+  process.invoke_rpc(use, process.get_packet(index));
   out = !out;
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
 }
@@ -340,7 +355,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
       inline_memcpy(buffer->data, ptr + idx, len);
     });
   }
-  gpu::sync_lane(process.buffer[index].header.mask);
+  gpu::sync_lane(process.get_packet(index).header.mask);
 }
 
 /// Receives an arbitrarily sized data buffer across the shared channel in
@@ -396,32 +411,34 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
 /// participating thread.
 [[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
 Client::try_open(uint16_t opcode) {
-  constexpr uint64_t index = 0;
-  const uint64_t lane_mask = gpu::get_lane_mask();
-
-  // Attempt to acquire the lock on this index.
-  if (!try_lock(lane_mask, index))
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
-  // Once we acquire the index we need to check if we are in a valid sending
-  // state.
-  if (buffer_unavailable(in, out)) {
-    unlock(lane_mask, index);
-    return cpp::nullopt;
-  }
+  // Perform a naive linear scan for a port that can be opened to send data.
+  for (uint64_t index = 0; index < port_count; ++index) {
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    if (!try_lock(lane_mask, index))
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // Once we acquire the index we need to check if we are in a valid sending
+    // state.
+    if (buffer_unavailable(in, out)) {
+      unlock(lane_mask, index);
+      continue;
+    }
 
-  if (is_first_lane(lane_mask)) {
-    buffer[index].header.opcode = opcode;
-    buffer[index].header.mask = lane_mask;
+    if (is_first_lane(lane_mask)) {
+      get_packet(index).header.opcode = opcode;
+      get_packet(index).header.mask = lane_mask;
+    }
+    gpu::sync_lane(lane_mask);
+    return Port(*this, lane_mask, index, out);
   }
-  gpu::sync_lane(lane_mask);
-  return Port(*this, lane_mask, index, out);
+  return cpp::nullopt;
 }
 
 LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
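With the scan in place, several clients can hold ports at once. A usage sketch built only from the APIs visible in this diff; `client`, `example_opcode`, and the payload value are assumptions for illustration:

```cpp
// Hypothetical usage: open() spins on try_open() until one of the
// port_count ports is acquired, then the caller fills and releases it.
rpc::Client::Port port = client.open(example_opcode);
port.send([](rpc::Buffer *buffer) {
  buffer->data[0] = 42; // fill this lane's slot; layout per rpc.h's Buffer
});
port.close();
```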
@@ -436,33 +453,35 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
 /// port if it has a pending receive operation
 [[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
 Server::try_open() {
-  constexpr uint64_t index = 0;
-  const uint64_t lane_mask = gpu::get_lane_mask();
-
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
-  // The server is passive, if there is no work pending don't bother
-  // opening a port.
-  if (buffer_unavailable(in, out))
-    return cpp::nullopt;
-
-  // Attempt to acquire the lock on this index.
-  if (!try_lock(lane_mask, index))
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  in = load_inbox(index);
-  out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+  // Perform a naive linear scan for a port that has a pending request.
+  for (uint64_t index = 0; index < port_count; ++index) {
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // The server is passive; if there is no work pending, don't bother
+    // opening a port.
+    if (buffer_unavailable(in, out))
+      continue;
+
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    if (!try_lock(lane_mask, index))
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    in = load_inbox(index);
+    out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    if (buffer_unavailable(in, out)) {
+      unlock(lane_mask, index);
+      continue;
+    }
 
-  if (buffer_unavailable(in, out)) {
-    unlock(lane_mask, index);
-    return cpp::nullopt;
+    return Port(*this, lane_mask, index, out);
   }
-
-  return Port(*this, lane_mask, index, out);
+  return cpp::nullopt;
 }
 
 LIBC_INLINE Server::Port Server::open() {
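On the other side, a server under this scheme polls every port for pending work. A sketch using the `cpp::optional<Server::Port>` return shown above; `server` and `handle_opcode` are hypothetical stand-ins for the host runtime's setup and dispatch:

```cpp
// Hypothetical server loop: try_open() scans all port_count ports and
// returns the first one with a pending request, or nullopt if none.
for (;;) {
  auto maybe_port = server.try_open();
  if (!maybe_port.has_value())
    continue; // no client raised a request on any port
  rpc::Server::Port port = maybe_port.value();
  handle_opcode(port); // user-defined dispatch on port.get_opcode()
  port.close();
}
```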