From dab7b23fc0defeb252f65ea077e20064558269a5 Mon Sep 17 00:00:00 2001 From: leosanqing Date: Fri, 23 Aug 2024 17:08:19 +0800 Subject: [PATCH 1/2] [FLINK-36143][ES6][ES7] Intro retry-on-conflict param to resolve Sink ES occurred "version conflict" --- .../table/ElasticsearchConfiguration.java | 5 +++++ .../table/ElasticsearchConnectorOptions.java | 8 ++++++++ .../table/ElasticsearchDynamicSink.java | 3 ++- .../ElasticsearchDynamicSinkFactoryBase.java | 4 +++- .../table/RowElasticsearchEmitter.java | 8 ++++++-- .../table/ElasticsearchConfiguration.java | 5 +++++ .../table/ElasticsearchConnectorOptions.java | 8 ++++++++ .../table/RowElasticsearchSinkFunction.java | 15 ++++++++++++--- .../table/Elasticsearch6DynamicSink.java | 3 ++- .../table/Elasticsearch6DynamicTableFactory.java | 4 +++- .../table/Elasticsearch7DynamicSink.java | 3 ++- .../table/Elasticsearch7DynamicTableFactory.java | 4 +++- 12 files changed, 59 insertions(+), 11 deletions(-) diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java index 578b1b32..2f802ed2 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java @@ -46,6 +46,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; @@ -129,4 +130,8 @@ public List getHosts() { public Optional getParallelism() { return config.getOptional(SINK_PARALLELISM); } + + public int getRetryOnConflictNum() { + return config.getOptional(RETRY_ON_CONFLICT_NUM).get(); + } } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java index 10ea0ae2..2e12cc73 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -145,4 +145,12 @@ public class ElasticsearchConnectorOptions { .enumType(DeliveryGuarantee.class) .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE) .withDescription("Optional delivery guarantee when committing."); + + public static final ConfigOption RETRY_ON_CONFLICT_NUM = + ConfigOptions.key("sink.retry-on-conflict-num") + .intType() + .defaultValue(0) + .withDescription( + "Sets the number of retries of a version conflict occurs " + + "because the document was updated between getting it and updating it. Defaults to 0."); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java index 0fd389bd..383fc425 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java @@ -127,7 +127,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, documentType, - createKeyExtractor()); + createKeyExtractor(), + config.getRetryOnConflictNum()); ElasticsearchSinkBuilderBase builder = builderSupplier.get(); diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java index ed5e7f73..1fbcd228 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java @@ -63,6 +63,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; @@ -225,7 +226,8 @@ public Set> optionalOptions() { DELIVERY_GUARANTEE_OPTION, PASSWORD_OPTION, USERNAME_OPTION, - SINK_PARALLELISM) + SINK_PARALLELISM, + RETRY_ON_CONFLICT_NUM) .collect(Collectors.toSet()); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java index bddc6cb1..06a9958d 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java @@ -50,18 +50,21 @@ class RowElasticsearchEmitter implements ElasticsearchEmitter { private final XContentType contentType; @Nullable private final String documentType; private final Function createKey; + private final int retryOnConflictNum; public RowElasticsearchEmitter( IndexGenerator indexGenerator, SerializationSchema serializationSchema, XContentType contentType, @Nullable String documentType, - Function createKey) { + Function createKey, + int retryOnConflictNum) { this.indexGenerator = checkNotNull(indexGenerator); this.serializationSchema = checkNotNull(serializationSchema); this.contentType = checkNotNull(contentType); this.documentType = documentType; this.createKey = checkNotNull(createKey); + this.retryOnConflictNum = retryOnConflictNum; } @Override @@ -109,7 +112,8 @@ private void processUpsert(RowData row, RequestIndexer indexer) { final UpdateRequest updateRequest = new UpdateRequest(indexGenerator.generate(row), documentType, key) .doc(document, contentType) - .upsert(document, contentType); + .upsert(document, contentType) + .retryOnConflict(retryOnConflictNum); indexer.add(updateRequest); } else { final IndexRequest indexRequest = diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java index 04c76333..a9bf10ba 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java @@ -38,6 +38,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; /** Accessor methods to elasticsearch options. */ @@ -110,6 +111,10 @@ public Optional getPassword() { return config.getOptional(PASSWORD_OPTION); } + public int getRetryOnConflictNum() { + return config.getOptional(RETRY_ON_CONFLICT_NUM).get(); + } + public boolean isBulkFlushBackoffEnabled() { return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION) != ElasticsearchConnectorOptions.BackOffType.DISABLED; diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java index 4838b035..848c8275 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -152,6 +152,14 @@ public class ElasticsearchConnectorOptions { "The format must produce a valid JSON document. " + "Please refer to the documentation on formats for more details."); + public static final ConfigOption RETRY_ON_CONFLICT_NUM = + ConfigOptions.key("sink.retry-on-conflict-num") + .intType() + .defaultValue(0) + .withDescription( + "Sets the number of retries of a version conflict occurs " + + "because the document was updated between getting it and updating it. Defaults to 0."); + // -------------------------------------------------------------------------------------------- // Enums // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java index 48762522..51451e02 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -51,6 +51,7 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction private final XContentType contentType; private final RequestFactory requestFactory; private final Function createKey; + private final int retryOnConflictNum; public RowElasticsearchSinkFunction( IndexGenerator indexGenerator, @@ -58,13 +59,15 @@ public RowElasticsearchSinkFunction( SerializationSchema serializationSchema, XContentType contentType, RequestFactory requestFactory, - Function createKey) { + Function createKey, + int retryOnConflictNum) { this.indexGenerator = Preconditions.checkNotNull(indexGenerator); this.docType = docType; this.serializationSchema = Preconditions.checkNotNull(serializationSchema); this.contentType = Preconditions.checkNotNull(contentType); this.requestFactory = Preconditions.checkNotNull(requestFactory); this.createKey = Preconditions.checkNotNull(createKey); + this.retryOnConflictNum = retryOnConflictNum; } @Override @@ -95,8 +98,14 @@ private void processUpsert(RowData row, RequestIndexer indexer) { final String key = createKey.apply(row); if (key != null) { final UpdateRequest updateRequest = - requestFactory.createUpdateRequest( - indexGenerator.generate(row), docType, key, contentType, document); + requestFactory + .createUpdateRequest( + indexGenerator.generate(row), + docType, + key, + contentType, + document) + .retryOnConflict(retryOnConflictNum); indexer.add(updateRequest); } else { final IndexRequest indexRequest = diff --git a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java index 1a2cdd18..95e7c588 100644 --- a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java +++ b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java @@ -148,7 +148,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + config.getRetryOnConflictNum()); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java index 34fa0e58..00b5e11c 100644 --- a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java +++ b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java @@ -68,6 +68,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; @@ -105,7 +106,8 @@ public class Elasticsearch6DynamicTableFactory PARTIAL_CACHE_EXPIRE_AFTER_WRITE, PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, - MAX_RETRIES) + MAX_RETRIES, + RETRY_ON_CONFLICT_NUM) .collect(Collectors.toSet()); @Override diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java index 1926e445..5570c011 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java @@ -143,7 +143,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + config.getRetryOnConflictNum()); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java index b516777d..a95261e2 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java @@ -67,6 +67,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; @@ -104,7 +105,8 @@ public class Elasticsearch7DynamicTableFactory PARTIAL_CACHE_EXPIRE_AFTER_WRITE, PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, - MAX_RETRIES) + MAX_RETRIES, + RETRY_ON_CONFLICT_NUM) .collect(Collectors.toSet()); @Override From 602b30f005089a4e47d245608de605cd7c3e7949 Mon Sep 17 00:00:00 2001 From: leosanqing Date: Fri, 30 Aug 2024 14:55:05 +0800 Subject: [PATCH 2/2] [FLINK-36143][ES6][ES7] Intro retry-on-conflict param to resolve Sink ES occurred "version conflict" --- .../table/ElasticsearchConfiguration.java | 6 ++--- .../table/ElasticsearchConnectorOptions.java | 4 +-- .../table/ElasticsearchDynamicSink.java | 2 +- .../ElasticsearchDynamicSinkFactoryBase.java | 4 +-- .../table/RowElasticsearchEmitter.java | 8 +++--- .../table/ElasticsearchConfiguration.java | 6 ++--- .../table/ElasticsearchConnectorOptions.java | 4 +-- ...asticsearchDynamicSinkFactoryBaseTest.java | 26 +++++++++++++++++++ .../table/Elasticsearch6DynamicSink.java | 2 +- .../Elasticsearch6DynamicTableFactory.java | 4 +-- .../table/Elasticsearch7DynamicSink.java | 2 +- .../Elasticsearch7DynamicTableFactory.java | 4 +-- 12 files changed, 49 insertions(+), 23 deletions(-) diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java index 2f802ed2..92f558a9 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java @@ -46,7 +46,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; -import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; @@ -131,7 +131,7 @@ public Optional getParallelism() { return config.getOptional(SINK_PARALLELISM); } - public int getRetryOnConflictNum() { - return config.getOptional(RETRY_ON_CONFLICT_NUM).get(); + public int getRetriesOnConflict() { + return config.getOptional(RETRIES_ON_CONFLICT_OPTION).orElse(0); } } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java index 2e12cc73..d273c11a 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -146,8 +146,8 @@ public class ElasticsearchConnectorOptions { .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE) .withDescription("Optional delivery guarantee when committing."); - public static final ConfigOption RETRY_ON_CONFLICT_NUM = - ConfigOptions.key("sink.retry-on-conflict-num") + public static final ConfigOption RETRIES_ON_CONFLICT_OPTION = + ConfigOptions.key("sink.retries-on-conflict") .intType() .defaultValue(0) .withDescription( diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java index 383fc425..7828d4f9 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java @@ -128,7 +128,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { XContentType.JSON, documentType, createKeyExtractor(), - config.getRetryOnConflictNum()); + config.getRetriesOnConflict()); ElasticsearchSinkBuilderBase builder = builderSupplier.get(); diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java index 1fbcd228..daf43a4e 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java @@ -63,7 +63,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; -import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; @@ -227,7 +227,7 @@ public Set> optionalOptions() { PASSWORD_OPTION, USERNAME_OPTION, SINK_PARALLELISM, - RETRY_ON_CONFLICT_NUM) + RETRIES_ON_CONFLICT_OPTION) .collect(Collectors.toSet()); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java index 06a9958d..51c4c336 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java @@ -50,7 +50,7 @@ class RowElasticsearchEmitter implements ElasticsearchEmitter { private final XContentType contentType; @Nullable private final String documentType; private final Function createKey; - private final int retryOnConflictNum; + private final int retiesOnConflict; public RowElasticsearchEmitter( IndexGenerator indexGenerator, @@ -58,13 +58,13 @@ public RowElasticsearchEmitter( XContentType contentType, @Nullable String documentType, Function createKey, - int retryOnConflictNum) { + int retriesOnConflict) { this.indexGenerator = checkNotNull(indexGenerator); this.serializationSchema = checkNotNull(serializationSchema); this.contentType = checkNotNull(contentType); this.documentType = documentType; this.createKey = checkNotNull(createKey); - this.retryOnConflictNum = retryOnConflictNum; + this.retiesOnConflict = retriesOnConflict; } @Override @@ -113,7 +113,7 @@ private void processUpsert(RowData row, RequestIndexer indexer) { new UpdateRequest(indexGenerator.generate(row), documentType, key) .doc(document, contentType) .upsert(document, contentType) - .retryOnConflict(retryOnConflictNum); + .retryOnConflict(retiesOnConflict); indexer.add(updateRequest); } else { final IndexRequest indexRequest = diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java index a9bf10ba..5a698291 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java @@ -38,7 +38,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; -import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; /** Accessor methods to elasticsearch options. */ @@ -111,8 +111,8 @@ public Optional getPassword() { return config.getOptional(PASSWORD_OPTION); } - public int getRetryOnConflictNum() { - return config.getOptional(RETRY_ON_CONFLICT_NUM).get(); + public int getRetriesOnConflict() { + return config.getOptional(RETRIES_ON_CONFLICT_OPTION).get(); } public boolean isBulkFlushBackoffEnabled() { diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java index 848c8275..ab1874a2 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -152,8 +152,8 @@ public class ElasticsearchConnectorOptions { "The format must produce a valid JSON document. " + "Please refer to the documentation on formats for more details."); - public static final ConfigOption RETRY_ON_CONFLICT_NUM = - ConfigOptions.key("sink.retry-on-conflict-num") + public static final ConfigOption RETRIES_ON_CONFLICT_OPTION = + ConfigOptions.key("sink.retries-on-conflict") .intType() .defaultValue(0) .withDescription( diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java index 8e5a98e7..ec383674 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.Collections; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.table.api.DataTypes.ARRAY; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.BYTES; @@ -253,4 +254,29 @@ public void testSinkParallelism() { (SinkV2Provider) esSink.getSinkRuntimeProvider(new ElasticsearchUtil.MockContext()); assertThat(provider.getParallelism()).hasValue(2); } + + @Test + public void testRetriesOnConflict() { + ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); + DynamicTableSink sink = + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption(RETRIES_ON_CONFLICT_OPTION.key(), "2") + .build()); + assertThat(sink).isInstanceOf(ElasticsearchDynamicSink.class); + ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink; + + assertThat(esSink.config.getRetriesOnConflict()).isEqualTo(2); + } + + @Test + public void testRetriesOnConflictDefault() { + ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); + DynamicTableSink sink = + sinkFactory.createDynamicTableSink(createPrefilledTestContext().build()); + assertThat(sink).isInstanceOf(ElasticsearchDynamicSink.class); + ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink; + + assertThat(esSink.config.getRetriesOnConflict()).isEqualTo(0); + } } diff --git a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java index 95e7c588..8380f72b 100644 --- a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java +++ b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java @@ -149,7 +149,7 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { XContentType.JSON, REQUEST_FACTORY, KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), - config.getRetryOnConflictNum()); + config.getRetriesOnConflict()); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java index 00b5e11c..268560d5 100644 --- a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java +++ b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java @@ -68,7 +68,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; -import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; @@ -107,7 +107,7 @@ public class Elasticsearch6DynamicTableFactory PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, MAX_RETRIES, - RETRY_ON_CONFLICT_NUM) + RETRIES_ON_CONFLICT_OPTION) .collect(Collectors.toSet()); @Override diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java index 5570c011..35e681d6 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java @@ -144,7 +144,7 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { XContentType.JSON, REQUEST_FACTORY, KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), - config.getRetryOnConflictNum()); + config.getRetriesOnConflict()); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java index a95261e2..53ea0a12 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java @@ -67,7 +67,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; -import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRY_ON_CONFLICT_NUM; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; @@ -106,7 +106,7 @@ public class Elasticsearch7DynamicTableFactory PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, MAX_RETRIES, - RETRY_ON_CONFLICT_NUM) + RETRIES_ON_CONFLICT_OPTION) .collect(Collectors.toSet()); @Override