Skip to content

Commit 0d2a09c

Browse files
authored
Optimize pipeline for fatal/non-retriable errors (#898)
* In case of fatal non retriable errors, stop retrying and immediately move queue msgs to poison queue. * New `KernelMemoryException.IsTransient` optional property to explicit if an error is transient or not. * Use `KernelMemoryException.IsTransient` in Azure AI Doc Intelligence when an image cannot be processed, for instance when the image is too big (note: max size depends on SKU, see service public docs). * Use `KernelMemoryException.IsTransient` in AI clients, don't retry if the HTTP error cannot be recovered without code/config changes * Use `KernelMemoryException.IsTransient` in AI Mime type detection, don't retry processing unsupported files. * Change `IQueue` and `IPipelineStepHandler` signatures to support failure types, as opposed to just a boolean `false`.
1 parent fcbb685 commit 0d2a09c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+689
-172
lines changed

examples/201-dotnet-serverless-custom-handler/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public MyHandler(
4747
public string StepName { get; }
4848

4949
/// <inheritdoc />
50-
public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(
50+
public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(
5151
DataPipeline pipeline, CancellationToken cancellationToken = default)
5252
{
5353
/* ... your custom ...
@@ -64,6 +64,6 @@ public MyHandler(
6464
// Remove this - here only to avoid build errors
6565
await Task.Delay(0, cancellationToken).ConfigureAwait(false);
6666

67-
return (true, pipeline);
67+
return (ReturnType.Success, pipeline);
6868
}
6969
}

examples/202-dotnet-custom-handler-as-a-service/MyHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public Task StopAsync(CancellationToken cancellationToken = default)
3838
}
3939

4040
/// <inheritdoc />
41-
public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
41+
public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
4242
{
4343
/* ... your custom ...
4444
* ... handler ...
@@ -49,6 +49,6 @@ public Task StopAsync(CancellationToken cancellationToken = default)
4949
// Remove this - here only to avoid build errors
5050
await Task.Delay(0, cancellationToken).ConfigureAwait(false);
5151

52-
return (true, pipeline);
52+
return (ReturnType.Success, pipeline);
5353
}
5454
}

extensions/Anthropic/Client/RawAnthropicClient.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ internal async IAsyncEnumerable<StreamingResponseMessage> CallClaudeStreamingAsy
6464
if (!response.IsSuccessStatusCode)
6565
{
6666
var responseError = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
67-
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}");
67+
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}",
68+
isTransient: response.StatusCode.IsTransientError());
6869
}
6970

7071
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);

extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,19 @@ public AzureAIDocIntelEngine(
5858
///<inheritdoc/>
5959
public async Task<string> ExtractTextFromImageAsync(Stream imageContent, CancellationToken cancellationToken = default)
6060
{
61-
// Start the OCR operation
62-
var operation = await this._recognizerClient.AnalyzeDocumentAsync(WaitUntil.Completed, "prebuilt-read", imageContent, cancellationToken: cancellationToken).ConfigureAwait(false);
61+
try
62+
{
63+
// Start the OCR operation
64+
var operation = await this._recognizerClient.AnalyzeDocumentAsync(WaitUntil.Completed, "prebuilt-read", imageContent, cancellationToken: cancellationToken).ConfigureAwait(false);
6365

64-
// Wait for the result
65-
Response<AnalyzeResult> operationResponse = await operation.WaitForCompletionAsync(cancellationToken).ConfigureAwait(false);
66+
// Wait for the result
67+
Response<AnalyzeResult> operationResponse = await operation.WaitForCompletionAsync(cancellationToken).ConfigureAwait(false);
6668

67-
return operationResponse.Value.Content;
69+
return operationResponse.Value.Content;
70+
}
71+
catch (RequestFailedException e)
72+
{
73+
throw new AzureAIDocIntelException(e.Message, e, isTransient: HttpErrors.IsTransientError(e.Status));
74+
}
6875
}
6976
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System;
4+
5+
namespace Microsoft.KernelMemory.DataFormats.AzureAIDocIntel;
6+
7+
public class AzureAIDocIntelException : KernelMemoryException
8+
{
9+
/// <inheritdoc />
10+
public AzureAIDocIntelException(bool? isTransient = null)
11+
{
12+
this.IsTransient = isTransient;
13+
}
14+
15+
/// <inheritdoc />
16+
public AzureAIDocIntelException(string message, bool? isTransient = null) : base(message)
17+
{
18+
this.IsTransient = isTransient;
19+
}
20+
21+
/// <inheritdoc />
22+
public AzureAIDocIntelException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
23+
{
24+
this.IsTransient = isTransient;
25+
}
26+
}

extensions/AzureAISearch/AzureAISearch/AzureAISearchMemoryException.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,21 @@ namespace Microsoft.KernelMemory.MemoryDb.AzureAISearch;
77
public class AzureAISearchMemoryException : KernelMemoryException
88
{
99
/// <inheritdoc />
10-
public AzureAISearchMemoryException()
10+
public AzureAISearchMemoryException(bool? isTransient = null)
1111
{
12+
this.IsTransient = isTransient;
1213
}
1314

1415
/// <inheritdoc />
15-
public AzureAISearchMemoryException(string? message) : base(message)
16+
public AzureAISearchMemoryException(string message, bool? isTransient = null) : base(message)
1617
{
18+
this.IsTransient = isTransient;
1719
}
1820

1921
/// <inheritdoc />
20-
public AzureAISearchMemoryException(string? message, Exception? innerException) : base(message, innerException)
22+
public AzureAISearchMemoryException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
2123
{
24+
this.IsTransient = isTransient;
2225
}
2326
}
27+

extensions/AzureAISearch/AzureAISearch/Internals/MemoryDbSchema.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,41 +13,41 @@ public void Validate(bool vectorSizeRequired = false)
1313
{
1414
if (this.Fields.Count == 0)
1515
{
16-
throw new KernelMemoryException("The schema is empty");
16+
throw new AzureAISearchMemoryException("The schema is empty", isTransient: false);
1717
}
1818

1919
if (this.Fields.All(x => x.Type != MemoryDbField.FieldType.Vector))
2020
{
21-
throw new KernelMemoryException("The schema doesn't contain a vector field");
21+
throw new AzureAISearchMemoryException("The schema doesn't contain a vector field", isTransient: false);
2222
}
2323

2424
int keys = this.Fields.Count(x => x.IsKey);
2525
switch (keys)
2626
{
2727
case 0:
28-
throw new KernelMemoryException("The schema doesn't contain a key field");
28+
throw new AzureAISearchMemoryException("The schema doesn't contain a key field", isTransient: false);
2929
case > 1:
30-
throw new KernelMemoryException("The schema cannot contain more than one key");
30+
throw new AzureAISearchMemoryException("The schema cannot contain more than one key", isTransient: false);
3131
}
3232

3333
if (vectorSizeRequired && this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, VectorSize: 0 }))
3434
{
35-
throw new KernelMemoryException("Vector fields must have a size greater than zero defined");
35+
throw new AzureAISearchMemoryException("Vector fields must have a size greater than zero defined", isTransient: false);
3636
}
3737

3838
if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Bool, IsKey: true }))
3939
{
40-
throw new KernelMemoryException("Boolean fields cannot be used as unique keys");
40+
throw new AzureAISearchMemoryException("Boolean fields cannot be used as unique keys", isTransient: false);
4141
}
4242

4343
if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.ListOfStrings, IsKey: true }))
4444
{
45-
throw new KernelMemoryException("Collection fields cannot be used as unique keys");
45+
throw new AzureAISearchMemoryException("Collection fields cannot be used as unique keys", isTransient: false);
4646
}
4747

4848
if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, IsKey: true }))
4949
{
50-
throw new KernelMemoryException("Vector fields cannot be used as unique keys");
50+
throw new AzureAISearchMemoryException("Vector fields cannot be used as unique keys", isTransient: false);
5151
}
5252
}
5353
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System;
4+
5+
namespace Microsoft.KernelMemory.AI.AzureOpenAI;
6+
7+
public class AzureOpenAIException : KernelMemoryException
8+
{
9+
/// <inheritdoc />
10+
public AzureOpenAIException(bool? isTransient = null)
11+
{
12+
this.IsTransient = isTransient;
13+
}
14+
15+
/// <inheritdoc />
16+
public AzureOpenAIException(string message, bool? isTransient = null) : base(message)
17+
{
18+
this.IsTransient = isTransient;
19+
}
20+
21+
/// <inheritdoc />
22+
public AzureOpenAIException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
23+
{
24+
this.IsTransient = isTransient;
25+
}
26+
}

extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Microsoft.KernelMemory.AI.AzureOpenAI.Internals;
1313
using Microsoft.KernelMemory.AI.OpenAI;
1414
using Microsoft.KernelMemory.Diagnostics;
15+
using Microsoft.SemanticKernel;
1516
using Microsoft.SemanticKernel.AI.Embeddings;
1617
using Microsoft.SemanticKernel.Connectors.AzureOpenAI;
1718

@@ -121,15 +122,29 @@ public IReadOnlyList<string> GetTokens(string text)
121122
public Task<Embedding> GenerateEmbeddingAsync(string text, CancellationToken cancellationToken = default)
122123
{
123124
this._log.LogTrace("Generating embedding");
124-
return this._client.GenerateEmbeddingAsync(text, cancellationToken);
125+
try
126+
{
127+
return this._client.GenerateEmbeddingAsync(text, cancellationToken);
128+
}
129+
catch (HttpOperationException e)
130+
{
131+
throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError());
132+
}
125133
}
126134

127135
/// <inheritdoc/>
128136
public async Task<Embedding[]> GenerateEmbeddingBatchAsync(IEnumerable<string> textList, CancellationToken cancellationToken = default)
129137
{
130138
var list = textList.ToList();
131139
this._log.LogTrace("Generating embeddings, batch size '{0}'", list.Count);
132-
IList<ReadOnlyMemory<float>> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false);
133-
return embeddings.Select(e => new Embedding(e)).ToArray();
140+
try
141+
{
142+
IList<ReadOnlyMemory<float>> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false);
143+
return embeddings.Select(e => new Embedding(e)).ToArray();
144+
}
145+
catch (HttpOperationException e)
146+
{
147+
throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError());
148+
}
134149
}
135150
}

extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Net.Http;
66
using System.Runtime.CompilerServices;
77
using System.Threading;
8+
using System.Threading.Tasks;
89
using Azure.AI.OpenAI;
910
using Microsoft.Extensions.Logging;
1011
using Microsoft.KernelMemory.AI.AzureOpenAI.Internals;
@@ -140,8 +141,17 @@ public async IAsyncEnumerable<string> GenerateTextAsync(
140141
}
141142

142143
this._log.LogTrace("Sending chat message generation request");
143-
IAsyncEnumerable<StreamingTextContent> result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken);
144-
await foreach (StreamingTextContent x in result)
144+
IAsyncEnumerable<StreamingTextContent> result;
145+
try
146+
{
147+
result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken);
148+
}
149+
catch (HttpOperationException e)
150+
{
151+
throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError());
152+
}
153+
154+
await foreach (StreamingTextContent x in result.WithCancellation(cancellationToken))
145155
{
146156
if (x.Text == null) { continue; }
147157

0 commit comments

Comments
 (0)