Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit c3543da

Browse files
authored
Merge pull request #162 from jhrozek/output_pipeline
Add an output pipeline that deobfuscates secrets
2 parents 33492f3 + 5ada6b4 commit c3543da

File tree

15 files changed

+1135
-163
lines changed

15 files changed

+1135
-163
lines changed

src/codegate/pipeline/base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import uuid
12
from abc import ABC, abstractmethod
23
from dataclasses import dataclass, field
34
from typing import Any, Dict, List, Optional
45

56
from litellm import ChatCompletionRequest
67

8+
from codegate.pipeline.secrets.manager import SecretsManager
9+
710

811
@dataclass
912
class CodeSnippet:
@@ -24,10 +27,25 @@ def __post_init__(self):
2427
self.language = self.language.strip().lower()
2528

2629

30+
@dataclass
class PipelineSensitiveData:
    """Per-request sensitive state: the secrets manager plus the session it owns."""

    # Manager that stores the encrypted secrets belonging to this session.
    manager: SecretsManager
    # Unique identifier tying this request's secrets together; blanked on cleanup.
    session_id: str

    def secure_cleanup(self):
        """Securely cleanup sensitive data for this session"""
        # Only act when there is both a manager and an active session;
        # after cleanup the blanked session_id makes repeat calls a no-op.
        if self.manager is not None and self.session_id != "":
            self.manager.cleanup_session(self.session_id)
            self.session_id = ""
2744
@dataclass
2845
class PipelineContext:
2946
code_snippets: List[CodeSnippet] = field(default_factory=list)
3047
metadata: Dict[str, Any] = field(default_factory=dict)
48+
sensitive: Optional[PipelineSensitiveData] = field(default_factory=lambda: None)
3149

3250
def add_code_snippet(self, snippet: CodeSnippet):
3351
self.code_snippets.append(snippet)
@@ -139,18 +157,23 @@ def __init__(self, pipeline_steps: List[PipelineStep]):
139157

140158
async def process_request(
141159
self,
160+
secret_manager: SecretsManager,
142161
request: ChatCompletionRequest,
143162
) -> PipelineResult:
144163
"""
145164
Process a request through all pipeline steps
146165
147166
Args:
148167
request: The chat completion request to process
168+
secret_manager: The secrets manager instance to gather sensitive data from the request
149169
150170
Returns:
151171
PipelineResult containing either a modified request or response structure
152172
"""
153173
context = PipelineContext()
174+
context.sensitive = PipelineSensitiveData(
175+
manager=secret_manager, session_id=str(uuid.uuid4())
176+
) # Generate a new session ID for each request
154177
current_request = request
155178

156179
for step in self.pipeline_steps:

src/codegate/pipeline/output.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
from abc import ABC, abstractmethod
2+
from dataclasses import dataclass, field
3+
from typing import AsyncIterator, Optional
4+
5+
from litellm import ModelResponse
6+
from litellm.types.utils import Delta, StreamingChoices
7+
8+
from codegate.pipeline.base import PipelineContext
9+
10+
11+
@dataclass
class OutputPipelineContext:
    """
    Context passed between output pipeline steps.

    Does not include the input context, that one is separate.
    """

    # We store the messages that are not yet sent to the client in the buffer.
    # One reason for this might be that the buffer contains a secret that we want to de-obfuscate
    # before releasing it; steps append here and the pipeline clears it on emit.
    buffer: list[str] = field(default_factory=list)
22+
23+
24+
class OutputPipelineStep(ABC):
    """
    Base class for output pipeline steps.

    The process method should be implemented by subclasses and handles
    processing of a single chunk of the stream.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Returns the name of this pipeline step"""
        pass

    @abstractmethod
    async def process_chunk(
        self,
        chunk: ModelResponse,
        context: OutputPipelineContext,
        input_context: Optional[PipelineContext] = None,
    ) -> Optional[ModelResponse]:
        """
        Process a single chunk of the stream.

        Args:
        - chunk: The input chunk to process, normalized to ModelResponse
        - context: The output pipeline context. Can be used to store state between steps, mainly
        the buffer.
        - input_context: The input context from processing the user's input. Can include the secrets
        obfuscated in the user message or code snippets in the user message.

        Return:
        - None to pause the stream (the pipeline keeps buffering and retries
          with the next incoming chunk)
        - Modified or unmodified input chunk to pass through to the next step
        """
        pass
