Skip to content

Commit 91d6e41

Browse files
committed
http-99 rebase
Signed-off-by: davidradl <[email protected]>
1 parent 550ee1d commit 91d6e41

File tree

9 files changed

+94
-576
lines changed

9 files changed

+94
-576
lines changed

CHANGELOG.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,8 @@
22

33
## [Unreleased]
44

5-
<<<<<<< HEAD
65
- Added support for generic json and URL query creator
76

8-
=======
9-
- Added support for generic json and URL query creator
10-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
117
- Retries support for source table:
128
- Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
139
- Parameters `gid.connector.http.source.lookup.error.code.exclude"` and `gid.connector.http.source.lookup.error.code` were replaced by `gid.connector.http.source.lookup.ignored-response-codes`.

README.md

Lines changed: 0 additions & 41 deletions
Large diffs are not rendered by default.

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java

Lines changed: 1 addition & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
<<<<<<< HEAD
2-
=======
31
/*
42
* © Copyright IBM Corp. 2025
53
*/
64

7-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
85
package com.getindata.connectors.http.internal.table.lookup.querycreators;
96

107
import java.io.IOException;
@@ -13,21 +10,12 @@
1310
import java.nio.charset.StandardCharsets;
1411
import java.util.*;
1512

16-
<<<<<<< HEAD
17-
import lombok.Builder;
1813
import lombok.extern.slf4j.Slf4j;
19-
=======
20-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
2114
import org.apache.flink.annotation.VisibleForTesting;
2215
import org.apache.flink.api.common.serialization.SerializationSchema;
2316
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2417
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
2518
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
26-
<<<<<<< HEAD
27-
import org.apache.flink.table.data.RowData;
28-
import org.apache.flink.util.FlinkRuntimeException;
29-
import org.apache.flink.util.Preconditions;
30-
=======
3119
import org.apache.flink.table.api.DataTypes.Field;
3220
import org.apache.flink.table.data.GenericRowData;
3321
import org.apache.flink.table.data.RowData;
@@ -36,9 +24,6 @@
3624
import org.apache.flink.types.Row;
3725
import org.apache.flink.util.FlinkRuntimeException;
3826
import org.apache.flink.util.Preconditions;
39-
import org.apache.logging.log4j.LogManager;
40-
import org.apache.logging.log4j.Logger;
41-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
4227

4328
import com.getindata.connectors.http.LookupArg;
4429
import com.getindata.connectors.http.LookupQueryCreator;
@@ -47,18 +32,6 @@
4732
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
4833

4934
/**
50-
<<<<<<< HEAD
51-
* <p>Generic JSON and URL query creator; in addition to be able to map columns to json requests,
52-
* it allows url inserts to be mapped to column names using templating.</p>
53-
* <p>For GETs, column names are mapped to query parameters. e.g. for
54-
* <code>GenericJsonAndUrlQueryCreator.REQUEST_PARAM_FIELDS</code> = "id1;id2"
55-
* and url of http://base. At lookup time with values of id1=1 and id2=2 a call of
56-
* http/base?id1=1&amp;id2=2 will be issued.</p>
57-
* <p>For PUT and POST, parameters are mapped to the json body e.g. for
58-
* REQUEST_PARAM_FIELDS = "id1;id2" and url of http://base. At lookup time with values of id1=1 and
59-
* id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}</p>
60-
* <p>For all http methods, url segments can be used to include lookup up values. Using the map from
61-
=======
6235
* Generic JSON and URL query creator; in addition to be able to map columns to json requests,
6336
* it allows url inserts to be mapped to column names using templating.
6437
* <br>
@@ -72,33 +45,16 @@
7245
* id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}
7346
* <br>
7447
* For all http methods, url segments can be used to include lookup up values. Using the map from
75-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
7648
* <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of the insert and the
7749
* value of the associated column.
7850
* e.g. for <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> = "key1":"col1"
7951
* and url of http://base/{key1}. At lookup time with values of col1="aaaa" a call of
80-
<<<<<<< HEAD
81-
* http/base/aaaa will be issued.</p>
82-
*/
83-
@Slf4j
84-
@Builder
85-
public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
86-
private static final long serialVersionUID = 1L;
87-
private final String httpMethod;
88-
// not final so we can mutate for unit test
89-
private SerializationSchema<RowData> serializationSchema;
90-
private final List<String> requestQueryParamsFields;
91-
private boolean schemaOpened = false;
92-
private final List<String> requestBodyFields;
93-
private final Map<String, String> requestUrlMap;
94-
private final LookupRow lookupRow;
95-
=======
9652
* http/base/aaaa will be issued.
9753
*
9854
*/
55+
@Slf4j
9956
public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
10057
private static final long serialVersionUID = 1L;
101-
private static final Logger log = LogManager.getLogger(GenericJsonAndUrlQueryCreator.class);
10258

