Skip to content

Commit 56385a6

Browse files
author
Nika Hassani
committed
feat(logging): log stream provider to cache log stream
1 parent 5b102a2 commit 56385a6

File tree

4 files changed

+211
-73
lines changed

4 files changed

+211
-73
lines changed

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart

Lines changed: 45 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import 'package:meta/meta.dart';
3232
// The maximum number of log events in a batch is 10,000.
3333

3434
const int _maxNumberOfLogEventsInBatch = 10000;
35-
const int _maxLogEventsTimeSpanInBatch = 24 * 3600;
35+
const int _maxLogEventsTimeSpanInBatch = 24 * 3600 * 1000;
3636
const int _maxLogEventsBatchSize = 1048576;
3737
const int _baseBufferSize = 26;
3838
const int _maxLogEventSize = 256000;
@@ -138,51 +138,26 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
138138

139139
Future<void> _startSyncingIfNotInProgress() async {
140140
Future<void> startSyncing() async {
141-
String logStream;
142-
try {
143-
logStream = await _logStreamProvider.logStream;
144-
} on Exception catch (e) {
145-
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
146-
await _logStore.clear();
147-
logger.warn(
148-
'Reached local store max size of: '
149-
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted from '
150-
'local store.',
151-
);
152-
}
153-
logger.error('Failed to create CloudWatch log stream', e);
154-
return;
155-
}
156-
157141
final batcheStream = _getLogBatchesToSync();
158142
await for (final (logs, events) in batcheStream) {
159-
try {
160-
final response = await _sendToCloudWatch(events, logStream);
161-
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex !=
162-
null) {
163-
break;
164-
}
165-
await _logStore.deleteItems(logs);
166-
} on Exception catch (e) {
167-
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
168-
await _logStore.deleteItems(logs);
169-
logger.warn(
170-
'Reached local store max size of: '
171-
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted '
172-
'from local store.',
173-
);
174-
}
175-
logger.error('Failed to sync batched logs to CloudWatch', e);
143+
final response = await _sendToCloudWatch(events);
144+
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
145+
throw _TooNewLogEventException(
146+
'logs can not be more than 2 hours in the future',
147+
);
176148
}
149+
await _logStore.deleteItems(logs);
177150
}
178151
}
179152

