Skip to content

Commit 33e15ba

Browse files
committed
cleanups
1 parent 060260e commit 33e15ba

File tree

1 file changed

+109
-140
lines changed

1 file changed

+109
-140
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 109 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,12 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
289289
}
290290
};
291291

292+
/// Simple wrapper type to ensure we use the right methods to prepare and run a waiting tas.
293+
/// Run it with `runWaitingTask`.
294+
struct PreparedWaitingTask {
295+
AsyncTask *waitingTask;
296+
};
297+
292298
protected:
293299
#if SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY || SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
294300
// Synchronization is simple here. In a single threaded mode, all swift tasks
@@ -392,13 +398,20 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
392398
virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0;
393399

394400
/// Resume waiting task with result from `completedTask`
395-
AsyncTask *resumeWaitingTask(AsyncTask *waitingTask,
401+
PreparedWaitingTask prepareWaitingTaskWithTask(AsyncTask *waitingTask,
396402
AsyncTask *completedTask,
397403
TaskGroupStatus &assumed,
398404
bool hadErrorResult,
399405
bool alreadyDecremented = false,
400406
bool taskWasRetained = false);
401407

408+
// NOTE: In today's implementation we MUST hold the group lock when claiming a task.
409+
AsyncTask *claimWaitingTask();
410+
411+
/// Should be the final operation a group locking operation performs e.g. in waitAll or offer.
412+
/// This resumes unlocks the group and resumes the waiting task.
413+
void runWaitingTask(PreparedWaitingTask prepared);
414+
402415
// ==== Status manipulation -------------------------------------------------
403416

404417
TaskGroupStatus statusLoadRelaxed() const;
@@ -628,6 +641,34 @@ bool TaskGroupBase::statusCompletePendingReadyWaiting(TaskGroupStatus &old) {
628641
/*failure*/ std::memory_order_relaxed);
629642
}
630643

