7474MAIN_PROCESS_INSTANCE_NAME = "main"
7575MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
7676MAIN_PROCESS_REPLICATION_PORT = 9093
77+ # Obviously, these would only be used with the UNIX socket option
78+ MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
79+ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
7780
7881# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
7982# during processing with the name of the worker.
@@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config(
407410 )
408411
409412 # Map of stream writer instance names to host/ports combos
410- instance_map [worker_name ] = {
411- "host" : "localhost" ,
412- "port" : worker_port ,
413- }
414-
413+ if os .environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False ):
414+ instance_map [worker_name ] = {
415+ "path" : f"/run/worker.{ worker_port } " ,
416+ }
417+ else :
418+ instance_map [worker_name ] = {
419+ "host" : "localhost" ,
420+ "port" : worker_port ,
421+ }
415422 # Update the list of stream writers. It's convenient that the name of the worker
416423 # type is the same as the stream to write. Iterate over the whole list in case there
417424 # is more than one.
@@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config(
423430
424431 # Map of stream writer instance names to host/ports combos
425432 # For now, all stream writers need http replication ports
426- instance_map [worker_name ] = {
427- "host" : "localhost" ,
428- "port" : worker_port ,
429- }
433+ if os .environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False ):
434+ instance_map [worker_name ] = {
435+ "path" : f"/run/worker.{ worker_port } " ,
436+ }
437+ else :
438+ instance_map [worker_name ] = {
439+ "host" : "localhost" ,
440+ "port" : worker_port ,
441+ }
430442
431443
432444def merge_worker_template_configs (
@@ -718,17 +730,29 @@ def generate_worker_files(
718730 # Note that yaml cares about indentation, so care should be taken to insert lines
719731 # into files at the correct indentation below.
720732
733+ # Convenience helper for if using unix sockets instead of host:port
734+ using_unix_sockets = environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False )
721735 # First read the original config file and extract the listeners block. Then we'll
722736 # add another listener for replication. Later we'll write out the result to the
723737 # shared config file.
724- listeners = [
725- {
726- "port" : MAIN_PROCESS_REPLICATION_PORT ,
727- "bind_address" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
728- "type" : "http" ,
729- "resources" : [{"names" : ["replication" ]}],
730- }
731- ]
738+ listeners : List [Any ]
739+ if using_unix_sockets :
740+ listeners = [
741+ {
742+ "path" : MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH ,
743+ "type" : "http" ,
744+ "resources" : [{"names" : ["replication" ]}],
745+ }
746+ ]
747+ else :
748+ listeners = [
749+ {
750+ "port" : MAIN_PROCESS_REPLICATION_PORT ,
751+ "bind_address" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
752+ "type" : "http" ,
753+ "resources" : [{"names" : ["replication" ]}],
754+ }
755+ ]
732756 with open (config_path ) as file_stream :
733757 original_config = yaml .safe_load (file_stream )
734758 original_listeners = original_config .get ("listeners" )
@@ -769,7 +793,17 @@ def generate_worker_files(
769793
770794 # A list of internal endpoints to healthcheck, starting with the main process
771795 # which exists even if no workers do.
772- healthcheck_urls = ["http://localhost:8080/health" ]
796+ # This list ends up being part of the command line to curl, (curl added support for
797+ # Unix sockets in version 7.40).
798+ if using_unix_sockets :
799+ healthcheck_urls = [
800+ f"--unix-socket { MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH } "
801+ # The scheme and hostname from the following URL are ignored.
802+ # The only thing that matters is the path `/health`
803+ "http://localhost/health"
804+ ]
805+ else :
806+ healthcheck_urls = ["http://localhost:8080/health" ]
773807
774808 # Get the set of all worker types that we have configured
775809 all_worker_types_in_use = set (chain (* requested_worker_types .values ()))
@@ -806,8 +840,12 @@ def generate_worker_files(
806840 # given worker_type needs to stay assigned and not be replaced.
807841 worker_config ["shared_extra_conf" ].update (shared_config )
808842 shared_config = worker_config ["shared_extra_conf" ]
809-
810- healthcheck_urls .append ("http://localhost:%d/health" % (worker_port ,))
843+ if using_unix_sockets :
844+ healthcheck_urls .append (
845+ f"--unix-socket /run/worker.{ worker_port } http://localhost/health"
846+ )
847+ else :
848+ healthcheck_urls .append ("http://localhost:%d/health" % (worker_port ,))
811849
812850 # Update the shared config with sharding-related options if necessary
813851 add_worker_roles_to_shared_config (
@@ -826,6 +864,7 @@ def generate_worker_files(
826864 "/conf/workers/{name}.yaml" .format (name = worker_name ),
827865 ** worker_config ,
828866 worker_log_config_filepath = log_config_filepath ,
867+ using_unix_sockets = using_unix_sockets ,
829868 )
830869
831870 # Save this worker's port number to the correct nginx upstreams
@@ -846,8 +885,13 @@ def generate_worker_files(
846885 nginx_upstream_config = ""
847886 for upstream_worker_base_name , upstream_worker_ports in nginx_upstreams .items ():
848887 body = ""
849- for port in upstream_worker_ports :
850- body += f" server localhost:{ port } ;\n "
888+ if using_unix_sockets :
889+ for port in upstream_worker_ports :
890+ body += f" server unix:/run/worker.{ port } ;\n "
891+
892+ else :
893+ for port in upstream_worker_ports :
894+ body += f" server localhost:{ port } ;\n "
851895
852896 # Add to the list of configured upstreams
853897 nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK .format (
@@ -877,10 +921,15 @@ def generate_worker_files(
877921 # If there are workers, add the main process to the instance_map too.
878922 if workers_in_use :
879923 instance_map = shared_config .setdefault ("instance_map" , {})
880- instance_map [MAIN_PROCESS_INSTANCE_NAME ] = {
881- "host" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
882- "port" : MAIN_PROCESS_REPLICATION_PORT ,
883- }
924+ if using_unix_sockets :
925+ instance_map [MAIN_PROCESS_INSTANCE_NAME ] = {
926+ "path" : MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH ,
927+ }
928+ else :
929+ instance_map [MAIN_PROCESS_INSTANCE_NAME ] = {
930+ "host" : MAIN_PROCESS_LOCALHOST_ADDRESS ,
931+ "port" : MAIN_PROCESS_REPLICATION_PORT ,
932+ }
884933
885934 # Shared homeserver config
886935 convert (
@@ -890,6 +939,7 @@ def generate_worker_files(
890939 appservice_registrations = appservice_registrations ,
891940 enable_redis = workers_in_use ,
892941 workers_in_use = workers_in_use ,
942+ using_unix_sockets = using_unix_sockets ,
893943 )
894944
895945 # Nginx config
@@ -900,6 +950,7 @@ def generate_worker_files(
900950 upstream_directives = nginx_upstream_config ,
901951 tls_cert_path = os .environ .get ("SYNAPSE_TLS_CERT" ),
902952 tls_key_path = os .environ .get ("SYNAPSE_TLS_KEY" ),
953+ using_unix_sockets = using_unix_sockets ,
903954 )
904955
905956 # Supervisord config
@@ -909,6 +960,7 @@ def generate_worker_files(
909960 "/etc/supervisor/supervisord.conf" ,
910961 main_config_path = config_path ,
911962 enable_redis = workers_in_use ,
963+ using_unix_sockets = using_unix_sockets ,
912964 )
913965
914966 convert (
0 commit comments