1+ package org .springframework .integration .jdbc .channel ;
2+
3+ import org .postgresql .PGNotification ;
4+ import org .postgresql .jdbc .PgConnection ;
5+ import org .springframework .context .SmartLifecycle ;
6+ import org .springframework .core .log .LogAccessor ;
7+ import org .springframework .integration .jdbc .store .JdbcChannelMessageStore ;
8+ import org .springframework .integration .util .UUIDConverter ;
9+ import org .springframework .util .Assert ;
10+
11+ import java .sql .Statement ;
12+ import java .util .Map ;
13+ import java .util .Set ;
14+ import java .util .concurrent .ConcurrentHashMap ;
15+ import java .util .concurrent .Executor ;
16+ import java .util .concurrent .Executors ;
17+ import java .util .concurrent .atomic .AtomicBoolean ;
18+
19+ public class PostgresChannelMessageTableSubscriber implements SmartLifecycle {
20+
21+ private final LogAccessor logger = new LogAccessor (this .getClass ()); // NOSONAR final
22+
23+ private final PgConnectionSupplier connectionSupplier ;
24+
25+ private final Executor executor ;
26+
27+ private final String tablePrefix ;
28+
29+ private final AtomicBoolean running = new AtomicBoolean ();
30+
31+ private volatile Object identity ;
32+
33+ private final Map <String , Set <PostgresChannelMessageTableSubscription >> subscriptions = new ConcurrentHashMap <>();
34+
35+ public PostgresChannelMessageTableSubscriber (PgConnectionSupplier connectionSupplier ) {
36+ this (connectionSupplier , JdbcChannelMessageStore .DEFAULT_TABLE_PREFIX );
37+ }
38+
39+ public PostgresChannelMessageTableSubscriber (PgConnectionSupplier connectionSupplier , String tablePrefix ) {
40+ this (connectionSupplier , Executors .newSingleThreadExecutor (job -> {
41+ Thread t = new Thread (job );
42+ t .setDaemon (true );
43+ t .setName ("" );
44+ return t ;
45+ }), tablePrefix );
46+ }
47+
48+ public PostgresChannelMessageTableSubscriber (PgConnectionSupplier connectionSupplier , Executor executor ) {
49+ this (connectionSupplier , executor , JdbcChannelMessageStore .DEFAULT_TABLE_PREFIX );
50+ }
51+
52+ public PostgresChannelMessageTableSubscriber (PgConnectionSupplier connectionSupplier , Executor executor , String tablePrefix ) {
53+ Assert .notNull (connectionSupplier , "A connectionSupplier must be provided." );
54+ Assert .notNull (executor , "A executor must be provided." );
55+ Assert .notNull (tablePrefix , "A table prefix must be set." );
56+ this .connectionSupplier = connectionSupplier ;
57+ this .executor = executor ;
58+ this .tablePrefix = tablePrefix ;
59+ }
60+
61+ public boolean subscribe (PostgresChannelMessageTableSubscription subscription ) {
62+ Set <PostgresChannelMessageTableSubscription > subscriptions = this .subscriptions .computeIfAbsent (getKey (subscription .getGroupId ()) + subscription .getRegion (), ignored -> ConcurrentHashMap .newKeySet ());
63+ return subscriptions .add (subscription );
64+ }
65+
66+ public boolean unsubscribe (PostgresChannelMessageTableSubscription subscription ) {
67+ Set <PostgresChannelMessageTableSubscription > subscriptions = this .subscriptions .get (getKey (subscription .getGroupId ()) + subscription .getRegion ());
68+ return subscriptions != null && subscriptions .remove (subscription );
69+ }
70+
71+ @ Override
72+ public void start () {
73+ if (running .getAndSet (true )) {
74+ return ;
75+ }
76+ // Avoid that stop/start sequence reactivates previously stopped thread.
77+ Object current = new Object ();
78+ identity = current ;
79+ executor .execute (() -> {
80+ try {
81+ while (isActive (current )) {
82+ try {
83+ PgConnection conn = this .connectionSupplier .get ();
84+ try (Statement stmt = conn .createStatement ()) {
85+ stmt .execute ("LISTEN " + this .tablePrefix .toLowerCase () + "channel_message_notify" );
86+ } catch (Throwable t ) {
87+ try {
88+ conn .close ();
89+ } catch (Throwable suppressed ) {
90+ t .addSuppressed (suppressed );
91+ }
92+ throw t ;
93+ }
94+ try {
95+ while (isActive (current )) {
96+ PGNotification [] notifications = conn .getNotifications (0 );
97+ // Unfortunately, there is no good way of interrupting a notification poll.
98+ if (!isActive (current )) {
99+ return ;
100+ }
101+ if (notifications != null ) {
102+ for (PGNotification notification : notifications ) {
103+ String parameter = notification .getParameter ();
104+ Set <PostgresChannelMessageTableSubscription > subscriptions = this .subscriptions .get (parameter );
105+ if (subscriptions == null ) {
106+ continue ;
107+ }
108+ for (PostgresChannelMessageTableSubscription subscription : subscriptions ) {
109+ subscription .onPossibleUpdate ();
110+ }
111+ }
112+ }
113+ }
114+ } finally {
115+ conn .close ();
116+ }
117+ } catch (Exception e ) {
118+ // The getNotifications method does not throw a meaningful message on interruption.
119+ // Therefore, we do not log an error, unless it occurred while active.
120+ if (isActive (current )) {
121+ logger .error (e , "Failed to poll notifications from Postgres database" );
122+ }
123+ } catch (Throwable t ) {
124+ logger .error (t , "Failed to poll notifications from Postgres database" );
125+ return ;
126+ }
127+ }
128+ } finally {
129+ running .set (false );
130+ }
131+ });
132+ }
133+
134+ private boolean isActive (Object identity ) {
135+ if (identity != this .identity ) {
136+ return false ;
137+ } else if (!running .get ()) {
138+ return false ;
139+ } else if (Thread .interrupted ()) {
140+ running .set (false );
141+ return false ;
142+ } else {
143+ return true ;
144+ }
145+ }
146+
147+ @ Override
148+ public void stop () {
149+ running .set (false );
150+ }
151+
152+ @ Override
153+ public boolean isRunning () {
154+ return running .get ();
155+ }
156+
157+ private String getKey (Object input ) {
158+ return input == null ? null : UUIDConverter .getUUID (input ).toString ();
159+ }
160+ }
0 commit comments