Skip to content

Commit a60ead9

Browse files
committed
fix: use native support for Rx request instead blocking emitter
1 parent 9127901 commit a60ead9

File tree

5 files changed

+25
-37
lines changed

5 files changed

+25
-37
lines changed

client-reactive/pom.xml

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,25 +98,6 @@
9898
<artifactId>influxdb-client-java</artifactId>
9999
</dependency>
100100

101-
<dependency>
102-
<groupId>com.squareup.retrofit2</groupId>
103-
<artifactId>adapter-rxjava3</artifactId>
104-
<exclusions>
105-
<exclusion>
106-
<groupId>com.squareup.okhttp3</groupId>
107-
<artifactId>okhttp</artifactId>
108-
</exclusion>
109-
<exclusion>
110-
<groupId>com.squareup.retrofit2</groupId>
111-
<artifactId>retrofit</artifactId>
112-
</exclusion>
113-
<exclusion>
114-
<groupId>io.reactivex.rxjava2</groupId>
115-
<artifactId>rxjava</artifactId>
116-
</exclusion>
117-
</exclusions>
118-
</dependency>
119-
120101
<dependency>
121102
<groupId>com.influxdb</groupId>
122103
<artifactId>influxdb-client-test</artifactId>

client/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,25 @@
147147
<artifactId>converter-gson</artifactId>
148148
</dependency>
149149

150+
<dependency>
151+
<groupId>com.squareup.retrofit2</groupId>
152+
<artifactId>adapter-rxjava3</artifactId>
153+
<exclusions>
154+
<exclusion>
155+
<groupId>com.squareup.okhttp3</groupId>
156+
<artifactId>okhttp</artifactId>
157+
</exclusion>
158+
<exclusion>
159+
<groupId>com.squareup.retrofit2</groupId>
160+
<artifactId>retrofit</artifactId>
161+
</exclusion>
162+
<exclusion>
163+
<groupId>io.reactivex.rxjava2</groupId>
164+
<artifactId>rxjava</artifactId>
165+
</exclusion>
166+
</exclusions>
167+
</dependency>
168+
150169
<dependency>
151170
<groupId>com.moandjiezana.toml</groupId>
152171
<artifactId>toml4j</artifactId>

client/src/main/java/com/influxdb/client/internal/AbstractInfluxDBClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import retrofit2.Call;
4747
import retrofit2.CallAdapter;
4848
import retrofit2.Retrofit;
49+
import retrofit2.adapter.rxjava3.RxJava3CallAdapterFactory;
4950
import retrofit2.converter.gson.GsonConverterFactory;
5051
import retrofit2.converter.scalars.ScalarsConverterFactory;
5152

@@ -75,7 +76,7 @@ public abstract class AbstractInfluxDBClient extends AbstractRestClient {
7576
protected final Collection<AutoCloseable> autoCloseables = new CopyOnWriteArrayList<>();
7677

7778
public AbstractInfluxDBClient(@Nonnull final InfluxDBClientOptions options, @Nonnull final String clientType) {
78-
this(options, clientType, Collections.emptyList());
79+
this(options, clientType, Collections.singletonList(RxJava3CallAdapterFactory.createSynchronous()));
7980
}
8081

8182
public AbstractInfluxDBClient(@Nonnull final InfluxDBClientOptions options,

client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
import io.reactivex.rxjava3.processors.PublishProcessor;
5959
import io.reactivex.rxjava3.subjects.PublishSubject;
6060
import org.reactivestreams.Publisher;
61-
import retrofit2.Call;
62-
import retrofit2.Callback;
6361
import retrofit2.HttpException;
6462
import retrofit2.Response;
6563

@@ -434,21 +432,11 @@ public Maybe<Notification<Response>> apply(final BatchWriteItem batchWrite) {
434432
String bucket = batchWrite.batchWriteOptions.bucket;
435433
WritePrecision precision = batchWrite.batchWriteOptions.precision;
436434

437-
Maybe<Response<Void>> requestSource = Maybe.create(emitter -> service
438-
.postWrite(organization, bucket, content, null,
435+
Maybe<Response<Void>> requestSource = service
436+
.postWriteRx(organization, bucket, content, null,
439437
"identity", "text/plain; charset=utf-8", null,
440438
"application/json", null, precision)
441-
.enqueue(new Callback<Void>() {
442-
@Override
443-
public void onResponse(@Nonnull final Call<Void> call, @Nonnull final Response<Void> response) {
444-
emitter.onSuccess(response);
445-
}
446-
447-
@Override
448-
public void onFailure(@Nonnull final Call<Void> call, @Nonnull final Throwable throwable) {
449-
emitter.onError(throwable);
450-
}
451-
}));
439+
.toMaybe();
452440

453441
return requestSource
454442
//

pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,7 @@
112112
<dependency.okhttp3.version>4.9.3</dependency.okhttp3.version>
113113
<dependency.com.squareup.okio>2.10.0</dependency.com.squareup.okio>
114114
<dependency.gson.version>2.8.9</dependency.gson.version>
115-
<!-- <dependency.io.reactivex.rxjava2>2.2.21</dependency.io.reactivex.rxjava2>-->
116-
<dependency.io.reactivex.rxjava3>3.0.0</dependency.io.reactivex.rxjava3>
115+
<dependency.io.reactivex.rxjava3>3.1.3</dependency.io.reactivex.rxjava3>
117116

118117
<plugin.surefire.version>2.22.2</plugin.surefire.version>
119118
<plugin.javadoc.version>3.2.0</plugin.javadoc.version>

0 commit comments

Comments
 (0)