Skip to content
This repository was archived by the owner on Mar 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
61a1d60
Split `Saga.NewInstance()` into two separate methods, `GenerateInstan…
jmalloc May 22, 2018
becd855
Seperate mapping abstraction from `Repository` into new `Mapper` int…
jmalloc May 23, 2018
b7f6576
Implement eventsourced sagas.
jmalloc May 23, 2018
ae3a65e
Refactor event store into generic message store.
jmalloc May 23, 2018
ef1ced5
Rename `saga_instance` table to `saga_data` and tailor to use by CRUD…
jmalloc May 24, 2018
2735d9c
Move outbox schema into separate .sql file for consistency with other…
jmalloc May 24, 2018
c86d86c
Add missing `rows.Close()`.
jmalloc May 24, 2018
8bfddc0
Implement SQL-based saga snapshots.
jmalloc May 24, 2018
aa82eb9
Improve clarity between the different kinds of repositories used by t…
jmalloc May 24, 2018
b08c3dc
Move `eventsourcing.Data` to `saga.EventedData`
jmalloc May 24, 2018
cce8e11
Rename `saga.InitialState()` to `NewData()`
jmalloc May 24, 2018
d2ef049
Move `eventsourcing.Sender` to `saga` package.
jmalloc May 24, 2018
21fdf7d
Call `EventedData.ApplyEvents()` for CRUD sagas.
jmalloc May 24, 2018
9a809ad
Rename `saga.MessageHandler.findInstance()` to `loadInstance()`.
jmalloc May 24, 2018
3008214
Refactor sagas to isolate the key differences between CRUD and ES sag…
jmalloc May 25, 2018
3fb5140
Document error condition for `MessageStore.OpenStream()`.
jmalloc May 25, 2018
18aa9bb
Remove documentation about absent `ok` return value.
jmalloc May 25, 2018
e213796
Improve documentation around `Instance.Revision`.
jmalloc May 25, 2018
98b5c43
Name and document `ok` variable.
jmalloc May 25, 2018
66bf696
Document `s` parameter on internal functions.
jmalloc May 25, 2018
0afddbe
Fix table name in documentation
jmalloc May 25, 2018
dd7f6e3
Fix mixed tabs/spaces.
jmalloc May 25, 2018
43baa68
Used name return values and document `ok`.
jmalloc May 25, 2018
16749b2
Fix documentation of methods on `axmysql.SagaRepository`.
jmalloc May 25, 2018
e58e963
Clarify optimistic-locking behavior of `SagaRepository`.
jmalloc May 25, 2018
88d2451
Correct outdated struct name.
jmalloc May 25, 2018
709a142
Fix table name in documentation.
jmalloc May 25, 2018
7cb5463
Correct outdated struct name.
jmalloc May 25, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions examples/banking/account/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,26 @@ import (

// SagaDescription returns a human-readable description of the saga instance.
func (a *Account) SagaDescription() string {
return fmt.Sprintf("account %s", ident.Format(a.AccountId))
return fmt.Sprintf(
"account %s for %s, balance of %d",
ident.Format(a.AccountId),
a.Name,
a.Balance,
)
}

// ApplyEvent updates the data to reflect the fact that ev has occurred.
func (a *Account) ApplyEvent(env ax.Envelope) {
switch ev := env.Message.(type) {
case *messages.AccountOpened:
a.AccountId = ev.AccountId
a.Name = ev.Name
a.IsOpen = true
case *messages.AccountCredited:
a.Balance += ev.Cents
case *messages.AccountDebited:
a.Balance -= ev.Cents
}
}

// AggregateRoot is a saga that implements the Account aggregate.
Expand All @@ -35,11 +54,13 @@ func (aggregateRoot) MessageTypes() (ax.MessageTypeSet, ax.MessageTypeSet) {
)
}

func (aggregateRoot) NewInstance(ctx context.Context, env ax.Envelope) (saga.InstanceID, saga.Data, error) {
var id saga.InstanceID
id.MustParse(env.Message.(*messages.OpenAccount).AccountId)
func (aggregateRoot) GenerateInstanceID(ctx context.Context, env ax.Envelope) (id saga.InstanceID, err error) {
err = id.Parse(env.Message.(*messages.OpenAccount).AccountId)
return
}

return id, &Account{}, nil
func (aggregateRoot) NewData() saga.Data {
return &Account{}
}

func (aggregateRoot) MappingKeyForMessage(ctx context.Context, env ax.Envelope) (string, error) {
Expand All @@ -64,40 +85,32 @@ func (aggregateRoot) HandleMessage(
s ax.Sender,
env ax.Envelope,
i saga.Instance,
) error {
) (err error) {
acct := i.Data.(*Account)

switch m := env.Message.(type) {
case *messages.OpenAccount:
if acct.IsOpen {
return nil
return
}

acct.IsOpen = true
acct.AccountId = m.AccountId
acct.Name = m.Name

return s.PublishEvent(ctx, &messages.AccountOpened{
_, err = s.PublishEvent(ctx, &messages.AccountOpened{
AccountId: m.AccountId,
Name: m.Name,
})

case *messages.CreditAccount:
acct.Balance += m.Cents

return s.PublishEvent(ctx, &messages.AccountCredited{
_, err = s.PublishEvent(ctx, &messages.AccountCredited{
AccountId: m.AccountId,
Cents: m.Cents,
})

case *messages.DebitAccount:
acct.Balance -= m.Cents

return s.PublishEvent(ctx, &messages.AccountDebited{
_, err = s.PublishEvent(ctx, &messages.AccountDebited{
AccountId: m.AccountId,
Cents: m.Cents,
})
}

return nil
return
}
18 changes: 16 additions & 2 deletions examples/banking/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jmalloc/ax/src/ax/persistence"
"github.com/jmalloc/ax/src/ax/routing"
"github.com/jmalloc/ax/src/ax/saga"
"github.com/jmalloc/ax/src/ax/saga/eventsourcing"
"github.com/jmalloc/ax/src/axcli"
"github.com/jmalloc/ax/src/axmysql"
"github.com/jmalloc/ax/src/axrmq"
Expand All @@ -37,10 +38,23 @@ func main() {
}
defer rmq.Close()

// p := &crud.Persister{
// Repository: axmysql.SagaRepository{},
// }

p := &eventsourcing.Persister{
MessageStore: axmysql.MessageStore{},
Snapshots: axmysql.SnapshotRepository{},
SnapshotFrequency: 3,
}

mapper := axmysql.SagaMapper{}

htable, err := routing.NewHandlerTable(
&saga.MessageHandler{
Saga: account.AggregateRoot,
Repository: &axmysql.SagaRepository{},
Saga: account.AggregateRoot,
Mapper: mapper,
Persister: p,
},
)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions src/ax/endpoint/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ type SinkSender struct {
//
// If ctx contains a message envelope, m is sent as a child of the message in
// that envelope.
func (s SinkSender) ExecuteCommand(ctx context.Context, m ax.Command) error {
func (s SinkSender) ExecuteCommand(ctx context.Context, m ax.Command) (ax.Envelope, error) {
return s.send(ctx, OpSendUnicast, m)
}

// PublishEvent sends an event message.
//
// If ctx contains a message envelope, m is sent as a child of the message in
// that envelope.
func (s SinkSender) PublishEvent(ctx context.Context, m ax.Event) error {
func (s SinkSender) PublishEvent(ctx context.Context, m ax.Event) (ax.Envelope, error) {
return s.send(ctx, OpSendMulticast, m)
}

// send wraps m in an envelope and passes that envelope to s.Sink.
// The new envelope is configured as a child of the envelope in ctx, if any.
func (s SinkSender) send(ctx context.Context, op Operation, m ax.Message) error {
func (s SinkSender) send(ctx context.Context, op Operation, m ax.Message) (ax.Envelope, error) {
env, ok := GetEnvelope(ctx)

if ok {
Expand All @@ -39,7 +39,7 @@ func (s SinkSender) send(ctx context.Context, op Operation, m ax.Message) error
env = ax.NewEnvelope(m)
}

return s.Sink.Accept(
return env, s.Sink.Accept(
ctx,
OutboundEnvelope{
Envelope: env,
Expand Down
20 changes: 16 additions & 4 deletions src/ax/endpoint/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ = Describe("SinkSender", func() {

Describe("ExecuteCommand", func() {
It("sends a unicast message to the sink", func() {
err := sender.ExecuteCommand(context.Background(), &messagetest.Command{})
_, err := sender.ExecuteCommand(context.Background(), &messagetest.Command{})
Expect(err).ShouldNot(HaveOccurred())

Expect(sink.Envelopes()).To(HaveLen(1))
Expand All @@ -36,16 +36,22 @@ var _ = Describe("SinkSender", func() {
env := ax.NewEnvelope(&messagetest.Message{})
ctx := WithEnvelope(context.Background(), env)

_ = sender.ExecuteCommand(ctx, &messagetest.Command{})
_, _ = sender.ExecuteCommand(ctx, &messagetest.Command{})

Expect(sink.Envelopes()).To(HaveLen(1))
Expect(sink.Envelopes()[0].CausationID).To(Equal(env.MessageID))
})

It("returns the sent envelope", func() {
env, _ := sender.ExecuteCommand(context.Background(), &messagetest.Command{})

Expect(env).To(Equal(sink.Envelopes()[0].Envelope))
})
})

Describe("PublishEvent", func() {
It("sends a multicast message to the sink", func() {
err := sender.PublishEvent(context.Background(), &messagetest.Event{})
_, err := sender.PublishEvent(context.Background(), &messagetest.Event{})
Expect(err).ShouldNot(HaveOccurred())

Expect(sink.Envelopes()).To(HaveLen(1))
Expand All @@ -58,10 +64,16 @@ var _ = Describe("SinkSender", func() {
env := ax.NewEnvelope(&messagetest.Message{})
ctx := WithEnvelope(context.Background(), env)

_ = sender.PublishEvent(ctx, &messagetest.Event{})
_, _ = sender.PublishEvent(ctx, &messagetest.Event{})

Expect(sink.Envelopes()).To(HaveLen(1))
Expect(sink.Envelopes()[0].CausationID).To(Equal(env.MessageID))
})

It("returns the sent envelope", func() {
env, _ := sender.PublishEvent(context.Background(), &messagetest.Event{})

Expect(env).To(Equal(sink.Envelopes()[0].Envelope))
})
})
})
46 changes: 46 additions & 0 deletions src/ax/persistence/messagestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package persistence

import (
"context"

"github.com/jmalloc/ax/src/ax"
)

// MessageStore is an interface for persisting streams of messages.
type MessageStore interface {
// AppendMessages appends one or more messages to a named stream.
//
// offset is a zero-based index into the stream. An error is returned if
// offset is not the next unused offset in the stream.
AppendMessages(
ctx context.Context,
tx Tx,
stream string,
offset uint64,
envs []ax.Envelope,
) error

// OpenStream opens a stream of messages for reading from a specific offset.
//
// The offset may be past the end of the stream. It returns an error if
// the stream does not exist.
OpenStream(
ctx context.Context,
tx Tx,
stream string,
offset uint64,
) (MessageStream, error)
}

// MessageStream is a stream of messages stored in a MessageStore.
type MessageStream interface {
// Next advances the stream to the next message.
// It returns false if there are no more messages in the stream.
Next(ctx context.Context) (bool, error)

// Get returns the message at the current offset in the stream.
Get(ctx context.Context) (ax.Envelope, error)

// Close closes the stream.
Close() error
}
3 changes: 2 additions & 1 deletion src/ax/routing/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ var _ = Describe("Dispatcher", func() {

It("passes a sender that sends messages via the message sink", func() {
h1.HandleMessageFunc = func(ctx context.Context, s ax.Sender, _ ax.Envelope) error {
return s.ExecuteCommand(ctx, &messagetest.Command{})
_, err := s.ExecuteCommand(ctx, &messagetest.Command{})
return err
}

_ = dispatcher.Accept(ctx, sink, env)
Expand Down
37 changes: 37 additions & 0 deletions src/ax/saga/applier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package saga

import (
"context"

"github.com/jmalloc/ax/src/ax"
)

// Applier is an implementation of ax.Sender that applies published
// events to an EventedData instance.
type Applier struct {
Data EventedData
Next ax.Sender
}

// ExecuteCommand sends a command message.
//
// If ctx contains a message envelope, m is sent as a child of the message in
// that envelope.
func (s *Applier) ExecuteCommand(ctx context.Context, m ax.Command) (ax.Envelope, error) {
return s.Next.ExecuteCommand(ctx, m)
}

// PublishEvent sends an event message.
//
// If ctx contains a message envelope, m is sent as a child of the message in
// that envelope.
func (s *Applier) PublishEvent(ctx context.Context, m ax.Event) (ax.Envelope, error) {
env, err := s.Next.PublishEvent(ctx, m)
if err != nil {
return ax.Envelope{}, err
}

s.Data.ApplyEvent(env)

return env, nil
}
90 changes: 90 additions & 0 deletions src/ax/saga/crud/persister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package crud

import (
"context"

"github.com/golang/protobuf/proto"
"github.com/jmalloc/ax/src/ax"
"github.com/jmalloc/ax/src/ax/persistence"
"github.com/jmalloc/ax/src/ax/saga"
)

// Persister is an implementation of saga.Persister that persists saga instances
// using "CRUD" semantics.
type Persister struct {
Repository Repository
}

// BeginCreate starts a new unit-of-work that persists a new saga instance.
func (p *Persister) BeginCreate(
ctx context.Context,
_ saga.Saga,
tx persistence.Tx,
s ax.Sender,
i saga.Instance,
) (saga.UnitOfWork, error) {
return p.newUnitOfWork(tx, s, i), nil
}

// BeginUpdate starts a new unit-of-work that updates an existing saga
// instance.
func (p *Persister) BeginUpdate(
ctx context.Context,
_ saga.Saga,
tx persistence.Tx,
s ax.Sender,
id saga.InstanceID,
) (saga.UnitOfWork, error) {
i, err := p.Repository.LoadSagaInstance(ctx, tx, id)
if err != nil {
return nil, err
}

return p.newUnitOfWork(tx, s, i), nil
}

// newUnitOfWork returns a new unit-of-work.
func (p *Persister) newUnitOfWork(
tx persistence.Tx,
s ax.Sender,
i saga.Instance,
) *unitOfWork {
return &unitOfWork{
p.Repository,
tx,
s,
proto.Clone(i.Data).(saga.Data),
i,
}
}

// unitOfWork is an implementation of saga.UnitOfWork that saves saga instances
// using "CRUD" semantics.
type unitOfWork struct {
repository Repository

tx persistence.Tx
sender ax.Sender
original saga.Data
instance saga.Instance
}

// Sender returns the ax.Sender that the saga must use to send messages.
func (w *unitOfWork) Sender() ax.Sender {
return w.sender
}

// Instance returns the saga instance that the unit-of-work applies to.
func (w *unitOfWork) Instance() saga.Instance {
return w.instance
}

// Save persists changes to the instance.
// It returns true if any changes have occurred.
func (w *unitOfWork) Save(ctx context.Context) (bool, error) {
if proto.Equal(w.instance.Data, w.original) {
return false, nil
}

return true, w.repository.SaveSagaInstance(ctx, w.tx, w.instance)
}
Loading