Skip to content

Commit 869baf4

Browse files
authored
feat(taskworker): Make process_profile worker support uncompressed payloads (#95692)
The taskworker producer layer now supports zstd compression. This means that the zlib task level compression is no longer necessary here. In order to remove, this PR updates the worker code to take an optional parameter which decides whether or not the payload should be zlib decompressed. In a subsequent PR, the producer code will be updated to send uncompressed payloads
1 parent 6abd09c commit 869baf4

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

src/sentry/profiles/task.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,20 @@ def _get_profiles_producer_from_topic(topic: Topic) -> KafkaProducer:
101101
logger = logging.getLogger(__name__)
102102

103103

104-
def decode_payload(encoded: str) -> dict[str, Any]:
105-
try:
106-
res = msgpack.unpackb(zlib.decompress(b64decode(encoded.encode("utf-8"))), use_list=False)
107-
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "ok"})
108-
return res
109-
except Exception as e:
110-
logger.exception("Failed to decompress compressed profile", extra={"error": e})
111-
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "err"})
112-
raise
104+
def decode_payload(encoded: str, compressed_profile: bool) -> dict[str, Any]:
105+
if compressed_profile:
106+
try:
107+
res = msgpack.unpackb(
108+
zlib.decompress(b64decode(encoded.encode("utf-8"))), use_list=False
109+
)
110+
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "ok"})
111+
return res
112+
except Exception as e:
113+
logger.exception("Failed to decompress compressed profile", extra={"error": e})
114+
metrics.incr("profiling.profile_metrics.decompress", tags={"status": "err"})
115+
raise
116+
else:
117+
return msgpack.unpackb(b64decode(encoded.encode("utf-8")), use_list=False)
113118

114119

115120
def encode_payload(message: dict[str, Any]) -> str:
@@ -146,13 +151,14 @@ def process_profile_task(
146151
profile: Profile | None = None,
147152
payload: str | None = None,
148153
sampled: bool = True,
154+
compressed_profile: bool = True,
149155
**kwargs: Any,
150156
) -> None:
151157
if not sampled and not options.get("profiling.profile_metrics.unsampled_profiles.enabled"):
152158
return
153159

154160
if payload:
155-
message_dict = decode_payload(payload)
161+
message_dict = decode_payload(payload, compressed_profile)
156162

157163
profile = json.loads(message_dict["payload"], use_rapid_json=True)
158164

0 commit comments

Comments
 (0)