Skip to content

Commit 0d116ba

Browse files
authored
Fix an NPE in the ES|QL completion command. (#129235)
1 parent cc02ac8 commit 0d116ba

File tree

5 files changed

+30
-14
lines changed

5 files changed

+30
-14
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutionState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void markSeqNoAsPersisted(long seqNo) {
7575
* @param response The inference response.
7676
*/
7777
public synchronized void onInferenceResponse(long seqNo, InferenceAction.Response response) {
78-
if (failureCollector.hasFailure() == false) {
78+
if (response != null && failureCollector.hasFailure() == false) {
7979
bufferedResponses.put(seqNo, response);
8080
}
8181
checkpoint.markSeqNoAsProcessed(seqNo);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/bulk/BulkInferenceExecutor.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,19 @@ public void execute(BulkInferenceRequestIterator requests, ActionListener<List<I
6969
bulkExecutionState.finish();
7070
}
7171

72-
throttledInferenceRunner.doInference(
73-
request,
74-
ActionListener.runAfter(
75-
ActionListener.wrap(
76-
r -> bulkExecutionState.onInferenceResponse(seqNo, r),
77-
e -> bulkExecutionState.onInferenceException(seqNo, e)
78-
),
79-
responseHandler::persistPendingResponses
80-
)
72+
ActionListener<InferenceAction.Response> inferenceResponseListener = ActionListener.runAfter(
73+
ActionListener.wrap(
74+
r -> bulkExecutionState.onInferenceResponse(seqNo, r),
75+
e -> bulkExecutionState.onInferenceException(seqNo, e)
76+
),
77+
responseHandler::persistPendingResponses
8178
);
79+
80+
if (request == null) {
81+
inferenceResponseListener.onResponse(null);
82+
} else {
83+
throttledInferenceRunner.doInference(request, inferenceResponseListener);
84+
}
8285
}
8386
}
8487

@@ -112,7 +115,6 @@ public synchronized void persistPendingResponses() {
112115
if (bulkExecutionState.hasFailure() == false) {
113116
try {
114117
InferenceAction.Response response = bulkExecutionState.fetchBufferedResponse(persistedSeqNo);
115-
assert response != null;
116118
responses.add(response);
117119
} catch (Exception e) {
118120
bulkExecutionState.addFailure(e);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorOutputBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public void close() {
5151
*/
5252
@Override
5353
public void addInferenceResponse(InferenceAction.Response inferenceResponse) {
54+
if (inferenceResponse == null) {
55+
outputBlockBuilder.appendNull();
56+
return;
57+
}
58+
5459
ChatCompletionResults completionResults = inferenceResults(inferenceResponse);
5560

5661
if (completionResults == null) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/inference/completion/CompletionOperatorRequestIterator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ public InferenceAction.Request next() {
5858
* Wraps a single prompt string into an {@link InferenceAction.Request}.
5959
*/
6060
private InferenceAction.Request inferenceRequest(String prompt) {
61+
if (prompt == null) {
62+
return null;
63+
}
64+
6165
return InferenceAction.Request.builder(inferenceId, TaskType.COMPLETION).setInput(List.of(prompt)).build();
6266
}
6367

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/inference/InferenceOperatorTestCase.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,17 @@ protected int remaining() {
8888
@Override
8989
protected Page createPage(int positionOffset, int length) {
9090
length = Integer.min(length, remaining());
91-
try (var builder = blockFactory.newBytesRefVectorBuilder(length)) {
91+
try (var builder = blockFactory.newBytesRefBlockBuilder(length)) {
9292
for (int i = 0; i < length; i++) {
93-
builder.appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
93+
if (randomInt() % 100 == 0) {
94+
builder.appendNull();
95+
} else {
96+
builder.appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
97+
}
98+
9499
}
95100
currentPosition += length;
96-
return new Page(builder.build().asBlock());
101+
return new Page(builder.build());
97102
}
98103
}
99104
};

0 commit comments

Comments
 (0)