Skip to content

Commit fef5c96

Browse files
authored
[FLINK-29042][Connectors/ElasticSearch] Support lookup join for es connector (#39)
1 parent ce44a13 commit fef5c96

File tree

24 files changed

+976
-60
lines changed

24 files changed

+976
-60
lines changed

flink-connector-elasticsearch-base/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,14 @@ under the License.
164164
<scope>test</scope>
165165
</dependency>
166166

167+
<dependency>
168+
<groupId>org.apache.flink</groupId>
169+
<artifactId>flink-table-runtime</artifactId>
170+
<version>${flink.version}</version>
171+
<scope>provided</scope>
172+
<optional>true</optional>
173+
</dependency>
174+
167175
<!-- ArchUit test dependencies -->
168176

169177
<dependency>

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package org.apache.flink.streaming.connectors.elasticsearch;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.java.tuple.Tuple2;
2223

2324
import org.elasticsearch.action.bulk.BulkItemResponse;
2425
import org.elasticsearch.action.bulk.BulkProcessor;
26+
import org.elasticsearch.action.search.SearchRequest;
2527

2628
import javax.annotation.Nullable;
2729

@@ -61,6 +63,21 @@ public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Ser
6163
*/
6264
BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
6365

66+
/**
67+
* Executes a search using the Search API.
68+
*
69+
* @param client the Elasticsearch client.
70+
* @param searchRequest A request to execute search against one or more indices (or all).
71+
*/
72+
Tuple2<String, String[]> search(C client, SearchRequest searchRequest) throws IOException;
73+
74+
/**
75+
* Closes this client and releases any system resources associated with it.
76+
*
77+
* @param client the Elasticsearch client.
78+
*/
79+
void close(C client) throws IOException;
80+
6481
/**
6582
* Extracts the cause of failure of a bulk item action.
6683
*
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.connectors.elasticsearch.table;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.common.serialization.DeserializationSchema;
23+
import org.apache.flink.api.java.tuple.Tuple2;
24+
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
25+
import org.apache.flink.table.connector.source.LookupTableSource;
26+
import org.apache.flink.table.data.RowData;
27+
import org.apache.flink.table.data.util.DataFormatConverters;
28+
import org.apache.flink.table.functions.FunctionContext;
29+
import org.apache.flink.table.functions.LookupFunction;
30+
import org.apache.flink.table.types.DataType;
31+
import org.apache.flink.util.FlinkRuntimeException;
32+
import org.apache.flink.util.Preconditions;
33+
34+
import org.elasticsearch.action.search.SearchRequest;
35+
import org.elasticsearch.common.Strings;
36+
import org.elasticsearch.index.query.BoolQueryBuilder;
37+
import org.elasticsearch.index.query.TermQueryBuilder;
38+
import org.elasticsearch.search.builder.SearchSourceBuilder;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
import java.io.IOException;
43+
import java.util.ArrayList;
44+
import java.util.Arrays;
45+
import java.util.Collection;
46+
import java.util.Collections;
47+
import java.util.Map;
48+
import java.util.stream.Collectors;
49+
import java.util.stream.IntStream;
50+
51+
import static org.apache.flink.util.Preconditions.checkNotNull;
52+
53+
/** A lookup function implementing {@link LookupTableSource} in Elasticsearch connector. */
54+
@Internal
55+
public class ElasticsearchRowDataLookupFunction<C extends AutoCloseable> extends LookupFunction {
56+
57+
private static final Logger LOG =
58+
LoggerFactory.getLogger(ElasticsearchRowDataLookupFunction.class);
59+
private static final long serialVersionUID = 1L;
60+
61+
private final DeserializationSchema<RowData> deserializationSchema;
62+
63+
private final String index;
64+
private final String type;
65+
66+
private final String[] producedNames;
67+
private final String[] lookupKeys;
68+
private final int maxRetryTimes;
69+
// converters to convert data from internal to external in order to generate keys for the cache.
70+
private final DataFormatConverters.DataFormatConverter[] converters;
71+
private SearchRequest searchRequest;
72+
private SearchSourceBuilder searchSourceBuilder;
73+
74+
private final ElasticsearchApiCallBridge<C> callBridge;
75+
76+
private transient C client;
77+
78+
public ElasticsearchRowDataLookupFunction(
79+
DeserializationSchema<RowData> deserializationSchema,
80+
int maxRetryTimes,
81+
String index,
82+
String type,
83+
String[] producedNames,
84+
DataType[] producedTypes,
85+
String[] lookupKeys,
86+
ElasticsearchApiCallBridge<C> callBridge) {
87+
88+
checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
89+
checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
90+
checkNotNull(producedNames, "No fieldNames supplied.");
91+
checkNotNull(producedTypes, "No fieldTypes supplied.");
92+
checkNotNull(lookupKeys, "No keyNames supplied.");
93+
checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");
94+
95+
this.deserializationSchema = deserializationSchema;
96+
this.maxRetryTimes = maxRetryTimes;
97+
this.index = index;
98+
this.type = type;
99+
this.producedNames = producedNames;
100+
this.lookupKeys = lookupKeys;
101+
this.converters = new DataFormatConverters.DataFormatConverter[lookupKeys.length];
102+
Map<String, Integer> nameToIndex =
103+
IntStream.range(0, producedNames.length)
104+
.boxed()
105+
.collect(Collectors.toMap(i -> producedNames[i], i -> i));
106+
for (int i = 0; i < lookupKeys.length; i++) {
107+
Integer position = nameToIndex.get(lookupKeys[i]);
108+
Preconditions.checkArgument(
109+
position != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
110+
converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[position]);
111+
}
112+
113+
this.callBridge = callBridge;
114+
}
115+
116+
@Override
117+
public void open(FunctionContext context) throws Exception {
118+
this.client = callBridge.createClient();
119+
120+
// Set searchRequest in open method in case of amount of calling in eval method when every
121+
// record comes.
122+
this.searchRequest = new SearchRequest(index);
123+
if (type == null) {
124+
searchRequest.types(Strings.EMPTY_ARRAY);
125+
} else {
126+
searchRequest.types(type);
127+
}
128+
searchSourceBuilder = new SearchSourceBuilder();
129+
searchSourceBuilder.fetchSource(producedNames, null);
130+
deserializationSchema.open(null);
131+
}
132+
133+
@Override
134+
public Collection<RowData> lookup(RowData keyRow) {
135+
BoolQueryBuilder lookupCondition = new BoolQueryBuilder();
136+
for (int i = 0; i < lookupKeys.length; i++) {
137+
lookupCondition.must(
138+
new TermQueryBuilder(lookupKeys[i], converters[i].toExternal(keyRow, i)));
139+
}
140+
searchSourceBuilder.query(lookupCondition);
141+
searchRequest.source(searchSourceBuilder);
142+
143+
for (int retry = 0; retry <= maxRetryTimes; retry++) {
144+
try {
145+
ArrayList<RowData> rows = new ArrayList<>();
146+
Tuple2<String, String[]> searchResponse = callBridge.search(client, searchRequest);
147+
if (searchResponse.f1.length > 0) {
148+
String[] result = searchResponse.f1;
149+
for (String s : result) {
150+
RowData row = parseSearchResult(s);
151+
rows.add(row);
152+
}
153+
rows.trimToSize();
154+
return rows;
155+
}
156+
} catch (IOException e) {
157+
LOG.error(String.format("Elasticsearch search error, retry times = %d", retry), e);
158+
if (retry >= maxRetryTimes) {
159+
throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
160+
}
161+
try {
162+
Thread.sleep(1000L * retry);
163+
} catch (InterruptedException e1) {
164+
LOG.warn(
165+
"Interrupted while waiting to retry failed elasticsearch search, aborting");
166+
throw new FlinkRuntimeException(e1);
167+
}
168+
}
169+
}
170+
return Collections.emptyList();
171+
}
172+
173+
private RowData parseSearchResult(String result) {
174+
RowData row = null;
175+
try {
176+
row = deserializationSchema.deserialize(result.getBytes());
177+
} catch (IOException e) {
178+
LOG.error("Deserialize search hit failed: " + e.getMessage());
179+
}
180+
181+
return row;
182+
}
183+
}

flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.streaming.connectors.elasticsearch;
1919

2020
import org.apache.flink.api.common.functions.RuntimeContext;
21+
import org.apache.flink.api.java.tuple.Tuple2;
2122
import org.apache.flink.configuration.Configuration;
2223
import org.apache.flink.core.testutils.CheckedThread;
2324
import org.apache.flink.core.testutils.MultiShotLatch;
@@ -35,6 +36,7 @@
3536
import org.elasticsearch.action.bulk.BulkRequest;
3637
import org.elasticsearch.action.bulk.BulkResponse;
3738
import org.elasticsearch.action.index.IndexRequest;
39+
import org.elasticsearch.action.search.SearchRequest;
3840
import org.elasticsearch.client.Client;
3941
import org.elasticsearch.client.Requests;
4042
import org.junit.Test;
@@ -43,6 +45,7 @@
4345

4446
import javax.annotation.Nullable;
4547

48+
import java.io.IOException;
4649
import java.util.ArrayList;
4750
import java.util.Collections;
4851
import java.util.HashMap;
@@ -603,6 +606,15 @@ public BulkProcessor.Builder createBulkProcessorBuilder(
603606
return null;
604607
}
605608

609+
@Override
610+
public Tuple2<String, String[]> search(Client client, SearchRequest searchRequest)
611+
throws IOException {
612+
return null;
613+
}
614+
615+
@Override
616+
public void close(Client client) throws IOException {}
617+
606618
@Nullable
607619
@Override
608620
public Throwable extractFailureCauseFromBulkItemResponse(

flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ under the License.
7878
<artifactId>junit-jupiter</artifactId>
7979
<scope>compile</scope>
8080
</dependency>
81+
<dependency>
82+
<groupId>org.apache.flink</groupId>
83+
<artifactId>flink-table-api-java-bridge</artifactId>
84+
<version>${flink.version}</version>
85+
</dependency>
8186
</dependencies>
8287

8388
</project>

0 commit comments

Comments
 (0)