41
41
import reactor .core .publisher .Flux ;
42
42
import reactor .core .publisher .Mono ;
43
43
44
+ import org .springframework .util .Assert ;
45
+
44
46
/**
45
47
* Utilities for working with Kotlin Coroutines.
46
48
*
@@ -68,13 +70,14 @@ public static <T> Deferred<T> monoToDeferred(Mono<T> source) {
68
70
}
69
71
70
72
/**
71
- * Invoke a suspending function and converts it to {@link Mono} or
72
- * {@link Flux}. Uses an {@linkplain Dispatchers#getUnconfined() unconfined}
73
- * dispatcher.
73
+ * Invoke a suspending function and converts it to {@link Mono} or {@link Flux}.
74
+ * Uses an {@linkplain Dispatchers#getUnconfined() unconfined} dispatcher.
74
75
* @param method the suspending function to invoke
75
76
* @param target the target to invoke {@code method} on
76
- * @param args the function arguments
77
+ * @param args the function arguments. If the {@code Continuation} argument is specified as the last argument
78
+ * (typically {@code null}), it is ignored.
77
79
* @return the method invocation result as reactive stream
80
+ * @throws IllegalArgumentException if {@code method} is not a suspending function
78
81
*/
79
82
public static Publisher <?> invokeSuspendingFunction (Method method , Object target ,
80
83
Object ... args ) {
@@ -87,20 +90,22 @@ public static Publisher<?> invokeSuspendingFunction(Method method, Object target
87
90
* @param context the coroutine context to use
88
91
* @param method the suspending function to invoke
89
92
* @param target the target to invoke {@code method} on
90
- * @param args the function arguments
93
+ * @param args the function arguments. If the {@code Continuation} argument is specified as the last argument
94
+ * (typically {@code null}), it is ignored.
91
95
* @return the method invocation result as reactive stream
96
+ * @throws IllegalArgumentException if {@code method} is not a suspending function
92
97
* @since 6.0
93
98
*/
94
99
@ SuppressWarnings ("deprecation" )
95
100
public static Publisher <?> invokeSuspendingFunction (CoroutineContext context , Method method , Object target ,
96
101
Object ... args ) {
97
-
102
+ Assert . isTrue ( KotlinDetector . isSuspendingFunction ( method ), "'method' must be a suspending function" );
98
103
KFunction <?> function = Objects .requireNonNull (ReflectJvmMapping .getKotlinFunction (method ));
99
104
if (method .isAccessible () && !KCallablesJvm .isAccessible (function )) {
100
105
KCallablesJvm .setAccessible (function , true );
101
106
}
102
107
Mono <Object > mono = MonoKt .mono (context , (scope , continuation ) ->
103
- KCallables .callSuspend (function , getSuspendedFunctionArgs (target , args ), continuation ))
108
+ KCallables .callSuspend (function , getSuspendedFunctionArgs (method , target , args ), continuation ))
104
109
.filter (result -> !Objects .equals (result , Unit .INSTANCE ))
105
110
.onErrorMap (InvocationTargetException .class , InvocationTargetException ::getTargetException );
106
111
@@ -120,10 +125,11 @@ else if (returnType instanceof KClass<?> kClass &&
120
125
return mono ;
121
126
}
122
127
123
- private static Object [] getSuspendedFunctionArgs (Object target , Object ... args ) {
124
- Object [] functionArgs = new Object [args .length ];
128
+ private static Object [] getSuspendedFunctionArgs (Method method , Object target , Object ... args ) {
129
+ int length = (args .length == method .getParameterCount () - 1 ? args .length + 1 : args .length );
130
+ Object [] functionArgs = new Object [length ];
125
131
functionArgs [0 ] = target ;
126
- System .arraycopy (args , 0 , functionArgs , 1 , args . length - 1 );
132
+ System .arraycopy (args , 0 , functionArgs , 1 , length - 1 );
127
133
return functionArgs ;
128
134
}
129
135
0 commit comments