From 0116b0953ff51054b97397914fca49871c221b58 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 07:38:44 +0900 Subject: [PATCH 01/13] reenable async_taskgroup_discarding_dontLeak.swift --- .../async_taskgroup_discarding_dontLeak.swift | 226 ++++++++++++------ 1 file changed, 148 insertions(+), 78 deletions(-) diff --git a/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift b/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift index 253f8934669d2..93d0e9844f5e1 100644 --- a/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift +++ b/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift @@ -1,9 +1,6 @@ // RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s --dump-input=always // TODO: move to target-run-simple-leaks-swift once CI is using at least Xcode 14.3 -// rdar://109998145 - Temporarily disable this test -// REQUIRES: rdar109998145 - // REQUIRES: concurrency // REQUIRES: executable_test // REQUIRES: concurrency_runtime @@ -16,6 +13,37 @@ import _Concurrency +actor SimpleCountDownLatch { + let from: Int + var count: Int + + var continuation: CheckedContinuation? + + init(from: Int) { + self.from = from + self.count = from + } + + func hit() { + defer { count -= 1 } + if count == 0 { + fatalError("Counted down more times than expected! (From: \(from))") + } else if count == 1 { + continuation?.resume() + } + } + + func wait() async { + guard self.count > 0 else { + return // we're done + } + + return await withCheckedContinuation { cc in + self.continuation = cc + } + } +} + final class PrintDeinit { let id: String init(id: String) { @@ -60,96 +88,138 @@ final class SomeClass: @unchecked Sendable { // NOTE: Not as StdlibUnittest/TestSuite since these types of tests are unreasonably slow to load/debug. -@main struct Main { - static func main() async { - _ = try? await withThrowingDiscardingTaskGroup() { group in - group.addTask { - throw Boom(id: "race-boom-class") - } +func testTwo() async { + let latch = SimpleCountDownLatch(from: 2) + + _ = try? await withThrowingDiscardingTaskGroup() { group in + group.addTask { + await latch.hit() + throw Boom(id: "race-boom") + } + group.addTask { + await latch.hit() + SomeClass(id: "race-boom-class") // will be discarded + } + + return 12 + } + + // since values may deinit in any order, we just assert their count basically + // CHECK-DAG: deinit, id: race-boom + // CHECK-DAG: deinit, id: race-boom + await latch.wait() + print("done") // CHECK: done +} + +func manyOk() async { + let latch = SimpleCountDownLatch(from: 6) + + _ = try? await withThrowingDiscardingTaskGroup() { group in + for i in 0..<6 { group.addTask { - SomeClass(id: "race-boom-class") // will be discarded + await latch.hit() + _ = SomeClass(id: "many-ok") // will be discarded } - // since values may deinit in any order, we just assert their count basically - // CHECK-DAG: deinit, id: race-boom-class - // CHECK-DAG: deinit, id: race-boom-class - - return 12 } - // many ok - _ = try? 
await withThrowingDiscardingTaskGroup() { group in + return 12 + } + // since values may deinit in any order, we just assert their count basically + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + // CHECK-DAG: deinit, id: many-ok + + await latch.wait() + print("done") // CHECK: done +} + +func manyThrows() async { + let latch = SimpleCountDownLatch(from: 6) + + do { + let value: Void = try await withThrowingDiscardingTaskGroup() { group in for i in 0..<6 { group.addTask { - SomeClass(id: "many-ok") // will be discarded + await latch.hit() + throw BoomClass(id: "many-error") // will be rethrown } - // since values may deinit in any order, we just assert their count basically - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok - // CHECK-DAG: deinit, id: many-ok } - return 12 + // since values may deinit in any order, we just assert their count basically + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + // CHECK-DAG: deinit, id: many-error + + 12 // must be ignored } + preconditionFailure("Should throw") + } catch { + precondition("\(error)" == "main.BoomClass", "error was: \(error)") + } - // many throws - do { - let value = try await withThrowingDiscardingTaskGroup() { group in - for i in 0..<6 { - group.addTask { - throw BoomClass(id: "many-error") // will be rethrown - } - } + await latch.wait() + print("done") // CHECK: done +} - // since values may deinit in any order, we just assert their count basically - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error - // CHECK-DAG: deinit, id: many-error +func manyValuesThrows() async { + let latch = SimpleCountDownLatch(from: 6) - 12 // must be ignored - } - preconditionFailure("Should throw") - } catch { - precondition("\(error)" == "main.BoomClass", "error was: \(error)") + // many errors, many values + _ = try? await withThrowingDiscardingTaskGroup() { group in + group.addTask { + await latch.hit() + _ = SomeClass(id: "mixed-ok") // will be discarded + } + group.addTask { + await latch.hit() + _ = SomeClass(id: "mixed-ok") // will be discarded + } + group.addTask { + await latch.hit() + _ = SomeClass(id: "mixed-ok") // will be discarded + } + group.addTask { + await latch.hit() + throw Boom(id: "mixed-error") + } + group.addTask { + await latch.hit() + throw Boom(id: "mixed-error") + } + group.addTask { + await latch.hit() + throw Boom(id: "mixed-error") } - // many errors, many values - _ = try? 
await withThrowingDiscardingTaskGroup() { group in - group.addTask { - SomeClass(id: "mixed-ok") // will be discarded - } - group.addTask { - SomeClass(id: "mixed-ok") // will be discarded - } - group.addTask { - SomeClass(id: "mixed-ok") // will be discarded - } - group.addTask { - throw Boom(id: "mixed-error") - } - group.addTask { - throw Boom(id: "mixed-error") - } - group.addTask { - throw Boom(id: "mixed-error") - } - // since values may deinit in any order, we just assert their count basically - // three ok's - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - // three errors - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - // CHECK-DAG: deinit, id: mixed - - return 12 - } + return 12 + } + + // since values may deinit in any order, we just assert their count basically + // three ok's + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + // three errors + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + // CHECK-DAG: deinit, id: mixed + + await latch.wait() + print("done") // CHECK: done +} + +@main struct Main { + static func main() async { + await testTwo() + await manyOk() + await manyThrows() + await manyValuesThrows() } } From 9d9f8cb143b247ec6f1b97f84320782533329cf7 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 28 Jul 2023 14:22:59 +0900 Subject: [PATCH 02/13] [DiscardingTaskGroup] Properly detach when LAST task is failed, and prior failure was already stored --- stdlib/public/Concurrency/TaskGroup.cpp | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index 03f726b65b05e..95412c25f2c09 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -65,6 +65,7 @@ using namespace swift; #if 0 +#define SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED 1 #define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \ fprintf(stderr, "[%#lx] [%s:%d][group(%p%s)] (%s) " fmt "\n", \ (unsigned long)Thread::current().platformThreadId(), \ @@ -81,6 +82,7 @@ fprintf(stderr, "[%#lx] [%s:%d][group(%p)] (%s) " fmt "\n", \ __FUNCTION__, \ __VA_ARGS__) #else +#define SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED 0 #define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) (void)0 #define SWIFT_TASK_GROUP_DEBUG_LOG_0(group, fmt, ...) (void)0 #endif @@ -941,7 +943,7 @@ static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags, // ==== child task management -------------------------------------------------- void TaskGroup::addChildTask(AsyncTask *child) { - SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p", child, this); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "attach child task = %p", child); // Add the child task to this task group. 
The corresponding removal // won't happen until the parent task successfully polls for this child @@ -959,7 +961,7 @@ void TaskGroup::addChildTask(AsyncTask *child) { } void TaskGroup::removeChildTask(AsyncTask *child) { - SWIFT_TASK_DEBUG_LOG("detach child task = %p from group = %p", child, this); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "detach child task = %p", child); auto groupRecord = asBaseImpl(this)->getTaskRecord(); @@ -979,7 +981,7 @@ static void swift_taskGroup_destroyImpl(TaskGroup *group) { } void AccumulatingTaskGroup::destroy() { -#if SWIFT_TASK_DEBUG_LOG_ENABLED +#if SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED if (!this->isEmpty()) { auto status = this->statusLoadRelaxed(); SWIFT_TASK_GROUP_DEBUG_LOG(this, "destroy, tasks .ready = %d, .pending = %llu", @@ -988,7 +990,10 @@ void AccumulatingTaskGroup::destroy() { SWIFT_TASK_DEBUG_LOG("destroying task group = %p", this); } #endif + // Verify using the group's status that indeed we're expected to be empty assert(this->isEmpty() && "Attempted to destroy non-empty task group!"); + // Double check by inspecting the group record, it should contain no children + assert(getTaskRecord()->getFirstChild() == nullptr && "Task group record still has child task!"); // First, remove the group from the task and deallocate the record removeStatusRecordFromSelf(getTaskRecord()); @@ -1002,7 +1007,7 @@ void AccumulatingTaskGroup::destroy() { } void DiscardingTaskGroup::destroy() { -#if SWIFT_TASK_DEBUG_LOG_ENABLED +#if SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED if (!this->isEmpty()) { auto status = this->statusLoadRelaxed(); SWIFT_TASK_GROUP_DEBUG_LOG(this, "destroy, tasks .ready = %d, .pending = %llu", @@ -1011,7 +1016,10 @@ void DiscardingTaskGroup::destroy() { SWIFT_TASK_DEBUG_LOG("destroying discarding task group = %p", this); } #endif + // Verify using the group's status that indeed we're expected to be empty assert(this->isEmpty() && "Attempted to destroy non-empty task group!"); + // Double check by inspecting the group record, it should contain no children + assert(getTaskRecord()->getFirstChild() == nullptr && "Task group record still has child task!"); // First, remove the group from the task and deallocate the record removeStatusRecordFromSelf(getTaskRecord()); @@ -1249,7 +1257,12 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) alreadyDecrementedStatus); break; case ReadyStatus::Error: - SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with errorItem.task:%p", readyErrorItem.getTask()); + // The completed task failed, but we already stored a different failed task. + // Thus we discard this error and complete with the previously stored. 
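+      // Without the detach added below, the discarded completedTask would still hang off the
+      // group's task record when the group is destroyed (see the child-record asserts in destroy()).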
+ SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, discard error completedTask %p, resume with errorItem.task:%p", + completedTask, + readyErrorItem.getTask()); + _swift_taskGroup_detachChild(asAbstract(this), completedTask); resumeWaitingTask(readyErrorItem.getTask(), assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus, From b135ecd8f6e163f9671da1fdc06a70a962d9bc98 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 15:14:21 +0900 Subject: [PATCH 03/13] [TaskGroup] Must detach discarded task, THEN unlock before resume waiting --- stdlib/public/Concurrency/TaskGroup.cpp | 62 ++++++++++++++++--------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index 95412c25f2c09..38de14b2a8d29 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -1158,7 +1158,7 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex // This is wasteful, and the task completion function should be fixed to // transfer ownership of a retain into this function, in which case we // will need to release in the other path. - lock(); // TODO: remove fragment lock, and use status for synchronization + lock(); SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p, status:%s", completedTask, @@ -1190,9 +1190,9 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex // ==== a) has waiting task, so let us complete it right away if (assumed.hasWaitingTask()) { - resumeWaitingTask(completedTask, assumed, hadErrorResult); - unlock(); // TODO: remove fragment lock, and use status for synchronization - return; + // Must unlock before we resume the waiting task + unlock(); + return resumeWaitingTask(completedTask, assumed, hadErrorResult); } else { // ==== b) enqueue completion ------------------------------------------------ // @@ -1202,7 +1202,7 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex assert(!waitQueue.load(std::memory_order_relaxed)); enqueueCompletedTask(completedTask, hadErrorResult); - unlock(); // TODO: remove fragment lock, and use status for synchronization + return unlock(); } } @@ -1253,41 +1253,54 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) switch (readyErrorItem.getStatus()) { case ReadyStatus::RawError: SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with raw error:%p", readyErrorItem.getRawError(this)); - resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, + // The following MUST be done in the following order: detach, unlock, resume waitingTask. + // because we do not want to allow another task to run and have the potential to lock or even destroy + // the group before we've given up the lock. + _swift_taskGroup_detachChild(asAbstract(this), completedTask); + unlock(); + return resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus); - break; case ReadyStatus::Error: // The completed task failed, but we already stored a different failed task. // Thus we discard this error and complete with the previously stored. SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, discard error completedTask %p, resume with errorItem.task:%p", completedTask, readyErrorItem.getTask()); + // The following MUST be done in the following order: detach, unlock, resume waitingTask. 
+ // because we do not want to allow another task to run and have the potential to lock or even destroy + // the group before we've given up the lock. _swift_taskGroup_detachChild(asAbstract(this), completedTask); - resumeWaitingTask(readyErrorItem.getTask(), assumed, + unlock(); + return resumeWaitingTask(readyErrorItem.getTask(), assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus, /*taskWasRetained=*/true); - break; default: swift_Concurrency_fatalError(0, "only errors can be stored by a discarding task group, yet it wasn't an error! 1"); } } else { + // The following MUST be done in the following order: detach, unlock, resume waitingTask. + // because we do not want to allow another task to run and have the potential to lock or even destroy + // the group before we've given up the lock. + _swift_taskGroup_detachChild(asAbstract(this), completedTask); + unlock(); // There was no prior failed task stored, so we should resume the waitingTask with this (failed) completedTask - resumeWaitingTask(completedTask, assumed, hadErrorResult, alreadyDecrementedStatus); + return resumeWaitingTask(completedTask, assumed, hadErrorResult, alreadyDecrementedStatus); } } else if (readyQueue.isEmpty()) { // There was no waiting task, or other tasks are still pending, so we cannot // it is the first error we encountered, thus we need to store it for future throwing SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, enqueue child task:%p", completedTask); enqueueCompletedTask(completedTask, hadErrorResult); + return unlock(); } else { SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, discard child task:%p", completedTask); _swift_taskGroup_detachChild(asAbstract(this), completedTask); + return unlock(); } - - unlock(); - return; + swift_Concurrency_fatalError(0, "expected to early return from when " + "handling offer of last task in group"); } assert(!hadErrorResult && "only successfully completed tasks can reach here"); @@ -1302,31 +1315,38 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) _swift_taskGroup_detachChild(asAbstract(this), completedTask); switch (readyErrorItem.getStatus()) { case ReadyStatus::RawError: - resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus); - break; + // The following MUST be done in the following order: detach, unlock, resume waitingTask. + // because we do not want to allow another task to run and have the potential to lock or even destroy + // the group before we've given up the lock. + _swift_taskGroup_detachChild(asAbstract(this), completedTask); + unlock(); + return resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus); case ReadyStatus::Error: - resumeWaitingTask(readyErrorItem.getTask(), assumed, + // The following MUST be done in the following order: detach, unlock, resume waitingTask. + // because we do not want to allow another task to run and have the potential to lock or even destroy + // the group before we've given up the lock. + _swift_taskGroup_detachChild(asAbstract(this), completedTask); + unlock(); + return resumeWaitingTask(readyErrorItem.getTask(), assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus, /*taskWasRetained=*/true); - break; default: swift_Concurrency_fatalError(0, "only errors can be stored by a discarding task group, yet it wasn't an error! 
2"); } } else { + unlock(); // This is the last task, we have a waiting task and there was no error stored previously; // We must resume the waiting task with a success, so let us return here. - resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus); + return resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus); } } else { // it wasn't the last pending task, and there is no-one to resume; // Since this is a successful result, and we're a discarding task group -- always just ignore this task. _swift_taskGroup_detachChild(asAbstract(this), completedTask); + return unlock(); } - - unlock(); - return; } /// Must be called while holding the TaskGroup lock. From 61d783c5b3bf27349c95bad78d61e5846edbbb89 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 17:34:43 +0900 Subject: [PATCH 04/13] revamping locking scheme, test this a bunch --- stdlib/public/Concurrency/TaskGroup.cpp | 281 +++++++++++------- stdlib/public/runtime/Backtrace.cpp | 2 +- ...taskgroup_next_not_invoked_cancelAll.swift | 18 +- 3 files changed, 190 insertions(+), 111 deletions(-) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index 38de14b2a8d29..4fffb79aa190d 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -64,7 +64,7 @@ using namespace swift; -#if 0 +#if 1 #define SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED 1 #define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \ fprintf(stderr, "[%#lx] [%s:%d][group(%p%s)] (%s) " fmt "\n", \ @@ -392,7 +392,8 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord { virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0; /// Resume waiting task with result from `completedTask` - void resumeWaitingTask(AsyncTask *completedTask, + AsyncTask *resumeWaitingTask(AsyncTask *waitingTask, + AsyncTask *completedTask, TaskGroupStatus &assumed, bool hadErrorResult, bool alreadyDecremented = false, @@ -833,7 +834,8 @@ class DiscardingTaskGroup: public TaskGroupBase { private: /// Resume waiting task with specified error - void resumeWaitingTaskWithError(SwiftError *error, + AsyncTask *resumeWaitingTaskWithError(AsyncTask* waitingTask, + SwiftError *error, TaskGroupStatus &assumed, bool alreadyDecremented); }; @@ -915,7 +917,8 @@ static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags, TaskGroup *group, const Metadata *T) { TaskGroupFlags groupFlags(rawGroupFlags); - SWIFT_TASK_GROUP_DEBUG_LOG_0(group, "create group; flags: isDiscardingResults=%d", + SWIFT_TASK_GROUP_DEBUG_LOG_0(group, "create group, from task:%p; flags: isDiscardingResults=%d", + swift_task_getCurrent(), groupFlags.isDiscardResults()); TaskGroupBase *impl; @@ -1190,9 +1193,22 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex // ==== a) has waiting task, so let us complete it right away if (assumed.hasWaitingTask()) { + auto waitingTask = waitQueue.load(std::memory_order_acquire); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, loaded waiting task:%p", waitingTask); + if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) { + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, FAILED CAS loaded waiting task:%p", waitingTask); + swift_Concurrency_fatalError(0, "Failed to claim waitingTask!"); + } + + auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask, + assumed, hadErrorResult); // Must unlock before we resume the waiting task unlock(); - return 
resumeWaitingTask(completedTask, assumed, hadErrorResult); + + if (task) { + task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } + return; } else { // ==== b) enqueue completion ------------------------------------------------ // @@ -1213,7 +1229,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) assert(completedTask->hasGroupChildFragment()); assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this)); - lock(); // TODO: remove fragment lock, and use status for synchronization + lock(); // Since we don't maintain ready counts in a discarding group, only load the status. TaskGroupStatus assumed = statusLoadAcquire(); @@ -1245,48 +1261,82 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) cancelAll(); if (afterComplete.hasWaitingTask() && afterComplete.pendingTasks(this) == 0) { + // We grab the waiting task while holding the group lock, because this + // allows a single task to get the waiting task and attempt to complete it. + // As another offer gets to run, it will have either a different waiting task, or no waiting task at all. + auto waitingTask = waitQueue.load(std::memory_order_acquire); // FIXME: consume the task here!!!!! + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, loaded waiting task:%p", waitingTask); + if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) { + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, FAILED CAS loaded waiting task:%p", waitingTask); + swift_Concurrency_fatalError(0, "Failed to claim waitingTask!"); + } + assert(waitingTask && "status claimed to have waitingTask but waitQueue was empty!"); + // This is the last pending task, and we must resume the waiting task. // - if there already was a previous error stored, we resume using it, // - otherwise, we resume using this current (failed) completedTask ReadyQueueItem readyErrorItem; if (readyQueue.dequeue(readyErrorItem)) { + // Always detach the completed task, we're instead going to use the stored value from the readyQueue + _swift_taskGroup_detachChild(asAbstract(this), completedTask); + switch (readyErrorItem.getStatus()) { - case ReadyStatus::RawError: - SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with raw error:%p", readyErrorItem.getRawError(this)); - // The following MUST be done in the following order: detach, unlock, resume waitingTask. - // because we do not want to allow another task to run and have the potential to lock or even destroy - // the group before we've given up the lock. - _swift_taskGroup_detachChild(asAbstract(this), completedTask); - unlock(); - return resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, - alreadyDecrementedStatus); - case ReadyStatus::Error: - // The completed task failed, but we already stored a different failed task. - // Thus we discard this error and complete with the previously stored. - SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, discard error completedTask %p, resume with errorItem.task:%p", - completedTask, - readyErrorItem.getTask()); - // The following MUST be done in the following order: detach, unlock, resume waitingTask. - // because we do not want to allow another task to run and have the potential to lock or even destroy - // the group before we've given up the lock. 
- _swift_taskGroup_detachChild(asAbstract(this), completedTask); - unlock(); - return resumeWaitingTask(readyErrorItem.getTask(), assumed, - /*hadErrorResult=*/true, - alreadyDecrementedStatus, - /*taskWasRetained=*/true); - default: - swift_Concurrency_fatalError(0, - "only errors can be stored by a discarding task group, yet it wasn't an error! 1"); + case ReadyStatus::RawError: { + SWIFT_TASK_GROUP_DEBUG_LOG( + this, "offer, complete, resume waitingTask:%p, with raw error:%p", + waitingTask, readyErrorItem.getRawError(this)); + auto task = resumeWaitingTaskWithError( + /*complete=*/waitingTask, + /*with=*/readyErrorItem.getRawError(this), assumed, + alreadyDecrementedStatus); + unlock(); + if (task) { + // TODO: allow the caller to suggest an executor + task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } + return; + } + case ReadyStatus::Error: { + // The completed task failed, but we already stored a different failed + // task. Thus we discard this error and complete with the previously + // stored. + SWIFT_TASK_GROUP_DEBUG_LOG( + this, + "offer, complete waitingTask:%p, discard error completedTask:%p, " + "resume with errorItem.task:%p", + waitingTask, completedTask, readyErrorItem.getTask()); + auto task = resumeWaitingTask( + /*complete*/ waitingTask, + /*with=*/readyErrorItem.getTask(), assumed, + /*hadErrorResult=*/true, alreadyDecrementedStatus, + /*taskWasRetained=*/true); + unlock(); + if (task) { + // TODO: allow the caller to suggest an executor + task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } + return; + } + default: { + swift_Concurrency_fatalError( + 0, "only errors can be stored by a discarding task group, yet it " + "wasn't an error! 1"); + } } } else { // The following MUST be done in the following order: detach, unlock, resume waitingTask. // because we do not want to allow another task to run and have the potential to lock or even destroy // the group before we've given up the lock. _swift_taskGroup_detachChild(asAbstract(this), completedTask); - unlock(); // There was no prior failed task stored, so we should resume the waitingTask with this (failed) completedTask - return resumeWaitingTask(completedTask, assumed, hadErrorResult, alreadyDecrementedStatus); + auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask, + assumed, hadErrorResult, alreadyDecrementedStatus); + unlock(); + if (task) { + // TODO: allow the caller to suggest an executor + task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } + return; } } else if (readyQueue.isEmpty()) { // There was no waiting task, or other tasks are still pending, so we cannot @@ -1305,41 +1355,67 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) assert(!hadErrorResult && "only successfully completed tasks can reach here"); if (afterComplete.hasWaitingTask() && afterComplete.pendingTasks(this) == 0) { + // We grab the waiting task while holding the group lock, because this + // allows a single task to get the waiting task and attempt to complete it. + // As another offer gets to run, it will have either a different waiting task, or no waiting task at all. 
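+    // A failed compare-exchange below would mean the waiter vanished even though the status
+    // says one is present and we hold the group lock, so it is treated as a fatal error.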
+ auto waitingTask = waitQueue.load(std::memory_order_acquire); + if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) { + swift_Concurrency_fatalError(0, "Failed to claim waitingTask!"); + } + assert(waitingTask && "status claimed to have waitingTask but waitQueue was empty!"); + SWIFT_TASK_GROUP_DEBUG_LOG(this, - "offer, last pending task completed successfully, resume waitingTask with completedTask:%p", - completedTask); + "offer, last pending task completed successfully, resume waitingTask:%p with completedTask:%p", + waitingTask, completedTask); /// If there was an error previously stored, we must resume the waitingTask using that error. ReadyQueueItem readyErrorItem; if (readyQueue.dequeue(readyErrorItem)) { + // Always detach the completed task, we're instead going to use the stored value from the readyQueue _swift_taskGroup_detachChild(asAbstract(this), completedTask); + switch (readyErrorItem.getStatus()) { - case ReadyStatus::RawError: - // The following MUST be done in the following order: detach, unlock, resume waitingTask. - // because we do not want to allow another task to run and have the potential to lock or even destroy - // the group before we've given up the lock. - _swift_taskGroup_detachChild(asAbstract(this), completedTask); + case ReadyStatus::RawError: { + auto task = resumeWaitingTaskWithError( + /*complete=*/waitingTask, /*with=*/readyErrorItem.getRawError(this), + assumed, alreadyDecrementedStatus); unlock(); - return resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus); - case ReadyStatus::Error: - // The following MUST be done in the following order: detach, unlock, resume waitingTask. - // because we do not want to allow another task to run and have the potential to lock or even destroy - // the group before we've given up the lock. - _swift_taskGroup_detachChild(asAbstract(this), completedTask); - unlock(); - return resumeWaitingTask(readyErrorItem.getTask(), assumed, - /*hadErrorResult=*/true, - alreadyDecrementedStatus, - /*taskWasRetained=*/true); - default: - swift_Concurrency_fatalError(0, - "only errors can be stored by a discarding task group, yet it wasn't an error! 2"); + if (task) { + // TODO: allow the caller to suggest an executor + task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } + return; + } + case ReadyStatus::Error: { + auto task = resumeWaitingTask( + /*complete=*/waitingTask, + /*with=*/readyErrorItem.getTask(), assumed, + /*hadErrorResult=*/true, alreadyDecrementedStatus, + /*taskWasRetained=*/true); + unlock(); + if (task) { + // TODO: allow the caller to suggest an executor + task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } + return; + } + default: { + swift_Concurrency_fatalError( + 0, "only errors can be stored by a discarding task group, yet it " + "wasn't an error! 2"); + } } } else { - unlock(); // This is the last task, we have a waiting task and there was no error stored previously; // We must resume the waiting task with a success, so let us return here. 
- return resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus); + auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask, + assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus); + unlock(); + if (task) { + // TODO: allow the caller to suggest an executor + task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } + return; } } else { // it wasn't the last pending task, and there is no-one to resume; @@ -1350,29 +1426,31 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) } /// Must be called while holding the TaskGroup lock. -void TaskGroupBase::resumeWaitingTask( +AsyncTask *TaskGroupBase::resumeWaitingTask( + AsyncTask *waitingTask, AsyncTask *completedTask, TaskGroupStatus &assumed, bool hadErrorResult, bool alreadyDecremented, bool taskWasRetained) { - auto waitingTask = waitQueue.load(std::memory_order_acquire); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume, waitingTask = %p, completedTask = %p, alreadyDecremented:%d, error:%d", + waitingTask, alreadyDecremented, hadErrorResult, completedTask); + + // auto waitingTask = waitQueue.load(std::memory_order_acquire); assert(waitingTask && "waitingTask must not be null when attempting to resume it"); assert(assumed.hasWaitingTask()); - SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, alreadyDecremented:%d, error:%d, complete with = %p", - waitingTask, alreadyDecremented, hadErrorResult, completedTask); - while (true) { +// while (true) { SWIFT_TASK_GROUP_DEBUG_LOG(this, "resumeWaitingTask, attempt CAS, waiting task = %p, waitQueue.head = %p, error:%d, complete with = %p", waitingTask, waitQueue.load(std::memory_order_relaxed), hadErrorResult, completedTask); // ==== a) run waiting task directly ------------------------------------- - // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!"); - // We are the "first" completed task to arrive, - // and since there is a task waiting we immediately claim and complete it. - if (waitQueue.compare_exchange_strong( - waitingTask, nullptr, - /*success*/ std::memory_order_release, - /*failure*/ std::memory_order_acquire)) { +// // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!"); +// // We are the "first" completed task to arrive, +// // and since there is a task waiting we immediately claim and complete it. +// if (waitQueue.compare_exchange_strong( +// waitingTask, nullptr, +// /*success*/ std::memory_order_release, +// /*failure*/ std::memory_order_acquire)) { #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL // In the task-to-thread model, child tasks are always actually @@ -1389,7 +1467,7 @@ void TaskGroupBase::resumeWaitingTask( // But since it's what we're doing, we basically take the same // path as we would if there wasn't a waiter. 
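+    // In this mode there is no separate task to hand back to the caller, hence the nullptr return below.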
enqueueCompletedTask(completedTask, hadErrorResult); - return; + return nullptr; #else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ if (!alreadyDecremented) { @@ -1423,36 +1501,37 @@ void TaskGroupBase::resumeWaitingTask( } _swift_tsan_acquire(static_cast(waitingTask)); - // TODO: allow the caller to suggest an executor - waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); - return; +// // TODO: allow the caller to suggest an executor +// waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + return waitingTask; #endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ - } else { - SWIFT_TASK_GROUP_DEBUG_LOG(this, "CAS failed, task = %p, backup = %p, complete with = %p, status = %s", - waitingTask, completedTask, statusString().c_str()); - } - } +// } else { +// SWIFT_TASK_GROUP_DEBUG_LOG(this, "CAS failed, waitingTask = %p, complete with = %p, status = %s", +// waitingTask, completedTask, statusString().c_str()); +// } +// } } /// Must be called while holding the TaskGroup lock. -void DiscardingTaskGroup::resumeWaitingTaskWithError( +AsyncTask* DiscardingTaskGroup::resumeWaitingTaskWithError( + AsyncTask *waitingTask, SwiftError *error, TaskGroupStatus &assumed, bool alreadyDecremented) { - auto waitingTask = waitQueue.load(std::memory_order_acquire); +// auto waitingTask = waitQueue.load(std::memory_order_acquire); assert(waitingTask && "cannot resume 'null' waiting task!"); SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, with error = %p", waitingTask, error); - while (true) { +// while (true) { // ==== a) run waiting task directly ------------------------------------- assert(assumed.hasWaitingTask()); - // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!"); - // We are the "first" completed task to arrive, - // and since there is a task waiting we immediately claim and complete it. - if (waitQueue.compare_exchange_strong( - waitingTask, nullptr, - /*success*/ std::memory_order_release, - /*failure*/ std::memory_order_acquire)) { +// // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!"); +// // We are the "first" completed task to arrive, +// // and since there is a task waiting we immediately claim and complete it. +// if (waitQueue.compare_exchange_strong( +// waitingTask, nullptr, +// /*success*/ std::memory_order_release, +// /*failure*/ std::memory_order_acquire)) { #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL // In the task-to-thread model, child tasks are always actually @@ -1469,7 +1548,7 @@ void DiscardingTaskGroup::resumeWaitingTaskWithError( // But since it's what we're doing, we basically take the same // path as we would if there wasn't a waiter. _enqueueRawError(this, &readyQueue, error); - return; + return nullptr; #else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ if (alreadyDecremented || statusCompletePendingReadyWaiting(assumed)) { @@ -1483,13 +1562,13 @@ void DiscardingTaskGroup::resumeWaitingTaskWithError( fillGroupNextResult(waitingContext, result); _swift_tsan_acquire(static_cast(waitingTask)); - // TODO: allow the caller to suggest an executor - waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); - return; - } // else, try again +// // TODO: allow the caller to suggest an executor +// waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + return waitingTask; + } // TODO: what in the else??? 
#endif - } - } +// } +// } } SWIFT_CC(swiftasync) @@ -1591,8 +1670,8 @@ static void swift_taskGroup_wait_next_throwingImpl( } PollResult AccumulatingTaskGroup::poll(AsyncTask *waitingTask) { + lock(); SWIFT_TASK_GROUP_DEBUG_LOG(this, "poll, waitingTask:%p", waitingTask); - lock(); // TODO: remove group lock, and use status for synchronization assert(isAccumulatingResults() && "attempted to poll TaskGroup in discard-results mode!"); @@ -1622,7 +1701,7 @@ reevaluate_if_taskgroup_has_results:; statusRemoveWaitingRelease(); result.status = PollStatus::Empty; result.successType = this->successType; - unlock(); // TODO: remove group lock, and use status for synchronization + unlock(); return result; } @@ -1672,7 +1751,7 @@ reevaluate_if_taskgroup_has_results:; result.retainedTask = item.getTask(); assert(result.retainedTask && "polled a task, it must be not null"); _swift_tsan_acquire(static_cast(result.retainedTask)); - unlock(); // TODO: remove fragment lock, and use status for synchronization + unlock(); return result; case ReadyStatus::Error: @@ -1684,7 +1763,7 @@ reevaluate_if_taskgroup_has_results:; result.retainedTask = item.getTask(); assert(result.retainedTask && "polled a task, it must be not null"); _swift_tsan_acquire(static_cast(result.retainedTask)); - unlock(); // TODO: remove fragment lock, and use status for synchronization + unlock(); return result; case ReadyStatus::Empty: @@ -1692,7 +1771,7 @@ reevaluate_if_taskgroup_has_results:; result.storage = nullptr; result.retainedTask = nullptr; result.successType = this->successType; - unlock(); // TODO: remove fragment lock, and use status for synchronization + unlock(); return result; case ReadyStatus::RawError: diff --git a/stdlib/public/runtime/Backtrace.cpp b/stdlib/public/runtime/Backtrace.cpp index 53d663958a36d..c4d8214d61352 100644 --- a/stdlib/public/runtime/Backtrace.cpp +++ b/stdlib/public/runtime/Backtrace.cpp @@ -267,7 +267,7 @@ bool isPrivileged() { } // namespace BacktraceInitializer::BacktraceInitializer() { - const char *backtracing = swift::runtime::environment::SWIFT_BACKTRACE(); + const char *backtracing = "enable=yes,color=yes";// swift::runtime::environment::SWIFT_BACKTRACE(); // Force off for setuid processes. if (isPrivileged()) { diff --git a/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift b/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift index 367098715243b..b2166dd0d7ab3 100644 --- a/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift +++ b/test/Concurrency/Runtime/async_taskgroup_next_not_invoked_cancelAll.swift @@ -14,14 +14,14 @@ import Dispatch func test_skipCallingNext_butInvokeCancelAll() async { let numbers = [1, 1] - let result = try! 
await withTaskGroup(of: Int.self) { (group) async -> Int in + let result = await withTaskGroup(of: Int.self) { (group) async -> Int in for n in numbers { - print("group.spawn { \(n) }") - group.spawn { [group] () async -> Int in + print("group.addTask { \(n) }") + group.addTask { [group] () async -> Int in await Task.sleep(1_000_000_000) - print(" inside group.spawn { \(n) }") - print(" inside group.spawn { \(n) } (group cancelled: \(group.isCancelled))") - print(" inside group.spawn { \(n) } (group child task cancelled: \(Task.isCancelled))") + print(" inside group.addTask { \(n) }") + print(" inside group.addTask { \(n) } (group cancelled: \(group.isCancelled))") + print(" inside group.addTask { \(n) } (group child task cancelled: \(Task.isCancelled))") return n } } @@ -34,13 +34,13 @@ func test_skipCallingNext_butInvokeCancelAll() async { return 0 } - // CHECK: group.spawn { 1 } + // CHECK: group.addTask { 1 } // // CHECK: return immediately 0 (group cancelled: true) // CHECK: return immediately 0 (task cancelled: false) // - // CHECK: inside group.spawn { 1 } (group cancelled: true) - // CHECK: inside group.spawn { 1 } (group child task cancelled: true) + // CHECK: inside group.addTask { 1 } (group cancelled: true) + // CHECK: inside group.addTask { 1 } (group child task cancelled: true) // CHECK: result: 0 print("result: \(result)") From 00f674bb40441c11d699a24909c05dadf932cea5 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 17:45:47 +0900 Subject: [PATCH 05/13] stabilize println based test a bit more against timing --- .../Runtime/async_taskgroup_discarding_dontLeak.swift | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift b/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift index 93d0e9844f5e1..2adac05c32c5b 100644 --- a/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift +++ b/test/Concurrency/Runtime/async_taskgroup_discarding_dontLeak.swift @@ -108,6 +108,8 @@ func testTwo() async { // CHECK-DAG: deinit, id: race-boom // CHECK-DAG: deinit, id: race-boom await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) + print("done") // CHECK: done } @@ -133,6 +135,8 @@ func manyOk() async { // CHECK-DAG: deinit, id: many-ok await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) + print("done") // CHECK: done } @@ -164,6 +168,8 @@ func manyThrows() async { } await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) + print("done") // CHECK: done } @@ -197,7 +203,6 @@ func manyValuesThrows() async { throw Boom(id: "mixed-error") } - return 12 } @@ -212,6 +217,8 @@ func manyValuesThrows() async { // CHECK-DAG: deinit, id: mixed await latch.wait() + try? await Task.sleep(for: .milliseconds(300)) + print("done") // CHECK: done } From 34f8da366a0157928601b93a8946041ffa0c9f4a Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 17:46:00 +0900 Subject: [PATCH 06/13] re-enable tsan test: async_taskgroup_next --- test/Sanitizers/tsan/async_taskgroup_next.swift | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/Sanitizers/tsan/async_taskgroup_next.swift b/test/Sanitizers/tsan/async_taskgroup_next.swift index 3be948cfc88c1..5d1999f7b2814 100644 --- a/test/Sanitizers/tsan/async_taskgroup_next.swift +++ b/test/Sanitizers/tsan/async_taskgroup_next.swift @@ -1,8 +1,5 @@ // RUN: %target-run-simple-swift( %import-libdispatch -parse-as-library -sanitize=thread) -// Segfaulted in CI on TSan bot. 
rdar://78264164 -// REQUIRES: rdar78264164 - // REQUIRES: executable_test // REQUIRES: concurrency // REQUIRES: libdispatch From 0cc31cabe25250fe5ceccec5afbc279dcaf184b5 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 17:47:07 +0900 Subject: [PATCH 07/13] reenable async_taskgroup_next_on_pending --- .../Runtime/async_taskgroup_asynciterator_semantics.swift | 1 + test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift b/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift index a2961f5da8394..474048500539f 100644 --- a/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift +++ b/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift @@ -1,4 +1,5 @@ // RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s --dump-input=always + // REQUIRES: executable_test // REQUIRES: concurrency // REQUIRES: concurrency_runtime diff --git a/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift b/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift index f4a2b9e24ac22..08a802776d98e 100644 --- a/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift +++ b/test/Concurrency/Runtime/async_taskgroup_next_on_pending.swift @@ -4,11 +4,8 @@ // REQUIRES: concurrency // REQUIRES: libdispatch -// rdar://76038845 // REQUIRES: concurrency_runtime -// REQUIRES: rdar75096485 - import Dispatch func completeSlowly(n: Int) async -> Int { From 383c62f73182a9dd21e9ca84b7304c2a5e276c5e Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 20:22:04 +0900 Subject: [PATCH 08/13] disable debugging tricks --- stdlib/public/Concurrency/TaskGroup.cpp | 2 +- stdlib/public/runtime/Backtrace.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index 4fffb79aa190d..e2f9c15af4a5f 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -64,7 +64,7 @@ using namespace swift; -#if 1 +#if 0 #define SWIFT_TASK_GROUP_DEBUG_LOG_ENABLED 1 #define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \ fprintf(stderr, "[%#lx] [%s:%d][group(%p%s)] (%s) " fmt "\n", \ diff --git a/stdlib/public/runtime/Backtrace.cpp b/stdlib/public/runtime/Backtrace.cpp index c4d8214d61352..53d663958a36d 100644 --- a/stdlib/public/runtime/Backtrace.cpp +++ b/stdlib/public/runtime/Backtrace.cpp @@ -267,7 +267,7 @@ bool isPrivileged() { } // namespace BacktraceInitializer::BacktraceInitializer() { - const char *backtracing = "enable=yes,color=yes";// swift::runtime::environment::SWIFT_BACKTRACE(); + const char *backtracing = swift::runtime::environment::SWIFT_BACKTRACE(); // Force off for setuid processes. 
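+  // (i.e. the SWIFT_BACKTRACE environment setting read above is never honored for privileged/setuid processes)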
if (isPrivileged()) { From 376a9a80c311b62afa035af3c3691f3edea8b69f Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 20:27:43 +0900 Subject: [PATCH 09/13] unlock test: async_taskgroup_asynciterator_semantics --- .../async_taskgroup_asynciterator_semantics.swift | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift b/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift index 474048500539f..a5b3568752b02 100644 --- a/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift +++ b/test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift @@ -6,8 +6,6 @@ // UNSUPPORTED: back_deployment_runtime // UNSUPPORTED: OS=linux-gnu -// REQUIRES: rdar86028226 - struct Boom: Error {} func boom() async throws -> Int { @@ -18,7 +16,7 @@ func boom() async throws -> Int { func test_taskGroup_next() async { let sum = await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in for n in 1...10 { - group.spawn { + group.addTask { return n.isMultiple(of: 3) ? try await boom() : n } } @@ -51,7 +49,7 @@ func test_taskGroup_next() async { func test_taskGroup_for_in() async { let sum = await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in for n in 1...10 { - group.spawn { + group.addTask { return n.isMultiple(of: 3) ? try await boom() : n } } @@ -82,7 +80,7 @@ func test_taskGroup_for_in() async { func test_taskGroup_asyncIterator() async { let sum = await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in for n in 1...10 { - group.spawn { + group.addTask { return n.isMultiple(of: 3) ? try await boom() : n } } @@ -120,7 +118,7 @@ func test_taskGroup_asyncIterator() async { func test_taskGroup_contains() async { let sum = await withTaskGroup(of: Int.self, returning: Int.self) { group in for n in 1...4 { - group.spawn { + group.addTask { return n } } @@ -129,7 +127,7 @@ func test_taskGroup_contains() async { print("three = \(three)") // CHECK: three = true for n in 5...7 { - group.spawn { + group.addTask { return n } } From 060260e959e923733c5d82db82920fb5b58093bd Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 8 Aug 2023 20:35:59 +0900 Subject: [PATCH 10/13] make use of unreachable --- stdlib/public/Concurrency/TaskGroup.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index e2f9c15af4a5f..b931f6c004c99 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -1349,8 +1349,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) _swift_taskGroup_detachChild(asAbstract(this), completedTask); return unlock(); } - swift_Concurrency_fatalError(0, "expected to early return from when " - "handling offer of last task in group"); + swift_unreachable("expected to early return from when handling offer of last task in group"); } assert(!hadErrorResult && "only successfully completed tasks can reach here"); @@ -1560,15 +1559,11 @@ AsyncTask* DiscardingTaskGroup::resumeWaitingTaskWithError( waitingTask->ResumeContext); fillGroupNextResult(waitingContext, result); - _swift_tsan_acquire(static_cast(waitingTask)); -// // TODO: allow the caller to suggest an executor -// waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); return waitingTask; } // TODO: what in the else??? 
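+  // In the remaining case (the status update failed) there is no waiting task to resume,
+  // so we fall through to the `return nullptr` below.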
#endif -// } -// } + return nullptr; } SWIFT_CC(swiftasync) From 7d93bba7df99b111eed369e8391947ec25bac625 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Wed, 9 Aug 2023 12:13:46 +0900 Subject: [PATCH 11/13] cleanups --- stdlib/public/Concurrency/TaskGroup.cpp | 251 +++++++++++------------- 1 file changed, 110 insertions(+), 141 deletions(-) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index b931f6c004c99..ee275c42de085 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -289,6 +289,12 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord { } }; + /// Simple wrapper type to ensure we use the right methods to prepare and run a waiting tas. + /// Run it with `runWaitingTask`. + struct PreparedWaitingTask { + AsyncTask *waitingTask; + }; + protected: #if SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY || SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL // Synchronization is simple here. In a single threaded mode, all swift tasks @@ -392,13 +398,20 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord { virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0; /// Resume waiting task with result from `completedTask` - AsyncTask *resumeWaitingTask(AsyncTask *waitingTask, + PreparedWaitingTask prepareWaitingTaskWithTask(AsyncTask *waitingTask, AsyncTask *completedTask, TaskGroupStatus &assumed, bool hadErrorResult, bool alreadyDecremented = false, bool taskWasRetained = false); + // NOTE: In today's implementation we MUST hold the group lock when claiming a task. + AsyncTask *claimWaitingTask(); + + /// Should be the final operation a group locking operation performs e.g. in waitAll or offer. + /// This resumes unlocks the group and resumes the waiting task. + void runWaitingTask(PreparedWaitingTask prepared); + // ==== Status manipulation ------------------------------------------------- TaskGroupStatus statusLoadRelaxed() const; @@ -628,6 +641,34 @@ bool TaskGroupBase::statusCompletePendingReadyWaiting(TaskGroupStatus &old) { /*failure*/ std::memory_order_relaxed); } +AsyncTask *TaskGroupBase::claimWaitingTask() { + assert(statusLoadRelaxed().hasWaitingTask() && + "attempted to claim waiting task but status indicates no waiting " + "task is present!"); + + auto waitingTask = waitQueue.load(std::memory_order_acquire); + if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) { + swift_Concurrency_fatalError(0, "Failed to claim waitingTask!"); + } + return waitingTask; +} +void TaskGroupBase::runWaitingTask(PreparedWaitingTask prepared) { + // The reason we might not have a task here to schedule is if we were running in the + // task-per-thread single threaded mode, which would have executed the task in-line + // and we must not schedule it here anymore. +#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL + assert(prepared.waitingTask == nullptr && + "unexpected task to schedule in TASK_TO_THREAD_MODEL!" 
+ "In this mode we should have run the task in-line, " + "rather than return it for scheduling.") +#endif + if (auto waitingTask = prepared.waitingTask) { + // TODO: allow the caller to suggest an executor + waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + } +} + + bool TaskGroupBase::isCancelled() const { auto old = TaskGroupStatus{status.load(std::memory_order_relaxed)}; return old.isCancelled(); @@ -834,7 +875,7 @@ class DiscardingTaskGroup: public TaskGroupBase { private: /// Resume waiting task with specified error - AsyncTask *resumeWaitingTaskWithError(AsyncTask* waitingTask, + PreparedWaitingTask prepareWaitingTaskWithError(AsyncTask* waitingTask, SwiftError *error, TaskGroupStatus &assumed, bool alreadyDecremented); @@ -1193,22 +1234,12 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex // ==== a) has waiting task, so let us complete it right away if (assumed.hasWaitingTask()) { - auto waitingTask = waitQueue.load(std::memory_order_acquire); - SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, loaded waiting task:%p", waitingTask); - if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) { - SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, FAILED CAS loaded waiting task:%p", waitingTask); - swift_Concurrency_fatalError(0, "Failed to claim waitingTask!"); - } - - auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask, - assumed, hadErrorResult); - // Must unlock before we resume the waiting task - unlock(); - - if (task) { - task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); - } - return; + auto waitingTask = claimWaitingTask(); + auto prepared = prepareWaitingTaskWithTask( + /*complete=*/waitingTask, /*with=*/completedTask, + assumed, hadErrorResult); + unlock(); // we MUST unlock before running the waiting task + return runWaitingTask(prepared); } else { // ==== b) enqueue completion ------------------------------------------------ // @@ -1264,13 +1295,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) // We grab the waiting task while holding the group lock, because this // allows a single task to get the waiting task and attempt to complete it. // As another offer gets to run, it will have either a different waiting task, or no waiting task at all. - auto waitingTask = waitQueue.load(std::memory_order_acquire); // FIXME: consume the task here!!!!! - SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, loaded waiting task:%p", waitingTask); - if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) { - SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, FAILED CAS loaded waiting task:%p", waitingTask); - swift_Concurrency_fatalError(0, "Failed to claim waitingTask!"); - } - assert(waitingTask && "status claimed to have waitingTask but waitQueue was empty!"); + auto waitingTask = claimWaitingTask(); // This is the last pending task, and we must resume the waiting task. 
// - if there already was a previous error stored, we resume using it, @@ -1285,16 +1310,12 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) SWIFT_TASK_GROUP_DEBUG_LOG( this, "offer, complete, resume waitingTask:%p, with raw error:%p", waitingTask, readyErrorItem.getRawError(this)); - auto task = resumeWaitingTaskWithError( + auto prepared = prepareWaitingTaskWithError( /*complete=*/waitingTask, /*with=*/readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus); - unlock(); - if (task) { - // TODO: allow the caller to suggest an executor - task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); - } - return; + unlock(); // we MUST unlock before running the waiting task + return runWaitingTask(prepared); } case ReadyStatus::Error: { // The completed task failed, but we already stored a different failed @@ -1305,17 +1326,13 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) "offer, complete waitingTask:%p, discard error completedTask:%p, " "resume with errorItem.task:%p", waitingTask, completedTask, readyErrorItem.getTask()); - auto task = resumeWaitingTask( + auto prepared = prepareWaitingTaskWithTask( /*complete*/ waitingTask, /*with=*/readyErrorItem.getTask(), assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus, /*taskWasRetained=*/true); - unlock(); - if (task) { - // TODO: allow the caller to suggest an executor - task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); - } - return; + unlock(); // we MUST unlock before running the waiting task + return runWaitingTask(prepared); } default: { swift_Concurrency_fatalError( @@ -1329,14 +1346,10 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context) // the group before we've given up the lock. 
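+      // Note that prepareWaitingTaskWithTask below only prepares the resumption; the actual
+      // scheduling happens in runWaitingTask(), which runs only after unlock().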
      _swift_taskGroup_detachChild(asAbstract(this), completedTask);
      // There was no prior failed task stored, so we should resume the waitingTask with this (failed) completedTask
-      auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask,
+      auto prepared = prepareWaitingTaskWithTask(/*complete=*/waitingTask, /*with=*/completedTask,
                                     assumed, hadErrorResult, alreadyDecrementedStatus);
-      unlock();
-      if (task) {
-        // TODO: allow the caller to suggest an executor
-        task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
-      }
-      return;
+      unlock(); // we MUST unlock before running the waiting task
+      return runWaitingTask(prepared);
     }
   } else if (readyQueue.isEmpty()) {
     // There was no waiting task, or other tasks are still pending, so we cannot
@@ -1375,28 +1388,20 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
       switch (readyErrorItem.getStatus()) {
       case ReadyStatus::RawError: {
-        auto task = resumeWaitingTaskWithError(
+        auto task = prepareWaitingTaskWithError(
            /*complete=*/waitingTask,
            /*with=*/readyErrorItem.getRawError(this), assumed,
            alreadyDecrementedStatus);
-        unlock();
-        if (task) {
-          // TODO: allow the caller to suggest an executor
-          task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
-        }
-        return;
+        unlock(); // we MUST unlock before running the waiting task
+        return runWaitingTask(task);
       }
       case ReadyStatus::Error: {
-        auto task = resumeWaitingTask(
+        auto preparedWaitingTask = prepareWaitingTaskWithTask(
            /*complete=*/waitingTask,
            /*with=*/readyErrorItem.getTask(),
            assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus,
            /*taskWasRetained=*/true);
-        unlock();
-        if (task) {
-          // TODO: allow the caller to suggest an executor
-          task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
-        }
-        return;
+        unlock(); // we MUST unlock before running the waiting task
+        return runWaitingTask(preparedWaitingTask);
      }
      default: {
        swift_Concurrency_fatalError(
@@ -1407,14 +1412,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
     } else {
       // This is the last task, we have a waiting task and there was no error stored previously;
       // We must resume the waiting task with a success, so let us return here.
-      auto task = resumeWaitingTask(/*complete=*/waitingTask, /*with=*/completedTask,
-                                    assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
-      unlock();
-      if (task) {
-        // TODO: allow the caller to suggest an executor
-        task->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
-      }
-      return;
+      auto prepared = prepareWaitingTaskWithTask(
+          /*complete=*/waitingTask, /*with=*/completedTask,
+          assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
+      unlock(); // we MUST unlock before running the waiting task
+      return runWaitingTask(prepared);
     }
   } else {
     // it wasn't the last pending task, and there is no-one to resume;
@@ -1425,7 +1427,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
 }
 
 /// Must be called while holding the TaskGroup lock.
-AsyncTask *TaskGroupBase::resumeWaitingTask(
+TaskGroupBase::PreparedWaitingTask TaskGroupBase::prepareWaitingTaskWithTask(
     AsyncTask *waitingTask,
     AsyncTask *completedTask,
     TaskGroupStatus &assumed,
@@ -1434,23 +1436,8 @@ AsyncTask *TaskGroupBase::resumeWaitingTask(
     bool taskWasRetained) {
   SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume, waitingTask = %p, completedTask = %p, alreadyDecremented:%d, error:%d",
                              waitingTask, alreadyDecremented, hadErrorResult, completedTask);
-
-  // auto waitingTask = waitQueue.load(std::memory_order_acquire);
   assert(waitingTask && "waitingTask must not be null when attempting to resume it");
   assert(assumed.hasWaitingTask());
-//  while (true) {
-    SWIFT_TASK_GROUP_DEBUG_LOG(this, "resumeWaitingTask, attempt CAS, waiting task = %p, waitQueue.head = %p, error:%d, complete with = %p",
-                               waitingTask, waitQueue.load(std::memory_order_relaxed), hadErrorResult, completedTask);
-
-    // ==== a) run waiting task directly -------------------------------------
-//    // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!");
-//    // We are the "first" completed task to arrive,
-//    // and since there is a task waiting we immediately claim and complete it.
-//    if (waitQueue.compare_exchange_strong(
-//        waitingTask, nullptr,
-//        /*success*/ std::memory_order_release,
-//        /*failure*/ std::memory_order_acquire)) {
-
 #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
       // In the task-to-thread model, child tasks are always actually
       // run synchronously on the parent task's thread.  For task groups
@@ -1466,8 +1453,7 @@ AsyncTask *TaskGroupBase::resumeWaitingTask(
       // But since it's what we're doing, we basically take the same
      // path as we would if there wasn't a waiter.
      enqueueCompletedTask(completedTask, hadErrorResult);
-      return nullptr;
-
+  return {nullptr};
 #else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
   if (!alreadyDecremented) {
     (void) statusCompletePendingReadyWaiting(assumed);
@@ -1500,70 +1486,53 @@ AsyncTask *TaskGroupBase::resumeWaitingTask(
   }
 
   _swift_tsan_acquire(static_cast<void *>(waitingTask));
-//    // TODO: allow the caller to suggest an executor
-//    waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
-  return waitingTask;
+  return {waitingTask};
 #endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
-//    } else {
-//      SWIFT_TASK_GROUP_DEBUG_LOG(this, "CAS failed, waitingTask = %p, complete with = %p, status = %s",
-//                                 waitingTask, completedTask, statusString().c_str());
-//    }
-//  }
 }
 
 /// Must be called while holding the TaskGroup lock.
-AsyncTask* DiscardingTaskGroup::resumeWaitingTaskWithError(
-    AsyncTask *waitingTask,
-    SwiftError *error,
-    TaskGroupStatus &assumed,
-    bool alreadyDecremented) {
-//  auto waitingTask = waitQueue.load(std::memory_order_acquire);
+TaskGroupBase::PreparedWaitingTask
+DiscardingTaskGroup::prepareWaitingTaskWithError(AsyncTask *waitingTask,
                                                  SwiftError *error,
                                                  TaskGroupStatus &assumed,
                                                  bool alreadyDecremented) {
   assert(waitingTask && "cannot resume 'null' waiting task!");
-  SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, with error = %p",
-                             waitingTask, error);
-//  while (true) {
-    // ==== a) run waiting task directly -------------------------------------
-    assert(assumed.hasWaitingTask());
-//    // assert(assumed.pendingTasks(this) && "offered to group with no pending tasks!");
-//    // We are the "first" completed task to arrive,
-//    // and since there is a task waiting we immediately claim and complete it.
-//    if (waitQueue.compare_exchange_strong(
-//        waitingTask, nullptr,
-//        /*success*/ std::memory_order_release,
-//        /*failure*/ std::memory_order_acquire)) {
+  SWIFT_TASK_GROUP_DEBUG_LOG(this,
+                             "resume waiting task = %p, with error = %p",
+                             waitingTask, error);
+  assert(assumed.hasWaitingTask());
 #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
-      // In the task-to-thread model, child tasks are always actually
-      // run synchronously on the parent task's thread.  For task groups
-      // specifically, this means that poll() will pick a child task
-      // that was added to the group and run it to completion as a
-      // subroutine.  Therefore, when we enter offer(), we know that
-      // the parent task is waiting and we can just return to it.
-
-      // The task-to-thread logic in poll() currently expects the child
-      // task to enqueue itself instead of just filling in the result in
-      // the waiting task.  This is a little wasteful; there's no reason
-      // we can't just have the parent task set itself up as a waiter.
-      // But since it's what we're doing, we basically take the same
-      // path as we would if there wasn't a waiter.
-      _enqueueRawError(this, &readyQueue, error);
-      return nullptr;
-
+  // In the task-to-thread model, child tasks are always actually
+  // run synchronously on the parent task's thread.  For task groups
+  // specifically, this means that poll() will pick a child task
+  // that was added to the group and run it to completion as a
+  // subroutine.  Therefore, when we enter offer(), we know that
+  // the parent task is waiting and we can just return to it.
+
+  // The task-to-thread logic in poll() currently expects the child
+  // task to enqueue itself instead of just filling in the result in
+  // the waiting task.  This is a little wasteful; there's no reason
+  // we can't just have the parent task set itself up as a waiter.
+  // But since it's what we're doing, we basically take the same
+  // path as we would if there wasn't a waiter.
+  _enqueueRawError(this, &readyQueue, error);
+  return {nullptr};
 #else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
-  if (alreadyDecremented || statusCompletePendingReadyWaiting(assumed)) {
-    // Run the task.
-    auto result = PollResult::getError(error);
+  if (!alreadyDecremented) {
+    statusCompletePendingReadyWaiting(assumed);
+  }
 
-    auto waitingContext =
-        static_cast<TaskFutureWaitAsyncContext *>(
-            waitingTask->ResumeContext);
+  // Run the task.
+  auto result = PollResult::getError(error);
 
-    fillGroupNextResult(waitingContext, result);
-    _swift_tsan_acquire(static_cast<void *>(waitingTask));
-    return waitingTask;
-  } // TODO: what in the else???
-#endif
-  return nullptr;
+  auto waitingContext = static_cast<TaskFutureWaitAsyncContext *>(
+      waitingTask->ResumeContext);
+
+  fillGroupNextResult(waitingContext, result);
+  _swift_tsan_acquire(static_cast<void *>(waitingTask));
+  return {waitingTask};
+#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
 }
 
 SWIFT_CC(swiftasync)
@@ -1665,8 +1634,8 @@ static void swift_taskGroup_wait_next_throwingImpl(
 }
 
 PollResult AccumulatingTaskGroup::poll(AsyncTask *waitingTask) {
-  lock();
   SWIFT_TASK_GROUP_DEBUG_LOG(this, "poll, waitingTask:%p", waitingTask);
+  lock();
   assert(isAccumulatingResults() &&
          "attempted to poll TaskGroup in discard-results mode!");
 

From 8aa70dc73331ea03f8f1c69971bf081c1be9e271 Mon Sep 17 00:00:00 2001
From: Konrad `ktoso` Malawski
Date: Wed, 9 Aug 2023 13:08:04 +0900
Subject: [PATCH 12/13] cleanup for freestanding mode

---
 stdlib/public/Concurrency/TaskGroup.cpp | 2 +-
 stdlib/public/Concurrency/TaskPrivate.h | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp
index ee275c42de085..bc8cf5a4c773f 100644
--- a/stdlib/public/Concurrency/TaskGroup.cpp
+++ b/stdlib/public/Concurrency/TaskGroup.cpp
@@ -660,7 +660,7 @@ void TaskGroupBase::runWaitingTask(PreparedWaitingTask prepared) {
   assert(prepared.waitingTask == nullptr &&
          "unexpected task to schedule in TASK_TO_THREAD_MODEL!"
          "In this mode we should have run the task in-line, "
-         "rather than return it for scheduling.")
+         "rather than return it for scheduling.");
 #endif
   if (auto waitingTask = prepared.waitingTask) {
     // TODO: allow the caller to suggest an executor
diff --git a/stdlib/public/Concurrency/TaskPrivate.h b/stdlib/public/Concurrency/TaskPrivate.h
index 414bf3dc75c11..f39b4f3816a89 100644
--- a/stdlib/public/Concurrency/TaskPrivate.h
+++ b/stdlib/public/Concurrency/TaskPrivate.h
@@ -781,8 +781,10 @@ struct AsyncTask::PrivateStorage {
 
   // Destroy the opaque storage of the task
   void destroy() {
+#ifndef NDEBUG
     auto oldStatus = _status().load(std::memory_order_relaxed);
     assert(oldStatus.isComplete());
+#endif
     this->~PrivateStorage();
   }
 

From 1195955647fb7b3a62e8d59e5718a54e537b5d29 Mon Sep 17 00:00:00 2001
From: Konrad `ktoso` Malawski
Date: Thu, 10 Aug 2023 21:15:22 +0900
Subject: [PATCH 13/13] Review feedback: add better code comments

---
 stdlib/public/Concurrency/TaskGroup.cpp | 52 ++++++++++++++++++++-----
 1 file changed, 42 insertions(+), 10 deletions(-)

diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp
index bc8cf5a4c773f..b9007cebb57c9 100644
--- a/stdlib/public/Concurrency/TaskGroup.cpp
+++ b/stdlib/public/Concurrency/TaskGroup.cpp
@@ -1238,7 +1238,11 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
     auto prepared = prepareWaitingTaskWithTask(
         /*complete=*/waitingTask, /*with=*/completedTask,
         assumed, hadErrorResult);
-    unlock(); // we MUST unlock before running the waiting task
+    // we must unlock before running the waiting task,
+    // in order to avoid the potential for the resumed task
+    // to cause a group destroy, in which case the unlock might
+    // attempt to access memory in an invalid state.
+    unlock();
     return runWaitingTask(prepared);
   } else {
     // ==== b) enqueue completion ------------------------------------------------
@@ -1314,7 +1318,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
            /*complete=*/waitingTask,
            /*with=*/readyErrorItem.getRawError(this),
            assumed, alreadyDecrementedStatus);
-        unlock(); // we MUST unlock before running the waiting task
+        // we must unlock before running the waiting task,
+        // in order to avoid the potential for the resumed task
+        // to cause a group destroy, in which case the unlock might
+        // attempt to access memory in an invalid state.
+        unlock();
         return runWaitingTask(prepared);
       }
       case ReadyStatus::Error: {
@@ -1331,7 +1339,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
            /*with=*/readyErrorItem.getTask(),
            assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus,
            /*taskWasRetained=*/true);
-        unlock(); // we MUST unlock before running the waiting task
+        // we must unlock before running the waiting task,
+        // in order to avoid the potential for the resumed task
+        // to cause a group destroy, in which case the unlock might
+        // attempt to access memory in an invalid state.
+        unlock();
         return runWaitingTask(prepared);
       }
       default: {
@@ -1360,7 +1372,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
      // There was no prior failed task stored, so we should resume the waitingTask with this (failed) completedTask
      auto prepared = prepareWaitingTaskWithTask(/*complete=*/waitingTask, /*with=*/completedTask,
                                     assumed, hadErrorResult, alreadyDecrementedStatus);
-      unlock(); // we MUST unlock before running the waiting task
+      // we must unlock before running the waiting task,
+      // in order to avoid the potential for the resumed task
+      // to cause a group destroy, in which case the unlock might
+      // attempt to access memory in an invalid state.
+      unlock();
       return runWaitingTask(prepared);
     }
   } else if (readyQueue.isEmpty()) {
@@ -1383,7 +1399,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
      // We grab the waiting task while holding the group lock, because this
      // allows a single task to get the waiting task and attempt to complete it.
      // As another offer gets to run, it will have either a different waiting task, or no waiting task at all.
-      auto waitingTask = waitQueue.load(std::memory_order_acquire);
+      auto waitingTask = waitQueue.load(std::memory_order_acquire); 
      if (!waitQueue.compare_exchange_strong(waitingTask, nullptr)) {
        swift_Concurrency_fatalError(0, "Failed to claim waitingTask!");
      }
@@ -1391,7 +1407,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
            /*complete=*/waitingTask,
            /*with=*/readyErrorItem.getRawError(this), assumed,
            alreadyDecrementedStatus);
-        unlock(); // we MUST unlock before running the waiting task
+        // we must unlock before running the waiting task,
+        // in order to avoid the potential for the resumed task
+        // to cause a group destroy, in which case the unlock might
+        // attempt to access memory in an invalid state.
+        unlock();
         return runWaitingTask(task);
       }
       case ReadyStatus::Error: {
@@ -1400,7 +1420,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
            /*with=*/readyErrorItem.getTask(),
            assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus,
            /*taskWasRetained=*/true);
-        unlock(); // we MUST unlock before running the waiting task
+        // we must unlock before running the waiting task,
+        // in order to avoid the potential for the resumed task
+        // to cause a group destroy, in which case the unlock might
+        // attempt to access memory in an invalid state.
+        unlock();
         return runWaitingTask(preparedWaitingTask);
       }
       default: {
@@ -1415,7 +1439,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
      auto prepared = prepareWaitingTaskWithTask(
          /*complete=*/waitingTask, /*with=*/completedTask,
          assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
-      unlock(); // we MUST unlock before running the waiting task
+      // we must unlock before running the waiting task,
+      // in order to avoid the potential for the resumed task
+      // to cause a group destroy, in which case the unlock might
+      // attempt to access memory in an invalid state.
+      unlock();
       return runWaitingTask(prepared);
     }
   } else {
@@ -1459,7 +1487,7 @@ TaskGroupBase::PreparedWaitingTask TaskGroupBase::prepareWaitingTaskWithTask(
     (void) statusCompletePendingReadyWaiting(assumed);
   }
 
-  // Run the task.
+  // Populate the waiting task with the value from completedTask.
   auto result = PollResult::get(completedTask, hadErrorResult);
 
   SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting DONE, task = %p, error:%d, complete with = %p, status = %s",
@@ -1759,7 +1787,11 @@ reevaluate_if_taskgroup_has_results:;
         waitHead, waitingTask,
         /*success*/ std::memory_order_release,
         /*failure*/ std::memory_order_acquire)) {
-    unlock(); // TODO: remove fragment lock, and use status for synchronization
+    // we must unlock before running the waiting task,
+    // in order to avoid the potential for the resumed task
+    // to cause a group destroy, in which case the unlock might
+    // attempt to access memory in an invalid state.
+    unlock();
 #if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
     // The logic here is paired with the logic in TaskGroupBase::offer. Once
     // we run the