diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp
index 03f726b65b05e..b9007cebb57c9 100644
--- a/stdlib/public/Concurrency/TaskGroup.cpp
+++ b/stdlib/public/Concurrency/TaskGroup.cpp
@@ -65,6 +65,7 @@ using namespace swift;
 #if 0
+#define SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED 1
 #define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...)            \
 fprintf(stderr, "[%#lx] [%s:%d][group(%p%s)] (%s) " fmt "\n",  \
         (unsigned long)Thread::current().platformThreadId(),   \
@@ -81,6 +82,7 @@ fprintf(stderr, "[%#lx] [%s:%d][group(%p)] (%s) " fmt "\n", \
         __FUNCTION__,                                          \
         __VA_ARGS__)
 #else
+#define SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED 0
 #define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) (void)0
 #define SWIFT_TASK_GROUP_DEBUG_LOG_0(group, fmt, ...) (void)0
 #endif
@@ -287,6 +289,12 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
     }
   };
 
+  /// Simple wrapper type to ensure we use the right methods to prepare and run a waiting task.
+  /// Run it with `runWaitingTask`.
+  struct PreparedWaitingTask {
+    AsyncTask *waitingTask;
+  };
+
 protected:
 #if SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY || SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
   // Synchronization is simple here. In a single threaded mode, all swift tasks
@@ -390,12 +398,20 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
   virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0;
 
   /// Resume waiting task with result from `completedTask`
-  void resumeWaitingTask(AsyncTask *completedTask,
+  PreparedWaitingTask prepareWaitingTaskWithTask(AsyncTask *waitingTask,
+                          AsyncTask *completedTask,
                           TaskGroupStatus &assumed,
                           bool hadErrorResult,
                           bool alreadyDecremented = false,
                           bool taskWasRetained = false);
+  // NOTE: In today's implementation we MUST hold the group lock when claiming a task.
+  AsyncTask *claimWaitingTask();
+
+  /// Should be the final operation a group locking operation performs, e.g. in waitAll or offer.
+  /// This unlocks the group and resumes the waiting task.
+  void runWaitingTask(PreparedWaitingTask prepared);
+
   // ==== Status manipulation -------------------------------------------------
 
   TaskGroupStatus statusLoadRelaxed() const;
@@ -625,6 +641,34 @@ bool TaskGroupBase::statusCompletePendingReadyWaiting(TaskGroupStatus &old) {
       /*failure*/ std::memory_order_relaxed);
 }
 
+AsyncTask *TaskGroupBase::claimWaitingTask() {
+  assert(statusLoadRelaxed().hasWaitingTask() &&
+         "attempted to claim waiting task but status indicates no waiting "
+         "task is present!");
+
+  auto waitingTask = waitQueue.load(std::memory_order_acquire);
+  if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) {
+    swift_Concurrency_fatalError(0, "Failed to claim waitingTask!");
+  }
+  return waitingTask;
+}
+void TaskGroupBase::runWaitingTask(PreparedWaitingTask prepared) {
+  // The reason we might not have a task here to schedule is if we were running in the
+  // task-per-thread single threaded mode, which would have executed the task in-line
+  // and we must not schedule it here anymore.
+#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
+  assert(prepared.waitingTask == nullptr &&
+         "unexpected task to schedule in TASK_TO_THREAD_MODEL!"
+         "In this mode we should have run the task in-line, "
+         "rather than return it for scheduling.");
+#endif
+  if (auto waitingTask = prepared.waitingTask) {
+    // TODO: allow the caller to suggest an executor
+    waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
+  }
+}
+
+
 bool TaskGroupBase::isCancelled() const {
   auto old = TaskGroupStatus{status.load(std::memory_order_relaxed)};
   return old.isCancelled();
@@ -831,7 +875,8 @@ class DiscardingTaskGroup: public TaskGroupBase {
 
 private:
   /// Resume waiting task with specified error
-  void resumeWaitingTaskWithError(SwiftError *error,
+  PreparedWaitingTask prepareWaitingTaskWithError(AsyncTask* waitingTask,
+                                  SwiftError *error,
                                   TaskGroupStatus &assumed,
                                   bool alreadyDecremented);
 };
@@ -913,7 +958,8 @@ static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags,
                                                     TaskGroup *group, const Metadata *T) {
 
   TaskGroupFlags groupFlags(rawGroupFlags);
-  SWIFT_TASK_GROUP_DEBUG_LOG_0(group, "create group; flags: isDiscardingResults=%d",
+  SWIFT_TASK_GROUP_DEBUG_LOG_0(group, "create group, from task:%p; flags: isDiscardingResults=%d",
+                               swift_task_getCurrent(),
                                groupFlags.isDiscardResults());
 
   TaskGroupBase *impl;
@@ -941,7 +987,7 @@ static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags,
 // ==== child task management --------------------------------------------------
 
 void TaskGroup::addChildTask(AsyncTask *child) {
-  SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p", child, this);
+  SWIFT_TASK_GROUP_DEBUG_LOG(this, "attach child task = %p", child);
 
   // Add the child task to this task group. The corresponding removal
   // won't happen until the parent task successfully polls for this child
@@ -959,7 +1005,7 @@ void TaskGroup::addChildTask(AsyncTask *child) {
 }
 
 void TaskGroup::removeChildTask(AsyncTask *child) {
-  SWIFT_TASK_DEBUG_LOG("detach child task = %p from group = %p", child, this);
+  SWIFT_TASK_GROUP_DEBUG_LOG(this, "detach child task = %p", child);
 
   auto groupRecord = asBaseImpl(this)->getTaskRecord();
 
@@ -979,7 +1025,7 @@ static void swift_taskGroup_destroyImpl(TaskGroup *group) {
 }
 
 void AccumulatingTaskGroup::destroy() {
-#if SWIFT_TASK_DEBUG_LOG_ENABLED
+#if SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED
   if (!this->isEmpty()) {
     auto status = this->statusLoadRelaxed();
     SWIFT_TASK_GROUP_DEBUG_LOG(this, "destroy, tasks .ready = %d, .pending = %llu",
@@ -988,7 +1034,10 @@ void AccumulatingTaskGroup::destroy() {
     SWIFT_TASK_DEBUG_LOG("destroying task group = %p", this);
   }
 #endif
+  // Verify, using the group's status, that we are indeed expected to be empty
   assert(this->isEmpty() && "Attempted to destroy non-empty task group!");
+  // Double-check by inspecting the group record; it should contain no children
+  assert(getTaskRecord()->getFirstChild() == nullptr && "Task group record still has child task!");
 
   // First, remove the group from the task and deallocate the record
   removeStatusRecordFromSelf(getTaskRecord());
@@ -1002,7 +1051,7 @@ void AccumulatingTaskGroup::destroy() {
 }
 
 void DiscardingTaskGroup::destroy() {
-#if SWIFT_TASK_DEBUG_LOG_ENABLED
+#if SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED
   if (!this->isEmpty()) {
     auto status = this->statusLoadRelaxed();
     SWIFT_TASK_GROUP_DEBUG_LOG(this, "destroy, tasks .ready = %d, .pending = %llu",
@@ -1011,7 +1060,10 @@ void DiscardingTaskGroup::destroy() {
     SWIFT_TASK_DEBUG_LOG("destroying discarding task group = %p", this);
   }
 #endif
+  // Verify, using the group's status, that we are indeed expected to be empty
   assert(this->isEmpty() && "Attempted to destroy non-empty task group!");
+  // Double-check by inspecting the group record; it should contain no children
+  assert(getTaskRecord()->getFirstChild() == nullptr && "Task group record still has child task!");
 
   // First, remove the group from the task and deallocate the record
   removeStatusRecordFromSelf(getTaskRecord());
@@ -1150,7 +1202,7 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
   // This is wasteful, and the task completion function should be fixed to
   // transfer ownership of a retain into this function, in which case we
   // will need to release in the other path.
-  lock(); // TODO: remove fragment lock, and use status for synchronization
+  lock();
 
   SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p, status:%s",
                              completedTask,
@@ -1182,9 +1234,16 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
 
   // ==== a) has waiting task, so let us complete it right away
   if (assumed.hasWaitingTask()) {
-    resumeWaitingTask(completedTask, assumed, hadErrorResult);
-    unlock(); // TODO: remove fragment lock, and use status for synchronization
-    return;
+    auto waitingTask = claimWaitingTask();
+    auto prepared = prepareWaitingTaskWithTask(
+        /*complete=*/waitingTask, /*with=*/completedTask,
+        assumed, hadErrorResult);
+    // we must unlock before running the waiting task,
+    // in order to avoid the potential for the resumed task
+    // to cause a group destroy, in which case the unlock might
+    // attempt to access memory in an invalid state.
+    unlock();
+    return runWaitingTask(prepared);
   } else {
     // ==== b) enqueue completion ------------------------------------------------
     //
@@ -1194,7 +1253,7 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
     assert(!waitQueue.load(std::memory_order_relaxed));
     enqueueCompletedTask(completedTask, hadErrorResult);
-    unlock(); // TODO: remove fragment lock, and use status for synchronization
+    return unlock();
   }
 }
 
@@ -1205,7 +1264,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
   assert(completedTask->hasGroupChildFragment());
   assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
-  lock(); // TODO: remove fragment lock, and use status for synchronization
+  lock();
 
   // Since we don't maintain ready counts in a discarding group, only load the status.
   TaskGroupStatus assumed = statusLoadAcquire();
@@ -1237,110 +1296,176 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
     cancelAll();
 
     if (afterComplete.hasWaitingTask() && afterComplete.pendingTasks(this) == 0) {
+      // We grab the waiting task while holding the group lock, because this
+      // allows a single task to get the waiting task and attempt to complete it.
+      // As another offer gets to run, it will have either a different waiting task, or no waiting task at all.
+      auto waitingTask = claimWaitingTask();
+      // This is the last pending task, and we must resume the waiting task.
       // - if there already was a previous error stored, we resume using it,
       // - otherwise, we resume using this current (failed) completedTask
       ReadyQueueItem readyErrorItem;
       if (readyQueue.dequeue(readyErrorItem)) {
+        // Always detach the completed task; we're instead going to use the stored value from the readyQueue
+        _swift_taskGroup_detachChild(asAbstract(this), completedTask);
+
         switch (readyErrorItem.getStatus()) {
-        case ReadyStatus::RawError:
-          SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with raw error:%p", readyErrorItem.getRawError(this));
-          resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed,
-                                     alreadyDecrementedStatus);
-          break;
-        case ReadyStatus::Error:
-          SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with errorItem.task:%p", readyErrorItem.getTask());
-          resumeWaitingTask(readyErrorItem.getTask(), assumed,
-                            /*hadErrorResult=*/true,
-                            alreadyDecrementedStatus,
-                            /*taskWasRetained=*/true);
-          break;
-        default:
-          swift_Concurrency_fatalError(0,
-              "only errors can be stored by a discarding task group, yet it wasn't an error! 1");
+        case ReadyStatus::RawError: {
+          SWIFT_TASK_GROUP_DEBUG_LOG(
+              this, "offer, complete, resume waitingTask:%p, with raw error:%p",
+              waitingTask, readyErrorItem.getRawError(this));
+          auto prepared = prepareWaitingTaskWithError(
+              /*complete=*/waitingTask,
+              /*with=*/readyErrorItem.getRawError(this), assumed,
+              alreadyDecrementedStatus);
+          // we must unlock before running the waiting task,
+          // in order to avoid the potential for the resumed task
+          // to cause a group destroy, in which case the unlock might
+          // attempt to access memory in an invalid state.
+          unlock();
+          return runWaitingTask(prepared);
+        }
+        case ReadyStatus::Error: {
+          // The completed task failed, but we already stored a different failed
+          // task. Thus we discard this error and complete with the previously
+          // stored one.
+          SWIFT_TASK_GROUP_DEBUG_LOG(
+              this,
+              "offer, complete waitingTask:%p, discard error completedTask:%p, "
+              "resume with errorItem.task:%p",
+              waitingTask, completedTask, readyErrorItem.getTask());
+          auto prepared = prepareWaitingTaskWithTask(
+              /*complete=*/waitingTask,
+              /*with=*/readyErrorItem.getTask(), assumed,
+              /*hadErrorResult=*/true, alreadyDecrementedStatus,
+              /*taskWasRetained=*/true);
+          // we must unlock before running the waiting task,
+          // in order to avoid the potential for the resumed task
+          // to cause a group destroy, in which case the unlock might
+          // attempt to access memory in an invalid state.
+          unlock();
+          return runWaitingTask(prepared);
+        }
+        default: {
+          swift_Concurrency_fatalError(
+              0, "only errors can be stored by a discarding task group, yet it "
+                 "wasn't an error! 1");
+        }
        }
      } else {
+        // The following MUST be done in this order: detach, unlock, resume waitingTask,
+        // because we do not want to allow another task to run and have the potential to
+        // lock or even destroy the group before we've given up the lock.
+        _swift_taskGroup_detachChild(asAbstract(this), completedTask);
        // There was no prior failed task stored, so we should resume the waitingTask with this (failed) completedTask
-        resumeWaitingTask(completedTask, assumed, hadErrorResult, alreadyDecrementedStatus);
+        auto prepared = prepareWaitingTaskWithTask(/*complete=*/waitingTask, /*with=*/completedTask,
+                                                   assumed, hadErrorResult, alreadyDecrementedStatus);
+        // we must unlock before running the waiting task,
+        // in order to avoid the potential for the resumed task
+        // to cause a group destroy, in which case the unlock might
+        // attempt to access memory in an invalid state.
+        unlock();
+        return runWaitingTask(prepared);
      }
    } else if (readyQueue.isEmpty()) {
      // There was no waiting task, or other tasks are still pending, so we cannot
      // it is the first error we encountered, thus we need to store it for future throwing
      SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, enqueue child task:%p", completedTask);
      enqueueCompletedTask(completedTask, hadErrorResult);
+     return unlock();
    } else {
      SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, discard child task:%p", completedTask);
      _swift_taskGroup_detachChild(asAbstract(this), completedTask);
+     return unlock();
    }
-
-    unlock();
-    return;
+    swift_unreachable("expected to early return when handling offer of last task in group");
  }
 
  assert(!hadErrorResult && "only successfully completed tasks can reach here");
 
  if (afterComplete.hasWaitingTask() && afterComplete.pendingTasks(this) == 0) {
+    // We grab the waiting task while holding the group lock, because this
+    // allows a single task to get the waiting task and attempt to complete it.
+    // As another offer gets to run, it will have either a different waiting task, or no waiting task at all.
+    auto waitingTask = waitQueue.load(std::memory_order_acquire);
+    if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) {
+      swift_Concurrency_fatalError(0, "Failed to claim waitingTask!");
+    }
+    assert(waitingTask && "status claimed to have waitingTask but waitQueue was empty!");
+
    SWIFT_TASK_GROUP_DEBUG_LOG(this,
-                               "offer, last pending task completed successfully, resume waitingTask with completedTask:%p",
-                               completedTask);
+                               "offer, last pending task completed successfully, resume waitingTask:%p with completedTask:%p",
+                               waitingTask, completedTask);
    /// If there was an error previously stored, we must resume the waitingTask using that error.
    ReadyQueueItem readyErrorItem;
    if (readyQueue.dequeue(readyErrorItem)) {
+      // Always detach the completed task; we're instead going to use the stored value from the readyQueue
      _swift_taskGroup_detachChild(asAbstract(this), completedTask);
+
      switch (readyErrorItem.getStatus()) {
-      case ReadyStatus::RawError:
-        resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus);
-        break;
-      case ReadyStatus::Error:
-        resumeWaitingTask(readyErrorItem.getTask(), assumed,
-                          /*hadErrorResult=*/true,
-                          alreadyDecrementedStatus,
-                          /*taskWasRetained=*/true);
-        break;
-      default:
-        swift_Concurrency_fatalError(0,
-            "only errors can be stored by a discarding task group, yet it wasn't an error! 2");
+      case ReadyStatus::RawError: {
+        auto task = prepareWaitingTaskWithError(
+            /*complete=*/waitingTask, /*with=*/readyErrorItem.getRawError(this),
+            assumed, alreadyDecrementedStatus);
+        // we must unlock before running the waiting task,
+        // in order to avoid the potential for the resumed task
+        // to cause a group destroy, in which case the unlock might
+        // attempt to access memory in an invalid state.
+        unlock();
+        return runWaitingTask(task);
+      }
+      case ReadyStatus::Error: {
+        auto preparedWaitingTask = prepareWaitingTaskWithTask(
+            /*complete=*/waitingTask,
+            /*with=*/readyErrorItem.getTask(), assumed,
+            /*hadErrorResult=*/true, alreadyDecrementedStatus,
+            /*taskWasRetained=*/true);
+        // we must unlock before running the waiting task,
+        // in order to avoid the potential for the resumed task
+        // to cause a group destroy, in which case the unlock might
+        // attempt to access memory in an invalid state.
+        unlock();
+        return runWaitingTask(preparedWaitingTask);
+      }
+      default: {
+        swift_Concurrency_fatalError(
+            0, "only errors can be stored by a discarding task group, yet it "
+               "wasn't an error! 2");
+      }
      }
    } else {
      // This is the last task, we have a waiting task and there was no error stored previously;
      // We must resume the waiting task with a success, so let us return here.
-      resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
+      auto prepared = prepareWaitingTaskWithTask(
+          /*complete=*/waitingTask, /*with=*/completedTask,
+          assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
+      // we must unlock before running the waiting task,
+      // in order to avoid the potential for the resumed task
+      // to cause a group destroy, in which case the unlock might
+      // attempt to access memory in an invalid state.
+      unlock();
+      return runWaitingTask(prepared);
    }
  } else {
    // it wasn't the last pending task, and there is no-one to resume;
    // Since this is a successful result, and we're a discarding task group -- always just ignore this task.
    _swift_taskGroup_detachChild(asAbstract(this), completedTask);
+    return unlock();
  }
-
-  unlock();
-  return;
 }
 
 /// Must be called while holding the TaskGroup lock.
-void TaskGroupBase::resumeWaitingTask(
+TaskGroupBase::PreparedWaitingTask TaskGroupBase::prepareWaitingTaskWithTask(
+    AsyncTask *waitingTask,
     AsyncTask *completedTask,
     TaskGroupStatus &assumed,
     bool hadErrorResult,
     bool alreadyDecremented,
     bool taskWasRetained) {
-  auto waitingTask = waitQueue.load(std::memory_order_acquire);
+  SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume, waitingTask = %p, completedTask = %p, alreadyDecremented:%d, error:%d",
+                             waitingTask, completedTask, alreadyDecremented, hadErrorResult);
   assert(waitingTask && "waitingTask must not be null when attempting to resume it");
   assert(assumed.hasWaitingTask());
-  SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, alreadyDecremented:%d, error:%d, complete with = %p",
-                             waitingTask, alreadyDecremented, hadErrorResult, completedTask);
-  while (true) {
-    SWIFT_TASK_GROUP_DEBUG_LOG(this, "resumeWaitingTask, attempt CAS, waiting task = %p, waitQueue.head = %p, error:%d, complete with = %p",
-                               waitingTask, waitQueue.load(std::memory_order_relaxed), hadErrorResult, completedTask);
-
-    // ==== a) run waiting task directly -------------------------------------
-    // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!");
-    // We are the "first" completed task to arrive,
-    // and since there is a task waiting we immediately claim and complete it.
-    if (waitQueue.compare_exchange_strong(
-            waitingTask, nullptr,
-            /*success*/ std::memory_order_release,
-            /*failure*/ std::memory_order_acquire)) {
-
 #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
   // In the task-to-thread model, child tasks are always actually
   // run synchronously on the parent task's thread. For task groups
   // specifically, this means that poll() will pick a child task
   // that was added to the group and run it to completion as a
   // subroutine. Therefore, when we enter offer(), we know that
@@ -1356,14 +1481,13 @@ void TaskGroupBase::resumeWaitingTask(
   // But since it's what we're doing, we basically take the same
   // path as we would if there wasn't a waiter.
   enqueueCompletedTask(completedTask, hadErrorResult);
-  return;
-
+  return {nullptr};
 #else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
   if (!alreadyDecremented) {
     (void) statusCompletePendingReadyWaiting(assumed);
   }
 
-  // Run the task.
+  // Populate the waiting task with value from completedTask.
   auto result = PollResult::get(completedTask, hadErrorResult);
   SWIFT_TASK_GROUP_DEBUG_LOG(this,
                              "resume waiting DONE, task = %p, error:%d, complete with = %p, status = %s",
@@ -1390,73 +1514,53 @@ void TaskGroupBase::resumeWaitingTask(
   }
 
   _swift_tsan_acquire(static_cast<void *>(waitingTask));
-  // TODO: allow the caller to suggest an executor
-  waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
-  return;
+  return {waitingTask};
 #endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
-    } else {
-      SWIFT_TASK_GROUP_DEBUG_LOG(this, "CAS failed, task = %p, backup = %p, complete with = %p, status = %s",
-                                 waitingTask, completedTask, statusString().c_str());
-    }
-  }
 }
 
 /// Must be called while holding the TaskGroup lock.
-void DiscardingTaskGroup::resumeWaitingTaskWithError(
-    SwiftError *error,
-    TaskGroupStatus &assumed,
-    bool alreadyDecremented) {
-  auto waitingTask = waitQueue.load(std::memory_order_acquire);
+TaskGroupBase::PreparedWaitingTask
+DiscardingTaskGroup::prepareWaitingTaskWithError(AsyncTask *waitingTask,
+                                                 SwiftError *error,
+                                                 TaskGroupStatus &assumed,
+                                                 bool alreadyDecremented) {
   assert(waitingTask && "cannot resume 'null' waiting task!");
-  SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, with error = %p",
-                             waitingTask, error);
-  while (true) {
-    // ==== a) run waiting task directly -------------------------------------
-    assert(assumed.hasWaitingTask());
-    // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!");
-    // We are the "first" completed task to arrive,
-    // and since there is a task waiting we immediately claim and complete it.
-    if (waitQueue.compare_exchange_strong(
-            waitingTask, nullptr,
-            /*success*/ std::memory_order_release,
-            /*failure*/ std::memory_order_acquire)) {
+  SWIFT_TASK_GROUP_DEBUG_LOG(this,
+                             "resume waiting task = %p, with error = %p",
+                             waitingTask, error);
+  assert(assumed.hasWaitingTask());
 #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
-      // In the task-to-thread model, child tasks are always actually
-      // run synchronously on the parent task's thread. For task groups
-      // specifically, this means that poll() will pick a child task
-      // that was added to the group and run it to completion as a
-      // subroutine. Therefore, when we enter offer(), we know that
+  // the parent task is waiting and we can just return to it.
+
+  // The task-to-thread logic in poll() currently expects the child
+  // task to enqueue itself instead of just filling in the result in
+  // the waiting task. This is a little wasteful; there's no reason
+  // we can't just have the parent task set itself up as a waiter.
+  // But since it's what we're doing, we basically take the same
+  // path as we would if there wasn't a waiter.
+  _enqueueRawError(this, &readyQueue, error);
+  return {nullptr};
 #else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
-      if (alreadyDecremented || statusCompletePendingReadyWaiting(assumed)) {
-        // Run the task.
-        auto result = PollResult::getError(error);
+  if (!alreadyDecremented) {
+    statusCompletePendingReadyWaiting(assumed);
+  }
 
-        auto waitingContext =
-            static_cast<TaskFutureWaitAsyncContext *>(
-                waitingTask->ResumeContext);
+  // Run the task.
+  auto result = PollResult::getError(error);
 
-        fillGroupNextResult(waitingContext, result);
+  auto waitingContext = static_cast<TaskFutureWaitAsyncContext *>(
+      waitingTask->ResumeContext);
 
-        _swift_tsan_acquire(static_cast<void *>(waitingTask));
-        // TODO: allow the caller to suggest an executor
-        waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
-        return;
-      } // else, try again
-#endif
-    }
-  }
+  fillGroupNextResult(waitingContext, result);
+  _swift_tsan_acquire(static_cast<void *>(waitingTask));
+  return {waitingTask};
+#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
 }
 
 SWIFT_CC(swiftasync)
@@ -1559,7 +1663,7 @@ static void swift_taskGroup_wait_next_throwingImpl(
 
 PollResult AccumulatingTaskGroup::poll(AsyncTask *waitingTask) {
   SWIFT_TASK_GROUP_DEBUG_LOG(this, "poll, waitingTask:%p", waitingTask);
-  lock(); // TODO: remove group lock, and use status for synchronization
+  lock();
 
   assert(isAccumulatingResults() && "attempted to poll TaskGroup in discard-results mode!");
 
@@ -1589,7 +1693,7 @@ reevaluate_if_taskgroup_has_results:;
     statusRemoveWaitingRelease();
     result.status = PollStatus::Empty;
     result.successType = this->successType;
-    unlock(); // TODO: remove group lock, and use status for synchronization
+    unlock();
     return result;
   }
 
@@ -1639,7 +1743,7 @@ reevaluate_if_taskgroup_has_results:;
       result.retainedTask = item.getTask();
       assert(result.retainedTask && "polled a task, it must be not null");
      _swift_tsan_acquire(static_cast<void *>(result.retainedTask));
-      unlock(); // TODO: remove fragment lock, and use status for synchronization
+      unlock();
      return result;
 
    case ReadyStatus::Error:
@@ -1651,7 +1755,7 @@ reevaluate_if_taskgroup_has_results:;
      result.retainedTask = item.getTask();
      assert(result.retainedTask && "polled a task, it must be not null");
      _swift_tsan_acquire(static_cast<void *>(result.retainedTask));
-      unlock(); // TODO: remove fragment lock, and use status for synchronization
+      unlock();
      return result;
 
    case ReadyStatus::Empty:
@@ -1659,7 +1763,7 @@ reevaluate_if_taskgroup_has_results:;
      result.storage = nullptr;
      result.retainedTask = nullptr;
      result.successType = this->successType;
-      unlock(); // TODO: remove fragment lock, and use status for synchronization
+      unlock();
      return result;
 
    case ReadyStatus::RawError:
@@ -1683,7 +1787,11 @@ reevaluate_if_taskgroup_has_results:;
          waitHead, waitingTask,
          /*success*/ std::memory_order_release,
          /*failure*/ std::memory_order_acquire)) {
-    unlock(); // TODO: remove fragment lock, and use status for synchronization
+    // we must unlock before running the waiting task,
+    // in order to avoid the potential for the resumed task
+    // to cause a group destroy, in which case the unlock might
+    // attempt to access memory in an invalid state.
+    unlock();
 #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
    // The logic here is paired with the logic in TaskGroupBase::offer. Once
    // we run the
diff --git a/stdlib/public/Concurrency/TaskPrivate.h b/stdlib/public/Concurrency/TaskPrivate.h
index 414bf3dc75c11..f39b4f3816a89 100644
--- a/stdlib/public/Concurrency/TaskPrivate.h
+++ b/stdlib/public/Concurrency/TaskPrivate.h
@@ -781,8 +781,10 @@ struct AsyncTask::PrivateStorage {
 
   // Destroy the opaque storage of the task
   void destroy() {
+#ifndef NDEBUG
     auto oldStatus = _status().load(std::memory_order_relaxed);
     assert(oldStatus.isComplete());
+#endif
 
     this->~PrivateStorage();
   }
diff --git a/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift b/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift
index a2961f5da8394..a5b3568752b02 100644
--- a/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift
+++ b/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift
@@ -1,12 +1,11 @@
 // RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s --dump-input=always
+
 // REQUIRES: executable_test
 // REQUIRES: concurrency
 // REQUIRES: concurrency_runtime
 // UNSUPPORTED: back_deployment_runtime
 // UNSUPPORTED: OS=linux-gnu
 
-// REQUIRES: rdar86028226
-
 struct Boom: Error {}
 
 func boom() async throws -> Int {
@@ -17,7 +16,7 @@ func boom() async throws -> Int {
 func test_taskGroup_next() async {
   let sum = await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in
     for n in 1...10 {
-      group.spawn {
+      group.addTask {
         return n.isMultiple(of: 3) ? try await boom() : n
       }
     }
@@ -50,7 +49,7 @@ func test_taskGroup_next() async {
 func test_taskGroup_for_in() async {
   let sum = await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in
     for n in 1...10 {
-      group.spawn {
+      group.addTask {
         return n.isMultiple(of: 3) ? try await boom() : n
       }
     }
@@ -81,7 +80,7 @@ func test_taskGroup_asyncIterator() async {
   let sum = await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in
     for n in 1...10 {
-      group.spawn {
+      group.addTask {
        return n.isMultiple(of: 3) ?
try await boom() : n } } @@ -119,7 +118,7 @@ func test_taskGroup_asyncIterator() async { func test_taskGroup_contains() async { let sum = await withTaskGroup(of: Int.self, returning: Int.self) { group in for n in 1...4 { - group.spawn { + group.addTask { return n } } @@ -128,7 +127,7 @@ func test_taskGroup_contains() async { print("three = \(three)") // CHECK: three = true for n in 5...7 { - group.spawn { + group.addTask { return n } } diff --git a/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift b/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift index 253f8934669d2..2adac05c32c5b 100644 --- a/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift +++ b/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift @@ -1,9 +1,6 @@ // RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s --dump-input=always // TODO: move to target-run-simple-leaks-swift once CI is using at least Xcode 14.3 -// rdar://109998145 - Temporarily disable this test -// REQUIRES: rdar109998145 - // REQUIRES: concurrency // REQUIRES: executable_test // REQUIRES: concurrency_runtime @@ -16,6 +13,37 @@ import _Concurrency +actor SimpleCountDownLatch { + let from: Int + var count: Int + + var continuation: CheckedContinuation? + + init(from: Int) { + self.from = from + self.count = from + } + + func hit() { + defer { count -= 1 } + if count == 0 { + fatalError("Counted down more times than expected! (From: \(from))") + } else if count == 1 { + continuation?.resume() + } + } + + func wait() async { + guard self.count > 0 else { + return // we're done + } + + return await withCheckedContinuation { cc in + self.continuation = cc + } + } +} + final class PrintDeinit { let id: String init(id: String) { @@ -60,96 +88,145 @@ final class SomeClass: @unchecked Sendable { // NOTE: Not as StdlibUnittest/TestSuite since these types of tests are unreasonably slow to load/debug. -@main struct Main { - static func main() async { - _ = try? await withThrowingDiscardingTaskGroup() { group in - group.addTask { - throw Boom(id: "race-boom-class") - } +func testTwo() async { + let latch = SimpleCountDownLatch(from: 2) + + _ = try? await withThrowingDiscardingTaskGroup() { group in + group.addTask { + await latch.hit() + throw Boom(id: "race-boom") + } + group.addTask { + await latch.hit() + SomeClass(id: "race-boom-class") // will be discarded + } + + return 12 + } + + // since values may deinit in any order, we just assert their count basically + // CHECK-DAG: deinit, id: race-boom + // CHECK-DAG: deinit, id: race-boom + await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) + + print("done") // CHECK: done +} + +func manyOk() async { + let latch = SimpleCountDownLatch(from: 6) + + _ = try? await withThrowingDiscardingTaskGroup() { group in + for i in 0..<6 { group.addTask { - SomeClass(id: "race-boom-class") // will be discarded + await latch.hit() + _ = SomeClass(id: "many-ok") // will be discarded } - // since values may deinit in any order, we just assert their count basically - // CHECK-DAG: deinit, id: race-boom-class - // CHECK-DAG: deinit, id: race-boom-class - - return 12 } - // many ok - _ = try? 
await withThrowingDiscardingTaskGroup() { group in + return 12 + } + // since values may deinit in any order, we just assert their count basically + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + + await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) + + print("done") // CHECK: done +} + +func manyThrows() async { + let latch = SimpleCountDownLatch(from: 6) + + do { + let value: Void = try await withThrowingDiscardingTaskGroup() { group in for i in 0..<6 { group.addTask { - SomeClass(id: "many-ok") // will be discarded + await latch.hit() + throw BoomClass(id: "many-error") // will be rethrown } - // since values may deinit in any order, we just assert their count basically - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok } - return 12 + // since values may deinit in any order, we just assert their count basically + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + + 12 // must be ignored } + preconditionFailure("Should throw") + } catch { + precondition("\(error)" == "main.BoomClass", "error was: \(error)") + } - // many throws - do { - let value = try await withThrowingDiscardingTaskGroup() { group in - for i in 0..<6 { - group.addTask { - throw BoomClass(id: "many-error") // will be rethrown - } - } + await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) - // since values may deinit in any order, we just assert their count basically - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error + print("done") // CHECK: done +} - 12 // must be ignored - } - preconditionFailure("Should throw") - } catch { - precondition("\(error)" == "main.BoomClass", "error was: \(error)") +func manyValuesThrows() async { + let latch = SimpleCountDownLatch(from: 6) + + // many errors, many values + _ = try? await withThrowingDiscardingTaskGroup() { group in + group.addTask { + await latch.hit() + _ = SomeClass(id: "mixed-ok") // will be discarded + } + group.addTask { + await latch.hit() + _ = SomeClass(id: "mixed-ok") // will be discarded + } + group.addTask { + await latch.hit() + _ = SomeClass(id: "mixed-ok") // will be discarded + } + group.addTask { + await latch.hit() + throw Boom(id: "mixed-error") + } + group.addTask { + await latch.hit() + throw Boom(id: "mixed-error") + } + group.addTask { + await latch.hit() + throw Boom(id: "mixed-error") } - // many errors, many values - _ = try? 
await withThrowingDiscardingTaskGroup() { group in - group.addTask { - SomeClass(id: "mixed-ok") // will be discarded - } - group.addTask { - SomeClass(id: "mixed-ok") // will be discarded - } - group.addTask { - SomeClass(id: "mixed-ok") // will be discarded - } - group.addTask { - throw Boom(id: "mixed-error") - } - group.addTask { - throw Boom(id: "mixed-error") - } - group.addTask { - throw Boom(id: "mixed-error") - } + return 12 + } - // since values may deinit in any order, we just assert their count basically - // three ok's - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - // three errors - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - - return 12 - } + // since values may deinit in any order, we just assert their count basically + // three ok's + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + // three errors + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + + await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) + + print("done") // CHECK: done +} + +@main struct Main { + static func main() async { + await testTwo() + await manyOk() + await manyThrows() + await manyValuesThrows() } } diff --git a/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift b/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift index 367098715243b..b2166dd0d7ab3 100644 --- a/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift +++ b/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift @@ -14,14 +14,14 @@ import Dispatch func test_skipCallingNext_butInvokeCancelAll() async { let numbers = [1, 1] - let result = try! 
await withTaskGroup(of: Int.self) { (group) async -> Int in + let result = await withTaskGroup(of: Int.self) { (group) async -> Int in for n in numbers { - print("group.spawn { \(n) }") - group.spawn { [group] () async -> Int in + print("group.addTask { \(n) }") + group.addTask { [group] () async -> Int in await Task.sleep(1_000_000_000) - print(" inside group.spawn { \(n) }") - print(" inside group.spawn { \(n) } (group cancelled: \(group.isCancelled))") - print(" inside group.spawn { \(n) } (group child task cancelled: \(Task.isCancelled))") + print(" inside group.addTask { \(n) }") + print(" inside group.addTask { \(n) } (group cancelled: \(group.isCancelled))") + print(" inside group.addTask { \(n) } (group child task cancelled: \(Task.isCancelled))") return n } } @@ -34,13 +34,13 @@ func test_skipCallingNext_butInvokeCancelAll() async { return 0 } - // CHECK: group.spawn { 1 } + // CHECK: group.addTask { 1 } // // CHECK: return immediately 0 (group cancelled: true) // CHECK: return immediately 0 (task cancelled: false) // - // CHECK: inside group.spawn { 1 } (group cancelled: true) - // CHECK: inside group.spawn { 1 } (group child task cancelled: true) + // CHECK: inside group.addTask { 1 } (group cancelled: true) + // CHECK: inside group.addTask { 1 } (group child task cancelled: true) // CHECK: result: 0 print("result: \(result)") diff --git a/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift b/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift index f4a2b9e24ac22..08a802776d98e 100644 --- a/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift +++ b/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift @@ -4,11 +4,8 @@ // REQUIRES: concurrency // REQUIRES: libdispatch -// rdar://76038845 // REQUIRES: concurrency_runtime -// REQUIRES: rdar75096485 - import Dispatch func completeSlowly(n: Int) async -> Int { diff --git a/test/Sanitizers/tsan/async_taskgroup_next.swift b/test/Sanitizers/tsan/async_taskgroup_next.swift index 3be948cfc88c1..5d1999f7b2814 100644 --- a/test/Sanitizers/tsan/async_taskgroup_next.swift +++ b/test/Sanitizers/tsan/async_taskgroup_next.swift @@ -1,8 +1,5 @@ // RUN: %target-run-simple-swift( %import-libdispatch -parse-as-library -sanitize=thread) -// Segfaulted in CI on TSan bot. rdar://78264164 -// REQUIRES: rdar78264164 - // REQUIRES: executable_test // REQUIRES: concurrency // REQUIRES: libdispatch
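
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the user-visible semantics the reworked DiscardingTaskGroup offer path has to preserve — the waiting task is resumed exactly once, successful children are discarded, and the group rethrows a single child error (the first one the runtime stored before claiming and resuming the waiting task). The names `ExampleError`, `firstErrorWins`, and `Demo` are illustrative only, and the sketch assumes a toolchain where `withThrowingDiscardingTaskGroup` is available (Swift 5.9+, or `-disable-availability-checking` as used by the tests above).

```swift
import _Concurrency

struct ExampleError: Error {
  let id: Int
}

/// Illustrative only: successful children are discarded, and the group
/// rethrows the first error stored by the runtime's offer path
/// (prepareWaitingTaskWithError / prepareWaitingTaskWithTask) before the
/// claimed waiting task is resumed.
func firstErrorWins() async -> Bool {
  do {
    try await withThrowingDiscardingTaskGroup { group in
      group.addTask { throw ExampleError(id: 1) }        // stored if offered first
      group.addTask { /* successful child: discarded */ }
      group.addTask { throw ExampleError(id: 2) }        // discarded if 1 was stored first
    }
    return false // unreachable: the group must rethrow one of the errors
  } catch let error as ExampleError {
    // Which id escapes depends on completion order; exactly one error is rethrown.
    return error.id == 1 || error.id == 2
  } catch {
    return false
  }
}

@main struct Demo {
  static func main() async {
    print("discarding group rethrew a single child error:", await firstErrorWins())
  }
}
```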