Skip to content

Commit 060eb22

Browse files
author
Nika Hassani
committed
feat(logging): log stream provider to cache log stream
1 parent 5b102a2 commit 060eb22

File tree

5 files changed

+193
-66
lines changed

5 files changed

+193
-66
lines changed

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import 'dart:math';
66

77
import 'package:aws_common/aws_common.dart';
88
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
9-
import 'package:aws_logging_cloudwatch/src/queued_item_store/in_memory_queued_item_store.dart';
10-
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
119
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
1210
import 'package:aws_logging_cloudwatch/src/stoppable_timer.dart';
1311
import 'package:fixnum/fixnum.dart';
@@ -32,10 +30,11 @@ import 'package:meta/meta.dart';
3230
// The maximum number of log events in a batch is 10,000.
3331

3432
const int _maxNumberOfLogEventsInBatch = 10000;
35-
const int _maxLogEventsTimeSpanInBatch = 24 * 3600;
3633
const int _maxLogEventsBatchSize = 1048576;
3734
const int _baseBufferSize = 26;
3835
const int _maxLogEventSize = 256000;
36+
final int _maxLogEventsTimeSpanInBatch =
37+
const Duration(hours: 24).inMilliseconds;
3938

4039
typedef _LogBatch = (List<QueuedItem> logQueues, List<InputLogEvent> logEvents);
4140

@@ -138,51 +137,26 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
138137

139138
Future<void> _startSyncingIfNotInProgress() async {
140139
Future<void> startSyncing() async {
141-
String logStream;
142-
try {
143-
logStream = await _logStreamProvider.logStream;
144-
} on Exception catch (e) {
145-
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
146-
await _logStore.clear();
147-
logger.warn(
148-
'Reached local store max size of: '
149-
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted from '
150-
'local store.',
151-
);
152-
}
153-
logger.error('Failed to create CloudWatch log stream', e);
154-
return;
155-
}
156-
157-
final batcheStream = _getLogBatchesToSync();
158-
await for (final (logs, events) in batcheStream) {
159-
try {
160-
final response = await _sendToCloudWatch(events, logStream);
161-
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex !=
162-
null) {
163-
break;
164-
}
165-
await _logStore.deleteItems(logs);
166-
} on Exception catch (e) {
167-
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
168-
await _logStore.deleteItems(logs);
169-
logger.warn(
170-
'Reached local store max size of: '
171-
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted '
172-
'from local store.',
173-
);
174-
}
175-
logger.error('Failed to sync batched logs to CloudWatch', e);
140+
final batchStream = _getLogBatchesToSync();
141+
await for (final (logs, events) in batchStream) {
142+
final response = await _sendToCloudWatch(events);
143+
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
144+
// TODO(nikahsn): throw and exception to enable log rotation if the
145+
// log store is full.
146+
break;
176147
}
148+
await _logStore.deleteItems(logs);
177149
}
178150
}
179151

