-
Notifications
You must be signed in to change notification settings - Fork 3
Implement event sourced saga persistence. #52
Conversation
…ceID()` and `InitialState()`.
… implementations.
…he saga subsystem. - Renamed `saga.Repository` to `InstanceRepository` - Renamed `eventsourcing.Repository` to `InstanceRepository` - Collapsed the snapshotting/non-snapshotting implementations of saga.InstanceRepository` into a single `StandardInstanceReposiotry` struct
This is in preparation for allowing use of evented data instances for CRUD sagas.
Also remove the `ctx` parameter and `error` return value, as creating a new data instance should always by a simple construction operation.
Codecov Report
@@ Coverage Diff @@
## master #52 +/- ##
===========================================
- Coverage 56.52% 45.74% -10.79%
===========================================
Files 35 38 +3
Lines 1118 1388 +270
===========================================
+ Hits 632 635 +3
- Misses 476 743 +267
Partials 10 10
Continue to review full report at Codecov.
|
src/ax/persistence/messagestore.go
Outdated
) error | ||
|
||
// OpenStream opens a stream of messages for reading from a specific offset. | ||
OpenStream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document possible error conditions such as opening from an out of range offset, or stream not existing.
src/ax/saga/crud/repository.go
Outdated
// in its key set. | ||
// | ||
// sn is the saga name. k is the message mapping key. | ||
// LoadSagaInstance fetches a saga instance by its ID. | ||
// | ||
// If a saga instance is found; ok is true, otherwise it is false. A |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ok
return value has been removed.
return "saga:" + id.Get() | ||
} | ||
|
||
// appendEvents appends the events in envs to the message stream for the given |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems ok, but should it be talking about the "message stream" or "message store".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is ok, internally it calls ms.AppendEvents()
, the docs of which are:
// AppendMessages appends one or more messages to a named stream.
src/ax/saga/instance.go
Outdated
// Data is the application-defined data associated with this instance. | ||
Data Data | ||
|
||
// Revision is version of the instance that the data represents. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing the ?
Revision is the version
src/ax/saga/mapping.go
Outdated
ctx context.Context, | ||
tx persistence.Tx, | ||
sn, k string, | ||
) (InstanceID, bool, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming bool
is ok. Should this be documented?
|
||
// Save persists changes to the instance. | ||
// It returns true if any changes have occurred. | ||
Save(ctx context.Context) (bool, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does bool
need to be documented as ok ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in this case it's ok just to say "it returns true" as that's the primary piece of data it returns.
src/axmysql/messagestore.go
Outdated
func (MessageStore) insertStream( | ||
ctx context.Context, | ||
tx *sql.Tx, | ||
s string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this use more descriptive naming for self documenting?
src/axmysql/messagestore.go
Outdated
func (MessageStore) incrStreamOffset( | ||
ctx context.Context, | ||
tx *sql.Tx, | ||
s string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this use more descriptive naming for self documenting?
s string, | ||
o uint64, | ||
n uint64, | ||
) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this int64
return value be documented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there's much need when there's only a single non-error return value. The function description does document what it returns -- it's just not named.
src/axmysql/messagestore.sql
Outdated
-- | ||
|
||
-- | ||
-- messagestore_global stores the next global message offset across all streams. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean messagestore_offset
?
messagestore_global
src/axmysql/messagestore.sql
Outdated
global_offset BIGINT UNSIGNED NOT NULL, | ||
insert_time TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), | ||
|
||
stream_id BIGINT UNSIGNED NOT NULL, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Column alignment?
src/axmysql/sagamapper.go
Outdated
ctx context.Context, | ||
ptx persistence.Tx, | ||
sn, k string, | ||
) (saga.InstanceID, bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming bool
is ok. Should this be documented?
src/axmysql/sagarepository.go
Outdated
// LoadSagaInstance fetches a saga instance that has a specific mapping key | ||
// in its key set. | ||
// | ||
// sn is the saga name. k is the message mapping key. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line should not exist? There is no sn
or k
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this whole doc block should be reviewed? There is no ok to return, and it looks up by id, not key sets?
src/axmysql/sagarepository.go
Outdated
} | ||
|
||
// SaveSagaInstance persists a saga instance and its associated mapping | ||
// table to the store as part of tx. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ptx instead of tx ?
src/axmysql/sagarepository.go
Outdated
// SaveSagaInstance persists a saga instance and its associated mapping | ||
// table to the store as part of tx. | ||
// | ||
// It returns an error if the saga instance has been modified since it was |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't the whole idea to save modifications?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's poorly worded, it should say "modified by another process" or something like that.
src/axmysql/sagarepository.sql
Outdated
@@ -0,0 +1,16 @@ | |||
-- | |||
-- This file defines the SQL schema used by SagaInstanceRepository. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SagaInstanceRepository
does not exist?
src/axmysql/sagarepository.sql
Outdated
-- | ||
|
||
-- | ||
-- saga_data stores saga.Data instances for each instance of a CRUD saga. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean saga_instance
?
saga_data
// SaveSagaSnapshot saves a snapshot to the store. | ||
func (SnapshotRepository) SaveSagaSnapshot( | ||
ctx context.Context, | ||
tx persistence.Tx, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You called this ptx
in the LoadSagaSnapshot()
method. Rename one for consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, could go either way. I did this because in the implementation ptx
is the one that you use the least. They don't have to match really.
src/axmysql/sagasnapshot.sql
Outdated
@@ -0,0 +1,16 @@ | |||
-- | |||
-- This file defines the SQL schema used by SagaSnapshotRepository. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did this type name change, I can't find it? SagaSnapshotRepository
src/axmysql/sagasnapshot.sql
Outdated
-- | ||
|
||
-- | ||
-- saga_snapshot stores snapshots saga.Data instances for eventsourced sagas. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non plural snapshot ?
stores snapshots saga.Data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done review. Just the comments mentioned.
ctx context.Context, | ||
ptx persistence.Tx, | ||
id saga.InstanceID, | ||
) (saga.Instance, bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming bool is ok. Should this be documented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I skipped this one before, it was the last one. It seemed like the pattern was to not document those, but you did end up documenting those previously mentioned ones.
Fixes #46