Skip to content

Commit ecf2c57

Browse files
gn00295120claude
andcommitted
fix: prevent mark metadata leak in Twilio buffering (addresses Codex P1)
Critical fix for memory leak identified by chatgpt-codex-connector: Problem: - Each audio chunk created a mark entry in _mark_data - But only the last mark_id was sent to Twilio when flushing buffer - Earlier marks were never acknowledged, causing memory leak - Playback tracker couldn't track all sent audio Solution: - Track all mark_ids for buffered chunks in _buffered_marks list - Send mark events for ALL buffered chunks when flushing - Clear _buffered_marks after flush to prevent reuse - Extract mark creation logic to _create_mark() method (addresses Copilot nitpick) Additional improvements: - Remove '- NEW' comment suffix (Copilot suggestion) - _flush_outgoing_audio_buffer now handles empty buffer check internally This ensures proper playback tracking and prevents _mark_data from growing indefinitely. Generated with Lucas Wang<[email protected]> Co-Authored-By: Claude <[email protected]>
1 parent 7b2f4e0 commit ecf2c57

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

examples/realtime/twilio/twilio_handler.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def __init__(self, twilio_websocket: WebSocket):
5757
self._audio_buffer: bytearray = bytearray()
5858
self._last_buffer_send_time = time.time()
5959

60-
# Outgoing audio buffer (from OpenAI to Twilio) - NEW
60+
# Outgoing audio buffer (from OpenAI to Twilio)
6161
self._outgoing_audio_buffer: bytearray = bytearray()
6262
self._last_outgoing_send_time = time.time()
6363

@@ -66,6 +66,8 @@ def __init__(self, twilio_websocket: WebSocket):
6666
self._mark_data: dict[
6767
str, tuple[str, int, int]
6868
] = {} # mark_id -> (item_id, content_index, byte_count)
69+
# Track marks for buffered audio chunks
70+
self._buffered_marks: list[str] = [] # mark_ids for chunks in current buffer
6971

7072
async def start(self) -> None:
7173
"""Start the session."""
@@ -132,32 +134,28 @@ async def _handle_realtime_event(self, event: RealtimeSessionEvent) -> None:
132134
self._outgoing_audio_buffer.extend(event.audio.data)
133135

134136
# Store metadata for this audio chunk
135-
self._mark_counter += 1
136-
mark_id = str(self._mark_counter)
137-
self._mark_data[mark_id] = (
138-
event.audio.item_id,
139-
event.audio.content_index,
140-
len(event.audio.data),
137+
mark_id = self._create_mark(
138+
event.audio.item_id, event.audio.content_index, len(event.audio.data)
141139
)
140+
self._buffered_marks.append(mark_id)
142141

143142
# Send buffered audio if we have enough data (reduces jittering)
144143
if len(self._outgoing_audio_buffer) >= self.BUFFER_SIZE_BYTES:
145-
await self._flush_outgoing_audio_buffer(mark_id)
144+
await self._flush_outgoing_audio_buffer()
146145

147146
elif event.type == "audio_interrupted":
148147
print("Sending audio interrupted to Twilio")
149148
# Flush any remaining buffered audio before clearing
150-
if self._outgoing_audio_buffer:
151-
await self._flush_outgoing_audio_buffer(None)
149+
await self._flush_outgoing_audio_buffer()
152150
await self.twilio_websocket.send_text(
153151
json.dumps({"event": "clear", "streamSid": self._stream_sid})
154152
)
155153
self._outgoing_audio_buffer.clear()
154+
self._buffered_marks.clear()
156155
elif event.type == "audio_end":
157156
print("Audio end - flushing remaining buffered audio")
158157
# Flush remaining audio at the end
159-
if self._outgoing_audio_buffer:
160-
await self._flush_outgoing_audio_buffer(None)
158+
await self._flush_outgoing_audio_buffer()
161159
elif event.type == "raw_model_event":
162160
pass
163161
else:
@@ -245,7 +243,14 @@ async def _flush_audio_buffer(self) -> None:
245243
except Exception as e:
246244
print(f"Error sending buffered audio to OpenAI: {e}")
247245

248-
async def _flush_outgoing_audio_buffer(self, mark_id: str | None) -> None:
246+
def _create_mark(self, item_id: str, content_index: int, byte_count: int) -> str:
247+
"""Create a new mark for tracking audio playback."""
248+
self._mark_counter += 1
249+
mark_id = str(self._mark_counter)
250+
self._mark_data[mark_id] = (item_id, content_index, byte_count)
251+
return mark_id
252+
253+
async def _flush_outgoing_audio_buffer(self) -> None:
249254
"""Send buffered audio to Twilio to reduce jittering."""
250255
if not self._outgoing_audio_buffer:
251256
return
@@ -263,8 +268,8 @@ async def _flush_outgoing_audio_buffer(self, mark_id: str | None) -> None:
263268
)
264269
)
265270

266-
# Send mark event for playback tracking (if provided)
267-
if mark_id is not None:
271+
# Send mark events for all buffered chunks (for playback tracking)
272+
for mark_id in self._buffered_marks:
268273
await self.twilio_websocket.send_text(
269274
json.dumps(
270275
{
@@ -275,8 +280,9 @@ async def _flush_outgoing_audio_buffer(self, mark_id: str | None) -> None:
275280
)
276281
)
277282

278-
# Clear the buffer
283+
# Clear the buffer and marks
279284
self._outgoing_audio_buffer.clear()
285+
self._buffered_marks.clear()
280286
self._last_outgoing_send_time = time.time()
281287

282288
except Exception as e:
@@ -302,7 +308,7 @@ async def _buffer_flush_loop(self) -> None:
302308
self._outgoing_audio_buffer
303309
and current_time - self._last_outgoing_send_time > self.CHUNK_LENGTH_S * 2
304310
):
305-
await self._flush_outgoing_audio_buffer(None)
311+
await self._flush_outgoing_audio_buffer()
306312

307313
except Exception as e:
308314
print(f"Error in buffer flush loop: {e}")

0 commit comments

Comments
 (0)