644+
AsyncTask *TaskGroupBase::claimWaitingTask() {
645+
assert(statusLoadRelaxed().hasWaitingTask() &&
646+
"attempted to claim waiting task but status indicates no waiting "
647+
"task is present!");
648+
649+
auto waitingTask = waitQueue.load(std::memory_order_acquire);
650+
if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) {
651+
swift_Concurrency_fatalError(0, "Failed to claim waitingTask!");
652+
}
653+
return waitingTask;
654+
}
655+
void TaskGroupBase::runWaitingTask(PreparedWaitingTask prepared) {
656+
// The reason we might not have a task here to schedule is if we were running in the
657+
// task-per-thread single threaded mode, which would have executed the task in-line
658+
// and we must not schedule it here anymore.
659+
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
660+
assert(prepared.waitingTask == nullptr &&
661+
"unexpected task to schedule in TASK_TO_THREAD_MODEL!"
662+
"In this mode we should have run the task in-line, "
663+
"rather than return it for scheduling.")
664+
#endif
665+
if (auto waitingTask = prepared.waitingTask) {
666+
// TODO: allow the caller to suggest an executor
667+
waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
668+
}
669+
}
670+
671+
631672
bool TaskGroupBase::isCancelled() const {
632673
auto old = TaskGroupStatus{status.load(std::memory_order_relaxed)};
633674
return old.isCancelled();
@@ -834,7 +875,7 @@ class DiscardingTaskGroup: public TaskGroupBase {
834875

835876
private:
836877
/// Resume waiting task with specified error
837-
AsyncTask *resumeWaitingTaskWithError(AsyncTask* waitingTask,
878+
PreparedWaitingTask prepareWaitingTaskWithError(AsyncTask* waitingTask,
838879
SwiftError *error,
839880
TaskGroupStatus &assumed,
840881
bool alreadyDecremented);
@@ -1193,22 +1234,12 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
11931234

11941235
// ==== a) has waiting task, so let us complete it right away
11951236
if (assumed.hasWaitingTask()) {
1196-
auto waitingTask = waitQueue.load(std::memory_order_acquire);
1197-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, loaded waiting task:%p", waitingTask);
1198-
if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) {
1199-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, FAILED CAS loaded waiting task:%p", waitingTask);
1200-
swift_Concurrency_fatalError(0, "Failed to claim waitingTask!");
1201-
}
1202-
1203-
auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask,
1204-
assumed, hadErrorResult);
1205-
// Must unlock before we resume the waiting task
1206-
unlock();
1207-
1208-
if (task) {
1209-
task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1210-
}
1211-
return;
1237+
auto waitingTask = claimWaitingTask();
1238+
auto prepared = prepareWaitingTaskWithTask(
1239+
/*complete=*/waitingTask, /*with=*/completedTask,
1240+
assumed, hadErrorResult);
1241+
unlock(); // we MUST unlock before running the waiting task
1242+
return runWaitingTask(prepared);
12121243
} else {
12131244
// ==== b) enqueue completion ------------------------------------------------
12141245
//
@@ -1264,13 +1295,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12641295
// We grab the waiting task while holding the group lock, because this
12651296
// allows a single task to get the waiting task and attempt to complete it.
12661297
// As another offer gets to run, it will have either a different waiting task, or no waiting task at all.
1267-
auto waitingTask = waitQueue.load(std::memory_order_acquire); // FIXME: consume the task here!!!!!
1268-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, loaded waiting task:%p", waitingTask);
1269-
if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) {
1270-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, FAILED CAS loaded waiting task:%p", waitingTask);
1271-
swift_Concurrency_fatalError(0, "Failed to claim waitingTask!");
1272-
}
1273-
assert(waitingTask && "status claimed to have waitingTask but waitQueue was empty!");
1298+
auto waitingTask = claimWaitingTask();
12741299

12751300
// This is the last pending task, and we must resume the waiting task.
12761301
// - if there already was a previous error stored, we resume using it,
@@ -1285,16 +1310,12 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12851310
SWIFT_TASK_GROUP_DEBUG_LOG(
12861311
this, "offer, complete, resume waitingTask:%p, with raw error:%p",
12871312
waitingTask, readyErrorItem.getRawError(this));
1288-
auto task = resumeWaitingTaskWithError(
1313+
auto prepared = prepareWaitingTaskWithError(
12891314
/*complete=*/waitingTask,
12901315
/*with=*/readyErrorItem.getRawError(this), assumed,
12911316
alreadyDecrementedStatus);
1292-
unlock();
1293-
if (task) {
1294-
// TODO: allow the caller to suggest an executor
1295-
task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1296-
}
1297-
return;
1317+
unlock(); // we MUST unlock before running the waiting task
1318+
return runWaitingTask(prepared);
12981319
}
12991320
case ReadyStatus::Error: {
13001321
// The completed task failed, but we already stored a different failed
@@ -1305,17 +1326,13 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
13051326
"offer, complete waitingTask:%p, discard error completedTask:%p, "
13061327
"resume with errorItem.task:%p",
13071328
waitingTask, completedTask, readyErrorItem.getTask());
1308-
auto task = resumeWaitingTask(
1329+
auto prepared = prepareWaitingTaskWithTask(
13091330
/*complete*/ waitingTask,
13101331
/*with=*/readyErrorItem.getTask(), assumed,
13111332
/*hadErrorResult=*/true, alreadyDecrementedStatus,
13121333
/*taskWasRetained=*/true);
1313-
unlock();
1314-
if (task) {
1315-
// TODO: allow the caller to suggest an executor
1316-
task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1317-
}
1318-
return;
1334+
unlock(); // we MUST unlock before running the waiting task
1335+
return runWaitingTask(prepared);
13191336
}
13201337
default: {
13211338
swift_Concurrency_fatalError(
@@ -1329,14 +1346,10 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
13291346
// the group before we've given up the lock.
13301347
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
13311348
// There was no prior failed task stored, so we should resume the waitingTask with this (failed) completedTask
1332-
auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask,
1349+
auto prepared = prepareWaitingTaskWithTask(/*complete=*/waitingTask, /*with=*/completedTask,
13331350
assumed, hadErrorResult, alreadyDecrementedStatus);
1334-
unlock();
1335-
if (task) {
1336-
// TODO: allow the caller to suggest an executor
1337-
task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1338-
}
1339-
return;
1351+
unlock(); // we MUST unlock before running the waiting task
1352+
return runWaitingTask(prepared);
13401353
}
13411354
} else if (readyQueue.isEmpty()) {
13421355
// There was no waiting task, or other tasks are still pending, so we cannot
@@ -1375,28 +1388,20 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
13751388

13761389
switch (readyErrorItem.getStatus()) {
13771390
case ReadyStatus::RawError: {
1378-
auto task = resumeWaitingTaskWithError(
1391+
auto task = prepareWaitingTaskWithError(
13791392
/*complete=*/waitingTask, /*with=*/readyErrorItem.getRawError(this),
13801393
assumed, alreadyDecrementedStatus);
1381-
unlock();
1382-
if (task) {
1383-
// TODO: allow the caller to suggest an executor
1384-
task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1385-
}
1386-
return;
1394+
unlock(); // we MUST unlock before running the waiting task
1395+
return runWaitingTask(task);
13871396
}
13881397
case ReadyStatus::Error: {
1389-
auto task = resumeWaitingTask(
1398+
auto preparedWaitingTask = prepareWaitingTaskWithTask(
13901399
/*complete=*/waitingTask,
13911400
/*with=*/readyErrorItem.getTask(), assumed,
13921401
/*hadErrorResult=*/true, alreadyDecrementedStatus,
13931402
/*taskWasRetained=*/true);
1394-
unlock();
1395-
if (task) {
1396-
// TODO: allow the caller to suggest an executor
1397-
task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1398-
}
1399-
return;
1403+
unlock(); // we MUST unlock before running the waiting task
1404+
return runWaitingTask(preparedWaitingTask);
14001405
}
14011406
default: {
14021407
swift_Concurrency_fatalError(
@@ -1407,14 +1412,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
14071412
} else {
14081413
// This is the last task, we have a waiting task and there was no error stored previously;
14091414
// We must resume the waiting task with a success, so let us return here.
1410-
auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask,
1411-
assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
1412-
unlock();
1413-
if (task) {
1414-
// TODO: allow the caller to suggest an executor
1415-
task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1416-
}
1417-
return;
1415+
auto prepared = prepareWaitingTaskWithTask(
1416+
/*complete=*/waitingTask, /*with=*/completedTask,
1417+
assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
1418+
unlock(); // we MUST unlock before running the waiting task
1419+
return runWaitingTask(prepared);
14181420
}
14191421
} else {
14201422
// it wasn't the last pending task, and there is no-one to resume;
@@ -1425,7 +1427,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
14251427
}
14261428

14271429
/// Must be called while holding the TaskGroup lock.
1428-
AsyncTask *TaskGroupBase::resumeWaitingTask(
1430+
TaskGroupBase::PreparedWaitingTask TaskGroupBase::prepareWaitingTaskWithTask(
14291431
AsyncTask *waitingTask,
14301432
AsyncTask *completedTask,
14311433
TaskGroupStatus &assumed,
@@ -1434,23 +1436,8 @@ AsyncTask *TaskGroupBase::resumeWaitingTask(
14341436
bool taskWasRetained) {
14351437
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume, waitingTask = %p, completedTask = %p, alreadyDecremented:%d, error:%d",
14361438
waitingTask, alreadyDecremented, hadErrorResult, completedTask);
1437-
1438-
// auto waitingTask = waitQueue.load(std::memory_order_acquire);
14391439
assert(waitingTask && "waitingTask must not be null when attempting to resume it");
14401440
assert(assumed.hasWaitingTask());
1441-
// while (true) {
1442-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resumeWaitingTask, attempt CAS, waiting task = %p, waitQueue.head = %p, error:%d, complete with = %p",
1443-
waitingTask, waitQueue.load(std::memory_order_relaxed), hadErrorResult, completedTask);
1444-
1445-
// ==== a) run waiting task directly -------------------------------------
1446-
// // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!");
1447-
// // We are the "first" completed task to arrive,
1448-
// // and since there is a task waiting we immediately claim and complete it.
1449-
// if (waitQueue.compare_exchange_strong(
1450-
// waitingTask, nullptr,
1451-
// /*success*/ std::memory_order_release,
1452-
// /*failure*/ std::memory_order_acquire)) {
1453-
14541441
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
14551442
// In the task-to-thread model, child tasks are always actually
14561443
// run synchronously on the parent task's thread. For task groups
@@ -1466,8 +1453,7 @@ AsyncTask *TaskGroupBase::resumeWaitingTask(
14661453
// But since it's what we're doing, we basically take the same
14671454
// path as we would if there wasn't a waiter.
14681455
enqueueCompletedTask(completedTask, hadErrorResult);
1469-
return nullptr;
1470-
1456+
return {nullptr};
14711457
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
14721458
if (!alreadyDecremented) {
14731459
(void) statusCompletePendingReadyWaiting(assumed);
@@ -1500,70 +1486,53 @@ AsyncTask *TaskGroupBase::resumeWaitingTask(
15001486
}
15011487

15021488
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
1503-
// // TODO: allow the caller to suggest an executor
1504-
// waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
1505-
return waitingTask;
1489+
return {waitingTask};
15061490
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
1507-
// } else {
1508-
// SWIFT_TASK_GROUP_DEBUG_LOG(this, "CAS failed, waitingTask = %p, complete with = %p, status = %s",
1509-
// waitingTask, completedTask, statusString().c_str());
1510-
// }
1511-
// }
15121491
}
15131492

15141493
/// Must be called while holding the TaskGroup lock.
1515-
AsyncTask* DiscardingTaskGroup::resumeWaitingTaskWithError(
1516-
AsyncTask *waitingTask,
1517-
SwiftError *error,
1518-
TaskGroupStatus &assumed,
1519-
bool alreadyDecremented) {
1520-
// auto waitingTask = waitQueue.load(std::memory_order_acquire);
1494+
TaskGroupBase::PreparedWaitingTask
1495+
DiscardingTaskGroup::prepareWaitingTaskWithError(AsyncTask *waitingTask,
1496+
SwiftError *error,
1497+
TaskGroupStatus &assumed,
1498+
bool alreadyDecremented) {
15211499
assert(waitingTask && "cannot resume 'null' waiting task!");
1522-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, with error = %p",
1523-
waitingTask, error);
1524-
// while (true) {
1525-
// ==== a) run waiting task directly -------------------------------------
1526-
assert(assumed.hasWaitingTask());
1527-
// // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!");
1528-
// // We are the "first" completed task to arrive,
1529-
// // and since there is a task waiting we immediately claim and complete it.
1530-
// if (waitQueue.compare_exchange_strong(
1531-
// waitingTask, nullptr,
1532-
// /*success*/ std::memory_order_release,
1533-
// /*failure*/ std::memory_order_acquire)) {
1500+
SWIFT_TASK_GROUP_DEBUG_LOG(this,
1501+
"resume waiting task = %p, with error = %p",
1502+
waitingTask, error);
1503+
assert(assumed.hasWaitingTask());
15341504

15351505
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1536-
// In the task-to-thread model, child tasks are always actually
1537-
// run synchronously on the parent task's thread. For task groups
1538-
// specifically, this means that poll() will pick a child task
1539-
// that was added to the group and run it to completion as a
1540-
// subroutine. Therefore, when we enter offer(), we know that
1541-
// the parent task is waiting and we can just return to it.
1542-
1543-
// The task-to-thread logic in poll() currently expects the child
1544-
// task to enqueue itself instead of just filling in the result in
1545-
// the waiting task. This is a little wasteful; there's no reason
1546-
// we can't just have the parent task set itself up as a waiter.
1547-
// But since it's what we're doing, we basically take the same
1548-
// path as we would if there wasn't a waiter.
1549-
_enqueueRawError(this, &readyQueue, error);
1550-
return nullptr;
1551-
1506+
// In the task-to-thread model, child tasks are always actually
1507+
// run synchronously on the parent task's thread. For task groups
1508+
// specifically, this means that poll() will pick a child task
1509+
// that was added to the group and run it to completion as a
1510+
// subroutine. Therefore, when we enter offer(), we know that
1511+
// the parent task is waiting and we can just return to it.
1512+
1513+
// The task-to-thread logic in poll() currently expects the child
1514+
// task to enqueue itself instead of just filling in the result in
1515+
// the waiting task. This is a little wasteful; there's no reason
1516+
// we can't just have the parent task set itself up as a waiter.
1517+
// But since it's what we're doing, we basically take the same
1518+
// path as we would if there wasn't a waiter.
1519+
_enqueueRawError(this, &readyQueue, error);
1520+
return {nullptr};
15521521
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
1553-
if (alreadyDecremented || statusCompletePendingReadyWaiting(assumed)) {
1554-
// Run the task.
1555-
auto result = PollResult::getError(error);
1522+
if (!alreadyDecremented) {
1523+
statusCompletePendingReadyWaiting(assumed);
1524+
}
15561525

1557-
auto waitingContext =
1558-
static_cast<TaskFutureWaitAsyncContext *>(
1559-
waitingTask->ResumeContext);
1526+
// Run the task.
1527+
auto result = PollResult::getError(error);
15601528

1561-
fillGroupNextResult(waitingContext, result);
1562-
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
1563-
return waitingTask;
1564-
} // TODO: what in the else???
1565-
#endif
1566-
return nullptr;
1529+
auto waitingContext = static_cast<TaskFutureWaitAsyncContext *>(
1530+
waitingTask->ResumeContext);
1531+
1532+
fillGroupNextResult(waitingContext, result);
1533+
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
1534+
return {waitingTask};
1535+
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
15671536
}
15681537

15691538
SWIFT_CC(swiftasync)

0 commit comments

Comments
 (0)