Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .darglint
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- mode: ini -*-

[darglint]
docstring_style=google

# DAR402: The docstring describes an exception not explicitly raised.
#
# Ignoring DAR402 because there are cases where public methods
# document exceptions raised by underlying functions.
ignore=DAR402
40 changes: 40 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: frequenz-channels-python

on:
push:
branches: [ v0.x.x ]

pull_request:

workflow_dispatch:

jobs:
test:
runs-on: ubuntu-20.04

steps:
- name: Fetch sources
uses: actions/checkout@v2
with:
submodules: true

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: "3.8"

- uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.cfg') }}
restore-keys: |
${{ runner.os }}-pip-

- name: Install required Python packages
run: |
python -m pip install --upgrade pip
python -m pip install nox

- name: run nox
run: nox
timeout-minutes: 10
129 changes: 129 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Frequenz channels

This repository contains channel implementations for python.
131 changes: 131 additions & 0 deletions benchmarks/benchmark_anycast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Benchmark for Anycast channels.

Copyright
Copyright © 2022 Frequenz Energy-as-a-Service GmbH

License
MIT
"""

import asyncio
import csv
import timeit
from typing import Any, Coroutine, Dict, List, Tuple

from frequenz.channels import Anycast, Receiver, Sender


async def send_msg(num_messages: int, chan: Sender[int]) -> None:
"""Send messages to the channel continuously.

Args:
num_messages (int): Number of messages to send.
chan (Sender[int]): Channel sender to send the messages to.
"""
# send one message for each receiver
for ctr in range(num_messages):
await chan.send(ctr + 1)


async def benchmark_anycast(
num_channels: int,
num_messages: int,
num_receivers: int,
buffer_size: int,
) -> int:
"""Ensure sent messages are received by one receiver.

Args:
num_channels (int): Number of channels to create.
num_messages (int): Number of messages to send per channel.
num_receivers (int): Number of broadcast receivers per channel.
buffer_size (int): Buffer size of each channel.

Returns:
int: Total number of messages received by all channels.
"""
channels: List[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
senders = [
asyncio.create_task(send_msg(num_messages, bcast.get_sender()))
for bcast in channels
]

# Even though we want just a single int, use a list, so that it can be
# updated from other methods.
recv_trackers = [0]

async def update_tracker_on_receive(chan: Receiver[int]) -> None:
while True:
msg = await chan.receive()
if msg is None:
return
recv_trackers[0] += 1

receivers = []
for acast in channels:
for _ in range(num_receivers):
receivers.append(update_tracker_on_receive(acast.get_receiver()))

receivers_runs = asyncio.gather(*receivers)

await asyncio.gather(*senders)
for bcast in channels:
await bcast.close()
await receivers_runs
return recv_trackers[0]


def time_async_task(task: Coroutine[Any, Any, int]) -> Tuple[float, Any]:
"""Run a task and return the time taken and the result.

Args:
task (asyncio.Task): Task to run.

Returns:
(float, Any): Run time in fractional seconds, task return value.
"""
start = timeit.default_timer()
ret = asyncio.run(task)
return timeit.default_timer() - start, ret


def run_one(
num_channels: int,
num_messages: int,
num_receivers: int,
buffer_size: int,
) -> Dict[str, Any]:
"""Run a single benchmark."""
runtime, total_msgs = time_async_task(
benchmark_anycast(num_channels, num_messages, num_receivers, buffer_size)
)
ret = {
"channels": num_channels,
"messages_per_channel": num_messages,
"receivers": num_receivers,
"buffer_size": buffer_size,
"total_messages": total_msgs,
"runtime": f"{runtime:.3f}",
}

return ret


def run() -> None:
"""Run all benchmarks."""
with open("/dev/stdout", "w", encoding="utf-8") as csvfile:
fields = run_one(1, 0, 1, 1)
out = csv.DictWriter(csvfile, fields.keys())
out.writeheader()
out.writerow(run_one(1, 1000000, 1, 100))
out.writerow(run_one(1, 1000000, 1, 1000))
out.writerow(run_one(1, 1000000, 10, 100))
out.writerow(run_one(1, 1000000, 10, 1000))
out.writerow(run_one(1000, 1000, 1, 100))
out.writerow(run_one(1000, 1000, 1, 1000))
out.writerow(run_one(1000, 1000, 10, 100))
out.writerow(run_one(1000, 1000, 10, 1000))


if __name__ == "__main__":
run()
Loading