Skip to content

Commit 697bc20

Browse files
authored
Merge pull request #22 from liquidata-inc/zachmu/index-or-bug-fix
Fixed a bug in indexing code that would cause queries to return incorrect results. Along the way, implemented index capabilities for the in-memory tables (only for correctness verification), and expanded engine_test to test every combination of indexes, partitions, and parallelism.
2 parents fa72192 + d166b52 commit 697bc20

File tree

12 files changed

+1311
-480
lines changed

12 files changed

+1311
-480
lines changed

engine_test.go

Lines changed: 196 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package sqle_test
22

33
import (
44
"context"
5+
"fmt"
6+
"github.com/src-d/go-mysql-server/sql/expression"
57
"io"
68
"math"
79
"strings"
@@ -59,6 +61,14 @@ var queries = []struct {
5961
"SELECT i FROM mytable WHERE i <> 2;",
6062
[]sql.Row{{int64(1)}, {int64(3)}},
6163
},
64+
{
65+
"SELECT i FROM mytable WHERE i in (1, 3)",
66+
[]sql.Row{{int64(1)}, {int64(3)}},
67+
},
68+
{
69+
"SELECT i FROM mytable WHERE i = 1 or i = 3",
70+
[]sql.Row{{int64(1)}, {int64(3)}},
71+
},
6272
{
6373
"SELECT f32 FROM floattable WHERE f64 = 2.0;",
6474
[]sql.Row{{float32(2.0)}},
@@ -1589,21 +1599,146 @@ var queries = []struct {
15891599
}
15901600

15911601
func TestQueries(t *testing.T) {
1592-
e := newEngine(t)
1593-
t.Run("sequential", func(t *testing.T) {
1594-
for _, tt := range queries {
1595-
testQuery(t, e, tt.query, tt.expected)
1602+
type indexDriverInitalizer func(map[string]*memory.Table) sql.IndexDriver
1603+
type indexDriverTestCase struct {
1604+
name string
1605+
initializer indexDriverInitalizer
1606+
}
1607+
1608+
// Test all queries with these combinations, for a total of 12 runs:
1609+
// 1) Partitioned tables / non partitioned tables
1610+
// 2) Mergeable / unmergeable / no indexes
1611+
// 3) Parallelism on / off
1612+
numPartitionsVals := []int{
1613+
1,
1614+
testNumPartitions,
1615+
}
1616+
indexDrivers := []*indexDriverTestCase{
1617+
nil,
1618+
{"unmergableIndexes", unmergableIndexDriver},
1619+
{"mergableIndexes", mergableIndexDriver},
1620+
}
1621+
parallelVals := []int{
1622+
1,
1623+
2,
1624+
}
1625+
for _, numPartitions := range numPartitionsVals {
1626+
for _, indexDriverInit := range indexDrivers {
1627+
for _, parallelism := range parallelVals {
1628+
tables := allTestTables(t, numPartitions)
1629+
1630+
var indexDriver sql.IndexDriver
1631+
if indexDriverInit != nil {
1632+
indexDriver = indexDriverInit.initializer(tables)
1633+
}
1634+
engine := newEngineWithParallelism(t, parallelism, tables, indexDriver)
1635+
1636+
indexDriverName := "none"
1637+
if indexDriverInit != nil {
1638+
indexDriverName = indexDriverInit.name
1639+
}
1640+
testName := fmt.Sprintf("partitions=%d,indexes=%v,parallelism=%v", numPartitions, indexDriverName, parallelism)
1641+
t.Run(testName, func(t *testing.T) {
1642+
for _, tt := range queries {
1643+
testQuery(t, engine, tt.query, tt.expected)
1644+
}
1645+
})
1646+
}
15961647
}
1648+
}
1649+
}
1650+
1651+
func unmergableIndexDriver(tables map[string]*memory.Table) sql.IndexDriver {
1652+
return memory.NewIndexDriver("mydb", map[string][]sql.Index{
1653+
"mytable": {
1654+
newUnmergableIndex(tables, "mytable",
1655+
expression.NewGetFieldWithTable(0, sql.Int64, "mytable", "i", false)),
1656+
newUnmergableIndex(tables, "mytable",
1657+
expression.NewGetFieldWithTable(1, sql.Text, "mytable", "s", false)),
1658+
newUnmergableIndex(tables, "mytable",
1659+
expression.NewGetFieldWithTable(0, sql.Int64, "mytable", "i", false),
1660+
expression.NewGetFieldWithTable(1, sql.Text, "mytable", "s", false)),
1661+
},
1662+
"othertable": {
1663+
newUnmergableIndex(tables, "othertable",
1664+
expression.NewGetFieldWithTable(0, sql.Text, "othertable", "s2", false)),
1665+
newUnmergableIndex(tables, "othertable",
1666+
expression.NewGetFieldWithTable(1, sql.Text, "othertable", "i2", false)),
1667+
newUnmergableIndex(tables, "othertable",
1668+
expression.NewGetFieldWithTable(0, sql.Text, "othertable", "s2", false),
1669+
expression.NewGetFieldWithTable(1, sql.Text, "othertable", "i2", false)),
1670+
},
1671+
"bigtable": {
1672+
newUnmergableIndex(tables, "bigtable",
1673+
expression.NewGetFieldWithTable(0, sql.Text, "bigtable", "t", false)),
1674+
},
1675+
"floattable": {
1676+
newUnmergableIndex(tables, "floattable",
1677+
expression.NewGetFieldWithTable(2, sql.Text, "floattable", "f64", false)),
1678+
},
1679+
"niltable": {
1680+
newUnmergableIndex(tables, "niltable",
1681+
expression.NewGetFieldWithTable(0, sql.Int64, "niltable", "i", false)),
1682+
},
15971683
})
1684+
}
15981685

1599-
ep := newEngineWithParallelism(t, 2)
1600-
t.Run("parallel", func(t *testing.T) {
1601-
for _, tt := range queries {
1602-
testQuery(t, ep, tt.query, tt.expected)
1603-
}
1686+
func mergableIndexDriver(tables map[string]*memory.Table) sql.IndexDriver {
1687+
return memory.NewIndexDriver("mydb", map[string][]sql.Index{
1688+
"mytable": {
1689+
newMergableIndex(tables, "mytable",
1690+
expression.NewGetFieldWithTable(0, sql.Int64, "mytable", "i", false)),
1691+
newMergableIndex(tables, "mytable",
1692+
expression.NewGetFieldWithTable(1, sql.Text, "mytable", "s", false)),
1693+
newMergableIndex(tables, "mytable",
1694+
expression.NewGetFieldWithTable(0, sql.Int64, "mytable", "i", false),
1695+
expression.NewGetFieldWithTable(1, sql.Text, "mytable", "s", false)),
1696+
},
1697+
"othertable": {
1698+
newMergableIndex(tables, "othertable",
1699+
expression.NewGetFieldWithTable(0, sql.Text, "othertable", "s2", false)),
1700+
newMergableIndex(tables, "othertable",
1701+
expression.NewGetFieldWithTable(1, sql.Text, "othertable", "i2", false)),
1702+
newMergableIndex(tables, "othertable",
1703+
expression.NewGetFieldWithTable(0, sql.Text, "othertable", "s2", false),
1704+
expression.NewGetFieldWithTable(1, sql.Text, "othertable", "i2", false)),
1705+
},
1706+
"bigtable": {
1707+
newMergableIndex(tables, "bigtable",
1708+
expression.NewGetFieldWithTable(0, sql.Text, "bigtable", "t", false)),
1709+
},
1710+
"floattable": {
1711+
newMergableIndex(tables, "floattable",
1712+
expression.NewGetFieldWithTable(2, sql.Text, "floattable", "f64", false)),
1713+
},
1714+
"niltable": {
1715+
newMergableIndex(tables, "niltable",
1716+
expression.NewGetFieldWithTable(0, sql.Int64, "niltable", "i", false)),
1717+
},
16041718
})
16051719
}
16061720

1721+
1722+
func newUnmergableIndex(tables map[string]*memory.Table, tableName string, exprs ...sql.Expression) *memory.UnmergeableIndex {
1723+
return &memory.UnmergeableIndex{
1724+
DB: "mydb",
1725+
DriverName: memory.IndexDriverId,
1726+
TableName: tableName,
1727+
Tbl: tables[tableName],
1728+
Exprs: exprs,
1729+
}
1730+
}
1731+
1732+
func newMergableIndex(tables map[string]*memory.Table, tableName string, exprs ...sql.Expression) *memory.MergeableIndex {
1733+
return &memory.MergeableIndex{
1734+
DB: "mydb",
1735+
DriverName: memory.IndexDriverId,
1736+
TableName: tableName,
1737+
Tbl: tables[tableName],
1738+
Exprs: exprs,
1739+
}
1740+
}
1741+
16071742
func TestSessionSelectLimit(t *testing.T) {
16081743
ctx := newCtx()
16091744
ctx.Session.Set("sql_select_limit", sql.Int64, int64(1))
@@ -1752,7 +1887,7 @@ func TestWarnings(t *testing.T) {
17521887
}
17531888

17541889
e := newEngine(t)
1755-
ep := newEngineWithParallelism(t, 2)
1890+
ep := newEngineWithParallelism(t, 2, allTestTables(t, testNumPartitions), nil)
17561891

17571892
t.Run("sequential", func(t *testing.T) {
17581893
for _, tt := range queries {
@@ -1816,7 +1951,7 @@ func TestClearWarnings(t *testing.T) {
18161951
func TestDescribe(t *testing.T) {
18171952
e := newEngine(t)
18181953

1819-
ep := newEngineWithParallelism(t, 2)
1954+
ep := newEngineWithParallelism(t, 2, allTestTables(t, testNumPartitions), nil)
18201955

18211956
query := `DESCRIBE FORMAT=TREE SELECT * FROM mytable`
18221957
expectedSeq := []sql.Row{
@@ -2822,66 +2957,64 @@ func testQueryWithContext(ctx *sql.Context, t *testing.T, e *sqle.Engine, q stri
28222957
})
28232958
}
28242959

2825-
func newEngine(t *testing.T) *sqle.Engine {
2826-
return newEngineWithParallelism(t, 1)
2827-
}
2960+
func allTestTables(t *testing.T, numPartitions int) map[string]*memory.Table {
2961+
tables := make(map[string]*memory.Table)
28282962

2829-
func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
2830-
table := memory.NewPartitionedTable("mytable", sql.Schema{
2963+
tables["mytable"] = memory.NewPartitionedTable("mytable", sql.Schema{
28312964
{Name: "i", Type: sql.Int64, Source: "mytable"},
28322965
{Name: "s", Type: sql.Text, Source: "mytable"},
2833-
}, testNumPartitions)
2966+
}, numPartitions)
28342967

28352968
insertRows(
2836-
t, table,
2969+
t, tables["mytable"],
28372970
sql.NewRow(int64(1), "first row"),
28382971
sql.NewRow(int64(2), "second row"),
28392972
sql.NewRow(int64(3), "third row"),
28402973
)
28412974

2842-
table2 := memory.NewPartitionedTable("othertable", sql.Schema{
2975+
tables["othertable"] = memory.NewPartitionedTable("othertable", sql.Schema{
28432976
{Name: "s2", Type: sql.Text, Source: "othertable"},
28442977
{Name: "i2", Type: sql.Int64, Source: "othertable"},
2845-
}, testNumPartitions)
2978+
}, numPartitions)
28462979

28472980
insertRows(
2848-
t, table2,
2981+
t, tables["othertable"],
28492982
sql.NewRow("first", int64(3)),
28502983
sql.NewRow("second", int64(2)),
28512984
sql.NewRow("third", int64(1)),
28522985
)
28532986

2854-
table3 := memory.NewPartitionedTable("tabletest", sql.Schema{
2987+
tables["tabletest"] = memory.NewPartitionedTable("tabletest", sql.Schema{
28552988
{Name: "i", Type: sql.Int32, Source: "tabletest"},
28562989
{Name: "s", Type: sql.Text, Source: "tabletest"},
2857-
}, testNumPartitions)
2990+
}, numPartitions)
28582991

28592992
insertRows(
2860-
t, table3,
2993+
t, tables["tabletest"],
28612994
sql.NewRow(int64(1), "first row"),
28622995
sql.NewRow(int64(2), "second row"),
28632996
sql.NewRow(int64(3), "third row"),
28642997
)
28652998

2866-
table4 := memory.NewPartitionedTable("other_table", sql.Schema{
2999+
tables["other_table"] = memory.NewPartitionedTable("other_table", sql.Schema{
28673000
{Name: "text", Type: sql.Text, Source: "tabletest"},
28683001
{Name: "number", Type: sql.Int32, Source: "tabletest"},
2869-
}, testNumPartitions)
3002+
}, numPartitions)
28703003

28713004
insertRows(
2872-
t, table4,
3005+
t, tables["other_table"],
28733006
sql.NewRow("a", int32(4)),
28743007
sql.NewRow("b", int32(2)),
28753008
sql.NewRow("c", int32(0)),
28763009
)
28773010

2878-
bigtable := memory.NewPartitionedTable("bigtable", sql.Schema{
3011+
tables["bigtable"] = memory.NewPartitionedTable("bigtable", sql.Schema{
28793012
{Name: "t", Type: sql.Text, Source: "bigtable"},
28803013
{Name: "n", Type: sql.Int64, Source: "bigtable"},
2881-
}, testNumPartitions)
3014+
}, numPartitions)
28823015

28833016
insertRows(
2884-
t, bigtable,
3017+
t, tables["bigtable"],
28853018
sql.NewRow("a", int64(1)),
28863019
sql.NewRow("s", int64(2)),
28873020
sql.NewRow("f", int64(3)),
@@ -2898,14 +3031,14 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
28983031
sql.NewRow("b", int64(9)),
28993032
)
29003033

2901-
floatTable := memory.NewPartitionedTable("floattable", sql.Schema{
3034+
tables["floattable"] = memory.NewPartitionedTable("floattable", sql.Schema{
29023035
{Name: "i", Type: sql.Int64, Source: "floattable"},
29033036
{Name: "f32", Type: sql.Float32, Source: "floattable"},
29043037
{Name: "f64", Type: sql.Float64, Source: "floattable"},
2905-
}, testNumPartitions)
3038+
}, numPartitions)
29063039

29073040
insertRows(
2908-
t, floatTable,
3041+
t, tables["floattable"],
29093042
sql.NewRow(int64(1), float32(1.0), float64(1.0)),
29103043
sql.NewRow(int64(2), float32(1.5), float64(1.5)),
29113044
sql.NewRow(int64(3), float32(2.0), float64(2.0)),
@@ -2914,36 +3047,36 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
29143047
sql.NewRow(int64(-2), float32(-1.5), float64(-1.5)),
29153048
)
29163049

2917-
nilTable := memory.NewPartitionedTable("niltable", sql.Schema{
3050+
tables["niltable"] = memory.NewPartitionedTable("niltable", sql.Schema{
29183051
{Name: "i", Type: sql.Int64, Source: "niltable", Nullable: true},
29193052
{Name: "b", Type: sql.Boolean, Source: "niltable", Nullable: true},
29203053
{Name: "f", Type: sql.Float64, Source: "niltable", Nullable: true},
2921-
}, testNumPartitions)
3054+
}, numPartitions)
29223055

29233056
insertRows(
2924-
t, nilTable,
3057+
t, tables["niltable"],
29253058
sql.NewRow(int64(1), true, float64(1.0)),
29263059
sql.NewRow(int64(2), nil, float64(2.0)),
29273060
sql.NewRow(nil, false, float64(3.0)),
29283061
sql.NewRow(int64(4), true, nil),
29293062
sql.NewRow(nil, nil, nil),
29303063
)
29313064

2932-
newlineTable := memory.NewPartitionedTable("newlinetable", sql.Schema{
3065+
tables["newlinetable"] = memory.NewPartitionedTable("newlinetable", sql.Schema{
29333066
{Name: "i", Type: sql.Int64, Source: "newlinetable"},
29343067
{Name: "s", Type: sql.Text, Source: "newlinetable"},
2935-
}, testNumPartitions)
3068+
}, numPartitions)
29363069

29373070
insertRows(
2938-
t, newlineTable,
3071+
t, tables["newlinetable"],
29393072
sql.NewRow(int64(1), "\nthere is some text in here"),
29403073
sql.NewRow(int64(2), "there is some\ntext in here"),
29413074
sql.NewRow(int64(3), "there is some text\nin here"),
29423075
sql.NewRow(int64(4), "there is some text in here\n"),
29433076
sql.NewRow(int64(5), "there is some text in here"),
29443077
)
29453078

2946-
typestable := memory.NewPartitionedTable("typestable", sql.Schema{
3079+
tables["typestable"] = memory.NewPartitionedTable("typestable", sql.Schema{
29473080
{Name: "id", Type: sql.Int64, Source: "typestable"},
29483081
{Name: "i8", Type: sql.Int8, Source: "typestable", Nullable: true},
29493082
{Name: "i16", Type: sql.Int16, Source: "typestable", Nullable: true},
@@ -2961,20 +3094,25 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
29613094
{Name: "bo", Type: sql.Boolean, Source: "typestable", Nullable: true},
29623095
{Name: "js", Type: sql.JSON, Source: "typestable", Nullable: true},
29633096
{Name: "bl", Type: sql.Blob, Source: "typestable", Nullable: true},
2964-
}, testNumPartitions)
3097+
}, numPartitions)
3098+
3099+
return tables
3100+
}
3101+
3102+
func newEngine(t *testing.T) *sqle.Engine {
3103+
return newEngineWithParallelism(t, 1, allTestTables(t, testNumPartitions), nil)
3104+
}
29653105

3106+
func newEngineWithParallelism(t *testing.T, parallelism int, tables map[string]*memory.Table, driver sql.IndexDriver) *sqle.Engine {
29663107
db := memory.NewDatabase("mydb")
2967-
db.AddTable("mytable", table)
2968-
db.AddTable("othertable", table2)
2969-
db.AddTable("tabletest", table3)
2970-
db.AddTable("bigtable", bigtable)
2971-
db.AddTable("floattable", floatTable)
2972-
db.AddTable("niltable", nilTable)
2973-
db.AddTable("newlinetable", newlineTable)
2974-
db.AddTable("typestable", typestable)
3108+
for name, table := range tables {
3109+
if name != "other_table" {
3110+
db.AddTable(name, table)
3111+
}
3112+
}
29753113

29763114
db2 := memory.NewDatabase("foo")
2977-
db2.AddTable("other_table", table4)
3115+
db2.AddTable("other_table", tables["other_table"])
29783116

29793117
catalog := sql.NewCatalog()
29803118
catalog.AddDatabase(db)
@@ -2988,7 +3126,14 @@ func newEngineWithParallelism(t *testing.T, parallelism int) *sqle.Engine {
29883126
a = analyzer.NewDefault(catalog)
29893127
}
29903128

2991-
return sqle.New(catalog, a, new(sqle.Config))
3129+
if driver != nil {
3130+
catalog.RegisterIndexDriver(driver)
3131+
}
3132+
3133+
engine := sqle.New(catalog, a, new(sqle.Config))
3134+
require.NoError(t, engine.Init())
3135+
3136+
return engine
29923137
}
29933138

29943139
const expectedTree = `Limit(5)

0 commit comments

Comments
 (0)