Skip to content

Commit 36b921a

Browse files
committed
load database schema
1 parent c0fbd97 commit 36b921a

23 files changed

+233
-83
lines changed

src/SciSharp.MySQL.Replication/ColumnMetadata.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class ColumnMetadata
2525
/// <summary>
2626
/// Gets or sets the metadata value associated with the column.
2727
/// </summary>
28-
public ushort MetadataValue { get; set; }
28+
public byte[] MetadataValue { get; set; }
2929

3030
/// <summary>
3131
/// Gets or sets the column max value length.

src/SciSharp.MySQL.Replication/Events/EnumTypeTableMetadataInitializer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ private bool IsEnumColumn(ColumnMetadata columnMetadata)
2626
if (columnMetadata.Type != ColumnType.STRING)
2727
return false;
2828

29-
var meta0 = columnMetadata.MetadataValue >> 8;
29+
var meta0 = columnMetadata.MetadataValue[0];
3030

31-
if (meta0 != (int)ColumnType.ENUM)
31+
if (meta0 != (byte)ColumnType.ENUM)
3232
return false;
3333

3434
columnMetadata.UnderlyingType = ColumnType.ENUM;

src/SciSharp.MySQL.Replication/Events/SetTypeTableMetadataInitializer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ private bool IsSetColumn(ColumnMetadata columnMetadata)
2727
if (columnMetadata.Type != ColumnType.STRING)
2828
return false;
2929

30-
var meta0 = columnMetadata.MetadataValue >> 8;
30+
var meta0 = columnMetadata.MetadataValue[0];
3131

32-
if (meta0 != (int)ColumnType.SET)
32+
if (meta0 != (byte)ColumnType.SET)
3333
return false;
3434

3535
columnMetadata.UnderlyingType = ColumnType.SET;

src/SciSharp.MySQL.Replication/Events/TableMapEvent.cs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public sealed class TableMapEvent : LogEvent
4444
/// <summary>
4545
/// Gets or sets the array of metadata for each column.
4646
/// </summary>
47-
public int[] ColumnMetadata { get; set; }
47+
public byte[][] ColumnMetadata { get; set; }
4848

4949
/// <summary>
5050
/// Gets or sets the bitmap of columns that can be null.
@@ -78,6 +78,8 @@ public TableMapEvent()
7878
/// <param name="context">The context for decoding.</param>
7979
protected internal override void DecodeBody(ref SequenceReader<byte> reader, object context)
8080
{
81+
var repState = (ReplicationState)context;
82+
8183
TableID = reader.ReadLong(6);
8284

8385
reader.Advance(2); // skip flags
@@ -91,15 +93,22 @@ protected internal override void DecodeBody(ref SequenceReader<byte> reader, obj
9193
reader.TryRead(out len);
9294
TableName = reader.ReadString(len);
9395

96+
if (!repState.TableSchemaMap.TryGetValue($"{SchemaName}.{TableName}", out var tableSchema))
97+
{
98+
throw new Exception($"Table {SchemaName}.{TableName} not found in schema map.");
99+
}
100+
94101
reader.TryRead(out len);// 0x00
95102

96103
ColumnCount = (int)reader.ReadLengthEncodedInteger();
97104
ColumnTypes = reader.Sequence.Slice(reader.Consumed, ColumnCount).ToArray();
98105
reader.Advance(ColumnCount);
99106

100-
reader.ReadLengthEncodedInteger();
107+
var columnMetadataValuesLength = reader.ReadLengthEncodedInteger();
101108

102-
ColumnMetadata = ReadColumnMetadata(ref reader, ColumnTypes);
109+
var metadataReader = new SequenceReader<byte>(reader.Sequence.Slice(reader.Consumed, columnMetadataValuesLength));
110+
ColumnMetadata = ReadColumnMetadata(ref metadataReader, ColumnTypes, tableSchema);
111+
reader.Advance(columnMetadataValuesLength);
103112

104113
NullBitmap = reader.ReadBitArray(ColumnCount);
105114

@@ -122,10 +131,7 @@ protected internal override void DecodeBody(ref SequenceReader<byte> reader, obj
122131
}
123132
}
124133

125-
if (context is ReplicationState repState)
126-
{
127-
repState.TableMap[TableID] = this;
128-
}
134+
repState.TableMap[TableID] = this;
129135
}
130136

