diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index c0f47a46b..7e8ac4349 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -21,6 +21,8 @@ jobs:
     runs-on: ubuntu-latest
     container:
       image: faasm.azurecr.io/faabric:0.16.0
+      env:
+        DEPLOYMENT_TYPE: gha-ci
     steps:
       - name: "Check-out code"
        uses: actions/checkout@v4
@@ -59,7 +61,7 @@ jobs:
     needs: [conan-cache]
     runs-on: ubuntu-latest
     env:
-      HOST_TYPE: ci
+      DEPLOYMENT_TYPE: gha-ci
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
@@ -98,7 +100,7 @@ jobs:
       matrix:
         sanitiser: [None, Address, Thread, Undefined]
     env:
-      HOST_TYPE: ci
+      DEPLOYMENT_TYPE: gha-ci
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
@@ -119,6 +121,7 @@ jobs:
         run: ./bin/inv_wrapper.sh dev.cc faabric_tests
       - name: "Run tests"
         run: ./bin/inv_wrapper.sh tests
+        timeout-minutes: 15

   dist-tests:
     if: github.event.pull_request.draft == false
@@ -136,6 +139,10 @@ jobs:
         run: ./dist-test/build.sh
       - name: "Run the distributed tests"
         run: ./dist-test/run.sh
+        timeout-minutes: 7
+      - name: "Print dist-test server logs"
+        if: always()
+        run: docker compose logs dist-test-server
       - name: "Print planner logs"
         if: always()
         run: docker compose logs planner
@@ -145,7 +152,7 @@ jobs:
     needs: [conan-cache]
     runs-on: ubuntu-latest
     env:
-      HOST_TYPE: ci
+      DEPLOYMENT_TYPE: gha-ci
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 364d2fcf0..a7d421e66 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,6 +6,7 @@ option(FAABRIC_BUILD_TESTS "Build Faabric tests" ON)
 option(FAABRIC_SELF_TRACING "Turn on system tracing using the logger" OFF)
 option(FAABRIC_CODE_COVERAGE "Build Faabric with code coverage profiling" OFF)
 option(FAABRIC_TARGET_CPU "CPU to optimise for, e.g. skylake, icelake or native" OFF)
+option(FAABRIC_USE_SPINLOCK "Use spinlocks for low-latency messaging" ON)

 # Enable colorized compiler output
 if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
@@ -16,10 +17,10 @@ endif()

 # Optimise for CPU
 if(FAABRIC_TARGET_CPU)
-    message(STATUS "Optimising Faabric for CPU ${FAABRIC_TARGET_CPU}")
+    message(STATUS "Faabric: optimising for CPU ${FAABRIC_TARGET_CPU}")
     add_compile_options(-march=${FAABRIC_TARGET_CPU} -mtune=${FAABRIC_TARGET_CPU})
 else()
-    message(STATUS "Faabric not optimised for specific CPU")
+    message(STATUS "Faabric: not optimised for specific CPU")
 endif()

 # Top-level CMake config
@@ -38,6 +39,15 @@ if(${FAABRIC_SELF_TRACING})
     add_definitions(-DTRACE_ALL=1)
 endif()

+# We want to disable the usage of spinlocks (and CPU pinning) in GHA runners
+# as they have a very low number of vCPU cores
+if (${FAABRIC_USE_SPINLOCK} AND NOT "$ENV{DEPLOYMENT_TYPE}" STREQUAL "gha-ci")
+    message(STATUS "Faabric: enabled spin-locks")
+    add_definitions(-DFAABRIC_USE_SPINLOCK)
+else()
+    message(STATUS "Faabric: disabled spin-locks")
+endif()
+
 # Set-up use of sanitisers
 if (FAABRIC_USE_SANITISER STREQUAL "Address")
     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
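The gate above only ever reaches the code as a compile definition. A minimal sketch of how a translation unit consumes it (illustrative only, not a file in this patch): when `DEPLOYMENT_TYPE=gha-ci` is exported, as the workflow and compose changes here arrange, `FAABRIC_USE_SPINLOCK` is simply never defined and the blocking fallbacks compile in.

```cpp
// Sketch: consuming the CMake-level gate (illustrative, not part of the patch)
#ifdef FAABRIC_USE_SPINLOCK
// Low-latency path: spin on queues/sockets, pin threads to cores
constexpr bool useSpinLock = true;
#else
// CI path (DEPLOYMENT_TYPE=gha-ci): block instead of spinning, no pinning
constexpr bool useSpinLock = false;
#endif
```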
diff --git a/cmake/ExternalProjects.cmake b/cmake/ExternalProjects.cmake
index f47545665..4f64c643c 100644
--- a/cmake/ExternalProjects.cmake
+++ b/cmake/ExternalProjects.cmake
@@ -15,6 +15,7 @@ if(NOT EXISTS "${CMAKE_CURRENT_BINARY_DIR}/conan.cmake")
                   TLS_VERIFY ON)
 endif()

+set(CONAN_CMAKE_SILENT_OUTPUT ON CACHE INTERNAL "")
 include(${CMAKE_CURRENT_BINARY_DIR}/conan.cmake)

 conan_check(VERSION 1.63.0 REQUIRED)
@@ -73,19 +74,19 @@ conan_cmake_install(PATH_OR_REFERENCE .
 include(${CMAKE_CURRENT_BINARY_DIR}/conan_paths.cmake)

-find_package(absl REQUIRED)
-find_package(Boost 1.80.0 REQUIRED)
-find_package(Catch2 REQUIRED)
-find_package(flatbuffers REQUIRED)
-find_package(fmt REQUIRED)
-find_package(hiredis REQUIRED)
+find_package(absl QUIET REQUIRED)
+find_package(Boost 1.80.0 QUIET REQUIRED)
+find_package(Catch2 QUIET REQUIRED)
+find_package(flatbuffers QUIET REQUIRED)
+find_package(fmt QUIET REQUIRED)
+find_package(hiredis QUIET REQUIRED)
 # 27/01/2023 - Pin OpenSSL to a specific version to avoid incompatibilities
 # with the system's (i.e. Ubuntu 22.04) OpenSSL
-find_package(OpenSSL 3.0.2 REQUIRED)
-find_package(Protobuf 3.20.0 REQUIRED)
-find_package(readerwriterqueue REQUIRED)
-find_package(spdlog REQUIRED)
-find_package(ZLIB REQUIRED)
+find_package(OpenSSL 3.0.2 QUIET REQUIRED)
+find_package(Protobuf 3.20.0 QUIET REQUIRED)
+find_package(readerwriterqueue QUIET REQUIRED)
+find_package(spdlog QUIET REQUIRED)
+find_package(ZLIB QUIET REQUIRED)

 # --------------------------------
 # Fetch content dependencies
@@ -109,16 +110,26 @@ set(ZSTD_LZ4_SUPPORT OFF CACHE INTERNAL "")

 # nng (Conan version out of date)
 set(NNG_TESTS OFF CACHE INTERNAL "")
-FetchContent_Declare(zstd_ext
-    GIT_REPOSITORY "https://github.com/facebook/zstd"
-    GIT_TAG "v1.5.2"
-    SOURCE_SUBDIR "build/cmake"
+FetchContent_Declare(atomic_queue_ext
+    GIT_REPOSITORY "https://github.com/max0x7ba/atomic_queue"
+    GIT_TAG "7c36f0997979a0fee5be84c9511ee0f6032057ec"
 )
 FetchContent_Declare(nng_ext
     GIT_REPOSITORY "https://github.com/nanomsg/nng"
     # NNG tagged version 1.7.1
     GIT_TAG "ec4b5722fba105e3b944e3dc0f6b63c941748b3f"
 )
+FetchContent_Declare(zstd_ext
+    GIT_REPOSITORY "https://github.com/facebook/zstd"
+    GIT_TAG "v1.5.2"
+    SOURCE_SUBDIR "build/cmake"
+)
+
+FetchContent_MakeAvailable(atomic_queue_ext)
+add_library(atomic_queue::atomic_queue ALIAS atomic_queue)
+
+FetchContent_MakeAvailable(nng_ext)
+add_library(nng::nng ALIAS nng)
 FetchContent_MakeAvailable(zstd_ext)

 # Work around zstd not declaring its targets properly
@@ -127,9 +138,6 @@ target_include_directories(libzstd_shared SYSTEM INTERFACE $<BUILD_INTERFACE:${zstd_SOURCE_DIR}/lib>)
-
-FetchContent_MakeAvailable(nng_ext)
-add_library(nng::nng ALIAS nng)
diff --git a/dist-test/build.sh b/dist-test/build.sh
--- a/dist-test/build.sh
+++ b/dist-test/build.sh
@@ -9,6 +9,7 @@ pushd ${PROJ_ROOT} >> /dev/null

 # Run the build
+export FAABRIC_DEPLOYMENT_TYPE=gha-ci
 docker compose \
     run \
diff --git a/dist-test/build_internal.sh b/dist-test/build_internal.sh
index d10daccee..ede7f3a62 100755
--- a/dist-test/build_internal.sh
+++ b/dist-test/build_internal.sh
@@ -9,7 +9,7 @@ pushd ${PROJ_ROOT} >> /dev/null
 source ./bin/workon.sh

 # Run the debug build
-inv dev.cmake --build=Debug
+inv dev.cmake --build=Debug --clean
 inv dev.cc faabric_dist_tests
 inv dev.cc faabric_dist_test_server
 inv dev.cc planner_server
diff --git a/docker-compose.yml b/docker-compose.yml
index 11d71b35c..40d03ba97 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -29,6 +29,7 @@ services:
     tty: true
     privileged: true
     environment:
+      - DEPLOYMENT_TYPE=${FAABRIC_DEPLOYMENT_TYPE:-compose}
       - LOG_LEVEL=${LOG_LEVEL:-debug}
       - PLANNER_HOST=planner
       - PLANNER_PORT=8080
@@ -48,6 +49,7 @@ services:
       - ./conan-cache/:/root/.conan
     working_dir: /build/faabric/static
     environment:
+      - DEPLOYMENT_TYPE=${FAABRIC_DEPLOYMENT_TYPE:-compose}
       - LOG_LEVEL=debug
       - PLANNER_HOST=planner
       - PLANNER_PORT=8080
diff --git a/include/faabric/mpi/MpiWorld.h b/include/faabric/mpi/MpiWorld.h
index 465b0dacf..0f56deb79 100644
--- a/include/faabric/mpi/MpiWorld.h
+++ b/include/faabric/mpi/MpiWorld.h
@@ -26,7 +26,11 @@ namespace faabric::mpi {
 // as the broker already has mocking capabilities
 std::vector<MpiMessage> getMpiMockedMessages(int sendRank);

+#ifdef FAABRIC_USE_SPINLOCK
+typedef faabric::util::SpinLockQueue<MpiMessage> InMemoryMpiQueue;
+#else
 typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue;
+#endif

 class MpiWorld
 {
diff --git a/include/faabric/transport/tcp/RecvSocket.h b/include/faabric/transport/tcp/RecvSocket.h
index 273954482..861c8fbd8 100644
--- a/include/faabric/transport/tcp/RecvSocket.h
+++ b/include/faabric/transport/tcp/RecvSocket.h
@@ -31,5 +31,7 @@ class RecvSocket
     int port;

     std::deque<int> openConnections;
+
+    void setSocketOptions(int connFd);
 };
 }
diff --git a/include/faabric/transport/tcp/SocketOptions.h b/include/faabric/transport/tcp/SocketOptions.h
index 1ae675206..6c3dc5a0c 100644
--- a/include/faabric/transport/tcp/SocketOptions.h
+++ b/include/faabric/transport/tcp/SocketOptions.h
@@ -5,8 +5,11 @@ void setReuseAddr(int connFd);
 void setNoDelay(int connFd);
 void setQuickAck(int connFd);

+// Blocking/Non-blocking sockets
 void setNonBlocking(int connFd);
 void setBlocking(int connFd);
-
 bool isNonBlocking(int connFd);
+
+// Enable busy polling for non-blocking sockets
+void setBusyPolling(int connFd);
 }
diff --git a/include/faabric/util/config.h b/include/faabric/util/config.h
index 5902e3aab..83eeb491c 100644
--- a/include/faabric/util/config.h
+++ b/include/faabric/util/config.h
@@ -27,6 +27,7 @@ class SystemConfig

     // Scheduling
     int overrideCpuCount;
+    int overrideFreeCpuStart;
     std::string batchSchedulerMode;

     // Worker-related timeouts
diff --git a/include/faabric/util/hwloc.h b/include/faabric/util/hwloc.h
new file mode 100644
index 000000000..9aa4f158b
--- /dev/null
+++ b/include/faabric/util/hwloc.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <pthread.h>
+#include <sched.h>
+
+namespace faabric::util {
+
+const int NO_CPU_IDX = -1;
+const int GHA_CPU_IDX = -2;
+
+class FaabricCpuSet
+{
+  public:
+    FaabricCpuSet(int cpuIdxIn = NO_CPU_IDX);
+    FaabricCpuSet& operator=(const FaabricCpuSet&) = delete;
+    FaabricCpuSet(const FaabricCpuSet&) = delete;
+
+    ~FaabricCpuSet();
+
+    cpu_set_t* get() { return &cpuSet; }
+
+  private:
+    cpu_set_t cpuSet;
+
+    // CPU index in internal CPU accounting
+    int cpuIdx = NO_CPU_IDX;
+};
+
+// Pin thread to any "unpinned" CPUs. Returns the CPU set it was pinned to.
+// We return a unique pointer to enforce RAII on the pinned-to CPU
+std::unique_ptr<FaabricCpuSet> pinThreadToFreeCpu(pthread_t thread);
+}
diff --git a/include/faabric/util/queue.h b/include/faabric/util/queue.h
index 6d89aab18..1840fc29f 100644
--- a/include/faabric/util/queue.h
+++ b/include/faabric/util/queue.h
@@ -4,6 +4,7 @@
 #include
 #include
+#include <atomic_queue/atomic_queue.h>
 #include
 #include
 #include
@@ -215,6 +216,32 @@ class FixedCapacityQueue
     moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
 };

+template<typename T>
+class SpinLockQueue
+{
+  public:
+    void enqueue(T& value) { mq.push(value); }
+
+    T dequeue() { return mq.pop(); }
+
+    long size()
+    {
+        throw std::runtime_error("Size for fast queue unimplemented!");
+    }
+
+    void drain()
+    {
+        T value;
+        while (mq.try_pop(value)) {
+            ;
+        }
+    }
+
+    void reset() { ; }
+
+  private:
+    atomic_queue::AtomicQueue2<T, 1024> mq;
+};
+
 class TokenPool
 {
   public:
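For context, a minimal standalone demo of the semantics the new `SpinLockQueue` relies on (illustrative: the element type and the capacity of 1024 are arbitrary here, and this is not faabric code). `atomic_queue::AtomicQueue2`'s `push`/`pop` busy-spin rather than sleep on a condition variable, which is what makes the queue low-latency and also why it is unsuitable for small GHA runners:

```cpp
#include <atomic_queue/atomic_queue.h>

#include <cstdio>
#include <thread>

int main()
{
    // Fixed-capacity, lock-free queue; the capacity is a compile-time constant
    atomic_queue::AtomicQueue2<int, 1024> q;

    std::thread producer([&] {
        for (int i = 0; i < 5; i++) {
            q.push(i); // spins while the queue is full
        }
    });

    for (int i = 0; i < 5; i++) {
        std::printf("got %d\n", q.pop()); // spins while the queue is empty
    }

    producer.join();
    return 0;
}
```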
diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp
index 5d401cb73..6fcc82b55 100644
--- a/src/executor/Executor.cpp
+++ b/src/executor/Executor.cpp
@@ -392,9 +392,10 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)

             // MPI migration
             if (msg.ismpi()) {
-                auto& mpiWorld = faabric::mpi::getMpiWorldRegistry().getWorld(
-                  msg.mpiworldid());
-                mpiWorld.destroy();
+                auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
+                if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
+                    mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
+                }
             }
         } catch (const std::exception& ex) {
             returnValue = 1;
@@ -403,6 +404,14 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
               "Task {} threw exception. What: {}", msg.id(), ex.what());
             SPDLOG_ERROR(errorMessage);
             msg.set_outputdata(errorMessage);
+
+            // MPI-specific clean-up after we throw an exception
+            if (msg.ismpi()) {
+                auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
+                if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
+                    mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
+                }
+            }
         }

         // Unset context
diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp
index 4c707432f..d27b259f5 100644
--- a/src/mpi/MpiWorld.cpp
+++ b/src/mpi/MpiWorld.cpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include <faabric/util/hwloc.h>
 #include
 #include
 #include
@@ -52,6 +53,9 @@ struct MpiRankState
     // have not `wait`ed on yet.
     std::vector<std::shared_ptr<MpiMessageBuffer>> unackedMessageBuffers;

+    // CPU that this thread is pinned to
+    std::unique_ptr<faabric::util::FaabricCpuSet> pinnedCpu;
+
     // ----- Remote Messaging -----

     // This structure contains one send socket per remote rank
@@ -71,13 +75,12 @@ struct MpiRankState
         for (auto& umb : unackedMessageBuffers) {
             if (umb != nullptr) {
                 if (!umb->empty()) {
+                    // Do not throw exceptions here as this method is
+                    // called as part of the world destructor
                     SPDLOG_ERROR(
                       "Destroying the MPI world with outstanding {}"
                       " messages in the message buffer",
                       umb->size());
-                    throw std::runtime_error(
-                      "Destroying world with a non-empty MPI message "
-                      "buffer");
                 }
             }
         }
@@ -89,6 +92,9 @@ struct MpiRankState
         recvSocket.reset();
         recvConnPool.clear();

+        // Free the pinned-to CPU
+        pinnedCpu.reset();
+
         // Local message count
         msgCount = 1;

@@ -218,7 +224,12 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)

 void MpiWorld::destroy()
 {
-    SPDLOG_TRACE("Destroying MPI world {}", id);
+    if (rankState.msg != nullptr) {
+        SPDLOG_TRACE("{}:{}:{} destroying MPI world",
+                     rankState.msg->appid(),
+                     rankState.msg->groupid(),
+                     rankState.msg->mpirank());
+    }

     // ----- Per-rank cleanup -----

@@ -254,6 +265,13 @@ void MpiWorld::initialiseRankFromMsg(faabric::Message& msg)
 {
     rankState.msg = &msg;

+    // Pin this thread to a free CPU
+#ifdef FAABRIC_USE_SPINLOCK
+    if (rankState.pinnedCpu == nullptr) {
+        rankState.pinnedCpu = faabric::util::pinThreadToFreeCpu(pthread_self());
+    }
+#endif
+
     // Initialise the TCP sockets for remote messaging
     initSendRecvSockets();
 }
@@ -1737,13 +1755,7 @@ std::shared_ptr<InMemoryMpiQueue> MpiWorld::getLocalQueue(int sendRank,

 // This method is called by all MPI ranks simultaneously during world creation,
 // and needs to establish bi-directional TCP sockets between each pair of
-// remote ranks. We follow a very simple group handshake protocol. For each
-// remote rank, if the rank is larger than us we CONNECT, and then ACCEPT (i.e
-// open a send socket first, and then accept their connection into our single
-// reception socket). If the rank is smaller than us, we do the symmetric.
-// To guarantee progress and correctness we can only start CONNECT-ing once
-// all smaller local ranks are done CONNECT-ing. Otherwise, when the remote
-// rank ACCEPTs it won't know which local rank CONNECT-ed to it.
+// remote ranks
 void MpiWorld::initSendRecvSockets()
 {
     // Do not need to initialise any send/recv sockets in mock mode
@@ -1753,12 +1765,25 @@ void MpiWorld::initSendRecvSockets()
     }
 #endif

+    // Do not need to initialise anything if there are no remote ranks
+    int numRemoteRanks = size - ranksForHost.at(thisHost).size();
+    if (numRemoteRanks == 0) {
+        return;
+    }
+
+    // Guard against calling this method twice from the same thread
     if (rankState.recvSocket != nullptr) {
         assert(rankState.sendSockets.size() == size);
         assert(rankState.recvConnPool.size() == size);
         return;
     }

+    // TODO: delete or debug/trace me
+    SPDLOG_INFO("MPI begin TCP handshake {}:{}:{}",
+                rankState.msg->appid(),
+                rankState.msg->groupid(),
+                rankState.msg->mpirank());
+
     assert(rankState.msg != nullptr);
     int thisRank = rankState.msg->groupidx();
     int thisPort = getPortForRank(thisRank);
@@ -1773,149 +1798,79 @@ void MpiWorld::initSendRecvSockets()
     rankState.recvConnPool = std::vector<int>(size, 0);
     rankState.recvSocket->listen();

-    // To ensure progress and avoid race conditions, we do not want to CONNECT
-    // to remote ranks larger than us until all (local) smaller ranks have
-    // CONNECT-ed to them first. To achieve this goal, we wait for our
-    // immediately smaller (colocated) rank to be done
-    int localSendRank = -1;
-    int localRecvRank = -1;
-    auto itr = ranksForHost.at(thisHost).find(thisRank);
-    if (itr != ranksForHost.at(thisHost).begin()) {
-        localSendRank = *(std::prev(ranksForHost.at(thisHost).find(thisRank)));
-    }
-    if (std::next(itr) != ranksForHost.at(thisHost).end()) {
-        localRecvRank = *(std::next(itr));
-    }
-
-    // Do the handshake between sending and receiving sockets
+    // Once we have bound and listened on the main socket, we can CONNECT to
+    // all remote ranks. Given that we have already bound the listening socket,
+    // CONNECT requests are non-blocking
     for (int otherRank = 0; otherRank < size; otherRank++) {
-        if (otherRank == thisRank) {
+        if (thisRank == otherRank) {
             continue;
         }

         std::string otherHost = getHostForRank(otherRank);
         int otherPort = getPortForRank(otherRank);

-        // If we need to receive a synchronisation message, do so first
-        if (otherRank == localSendRank) {
-            SPDLOG_TRACE(
-              "MPI waiting for local sync to handshake {} <-> {} (wait on: {})",
-              thisRank,
-              otherRank,
-              localSendRank);
-            recv(localSendRank,
-                 thisRank,
-                 nullptr,
-                 MPI_INT,
-                 0,
-                 nullptr,
-                 MpiMessageType::HANDSHAKE);
-            continue;
-        }
-
-        // If the rank is co-located (and is not the rank we sync from, pass)
         if (otherHost == thisHost) {
             continue;
         }

-        if (thisRank < otherRank) {
-            // If we are a smaller rank, we first CONNECT and then ACCEPT
-            SPDLOG_TRACE(
-              "MPI establishing remote connection {} -> {}:{}:{} (CONNECT)",
-              thisRank,
-              otherHost,
-              otherPort,
-              otherRank);
-            rankState.sendSockets.at(otherRank) =
-              std::make_unique<SendSocket>(otherHost,
-                                           otherPort);
-            rankState.sendSockets.at(otherRank)->dial();
-
-            SPDLOG_TRACE(
-              "MPI establishing remote connection {}:{}:{} -> {} (ACCEPT)",
-              otherHost,
-              otherPort,
-              otherRank,
-              thisRank);
-            rankState.recvConnPool.at(otherRank) =
-              rankState.recvSocket->accept();
-
-            // Smoke-test the bi-directional link
-            send(thisRank,
-                 otherRank,
-                 nullptr,
-                 MPI_INT,
-                 0,
-                 MpiMessageType::HANDSHAKE);
-            recv(otherRank,
-                 thisRank,
-                 nullptr,
-                 MPI_INT,
-                 0,
-                 nullptr,
-                 MpiMessageType::HANDSHAKE);
+        SPDLOG_TRACE(
+          "MPI establishing remote connection {} -> {}:{}:{} (CONNECT)",
+          thisRank,
+          otherHost,
+          otherPort,
+          otherRank);
+        rankState.sendSockets.at(otherRank) =
+          std::make_unique<SendSocket>(otherHost,
+                                       otherPort);
+        rankState.sendSockets.at(otherRank)->dial();
+        // Right after connecting, we send our rank over the raw socket to
+        // identify ourselves
+        rankState.sendSockets.at(otherRank)->sendOne((const uint8_t*)&thisRank,
+                                                     sizeof(thisRank));
+    }

-            SPDLOG_DEBUG("MPI handshake succeeded {} <-> {}:{}:{}",
-                         thisRank,
-                         otherHost,
-                         otherPort,
-                         otherRank);
-        } else {
-            // If we are a larger rank, we ACCEPT the connection from the
-            // smaller rank, then CONNECT to it
-            SPDLOG_TRACE(
-              "MPI establishing remote connection {}:{}:{} -> {} (ACCEPT)",
-              otherHost,
-              otherPort,
-              otherRank,
-              thisRank);
-            rankState.recvConnPool.at(otherRank) =
-              rankState.recvSocket->accept();
-
-            SPDLOG_TRACE(
-              "MPI establishing remote connection {} -> {}:{}:{} (CONNECT)",
-              thisRank,
-              otherHost,
-              otherPort,
-              otherRank);
-            rankState.sendSockets.at(otherRank) =
-              std::make_unique<SendSocket>(otherHost,
-                                           otherPort);
-            rankState.sendSockets.at(otherRank)->dial();
-
-            // Smoke-test the bi-directional link
-            recv(otherRank,
-                 thisRank,
-                 nullptr,
-                 MPI_INT,
-                 0,
-                 nullptr,
-                 MpiMessageType::HANDSHAKE);
-            send(thisRank,
-                 otherRank,
-                 nullptr,
-                 MPI_INT,
-                 0,
-                 MpiMessageType::HANDSHAKE);
+    // ACCEPT from all remote ranks. Note that ACCEPTs may come out of order
+    for (int i = 0; i < numRemoteRanks; i++) {
+        SPDLOG_TRACE("MPI establishing remote connection ?:?:? -> {} (ACCEPT)",
+                     thisRank);
+        int newConnFd = rankState.recvSocket->accept();
+
+        // Work out who CONNECT-ed to us
+        int otherRank = -1;
+        rankState.recvSocket->recvOne(
+          newConnFd, BYTES(&otherRank), sizeof(otherRank));
+        assert(otherRank >= 0);
+        assert(otherRank < size);
+        assert(otherRank != thisRank);
+
+        std::string otherHost = getHostForRank(otherRank);
+        int otherPort = getPortForRank(otherRank);

-            SPDLOG_DEBUG("MPI handshake succeeded {}:{}:{} <-> {}",
+        if (rankState.recvConnPool.at(otherRank) != 0 ||
+            otherRank == thisRank) {
+            SPDLOG_ERROR("MPI accepted connection for repeated rank {}:{}:{} "
+                         "-> {} (app: {})",
                          otherHost,
                          otherPort,
                          otherRank,
-                         thisRank);
+                         thisRank,
+                         rankState.msg->appid());
+            throw std::runtime_error("MPI ACCEPTed repeated connection!");
         }
+        rankState.recvConnPool.at(otherRank) = newConnFd;

-    // Once we are done with our handshake protocol, notify the next local
-    // rank (if it exists)
-    if (localRecvRank != -1) {
-        send(thisRank,
-             localRecvRank,
-             nullptr,
-             MPI_INT,
-             0,
-             MpiMessageType::HANDSHAKE);
+        SPDLOG_DEBUG("MPI accepted remote connection {}:{}:{} -> {} (ACCEPT)",
+                     otherHost,
+                     otherPort,
+                     otherRank,
+                     thisRank);
     }
+
+    // TODO: delete or debug me
+    SPDLOG_INFO("MPI end TCP handshake {}:{}:{}",
+                rankState.msg->appid(),
+                rankState.msg->groupid(),
+                rankState.msg->mpirank());
 }

 // We pre-allocate all _potentially_ necessary queues in advance. Queues are
@@ -2093,6 +2048,7 @@ void MpiWorld::prepareMigration(int thisRank, bool thisRankMustMigrate)
     // so that we pick up the right ranks for the remote ranks
     rankState.sendSockets.clear();
     rankState.recvSocket.reset();
+    rankState.recvConnPool.clear();

     // Update local records
     if (thisRank == localLeader) {
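To see the new handshake in isolation, below is a minimal, self-contained sketch (plain POSIX sockets and threads standing in for ranks; the port range, world size and helper names are all illustrative, and none of this is faabric code). Each rank listens first, then CONNECTs to every peer and immediately writes its own rank ID; the acceptor uses that first message to demultiplex ACCEPTs arriving in any order, which is the property that lets the patch drop the old smaller-rank-first synchronisation.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

constexpr int NUM_RANKS = 3;    // illustrative world size
constexpr int BASE_PORT = 9800; // illustrative port range, one port per rank

static sockaddr_in loopbackAddr(int port)
{
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(static_cast<uint16_t>(port));
    return addr;
}

// Every rank binds and listens first, so later CONNECTs from peers simply
// queue in the accept backlog and no global ordering between ranks is needed
static int listenOn(int port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    sockaddr_in addr = loopbackAddr(port);
    bind(fd, (sockaddr*)&addr, sizeof(addr));
    listen(fd, NUM_RANKS);
    return fd;
}

// CONNECT to a peer, retrying until its listening socket is up
static int dial(int port)
{
    sockaddr_in addr = loopbackAddr(port);
    while (true) {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (connect(fd, (sockaddr*)&addr, sizeof(addr)) == 0) {
            return fd;
        }
        close(fd);
        usleep(1000);
    }
}

static void runRank(int thisRank)
{
    int listenFd = listenOn(BASE_PORT + thisRank);

    // CONNECT to every peer and identify ourselves straight away, mirroring
    // the sendOne() of the rank id right after dial() in the patch
    std::vector<int> sendFds(NUM_RANKS, -1);
    for (int other = 0; other < NUM_RANKS; other++) {
        if (other == thisRank) {
            continue;
        }
        sendFds[other] = dial(BASE_PORT + other);
        ssize_t sent = write(sendFds[other], &thisRank, sizeof(thisRank));
        assert(sent == (ssize_t)sizeof(thisRank));
    }

    // ACCEPT one connection per peer; they may arrive in any order, so the
    // first bytes received on each connection tell us whose it is
    std::vector<int> recvFds(NUM_RANKS, -1);
    for (int i = 0; i < NUM_RANKS - 1; i++) {
        int conn = accept(listenFd, nullptr, nullptr);
        int otherRank = -1;
        // A 4-byte read on a fresh connection; production code must loop on
        // short reads the way faabric's recvOne does
        ssize_t got = read(conn, &otherRank, sizeof(otherRank));
        assert(got == (ssize_t)sizeof(otherRank));
        assert(otherRank >= 0 && otherRank < NUM_RANKS);
        assert(recvFds[otherRank] == -1); // a repeated rank would be a bug
        recvFds[otherRank] = conn;
        std::printf("rank %d accepted rank %d\n", thisRank, otherRank);
    }

    // sendFds/recvFds would now back point-to-point messaging
    for (int fd : sendFds) { if (fd != -1) close(fd); }
    for (int fd : recvFds) { if (fd != -1) close(fd); }
    close(listenFd);
}

int main()
{
    std::vector<std::thread> ranks;
    for (int r = 0; r < NUM_RANKS; r++) {
        ranks.emplace_back(runRank, r);
    }
    for (auto& t : ranks) {
        t.join();
    }
    return 0;
}
```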
diff --git a/src/transport/tcp/RecvSocket.cpp b/src/transport/tcp/RecvSocket.cpp
index 67cf3b441..29dbf76d8 100644
--- a/src/transport/tcp/RecvSocket.cpp
+++ b/src/transport/tcp/RecvSocket.cpp
@@ -2,6 +2,9 @@
 #include
 #include

+#include <poll.h>
+#include <immintrin.h>
+
 namespace faabric::transport::tcp {

 RecvSocket::RecvSocket(int port, const std::string& host)
   : addr(host, port)
@@ -54,8 +57,24 @@ int RecvSocket::accept()
 {
     int connFd = sock.get();

-    // Make accept blocking to make sure we account for races in initialisation
-    setBlocking(connFd);
+    // We cannot set a timeout on the ACCEPT system call. Instead, we poll
+    // on the listening file descriptor until someone has CONNECT-ed to us,
+    // triggering a POLLIN event
+    struct pollfd polledFds[1];
+    polledFds[0].fd = connFd;
+    polledFds[0].events = POLLIN;
+    int pollTimeoutMs = 2000;
+    int numReady = ::poll(polledFds, 1, pollTimeoutMs);
+    if (numReady < 1) {
+        SPDLOG_ERROR(
+          "Error accepting connection on {}:{} (fd: {}): poll timed out",
+          host,
+          port,
+          connFd);
+        throw std::runtime_error("Poll timed-out!");
+    }
+
+    // Once poll has returned successfully, we should be able to accept
     int newConn = ::accept(sock.get(), 0, 0);
     if (newConn < 1) {
         SPDLOG_ERROR("Error accepting connection on {}:{} (fd: {}): {}",
@@ -65,11 +84,9 @@ int RecvSocket::accept()
                      std::strerror(errno));
         throw std::runtime_error("Error accepting TCP connection");
     }
-    setNonBlocking(connFd);

-    // Set the newly accepted connection as blocking (we want `recv` to block)
-    // TODO: do we want to block, or do we want to spinlock??
-    setBlocking(newConn);
+    // Set socket options for the newly created receive socket
+    setSocketOptions(newConn);

     // TODO: add constructor parameter of max num conn
     openConnections.push_back(newConn);

     return newConn;
 }

+// Single function to configure _all_ TCP options for a reception socket
+void RecvSocket::setSocketOptions(int connFd)
+{
+#ifdef FAABRIC_USE_SPINLOCK
+    if (!isNonBlocking(connFd)) {
+        setNonBlocking(connFd);
+    }
+
+    // TODO: not clear if this helps or not
+    setBusyPolling(connFd);
+#else
+    if (isNonBlocking(connFd)) {
+        setBlocking(connFd);
+    }
+#endif
+}
+
 void RecvSocket::recvOne(int conn, uint8_t* buffer, size_t bufferSize)
 {
     size_t numRecvd = 0;

     while (numRecvd < bufferSize) {
         // Receive from socket
+#ifdef FAABRIC_USE_SPINLOCK
+        int got = ::recv(conn, buffer, bufferSize - numRecvd, MSG_DONTWAIT);
+#else
         int got = ::recv(conn, buffer, bufferSize - numRecvd, 0);
-        if (got < 0) {
-            // TODO: why?
-            if (errno != EAGAIN) {
-                SPDLOG_ERROR("TCP Server error receiving in {}: {}",
-                             conn,
-                             std::strerror(errno));
-                throw std::runtime_error("TCP error receiving!");
+#endif
+        if (got == -1) {
+#ifdef FAABRIC_USE_SPINLOCK
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+                _mm_pause();
+#else
+            if (errno == EAGAIN) {
+#endif
+                continue;
             }
-            // TODO: ??
-            SPDLOG_ERROR("not expected: {}", std::strerror(errno));
-        } else {
-            buffer += got;
-            numRecvd += got;
+            SPDLOG_ERROR("TCP Server error receiving in {}: {}",
+                         conn,
+                         std::strerror(errno));
+            throw std::runtime_error("TCP error receiving!");
         }
+
+        buffer += got;
+        numRecvd += got;
     }

     assert(numRecvd == bufferSize);
diff --git a/src/transport/tcp/SocketOptions.cpp b/src/transport/tcp/SocketOptions.cpp
index 84cbd84cf..39390e635 100644
--- a/src/transport/tcp/SocketOptions.cpp
+++ b/src/transport/tcp/SocketOptions.cpp
@@ -2,8 +2,8 @@
 #include
 #include

-#include <netinet/in.h> // IPPROTO_TCP
-#include <netinet/tcp.h> // TCP_NODELAY, TCP_QUICKACK
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 #include

 namespace faabric::transport::tcp {
@@ -13,7 +13,9 @@ void setReuseAddr(int connFd)
     int opt = 1;
     int ret = setsockopt(connFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
     if (ret < 0) {
-        SPDLOG_ERROR("setsockopt(SO_REUSEADDR) failed on fd: {}", connFd);
+        SPDLOG_ERROR("setsockopt(SO_REUSEADDR) failed on socket {}: {}",
+                     connFd,
+                     std::strerror(errno));
         throw std::runtime_error("Error setting socket option: SO_REUSADDR");
     }
 }
@@ -23,7 +25,10 @@ void setNoDelay(int connFd)
     int opt = 1;
     int ret = setsockopt(connFd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
     if (ret < 0) {
-        SPDLOG_ERROR("setsockopt(TCP_NODELAY) failed on fd: {}", connFd);
+        SPDLOG_ERROR("setsockopt(TCP_NODELAY) failed on socket {}: {}",
+                     connFd,
+                     std::strerror(errno));
+        throw std::runtime_error("Error setting socket option: TCP_NODELAY");
     }
 }

@@ -32,7 +37,10 @@ void setQuickAck(int connFd)
     int opt = 1;
     int ret = setsockopt(connFd, IPPROTO_TCP, TCP_QUICKACK, &opt, sizeof(opt));
     if (ret < 0) {
-        SPDLOG_ERROR("setsockopt(TCP_QUICKACK) failed on fd: {}", connFd);
+        SPDLOG_ERROR("setsockopt(TCP_QUICKACK) failed on socket {}: {}",
+                     connFd,
+                     std::strerror(errno));
+        throw std::runtime_error("Error setting socket option: TCP_QUICKACK");
     }
 }

@@ -40,7 +48,9 @@ void setNonBlocking(int connFd)
 {
     int flags = fcntl(connFd, F_GETFL, 0);
     if (flags < 0) {
-        SPDLOG_ERROR("fcntl(fd, F_GETFL, 0) failed on fd: {}", connFd);
+        SPDLOG_ERROR("fcntl(fd, F_GETFL, 0) failed on socket {}: {}",
+                     connFd,
+                     std::strerror(errno));
         throw std::runtime_error("Error setting socket as non-blocking");
     }

@@ -52,7 +62,9 @@ void setBlocking(int connFd)
 {
     int flags = fcntl(connFd, F_GETFL, 0);
     if (flags < 0) {
-        SPDLOG_ERROR("fcntl(fd, F_GETFL, 0) failed on fd: {}", connFd);
+        SPDLOG_ERROR("fcntl(fd, F_GETFL, 0) failed on socket {}: {}",
+                     connFd,
+                     std::strerror(errno));
         throw std::runtime_error("Error setting socket as blocking");
     }

@@ -64,10 +76,31 @@ bool isNonBlocking(int connFd)
 {
     int flags = fcntl(connFd, F_GETFL, 0);
     if (flags < 0) {
-        SPDLOG_ERROR("fcntl(fd, F_GETFL, 0) failed on fd: {}", connFd);
+        SPDLOG_ERROR("fcntl(fd, F_GETFL, 0) failed on socket {}: {}",
+                     connFd,
+                     std::strerror(errno));
         throw std::runtime_error("Error checking if socket is blocking");
     }

     return (flags & O_NONBLOCK) != 0;
 }
+
+// Set busy polling in kernel space
+void setBusyPolling(int connFd)
+{
+    // Number of microseconds to busy poll in kernel space
+    int numMicroSecToPoll = 10000;
+
+    int ret = setsockopt(connFd,
+                         SOL_SOCKET,
+                         SO_BUSY_POLL,
+                         &numMicroSecToPoll,
+                         sizeof(numMicroSecToPoll));
+    if (ret == -1) {
+        SPDLOG_ERROR("Error setting kernel busy poll for socket {}: {}",
+                     connFd,
+                     std::strerror(errno));
+        throw std::runtime_error("Error setting kernel busy poll");
+    }
+}
 }
diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt
index 0e6121dd8..6fa0557de 100644
--- a/src/util/CMakeLists.txt
+++ b/src/util/CMakeLists.txt
@@ -14,6 +14,7 @@ faabric_lib(util
     files.cpp
     func.cpp
     gids.cpp
+    hwloc.cpp
     json.cpp
     latch.cpp
     locks.cpp
diff --git a/src/util/config.cpp b/src/util/config.cpp
index 89797b020..8bfb9bbf3 100644
--- a/src/util/config.cpp
+++ b/src/util/config.cpp
@@ -34,6 +34,8 @@ void SystemConfig::initialise()
     // Scheduling
     // TODO(planner-scheduler): remove some of this
     overrideCpuCount = this->getSystemConfIntParam("OVERRIDE_CPU_COUNT", "0");
+    overrideFreeCpuStart =
+      this->getSystemConfIntParam("OVERRIDE_FREE_CPU_START", "0");
     batchSchedulerMode = getEnvVar("BATCH_SCHEDULER_MODE", "bin-pack");

     // Worker-related timeouts (all in seconds)
diff --git a/src/util/hwloc.cpp b/src/util/hwloc.cpp
new file mode 100644
index 000000000..6ecddc101
--- /dev/null
+++ b/src/util/hwloc.cpp
@@ -0,0 +1,110 @@
+#include <faabric/util/config.h>
+#include <faabric/util/hwloc.h>
+#include <faabric/util/logging.h>
+#include <faabric/util/testing.h>
+
+#include <atomic>
+#include <memory>
+#include <thread>
+#include <vector>
+
+namespace faabric::util {
+
+// Work around not being able to define and default-initialise an array of
+// atomics
+class FreeCpus
+{
+  public:
+    FreeCpus()
+    {
+        for (int i = 0; i < std::jthread::hardware_concurrency(); i++) {
+            cpuVec.emplace_back(std::make_unique<std::atomic<bool>>(true));
+        }
+    }
+
+    std::vector<std::unique_ptr<std::atomic<bool>>> cpuVec;
+};
+static FreeCpus freeCpus;
+
+FaabricCpuSet::FaabricCpuSet(int cpuIdxIn)
+  : cpuIdx(cpuIdxIn)
+{
+    CPU_ZERO(&cpuSet);
+
+    // Populate the CPU set struct
+    if (cpuIdx != NO_CPU_IDX && cpuIdx != GHA_CPU_IDX) {
+        assert(cpuIdx >= 0);
+        assert(cpuIdx < std::jthread::hardware_concurrency());
+
+        CPU_SET(cpuIdx, &cpuSet);
+    } else if (cpuIdx == GHA_CPU_IDX) {
+        // If overcommitting on GHA, pin to zero
+        CPU_SET(0, &cpuSet);
+    }
+}
+
+FaabricCpuSet::~FaabricCpuSet()
+{
+    // Free the occupied CPU
+    if (cpuIdx != NO_CPU_IDX && cpuIdx != GHA_CPU_IDX) {
+        assert(cpuIdx >= 0);
+        assert(cpuIdx < std::jthread::hardware_concurrency());
+
+        [[maybe_unused]] bool oldVal = std::atomic_exchange_explicit(
+          freeCpus.cpuVec.at(cpuIdx).get(), true, std::memory_order_acquire);
+        // Check that the value held was not free (i.e. false)
+        assert(!oldVal);
+    }
+
+    CPU_ZERO(&cpuSet);
+};
+
+// A free CPU means a CPU that has not been pinned to using pinThreadToFreeCpu
+static std::unique_ptr<FaabricCpuSet> getNextFreeCpu()
+{
+    int startIdx =
+      std::max(0, faabric::util::getSystemConfig().overrideFreeCpuStart);
+    for (int i = startIdx; i < freeCpus.cpuVec.size(); i++) {
+        // Stop when we find a true value. Otherwise we are just swapping false
+        // and false, so we don't change the real occupation
+        if (std::atomic_exchange_explicit(
+              freeCpus.cpuVec.at(i).get(), false, std::memory_order_acquire)) {
+            return std::make_unique<FaabricCpuSet>(i);
+        }
+    }
+
+    // In test mode, we allow running out of CPUs when pinning to support
+    // OVERRIDE_CPU_COUNT-like mechanisms to run the tests in constrained
+    // environments like GHA. If we have run out of CPUs, and we are in test
+    // mode, we return a special cpu index
+    if (isTestMode()) {
+        return std::make_unique<FaabricCpuSet>(GHA_CPU_IDX);
+    }
+
+    SPDLOG_ERROR("Ran out of free CPU cores to pin to! (total cores: {})",
+                 std::jthread::hardware_concurrency());
+    throw std::runtime_error("Ran out of free CPU cores!");
+}
+
+static void doPinThreadToCpu(pthread_t thread, cpu_set_t* cpuSet)
+{
+    int errCode = pthread_setaffinity_np(thread, sizeof(cpu_set_t), cpuSet);
+    if (errCode != 0) {
+        SPDLOG_ERROR("Error setting thread affinity: {} (code: {})",
+                     strerror(errCode),
+                     errCode);
+        throw std::runtime_error("Error setting thread affinity!");
+    }
+}
+
+std::unique_ptr<FaabricCpuSet> pinThreadToFreeCpu(pthread_t thread)
+{
+    // First, get a "free" CPU
+    std::unique_ptr<FaabricCpuSet> cpuSet = getNextFreeCpu();
+
+    // Then, pin the caller thread to this cpu set
+    doPinThreadToCpu(thread, cpuSet->get());
+
+    return cpuSet;
+}
+}
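Putting the two new utilities together, a hypothetical caller would claim, use and release a core like this (a sketch against the APIs added in this patch; the function name and the value 4 mirror the dist-test fixture below rather than prescribing anything):

```cpp
#include <faabric/util/config.h>
#include <faabric/util/hwloc.h>

#include <pthread.h>

void runPinnedRank()
{
    // Optionally skip the first cores, e.g. when another process on the same
    // host (like the dist-test server) pins its own ranks to cores 0-3
    faabric::util::getSystemConfig().overrideFreeCpuStart = 4;

    // Claim the next free core and pin the calling thread to it
    auto pinnedCpu = faabric::util::pinThreadToFreeCpu(pthread_self());

    // ... busy-spin on queues/sockets without being descheduled ...

} // ~FaabricCpuSet runs here and marks the core as free again
```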
diff --git a/tasks/tests.py b/tasks/tests.py
index c3fdf06a1..1c4cdedb1 100644
--- a/tasks/tests.py
+++ b/tasks/tests.py
@@ -4,7 +4,7 @@ from subprocess import run

 from tasks.util.env import FAABRIC_STATIC_BUILD_DIR, PROJ_ROOT

-IS_CI = "HOST_TYPE" in environ and environ["HOST_TYPE"] == "ci"
+IS_CI = "DEPLOYMENT_TYPE" in environ and environ["DEPLOYMENT_TYPE"] == "gha-ci"

 TEST_ENV = {
     "LOG_LEVEL": "info",
diff --git a/tests/dist/dist_test_fixtures.h b/tests/dist/dist_test_fixtures.h
index a9f8c0ff3..0f62b17fd 100644
--- a/tests/dist/dist_test_fixtures.h
+++ b/tests/dist/dist_test_fixtures.h
@@ -33,6 +33,10 @@ class DistTestsFixture
         updateLocalSlots(4, 0);
         updateRemoteSlots(4, 0);

+        // The dist-test server always uses at most 4 slots, so we configure
+        // the main worker (here) to start assigning CPU cores from core 4
+        conf.overrideFreeCpuStart = 4;
+
         // Set up executor
         std::shared_ptr fac = std::make_shared();
@@ -129,10 +133,11 @@ class MpiDistTestsFixture : public DistTestsFixture
         auto decision = plannerCli.getSchedulingDecision(req);
         while (decision.messageIds.size() != worldSize) {
             if (numRetries >= maxRetries) {
-                SPDLOG_ERROR(
-                  "Timed-out waiting for MPI messages to be scheduled ({}/{})",
-                  decision.messageIds.size(),
-                  worldSize);
+                SPDLOG_ERROR("Timed-out waiting for MPI messages to be "
+                             "scheduled (app: {}, {}/{})",
+                             req->appid(),
+                             decision.messageIds.size(),
+                             worldSize);
                 throw std::runtime_error("Timed-out waiting for MPI messges");
             }

@@ -162,7 +167,8 @@ class MpiDistTestsFixture : public DistTestsFixture
         while (batchResults->messageresults_size() != worldSize) {
             if (numRetries >= maxRetries) {
                 SPDLOG_ERROR(
-                  "Timed-out waiting for MPI messages results ({}/{})",
+                  "Timed-out waiting for MPI messages results (app: {}, {}/{})",
+                  req->appid(),
                   batchResults->messageresults_size(),
                   worldSize);
                 throw std::runtime_error("Timed-out waiting for MPI messges");
diff --git a/tests/dist/mpi/test_mpi_functions.cpp b/tests/dist/mpi/test_mpi_functions.cpp
index 97a7d7290..be247450f 100644
--- a/tests/dist/mpi/test_mpi_functions.cpp
+++ b/tests/dist/mpi/test_mpi_functions.cpp
@@ -232,8 +232,10 @@ TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI function migration", "[mpi]")
     // Call the functions
     plannerCli.callFunctions(req);

+#ifndef FAABRIC_USE_SPINLOCK
     auto actualHostsBeforeMigration = waitForMpiMessagesInFlight(req);
     REQUIRE(hostsBeforeMigration == actualHostsBeforeMigration);
+#endif

     // Wait for messages to be finished
     checkAllocationAndResult(req, hostsAfterMigration);
diff --git a/tests/dist/mpi/test_multiple_mpi_worlds.cpp b/tests/dist/mpi/test_multiple_mpi_worlds.cpp
index c0ac85f38..0f07510ed 100644
--- a/tests/dist/mpi/test_multiple_mpi_worlds.cpp
+++ b/tests/dist/mpi/test_multiple_mpi_worlds.cpp
@@ -163,12 +163,16 @@ TEST_CASE_METHOD(MpiDistTestsFixture,
     plannerCli.preloadSchedulingDecision(preloadDec2);

     plannerCli.callFunctions(req1);
+#ifndef FAABRIC_USE_SPINLOCK
     auto actualHostsBefore1 = waitForMpiMessagesInFlight(req1);
     REQUIRE(hostsBefore1 == actualHostsBefore1);
+#endif

     plannerCli.callFunctions(req2);
+#ifndef FAABRIC_USE_SPINLOCK
     auto actualHostsBefore2 = waitForMpiMessagesInFlight(req2);
     REQUIRE(hostsBefore2 == actualHostsBefore2);
+#endif

     auto hostsAfter1 = std::vector(4, getMasterIP());
     auto hostsAfter2 = std::vector(4, getWorkerIP());
diff --git a/tests/dist/server.cpp b/tests/dist/server.cpp
index e5b3e30e4..fc9c47ceb 100644
--- a/tests/dist/server.cpp
+++ b/tests/dist/server.cpp
@@ -21,8 +21,8 @@ int main()
     SPDLOG_INFO("Starting distributed test server on worker");
     std::shared_ptr fac = std::make_shared();
-    faabric::runner::FaabricMain m(fac);
-    m.startBackground();
+    faabric::runner::FaabricMain faabricMain(fac);
+    faabricMain.startBackground();

     SPDLOG_INFO("---------------------------------");
     SPDLOG_INFO("Distributed test server started");
@@ -34,7 +34,7 @@ int main()
     endpoint.start(faabric::endpoint::EndpointMode::SIGNAL);

     SPDLOG_INFO("Shutting down");
-    m.shutdown();
+    faabricMain.shutdown();

     return EXIT_SUCCESS;
 }
diff --git a/tests/test/mpi/test_mpi_world.cpp b/tests/test/mpi/test_mpi_world.cpp
index cb049baa0..b32aab407 100644
--- a/tests/test/mpi/test_mpi_world.cpp
+++ b/tests/test/mpi/test_mpi_world.cpp
@@ -1366,48 +1366,27 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test all-to-all", "[mpi]")
 }

 TEST_CASE_METHOD(MpiTestFixture,
-                 "Test can't destroy world with outstanding requests",
+                 "Test can destroy world with outstanding requests",
                  "[mpi]")
 {
     int rankA = 0;
     int rankB = 1;
     int data = 9;
-    int actual = -1;

     SECTION("Outstanding irecv")
     {
         world.send(rankA, rankB, BYTES(&data), MPI_INT, 1);
-        int recvId = world.irecv(rankA, rankB, BYTES(&actual), MPI_INT, 1);
-
-        REQUIRE_THROWS(world.destroy());
-
-        world.awaitAsyncRequest(recvId);
-        REQUIRE(actual == data);
+        REQUIRE_NOTHROW(world.destroy());
     }

     SECTION("Outstanding acknowledged irecv")
     {
         int data2 = 14;
-        int actual2 = -1;

         world.send(rankA, rankB, BYTES(&data), MPI_INT, 1);
         world.send(rankA, rankB, BYTES(&data2), MPI_INT, 1);
-        int recvId = world.irecv(rankA, rankB, BYTES(&actual), MPI_INT, 1);
-        int recvId2 = world.irecv(rankA, rankB, BYTES(&actual2), MPI_INT, 1);
-
-        REQUIRE_THROWS(world.destroy());
-
-        // Await for the second request, which will acknowledge the first one
-        // but not remove it from the pending message buffer
-        world.awaitAsyncRequest(recvId2);
-
-        REQUIRE_THROWS(world.destroy());
-
-        // Await for the first one
-        world.awaitAsyncRequest(recvId);
-        REQUIRE(actual == data);
-        REQUIRE(actual2 == data2);
+        REQUIRE_NOTHROW(world.destroy());
     }
 }
 }
diff --git a/tests/test/mpi/test_multiple_mpi_worlds.cpp b/tests/test/mpi/test_multiple_mpi_worlds.cpp
index 2ff2b835c..3ca7f932f 100644
--- a/tests/test/mpi/test_multiple_mpi_worlds.cpp
+++ b/tests/test/mpi/test_multiple_mpi_worlds.cpp
@@ -154,29 +154,6 @@ TEST_CASE_METHOD(MultiWorldMpiTestFixture,
     worldB.send(
       rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size());

-    SECTION("Test queueing")
-    {
-        // Check for world A
-        REQUIRE(worldA.getLocalQueueSize(rankA1, rankA2) == 1);
-        REQUIRE(worldA.getLocalQueueSize(rankA2, rankA1) == 0);
-        REQUIRE(worldA.getLocalQueueSize(rankA1, 2) == 0);
-        REQUIRE(worldA.getLocalQueueSize(rankA2, 2) == 0);
-        const std::shared_ptr<InMemoryMpiQueue>& queueA2 =
-          worldA.getLocalQueue(rankA1, rankA2);
-        MpiMessage actualMessage = queueA2->dequeue();
-        // checkMessage(actualMessage, worldId, rankA1, rankA2, messageData);
-
-        // Check for world B
-        REQUIRE(worldB.getLocalQueueSize(rankA1, rankA2) == 1);
-        REQUIRE(worldB.getLocalQueueSize(rankA2, rankA1) == 0);
-        REQUIRE(worldB.getLocalQueueSize(rankA1, 2) == 0);
-        REQUIRE(worldB.getLocalQueueSize(rankA2, 2) == 0);
-        const std::shared_ptr<InMemoryMpiQueue>& queueA2B =
-          worldB.getLocalQueue(rankA1, rankA2);
-        actualMessage = queueA2B->dequeue();
-        // checkMessage(actualMessage, worldId, rankA1, rankA2, messageData);
-    }
-
     SECTION("Test recv")
     {
         MPI_Status status{};
diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp
index 351b177d7..00a589dc0 100644
--- a/tests/test/scheduler/test_scheduler.cpp
+++ b/tests/test/scheduler/test_scheduler.cpp
@@ -259,13 +259,13 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
     }

     // Check the executor counts on this host
-    faabric::Message m = reqOne->messages().at(0);
+    faabric::Message msg1 = reqOne->messages().at(0);
     if (isThreads) {
         // For threads we expect only one executor
-        REQUIRE(sch.getFunctionExecutorCount(m) == 1);
+        REQUIRE(sch.getFunctionExecutorCount(msg1) == 1);
     } else {
         // For functions we expect one per core
-        REQUIRE(sch.getFunctionExecutorCount(m) == nCallsOne);
+        REQUIRE(sch.getFunctionExecutorCount(msg1) == nCallsOne);
     }

     // Check the number of messages executed locally and remotely
@@ -273,7 +273,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,

     // Now schedule a second batch and check the decision
     std::shared_ptr<faabric::BatchExecuteRequest> reqTwo =
-      faabric::util::batchExecFactory("foo", "bar", nCallsTwo);
+      faabric::util::batchExecFactory("foo", "baz", nCallsTwo);

     int appId2 = reqTwo->appid();
     std::vector<int> reqTwoMsgIds;
@@ -318,10 +318,11 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,

     // Check no other functions have been scheduled on this host
     REQUIRE(sch.getRecordedMessages().size() == nCallsOne + nCallsTwo);

+    faabric::Message msg2 = reqTwo->messages().at(0);
     if (isThreads) {
-        REQUIRE(sch.getFunctionExecutorCount(m) == 1);
+        REQUIRE(sch.getFunctionExecutorCount(msg2) == 1);
     } else {
-        REQUIRE(sch.getFunctionExecutorCount(m) == nCallsTwo);
+        REQUIRE(sch.getFunctionExecutorCount(msg2) == nCallsTwo);
     }
 }
diff --git a/tests/test/transport/test_tcp_sockets.cpp b/tests/test/transport/test_tcp_sockets.cpp
index aba8ca1ed..b4d01e8b1 100644
--- a/tests/test/transport/test_tcp_sockets.cpp
+++ b/tests/test/transport/test_tcp_sockets.cpp
@@ -2,9 +2,11 @@
 #include
 #include
+#include <faabric/transport/tcp/SocketOptions.h>
 #include
 #include
+#include <latch>
 #include

 using namespace faabric::transport::tcp;

@@ -36,6 +38,57 @@ TEST_CASE("Test connecting a send/recv socket pair", "[transport]")
     REQUIRE(errno == EBADF);
 }

+TEST_CASE("Test setting socket options", "[transport]")
+{
+    std::latch endLatch(2);
+
+    int conn;
+    {
+        RecvSocket dst(TEST_PORT);
+
+        // Start sending socket thread
+        std::jthread sendThread([&] {
+            SendSocket src(LOCALHOST, TEST_PORT);
+
+            // Connect to receiving socket
+            src.dial();
+
+            endLatch.arrive_and_wait();
+        });
+
+        // Connect to send socket and store the connection fd
+        dst.listen();
+        conn = dst.accept();
+
+        // Set options on the socket given by accept (i.e. the send socket)
+        setReuseAddr(conn);
+        setNoDelay(conn);
+        setQuickAck(conn);
+        setQuickAck(conn);
+        setBusyPolling(conn);
+        setNonBlocking(conn);
+        setBlocking(conn);
+
+        REQUIRE(!isNonBlocking(conn));
+
+        endLatch.arrive_and_wait();
+    }
+
+    // Verify that the open connections are closed when the sockets go out
+    // of scope
+    ::close(conn);
+    REQUIRE(errno == EBADF);
+
+    // Any operation on the closed socket will fail
+    REQUIRE_THROWS(setReuseAddr(conn));
+    REQUIRE_THROWS(setNoDelay(conn));
+    REQUIRE_THROWS(setQuickAck(conn));
+    REQUIRE_THROWS(setQuickAck(conn));
+    REQUIRE_THROWS(setBusyPolling(conn));
+    REQUIRE_THROWS(setNonBlocking(conn));
+    REQUIRE_THROWS(setBlocking(conn));
+}
+
 TEST_CASE("Test send/recv one message using raw TCP sockets", "[transport]")
 {
     RecvSocket dst(TEST_PORT);
diff --git a/tests/test/util/test_hwloc.cpp b/tests/test/util/test_hwloc.cpp
new file mode 100644
index 000000000..9c0e8bb9a
--- /dev/null
+++ b/tests/test/util/test_hwloc.cpp
@@ -0,0 +1,78 @@
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+using namespace faabric::util;
+
+namespace tests {
+int nCpus = std::jthread::hardware_concurrency();
+
+void checkThreadIsPinnedToCpu(pthread_t thread, int cpuIdx)
+{
+    // Check the affinity given by pthread
+    cpu_set_t actualCpuSet;
+    int retVal =
+      pthread_getaffinity_np(thread, sizeof(cpu_set_t), &actualCpuSet);
+    REQUIRE(retVal == 0);
+
+    REQUIRE(CPU_ISSET(cpuIdx, &actualCpuSet));
+}
+
+TEST_CASE("Test pinning thread to CPU using pthreads")
+{
+    pthread_t self = pthread_self();
+    REQUIRE(nCpus > 0);
+
+    auto cpuSet = pinThreadToFreeCpu(self);
+
+    REQUIRE(cpuSet->get() != nullptr);
+
+    checkThreadIsPinnedToCpu(self, 0);
+}
+
+TEST_CASE("Test overcommitting to CPU cores fails unless in test mode")
+{
+    std::vector<std::jthread> threads;
+    std::vector<std::unique_ptr<FaabricCpuSet>> cpuSets;
+
+    // First, occupy all CPUs
+    for (int i = 0; i < nCpus; i++) {
+        threads.emplace_back([] { SLEEP_MS(200); });
+        cpuSets.emplace_back(pinThreadToFreeCpu(threads.at(i).native_handle()));
+        checkThreadIsPinnedToCpu(threads.at(i).native_handle(), i);
+    }
+
+    bool isTestMode;
+    SECTION("Test mode disabled")
+    {
+        isTestMode = false;
+    }
+
+    SECTION("Test mode enabled")
+    {
+        isTestMode = true;
+    }
+
+    // Then, try to occupy another one
+    std::unique_ptr<FaabricCpuSet> lastCpuSet;
+    pthread_t self = pthread_self();
+    faabric::util::setTestMode(isTestMode);
+
+    if (isTestMode) {
+        // In test mode, this is allowed
+        REQUIRE_NOTHROW(lastCpuSet = pinThreadToFreeCpu(self));
+        checkThreadIsPinnedToCpu(self, 0);
+    } else {
+        // In non-test mode, it is not
+        REQUIRE_THROWS(lastCpuSet = pinThreadToFreeCpu(self));
+    }
+
+    faabric::util::setTestMode(true);
+}
+}