Skip to content

Update helpers to use new multisearch types #2697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .buildkite/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,3 @@ WORKDIR /usr/src/app

COPY package.json .
RUN npm install

COPY . .
3 changes: 0 additions & 3 deletions .buildkite/Dockerfile-make
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,3 @@ USER ${BUILDER_UID}:${BUILDER_GID}
# install dependencies
COPY package.json .
RUN npm install

# copy project files
COPY . .
14 changes: 7 additions & 7 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export interface MsearchHelperOptions extends T.MsearchRequest {

export interface MsearchHelper extends Promise<void> {
stop: (error?: Error | null) => void
search: <TDocument = unknown>(header: T.MsearchMultisearchHeader, body: T.MsearchMultisearchBody) => Promise<MsearchHelperResponse<TDocument>>
search: <TDocument = unknown>(header: T.MsearchMultisearchHeader, body: T.SearchSearchRequestBody) => Promise<MsearchHelperResponse<TDocument>>
}

export interface MsearchHelperResponse<TDocument> {
Expand Down Expand Up @@ -362,7 +362,7 @@ export default class Helpers {
// TODO: support abort a single search?
// NOTE: the validation checks are synchronous and the callback/promise will
// be resolved in the same tick. We might want to fix this in the future.
search<TDocument = unknown> (header: T.MsearchMultisearchHeader, body: T.MsearchMultisearchBody): Promise<MsearchHelperResponse<TDocument>> {
search<TDocument = unknown> (header: T.MsearchMultisearchHeader, body: T.SearchSearchRequestBody): Promise<MsearchHelperResponse<TDocument>> {
if (stopReading) {
const error = stopError === null
? new ConfigurationError('The msearch processor has been stopped')
Expand Down Expand Up @@ -397,7 +397,7 @@ export default class Helpers {

async function iterate (): Promise<void> {
const { semaphore, finish } = buildSemaphore()
const msearchBody: Array<T.MsearchMultisearchHeader | T.MsearchMultisearchBody> = []
const msearchBody: Array<T.MsearchMultisearchHeader | T.SearchSearchRequestBody> = []
const callbacks: any[] = []
let loadedOperations = 0
timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line
Expand Down Expand Up @@ -490,7 +490,7 @@ export default class Helpers {
}
}

function send (msearchBody: Array<T.MsearchMultisearchHeader | T.MsearchMultisearchBody>, callbacks: any[]): void {
function send (msearchBody: Array<T.MsearchMultisearchHeader | T.SearchSearchRequestBody>, callbacks: any[]): void {
/* istanbul ignore if */
if (running > concurrency) {
throw new Error('Max concurrency reached')
Expand All @@ -508,15 +508,15 @@ export default class Helpers {
}
}

function msearchOperation (msearchBody: Array<T.MsearchMultisearchHeader | T.MsearchMultisearchBody>, callbacks: any[], done: () => void): void {
function msearchOperation (msearchBody: Array<T.MsearchMultisearchHeader | T.SearchSearchRequestBody>, callbacks: any[], done: () => void): void {
let retryCount = retries

// Instead of going full on async-await, which would make the code easier to read,
// we have decided to use callback style instead.
// This because every time we use async await, V8 will create multiple promises
// behind the scenes, making the code slightly slower.
tryMsearch(msearchBody, callbacks, retrySearch)
function retrySearch (msearchBody: Array<T.MsearchMultisearchHeader | T.MsearchMultisearchBody>, callbacks: any[]): void {
function retrySearch (msearchBody: Array<T.MsearchMultisearchHeader | T.SearchSearchRequestBody>, callbacks: any[]): void {
if (msearchBody.length > 0 && retryCount > 0) {
retryCount -= 1
setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch)
Expand All @@ -528,7 +528,7 @@ export default class Helpers {

// This function never returns an error, if the msearch operation fails,
// the error is dispatched to all search executors.
function tryMsearch (msearchBody: Array<T.MsearchMultisearchHeader | T.MsearchMultisearchBody>, callbacks: any[], done: (msearchBody: Array<T.MsearchMultisearchHeader | T.MsearchMultisearchBody>, callbacks: any[]) => void): void {
function tryMsearch (msearchBody: Array<T.MsearchMultisearchHeader | T.SearchSearchRequestBody>, callbacks: any[], done: (msearchBody: Array<T.MsearchMultisearchHeader | T.SearchSearchRequestBody>, callbacks: any[]) => void): void {
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions as TransportRequestOptionsWithMeta)
.then(results => {
const retryBody = []
Expand Down
Loading