Skip to content

feat(logging): log stream provider to cache log stream #3646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import 'dart:math';

import 'package:aws_common/aws_common.dart';
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/in_memory_queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:aws_logging_cloudwatch/src/stoppable_timer.dart';
import 'package:fixnum/fixnum.dart';
Expand All @@ -32,10 +30,11 @@ import 'package:meta/meta.dart';
// The maximum number of log events in a batch is 10,000.

const int _maxNumberOfLogEventsInBatch = 10000;
const int _maxLogEventsTimeSpanInBatch = 24 * 3600;
const int _maxLogEventsBatchSize = 1048576;
const int _baseBufferSize = 26;
const int _maxLogEventSize = 256000;
final int _maxLogEventsTimeSpanInBatch =
const Duration(hours: 24).inMilliseconds;

typedef _LogBatch = (List<QueuedItem> logQueues, List<InputLogEvent> logEvents);

Expand Down Expand Up @@ -138,51 +137,28 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin

Future<void> _startSyncingIfNotInProgress() async {
Future<void> startSyncing() async {
String logStream;
try {
logStream = await _logStreamProvider.logStream;
} on Exception catch (e) {
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
await _logStore.clear();
logger.warn(
'Reached local store max size of: '
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted from '
'local store.',
);
}
logger.error('Failed to create CloudWatch log stream', e);
return;
}

final batcheStream = _getLogBatchesToSync();
await for (final (logs, events) in batcheStream) {
try {
final response = await _sendToCloudWatch(events, logStream);
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex !=
null) {
break;
}
await _logStore.deleteItems(logs);
} on Exception catch (e) {
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {
await _logStore.deleteItems(logs);
logger.warn(
'Reached local store max size of: '
'${_pluginConfig.localStoreMaxSizeInMB}.Hence logs are deleted '
'from local store.',
);
}
logger.error('Failed to sync batched logs to CloudWatch', e);
final batchStream = _getLogBatchesToSync();
await for (final (logs, events) in batchStream) {
final response = await _sendToCloudWatch(events);
// TODO(nikahsn): handle tooOldLogEventEndIndex
// and expiredLogEventEndIndex.
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two other properties to this, expiredLogEventEndIndex and tooOldLogEventEndIndex, which will need to be handled. I'm guessing later iterations will take into account partial completion as well since currently it is an all-or-nothing approach for a batch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, added a TODO

// TODO(nikahsn): throw an exception to enable log rotation if the
// log store is full.
break;
}
await _logStore.deleteItems(logs);
}
}