10359
// not final so we can mutate for unit test
10460
private SerializationSchema<RowData> serializationSchema;
@@ -133,7 +89,6 @@ public GenericJsonAndUrlQueryCreator(final String httpMethod,
13389
this.requestBodyFields = requestBodyFields;
13490
this.requestUrlMap = requestUrlMap;
13591
}
136-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
13792
@VisibleForTesting
13893
void setSerializationSchema(SerializationSchema<RowData>
13994
serializationSchema) {
@@ -153,13 +108,9 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
153108
jsonObject = (ObjectNode) ObjectMapperAdapter.instance().readTree(
154109
serializationSchema.serialize(lookupDataRow));
155110
} catch (IOException e) {
156-
<<<<<<< HEAD
157-
throw new RuntimeException("Unable to parse the lookup arguments to json.", e);
158-
=======
159111
String message = "Unable to parse the lookup arguments to json.";
160112
log.error(message, e);
161113
throw new RuntimeException(message, e);
162-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
163114
}
164115
// Parameters are encoded as query params for GET and none GET.
165116
// Later code will turn these query params into the body for PUTs and POSTs
@@ -180,34 +131,24 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
180131
lookupQuery = ObjectMapperAdapter.instance()
181132
.writeValueAsString(jsonObject.retain(requestBodyFields));
182133
} catch (JsonProcessingException e) {
183-
<<<<<<< HEAD
184-
throw new RuntimeException("Unable to convert Json Object to a string", e);
185-
=======
186134
final String message = "Unable to convert Json Object to a string";
187135
log.error(message, e);
188136
throw new RuntimeException(message,e);
189-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
190137
}
191138
// body parameters
192139
// use the request json object to scope the required fields and the lookupArgs as values
193140
bodyBasedUrlQueryParams = createBodyBasedParams(lookupArgs,
194141
jsonObjectForQueryParams);
195142
}
196143
// add the path map
197-
<<<<<<< HEAD
198-
final Map<String, String> pathBasedUrlParams = createURLPathBasedParams(lookupArgs,
199-
=======
200144
final Map<String, String> pathBasedUrlParams = createPathBasedParams(lookupArgs,
201-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
202145
requestUrlMap);
203146

204147
return new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParams, pathBasedUrlParams);
205148

206149
}
207150