59+
60+
61+
class OutputPipelineInstance:
    """
    Handles processing of a single stream.

    Think of this class as steps + buffer: chunks flow through each step;
    content that a step is not yet ready to release stays in the buffer.
    """

    def __init__(
        self,
        pipeline_steps: list[OutputPipelineStep],
        input_context: Optional[PipelineContext] = None,
    ):
        self._input_context = input_context
        self._pipeline_steps = pipeline_steps
        self._context = OutputPipelineContext()
        # we won't actually buffer the chunk, but in case we need to pass
        # the remaining content in the buffer when the stream ends, we need
        # to store the parameters like model, timestamp, etc.
        self._buffered_chunk: Optional[ModelResponse] = None

    def _buffer_chunk(self, chunk: ModelResponse) -> None:
        """
        Add chunk content to buffer and remember the chunk for its metadata.
        """
        self._buffered_chunk = chunk
        for choice in chunk.choices:
            # the last choice has no delta or content, let's not buffer it
            if choice.delta is not None and choice.delta.content is not None:
                self._context.buffer.append(choice.delta.content)

    def _make_flush_chunk(self) -> ModelResponse:
        """Build a synthetic final chunk carrying all buffered content, reusing
        the metadata (id, created, model) of the last chunk seen on the stream."""
        return ModelResponse(
            id=self._buffered_chunk.id,
            choices=[
                StreamingChoices(
                    finish_reason=None,
                    # we just put one choice in the buffer, so 0 is fine
                    index=0,
                    delta=Delta(content="".join(self._context.buffer), role="assistant"),
                    # NOTE(review): copies logprobs from the first buffered choice;
                    # presumably acceptable for a synthetic flush chunk — confirm
                    logprobs=self._buffered_chunk.choices[0].logprobs,
                )
            ],
            created=self._buffered_chunk.created,
            model=self._buffered_chunk.model,
            object="chat.completion.chunk",
        )

    async def process_stream(
        self, stream: AsyncIterator[ModelResponse]
    ) -> AsyncIterator[ModelResponse]:
        """
        Process a stream through all pipeline steps.

        Yields chunks that made it through every step; content a step paused
        on is buffered and flushed as one final chunk when the stream ends
        normally. Sensitive data from the input context is always cleaned up,
        even on error or early close.
        """
        try:
            async for chunk in stream:
                # Store chunk content in buffer
                self._buffer_chunk(chunk)

                # Process chunk through each step of the pipeline
                current_chunk = chunk
                for step in self._pipeline_steps:
                    if current_chunk is None:
                        # A previous step returned None, i.e. requested to
                        # pause the stream — retry with the next chunk.
                        break

                    # the returned chunk becomes the input for the next step
                    current_chunk = await step.process_chunk(
                        current_chunk, self._context, self._input_context
                    )

                # We either went through all steps and have a chunk to return,
                # or we are paused, in which case we don't yield.
                if current_chunk is not None:
                    # Step processed successfully, yield the chunk and clear buffer
                    self._context.buffer.clear()
                    yield current_chunk
                # else: keep buffering for next iteration

            # Stream ended normally: flush any remaining buffered content.
            # This deliberately happens here, NOT in `finally` — yielding from
            # a `finally` while the generator is being closed (GeneratorExit)
            # raises "async generator ignored GeneratorExit". The previous
            # `except Exception as e: raise e` was a no-op and was removed;
            # exceptions now propagate unchanged.
            if self._context.buffer:
                yield self._make_flush_chunk()
                self._context.buffer.clear()
        finally:
            # Cleanup sensitive data through the input context — runs on
            # normal completion, error, and early close alike.
            if self._input_context and self._input_context.sensitive:
                self._input_context.sensitive.secure_cleanup()
152+
153+
154+
class OutputPipelineProcessor:
    """
    Since we want to provide each run of the pipeline with a fresh context,
    we need a factory to create new instances of the pipeline.
    """

    def __init__(self, pipeline_steps: list[OutputPipelineStep]):
        self.pipeline_steps = pipeline_steps

    def _create_instance(
        self, input_context: Optional[PipelineContext] = None
    ) -> OutputPipelineInstance:
        """Create a new pipeline instance for processing a stream"""
        return OutputPipelineInstance(self.pipeline_steps, input_context=input_context)

    async def process_stream(
        self,
        stream: AsyncIterator[ModelResponse],
        input_context: Optional[PipelineContext] = None,
    ) -> AsyncIterator[ModelResponse]:
        """Create a new pipeline instance and process the stream.

        Args:
        - stream: the incoming chunk stream to process
        - input_context: optional context from the input pipeline. Previously
          this was never forwarded, so steps could not see obfuscated secrets
          and sensitive-data cleanup never ran through this entry point; it is
          now passed through to the instance (defaults to None for backward
          compatibility).
        """
        instance = self._create_instance(input_context)
        async for chunk in instance.process_stream(stream):
            yield chunk
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
from typing import NamedTuple, Optional
2+
3+
import structlog
4+
5+
from codegate.pipeline.secrets.gatecrypto import CodeGateCrypto
6+
7+
logger = structlog.get_logger("codegate")
8+
9+
10+
class SecretEntry(NamedTuple):
    """Represents a stored secret"""

    # Plaintext secret value as found in the user's message.
    original: str
    # Encrypted form that replaces the plaintext in the outgoing request.
    encrypted: str
    # Service the secret belongs to (e.g. which provider/API).
    service: str
    # Kind of secret (e.g. API key, token) as classified by the detector.
    secret_type: str
