Skip to content

Commit 816763b

Browse files
feat: add support for flasher_args.json upload (#13)
1 parent 0cf2e46 commit 816763b

File tree

11 files changed

+238
-6
lines changed

11 files changed

+238
-6
lines changed

examples/__init__.py

Whitespace-only changes.

examples/hello_esp32_idf/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Ignore the firmware files, as they are downloaded from the internet
2+
hello_world.bin
3+
hello_world.elf
4+
build/

examples/hello_esp32_idf/__init__.py

Whitespace-only changes.

examples/hello_esp32_idf/diagram.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"version": 1,
3+
"author": "Uri Shaked",
4+
"editor": "wokwi",
5+
"parts": [
6+
{
7+
"type": "board-esp32-devkit-c-v4",
8+
"id": "esp",
9+
"top": 0,
10+
"left": 0,
11+
"attrs": { "fullBoot": "1" }
12+
}
13+
],
14+
"connections": [
15+
["esp:TX", "$serialMonitor:RX", "", []],
16+
["esp:RX", "$serialMonitor:TX", "", []]
17+
],
18+
"serialMonitor": {
19+
"display": "terminal"
20+
},
21+
"dependencies": {}
22+
}

examples/hello_esp32_idf/main.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# SPDX-License-Identifier: MIT
2+
# Copyright (C) 2025, CodeMagic LTD
3+
4+
import asyncio
5+
import os
6+
from pathlib import Path
7+
8+
from examples.helper.github_download import download_github_dir
9+
from wokwi_client import GET_TOKEN_URL, WokwiClient
10+
11+
# sys.path.append(str(Path(__file__).parent.parent))
12+
# from github_download import download_github_dir
13+
14+
EXAMPLE_DIR = Path(__file__).parent
15+
USER = "espressif"
16+
REPO = "pytest-embedded"
17+
PATH = "tests/fixtures/hello_world_esp32/build"
18+
REF = "7e66a07870d1cd97a454318892c6f6225def3144"
19+
20+
SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "10"))
21+
22+
23+
async def main() -> None:
24+
token = os.getenv("WOKWI_CLI_TOKEN")
25+
if not token:
26+
raise SystemExit(
27+
f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}."
28+
)
29+
30+
# Automatically download build files from GitHub if missing
31+
build_dir = EXAMPLE_DIR / "build"
32+
download_github_dir(
33+
user=USER,
34+
repo=REPO,
35+
path=PATH,
36+
base_path=build_dir,
37+
ref=REF,
38+
)
39+
40+
client = WokwiClient(token)
41+
print(f"Wokwi client library version: {client.version}")
42+
43+
hello = await client.connect()
44+
print("Connected to Wokwi Simulator, server version:", hello["version"])
45+
46+
# Upload the diagram and firmware files
47+
await client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json")
48+
filename = await client.upload_file(
49+
"flasher_args.json", EXAMPLE_DIR / "build" / "flasher_args.json"
50+
)
51+
52+
# Start the simulation
53+
await client.start_simulation(
54+
firmware=filename,
55+
)
56+
57+
# Stream serial output for a few seconds
58+
serial_task = asyncio.create_task(client.serial_monitor_cat())
59+
60+
# # Alternative lambda version
61+
# serial_task = client.serial_monitor(
62+
# lambda line: print(line.decode("utf-8", errors="replace"), end="", flush=True)
63+
# )
64+
65+
# delay 2 seconds
66+
await asyncio.sleep(2)
67+
68+
# await client.set_control("dsa", "dsdsa", 1)
69+
70+
print(f"Simulation started, waiting for {SLEEP_TIME} seconds…")
71+
await client.wait_until_simulation_time(SLEEP_TIME)
72+
serial_task.cancel()
73+
74+
# Disconnect from the simulator
75+
await client.disconnect()
76+
77+
78+
if __name__ == "__main__":
79+
asyncio.run(main())

examples/helper/__init__.py

Whitespace-only changes.

examples/helper/github_download.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from pathlib import Path
2+
3+
import requests
4+
5+
6+
def download_file(url: str, dest: Path) -> None:
7+
response = requests.get(url)
8+
response.raise_for_status()
9+
dest.parent.mkdir(parents=True, exist_ok=True)
10+
with open(dest, "wb") as f:
11+
f.write(response.content)
12+
13+
14+
def download_github_dir(
15+
user: str, repo: str, path: str, base_path: Path, ref: str = "main"
16+
) -> None:
17+
api_url = f"https://api.github.com/repos/{user}/{repo}/contents/{path}?ref={ref}"
18+
response = requests.get(api_url)
19+
response.raise_for_status()
20+
items = response.json()
21+
for item in items:
22+
if item["type"] == "file":
23+
print(f"Downloading {base_path / item['name']}...")
24+
download_file(item["download_url"], base_path / item["name"])
25+
elif item["type"] == "dir":
26+
subdir_name = item["name"]
27+
download_github_dir(user, repo, f"{path}/{subdir_name}", base_path / subdir_name, ref)