180153
if (!_syncing) {
154+
// TODO(nikahsn): disable log rotation.
181155
_syncing = true;
182156
try {
183157
await startSyncing();
184158
} on Exception catch (e) {
185159
logger.error('Failed to sync logs to CloudWatch.', e);
160+
// TODO(nikahsn): enable log rotation if the log store is full
186161
} finally {
187162
_syncing = false;
188163
}
@@ -200,43 +175,49 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
200175

201176
Future<PutLogEventsResponse> _sendToCloudWatch(
202177
List<InputLogEvent> logEvents,
203-
String logStreamName,
204178
) async {
179+
final logStreamName = await _logStreamProvider.logStream;
205180
final request = PutLogEventsRequest(
206181
logGroupName: _pluginConfig.logGroupName,
207182
logStreamName: logStreamName,
208183
logEvents: logEvents,
209184
);
210-
return _client.putLogEvents(request).result;
185+
try {
186+
return _client.putLogEvents(request).result;
187+
} on ResourceNotFoundException {
188+
await _logStreamProvider.createLogStream(logStreamName);
189+
return _client.putLogEvents(request).result;
190+
} on Exception {
191+
rethrow;
192+
}
211193
}
212194

213195
Stream<_LogBatch> _getLogBatchesToSync() async* {
214196
final queuedLogs = (await _logStore.getAll()).toList();
215-
if (queuedLogs.isEmpty) {
216-
yield ([], []);
217-
}
218-
final logEvents = <InputLogEvent>[];
219-
final logQueues = <QueuedItem>[];
220-
var totalByteSize = 0;
221-
222-
for (final currentLog in queuedLogs) {
223-
final currentLogEvent = currentLog.toInputLogEvent();
224-
final size = currentLogEvent.message.length + _baseBufferSize;
225-
if (totalByteSize + size >= _maxLogEventsBatchSize ||
226-
logEvents.length >= _maxNumberOfLogEventsInBatch ||
227-
(logEvents.length > 1 &&
228-
currentLogEvent.timestamp - logEvents.first.timestamp >=
229-
_maxLogEventsTimeSpanInBatch)) {
230-
yield (logQueues, logEvents);
231-
totalByteSize = 0;
232-
logEvents.clear();
233-
logQueues.clear();
197+
if (queuedLogs.isNotEmpty) {
198+
final logEvents = <InputLogEvent>[];
199+
final logQueues = <QueuedItem>[];
200+
var totalByteSize = 0;
201+
202+
for (final currentLog in queuedLogs) {
203+
final currentLogEvent = currentLog.toInputLogEvent();
204+
final size = currentLogEvent.message.length + _baseBufferSize;
205+
if (totalByteSize + size >= _maxLogEventsBatchSize ||
206+
logEvents.length >= _maxNumberOfLogEventsInBatch ||
207+
(logEvents.length > 1 &&
208+
currentLogEvent.timestamp - logEvents.first.timestamp >=
209+
_maxLogEventsTimeSpanInBatch)) {
210+
yield (logQueues, logEvents);
211+
totalByteSize = 0;
212+
logEvents.clear();
213+
logQueues.clear();
214+
}
215+
totalByteSize += size;
216+
logEvents.add(currentLogEvent);
217+
logQueues.add(currentLog);
234218
}
235-
totalByteSize += size;
236-
logEvents.add(currentLogEvent);
237-
logQueues.add(currentLog);
219+
yield (logQueues, logEvents);
238220
}
239-
yield (logQueues, logEvents);
240221
}
241222

242223
/// Whether a [logEntry] should be logged by this plugin.
@@ -314,3 +295,8 @@ extension on LogEntry {
314295
);
315296
}
316297
}
298+
299+
class _TooNewLogEventException implements Exception {
300+
_TooNewLogEventException(this.message);
301+
final String message;
302+
}

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/log_stream_provider.dart

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
import 'dart:async';
5+
46
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
57
import 'package:intl/intl.dart';
68

@@ -9,10 +11,12 @@ import 'package:intl/intl.dart';
911
/// [CloudWatchLogStreamProvider]
1012
/// {@endtemplate}
1113
abstract class CloudWatchLogStreamProvider {
12-
/// Returns CloudWatch log stream name to use for sending logs to CloudWatch.
13-
///
14-
/// It creates the log stream if it does not exists.
15-
Future<String> get logStream;
14+
/// Returns log stream name from cache. if cache is missing it
15+
/// calls [createLogStream] and return the log stream name.
16+
FutureOr<String> get logStream;
17+
18+
/// Creates the log stream and add it to the cache.
19+
Future<void> createLogStream(String logStreamName);
1620
}
1721

1822
/// {@template aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
@@ -36,15 +40,22 @@ class DefaultCloudWatchLogStreamProvider
3640
final String _logGroupName;
3741
final CloudWatchLogsClient _client;
3842
static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd');
43+
final _cachedLogStreams = <String>{};
3944

40-
/// Creates CloudWatch log stream if does not exists and returns
41-
/// the log stream name.
42-
///
43-
/// Throws an [Exception] if fails to create the log stream.
4445
@override
45-
Future<String> get logStream async {
46+
FutureOr<String> get logStream async {
4647
final logStreamName =
4748
_logStreamName ?? _dateFormat.format(DateTime.timestamp());
49+
if (_cachedLogStreams.contains(logStreamName)) {
50+
return logStreamName;
51+
}
52+
await createLogStream(logStreamName);
53+
_cachedLogStreams.add(logStreamName);
54+
return logStreamName;
55+
}
56+
57+
@override
58+
Future<void> createLogStream(String logStreamName) async {
4859
try {
4960
await _client
5061
.createLogStream(
@@ -55,10 +66,9 @@ class DefaultCloudWatchLogStreamProvider
5566
)
5667
.result;
5768
} on ResourceAlreadyExistsException {
58-
return logStreamName;
69+
return;
5970
} on Exception {
6071
rethrow;
6172
}
62-
return logStreamName;
6373
}
6474
}

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/stoppable_timer.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class StoppableTimer {
1616
required Future<void> Function() callback,
1717
required void Function(Object) onError,
1818
}) : _callback = callback,
19+
_onError = onError,
1920
_timer = Timer.periodic(duration, (Timer t) {
2021
callback().catchError((Object e) {
2122
onError(e);
@@ -25,12 +26,15 @@ class StoppableTimer {
2526

2627
/// [Duration] between invocations of the provided callback function.
2728
final Duration duration;
28-
final void Function() _callback;
29+
final Future<void> Function() _callback;
30+
final void Function(Object) _onError;
2931

3032
/// Start the timer.
3133
void start() {
3234
if (_timer.isActive) return;
33-
_timer = Timer.periodic(duration, (Timer t) => _callback());
35+
_timer = Timer.periodic(duration, (Timer t) {
36+
_callback().catchError(_onError);
37+
});
3438
}
3539

3640
/// Stop the timer.

0 commit comments

Comments
 (0)