Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ jobs:
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.16.0
env:
DEPLOYMENT_TYPE: gha-ci
steps:
- name: "Check-out code"
uses: actions/checkout@v4
Expand Down Expand Up @@ -59,7 +61,7 @@ jobs:
needs: [conan-cache]
runs-on: ubuntu-latest
env:
HOST_TYPE: ci
DEPLOYMENT_TYPE: gha-ci
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
Expand Down Expand Up @@ -98,7 +100,7 @@ jobs:
matrix:
sanitiser: [None, Address, Thread, Undefined]
env:
HOST_TYPE: ci
DEPLOYMENT_TYPE: gha-ci
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
Expand All @@ -119,6 +121,7 @@ jobs:
run: ./bin/inv_wrapper.sh dev.cc faabric_tests
- name: "Run tests"
run: ./bin/inv_wrapper.sh tests
timeout-minutes: 15

dist-tests:
if: github.event.pull_request.draft == false
Expand All @@ -136,6 +139,10 @@ jobs:
run: ./dist-test/build.sh
- name: "Run the distributed tests"
run: ./dist-test/run.sh
timeout-minutes: 7
- name: "Print dist-test server logs"
if: always()
run: docker compose logs dist-test-server
- name: "Print planner logs"
if: always()
run: docker compose logs planner
Expand All @@ -145,7 +152,7 @@ jobs:
needs: [conan-cache]
runs-on: ubuntu-latest
env:
HOST_TYPE: ci
DEPLOYMENT_TYPE: gha-ci
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
Expand Down
14 changes: 12 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ option(FAABRIC_BUILD_TESTS "Build Faabric tests" ON)
option(FAABRIC_SELF_TRACING "Turn on system tracing using the logger" OFF)
option(FAABRIC_CODE_COVERAGE "Build Faabric with code coverage profiling" OFF)
option(FAABRIC_TARGET_CPU "CPU to optimise for, e.g. skylake, icelake or native" OFF)
option(FAABRIC_USE_SPINLOCK "Use spinlocks for low-latency messaging" ON)

# Enable colorized compiler output
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
Expand All @@ -16,10 +17,10 @@ endif()

# Optimise for CPU
if(FAABRIC_TARGET_CPU)
message(STATUS "Optimising Faabric for CPU ${FAABRIC_TARGET_CPU}")
message(STATUS "Faabric: optimising for CPU ${FAABRIC_TARGET_CPU}")
add_compile_options(-march=${FAABRIC_TARGET_CPU} -mtune=${FAABRIC_TARGET_CPU})
else()
message(STATUS "Faabric not optimised for specific CPU")
message(STATUS "Faabric: not optimised for specific CPU")
endif()

# Top-level CMake config
Expand All @@ -38,6 +39,15 @@ if(${FAABRIC_SELF_TRACING})
add_definitions(-DTRACE_ALL=1)
endif()

# We want to disable the usage of spinlocks (and CPU pinning) in GHA runners
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annoyingly we cannot use spinlocks inside GHA as that would make tests (and particularly distributed tests) very flaky.

# as they have a very low number of vCPU cores
if (${FAABRIC_USE_SPINLOCK} AND NOT "$ENV{DEPLOYMENT_TYPE}" STREQUAL "gha-ci")
message(STATUS "Faabric: enabled spin-locks")
add_definitions(-DFAABRIC_USE_SPINLOCK)
else()
message(STATUS "Faabric: disabled spin-locks")
endif()

# Set-up use of sanitisers
if (FAABRIC_USE_SANITISER STREQUAL "Address")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
Expand Down
45 changes: 27 additions & 18 deletions cmake/ExternalProjects.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ if(NOT EXISTS "${CMAKE_CURRENT_BINARY_DIR}/conan.cmake")
TLS_VERIFY ON)
endif()

set(CONAN_CMAKE_SILENT_OUTPUT ON CACHE INTERNAL "")
include(${CMAKE_CURRENT_BINARY_DIR}/conan.cmake)

