-
Notifications
You must be signed in to change notification settings - Fork 617
Explicitly acquire locks on nodes when @Version
is used.
#2259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
That's a great report, thank you! We'll investigate. |
Thanks @marianozunino we can follow the errors and we need to push out a fix. In general, your approach with version and retry is correct. Your implementation would not work due to proxy related issues (that retry wouldn't work in the nested class, I guess), but even if I fix the client code, we are not checking the version when merging the relationships. I am terribly sorry about this and it will be worked on asap. This is how I would write the retryable code: package com.example.demo;
import java.util.Optional;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class AService {
private final ARepository aRepository;
public AService(ARepository aRepository) {
this.aRepository = aRepository;
}
@Transactional
@Retryable(
include = { OptimisticLockingFailureException.class },
maxAttempts = 10,
backoff = @Backoff(delay = 50, multiplier = 2, random = true)
)
public String addB(String id) {
var a = aRepository.findById(id).get();
a.getBs().add(new B());
a.setName(System.nanoTime() + "");
a = aRepository.save(a);
return "finish " + a.getBs().size();
}
public Optional<A> findById(String s) {
return this.aRepository.findById(s);
}
public boolean existsById(String id) {
return this.aRepository.existsById(id);
}
public A save(A newA) {
return this.aRepository.save(newA);
}
public void deleteAll() {
this.aRepository.deleteAll();
}
} And import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@EnableRetry
@EnableTransactionManagement
public class DemoApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoApplication.class);
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
CommandLineRunner runner(AService aService) {
return args -> {
aService.deleteAll();
var id = "ac5cf47a-55a9-44a3-bb04-c2e88ff0a6b3";
if (!aService.existsById(id)) {
var newA = new A();
newA.setId(id);
aService.save(newA);
}
createBs(id, aService);
aService.findById(id).ifPresent(System.out::println);
};
}
private void createBs(String aId, AService aService)
throws InterruptedException, java.util.concurrent.ExecutionException {
ExecutorService EXEC = Executors.newFixedThreadPool(10);
var tasks = new ArrayList<Callable<String>>();
for (int i = 0; i < 1; i++) {
var c = new Callable<String>() {
@Override
public String call() {
return aService.addB(aId);
}
};
tasks.add(c);
}
var results = EXEC.invokeAll(tasks);
System.out.println(results);
for (var fr : results) {
System.out.println(fr.get());
}
}
} |
@Version
of the owning object when merging relationships.
Thank you so much @michael-simons :D I really appreciate it. Meanwhile I'm using a @Query("MATCH (a:A{id: $aId}) WITH a MATCH (b:B{id: $bId}) CREATE (a)-[:HAS]->(b)")
void storeContactRequest(String aId, String bId);
....
repo.save(b);
repo.storeContactRequest(a.getId(), b.getId()); |
@Version
of the owning object when merging relationships.@Version
is used.
This change brings back the original locking behaviour from Neo4j-OGM: The version is incremented inside the database and with it, a physical lock on the Node is acquired as described in https://neo4j.com/docs/java-reference/current/transaction-management/ under "Explicitly acquire a write lock". That incremented version is than used in a where clause after pipelining the matched node. The changes makes it necessary to retrieve the changed property from the database and apply it to the domain entity afterwards. The callbacks used before became superflous. This commit brings the ability to fetch single Nodes, Relationships or in general (driver) entities via the Neo4j client, allowing us to fetch the changed structure without additional mapping functions. This fixes #2259.
This change brings back the original locking behaviour from Neo4j-OGM: The version is incremented inside the database and with it, a physical lock on the Node is acquired as described in https://neo4j.com/docs/java-reference/current/transaction-management/ under "Explicitly acquire a write lock". That incremented version is than used in a where clause after pipelining the matched node. The changes makes it necessary to retrieve the changed property from the database and apply it to the domain entity afterwards. The callbacks used before became superflous. This commit brings the ability to fetch single Nodes, Relationships or in general (driver) entities via the Neo4j client, allowing us to fetch the changed structure without additional mapping functions. This fixes #2259.
Hey. This is now fixed in all 3 maintained branches. Snapshots should be available shortly. I have attached a version of your project using a snapshot build. The Thanks again for your contribution. |
Hi guys!
Im having the following issue.
I have a rest API that when It receives multiple calls to the same endpoint I end up loosing data.
For the sake of the issue I made up a simple example. It has two models:
A
andB
, andA
has aList<B>
The goal is to add Bs to A.
The example has 3 branches
with-out-version
,with-version
andwith-retry
. To simulate the concurrency I createdExecutors.newFixedThreadPool(10)
.The issue: I start with a fresh A (empty list of B). Then, each thread attempts to find that A and add a new B.
Each thread succeeded to create a new B (it shows up in neo4j), but only 1 thread succeded to add that B into A.
With out version: the explained above
With version: I get the optimistic lock exception.
With version+retry: No more exception, but I end up with the initial behaviour.
Example
Let me know if this is or isn't enough information, or If I'm taking a totally wrong aproach!
Thanks for you awesome work :D
The text was updated successfully, but these errors were encountered: