Skip to content

Commit 9b5ad85

Browse files
committed
zephyr_mirror_backend: Fix thread safety problems.
As of commit 5eaac7b (zulip#18), zulip.Client is not thread-safe and especially not fork-safe due to connections held open by requests.Session. Delay construction of the Client until after forking off zulip_to_zephyr. Replace the fork for each message sent by zephyr_to_zulip with a threaded queue worker. Signed-off-by: Anders Kaseorg <[email protected]>
1 parent 091511b commit 9b5ad85

File tree

1 file changed

+57
-34
lines changed

1 file changed

+57
-34
lines changed

zulip/integrations/zephyr/zephyr_mirror_backend.py

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,38 @@
1313
import tempfile
1414
import textwrap
1515
import time
16+
from queue import Queue
17+
from threading import Thread
1618
from types import FrameType
1719
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
1820

1921
from typing_extensions import Literal, TypedDict
2022

23+
import zulip
2124
from zulip import RandomExponentialBackoff
2225

2326
DEFAULT_SITE = "https://api.zulip.com"
2427

2528

2629
class States:
27-
Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4))
30+
Startup, ZulipToZephyr, ZephyrToZulip = list(range(3))
2831

2932

3033
CURRENT_STATE = States.Startup
3134

3235
logger: logging.Logger
3336

3437

38+
def make_zulip_client() -> zulip.Client:
39+
return zulip.Client(
40+
email=zulip_account_email,
41+
api_key=api_key,
42+
verbose=True,
43+
client="zephyr_mirror",
44+
site=options.site,
45+
)
46+
47+
3548
def to_zulip_username(zephyr_username: str) -> str:
3649
if "@" in zephyr_username:
3750
(user, realm) = zephyr_username.split("@")
@@ -117,7 +130,7 @@ class ZephyrDict(TypedDict, total=False):
117130
zsig: str
118131

119132

120-
def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
133+
def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
121134
message: Dict[str, Any]
122135
message = {}
123136
if options.forward_class_messages:
@@ -151,7 +164,7 @@ def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
151164
return zulip_client.send_message(message)
152165

153166

154-
def send_error_zulip(error_msg: str) -> None:
167+
def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None:
155168
message = {
156169
"type": "private",
157170
"sender": zulip_account_email,
@@ -263,7 +276,7 @@ def maybe_restart_mirroring_script() -> None:
263276
raise Exception("Failed to reload too many times, aborting!")
264277

265278

266-
def process_loop(log: Optional[IO[str]]) -> NoReturn:
279+
def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn:
267280
restart_check_count = 0
268281
last_check_time = time.time()
269282
recieve_backoff = RandomExponentialBackoff()
@@ -278,7 +291,7 @@ def process_loop(log: Optional[IO[str]]) -> NoReturn:
278291
if notice is None:
279292
break
280293
try:
281-
process_notice(notice, log)
294+
process_notice(notice, zulip_queue, log)
282295
process_backoff.succeed()
283296
except Exception:
284297
logger.exception("Error relaying zephyr:")
@@ -395,7 +408,9 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
395408
return decrypted
396409

397410

398-
def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
411+
def process_notice(
412+
notice: "zephyr.ZNotice", zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]
413+
) -> None:
399414
assert notice.sender is not None
400415
(zsig, body) = parse_zephyr_body(notice.message, notice.format)
401416
is_personal = False
@@ -490,18 +505,19 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
490505
log.write(json.dumps(zeph) + "\n")
491506
log.flush()
492507

493-
if os.fork() == 0:
494-
global CURRENT_STATE
495-
CURRENT_STATE = States.ChildSending
496-
# Actually send the message in a child process, to avoid blocking.
508+
zulip_queue.put(zeph)
509+
510+
511+
def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None:
512+
while True:
513+
zeph = zulip_queue.get()
497514
try:
498-
res = send_zulip(zeph)
515+
res = send_zulip(zulip_client, zeph)
499516
if res.get("result") != "success":
500517
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
501518
except Exception:
502519
logger.exception("Error relaying zephyr:")
503-
finally:
504-
os._exit(0)
520+
zulip_queue.task_done()
505521

506522

507523
def quit_failed_initialization(message: str) -> str:
@@ -560,6 +576,8 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
560576

561577

562578
def zephyr_to_zulip(options: optparse.Values) -> None:
579+
zulip_client = make_zulip_client()
580+
563581
if options.use_sessions and os.path.exists(options.session_path):
564582
logger.info("Loading old session")
565583
zephyr_load_session_autoretry(options.session_path)
@@ -593,18 +611,22 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
593611
"sending saved message to %s from %s..."
594612
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
595613
)
596-
send_zulip(zeph)
614+
send_zulip(zulip_client, zeph)
597615
except Exception:
598616
logger.exception("Could not send saved zephyr:")
599617
time.sleep(2)
600618

601619
logger.info("Successfully initialized; Starting receive loop.")
602620

621+
# Actually send the messages in a thread, to avoid blocking.
622+
zulip_queue: "Queue[ZephyrDict]" = Queue()
623+
Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start()
624+
603625
if options.resend_log_path is not None:
604626
with open(options.resend_log_path, "a") as log:
605-
process_loop(log)
627+
process_loop(zulip_queue, log)
606628
else:
607-
process_loop(None)
629+
process_loop(zulip_queue, None)
608630