180152
if (!_syncing) {
153+
// TODO(nikahsn): disable log rotation.
181154
_syncing = true;
182155
try {
183156
await startSyncing();
184157
} on Exception catch (e) {
185158
logger.error('Failed to sync logs to CloudWatch.', e);
159+
// TODO(nikahsn): enable log rotation if the log store is full
186160
} finally {
187161
_syncing = false;
188162
}
@@ -200,20 +174,25 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
200174

201175
Future<PutLogEventsResponse> _sendToCloudWatch(
202176
List<InputLogEvent> logEvents,
203-
String logStreamName,
204177
) async {
178+
final logStreamName = await _logStreamProvider.defaultLogStream;
205179
final request = PutLogEventsRequest(
206180
logGroupName: _pluginConfig.logGroupName,
207181
logStreamName: logStreamName,
208182
logEvents: logEvents,
209183
);
210-
return _client.putLogEvents(request).result;
184+
try {
185+
return await _client.putLogEvents(request).result;
186+
} on ResourceNotFoundException {
187+
await _logStreamProvider.createLogStream(logStreamName);
188+
return _client.putLogEvents(request).result;
189+
}
211190
}
212191

213192
Stream<_LogBatch> _getLogBatchesToSync() async* {
214193
final queuedLogs = (await _logStore.getAll()).toList();
215194
if (queuedLogs.isEmpty) {
216-
yield ([], []);
195+
return;
217196
}
218197
final logEvents = <InputLogEvent>[];
219198
final logQueues = <QueuedItem>[];
Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
import 'dart:async';
5+
46
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
57
import 'package:intl/intl.dart';
68

@@ -9,42 +11,51 @@ import 'package:intl/intl.dart';
911
/// [CloudWatchLogStreamProvider]
1012
/// {@endtemplate}
1113
abstract class CloudWatchLogStreamProvider {
12-
/// Returns CloudWatch log stream name to use for sending logs to CloudWatch.
13-
///
14-
/// It creates the log stream if it does not exists.
15-
Future<String> get logStream;
14+
/// Returns the default log stream name from cache. if cache is missing it
15+
/// calls [createLogStream] and return the log stream name.
16+
FutureOr<String> get defaultLogStream;
17+
18+
/// Creates the log stream and add it to the cache.
19+
Future<void> createLogStream(String logStreamName);
1620
}
1721

1822
/// {@template aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
1923
/// The default implementaion of [CloudWatchLogStreamProvider].
2024
///
21-
/// It uses `logStreamName` if provided otherwise uses `yyyy-MM-dd` date format
22-
/// of UTC time now for the `logStreamName`.
25+
/// It uses `defaultLogStreamName` if provided otherwise uses `yyyy-MM-dd`
26+
/// date format of UTC time now for the `defaultLogStreamName`.
2327
/// {@endtemplate}
2428
class DefaultCloudWatchLogStreamProvider
2529
implements CloudWatchLogStreamProvider {
2630
/// {@macro aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
2731
DefaultCloudWatchLogStreamProvider({
2832
required CloudWatchLogsClient client,
2933
required String logGroupName,
30-
String? logStreamName,
31-
}) : _logStreamName = logStreamName,
34+
String? defaultLogStreamName,
35+
}) : _defaultLogStreamName = defaultLogStreamName,
3236
_logGroupName = logGroupName,
3337
_client = client;
3438

35-
final String? _logStreamName;
39+
final String? _defaultLogStreamName;
3640
final String _logGroupName;
3741
final CloudWatchLogsClient _client;
3842
static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd');
43+
final _createdLogStreams = <String>{};
3944

40-
/// Creates CloudWatch log stream if does not exists and returns
41-
/// the log stream name.
42-
///
43-
/// Throws an [Exception] if fails to create the log stream.
4445
@override
45-
Future<String> get logStream async {
46+
Future<String> get defaultLogStream async {
4647
final logStreamName =
47-
_logStreamName ?? _dateFormat.format(DateTime.timestamp());
48+
_defaultLogStreamName ?? _dateFormat.format(DateTime.timestamp());
49+
if (_createdLogStreams.contains(logStreamName)) {
50+
return logStreamName;
51+
}
52+
await createLogStream(logStreamName);
53+
_createdLogStreams.add(logStreamName);
54+
return logStreamName;
55+
}
56+
57+
@override
58+
Future<void> createLogStream(String logStreamName) async {
4859
try {
4960
await _client
5061
.createLogStream(
@@ -55,10 +66,7 @@ class DefaultCloudWatchLogStreamProvider
5566
)
5667
.result;
5768
} on ResourceAlreadyExistsException {
58-
return logStreamName;
59-
} on Exception {
60-
rethrow;
69+
return;
6170
}
62-
return logStreamName;
6371
}
6472
}

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/stoppable_timer.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class StoppableTimer {
1616
required Future<void> Function() callback,
1717
required void Function(Object) onError,
1818
}) : _callback = callback,
19+
_onError = onError,
1920
_timer = Timer.periodic(duration, (Timer t) {
2021
callback().catchError((Object e) {
2122
onError(e);
@@ -25,12 +26,15 @@ class StoppableTimer {
2526

2627
/// [Duration] between invocations of the provided callback function.
2728
final Duration duration;
28-
final void Function() _callback;
29+
final Future<void> Function() _callback;
30+
final void Function(Object) _onError;
2931

3032
/// Start the timer.
3133
void start() {
3234
if (_timer.isActive) return;
33-
_timer = Timer.periodic(duration, (Timer t) => _callback());
35+
_timer = Timer.periodic(duration, (Timer t) {
36+
_callback().catchError(_onError);
37+
});
3438
}
3539

3640
/// Stop the timer.

packages/logging_cloudwatch/aws_logging_cloudwatch/test/cloudwatch_logger_plugin_test.dart

Lines changed: 140 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import 'dart:async';
55

66
import 'package:aws_common/aws_common.dart';
77
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
8-
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
98
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
109
import 'package:mocktail/mocktail.dart';
1110
import 'package:test/test.dart';
@@ -132,7 +131,101 @@ void main() {
132131
(_) async => Future<PutLogEventsResponse>.value(PutLogEventsResponse()),
133132
);
134133

135-
when(() => mockCloudWatchLogStreamProvider.logStream)
134+
when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
135+
(_) => mockPutLogEventsOperation,
136+
);
137+
138+
when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
139+
.thenAnswer((_) async => Future<String>.value('log stream name'));
140+
141+
when(() => mockQueuedItemStore.addItem(any(), any()))
142+
.thenAnswer((_) async => {});
143+
144+
when(() => mockQueuedItemStore.deleteItems(any()))
145+
.thenAnswer((_) async => {});
146+
147+
when(() => mockQueuedItemStore.getAll()).thenAnswer(
148+
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
149+
);
150+
151+
await expectLater(
152+
plugin.flushLogs(),
153+
completes,
154+
);
155+
156+
verify(
157+
() => mockCloudWatchLogStreamProvider.defaultLogStream,
158+
).called(1);
159+
160+
verifyNever(
161+
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
162+
);
163+
164+
verify(
165+
() => mockCloudWatchLogsClient.putLogEvents(any()),
166+
).called(1);
167+
168+
verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1);
169+
});
170+
171+
test('does not start a sync if a sync is in progress', () async {
172+
when(
173+
() => mockPutLogEventsOperation.result,
174+
).thenAnswer(
175+
(_) async => Future<PutLogEventsResponse>.value(
176+
PutLogEventsResponse(),
177+
),
178+
);
179+
180+
when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
181+
(_) => mockPutLogEventsOperation,
182+
);
183+
184+
when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
185+
.thenAnswer((_) async => Future<String>.value('log stream name'));
186+
187+
when(() => mockQueuedItemStore.addItem(any(), any()))
188+
.thenAnswer((_) async => {});
189+
190+
when(() => mockQueuedItemStore.getAll()).thenAnswer(
191+
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
192+
);
193+
194+
final flushLogs = plugin.flushLogs();
195+
await expectLater(
196+
plugin.flushLogs(),
197+
completes,
198+
);
199+
await flushLogs;
200+
201+
verify(
202+
() => mockCloudWatchLogStreamProvider.defaultLogStream,
203+
).called(1);
204+
205+
verifyNever(
206+
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
207+
);
208+
209+
verify(
210+
() => mockCloudWatchLogsClient.putLogEvents(any()),
211+
).called(1);
212+
213+
verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1);
214+
});
215+
216+
test('does not delete logs if they are too new', () async {
217+
when(
218+
() => mockPutLogEventsOperation.result,
219+
).thenAnswer(
220+
(_) async => Future<PutLogEventsResponse>.value(
221+
PutLogEventsResponse(
222+
rejectedLogEventsInfo:
223+
RejectedLogEventsInfo(tooNewLogEventStartIndex: 0),
224+
),
225+
),
226+
);
227+
228+
when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
136229
.thenAnswer((_) async => Future<String>.value('log stream name'));
137230

138231
when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
@@ -146,11 +239,55 @@ void main() {
146239
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
147240
);
148241

149-
await plugin.handleLogEntry(errorLog);
150242
await expectLater(
151243
plugin.flushLogs(),
152244
completes,
153245
);
246+
247+
verify(
248+
() => mockCloudWatchLogStreamProvider.defaultLogStream,
249+
).called(1);
250+
251+
verifyNever(
252+
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
253+
);
254+
255+
verify(
256+
() => mockCloudWatchLogsClient.putLogEvents(any()),
257+
).called(1);
258+
259+
verifyNever(
260+
() => mockQueuedItemStore.deleteItems(any()),
261+
);
262+
});
263+
264+
test('it calls create log stream on resource not found exception',
265+
() async {
266+
when(() => mockCloudWatchLogsClient.putLogEvents(any()))
267+
.thenThrow(ResourceNotFoundException());
268+
269+
when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
270+
.thenAnswer((_) async => Future<String>.value('log stream name'));
271+
272+
when(() => mockCloudWatchLogStreamProvider.createLogStream(any()))
273+
.thenAnswer((_) async => Future<String>.value('log stream name'));
274+
275+
when(() => mockQueuedItemStore.getAll()).thenAnswer(
276+
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
277+
);
278+
279+
await expectLater(
280+
plugin.flushLogs(),
281+
completes,
282+
);
283+
284+
verify(
285+
() => mockCloudWatchLogStreamProvider.defaultLogStream,
286+
).called(1);
287+
288+
verify(
289+
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
290+
).called(1);
154291
});
155292
});
156293
}

0 commit comments

Comments
 (0)