Skip to content

Commit e76bebb

Browse files
author
Nika Hassani
committed
feat(logging): log stream provider to cache log stream
1 parent 5b102a2 commit e76bebb

File tree

5 files changed

+211
-77
lines changed

5 files changed

+211
-77
lines changed

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/cloudwatch_logger_plugin.dart

Lines changed: 45 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import 'dart:math';
66

77
import 'package:aws_common/aws_common.dart';
88
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
9-
import 'package:aws_logging_cloudwatch/src/queued_item_store/in_memory_queued_item_store.dart';
10-
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
119
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
1210
import 'package:aws_logging_cloudwatch/src/stoppable_timer.dart';
1311
import 'package:fixnum/fixnum.dart';
@@ -32,7 +30,7 @@ import 'package:meta/meta.dart';
3230
// The maximum number of log events in a batch is 10,000.
3331

3432
const int _maxNumberOfLogEventsInBatch = 10000;
35-
const int _maxLogEventsTimeSpanInBatch = 24 * 3600;
33+
const int _maxLogEventsTimeSpanInBatch = 24 * 3600 * 1000;
3634
const int _maxLogEventsBatchSize = 1048576;
3735
const int _baseBufferSize = 26;
3836
const int _maxLogEventSize = 256000;
@@ -138,51 +136,26 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
138136

139137
Future<void> _startSyncingIfNotInProgress() async {
140138
Future<void> startSyncing() async {
141-
String logStream;
142-
try {
143-
logStream = await _logStreamProvider.logStream;
144-
} on Exception catch (e) {
145-
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
146-
await _logStore.clear();
147-
logger.warn(
148-
'Reached local store max size of: '
149-
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted from '
150-
'local store.',
151-
);
152-
}
153-
logger.error('Failed to create CloudWatch log stream', e);
154-
return;
155-
}
156-
157139
final batcheStream = _getLogBatchesToSync();
158140
await for (final (logs, events) in batcheStream) {
159-
try {
160-
final response = await _sendToCloudWatch(events, logStream);
161-
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex !=
162-
null) {
163-
break;
164-
}
165-
await _logStore.deleteItems(logs);
166-
} on Exception catch (e) {
167-
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
168-
await _logStore.deleteItems(logs);
169-
logger.warn(
170-
'Reached local store max size of: '
171-
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted '
172-
'from local store.',
173-
);
174-
}
175-
logger.error('Failed to sync batched logs to CloudWatch', e);
141+
final response = await _sendToCloudWatch(events);
142+
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
143+
throw _TooNewLogEventException(
144+
'logs can not be more than 2 hours in the future',
145+
);
176146
}
147+
await _logStore.deleteItems(logs);
177148
}
178149
}
179150

