Skip to content

Commit 041244a

Browse files
authored
[Response Ops][Task Manager] Validate Task Instance During "mGet" Task Claiming Using Try/Catch (#223496)
## Summary Resolves: #204466 This PR is an alternative solution for this PR: #207158. It adds a try catch to the task instance during the mGet task claiming strategy logic without using schema validation. This should ensure malformed tasks do no short circuit other tasks from being claimed. ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios
1 parent ec944d8 commit 041244a

File tree

2 files changed

+154
-25
lines changed

2 files changed

+154
-25
lines changed

x-pack/platform/plugins/shared/task_manager/server/task_claimers/strategy_mget.test.ts

Lines changed: 129 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ describe('TaskClaiming', () => {
382382
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
383383

384384
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
385-
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0;',
385+
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 3; updateErrors: 0; getErrors: 0; malformed data errors: 0',
386386
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
387387
);
388388

@@ -547,7 +547,7 @@ describe('TaskClaiming', () => {
547547
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
548548

549549
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
550-
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0;',
550+
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 0',
551551
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
552552
);
553553

@@ -640,7 +640,7 @@ describe('TaskClaiming', () => {
640640
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
641641

642642
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
643-
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0;',
643+
'task claimer claimed: 2; stale: 0; conflicts: 0; missing: 1; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 0',
644644
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
645645
);
646646

@@ -733,7 +733,7 @@ describe('TaskClaiming', () => {
733733
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
734734

735735
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
736-
'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
736+
'task claimer claimed: 2; stale: 1; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 0',
737737
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
738738
);
739739

@@ -832,7 +832,7 @@ describe('TaskClaiming', () => {
832832
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
833833

834834
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
835-
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
835+
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 0',
836836
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
837837
);
838838

@@ -958,7 +958,7 @@ describe('TaskClaiming', () => {
958958
unwrap(resultOrErr) as ClaimOwnershipResult;
959959

960960
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
961-
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
961+
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 0',
962962
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
963963
);
964964

@@ -1121,7 +1121,7 @@ describe('TaskClaiming', () => {
11211121
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
11221122

11231123
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
1124-
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1;',
1124+
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 1; malformed data errors: 0',
11251125
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
11261126
);
11271127
expect(taskManagerLogger.error).toHaveBeenCalledWith(
@@ -1249,7 +1249,7 @@ describe('TaskClaiming', () => {
12491249
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
12501250

12511251
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
1252-
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
1252+
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 0',
12531253
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
12541254
);
12551255
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
@@ -1489,7 +1489,7 @@ describe('TaskClaiming', () => {
14891489
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
14901490

14911491
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
1492-
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0;',
1492+
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 1; getErrors: 0; malformed data errors: 0',
14931493
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
14941494
);
14951495
expect(taskManagerLogger.error).toHaveBeenCalledWith(
@@ -1622,7 +1622,7 @@ describe('TaskClaiming', () => {
16221622
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
16231623

16241624
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
1625-
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0;',
1625+
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 0',
16261626
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
16271627
);
16281628
expect(taskManagerLogger.error).not.toHaveBeenCalled();
@@ -1787,6 +1787,125 @@ describe('TaskClaiming', () => {
17871787
expect(store.bulkGet).not.toHaveBeenCalled();
17881788
});
17891789

1790+
test('should handle malformed errors when claiming tasks', async () => {
1791+
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
1792+
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));
1793+
1794+
const fetchedTasks = [
1795+
mockInstance({ id: `id-1`, taskType: 'report' }),
1796+
mockInstance({ id: `id-2`, taskType: 'report' }),
1797+
mockInstance({
1798+
id: `id-3`,
1799+
taskType: 'yawn',
1800+
schedule: {
1801+
interval: 'PT1M',
1802+
},
1803+
}),
1804+
mockInstance({ id: `id-4`, taskType: 'report' }),
1805+
];
1806+
1807+
const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
1808+
store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap });
1809+
store.getDocVersions.mockResolvedValueOnce(docLatestVersions);
1810+
store.bulkPartialUpdate.mockResolvedValueOnce(
1811+
[fetchedTasks[0], fetchedTasks[1], fetchedTasks[3]].map(getPartialUpdateResult)
1812+
);
1813+
store.bulkGet.mockResolvedValueOnce([
1814+
asOk(fetchedTasks[0]),
1815+
asOk(fetchedTasks[1]),
1816+
asOk(fetchedTasks[3]),
1817+
]);
1818+
1819+
const taskClaiming = new TaskClaiming({
1820+
logger: taskManagerLogger,
1821+
strategy: CLAIM_STRATEGY_MGET,
1822+
definitions: taskDefinitions,
1823+
taskStore: store,
1824+
excludedTaskTypes: [],
1825+
maxAttempts: 2,
1826+
getAvailableCapacity: () => 10,
1827+
taskPartitioner,
1828+
});
1829+
1830+
const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
1831+
claimOwnershipUntil: new Date(),
1832+
});
1833+
1834+
if (!isOk<ClaimOwnershipResult, FillPoolResult>(resultOrErr)) {
1835+
expect(resultOrErr).toBe(undefined);
1836+
}
1837+
1838+
const result = unwrap(resultOrErr) as ClaimOwnershipResult;
1839+
1840+
expect(apm.startTransaction).toHaveBeenCalledWith(
1841+
TASK_MANAGER_MARK_AS_CLAIMED,
1842+
TASK_MANAGER_TRANSACTION_TYPE
1843+
);
1844+
expect(mockApmTrans.end).toHaveBeenCalledWith('success');
1845+
1846+
expect(taskManagerLogger.debug).toHaveBeenCalledWith(
1847+
'task claimer claimed: 3; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; malformed data errors: 1',
1848+
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
1849+
);
1850+
1851+
expect(taskManagerLogger.error).toHaveBeenCalledWith(
1852+
'Error validating task schema id-3:yawn during claim: "Invalid interval \\"PT1M\\". Intervals must be of the form {number}m. Example: 5m."',
1853+
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
1854+
);
1855+
1856+
expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({
1857+
size: 40,
1858+
seq_no_primary_term: true,
1859+
});
1860+
1861+
expect(store.getDocVersions).toHaveBeenCalledWith([
1862+
'task:id-1',
1863+
'task:id-2',
1864+
'task:id-3',
1865+
'task:id-4',
1866+
]);
1867+
1868+
expect(store.bulkPartialUpdate).toHaveBeenCalledTimes(1);
1869+
expect(store.bulkPartialUpdate).toHaveBeenCalledWith([
1870+
{
1871+
id: fetchedTasks[0].id,
1872+
version: fetchedTasks[0].version,
1873+
scheduledAt: fetchedTasks[0].runAt,
1874+
attempts: 1,
1875+
ownerId: 'test-test',
1876+
retryAt: new Date('1970-01-01T00:05:30.000Z'),
1877+
status: 'running',
1878+
startedAt: new Date('1970-01-01T00:00:00.000Z'),
1879+
},
1880+
{
1881+
id: fetchedTasks[1].id,
1882+
version: fetchedTasks[1].version,
1883+
scheduledAt: fetchedTasks[1].runAt,
1884+
attempts: 1,
1885+
ownerId: 'test-test',
1886+
retryAt: new Date('1970-01-01T00:05:30.000Z'),
1887+
status: 'running',
1888+
startedAt: new Date('1970-01-01T00:00:00.000Z'),
1889+
},
1890+
{
1891+
id: fetchedTasks[3].id,
1892+
version: fetchedTasks[3].version,
1893+
scheduledAt: fetchedTasks[3].runAt,
1894+
attempts: 1,
1895+
ownerId: 'test-test',
1896+
retryAt: new Date('1970-01-01T00:05:30.000Z'),
1897+
status: 'running',
1898+
startedAt: new Date('1970-01-01T00:00:00.000Z'),
1899+
},
1900+
]);
1901+
1902+
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-4']);
1903+
1904+
expect(result.stats.tasksUpdated).toEqual(3);
1905+
expect(result.stats.tasksClaimed).toEqual(3);
1906+
expect(result.stats.tasksErrors).toEqual(1);
1907+
});
1908+
17901909
test('it should filter for specific partitions and tasks without partitions', async () => {
17911910
const taskManagerId = uuidv4();
17921911
const definitions = new TaskTypeDictionary(mockLogger());

x-pack/platform/plugins/shared/task_manager/server/task_claimers/strategy_mget.ts

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
149149
// apply capacity constraint to candidate tasks
150150
const tasksToRun: ConcreteTaskInstance[] = [];
151151
const leftOverTasks: ConcreteTaskInstance[] = [];
152+
const tasksWithMalformedData: ConcreteTaskInstance[] = [];
152153

153154
let capacityAccumulator = 0;
154155
for (const task of candidateTasks) {
@@ -166,19 +167,28 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
166167
const now = new Date();
167168
const taskUpdates: PartialConcreteTaskInstance[] = [];
168169
for (const task of tasksToRun) {
169-
taskUpdates.push({
170-
id: task.id,
171-
version: task.version,
172-
scheduledAt:
173-
task.retryAt != null && new Date(task.retryAt).getTime() < Date.now()
174-
? task.retryAt
175-
: task.runAt,
176-
status: TaskStatus.Running,
177-
startedAt: now,
178-
attempts: task.attempts + 1,
179-
retryAt: getRetryAt(task, definitions.get(task.taskType)) ?? null,
180-
ownerId: taskStore.taskManagerId,
181-
});
170+
try {
171+
taskUpdates.push({
172+
id: task.id,
173+
version: task.version,
174+
scheduledAt:
175+
task.retryAt != null && new Date(task.retryAt).getTime() < Date.now()
176+
? task.retryAt
177+
: task.runAt,
178+
status: TaskStatus.Running,
179+
startedAt: now,
180+
attempts: task.attempts + 1,
181+
retryAt: getRetryAt(task, definitions.get(task.taskType)) ?? null,
182+
ownerId: taskStore.taskManagerId,
183+
});
184+
} catch (error) {
185+
logger.error(
186+
`Error validating task schema ${task.id}:${task.taskType} during claim: ${JSON.stringify(
187+
error.message
188+
)}`
189+
);
190+
tasksWithMalformedData.push(task);
191+
}
182192
}
183193

184194
// perform the task object updates, deal with errors
@@ -224,7 +234,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
224234
}, []);
225235

226236
// TODO: need a better way to generate stats
227-
const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkUpdateErrors}; getErrors: ${bulkGetErrors};`;
237+
const message = `task claimer claimed: ${fullTasksToRun.length}; stale: ${staleTasks.length}; conflicts: ${conflicts}; missing: ${missingTasks.length}; capacity reached: ${leftOverTasks.length}; updateErrors: ${bulkUpdateErrors}; getErrors: ${bulkGetErrors}; malformed data errors: ${tasksWithMalformedData.length}`;
228238
logger.debug(message);
229239

230240
// build results
@@ -234,7 +244,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
234244
tasksConflicted: conflicts,
235245
tasksClaimed: fullTasksToRun.length,
236246
tasksLeftUnclaimed: leftOverTasks.length,
237-
tasksErrors: bulkUpdateErrors + bulkGetErrors,
247+
tasksErrors: bulkUpdateErrors + bulkGetErrors + tasksWithMalformedData.length,
238248
staleTasks: staleTasks.length,
239249
},
240250
docs: fullTasksToRun,

0 commit comments

Comments
 (0)