609631

610632
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
@@ -675,7 +697,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
675697
return encrypted
676698

677699

678-
def forward_to_zephyr(message: Dict[str, Any]) -> None:
700+
def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
679701
# 'Any' can be of any type of text
680702
support_heading = "Hi there! This is an automated message from Zulip."
681703
support_closing = """If you have any questions, please be in touch through the \
@@ -749,6 +771,7 @@ def forward_to_zephyr(message: Dict[str, Any]) -> None:
749771
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
750772
if result is None:
751773
send_error_zulip(
774+
zulip_client,
752775
"""%s
753776
754777
Your Zulip-Zephyr mirror bot was unable to forward that last message \
@@ -758,7 +781,7 @@ class and your mirroring bot does not have access to the relevant \
758781
Zulip users (like you) received it, Zephyr users did not.
759782
760783
%s"""
761-
% (support_heading, support_closing)
784+
% (support_heading, support_closing),
762785
)
763786
return
764787

@@ -775,6 +798,7 @@ class and your mirroring bot does not have access to the relevant \
775798
return
776799
elif code == 0:
777800
send_error_zulip(
801+
zulip_client,
778802
"""%s
779803
780804
Your last message was successfully mirrored to zephyr, but zwrite \
@@ -783,7 +807,7 @@ class and your mirroring bot does not have access to the relevant \
783807
%s
784808
785809
%s"""
786-
% (support_heading, stderr, support_closing)
810+
% (support_heading, stderr, support_closing),
787811
)
788812
return
789813
elif code != 0 and (
@@ -797,6 +821,7 @@ class and your mirroring bot does not have access to the relevant \
797821
if options.ignore_expired_tickets:
798822
return
799823
send_error_zulip(
824+
zulip_client,
800825
"""%s
801826
802827
Your last message was forwarded from Zulip to Zephyr unauthenticated, \
@@ -806,14 +831,15 @@ class and your mirroring bot does not have access to the relevant \
806831
authenticated Zephyr messages for you again.
807832
808833
%s"""
809-
% (support_heading, support_closing)
834+
% (support_heading, support_closing),
810835
)
811836
return
812837

813838
# zwrite failed and it wasn't because of expired tickets: This is
814839
# probably because the recipient isn't subscribed to personals,
815840
# but regardless, we should just notify the user.
816841
send_error_zulip(
842+
zulip_client,
817843
"""%s
818844
819845
Your Zulip-Zephyr mirror bot was unable to forward that last message \
@@ -823,12 +849,12 @@ class and your mirroring bot does not have access to the relevant \
823849
%s
824850
825851
%s"""
826-
% (support_heading, stderr, support_closing)
852+
% (support_heading, stderr, support_closing),
827853
)
828854
return
829855

830856

831-
def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
857+
def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
832858
# The key string can be used to direct any type of text.
833859
if message["sender_email"] == zulip_account_email:
834860
if not (
@@ -851,20 +877,24 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
851877
)
852878
return
853879
try:
854-
forward_to_zephyr(message)
880+
forward_to_zephyr(message, zulip_client)
855881
except Exception:
856882
# Don't let an exception forwarding one message crash the
857883
# whole process
858884
logger.exception("Error forwarding message:")
859885

860886

861887
def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
888+
zulip_client = make_zulip_client()
889+
862890
# Sync messages from zulip to zephyr
863891
logger.info("Starting syncing messages.")
864892
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
865893
while True:
866894
try:
867-
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
895+
zulip_client.call_on_each_message(
896+
lambda message: maybe_forward_to_zephyr(message, zulip_client)
897+
)
868898
except Exception:
869899
logger.exception("Error syncing messages:")
870900
backoff.fail()
@@ -886,6 +916,8 @@ def subscribed_to_mail_messages() -> bool:
886916

887917

888918
def add_zulip_subscriptions(verbose: bool) -> None:
919+
zulip_client = make_zulip_client()
920+
889921
zephyr_subscriptions = set()
890922
skipped = set()
891923
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
@@ -1146,7 +1178,7 @@ def parse_args() -> Tuple[optparse.Values, List[str]]:
11461178

11471179

11481180
def die_gracefully(signal: int, frame: FrameType) -> None:
1149-
if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending:
1181+
if CURRENT_STATE == States.ZulipToZephyr:
11501182
# this is a child process, so we want os._exit (no clean-up necessary)
11511183
os._exit(1)
11521184

@@ -1207,15 +1239,6 @@ def die_gracefully(signal: int, frame: FrameType) -> None:
12071239
sys.exit(1)
12081240

12091241
zulip_account_email = options.user + "@mit.edu"
1210-
import zulip
1211-
1212-
zulip_client = zulip.Client(
1213-
email=zulip_account_email,
1214-
api_key=api_key,
1215-
verbose=True,
1216-
client="zephyr_mirror",
1217-
site=options.site,
1218-
)
12191242

12201243
start_time = time.time()
12211244

0 commit comments

Comments
 (0)