src/wokwi_client/client.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,20 @@ async def upload(self, name: str, content: bytes) -> None:
8181
"""
8282
await upload(self._transport, name, content)
8383

84-
async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> None:
84+
async def upload_file(self, filename: str, local_path: Optional[Path] = None) -> str:
8585
"""
8686
Upload a local file to the simulator.
87+
If you specify the local_path to the file `flasher_args.json` (IDF flash information),
88+
the contents of the file will be processed and the correct firmware file will be
89+
uploaded instead, returning the firmware filename.
8790
8891
Args:
8992
filename: The name to use for the uploaded file.
9093
local_path: Optional path to the local file. If not provided, uses filename as the path.
94+
Returns:
95+
The filename of the uploaded file (useful for idf when uploading flasher_args.json).
9196
"""
92-
await upload_file(self._transport, filename, local_path)
97+
return await upload_file(self._transport, filename, local_path)
9398

9499
async def download(self, name: str) -> bytes:
95100
"""

src/wokwi_client/file_ops.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,24 @@
66
from pathlib import Path
77
from typing import Optional
88

9+
from wokwi_client.idf import resolveIdfFirmware
10+
911
from .models import UploadParams
1012
from .protocol_types import ResponseMessage
1113
from .transport import Transport
1214

1315

1416
async def upload_file(
1517
transport: Transport, filename: str, local_path: Optional[Path] = None
16-
) -> ResponseMessage:
17-
path = Path(local_path or filename)
18-
content = path.read_bytes()
19-
return await upload(transport, filename, content)
18+
) -> str:
19+
firmware_path = local_path or filename
20+
if str(firmware_path).endswith("flasher_args.json"):
21+
filename = "firmware.bin"
22+
content = resolveIdfFirmware(str(firmware_path))
23+
else:
24+
content = Path(firmware_path).read_bytes()
25+
await upload(transport, filename, content)
26+
return filename
2027

2128

2229
async def upload(transport: Transport, name: str, content: bytes) -> ResponseMessage:

src/wokwi_client/idf.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import json
2+
import os
3+
from typing import TypedDict
4+
5+
MAX_FIRMWARE_SIZE = 4 * 1024 * 1024 # 4MB
6+
7+
8+
class FirmwarePart(TypedDict):
9+
offset: int
10+
data: bytes
11+
12+
13+
def resolveIdfFirmware(flasher_args_path: str) -> bytes:
14+
"""
15+
Resolve ESP32 firmware from flasher_args.json file.
16+
Implemented based on the logic from the wokwi-cli.
17+
- https://github.com/wokwi/wokwi-cli/blob/1726692465f458420f71bc4dbd100aeedf2e37bb/src/uploadFirmware.ts
18+
19+
More about flasher_args.json:
20+
- https://docs.espressif.com/projects/esp-idf/en/release-v5.5/esp32/api-guides/build-system.html
21+
22+
Args:
23+
flasher_args_path: Path to the flasher_args.json file
24+
25+
Returns:
26+
Combined firmware binary data as bytes
27+
28+
Raises:
29+
ValueError: If flasher_args.json is invalid or files are missing
30+
FileNotFoundError: If required firmware files are not found
31+
"""
32+
try:
33+
with open(flasher_args_path) as f:
34+
flasher_args = json.load(f)
35+
except (json.JSONDecodeError, FileNotFoundError) as e:
36+
raise ValueError(f"Failed to read flasher_args.json: {e}")
37+
38+
if "flash_files" not in flasher_args:
39+
raise ValueError("flash_files is not defined in flasher_args.json")
40+
41+
firmware_parts: list[FirmwarePart] = []
42+
firmware_size = 0
43+
flasher_dir = os.path.dirname(flasher_args_path)
44+
45+
# Process each flash file entry
46+
for offset_str, file_path in flasher_args["flash_files"].items():
47+
try:
48+
offset = int(offset_str, 16)
49+
except ValueError:
50+
raise ValueError(f"Invalid offset in flasher_args.json flash_files: {offset_str}")
51+
52+
full_file_path = os.path.join(flasher_dir, file_path)
53+
54+
try:
55+
with open(full_file_path, "rb") as f:
56+
data = f.read()
57+
except FileNotFoundError:
58+
raise FileNotFoundError(f"Firmware file not found: {full_file_path}")
59+
60+
firmware_parts.append({"offset": offset, "data": data})
61+
firmware_size = max(firmware_size, offset + len(data))
62+
63+
if firmware_size > MAX_FIRMWARE_SIZE:
64+
raise ValueError(
65+
f"Firmware size ({firmware_size} bytes) exceeds the maximum supported size ({MAX_FIRMWARE_SIZE} bytes)"
66+
)
67+
68+
# Create combined firmware binary
69+
firmware_data = bytearray(firmware_size)
70+
71+
# Fill with 0xFF (typical flash erased state)
72+
for i in range(firmware_size):
73+
firmware_data[i] = 0xFF
74+
75+
# Write each firmware part to the correct offset
76+
for part in firmware_parts:
77+
offset = part["offset"]
78+
data = part["data"]
79+
firmware_data[offset : offset + len(data)] = data
80+
81+
return bytes(firmware_data)

0 commit comments

Comments
 (0)