Skip to content

Commit 89515e2

Browse files
authored
Left/Right Outer support for equi and non-equi joins (#162)
1 parent ab330be commit 89515e2

File tree

13 files changed

+423
-98
lines changed

13 files changed

+423
-98
lines changed

src/enclave/Enclave/BroadcastNestedLoopJoin.cpp

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,99 @@ void broadcast_nested_loop_join(uint8_t *join_expr, size_t join_expr_length, uin
1818
FlatbuffersJoinExprEvaluator join_expr_eval(join_expr, join_expr_length);
1919
const tuix::JoinType join_type = join_expr_eval.get_join_type();
2020

21+
switch(join_type) {
22+
case tuix::JoinType_LeftSemi:
23+
case tuix::JoinType_LeftAnti:
24+
default_join(join_expr, join_expr_length,
25+
outer_rows, outer_rows_length,
26+
inner_rows, inner_rows_length,
27+
output_rows, output_rows_length);
28+
break;
29+
case tuix::JoinType_LeftOuter:
30+
case tuix::JoinType_RightOuter:
31+
outer_join(join_expr, join_expr_length,
32+
outer_rows, outer_rows_length,
33+
inner_rows, inner_rows_length,
34+
output_rows, output_rows_length);
35+
break;
36+
default:
37+
throw std::runtime_error(
38+
std::string("Join type not supported: ")
39+
+ std::string(to_string(join_type)));
40+
}
41+
}
42+
43+
void outer_join(
44+
uint8_t *join_expr, size_t join_expr_length,
45+
uint8_t *outer_rows, size_t outer_rows_length,
46+
uint8_t *inner_rows, size_t inner_rows_length,
47+
uint8_t **output_rows, size_t *output_rows_length) {
48+
49+
FlatbuffersJoinExprEvaluator join_expr_eval(join_expr, join_expr_length);
50+
const tuix::JoinType join_type = join_expr_eval.get_join_type();
51+
52+
RowReader outer_r(BufferRefView<tuix::EncryptedBlocks>(outer_rows, outer_rows_length));
53+
RowWriter w;
54+
55+
FlatbuffersTemporaryRow last_inner;
56+
57+
while (outer_r.has_next()) {
58+
const tuix::Row *outer = outer_r.next();
59+
bool o_i_match = false;
60+
RowReader inner_r(BufferRefView<tuix::EncryptedBlocks>(inner_rows, inner_rows_length));
61+
62+
// Use peek() to get the schema of the inner table
63+
const tuix::Row *inner = inner_r.peek();
64+
65+
while (inner_r.has_next()) {
66+
inner = inner_r.next();
67+
bool condition_met = join_expr_eval.is_right_join() ?
68+
join_expr_eval.eval_condition(inner, outer) :
69+
join_expr_eval.eval_condition(outer, inner);
70+
if (!inner->is_dummy() && condition_met) {
71+
switch(join_type) {
72+
case tuix::JoinType_LeftOuter:
73+
w.append(outer, inner);
74+
break;
75+
case tuix::JoinType_RightOuter:
76+
w.append(inner, outer);
77+
break;
78+
default:
79+
break;
80+
}
81+
o_i_match |= condition_met;
82+
}
83+
}
84+
85+
switch(join_type) {
86+
case tuix::JoinType_LeftOuter:
87+
if (!o_i_match) {
88+
// Values of inner (right) do not matter: they are all set to null
89+
w.append(outer, inner, false, true);
90+
}
91+
break;
92+
case tuix::JoinType_RightOuter:
93+
if (!o_i_match) {
94+
// Values of inner (left) do not matter: they are all set to null
95+
w.append(inner, outer, true, false);
96+
}
97+
break;
98+
default:
99+
break;
100+
}
101+
}
102+
w.output_buffer(output_rows, output_rows_length);
103+
}
104+
105+
void default_join(
106+
uint8_t *join_expr, size_t join_expr_length,
107+
uint8_t *outer_rows, size_t outer_rows_length,
108+
uint8_t *inner_rows, size_t inner_rows_length,
109+
uint8_t **output_rows, size_t *output_rows_length) {
110+
111+
FlatbuffersJoinExprEvaluator join_expr_eval(join_expr, join_expr_length);
112+
const tuix::JoinType join_type = join_expr_eval.get_join_type();
113+
21114
RowReader outer_r(BufferRefView<tuix::EncryptedBlocks>(outer_rows, outer_rows_length));
22115
RowWriter w;
23116

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,20 @@
11
#include <cstddef>
22
#include <cstdint>
33

4-
void broadcast_nested_loop_join(uint8_t *join_expr, size_t join_expr_length, uint8_t *outer_rows,
5-
size_t outer_rows_length, uint8_t *inner_rows,
6-
size_t inner_rows_length, uint8_t **output_rows,
7-
size_t *output_rows_length);
4+
void broadcast_nested_loop_join(
5+
uint8_t *join_expr, size_t join_expr_length,
6+
uint8_t *outer_rows, size_t outer_rows_length,
7+
uint8_t *inner_rows, size_t inner_rows_length,
8+
uint8_t **output_rows, size_t *output_rows_length);
9+
10+
void outer_join(
11+
uint8_t *join_expr, size_t join_expr_length,
12+
uint8_t *outer_rows, size_t outer_rows_length,
13+
uint8_t *inner_rows, size_t inner_rows_length,
14+
uint8_t **output_rows, size_t *output_rows_length);
15+
16+
void default_join(
17+
uint8_t *join_expr, size_t join_expr_length,
18+
uint8_t *outer_rows, size_t outer_rows_length,
19+
uint8_t *inner_rows, size_t inner_rows_length,
20+
uint8_t **output_rows, size_t *output_rows_length);

src/enclave/Enclave/ExpressionEvaluation.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1683,6 +1683,15 @@ class FlatbuffersJoinExprEvaluator {
16831683

16841684
tuix::JoinType get_join_type() { return join_type; }
16851685

1686+
bool is_right_join() {
1687+
return join_type == tuix::JoinType_RightOuter;
1688+
}
1689+
1690+
bool is_outer_join() {
1691+
return join_type == tuix::JoinType_LeftOuter ||
1692+
join_type == tuix::JoinType_RightOuter;
1693+
}
1694+
16861695
private:
16871696
flatbuffers::FlatBufferBuilder builder;
16881697
tuix::JoinType join_type;

src/enclave/Enclave/FlatbuffersReaders.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ const tuix::Row *RowReader::next() {
6262
return block_reader.next();
6363
}
6464

65+
const tuix::Row *RowReader::peek() {
66+
return block_reader.peek();
67+
}
68+
6569
void RowReader::init_block_reader() {
6670
if (block_idx < encrypted_blocks->blocks()->size()) {
6771
block_reader.reset(encrypted_blocks->blocks()->Get(block_idx));

src/enclave/Enclave/FlatbuffersReaders.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ class EncryptedBlockToRowReader {
1919

2020
const tuix::Row *next() { return rows->rows()->Get(row_idx++); }
2121

22+
const tuix::Row *peek() {
23+
return rows->rows()->Get(row_idx);
24+
}
25+
2226
flatbuffers::Vector<flatbuffers::Offset<tuix::Row>>::const_iterator begin() {
2327
return rows->rows()->begin();
2428
}
@@ -47,6 +51,8 @@ class RowReader {
4751
bool has_next();
4852
/** Access the next Row. Invalidates any previously-returned Row pointers. */
4953
const tuix::Row *next();
54+
/** Access the next Row without incrementing the reader. */
55+
const tuix::Row *peek();
5056

5157
private:
5258
void init_block_reader();

src/enclave/Enclave/FlatbuffersWriters.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,32 @@ void RowWriter::clear() {
99
finished = false;
1010
}
1111

12-
void RowWriter::append(const tuix::Row *row) {
13-
rows_vector.push_back(flatbuffers_copy(row, builder));
12+
void RowWriter::append(const tuix::Row *row, bool force_null) {
13+
rows_vector.push_back(flatbuffers_copy(row, builder, force_null));
1414
total_num_rows++;
1515
maybe_finish_block();
1616
}
1717

18-
void RowWriter::append(const std::vector<const tuix::Field *> &row_fields) {
18+
void RowWriter::append(const std::vector<const tuix::Field *> &row_fields, bool force_null) {
1919
flatbuffers::uoffset_t num_fields = row_fields.size();
2020
std::vector<flatbuffers::Offset<tuix::Field>> field_values(num_fields);
2121
for (flatbuffers::uoffset_t i = 0; i < num_fields; i++) {
22-
field_values[i] = flatbuffers_copy<tuix::Field>(row_fields[i], builder);
22+
field_values[i] = flatbuffers_copy<tuix::Field>(row_fields[i], builder, force_null);
2323
}
2424
rows_vector.push_back(tuix::CreateRowDirect(builder, &field_values));
2525
total_num_rows++;
2626
maybe_finish_block();
2727
}
2828

29-
void RowWriter::append(const tuix::Row *row1, const tuix::Row *row2) {
29+
void RowWriter::append(const tuix::Row *row1, const tuix::Row *row2, bool row1_force_null, bool row2_force_null) {
3030
flatbuffers::uoffset_t num_fields = row1->field_values()->size() + row2->field_values()->size();
3131
std::vector<flatbuffers::Offset<tuix::Field>> field_values(num_fields);
3232
flatbuffers::uoffset_t i = 0;
3333
for (auto it = row1->field_values()->begin(); it != row1->field_values()->end(); ++it, ++i) {
34-
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder);
34+
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder, row1_force_null);
3535
}
3636
for (auto it = row2->field_values()->begin(); it != row2->field_values()->end(); ++it, ++i) {
37-
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder);
37+
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder, row2_force_null);
3838
}
3939
rows_vector.push_back(tuix::CreateRowDirect(builder, &field_values));
4040
total_num_rows++;

src/enclave/Enclave/FlatbuffersWriters.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@ class RowWriter {
2828
void clear();
2929

3030
/** Append the given Row. */
31-
void append(const tuix::Row *row);
31+
void append(const tuix::Row *row, bool force_null = false);
3232

3333
/** Append the given `Field`s as a Row. */
34-
void append(const std::vector<const tuix::Field *> &row_fields);
34+
void append(const std::vector<const tuix::Field *> &row_fields, bool force_null = false);
3535

36-
/** Concatenate the fields of the two given `Row`s and append the resulting
37-
* single Row. */
38-
void append(const tuix::Row *row1, const tuix::Row *row2);
36+
/** Concatenate the fields of the two given `Row`s and append the resulting single Row. */
37+
void append(const tuix::Row *row1, const tuix::Row *row2, bool row1_force_null = false, bool row2_force_null = false);
3938

4039
/** Expose the stored rows as a buffer. */
4140
UntrustedBufferRef<tuix::EncryptedBlocks> output_buffer();

src/enclave/Enclave/NonObliviousSortMergeJoin.cpp

Lines changed: 65 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,27 @@ void test_rows_same_group(FlatbuffersJoinExprEvaluator &join_expr_eval, const tu
1919
}
2020
}
2121

22-
void write_output_rows(RowWriter &group, RowWriter &w) {
23-
auto group_buffer = group.output_buffer();
24-
RowReader group_reader(group_buffer.view());
25-
26-
while (group_reader.has_next()) {
27-
const tuix::Row *row = group_reader.next();
28-
w.append(row);
29-
}
22+
void write_output_rows(RowWriter &input,
23+
RowWriter &output,
24+
tuix::JoinType join_type,
25+
const tuix::Row *foreign_row = nullptr) {
26+
auto input_buffer = input.output_buffer();
27+
RowReader input_reader(input_buffer.view());
28+
29+
while (input_reader.has_next()) {
30+
const tuix::Row *row = input_reader.next();
31+
if (foreign_row == nullptr) {
32+
output.append(row);
33+
} else if (join_type == tuix::JoinType_LeftOuter) {
34+
output.append(row, foreign_row, false, true);
35+
} else if (join_type == tuix::JoinType_RightOuter) {
36+
output.append(foreign_row, row, true, false);
37+
} else {
38+
throw std::runtime_error(
39+
std::string("write_output_rows should not take a foreign row with join type ")
40+
+ to_string(join_type));
41+
}
42+
}
3043
}
3144

3245
/**
@@ -61,32 +74,43 @@ void non_oblivious_sort_merge_join(uint8_t *join_expr, size_t join_expr_length,
6174
RowWriter w;
6275

6376
RowWriter primary_group;
64-
FlatbuffersTemporaryRow last_primary_of_group;
6577
RowWriter primary_matched_rows,
66-
primary_unmatched_rows; // This is only used for left semi/anti join
78+
primary_unmatched_rows; // These are used for all joins but inner
79+
FlatbuffersTemporaryRow last_primary_of_group;
80+
81+
// Used for outer rows to get the schema of the foreign table.
82+
// A "dummy" row with the desired schema is added for each partition,
83+
// so last_foreign_row.get() is guaranteed to not be null.
84+
FlatbuffersTemporaryRow last_foreign_row;
6785

6886
while (r.has_next()) {
6987
const tuix::Row *current = r.next();
88+
if (current->is_dummy()) {
89+
last_foreign_row.set(current);
90+
continue;
91+
}
7092

7193
if (join_expr_eval.is_primary(current)) {
7294
if (last_primary_of_group.get() &&
7395
join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {
7496

7597
// Add this primary row to the current group
76-
// If this is a left semi/anti join, also add the rows to
77-
// primary_unmatched_rows
7898
primary_group.append(current);
79-
if (join_type == tuix::JoinType_LeftSemi || join_type == tuix::JoinType_LeftAnti) {
99+
if (join_type != tuix::JoinType_Inner) {
80100
primary_unmatched_rows.append(current);
81101
}
82102
last_primary_of_group.set(current);
83103

84104
} else {
85105
// If a new primary group is encountered
86106
if (join_type == tuix::JoinType_LeftSemi) {
87-
write_output_rows(primary_matched_rows, w);
107+
write_output_rows(primary_matched_rows, w, join_type);
88108
} else if (join_type == tuix::JoinType_LeftAnti) {
89-
write_output_rows(primary_unmatched_rows, w);
109+
write_output_rows(primary_unmatched_rows, w, join_type);
110+
} else if (join_expr_eval.is_outer_join()) {
111+
// Dummy row is always guaranteed to be the first row, so last_foreign_row.get() cannot
112+
// be null.
113+
write_output_rows(primary_unmatched_rows, w, join_type, last_foreign_row.get());
90114
}
91115

92116
primary_group.clear();
@@ -98,20 +122,27 @@ void non_oblivious_sort_merge_join(uint8_t *join_expr, size_t join_expr_length,
98122
last_primary_of_group.set(current);
99123
}
100124
} else {
101-
if (last_primary_of_group.get() &&
102-
join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {
103-
if (join_type == tuix::JoinType_Inner) {
125+
last_foreign_row.set(current);
126+
if (last_primary_of_group.get()
127+
&& join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {
128+
if (join_type == tuix::JoinType_Inner || join_expr_eval.is_outer_join()) {
104129
auto primary_group_buffer = primary_group.output_buffer();
105130
RowReader primary_group_reader(primary_group_buffer.view());
131+
106132
while (primary_group_reader.has_next()) {
107133
const tuix::Row *primary = primary_group_reader.next();
108134
test_rows_same_group(join_expr_eval, primary, current);
109135

110136
if (join_expr_eval.eval_condition(primary, current)) {
111-
w.append(primary, current);
137+
if (join_expr_eval.is_right_join()) {
138+
w.append(current, primary);
139+
} else {
140+
w.append(primary, current);
141+
}
112142
}
113143
}
114-
} else if (join_type == tuix::JoinType_LeftSemi || join_type == tuix::JoinType_LeftAnti) {
144+
}
145+
if (join_type != tuix::JoinType_Inner) {
115146
auto primary_unmatched_rows_buffer = primary_unmatched_rows.output_buffer();
116147
RowReader primary_unmatched_rows_reader(primary_unmatched_rows_buffer.view());
117148
RowWriter new_primary_unmatched_rows;
@@ -128,20 +159,25 @@ void non_oblivious_sort_merge_join(uint8_t *join_expr, size_t join_expr_length,
128159

129160
// Reset primary_unmatched_rows
130161
primary_unmatched_rows.clear();
131-
auto new_primary_unmatched_rows_buffer = new_primary_unmatched_rows.output_buffer();
132-
RowReader new_primary_unmatched_rows_reader(new_primary_unmatched_rows_buffer.view());
133-
while (new_primary_unmatched_rows_reader.has_next()) {
134-
primary_unmatched_rows.append(new_primary_unmatched_rows_reader.next());
135-
}
162+
write_output_rows(new_primary_unmatched_rows, primary_unmatched_rows, join_type);
136163
}
137164
}
138165
}
139166
}
140167

141-
if (join_type == tuix::JoinType_LeftSemi) {
142-
write_output_rows(primary_matched_rows, w);
143-
} else if (join_type == tuix::JoinType_LeftAnti) {
144-
write_output_rows(primary_unmatched_rows, w);
168+
switch (join_type) {
169+
case tuix::JoinType_LeftSemi:
170+
write_output_rows(primary_matched_rows, w, join_type);
171+
break;
172+
case tuix::JoinType_LeftAnti:
173+
write_output_rows(primary_unmatched_rows, w, join_type);
174+
break;
175+
case tuix::JoinType_LeftOuter:
176+
case tuix::JoinType_RightOuter:
177+
write_output_rows(primary_unmatched_rows, w, join_type, last_foreign_row.get());
178+
break;
179+
default:
180+
break;
145181
}
146182

147183
w.output_buffer(output_rows, output_rows_length);

0 commit comments

Comments
 (0)