180151
if (!_syncing) {
152+
// TODO(nikahsn): disable log rotation.
181153
_syncing = true;
182154
try {
183155
await startSyncing();
184156
} on Exception catch (e) {
185157
logger.error('Failed to sync logs to CloudWatch.', e);
158+
// TODO(nikahsn): enable log rotation if the log store is full
186159
} finally {
187160
_syncing = false;
188161
}
@@ -200,43 +173,49 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
200173

201174
Future<PutLogEventsResponse> _sendToCloudWatch(
202175
List<InputLogEvent> logEvents,
203-
String logStreamName,
204176
) async {
177+
final logStreamName = await _logStreamProvider.logStream;
205178
final request = PutLogEventsRequest(
206179
logGroupName: _pluginConfig.logGroupName,
207180
logStreamName: logStreamName,
208181
logEvents: logEvents,
209182
);
210-
return _client.putLogEvents(request).result;
183+
try {
184+
return _client.putLogEvents(request).result;
185+
} on ResourceNotFoundException {
186+
await _logStreamProvider.createLogStream(logStreamName);
187+
return _client.putLogEvents(request).result;
188+
} on Exception {
189+
rethrow;
190+
}
211191
}
212192

213193
Stream<_LogBatch> _getLogBatchesToSync() async* {
214194
final queuedLogs = (await _logStore.getAll()).toList();
215-
if (queuedLogs.isEmpty) {
216-
yield ([], []);
217-
}
218-
final logEvents = <InputLogEvent>[];
219-
final logQueues = <QueuedItem>[];
220-
var totalByteSize = 0;
221-
222-
for (final currentLog in queuedLogs) {
223-
final currentLogEvent = currentLog.toInputLogEvent();
224-
final size = currentLogEvent.message.length + _baseBufferSize;
225-
if (totalByteSize + size >= _maxLogEventsBatchSize ||
226-
logEvents.length >= _maxNumberOfLogEventsInBatch ||
227-
(logEvents.length > 1 &&
228-
currentLogEvent.timestamp - logEvents.first.timestamp >=
229-
_maxLogEventsTimeSpanInBatch)) {
230-
yield (logQueues, logEvents);
231-
totalByteSize = 0;
232-
logEvents.clear();
233-
logQueues.clear();
195+
if (queuedLogs.isNotEmpty) {
196+
final logEvents = <InputLogEvent>[];
197+
final logQueues = <QueuedItem>[];
198+
var totalByteSize = 0;
199+
200+
for (final currentLog in queuedLogs) {
201+
final currentLogEvent = currentLog.toInputLogEvent();
202+
final size = currentLogEvent.message.length + _baseBufferSize;
203+
if (totalByteSize + size >= _maxLogEventsBatchSize ||
204+
logEvents.length >= _maxNumberOfLogEventsInBatch ||
205+
(logEvents.length > 1 &&
206+
currentLogEvent.timestamp - logEvents.first.timestamp >=
207+
_maxLogEventsTimeSpanInBatch)) {
208+
yield (logQueues, logEvents);
209+
totalByteSize = 0;
210+
logEvents.clear();
211+
logQueues.clear();
212+
}
213+
totalByteSize += size;
214+
logEvents.add(currentLogEvent);
215+
logQueues.add(currentLog);
234216
}
235-
totalByteSize += size;
236-
logEvents.add(currentLogEvent);
237-
logQueues.add(currentLog);
217+
yield (logQueues, logEvents);
238218
}
239-
yield (logQueues, logEvents);
240219
}
241220

242221
/// Whether a [logEntry] should be logged by this plugin.
@@ -314,3 +293,8 @@ extension on LogEntry {
314293
);
315294
}
316295
}
296+
297+
class _TooNewLogEventException implements Exception {
298+
_TooNewLogEventException(this.message);
299+
final String message;
300+
}

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/log_stream_provider.dart

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
import 'dart:async';
5+
46
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
57
import 'package:intl/intl.dart';
68

@@ -9,10 +11,12 @@ import 'package:intl/intl.dart';
911
/// [CloudWatchLogStreamProvider]
1012
/// {@endtemplate}
1113
abstract class CloudWatchLogStreamProvider {
12-
/// Returns CloudWatch log stream name to use for sending logs to CloudWatch.
13-
///
14-
/// It creates the log stream if it does not exists.
15-
Future<String> get logStream;
14+
/// Returns log stream name from cache. if cache is missing it
15+
/// calls [createLogStream] and return the log stream name.
16+
FutureOr<String> get logStream;
17+
18+
/// Creates the log stream and add it to the cache.
19+
Future<void> createLogStream(String logStreamName);
1620
}
1721

1822
/// {@template aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
@@ -36,15 +40,22 @@ class DefaultCloudWatchLogStreamProvider
3640
final String _logGroupName;
3741
final CloudWatchLogsClient _client;
3842
static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd');
43+
final _cachedLogStreams = <String>{};
3944

40-
/// Creates CloudWatch log stream if does not exists and returns
41-
/// the log stream name.
42-
///
43-
/// Throws an [Exception] if fails to create the log stream.
4445
@override
45-
Future<String> get logStream async {
46+
FutureOr<String> get logStream async {
4647
final logStreamName =
4748
_logStreamName ?? _dateFormat.format(DateTime.timestamp());
49+
if (_cachedLogStreams.contains(logStreamName)) {
50+
return logStreamName;
51+
}
52+
await createLogStream(logStreamName);
53+
_cachedLogStreams.add(logStreamName);
54+
return logStreamName;
55+
}
56+
57+
@override
58+
Future<void> createLogStream(String logStreamName) async {
4859
try {
4960
await _client
5061
.createLogStream(
@@ -55,10 +66,9 @@ class DefaultCloudWatchLogStreamProvider
5566
)
5667
.result;
5768
} on ResourceAlreadyExistsException {
58-
return logStreamName;
69+
return;
5970
} on Exception {
6071
rethrow;
6172
}
62-
return logStreamName;
6373
}
6474
}

packages/logging_cloudwatch/aws_logging_cloudwatch/lib/src/stoppable_timer.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class StoppableTimer {
1616
required Future<void> Function() callback,
1717
required void Function(Object) onError,
1818
}) : _callback = callback,
19+
_onError = onError,
1920
_timer = Timer.periodic(duration, (Timer t) {
2021
callback().catchError((Object e) {
2122
onError(e);
@@ -25,12 +26,15 @@ class StoppableTimer {
2526

2627
/// [Duration] between invocations of the provided callback function.
2728
final Duration duration;
28-
final void Function() _callback;
29+
final Future<void> Function() _callback;
30+
final void Function(Object) _onError;
2931

3032
/// Start the timer.
3133
void start() {
3234
if (_timer.isActive) return;
33-
_timer = Timer.periodic(duration, (Timer t) => _callback());
35+
_timer = Timer.periodic(duration, (Timer t) {
36+
_callback().catchError(_onError);
37+
});
3438
}
3539

3640
/// Stop the timer.

0 commit comments

Comments
 (0)