131137
/// <summary>
@@ -142,47 +148,64 @@ public override string ToString()
142148
/// </summary>
143149
/// <param name="reader">The sequence reader containing the binary data.</param>
144150
/// <param name="columnTypes">The array of column types.</param>
151+
/// <param name="tableSchema">The schema of the table.</param>
145152
/// <returns>An array of metadata for each column.</returns>
146-
private int[] ReadColumnMetadata(ref SequenceReader<byte> reader, byte[] columnTypes)
153+
private byte[][] ReadColumnMetadata(ref SequenceReader<byte> reader, byte[] columnTypes, TableSchema tableSchema)
147154
{
148-
var columnMetadata = new int[columnTypes.Length];
155+
var columnMetadata = new byte[columnTypes.Length][];
149156

150157
for (int i = 0; i < columnTypes.Length; i++)
151158
{
159+
var columnSchema = tableSchema.Columns[i];
160+
152161
switch((ColumnType)columnTypes[i])
153162
{
154163
case ColumnType.FLOAT:
155164
case ColumnType.DOUBLE:
156165
case ColumnType.BLOB:
157166
case ColumnType.JSON:
158167
case ColumnType.GEOMETRY:
159-
columnMetadata[i] = (int)reader.ReadLong(1);
168+
columnMetadata[i] = ReadColumnMetadataValue(ref reader, 1);
160169
break;
161-
case ColumnType.BIT:
162170
case ColumnType.VARCHAR:
163-
case ColumnType.NEWDECIMAL:
164-
columnMetadata[i] = (int)reader.ReadLong(2);
171+
columnMetadata[i] = ReadColumnMetadataValue(ref reader, columnSchema.ColumnSize < 256 ? 1 : 2);
165172
break;
173+
case ColumnType.BIT:
174+
case ColumnType.NEWDECIMAL:
166175
case ColumnType.SET:
167176
case ColumnType.ENUM:
168177
case ColumnType.STRING:
169-
reader.TryReadBigEndian(out short value);
170-
columnMetadata[i] = (int)value;
178+
columnMetadata[i] = ReadColumnMetadataValue(ref reader, 2);
171179
break;
172180
case ColumnType.TIME_V2:
173181
case ColumnType.DATETIME_V2:
174182
case ColumnType.TIMESTAMP_V2:
175-
columnMetadata[i] = (int)reader.ReadLong(1);
183+
columnMetadata[i] = ReadColumnMetadataValue(ref reader, 1);
176184
break;
177185
default:
178-
columnMetadata[i] = 0;
179186
break;
180187
}
181188
}
182189

183190
return columnMetadata;
184191
}
185192

193+
private byte[] ReadColumnMetadataValue(ref SequenceReader<byte> reader, int size)
194+
{
195+
if (size == 1)
196+
return [reader.TryRead(out var b0) ? b0 : (byte)0];
197+
198+
if (size == 2)
199+
{
200+
reader.TryRead(out var b0);
201+
reader.TryRead(out var b1);
202+
203+
return [b0, b1];
204+
}
205+
206+
throw new Exception($"Unsupported column metadata size: {size}");
207+
}
208+
186209
/// <summary>
187210
/// Reads the table metadata from the binary representation.
188211
/// </summary>

src/SciSharp.MySQL.Replication/ReplicationClient.cs

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
using System;
2+
using System.Buffers.Binary;
3+
using System.Collections.Generic;
4+
using System.Data;
25
using System.IO;
3-
using System.Threading.Tasks;
6+
using System.Linq;
47
using System.Reflection;
5-
using System.Buffers.Binary;
8+
using System.Threading.Tasks;
69
using Microsoft.Extensions.Logging;
710
using MySql.Data.MySqlClient;
8-
using SuperSocket.Connection;
9-
using SuperSocket.Client;
1011
using SciSharp.MySQL.Replication.Events;
12+
using SuperSocket.Client;
13+
using SuperSocket.Connection;
1114

1215
namespace SciSharp.MySQL.Replication
1316
{
@@ -40,6 +43,8 @@ public class ReplicationClient : EasyClient<LogEvent>, IReplicationClient
4043
set { base.Logger = value; }
4144
}
4245

