Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Demos/FeedDemo/FeedDemo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
Expand Down
184 changes: 113 additions & 71 deletions Demos/FeedDemo/Program.cs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: Demos/FeedDemo/Program.cs

Running

q -p 5001
q).u.upd:{[tbl;row] insert[tbl](row)}
q)mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$())

get many `type errors. Just running ParallelInsertRows with the q instance changed to add printing of message received i.e.

q).z.ps:{0N!x;value x}

see what's causing the errors (mismatching types for the table updates):

(".u.upd";`mytable;(0D00:00:00.000000000;`SYMBOL;49i;300))
'type

The code should prob have been

          // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$())
          object[] row = new object[]
          {
                    new c.KTimespan(i),
                    "SYMBOL",
                    Convert.ToDouble(i)
                    300L
          };

Though since data is being sent on threads, each thread is interleaving with each other giving different order each time (as expected). Order is the same per thread, but not across threads. e.g. first run, looking at first 5 entries to table when only running parallel inserts:

q)5#mytable
time                 sym    price size
--------------------------------------
0D00:00:00.000000000 SYMBOL 0     300 
0D00:00:00.000000000 SYMBOL 75    300 
0D00:00:00.000000000 SYMBOL 50    300 
0D00:00:00.000000000 SYMBOL 25    300 
0D00:00:00.000000000 SYMBOL 26    300 

next run has different order, as expected due to threading

q)5#mytable
time                 sym    price size
--------------------------------------
0D00:00:00.000000000 SYMBOL 50    300 
0D00:00:00.000000000 SYMBOL 0     300 
0D00:00:00.000000000 SYMBOL 75    300 
0D00:00:00.000000000 SYMBOL 25    300 
0D00:00:00.000000000 SYMBOL 76    300 

Depending on use-case, this may not be desirable & will leave it off the demo as closer to programming paradigm rather than API example.

Original file line number Diff line number Diff line change
@@ -1,101 +1,143 @@
using System;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using kx;
using NLog;

namespace FeedDemo
{
static class Program
{
private static readonly ILogger Logger = LogManager.GetCurrentClassLogger();
static class Program
{
private static readonly ILogger Logger = LogManager.GetCurrentClassLogger();

private const string QFunc = ".u.upd";
private const string TableName = "mytable";
private const string QFunc = ".u.upd";
private const string TableName = "mytable";

static void Main()
static void Main()
{
string host = "localhost";
int port = 5001;
string usernamePassword = $"{Environment.UserName}:mypassword";

c connection = null;
try
{
connection = new c(host, port, usernamePassword);

//Example of 10 single row inserts to a table
InsertRows(connection);

//Parallel example of 100 single row inserts to a table
ParallelInsertRows(host, port, usernamePassword, 100, 10, 4);

//Parallel example of 1000 single row inserts to a table
ParallelInsertRows(host, port, usernamePassword, 1000, 100, 300);

//Parallel example of 1000 single row inserts to a table
ParallelInsertRows(host, port, usernamePassword, 1000, 1000, 1000);

//Example of bulk inserts to a table to improve throughput
BulkInsertRows(connection);

}
catch (Exception ex)
{
Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}");
}
finally
{
if (connection != null)
{
string host = "localhost";
int port = 5001;
string usernamePassword = $"{Environment.UserName}:mypassword";

c connection = null;
try
{
connection = new c(host, port, usernamePassword);

//Example of 10 single row inserts to a table
InsertRows(connection);

//Example of bulk inserts to a table to improve throughput
BulkInsertRows(connection);

}
catch (Exception ex)
{
Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}");
}
finally
{
if (connection != null)
{
connection.Close();
}
}
connection.Close();
}
}
}

private static void InsertRows(c connection)
private static void InsertRows(c connection)
{
// Single row insert - not as efficient as bulk insert
Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName);

for (int i = 0; i < 10; i++)
{
// Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$())
object[] row = new object[]
{
// Single row insert - not as efficient as bulk insert
Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName);

for(int i = 0; i < 10; i++)
{
// Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$())
object[] row = new object[]
{
new c.KTimespan(100),
"SYMBOL",
93.5,
300L
};
};

connection.ks(QFunc, TableName, row);
}
connection.ks(QFunc, TableName, row);
}

Logger.Info("Successfully inserted 10 rows to {0}", TableName);
}
Logger.Info("Successfully inserted 10 rows to {0}", TableName);
}
private static void ParallelInsertRows(string host, int port, string usernamePassword, int rowCount, int minThreads, int maxDegreeOfParallelism)
{
// Single row insert - not as efficient as bulk insert
Logger.Info("Populating '{0}' table on kdb server with {1} rows...", TableName, rowCount);
ThreadPool.SetMinThreads(minThreads, minThreads);

var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
};

private static void BulkInsertRows(c connection)
Parallel.For(0, rowCount, parallelOptions, i =>
{
{
// Bulk row insert - more efficient
string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" };
// Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$())
object[] row = new object[]
{
new c.KTimespan(i),
"SYMBOL",
i,
300L
};
var c = new c(host, port, usernamePassword);
c.ks(QFunc, TableName, row);

c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 10);
string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 10);
double[] prices = CreateTestArray(i => i * 1.1, 10);
long[] sizes = CreateTestArray(i => (long)(i * 100), 10);
Logger.Info("Successfully inserted {1} row to {0}", TableName, i);
}
});

Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName);
Logger.Info("Successfully inserted {1} rows to {0}", TableName, rowCount);
}

connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes });
private static void BulkInsertRows(c connection)
{
// Bulk row insert - more efficient
string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" };

Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName);
c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 10);
string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 10);
double[] prices = CreateTestArray(i => i * 1.1, 10);
long[] sizes = CreateTestArray(i => (long)(i * 100), 10);

connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes })));
Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName);

//block until all messages are processed
connection.k(string.Empty);
}
connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes });

private static T[] CreateTestArray<T>(Func<int, T> elementBuilder, int arraySize)
{
T[] array = new T[arraySize];
Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName);

for (int i = 0; i < arraySize; i++)
{
array[i] = elementBuilder(i);
}
return array;
}
connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes })));

//block until all messages are processed
connection.k(string.Empty);
}

private static T[] CreateTestArray<T>(Func<int, T> elementBuilder, int arraySize)
{
T[] array = new T[arraySize];

for (int i = 0; i < arraySize; i++)
{
array[i] = elementBuilder(i);
}
return array;
}
}
}
}
2 changes: 1 addition & 1 deletion Demos/QueryResponseDemo/QueryResponseDemo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
Expand Down
2 changes: 1 addition & 1 deletion Demos/SerializationDemo/SerializationDemo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
Expand Down
2 changes: 1 addition & 1 deletion Demos/SubscriberDemo/SubscriberDemo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
Expand Down
2 changes: 1 addition & 1 deletion kx.Benchmark.Test/kx.Benchmark.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net8</TargetFramework>
<StartupObject>kx.Benchmark.Test.Program</StartupObject>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>kx.Benchmark.Test.snk</AssemblyOriginatorKeyFile>
Expand Down
60 changes: 30 additions & 30 deletions kx.Test/TestUtils/TestSerialisationHelper.cs
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
using System;
using MessagePack;
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text.Json;

namespace kx.Test.TestUtils
{
/// <summary>
/// A helper class for testing serialisation and de-serialisation logic
/// in unit-tests
/// </summary>
internal static class TestSerialisationHelper
{
/// <summary>
/// A helper class for testing serialisation and de-serialisation logic
/// in unit-tests
/// Performs binary serialisation and de-serialisation on a specified exception.
/// </summary>
internal static class TestSerialisationHelper
/// <typeparam name="T">The type of exception being tested.</typeparam>
/// <param name="exception">The exception to be serialised and de-serialised.</param>
/// <returns>
/// A de-serialised instance of the exception passed. All serialisable members should match the original.
/// </returns>
/// <remarks>
/// This is primarily intended to confirm custom exceptions within the DeltaApiCore
/// library comply to ISerialization pattern.
///
/// See https://stackoverflow.com/questions/94488/what-is-the-correct-way-to-make-a-custom-net-exception-serializable
/// </remarks>
public static T SerialiseAndDeserialiseException<T>(T exception)
where T : Exception
{
/// <summary>
/// Performs binary serialisation and de-serialisation on a specified exception.
/// </summary>
/// <typeparam name="T">The type of exception being tested.</typeparam>
/// <param name="exception">The exception to be serialised and de-serialised.</param>
/// <returns>
/// A de-serialised instance of the exception passed. All serialisable members should match the original.
/// </returns>
/// <remarks>
/// This is primarily intended to confirm custom exceptions within the DeltaApiCore
/// library comply to ISerialization pattern.
///
/// See https://stackoverflow.com/questions/94488/what-is-the-correct-way-to-make-a-custom-net-exception-serializable
/// </remarks>
public static T SerialiseAndDeserialiseException<T>(T exception)
where T : Exception
{
BinaryFormatter binaryFormatter = new BinaryFormatter();
using (var stream = new MemoryStream())
{
MessagePackSerializer.Serialize(stream, exception, MessagePack.Resolvers.ContractlessStandardResolver.Options);

using (var stream = new MemoryStream())
{
binaryFormatter.Serialize(stream, exception);
stream.Seek(0, 0);

stream.Seek(0, 0);

return (T)binaryFormatter.Deserialize(stream);
}
}
return (T)MessagePackSerializer.Deserialize<T>(stream, MessagePack.Resolvers.ContractlessStandardResolver.Options);
}
}
}
}
3 changes: 2 additions & 1 deletion kx.Test/kx.Test.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net8</TargetFramework>

<IsPackable>false</IsPackable>

Expand All @@ -25,6 +25,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="MessagePack" Version="3.1.3" />
<PackageReference Include="Moq" Version="4.15.2" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1" />
Expand Down
2 changes: 1 addition & 1 deletion kx/kx.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<Title>CSharpKDB</Title>
<PackageId>CSharpKDB</PackageId>
<Description>Provides functionality for .NET applications to interface with a KDB+ process.</Description>
Expand Down