conan_check(VERSION 1.63.0 REQUIRED)
Expand Down Expand Up @@ -73,19 +74,19 @@ conan_cmake_install(PATH_OR_REFERENCE .

include(${CMAKE_CURRENT_BINARY_DIR}/conan_paths.cmake)

find_package(absl REQUIRED)
find_package(Boost 1.80.0 REQUIRED)
find_package(Catch2 REQUIRED)
find_package(flatbuffers REQUIRED)
find_package(fmt REQUIRED)
find_package(hiredis REQUIRED)
find_package(absl QUIET REQUIRED)
find_package(Boost 1.80.0 QUIET REQUIRED)
find_package(Catch2 QUIET REQUIRED)
find_package(flatbuffers QUIET REQUIRED)
find_package(fmt QUIET REQUIRED)
find_package(hiredis QUIET REQUIRED)
# 27/01/2023 - Pin OpenSSL to a specific version to avoid incompatibilities
# with the system's (i.e. Ubuntu 22.04) OpenSSL
find_package(OpenSSL 3.0.2 REQUIRED)
find_package(Protobuf 3.20.0 REQUIRED)
find_package(readerwriterqueue REQUIRED)
find_package(spdlog REQUIRED)
find_package(ZLIB REQUIRED)
find_package(OpenSSL 3.0.2 QUIET REQUIRED)
find_package(Protobuf 3.20.0 QUIET REQUIRED)
find_package(readerwriterqueue QUIET REQUIRED)
find_package(spdlog QUIET REQUIRED)
find_package(ZLIB QUIET REQUIRED)

# --------------------------------
# Fetch content dependencies
Expand All @@ -109,16 +110,26 @@ set(ZSTD_LZ4_SUPPORT OFF CACHE INTERNAL "")
# nng (Conan version out of date)
set(NNG_TESTS OFF CACHE INTERNAL "")

FetchContent_Declare(zstd_ext
GIT_REPOSITORY "https://github.com/facebook/zstd"
GIT_TAG "v1.5.2"
SOURCE_SUBDIR "build/cmake"
FetchContent_Declare(atomic_queue_ext
GIT_REPOSITORY "https://github.com/max0x7ba/atomic_queue"
GIT_TAG "7c36f0997979a0fee5be84c9511ee0f6032057ec"
)
FetchContent_Declare(nng_ext
GIT_REPOSITORY "https://github.com/nanomsg/nng"
# NNG tagged version 1.7.1
GIT_TAG "ec4b5722fba105e3b944e3dc0f6b63c941748b3f"
)
FetchContent_Declare(zstd_ext
GIT_REPOSITORY "https://github.com/facebook/zstd"
GIT_TAG "v1.5.2"
SOURCE_SUBDIR "build/cmake"
)

FetchContent_MakeAvailable(atomic_queue_ext)
add_library(atomic_queue::atomic_queue ALIAS atomic_queue)

FetchContent_MakeAvailable(nng_ext)
add_library(nng::nng ALIAS nng)

FetchContent_MakeAvailable(zstd_ext)
# Work around zstd not declaring its targets properly
Expand All @@ -127,9 +138,6 @@ target_include_directories(libzstd_shared SYSTEM INTERFACE $<BUILD_INTERFACE:${z
add_library(zstd::libzstd_static ALIAS libzstd_static)
add_library(zstd::libzstd_shared ALIAS libzstd_shared)

FetchContent_MakeAvailable(nng_ext)
add_library(nng::nng ALIAS nng)

# Group all external dependencies into a convenient virtual CMake library
add_library(faabric_common_dependencies INTERFACE)
target_include_directories(faabric_common_dependencies INTERFACE
Expand All @@ -141,6 +149,7 @@ target_link_libraries(faabric_common_dependencies INTERFACE
absl::flat_hash_set
absl::flat_hash_map
absl::strings
atomic_queue::atomic_queue
Boost::Boost
Boost::system
flatbuffers::flatbuffers
Expand Down
1 change: 1 addition & 0 deletions dist-test/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export PROJ_ROOT=$(dirname $(dirname $(readlink -f $0)))
pushd ${PROJ_ROOT} >> /dev/null

# Run the build
export FAABRIC_DEPLOYMENT_TYPE=gha-ci
docker compose \
run \
--rm \
Expand Down
2 changes: 1 addition & 1 deletion dist-test/build_internal.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pushd ${PROJ_ROOT} >> /dev/null
source ./bin/workon.sh

# Run the debug build
inv dev.cmake --build=Debug
inv dev.cmake --build=Debug --clean
inv dev.cc faabric_dist_tests
inv dev.cc faabric_dist_test_server
inv dev.cc planner_server
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ services:
tty: true
privileged: true
environment:
- DEPLOYMENT_TYPE=${FAABRIC_DEPLOYMENT_TYPE:-compose}
- LOG_LEVEL=${LOG_LEVEL:-debug}
- PLANNER_HOST=planner
- PLANNER_PORT=8080
Expand All @@ -48,6 +49,7 @@ services:
- ./conan-cache/:/root/.conan
working_dir: /build/faabric/static
environment:
- DEPLOYMENT_TYPE=${FAABRIC_DEPLOYMENT_TYPE:-compose}
- LOG_LEVEL=debug
- PLANNER_HOST=planner
- PLANNER_PORT=8080
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ namespace faabric::mpi {
// as the broker already has mocking capabilities
std::vector<MpiMessage> getMpiMockedMessages(int sendRank);

#ifdef FAABRIC_USE_SPINLOCK
typedef faabric::util::SpinLockQueue<MpiMessage> InMemoryMpiQueue;
#else
typedef faabric::util::FixedCapacityQueue<MpiMessage> InMemoryMpiQueue;
#endif

class MpiWorld
{
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/transport/tcp/RecvSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ class RecvSocket
int port;

std::deque<int> openConnections;

void setSocketOptions(int connFd);
};
}
5 changes: 4 additions & 1 deletion include/faabric/transport/tcp/SocketOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ void setReuseAddr(int connFd);
void setNoDelay(int connFd);
void setQuickAck(int connFd);

// Blocking/Non-blocking sockets
void setNonBlocking(int connFd);
void setBlocking(int connFd);

bool isNonBlocking(int connFd);

// Enable busy polling for non-blocking sockets
void setBusyPolling(int connFd);
}
1 change: 1 addition & 0 deletions include/faabric/util/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class SystemConfig

// Scheduling
int overrideCpuCount;
int overrideFreeCpuStart;
std::string batchSchedulerMode;

// Worker-related timeouts
Expand Down
32 changes: 32 additions & 0 deletions include/faabric/util/hwloc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <memory>
#include <pthread.h>

namespace faabric::util {

const int NO_CPU_IDX = -1;
const int GHA_CPU_IDX = -2;

class FaabricCpuSet
{
public:
FaabricCpuSet(int cpuIdxIn = NO_CPU_IDX);
FaabricCpuSet& operator=(const FaabricCpuSet&) = delete;
FaabricCpuSet(const FaabricCpuSet&) = delete;

~FaabricCpuSet();

cpu_set_t* get() { return &cpuSet; }

private:
cpu_set_t cpuSet;

// CPU index in internal CPU accounting
int cpuIdx = NO_CPU_IDX;
};

// Pin thread to any "unpinned" CPUs. Returns the CPU set it was pinned to.
// We return a unique pointer to enforce RAII on the pinned-to CPU
std::unique_ptr<FaabricCpuSet> pinThreadToFreeCpu(pthread_t thread);
}
27 changes: 27 additions & 0 deletions include/faabric/util/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>

#include <atomic_queue/atomic_queue.h>
#include <condition_variable>
#include <queue>
#include <readerwriterqueue/readerwritercircularbuffer.h>
Expand Down Expand Up @@ -215,6 +216,32 @@ class FixedCapacityQueue
moodycamel::BlockingReaderWriterCircularBuffer<T> mq;
};

template<typename T>
class SpinLockQueue
{
public:
void enqueue(T& value) { mq.push(value); }

T dequeue() { return mq.pop(); }

long size()
{
throw std::runtime_error("Size for fast queue unimplemented!");
}

void drain()
{
while (mq.pop()) {
;
}
}

void reset() { ; }

private:
atomic_queue::AtomicQueue2<T, 1024, true, true, false, true> mq;
};

class TokenPool
{
public:
Expand Down
15 changes: 12 additions & 3 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,10 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)

// MPI migration
if (msg.ismpi()) {
auto& mpiWorld = faabric::mpi::getMpiWorldRegistry().getWorld(
msg.mpiworldid());
mpiWorld.destroy();
auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
}
}
} catch (const std::exception& ex) {
returnValue = 1;
Expand All @@ -403,6 +404,14 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
"Task {} threw exception. What: {}", msg.id(), ex.what());
SPDLOG_ERROR(errorMessage);
msg.set_outputdata(errorMessage);

// MPI-specific clean-up after we throw an exception
if (msg.ismpi()) {
auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
}
}
}

// Unset context
Expand Down
Loading