46+
private readonly Dictionary<string, TableSchema> _tableSchemaMap;
47+
4348
static ReplicationClient()
4449
{
4550
LogEventPackageDecoder.RegisterEmptyPayloadEventTypes(
@@ -68,11 +73,17 @@ static ReplicationClient()
6873
/// Initializes a new instance of the <see cref="ReplicationClient"/> class.
6974
/// </summary>
7075
public ReplicationClient()
71-
: base(new LogEventPipelineFilter())
76+
: this(new LogEventPipelineFilter())
7277
{
7378

7479
}
7580

81+
private ReplicationClient(LogEventPipelineFilter logEventPipelineFilter)
82+
: base(logEventPipelineFilter)
83+
{
84+
_tableSchemaMap = (logEventPipelineFilter.Context as ReplicationState).TableSchemaMap;
85+
}
86+
7687
/// <summary>
7788
/// Gets the underlying stream from a MySQL connection.
7889
/// </summary>
@@ -104,7 +115,7 @@ public async Task<LoginResult> ConnectAsync(string server, string username, stri
104115

105116
try
106117
{
107-
await mysqlConn.OpenAsync();
118+
await mysqlConn.OpenAsync().ConfigureAwait(false);
108119
}
109120
catch (Exception e)
110121
{
@@ -117,16 +128,18 @@ public async Task<LoginResult> ConnectAsync(string server, string username, stri
117128

118129
try
119130
{
120-
var binlogInfo = await GetBinlogFileNameAndPosition(mysqlConn);
131+
await LoadDatabaseSchemaAsync(mysqlConn).ConfigureAwait(false);
132+
133+
var binlogInfo = await GetBinlogFileNameAndPosition(mysqlConn).ConfigureAwait(false);
121134

122-
var binlogChecksum = await GetBinlogChecksum(mysqlConn);
123-
await ConfirmChecksum(mysqlConn);
135+
var binlogChecksum = await GetBinlogChecksum(mysqlConn).ConfigureAwait(false);
136+
await ConfirmChecksum(mysqlConn).ConfigureAwait(false);
124137
LogEvent.ChecksumType = binlogChecksum;
125138

126139
_stream = GetStreamFromMySQLConnection(mysqlConn);
127140
_serverId = serverId;
128141

129-
await StartDumpBinlog(_stream, serverId, binlogInfo.Item1, binlogInfo.Item2);
142+
await StartDumpBinlog(_stream, serverId, binlogInfo.Item1, binlogInfo.Item2).ConfigureAwait(false);
130143

131144
_connection = mysqlConn;
132145

@@ -143,7 +156,7 @@ public async Task<LoginResult> ConnectAsync(string server, string username, stri
143156
}
144157
catch (Exception e)
145158
{
146-
await mysqlConn.CloseAsync();
159+
await mysqlConn.CloseAsync().ConfigureAwait(false);
147160

148161
return new LoginResult
149162
{
@@ -153,6 +166,50 @@ public async Task<LoginResult> ConnectAsync(string server, string username, stri
153166
}
154167
}
155168

169+
private async Task LoadDatabaseSchemaAsync(MySqlConnection mysqlConn)
170+
{
171+
var tableSchemaTable = await mysqlConn.GetSchemaAsync("Columns").ConfigureAwait(false);
172+
173+
var systemDatabases = new HashSet<string>(
174+
new [] { "mysql", "information_schema", "performance_schema", "sys" },
175+
StringComparer.OrdinalIgnoreCase);
176+
177+
var userDatabaseColumns = tableSchemaTable.Rows.OfType<DataRow>()
178+
.Where(row => !systemDatabases.Contains(row.ItemArray[1].ToString()))
179+
.ToArray();
180+
181+
userDatabaseColumns.Select(row =>
182+
{
183+
var columnSizeCell = row["CHARACTER_MAXIMUM_LENGTH"];
184+
185+
return new {
186+
TableName = row["TABLE_NAME"].ToString(),
187+
DatabaseName = row["TABLE_SCHEMA"].ToString(),
188+
ColumnName = row["COLUMN_NAME"].ToString(),
189+
ColumnType = row["DATA_TYPE"].ToString(),
190+
ColumnSize = columnSizeCell == DBNull.Value ? 0 : Convert.ToUInt64(columnSizeCell),
191+
};
192+
})
193+
.GroupBy(row => new { row.TableName, row.DatabaseName })
194+
.ToList()
195+
.ForEach(group =>
196+
{
197+
var tableSchema = new TableSchema
198+
{
199+
TableName = group.Key.TableName,
200+
DatabaseName = group.Key.DatabaseName,
201+
Columns = group.Select(row => new ColumnSchema
202+
{
203+
Name = row.ColumnName,
204+
DataType = row.ColumnType,
205+
ColumnSize = row.ColumnSize
206+
}).ToList()
207+
};
208+
209+
_tableSchemaMap[$"{group.Key.DatabaseName}.{group.Key.TableName}"] = tableSchema;
210+
});
211+
}
212+
156213
/// <summary>
157214
/// Retrieves the binary log file name and position from the MySQL server.
158215
/// https://dev.mysql.com/doc/refman/5.6/en/replication-howto-masterstatus.html
@@ -164,15 +221,15 @@ private async Task<Tuple<string, int>> GetBinlogFileNameAndPosition(MySqlConnect
164221
var cmd = mysqlConn.CreateCommand();
165222
cmd.CommandText = "SHOW MASTER STATUS;";
166223

167-
using (var reader = await cmd.ExecuteReaderAsync())
224+
using (var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false))
168225
{
169226
if (!await reader.ReadAsync())
170227
throw new Exception("No binlog information has been returned.");
171228

172229
var fileName = reader.GetString(0);
173230
var position = reader.GetInt32(1);
174231

175-
await reader.CloseAsync();
232+
await reader.CloseAsync().ConfigureAwait(false);
176233

177234
return new Tuple<string, int>(fileName, position);
178235
}
@@ -190,11 +247,11 @@ private async Task<ChecksumType> GetBinlogChecksum(MySqlConnection mysqlConn)
190247

191248
using (var reader = await cmd.ExecuteReaderAsync())
192249
{
193-
if (!await reader.ReadAsync())
250+
if (!await reader.ReadAsync().ConfigureAwait(false))
194251
return ChecksumType.NONE;
195252

196253
var checksumTypeName = reader.GetString(1).ToUpper();
197-
await reader.CloseAsync();
254+
await reader.CloseAsync().ConfigureAwait(false);
198255

199256
return (ChecksumType)Enum.Parse(typeof(ChecksumType), checksumTypeName);
200257
}
@@ -209,7 +266,7 @@ private async ValueTask ConfirmChecksum(MySqlConnection mysqlConn)
209266
{
210267
var cmd = mysqlConn.CreateCommand();
211268
cmd.CommandText = "set @`master_binlog_checksum` = @@binlog_checksum;";
212-
await cmd.ExecuteNonQueryAsync();
269+
await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
213270
}
214271

215272
/// <summary>
@@ -269,8 +326,8 @@ private Memory<byte> GetDumpBinlogCommand(int serverId, string fileName, int pos
269326
private async ValueTask StartDumpBinlog(Stream stream, int serverId, string fileName, int position)
270327
{
271328
var data = GetDumpBinlogCommand(serverId, fileName, position);
272-
await stream.WriteAsync(data);
273-
await stream.FlushAsync();
329+
await stream.WriteAsync(data).ConfigureAwait(false);
330+
await stream.FlushAsync().ConfigureAwait(false);
274331
}
275332

276333
/// <summary>
@@ -307,10 +364,10 @@ public override async ValueTask CloseAsync()
307364
if (connection != null)
308365
{
309366
_connection = null;
310-
await connection.CloseAsync();
367+
await connection.CloseAsync().ConfigureAwait(false);
311368
}
312369

313-
await base.CloseAsync();
370+
await base.CloseAsync().ConfigureAwait(false);
314371
}
315372
}
316373
}

src/SciSharp.MySQL.Replication/ReplicationState.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,10 @@ class ReplicationState
1313
/// Gets or sets the dictionary mapping table IDs to their corresponding TableMapEvent objects.
1414
/// </summary>
1515
public Dictionary<long, TableMapEvent> TableMap { get; set; } = new Dictionary<long, TableMapEvent>();
16+
17+
/// <summary>
18+
/// Gets or sets the dictionary mapping table IDs to their corresponding TableSchema objects.
19+
/// </summary>
20+
public Dictionary<string, TableSchema> TableSchemaMap { get; set; } = new Dictionary<string, TableSchema>(StringComparer.OrdinalIgnoreCase);
1621
}
1722
}

src/SciSharp.MySQL.Replication/SequenceReaderExtensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ internal static long ReadLong(ref this SequenceReader<byte> reader, int length)
200200
return value;
201201
}
202202

203+
internal static ushort ReadLittleEndianShort(ref this SequenceReader<byte> reader)
204+
{
205+
reader.TryRead(out byte b0);
206+
reader.TryRead(out byte b1);
207+
208+
return (ushort) (b1 << 8 | b0);
209+
}
210+
203211
/// <summary>
204212
/// Reads a length-encoded string from the binary stream.
205213
/// </summary>

0 commit comments

Comments
 (0)