17+
18+
19+
class SecretsManager:
    """Manages encryption, storage and retrieval of secrets"""

    def __init__(self):
        self.crypto = CodeGateCrypto()
        # session_id -> the (single, latest) SecretEntry for that session
        self._session_store: dict[str, SecretEntry] = {}
        self._encrypted_to_session: dict[str, str] = {}  # Reverse lookup index

    def store_secret(self, value: str, service: str, secret_type: str, session_id: str) -> str:
        """
        Encrypts and stores a secret value.

        Returns the encrypted value.

        Raises:
            ValueError: if any argument is empty.
        """
        if not value:
            raise ValueError("Value must be provided")
        if not service:
            raise ValueError("Service must be provided")
        if not secret_type:
            raise ValueError("Secret type must be provided")
        if not session_id:
            raise ValueError("Session ID must be provided")

        encrypted_value = self.crypto.encrypt_token(value, session_id)

        # Bug fix: each session holds at most one entry, so storing a second
        # secret for the same session overwrites the first. Drop the stale
        # reverse mapping, otherwise the OLD encrypted value would keep
        # resolving — and incorrectly return the NEW secret's original value.
        previous = self._session_store.get(session_id)
        if previous is not None:
            self._encrypted_to_session.pop(previous.encrypted, None)

        # Store mappings
        self._session_store[session_id] = SecretEntry(
            original=value,
            encrypted=encrypted_value,
            service=service,
            secret_type=secret_type,
        )
        self._encrypted_to_session[encrypted_value] = session_id

        logger.debug("Stored secret", service=service, type=secret_type, encrypted=encrypted_value)

        return encrypted_value

    def get_original_value(self, encrypted_value: str, session_id: str) -> Optional[str]:
        """Retrieve original value for an encrypted value.

        Returns None when the encrypted value is unknown or belongs to a
        different session than the one supplied.
        """
        try:
            stored_session_id = self._encrypted_to_session.get(encrypted_value)
            if stored_session_id == session_id:
                return self._session_store[session_id].original
        except Exception as e:
            logger.error("Error retrieving secret", error=str(e))
        return None

    def get_by_session_id(self, session_id: str) -> Optional[SecretEntry]:
        """Get stored data by session ID"""
        return self._session_store.get(session_id)

    def cleanup(self):
        """Securely wipe sensitive data for ALL sessions."""
        try:
            # Convert and wipe original values.
            # NOTE(review): this wipes a bytearray COPY of the immutable str;
            # the original string object is not scrubbed from memory — best
            # effort only. Confirm whether stronger guarantees are required.
            for entry in self._session_store.values():
                original_bytes = bytearray(entry.original.encode())
                self.crypto.wipe_bytearray(original_bytes)

            # Clear the dictionaries
            self._session_store.clear()
            self._encrypted_to_session.clear()

            logger.info("Secrets manager data securely wiped")
        except Exception as e:
            logger.error("Error during secure cleanup", error=str(e))

    def cleanup_session(self, session_id: str):
        """
        Remove a specific session's secrets and perform secure cleanup.

        Args:
            session_id (str): The session identifier to remove
        """
        try:
            # Get the secret entry for the session
            entry = self._session_store.get(session_id)

            if entry:
                # Best-effort wipe of a copy of the original value
                # (see NOTE in cleanup()).
                original_bytes = bytearray(entry.original.encode())
                self.crypto.wipe_bytearray(original_bytes)

                # Remove the encrypted value from the reverse lookup index
                self._encrypted_to_session.pop(entry.encrypted, None)

                # Remove the session from the store
                self._session_store.pop(session_id, None)

                logger.debug("Session secrets securely removed", session_id=session_id)
            else:
                logger.debug("No secrets found for session", session_id=session_id)
        except Exception as e:
            logger.error("Error during session cleanup", session_id=session_id, error=str(e))

0 commit comments

Comments
 (0)