Skip to content

Commit a4b4f7b

Browse files
authored
Merge pull request #199 from DataObjects-NET/6.0-async-query-store
Fix for async execution of queries with .Store() and .In() operations
2 parents e7c82cc + d1cd0a0 commit a4b4f7b

14 files changed

+560
-121
lines changed

Orm/Xtensive.Orm.Tests/Linq/InTest.cs

Lines changed: 316 additions & 58 deletions
Large diffs are not rendered by default.

Orm/Xtensive.Orm.Tests/Linq/LocalCollectionsTest.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
using Xtensive.Orm.Tests.Linq.LocalCollectionsTest_Model;
1515
using Xtensive.Orm.Tests.ObjectModel;
1616
using Xtensive.Orm.Tests.ObjectModel.ChinookDO;
17+
using System.Threading.Tasks;
1718

1819
namespace Xtensive.Orm.Tests.Linq.LocalCollectionsTest_Model
1920
{
@@ -253,7 +254,7 @@ public void Pair2Test()
253254
[Test]
254255
public void Poco1Test()
255256
{
256-
Assert.Throws<QueryTranslationException>(() => {
257+
_ = Assert.Throws<QueryTranslationException>(() => {
257258
var pocos = Customers
258259
.Select(customer => new Poco<string>() { Value = customer.LastName })
259260
.ToList();
@@ -357,7 +358,7 @@ public void TypeLoop1Test()
357358
var nodes = new Node[10];
358359
var query = Session.Query.All<Invoice>()
359360
.Join(nodes, invoice => invoice.Customer.Address.City, node => node.Name, (invoice, node) => new {invoice, node});
360-
Assert.Throws<QueryTranslationException>(() => QueryDumper.Dump(query));
361+
_ = Assert.Throws<QueryTranslationException>(() => QueryDumper.Dump(query));
361362
}
362363

363364
[Test]
@@ -403,7 +404,7 @@ public void AllTest()
403404
[Test]
404405
public void KeyTest()
405406
{
406-
Assert.Throws<QueryTranslationException>( () => {
407+
_ = Assert.Throws<QueryTranslationException>(() => {
407408
var keys = Session.Query.All<Invoice>().Take(10).Select(invoice => invoice.Key).ToList();
408409
var query = Session.Query.All<Invoice>()
409410
.Join(keys, invoice => invoice.Key, key => key, (invoice, key) => new {invoice, key});
@@ -863,7 +864,6 @@ public void Aggregate1Test()
863864
Assert.AreEqual(result, expected);
864865
}
865866

866-
867867
[Test]
868868
public void Aggregate2Test()
869869
{
@@ -881,10 +881,27 @@ public void Aggregate2Test()
881881
QueryDumper.Dump(result);
882882
}
883883

884+
[Test]
885+
public async Task Aggregate2AsyncTest()
886+
{
887+
Require.AllFeaturesSupported(ProviderFeatures.TemporaryTables);
888+
Require.ProviderIsNot(StorageProvider.SqlServerCe);
889+
var localItems = GetLocalItems(100);
890+
var queryable = Session.Query.Store(localItems);
891+
var result = (await Session.Query.All<Invoice>()
892+
.Where(invoice => invoice.Commission > queryable.Max(poco => poco.Value2)).AsAsync()).ToList();
893+
var expected = Invoices
894+
.Where(invoice => invoice.Commission > localItems.Max(poco => poco.Value2));
895+
896+
Assert.That(result, Is.Not.Empty);
897+
Assert.AreEqual(0, expected.Except(result).Count());
898+
QueryDumper.Dump(result);
899+
}
900+
884901
[Test]
885902
public void ClosureCacheTest()
886903
{
887-
Assert.Throws<QueryTranslationException>( () => {
904+
_ = Assert.Throws<QueryTranslationException>( () => {
888905
var localItems = GetLocalItems(100);
889906
var queryable = Session.Query.Store(localItems);
890907
var result = Session.Query.Execute(qe => qe.All<Invoice>().Where(invoice => invoice.Commission > queryable.Max(poco => poco.Value2)));

Orm/Xtensive.Orm.Tests/Linq/QueryMethodTests.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
using System.Collections.Generic;
88
using System.Linq;
9+
using System.Threading.Tasks;
910
using NUnit.Framework;
1011
using Xtensive.Collections;
1112
using Xtensive.Core;
@@ -119,6 +120,29 @@ public void Store1Test()
119120
Assert.AreEqual(0, expected.Except(query).Count());
120121
}
121122

123+
[Test]
124+
public async Task Store1AsyncTest()
125+
{
126+
Require.AllFeaturesSupported(ProviderFeatures.TemporaryTables);
127+
128+
var localCustomers = Session.Query.All<Customer>().Take(10).ToList();
129+
var query = (await Session.Query.All<Customer>()
130+
.Join(
131+
Session.Query.Store(localCustomers),
132+
customer => customer,
133+
localCustomer => localCustomer,
134+
(customer, localCustomer) => new { customer, localCustomer }).AsAsync()).ToList();
135+
var expected = Session.Query.All<Customer>().AsEnumerable()
136+
.Join(
137+
Session.Query.Store(localCustomers),
138+
customer => customer,
139+
localCustomer => localCustomer,
140+
(customer, localCustomer) => new { customer, localCustomer });
141+
142+
Assert.That(query, Is.Not.Empty);
143+
Assert.AreEqual(0, expected.Except(query).Count());
144+
}
145+
122146
[Test]
123147
public void Store2Test()
124148
{
@@ -141,6 +165,28 @@ public void Store2Test()
141165
Assert.AreEqual(0, expected.Except(query).Count());
142166
}
143167

168+
[Test]
169+
public async Task Store2AsyncTest()
170+
{
171+
Require.AllFeaturesSupported(ProviderFeatures.TemporaryTables);
172+
173+
var query = (await Session.Query.All<Customer>()
174+
.Join(
175+
Session.Query.Store(Session.Query.All<Customer>().Take(10)),
176+
customer => customer,
177+
localCustomer => localCustomer,
178+
(customer, localCustomer) => new { customer, localCustomer }).AsAsync()).ToList();
179+
var expected = Session.Query.All<Customer>().AsEnumerable()
180+
.Join(
181+
Session.Query.Store(Session.Query.All<Customer>().Take(10)),
182+
customer => customer,
183+
localCustomer => localCustomer,
184+
(customer, localCustomer) => new { customer, localCustomer });
185+
186+
Assert.That(query, Is.Not.Empty);
187+
Assert.AreEqual(0, expected.Except(query).Count());
188+
}
189+
144190
private IEnumerable<Customer> GetExpectedCustomerAsSequence()
145191
{
146192
if (StorageProviderInfo.Instance.CheckProviderIs(StorageProvider.Firebird)) {

Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/SimpleCommandProcessor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,15 @@ public override async Task<IEnumerator<Tuple>> ExecuteTasksWithReaderAsync(Query
129129

130130
token.ThrowIfCancellationRequested();
131131

132-
await ExecuteTasksAsync(context, token);
132+
await ExecuteTasksAsync(context, token).ConfigureAwait(false);
133133
context.AllowPartialExecution = oldValue;
134134

135135
var lastRequestCommand = Factory.CreateCommand();
136136
var commandPart = Factory.CreateQueryPart(lastRequest);
137137
ValidateCommandParameters(commandPart);
138138
lastRequestCommand.AddPart(commandPart);
139139
token.ThrowIfCancellationRequested();
140-
await lastRequestCommand.ExecuteReaderAsync(token);
140+
await lastRequestCommand.ExecuteReaderAsync(token).ConfigureAwait(false);
141141
return lastRequestCommand.AsReaderOf(lastRequest);
142142
}
143143

Orm/Xtensive.Orm/Orm/Providers/Interfaces/IProviderExecutor.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
// Copyright (C) 2003-2010 Xtensive LLC.
2-
// All rights reserved.
3-
// For conditions of distribution and use, see license.
1+
// Copyright (C) 2009-2021 Xtensive LLC.
2+
// This code is distributed under MIT license terms.
3+
// See the License.txt file in the project root for more information.
44
// Created by: Denis Krjuchkov
55
// Created: 2009.10.30
66

@@ -38,6 +38,13 @@ public interface IProviderExecutor
3838
/// <param name="tuples">The tuples to store.</param>
3939
void Store(IPersistDescriptor descriptor, IEnumerable<Tuple> tuples);
4040

41+
/// <summary>
42+
/// Asynchronously stores the specified tuples in specified table.
43+
/// </summary>
44+
/// <param name="descriptor">Persist descriptor.</param>
45+
/// <param name="tuples">The tuples to store.</param>
46+
Task StoreAsync(EnumerationContext context, IPersistDescriptor descriptor, IEnumerable<Tuple> tuples, CancellationToken token);
47+
4148
/// <summary>
4249
/// Clears the specified table.
4350
/// </summary>

Orm/Xtensive.Orm/Orm/Providers/SqlIncludeProvider.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
using Tuple = Xtensive.Tuples.Tuple;
1515
using Xtensive.Orm.Rse;
1616
using Xtensive.Orm.Rse.Providers;
17+
using System.Threading.Tasks;
18+
using System.Threading;
1719

1820
namespace Xtensive.Orm.Providers
1921
{
@@ -51,6 +53,29 @@ protected override void OnBeforeEnumerate(Rse.Providers.EnumerationContext conte
5153
}
5254
}
5355

56+
protected override async Task OnBeforeEnumerateAsync(Rse.Providers.EnumerationContext context, CancellationToken token)
57+
{
58+
await base.OnBeforeEnumerateAsync(context, token).ConfigureAwait(false);
59+
60+
switch (Origin.Algorithm) {
61+
case IncludeAlgorithm.Auto:
62+
var filterData = filterDataSource.Invoke().ToList();
63+
if (filterData.Count > WellKnown.MaxNumberOfConditions)
64+
await LockAndStoreAsync(context, filterData, token).ConfigureAwait(false);
65+
else
66+
context.SetValue(filterDataSource, RowFilterDataName, filterData);
67+
break;
68+
case IncludeAlgorithm.ComplexCondition:
69+
// nothing
70+
break;
71+
case IncludeAlgorithm.TemporaryTable:
72+
await LockAndStoreAsync(context, filterDataSource.Invoke(), token).ConfigureAwait(false);
73+
break;
74+
default:
75+
throw new ArgumentOutOfRangeException("Origin.Algorithm");
76+
}
77+
}
78+
5479
/// <inheritdoc/>
5580
protected override void OnAfterEnumerate(Rse.Providers.EnumerationContext context)
5681
{

Orm/Xtensive.Orm/Orm/Providers/SqlSessionHandler.IProviderExecutor.cs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
// Copyright (C) 2010 Xtensive LLC.
2-
// All rights reserved.
3-
// For conditions of distribution and use, see license.
1+
// Copyright (C) 2010-2021 Xtensive LLC.
2+
// This code is distributed under MIT license terms.
3+
// See the License.txt file in the project root for more information.
44
// Created by: Alex Yakunin
55
// Created: 2010.02.09
66

77
using System.Collections.Generic;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using Xtensive.Orm.Rse.Providers;
1011
using Tuple = Xtensive.Tuples.Tuple;
1112

1213
namespace Xtensive.Orm.Providers
@@ -39,10 +40,35 @@ async Task<IEnumerator<Tuple>> IProviderExecutor.ExecuteTupleReaderAsync(QueryRe
3940
void IProviderExecutor.Store(IPersistDescriptor descriptor, IEnumerable<Tuple> tuples)
4041
{
4142
Prepare();
42-
foreach (var tuple in tuples)
43+
foreach (var tuple in tuples) {
4344
commandProcessor.RegisterTask(new SqlPersistTask(descriptor.StoreRequest, tuple));
44-
using (var context = new CommandProcessorContext())
45+
}
46+
47+
using (var context = new CommandProcessorContext()) {
4548
commandProcessor.ExecuteTasks(context);
49+
}
50+
}
51+
52+
/// <inheritdoc/>
53+
async Task IProviderExecutor.StoreAsync(EnumerationContext enumerationContext,IPersistDescriptor descriptor, IEnumerable<Tuple> tuples, CancellationToken token)
54+
{
55+
await PrepareAsync(token).ConfigureAwait(false);
56+
57+
if (tuples is ExecutableRawProvider rawProvider) {
58+
var enumerator = await rawProvider.GetEnumeratorAsync(enumerationContext, token).ConfigureAwait(false);
59+
while(enumerator.MoveNext()) {
60+
commandProcessor.RegisterTask(new SqlPersistTask(descriptor.StoreRequest, enumerator.Current));
61+
}
62+
}
63+
else {
64+
foreach (var tuple in tuples) {
65+
commandProcessor.RegisterTask(new SqlPersistTask(descriptor.StoreRequest, tuple));
66+
}
67+
}
68+
69+
using (var context = new CommandProcessorContext()) {
70+
await commandProcessor.ExecuteTasksAsync(context, token).ConfigureAwait(false);
71+
}
4672
}
4773

4874
/// <inheritdoc/>

Orm/Xtensive.Orm/Orm/Providers/SqlSessionHandler.cs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
// Copyright (C) 2003-2010 Xtensive LLC.
2-
// All rights reserved.
3-
// For conditions of distribution and use, see license.
1+
// Copyright (C) 2008-2021 Xtensive LLC.
2+
// This code is distributed under MIT license terms.
3+
// See the License.txt file in the project root for more information.
44
// Created by: Alexey Gamzov
55
// Created: 2008.05.20
66

@@ -147,38 +147,43 @@ private void Prepare()
147147
{
148148
Session.EnsureNotDisposed();
149149
driver.EnsureConnectionIsOpen(Session, connection);
150-
foreach (var script in initializationSqlScripts)
151-
using (var command = connection.CreateCommand(script))
152-
driver.ExecuteNonQuery(Session, command);
150+
foreach (var script in initializationSqlScripts) {
151+
using (var command = connection.CreateCommand(script)) {
152+
_ = driver.ExecuteNonQuery(Session, command);
153+
}
154+
}
155+
153156
initializationSqlScripts.Clear();
154-
if (pendingTransaction==null)
157+
if (pendingTransaction == null)
155158
return;
156159
var transaction = pendingTransaction;
157160
pendingTransaction = null;
158-
if (connection.ActiveTransaction==null) // Handle external transactions
161+
if (connection.ActiveTransaction == null) // Handle external transactions
159162
driver.BeginTransaction(Session, connection, IsolationLevelConverter.Convert(transaction.IsolationLevel));
160163
}
161164

162165
private async Task PrepareAsync(CancellationToken cancellationToken)
163166
{
164167
Session.EnsureNotDisposed();
165-
await driver.OpenConnectionAsync(Session, connection, cancellationToken).ConfigureAwait(false);
168+
await driver.EnsureConnectionIsOpenAsync(Session, connection, cancellationToken).ConfigureAwait(false);
166169

167170
try {
168-
foreach (var initializationSqlScript in initializationSqlScripts)
169-
using (var command = connection.CreateCommand(initializationSqlScript))
170-
await driver.ExecuteNonQueryAsync(Session, command, cancellationToken).ConfigureAwait(false);
171+
foreach (var initializationSqlScript in initializationSqlScripts) {
172+
using (var command = connection.CreateCommand(initializationSqlScript)) {
173+
_ = await driver.ExecuteNonQueryAsync(Session, command, cancellationToken).ConfigureAwait(false);
174+
}
175+
}
171176
}
172177
catch (OperationCanceledException) {
173178
connection.Close();
174179
throw;
175180
}
176181

177-
if (pendingTransaction==null)
182+
if (pendingTransaction == null)
178183
return;
179184
var transaction = pendingTransaction;
180185
pendingTransaction = null;
181-
if (connection.ActiveTransaction==null) // Handle external transactions
186+
if (connection.ActiveTransaction == null) // Handle external transactions
182187
driver.BeginTransaction(Session, connection, IsolationLevelConverter.Convert(transaction.IsolationLevel));
183188
}
184189

Orm/Xtensive.Orm/Orm/Providers/SqlStoreProvider.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
// Copyright (C) 2003-2010 Xtensive LLC.
2-
// All rights reserved.
3-
// For conditions of distribution and use, see license.
1+
// Copyright (C) 2008-2021 Xtensive LLC.
2+
// This code is distributed under MIT license terms.
3+
// See the License.txt file in the project root for more information.
44
// Created by: Dmitri Maximov
55
// Created: 2008.09.05
66

7+
using System.Threading;
8+
using System.Threading.Tasks;
79
using Xtensive.Orm.Rse.Providers;
810

911
namespace Xtensive.Orm.Providers
@@ -30,9 +32,16 @@ protected override void OnBeforeEnumerate(Rse.Providers.EnumerationContext conte
3032
LockAndStore(context, Source);
3133
}
3234

35+
/// <inheritdoc/>
36+
protected override async Task OnBeforeEnumerateAsync(Rse.Providers.EnumerationContext context, CancellationToken token)
37+
{
38+
await base.OnBeforeEnumerateAsync(context, token).ConfigureAwait(false);
39+
await LockAndStoreAsync(context, Source, token).ConfigureAwait(false);
40+
}
41+
3342
protected override void OnAfterEnumerate(Rse.Providers.EnumerationContext context)
3443
{
35-
ClearAndUnlock(context);
44+
_ = ClearAndUnlock(context);
3645
base.OnAfterEnumerate(context);
3746
}
3847

0 commit comments

Comments
 (0)