Skip to content

Patch float with trailing zero for JSON and add useFloatWithTrailingZero flag #1038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type BinlogSyncerConfig struct {
// Use decimal.Decimal structure for decimals.
UseDecimal bool

// FloatWithTrailingZero structure for floats.
UseFloatWithTrailingZero bool

// RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
RecvBufferSize int

Expand Down Expand Up @@ -197,6 +200,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
b.parser.SetParseTime(b.cfg.ParseTime)
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
b.parser.SetUseDecimal(b.cfg.UseDecimal)
b.parser.SetUseFloatWithTrailingZero(b.cfg.UseFloatWithTrailingZero)
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc)
b.parser.SetTableMapOptionalMetaDecodeFunc(b.cfg.TableMapOptionalMetaDecodeFunc)
Expand Down
31 changes: 26 additions & 5 deletions replication/json_binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package replication
import (
"fmt"
"math"
"strconv"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/utils"
Expand Down Expand Up @@ -52,6 +53,8 @@ type (
JsonDiffOperation byte
)

type FloatWithTrailingZero float64

const (
// The JSON value in the given path is replaced with a new value.
//
Expand Down Expand Up @@ -96,6 +99,14 @@ func (jd *JsonDiff) String() string {
return fmt.Sprintf("json_diff(op:%s path:%s value:%s)", jd.Op, jd.Path, jd.Value)
}

func (f FloatWithTrailingZero) MarshalJSON() ([]byte, error) {
if float64(f) == float64(int(f)) {
return []byte(strconv.FormatFloat(float64(f), 'f', 1, 64)), nil
}

return []byte(strconv.FormatFloat(float64(f), 'f', -1, 64)), nil
}

func jsonbGetOffsetSize(isSmall bool) int {
if isSmall {
return jsonbSmallOffsetSize
Expand Down Expand Up @@ -124,8 +135,9 @@ func jsonbGetValueEntrySize(isSmall bool) int {
// the common JSON encoding data.
func (e *RowsEvent) decodeJsonBinary(data []byte) ([]byte, error) {
d := jsonBinaryDecoder{
useDecimal: e.useDecimal,
ignoreDecodeErr: e.ignoreJSONDecodeErr,
useDecimal: e.useDecimal,
useFloatWithTrailingZero: e.useFloatWithTrailingZero,
ignoreDecodeErr: e.ignoreJSONDecodeErr,
}

if d.isDataShort(data, 1) {
Expand All @@ -141,9 +153,10 @@ func (e *RowsEvent) decodeJsonBinary(data []byte) ([]byte, error) {
}

type jsonBinaryDecoder struct {
useDecimal bool
ignoreDecodeErr bool
err error
useDecimal bool
useFloatWithTrailingZero bool
ignoreDecodeErr bool
err error
}

func (d *jsonBinaryDecoder) decodeValue(tp byte, data []byte) interface{} {
Expand Down Expand Up @@ -175,6 +188,9 @@ func (d *jsonBinaryDecoder) decodeValue(tp byte, data []byte) interface{} {
case JSONB_UINT64:
return d.decodeUint64(data)
case JSONB_DOUBLE:
if d.useFloatWithTrailingZero {
return d.decodeDoubleWithTrailingZero(data)
}
return d.decodeDouble(data)
case JSONB_STRING:
return d.decodeString(data)
Expand Down Expand Up @@ -395,6 +411,11 @@ func (d *jsonBinaryDecoder) decodeDouble(data []byte) float64 {
return v
}

func (d *jsonBinaryDecoder) decodeDoubleWithTrailingZero(data []byte) FloatWithTrailingZero {
v := d.decodeDouble(data)
return FloatWithTrailingZero(v)
}

func (d *jsonBinaryDecoder) decodeString(data []byte) string {
if d.err != nil {
return ""
Expand Down
12 changes: 9 additions & 3 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ type BinlogParser struct {
// used to start/stop processing
stopProcessing uint32

useDecimal bool
ignoreJSONDecodeErr bool
verifyChecksum bool
useDecimal bool
useFloatWithTrailingZero bool
ignoreJSONDecodeErr bool
verifyChecksum bool

rowsEventDecodeFunc func(*RowsEvent, []byte) error

Expand Down Expand Up @@ -198,6 +199,10 @@ func (p *BinlogParser) SetUseDecimal(useDecimal bool) {
p.useDecimal = useDecimal
}

func (p *BinlogParser) SetUseFloatWithTrailingZero(useFloatWithTrailingZero bool) {
p.useFloatWithTrailingZero = useFloatWithTrailingZero
}

func (p *BinlogParser) SetIgnoreJSONDecodeError(ignoreJSONDecodeErr bool) {
p.ignoreJSONDecodeErr = ignoreJSONDecodeErr
}
Expand Down Expand Up @@ -406,6 +411,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
e.parseTime = p.parseTime
e.timestampStringLocation = p.timestampStringLocation
e.useDecimal = p.useDecimal
e.useFloatWithTrailingZero = p.useFloatWithTrailingZero
e.ignoreJSONDecodeErr = p.ignoreJSONDecodeErr

switch h.EventType {
Expand Down
78 changes: 78 additions & 0 deletions replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/goccy/go-json"

Check failure on line 17 in replication/replication_test.go

View workflow job for this annotation

GitHub Actions / golangci

File is not properly formatted (gofumpt)
"github.com/go-mysql-org/go-mysql/client"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/test_util"
Expand Down Expand Up @@ -464,3 +465,80 @@
require.NoError(t.T(), err)
}
}

func (t *testSyncerSuite) TestFloatWithTrailingZeros() {
t.setupTest(mysql.MySQLFlavor)

str := `DROP TABLE IF EXISTS test_float_zeros`
t.testExecute(str)

// Create table with JSON column containing float values
str = `CREATE TABLE test_float_zeros (
id INT PRIMARY KEY,
json_val JSON
)`
t.testExecute(str)

// Test with useFloatWithTrailingZero = true
t.b.cfg.UseFloatWithTrailingZero = true
t.testFloatWithTrailingZerosCase(true)

// Test with useFloatWithTrailingZero = false
t.b.cfg.UseFloatWithTrailingZero = false
t.testFloatWithTrailingZerosCase(false)
}

func (t *testSyncerSuite) testFloatWithTrailingZerosCase(useTrailingZero bool) {
// Insert values with trailing zeros in JSON
t.testExecute(`INSERT INTO test_float_zeros VALUES (1, '{"f": 5.1}')`)
t.testExecute(`INSERT INTO test_float_zeros VALUES (2, '{"f": 1.100}')`)

// Get current position
r, err := t.c.Execute("SHOW MASTER STATUS")
require.NoError(t.T(), err)
binFile, _ := r.GetString(0, 0)
binPos, _ := r.GetInt(0, 1)

// Start syncing from current position
s, err := t.b.StartSync(mysql.Position{Name: binFile, Pos: uint32(binPos)})
require.NoError(t.T(), err)

// Insert another row to trigger binlog events
t.testExecute(`INSERT INTO test_float_zeros VALUES (3, '{"f": 3.0}')`)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

for {
evt, err := s.GetEvent(ctx)
require.NoError(t.T(), err)

// We're interested in RowsEvent
if evt.Header.EventType != WRITE_ROWS_EVENTv2 {
continue
}

// Type assert to RowsEvent
rowsEvent := evt.Event.(*RowsEvent)
for _, row := range rowsEvent.Rows {
// The third row should contain our test values

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean row or column?

if row[0].(int32) == 3 {
// Get the JSON value from binlog
jsonVal := row[1].([]byte)
var data struct {
F float64 `json:"f"`
}
err := json.Unmarshal(jsonVal, &data)
require.NoError(t.T(), err)

// Check if trailing zero is preserved based on useFloatWithTrailingZero
if useTrailingZero {
require.Equal(t.T(), "3.0", fmt.Sprintf("%.1f", data.F))
} else {
require.Equal(t.T(), "3", fmt.Sprintf("%.1f", data.F))
}
return
}
}
}
}
9 changes: 5 additions & 4 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,10 +945,11 @@ type RowsEvent struct {
Rows [][]interface{}
SkippedColumns [][]int

parseTime bool
timestampStringLocation *time.Location
useDecimal bool
ignoreJSONDecodeErr bool
parseTime bool
timestampStringLocation *time.Location
useDecimal bool
useFloatWithTrailingZero bool
ignoreJSONDecodeErr bool
}

// EnumRowsEventType is an abridged type describing the operation which triggered the given RowsEvent.
Expand Down
Loading