Skip to content

Commit 2c6d10c

Browse files
authored
Extract 'BlockingBridge' for IO operations (#1933)
1 parent 88868be commit 2c6d10c

File tree

13 files changed

+102
-23
lines changed

13 files changed

+102
-23
lines changed

save-backend/src/main/kotlin/com/saveourtool/save/backend/configs/ApplicationConfiguration.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.saveourtool.save.backend.configs
22

33
import com.saveourtool.save.service.LogService
44
import com.saveourtool.save.service.LokiLogService
5+
import com.saveourtool.save.utils.BlockingBridge
56
import org.springframework.boot.actuate.autoconfigure.metrics.orm.jpa.HibernateMetricsAutoConfiguration
67
import org.springframework.boot.autoconfigure.ImportAutoConfiguration
78
import org.springframework.boot.autoconfigure.domain.EntityScan
@@ -23,4 +24,10 @@ class ApplicationConfiguration {
2324
*/
2425
@Bean
2526
fun logService(configProperties: ConfigProperties): LogService = LokiLogService.createOrStub(configProperties.loki)
27+
28+
/**
29+
* @return [BlockingBridge]
30+
*/
31+
@Bean
32+
fun blockingBridge(): BlockingBridge = BlockingBridge.default
2633
}

save-backend/src/main/kotlin/com/saveourtool/save/backend/storage/FileS3KeyManager.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import com.saveourtool.save.entities.Project
1010
import com.saveourtool.save.entities.toEntity
1111
import com.saveourtool.save.storage.concatS3Key
1212
import com.saveourtool.save.storage.key.AbstractS3KeyDtoManager
13+
import com.saveourtool.save.utils.BlockingBridge
1314
import com.saveourtool.save.utils.orNotFound
1415

1516
import org.springframework.data.repository.findByIdOrNull
@@ -24,11 +25,13 @@ import kotlinx.datetime.toJavaLocalDateTime
2425
class FileS3KeyManager(
2526
configProperties: ConfigProperties,
2627
fileRepository: FileRepository,
28+
blockingBridge: BlockingBridge,
2729
private val projectService: ProjectService,
2830
private val executionService: ExecutionService,
2931
) : AbstractS3KeyDtoManager<FileDto, File, FileRepository>(
3032
concatS3Key(configProperties.s3Storage.prefix, "storage"),
3133
fileRepository,
34+
blockingBridge,
3235
) {
3336
override fun createNewEntityFromDto(dto: FileDto): File = dto.toEntity {
3437
projectService.findByNameAndOrganizationNameAndCreatedStatus(dto.projectCoordinates.projectName, dto.projectCoordinates.organizationName)

save-backend/src/main/kotlin/com/saveourtool/save/backend/storage/TestsSourceSnapshotS3KeyManager.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import com.saveourtool.save.entities.TestsSourceSnapshot.Companion.toEntity
1111
import com.saveourtool.save.storage.concatS3Key
1212
import com.saveourtool.save.storage.key.AbstractS3KeyDtoManager
1313
import com.saveourtool.save.test.TestsSourceSnapshotDto
14+
import com.saveourtool.save.utils.BlockingBridge
1415
import com.saveourtool.save.utils.getByIdOrNotFound
16+
1517
import org.springframework.stereotype.Component
1618

1719
/**
@@ -21,12 +23,14 @@ import org.springframework.stereotype.Component
2123
class TestsSourceSnapshotS3KeyManager(
2224
configProperties: ConfigProperties,
2325
testsSourceSnapshotRepository: TestsSourceSnapshotRepository,
26+
blockingBridge: BlockingBridge,
2427
private val testSuitesSourceRepository: TestSuitesSourceRepository,
2528
private val testSuitesService: TestSuitesService,
2629
private val executionService: ExecutionService,
2730
) : AbstractS3KeyDtoManager<TestsSourceSnapshotDto, TestsSourceSnapshot, TestsSourceSnapshotRepository>(
2831
concatS3Key(configProperties.s3Storage.prefix, "tests-source-snapshot"),
2932
testsSourceSnapshotRepository,
33+
blockingBridge
3034
) {
3135
override fun createNewEntityFromDto(dto: TestsSourceSnapshotDto): TestsSourceSnapshot = dto.toEntity { testSuitesSourceRepository.getByIdOrNotFound(it) }
3236

save-backend/src/main/resources/application-dev.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ backend.s3-storage.presigned-endpoint=http://host.docker.internal:9000
99
backend.s3-storage.bucketName=cnb
1010
backend.s3-storage.credentials.accessKeyId=admin
1111
backend.s3-storage.credentials.secretAccessKey=adminadmin
12+
backend.s3-storage.createBucketIfNotExists=true

save-cloud-common/src/jvmMain/kotlin/com/saveourtool/save/storage/DefaultStorageCoroutines.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import kotlinx.coroutines.future.asDeferred
2323
import kotlinx.coroutines.reactive.asFlow
2424
import kotlinx.coroutines.reactive.asPublisher
2525

26-
import kotlinx.coroutines.withContext
27-
2826
/**
2927
* S3 implementation of [StorageCoroutines]
3028
*
@@ -171,7 +169,7 @@ class DefaultStorageCoroutines<K : Any>(
171169

172170
private suspend fun <R> S3KeyManager<K>.callAsSuspend(function: S3KeyManager<K>.() -> R): R =
173171
if (s3KeyManager is AbstractS3KeyDatabaseManager<*, *, *>) {
174-
withContext(s3KeyManager.ioDispatcher) {
172+
s3KeyManager.blockingBridge.blockingToSuspend {
175173
function(this@callAsSuspend)
176174
}
177175
} else {

save-cloud-common/src/jvmMain/kotlin/com/saveourtool/save/storage/DefaultStorageProjectReactor.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,10 @@ class DefaultStorageProjectReactor<K : Any>(
156156

157157
private fun createNewS3Key(key: K): Mono<String> = s3KeyManager.callAsMono { createNewS3Key(key) }
158158

159-
private fun <R : Any> S3KeyManager<K>.callAsMono(function: S3KeyManager<K>.() -> R?): Mono<R> = { function(this) }
160-
.toMono()
161-
.let {
159+
private fun <R : Any> S3KeyManager<K>.callAsMono(function: S3KeyManager<K>.() -> R?): Mono<R> =
162160
if (s3KeyManager is AbstractS3KeyDatabaseManager<*, *, *>) {
163-
it.subscribeOn(s3KeyManager.ioScheduler)
161+
s3KeyManager.blockingBridge.blockingToMono { function(this) }
164162
} else {
165-
it
163+
{ function(this) }.toMono()
166164
}
167-
}
168165
}

save-cloud-common/src/jvmMain/kotlin/com/saveourtool/save/storage/key/AbstractS3KeyDatabaseManager.kt

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,23 @@ package com.saveourtool.save.storage.key
22

33
import com.saveourtool.save.spring.entity.BaseEntity
44
import com.saveourtool.save.spring.repository.BaseEntityRepository
5+
import com.saveourtool.save.utils.BlockingBridge
56
import com.saveourtool.save.utils.orNotFound
67

78
import org.springframework.data.repository.findByIdOrNull
89
import org.springframework.transaction.annotation.Transactional
9-
import reactor.core.scheduler.Scheduler
10-
import reactor.core.scheduler.Schedulers
11-
12-
import kotlinx.coroutines.CoroutineDispatcher
13-
import kotlinx.coroutines.Dispatchers
1410

1511
/**
1612
* Implementation of [S3KeyManager] which stores keys in database
1713
*
1814
* @param prefix a common prefix for all keys in S3 storage for this storage
1915
* @property repository repository for [E]
20-
* @property ioScheduler
21-
* @property ioDispatcher
16+
* @property blockingBridge
2217
*/
2318
abstract class AbstractS3KeyDatabaseManager<K : Any, E : BaseEntity, R : BaseEntityRepository<E>>(
2419
prefix: String,
2520
protected val repository: R,
26-
val ioScheduler: Scheduler = Schedulers.boundedElastic(),
27-
val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
21+
val blockingBridge: BlockingBridge,
2822
) : S3KeyManager<K> {
2923
/**
3024
* [S3KeyManager] with [Long] as key (it's [ID][com.saveourtool.save.spring.entity.BaseEntity.requiredId])

save-cloud-common/src/jvmMain/kotlin/com/saveourtool/save/storage/key/AbstractS3KeyDtoManager.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.saveourtool.save.storage.key
33
import com.saveourtool.save.entities.DtoWithId
44
import com.saveourtool.save.spring.entity.BaseEntityWithDtoWithId
55
import com.saveourtool.save.spring.repository.BaseEntityRepository
6+
import com.saveourtool.save.utils.BlockingBridge
67
import com.saveourtool.save.utils.orNotFound
78
import org.springframework.data.repository.findByIdOrNull
89

@@ -11,11 +12,13 @@ import org.springframework.data.repository.findByIdOrNull
1112
*
1213
* @param prefix a common prefix for all keys in S3 storage for this storage
1314
* @param repository repository for [E] which is entity for [K]
15+
* @param blockingBridge
1416
*/
1517
abstract class AbstractS3KeyDtoManager<K : DtoWithId, E : BaseEntityWithDtoWithId<K>, R : BaseEntityRepository<E>>(
1618
prefix: String,
1719
repository: R,
18-
) : AbstractS3KeyDatabaseManager<K, E, R>(prefix, repository) {
20+
blockingBridge: BlockingBridge,
21+
) : AbstractS3KeyDatabaseManager<K, E, R>(prefix, repository, blockingBridge) {
1922
override fun E.toKey(): K = toDto()
2023

2124
override fun K.toEntity(): E = createNewEntityFromDto(this)

save-cloud-common/src/jvmMain/kotlin/com/saveourtool/save/storage/key/AbstractS3KeyEntityManager.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.saveourtool.save.storage.key
22

33
import com.saveourtool.save.spring.entity.BaseEntity
44
import com.saveourtool.save.spring.repository.BaseEntityRepository
5+
import com.saveourtool.save.utils.BlockingBridge
56
import com.saveourtool.save.utils.orNotFound
67
import org.springframework.data.repository.findByIdOrNull
78

@@ -10,13 +11,16 @@ import org.springframework.data.repository.findByIdOrNull
1011
*
1112
* @param prefix a common prefix for all keys in S3 storage for this storage
1213
* @param repository repository for [E]
14+
* @param blockingBridge
1315
*/
1416
abstract class AbstractS3KeyEntityManager<E : BaseEntity, R : BaseEntityRepository<E>>(
1517
prefix: String,
1618
repository: R,
19+
blockingBridge: BlockingBridge,
1720
) : AbstractS3KeyDatabaseManager<E, E, R>(
1821
prefix,
1922
repository,
23+
blockingBridge,
2024
) {
2125
override fun E.toKey(): E = this
2226

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.saveourtool.save.utils
2+
3+
import org.jetbrains.annotations.NonBlocking
4+
import reactor.core.publisher.Flux
5+
import reactor.core.publisher.Mono
6+
import reactor.core.scheduler.Scheduler
7+
import reactor.core.scheduler.Schedulers
8+
import reactor.kotlin.core.publisher.toMono
9+
10+
import kotlinx.coroutines.CoroutineDispatcher
11+
import kotlinx.coroutines.Dispatchers
12+
import kotlinx.coroutines.withContext
13+
14+
/**
15+
* A bridge for blocking (IO) operations
16+
*
17+
* @property ioScheduler [Scheduler] for IO operations for [Mono] and [Flux]
18+
* @property ioDispatcher [CoroutineDispatcher] for IO operations in suspend function
19+
*/
20+
class BlockingBridge(
21+
val ioScheduler: Scheduler = Schedulers.boundedElastic(),
22+
val ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
23+
) {
24+
/**
25+
* Taking from https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
26+
*
27+
* @param supplier blocking operation like JDBC
28+
* @return [Mono] from result of blocking operation [T]
29+
* @see blockingToFlux
30+
*/
31+
@NonBlocking
32+
fun <T : Any> blockingToMono(supplier: () -> T?): Mono<T> = supplier.toMono().subscribeOn(ioScheduler)
33+
34+
/**
35+
* @param supplier blocking operation like JDBC
36+
* @return [Flux] from result of blocking operation [List] of [T]
37+
* @see blockingToMono
38+
*/
39+
@NonBlocking
40+
fun <T> blockingToFlux(supplier: () -> Iterable<T>): Flux<T> = blockingToMono(supplier).flatMapIterable { it }
41+
42+
/**
43+
* @param supplier blocking operation like JDBC
44+
* @return suspend result of blocking operation [T]
45+
* @see blockingToMono
46+
*/
47+
@NonBlocking
48+
suspend fun <T> blockingToSuspend(supplier: () -> T): T = withContext(ioDispatcher) {
49+
supplier()
50+
}
51+
52+
companion object {
53+
/**
54+
* A default instance of [BlockingBridge]
55+
*/
56+
val default: BlockingBridge = BlockingBridge()
57+
}
58+
}

0 commit comments

Comments
 (0)