Skip to content
This repository was archived by the owner on Mar 14, 2024. It is now read-only.

Implement a reflection-based aggregate system. #63

Merged
merged 11 commits into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bin/rabbitmq.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
docker run \
--rm \
--detach \
--name ax_rmq \
--publish 127.0.0.1:5672:5672 \
--publish 127.0.0.1:15672:15672 \
rabbitmq:management
135 changes: 40 additions & 95 deletions examples/banking/account/account.go
Original file line number Diff line number Diff line change
@@ -1,118 +1,63 @@
package account

import (
"context"
"fmt"

"github.com/jmalloc/ax/examples/banking/messages"
"github.com/jmalloc/ax/src/ax"
"github.com/jmalloc/ax/src/ax/aggregate"
"github.com/jmalloc/ax/src/ax/ident"
"github.com/jmalloc/ax/src/ax/saga"
)

// InstanceDescription returns a human-readable description of the saga instance.
func (a *Account) InstanceDescription() string {
return fmt.Sprintf(
"account %s for %s, balance of %d",
ident.Format(a.AccountId),
a.Name,
a.Balance,
)
}

// AggregateRoot is a saga that implements the Account aggregate.
var AggregateRoot saga.Saga = &aggregateRoot{}

type aggregateRoot struct {
saga.ErrorIfNotFound
func (a *Account) OpenAccount(m *messages.OpenAccount, rec aggregate.Recorder) {
if !a.IsOpen {
rec(&messages.AccountOpened{
AccountId: m.AccountId,
Name: m.Name,
})
}
}

func (aggregateRoot) SagaName() string {
return "Account"
func (a *Account) CreditAccount(m *messages.CreditAccount, rec aggregate.Recorder) {
rec(&messages.AccountCredited{
AccountId: m.AccountId,
Cents: m.Cents,
})
}

func (aggregateRoot) MessageTypes() (ax.MessageTypeSet, ax.MessageTypeSet) {
return ax.TypesOf(
&messages.OpenAccount{},
), ax.TypesOf(
&messages.CreditAccount{},
&messages.DebitAccount{},
)
func (a *Account) DebitAccount(m *messages.DebitAccount, rec aggregate.Recorder) {
rec(&messages.AccountDebited{
AccountId: m.AccountId,
Cents: m.Cents,
})
}

func (aggregateRoot) GenerateInstanceID(ctx context.Context, env ax.Envelope) (id saga.InstanceID, err error) {
err = id.Parse(env.Message.(*messages.OpenAccount).AccountId)
return
func (a *Account) WhenAccountOpened(m *messages.AccountOpened) {
a.AccountId = m.AccountId
a.Name = m.Name
a.IsOpen = true
}

func (aggregateRoot) NewData() saga.Data {
return &Account{}
func (a *Account) WhenAccountCredited(m *messages.AccountCredited) {
a.Balance += m.Cents
}

func (aggregateRoot) MappingKeyForMessage(ctx context.Context, env ax.Envelope) (string, bool, error) {
type hasAccountID interface {
GetAccountId() string
}

return env.Message.(hasAccountID).GetAccountId(), true, nil
func (a *Account) WhenAccountDebited(m *messages.AccountDebited) {
a.Balance -= m.Cents
}

func (aggregateRoot) MappingKeysForInstance(
ctx context.Context,
i saga.Instance,
) (saga.KeySet, error) {
return saga.NewKeySet(
i.Data.(*Account).AccountId,
), nil
}

func (aggregateRoot) HandleMessage(
ctx context.Context,
s ax.Sender,
env ax.Envelope,
i saga.Instance,
) (err error) {
acct := i.Data.(*Account)

switch m := env.Message.(type) {
case *messages.OpenAccount:
if acct.IsOpen {
return
}

_, err = s.PublishEvent(ctx, &messages.AccountOpened{
AccountId: m.AccountId,
Name: m.Name,
})

case *messages.CreditAccount:
_, err = s.PublishEvent(ctx, &messages.AccountCredited{
AccountId: m.AccountId,
Cents: m.Cents,
})

case *messages.DebitAccount:
_, err = s.PublishEvent(ctx, &messages.AccountDebited{
AccountId: m.AccountId,
Cents: m.Cents,
})
}

return
// InstanceDescription returns a human-readable description of the aggregate
// instance.
func (a *Account) InstanceDescription() string {
return fmt.Sprintf(
"account %s for %s, balance of %d",
ident.Format(a.AccountId),
a.Name,
a.Balance,
)
}

// ApplyEvent updates the data to reflect the fact that ev has occurred.
func (aggregateRoot) ApplyEvent(d saga.Data, env ax.Envelope) {
acct := d.(*Account)

switch ev := env.Message.(type) {
case *messages.AccountOpened:
acct.AccountId = ev.AccountId
acct.Name = ev.Name
acct.IsOpen = true
case *messages.AccountCredited:
acct.Balance += ev.Cents
case *messages.AccountDebited:
acct.Balance -= ev.Cents
}
}
// AggregateRoot is a saga that implements the Account aggregate.
var AggregateRoot = aggregate.New(
&Account{},
aggregate.IdentifyByField("AccountId"),
)
28 changes: 14 additions & 14 deletions examples/banking/account/account.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/banking/account/account.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
syntax = "proto3";

package ax.examples.banking.account;
package ax.examples.banking;
option go_package = "account";

// Account is the saga.Instance for the account aggregate.
Expand Down
51 changes: 51 additions & 0 deletions src/ax/aggregate/aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package aggregate

import (
"github.com/jmalloc/ax/src/ax"
"github.com/jmalloc/ax/src/ax/ident"
"github.com/jmalloc/ax/src/ax/saga"
)

// ID uniquely identifies an aggregate instance.
type ID struct{ ident.ID }

// Aggregate is an interface for application-defined domain aggregates.
//
// Aggregates are a specialization of sagas (stateful message handlers) that
// handle commands and produce events.
//
// For each command type to be handled, the aggregate must implement a "handler"
// method that adheres to the following signature:
//
// func (cmd *<T>, rec Recorder)
//
// Where T is a struct type that implements ax.Command.
//
// Handler methods are responsible for producing new events based on the cmd
// being handled. They may inspect the current state of the aggregate, and then
// record zero or more events using rec. Handlers should never mutate the
// aggregate state.
//
// For each of the event types passed to rec, the aggregate must implement an
// "applier" method that adheres to the following signature:
//
// func (ev *T)
//
// Where T is a struct type that implements ax.Event.
//
// Applier methods are responsible for mutating the aggregate state. The applier
// is called every time an event is recorded, *and* when loading an event-sourced
// aggregate from the message store.
//
// The names of handler and applier methods are not meaningful to the aggregate
// system. The convention is to name command handlers after their commands, and
// to prefix event appliers with the word "When", such as:
//
// func (*BankAccount) CreditAccount(*messages.CreditAccount, Recorder)
// func (*BankAccount) WhenAccountCredited(*messages.AccountCredited)
type Aggregate interface {
saga.Data
}

// Recorder is a function that records the events produced by an aggregate.
type Recorder func(ax.Event)
54 changes: 54 additions & 0 deletions src/ax/aggregate/identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package aggregate

import (
"fmt"
"reflect"

"github.com/jmalloc/ax/src/ax"
)

// Identifier is an interface for determining the aggregate ID that a command
// targets.
type Identifier interface {
// AggregateID returns the ID of the aggregate that m targets.
AggregateID(m ax.Command) (ID, error)
}

// ByFieldIdentifier is an Identifier instance that treats a specific field of
// the message as the aggregate ID.
type ByFieldIdentifier struct {
FieldName string
}

// AggregateID returns the ID of the aggregate that m targets.
func (i *ByFieldIdentifier) AggregateID(m ax.Command) (ID, error) {
mt := ax.TypeOf(m)

f, ok := mt.StructType.FieldByName(i.FieldName)
if !ok {
return ID{}, fmt.Errorf(
"%s does not contain a field named %s",
reflect.TypeOf(m),
i.FieldName,
)
}

if f.Type.Kind() != reflect.String {
return ID{}, fmt.Errorf(
"%s.%s is not a string",
reflect.TypeOf(m),
i.FieldName,
)
}

v := reflect.
ValueOf(m).
Elem().
FieldByIndex(f.Index).
Interface().(string)

var id ID
err := id.Parse(v)

return id, err
}
15 changes: 15 additions & 0 deletions src/ax/aggregate/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package aggregate

// Option is a function that applies some change to the underlying saga
// implementation of an aggregate.
type Option func(*Saga)

// IdentifyByField returns an aggregate option that maps commands to instances
// by using the value of the message field named n as the aggregate ID.
func IdentifyByField(n string) Option {
return func(sg *Saga) {
sg.Identifier = &ByFieldIdentifier{
FieldName: n,
}
}
}
3 changes: 3 additions & 0 deletions src/ax/aggregate/pkg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package aggregate provides a convenience layer on top of sagas for
// implementing domain aggregates.
package aggregate
Loading