1+ package org .springframework .integration .jdbc .channel ;
2+
3+ import org .postgresql .PGNotification ;
4+ import org .postgresql .jdbc .PgConnection ;
5+ import org .springframework .context .SmartLifecycle ;
6+ import org .springframework .core .log .LogAccessor ;
7+ import org .springframework .integration .jdbc .store .JdbcChannelMessageStore ;
8+ import org .springframework .integration .util .UUIDConverter ;
9+ import org .springframework .lang .Nullable ;
10+ import org .springframework .util .Assert ;
11+
12+ import java .sql .Statement ;
13+ import java .util .Map ;
14+ import java .util .Set ;
15+ import java .util .concurrent .ConcurrentHashMap ;
16+ import java .util .concurrent .Executor ;
17+ import java .util .concurrent .Executors ;
18+ import java .util .concurrent .atomic .AtomicBoolean ;
19+
20+ public class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
21+
22+ private final LogAccessor logger = new LogAccessor (this .getClass ()); // NOSONAR final
23+
24+ private final PgConnectionSupplier connectionSupplier ;
25+
26+ private final String tablePrefix ;
27+
28+ @ Nullable
29+ private Executor executor ;
30+
31+ private final AtomicBoolean running = new AtomicBoolean ();
32+
33+ private volatile Object identity ;
34+
35+ private final Map <String , Set <PostgresChannelMessageTableSubscription >> subscriptions = new ConcurrentHashMap <>();
36+
37+ public PostgresChannelMessageTableSubscriber (PgConnectionSupplier connectionSupplier ) {
38+ this (connectionSupplier , JdbcChannelMessageStore .DEFAULT_TABLE_PREFIX );
39+ }
40+
41+ public PostgresChannelMessageTableSubscriber (PgConnectionSupplier connectionSupplier , String tablePrefix ) {
42+ Assert .notNull (connectionSupplier , "A connectionSupplier must be provided." );
43+ Assert .notNull (tablePrefix , "A table prefix must be set." );
44+ this .connectionSupplier = connectionSupplier ;
45+ this .tablePrefix = tablePrefix ;
46+ }
47+
48+ public void setExecutor (Executor executor ) {
49+ this .executor = executor ;
50+ }
51+
52+ public boolean subscribe (PostgresChannelMessageTableSubscription subscription ) {
53+ Set <PostgresChannelMessageTableSubscription > subscriptions = this .subscriptions .computeIfAbsent (getKey (subscription .getGroupId ()) + subscription .getRegion (), ignored -> ConcurrentHashMap .newKeySet ());
54+ return subscriptions .add (subscription );
55+ }
56+
57+ public boolean unsubscribe (PostgresChannelMessageTableSubscription subscription ) {
58+ Set <PostgresChannelMessageTableSubscription > subscriptions = this .subscriptions .get (getKey (subscription .getGroupId ()) + subscription .getRegion ());
59+ return subscriptions != null && subscriptions .remove (subscription );
60+ }
61+
62+ @ Override
63+ public void start () {
64+ if (running .getAndSet (true )) {
65+ return ;
66+ }
67+ Executor executor = this .executor ;
68+ if (executor == null ) {
69+ executor = Executors .newSingleThreadExecutor (job -> {
70+ Thread t = new Thread (job );
71+ t .setDaemon (true );
72+ t .setName ("postgres-channel-message-table-subscriber" );
73+ return t ;
74+ });
75+ this .executor = executor ;
76+ }
77+ // Avoid that stop/start sequence reactivates previously stopped thread.
78+ Object current = new Object ();
79+ identity = current ;
80+ executor .execute (() -> {
81+ try {
82+ while (isActive (current )) {
83+ try {
84+ PgConnection conn = this .connectionSupplier .get ();
85+ try (Statement stmt = conn .createStatement ()) {
86+ stmt .execute ("LISTEN " + this .tablePrefix .toLowerCase () + "channel_message_notify" );
87+ } catch (Throwable t ) {
88+ try {
89+ conn .close ();
90+ } catch (Throwable suppressed ) {
91+ t .addSuppressed (suppressed );
92+ }
93+ throw t ;
94+ }
95+ try {
96+ while (isActive (current )) {
97+ PGNotification [] notifications = conn .getNotifications (0 );
98+ // Unfortunately, there is no good way of interrupting a notification poll.
99+ if (!isActive (current )) {
100+ return ;
101+ }
102+ if (notifications != null ) {
103+ for (PGNotification notification : notifications ) {
104+ String parameter = notification .getParameter ();
105+ Set <PostgresChannelMessageTableSubscription > subscriptions = this .subscriptions .get (parameter );
106+ if (subscriptions == null ) {
107+ continue ;
108+ }
109+ for (PostgresChannelMessageTableSubscription subscription : subscriptions ) {
110+ subscription .onPossibleUpdate ();
111+ }
112+ }
113+ }
114+ }
115+ } finally {
116+ conn .close ();
117+ }
118+ } catch (Exception e ) {
119+ // The getNotifications method does not throw a meaningful message on interruption.
120+ // Therefore, we do not log an error, unless it occurred while active.
121+ if (isActive (current )) {
122+ logger .error (e , "Failed to poll notifications from Postgres database" );
123+ }
124+ } catch (Throwable t ) {
125+ logger .error (t , "Failed to poll notifications from Postgres database" );
126+ return ;
127+ }
128+ }
129+ } finally {
130+ running .set (false );
131+ }
132+ });
133+ }
134+
135+ private boolean isActive (Object identity ) {
136+ if (identity != this .identity ) {
137+ return false ;
138+ } else if (!running .get ()) {
139+ return false ;
140+ } else if (Thread .interrupted ()) {
141+ running .set (false );
142+ return false ;
143+ } else {
144+ return true ;
145+ }
146+ }
147+
148+ @ Override
149+ public void stop () {
150+ running .set (false );
151+ }
152+
153+ @ Override
154+ public boolean isRunning () {
155+ return running .get ();
156+ }
157+
158+ private String getKey (Object input ) {
159+ return input == null ? null : UUIDConverter .getUUID (input ).toString ();
160+ }
161+ }
0 commit comments