208151
/**
209-
<<<<<<< HEAD
210-
=======
211152
* Create a Row from a RowData and DataType
212153
* @param lookupRowData the lookup RowData
213154
* @param rowType the datatype
@@ -230,19 +171,14 @@ static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) {
230171
}
231172

232173
/**
233-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
234174
* Create map of the json key to the lookup argument
235175
* value. This is used for body based content.
236176
* @param args lookup arguments
237177
* @param objectNode object node
238178
* @return map of field content to the lookup argument value.
239179
*/
240180
private Map<String, String> createBodyBasedParams(final Collection<LookupArg> args,
241-
<<<<<<< HEAD
242-
ObjectNode objectNode ) {
243-
=======
244181
ObjectNode objectNode ) {
245-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
246182
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
247183
Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
248184
iterator.forEachRemaining(field -> {
@@ -258,45 +194,26 @@ private Map<String, String> createBodyBasedParams(final Collection<LookupArg> ar
258194
return mapOfJsonKeyToLookupArg;
259195
}
260196
/**
261-
<<<<<<< HEAD
262-
* Create map of insert name to column name for path inserts
263-
=======
264197
* Create map of the json key to the lookup argument
265198
* value. This is used for body based content.
266-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
267199
* @param args lookup arguments
268200
* @param urlMap map of insert name to column name
269201
* @return map of field content to the lookup argument value.
270202
*/
271-
<<<<<<< HEAD
272-
private Map<String, String> createURLPathBasedParams(final Collection<LookupArg> args,
273-
Map<String,
274-
String> urlMap ) {
275-
Map<String, String> mapOfinsertKeyToLookupArg = new LinkedHashMap<>();
276-
=======
277203
private Map<String, String> createPathBasedParams(final Collection<LookupArg> args,
278204
Map<String, String> urlMap ) {
279205
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
280-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
281206
if (urlMap != null) {
282207
for (String key: urlMap.keySet()) {
283208
for (final LookupArg arg : args) {
284209
if (arg.getArgName().equals(key)) {
285-
<<<<<<< HEAD
286-
mapOfinsertKeyToLookupArg.put(
287-
=======
288210
mapOfJsonKeyToLookupArg.put(
289-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
290211
urlMap.get(key), arg.getArgValue());
291212
}
292213
}
293214
}
294215
}
295-
<<<<<<< HEAD
296-
return mapOfinsertKeyToLookupArg;
297-
=======
298216
return mapOfJsonKeyToLookupArg;
299-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
300217
}
301218
/**
302219
* Convert json object to query params string
@@ -317,23 +234,13 @@ static String convertToQueryParameters(final ObjectNode jsonObject, String enc)
317234
result.add(fieldName + "="
318235
+ URLEncoder.encode(fieldValue, enc));
319236
} catch (UnsupportedEncodingException e) {
320-
<<<<<<< HEAD
321-
throw new RuntimeException("Failed to encode the value of the query parameter name "
322-
+ fieldName
323-
+ ": "
324-
+ fieldValue
325-
+ " using encoding "
326-
+ enc,
327-
e);
328-
=======
329237
final String message =
330238
"Failed to encode the value of the query parameter name "
331239
+ fieldName
332240
+ ": "
333241
+ fieldValue;
334242
log.error(message, e);
335243
throw new RuntimeException(message, e);
336-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
337244
}
338245
});
339246

@@ -349,16 +256,11 @@ private void checkOpened() {
349256
GenericJsonAndUrlQueryCreator.class));
350257
this.schemaOpened = true;
351258
} catch (final Exception e) {
352-
<<<<<<< HEAD
353-
throw new FlinkRuntimeException("Failed to initialize serialization schema for "
354-
+ GenericJsonAndUrlQueryCreator.class, e);
355-
=======
356259
final String message =
357260
"Failed to initialize serialization schema for "
358261
+ GenericJsonAndUrlQueryCreator.class;
359262
log.error(message, e);
360263
throw new FlinkRuntimeException(message, e);
361-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
362264
}
363265
}
364266
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
import org.apache.flink.table.factories.DynamicTableFactory;
1717
import org.apache.flink.table.factories.FactoryUtil;
1818
import org.apache.flink.table.factories.SerializationFormatFactory;
19-
import org.apache.logging.log4j.LogManager;
20-
import org.apache.logging.log4j.Logger;
2119
import static org.apache.flink.configuration.ConfigOptions.key;
2220

2321
import com.getindata.connectors.http.LookupQueryCreator;
@@ -39,11 +37,6 @@
3937
@SuppressWarnings({"checkstyle:RegexpSingleline", "checkstyle:LineLength"})
4038
public class GenericJsonAndUrlQueryCreatorFactory implements LookupQueryCreatorFactory {
4139
private static final long serialVersionUID = 1L;
42-
<<<<<<< HEAD
43-
private static final Logger log =
44-
LogManager.getLogger(GenericJsonAndUrlQueryCreatorFactory.class);
45-
=======
46-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
4740

4841
public static final String ID = "generic-json-url";
4942

@@ -73,11 +66,7 @@ public class GenericJsonAndUrlQueryCreatorFactory implements LookupQueryCreatorF
7366
+ "as url segments. Parses a string as a map of strings. "
7467
+ "<br>"
7568
+ "For example if there are table columns called customerId"
76-
<<<<<<< HEAD
77-
+ " and orderId, then specifying value customerId:cid,orderID:oid"
78-
=======
7969
+ " and orderId, then specifying value customerId:cid1,orderID:oid"
80-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
8170
+ " and a url of https://myendpoint/customers/{cid}/orders/{oid}"
8271
+ " will mean that the url used for the lookup query will"
8372
+ " dynamically pickup the values for customerId, orderId"
@@ -125,16 +114,6 @@ public LookupQueryCreator createLookupQueryCreator(final ReadableConfig readable
125114
encoder.createRuntimeEncoder(null,
126115
lookupRow.getLookupPhysicalRowDataType());
127116
}
128-
<<<<<<< HEAD
129-
return GenericJsonAndUrlQueryCreator.builder()
130-
.httpMethod(httpMethod)
131-
.serializationSchema(jsonSerializationSchema)
132-
.requestQueryParamsFields(requestQueryParamsFields)
133-
.requestBodyFields(requestBodyFields)
134-
.requestUrlMap(requestUrlMap)
135-
.lookupRow(lookupRow)
136-
.build();
137-
=======
138117
// create using config parameter values and specify serialization
139118
// schema from json format.
140119
return new GenericJsonAndUrlQueryCreator(httpMethod,
@@ -143,7 +122,6 @@ public LookupQueryCreator createLookupQueryCreator(final ReadableConfig readable
143122
requestBodyFields,
144123
requestUrlMap,
145124
lookupRow);
146-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
147125
}
148126

149127
@Override

src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/CustomFormatFactory.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@ public class CustomFormatFactory implements SerializationFormatFactory {
2020
public static final String REQUIRED_OPTION = "required-option-one";
2121

2222
/**
23-
<<<<<<< HEAD
24-
* Used for testing custom format.
25-
=======
2623
* Consider removing this static only used for testing only
27-
>>>>>>> 6c68722 (HTTP-99 Generic Json url query creator)
2824
*/
2925
static boolean requiredOptionsWereUsed = false;
3026

0 commit comments

Comments
 (0)