if (!_syncing) {
// TODO(nikahsn): disable log rotation.
_syncing = true;
try {
await startSyncing();
} on Exception catch (e) {
logger.error('Failed to sync logs to CloudWatch.', e);
// TODO(nikahsn): enable log rotation if the log store is full
} finally {
_syncing = false;
}
Expand All @@ -200,20 +176,25 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin

Future<PutLogEventsResponse> _sendToCloudWatch(
List<InputLogEvent> logEvents,
String logStreamName,
) async {
final logStreamName = await _logStreamProvider.defaultLogStream;
final request = PutLogEventsRequest(
logGroupName: _pluginConfig.logGroupName,
logStreamName: logStreamName,
logEvents: logEvents,
);
return _client.putLogEvents(request).result;
try {
return await _client.putLogEvents(request).result;
} on ResourceNotFoundException {
await _logStreamProvider.createLogStream(logStreamName);
return _client.putLogEvents(request).result;
}
}

Stream<_LogBatch> _getLogBatchesToSync() async* {
final queuedLogs = (await _logStore.getAll()).toList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can be streamed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean the log store to return stream instead of Future or do you mean creating stream from _logStore.getAll()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, _logStore.getAll() can be a Stream, then this method becomes a transformer of batches of log records from the DB to batches which can be synced.

if (queuedLogs.isEmpty) {
yield ([], []);
return;
}
final logEvents = <InputLogEvent>[];
final logQueues = <QueuedItem>[];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import 'dart:async';

import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:intl/intl.dart';

Expand All @@ -9,42 +11,51 @@ import 'package:intl/intl.dart';
/// [CloudWatchLogStreamProvider]
/// {@endtemplate}
abstract class CloudWatchLogStreamProvider {
/// Returns CloudWatch log stream name to use for sending logs to CloudWatch.
///
/// It creates the log stream if it does not exist.
Future<String> get logStream;
/// Returns the default log stream name from the cache. If the cache entry is
/// missing, it calls [createLogStream] and returns the log stream name.
FutureOr<String> get defaultLogStream;

/// Creates the log stream and adds it to the cache.
Future<void> createLogStream(String logStreamName);
}

/// {@template aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
/// The default implementation of [CloudWatchLogStreamProvider].
///
/// It uses `logStreamName` if provided otherwise uses `yyyy-MM-dd` date format
/// of UTC time now for the `logStreamName`.
/// It uses `defaultLogStreamName` if provided; otherwise it uses the `yyyy-MM-dd`
/// date format of UTC time now for the `defaultLogStreamName`.
/// {@endtemplate}
class DefaultCloudWatchLogStreamProvider
implements CloudWatchLogStreamProvider {
/// {@macro aws_logging_cloudwatch.default_cloudwatch_logstream_provider}
DefaultCloudWatchLogStreamProvider({
required CloudWatchLogsClient client,
required String logGroupName,
String? logStreamName,
}) : _logStreamName = logStreamName,
String? defaultLogStreamName,
}) : _defaultLogStreamName = defaultLogStreamName,
_logGroupName = logGroupName,
_client = client;

final String? _logStreamName;
final String? _defaultLogStreamName;
final String _logGroupName;
final CloudWatchLogsClient _client;
static final DateFormat _dateFormat = DateFormat('yyyy-MM-dd');
final _createdLogStreams = <String>{};

/// Creates the CloudWatch log stream if it does not exist and returns
/// the log stream name.
///
/// Throws an [Exception] if fails to create the log stream.
@override
Future<String> get logStream async {
Future<String> get defaultLogStream async {
final logStreamName =
_logStreamName ?? _dateFormat.format(DateTime.timestamp());
_defaultLogStreamName ?? _dateFormat.format(DateTime.timestamp());
if (_createdLogStreams.contains(logStreamName)) {
return logStreamName;
}
await createLogStream(logStreamName);
_createdLogStreams.add(logStreamName);
return logStreamName;
}

@override
Future<void> createLogStream(String logStreamName) async {
try {
await _client
.createLogStream(
Expand All @@ -55,10 +66,7 @@ class DefaultCloudWatchLogStreamProvider
)
.result;
} on ResourceAlreadyExistsException {
return logStreamName;
} on Exception {
rethrow;
return;
}
return logStreamName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class StoppableTimer {
required Future<void> Function() callback,
required void Function(Object) onError,
}) : _callback = callback,
_onError = onError,
_timer = Timer.periodic(duration, (Timer t) {
callback().catchError((Object e) {
onError(e);
Expand All @@ -25,12 +26,15 @@ class StoppableTimer {

/// [Duration] between invocations of the provided callback function.
final Duration duration;
final void Function() _callback;
final Future<void> Function() _callback;
final void Function(Object) _onError;

/// Start the timer.
void start() {
if (_timer.isActive) return;
_timer = Timer.periodic(duration, (Timer t) => _callback());
_timer = Timer.periodic(duration, (Timer t) {
_callback().catchError(_onError);
});
}

/// Stop the timer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import 'dart:async';

import 'package:aws_common/aws_common.dart';
import 'package:aws_logging_cloudwatch/aws_logging_cloudwatch.dart';
import 'package:aws_logging_cloudwatch/src/queued_item_store/queued_item_store.dart';
import 'package:aws_logging_cloudwatch/src/sdk/cloud_watch_logs.dart';
import 'package:mocktail/mocktail.dart';
import 'package:test/test.dart';
Expand Down Expand Up @@ -132,7 +131,101 @@ void main() {
(_) async => Future<PutLogEventsResponse>.value(PutLogEventsResponse()),
);

when(() => mockCloudWatchLogStreamProvider.logStream)
when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
(_) => mockPutLogEventsOperation,
);

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockQueuedItemStore.addItem(any(), any()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit to mocking over using the in-mem store? I would generally mock a single layer per test, in this case just the CloudWatch SDK.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mocking dependencies make the tests faster, reliable and can isolate the code under test from its dependencies.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then there should be tests for the log stream provider as well since that's currently not being tested.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, will add them in a separate PR.

.thenAnswer((_) async => {});

when(() => mockQueuedItemStore.deleteItems(any()))
.thenAnswer((_) async => {});

when(() => mockQueuedItemStore.getAll()).thenAnswer(
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

await expectLater(
plugin.flushLogs(),
completes,
);

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verifyNever(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
);

verify(
() => mockCloudWatchLogsClient.putLogEvents(any()),
).called(1);

verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1);
});

test('does not start a sync if a sync is in progress', () async {
when(
() => mockPutLogEventsOperation.result,
).thenAnswer(
(_) async => Future<PutLogEventsResponse>.value(
PutLogEventsResponse(),
),
);

when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
(_) => mockPutLogEventsOperation,
);

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockQueuedItemStore.addItem(any(), any()))
.thenAnswer((_) async => {});

when(() => mockQueuedItemStore.getAll()).thenAnswer(
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

final flushLogs = plugin.flushLogs();
await expectLater(
plugin.flushLogs(),
completes,
);
await flushLogs;

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verifyNever(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
);

verify(
() => mockCloudWatchLogsClient.putLogEvents(any()),
).called(1);

verify(() => mockQueuedItemStore.deleteItems(queuedItems)).called(1);
});

test('does not delete logs if they are too new', () async {
when(
() => mockPutLogEventsOperation.result,
).thenAnswer(
(_) async => Future<PutLogEventsResponse>.value(
PutLogEventsResponse(
rejectedLogEventsInfo:
RejectedLogEventsInfo(tooNewLogEventStartIndex: 0),
),
),
);

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockCloudWatchLogsClient.putLogEvents(any())).thenAnswer(
Expand All @@ -146,11 +239,55 @@ void main() {
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

await plugin.handleLogEntry(errorLog);
await expectLater(
plugin.flushLogs(),
completes,
);

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verifyNever(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
);

verify(
() => mockCloudWatchLogsClient.putLogEvents(any()),
).called(1);

verifyNever(
() => mockQueuedItemStore.deleteItems(any()),
);
});

test('it calls create log stream on resource not found exception',
() async {
when(() => mockCloudWatchLogsClient.putLogEvents(any()))
.thenThrow(ResourceNotFoundException());

when(() => mockCloudWatchLogStreamProvider.defaultLogStream)
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockCloudWatchLogStreamProvider.createLogStream(any()))
.thenAnswer((_) async => Future<String>.value('log stream name'));

when(() => mockQueuedItemStore.getAll()).thenAnswer(
(_) async => Future<Iterable<QueuedItem>>.value(queuedItems),
);

await expectLater(
plugin.flushLogs(),
completes,
);

verify(
() => mockCloudWatchLogStreamProvider.defaultLogStream,
).called(1);

verify(
() => mockCloudWatchLogStreamProvider.createLogStream(any()),
).called(1);
});
});
}
Loading