From 6b9b5241f15ce0f345972316b833c2d4085a3696 Mon Sep 17 00:00:00 2001 From: nanfeng <1004789224@qq.com> Date: Tue, 24 May 2022 16:57:14 +0800 Subject: [PATCH] first commit --- spring-context/spring-context.gradle | 2 + .../ScheduledAnnotationBeanPostProcessor.java | 8 ++- .../support/ScheduledMethodRunnable.java | 50 +++++++++++++++---- ...duledAnnotationBeanPostProcessorTests.java | 12 +++++ .../annotation/SuspendFunctionTest.kt | 13 +++++ .../springframework/core/CoroutinesUtils.java | 4 ++ 6 files changed, 79 insertions(+), 10 deletions(-) create mode 100644 spring-context/src/test/kotlin/org/springframework/scheduling/annotation/SuspendFunctionTest.kt diff --git a/spring-context/spring-context.gradle b/spring-context/spring-context.gradle index b038d4289270..07c05dcd51af 100644 --- a/spring-context/spring-context.gradle +++ b/spring-context/spring-context.gradle @@ -20,6 +20,7 @@ dependencies { optional("org.apache-extras.beanshell:bsh") optional("org.hibernate:hibernate-validator") optional("org.jetbrains.kotlin:kotlin-reflect") + optional("org.jetbrains.kotlinx:kotlinx-coroutines-core") optional("org.jetbrains.kotlin:kotlin-stdlib") optional("org.reactivestreams:reactive-streams") testImplementation(testFixtures(project(":spring-aop"))) @@ -27,6 +28,7 @@ dependencies { testImplementation(testFixtures(project(":spring-core"))) testImplementation(project(":spring-core-test")) testImplementation("io.projectreactor:reactor-core") + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") testImplementation("org.apache.groovy:groovy-jsr223") testImplementation("org.apache.groovy:groovy-xml") testImplementation("org.apache.commons:commons-pool2") diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java index f0aae203468a..669faa584540 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java @@ -57,6 +57,7 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.EmbeddedValueResolverAware; import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.core.KotlinDetector; import org.springframework.core.MethodIntrospector; import org.springframework.core.Ordered; import org.springframework.core.annotation.AnnotatedElementUtils; @@ -529,7 +530,12 @@ protected void processScheduled(Scheduled scheduled, Method method, Object bean) * @see ScheduledMethodRunnable#ScheduledMethodRunnable(Object, Method) */ protected Runnable createRunnable(Object target, Method method) { - Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); + if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) { + // add support for suspend function + Assert.isTrue(method.getParameterCount() == 0||method.getParameterCount()==1, "Only suspend methods or no-arg methods may be annotated with @Scheduled"); + } else { + Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); + } Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } diff --git a/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java b/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java index 4977226424b5..3fe9202be42f 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java +++ b/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java @@ -16,20 +16,26 @@ package org.springframework.scheduling.support; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.springframework.core.CoroutinesUtils; +import org.springframework.core.KotlinDetector; +import org.springframework.util.ReflectionUtils; + import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; -import org.springframework.util.ReflectionUtils; - /** * Variant of {@link MethodInvokingRunnable} meant to be used for processing * of no-arg scheduled methods. Propagates user exceptions to the caller, * assuming that an error strategy for Runnables is in place. * * @author Juergen Hoeller - * @since 3.0.6 * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor + * @since 3.0.6 */ public class ScheduledMethodRunnable implements Runnable { @@ -37,10 +43,12 @@ public class ScheduledMethodRunnable implements Runnable { private final Method method; + private static final Log logger = LogFactory.getLog(ScheduledMethodRunnable.class); /** * Create a {@code ScheduledMethodRunnable} for the given target instance, * calling the specified method. + * * @param target the target instance to call the method on * @param method the target method to call */ @@ -52,7 +60,8 @@ public ScheduledMethodRunnable(Object target, Method method) { /** * Create a {@code ScheduledMethodRunnable} for the given target instance, * calling the specified method by name. - * @param target the target instance to call the method on + * + * @param target the target instance to call the method on * @param methodName the name of the target method * @throws NoSuchMethodException if the specified method does not exist */ @@ -81,12 +90,35 @@ public Method getMethod() { public void run() { try { ReflectionUtils.makeAccessible(this.method); - this.method.invoke(this.target); - } - catch (InvocationTargetException ex) { + + if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) { + CoroutinesUtils.invokeSuspendingFunction(method, target).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Object o) { + + } + + @Override + public void onError(Throwable t) { + logger.error("task run failed", t); + } + + @Override + public void onComplete() { + + } + }); + } else { + this.method.invoke(this.target); + } + } catch (InvocationTargetException ex) { ReflectionUtils.rethrowRuntimeException(ex.getTargetException()); - } - catch (IllegalAccessException ex) { + } catch (IllegalAccessException ex) { throw new UndeclaredThrowableException(ex); } } diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java index 9aaf52c881f3..2fc4ece76343 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java @@ -54,6 +54,7 @@ import org.springframework.core.annotation.AliasFor; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.TriggerContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.CronTask; import org.springframework.scheduling.config.IntervalTask; import org.springframework.scheduling.config.ScheduledTaskHolder; @@ -725,6 +726,17 @@ void nonEmptyParamList() { context::refresh); } + @Test + void suspendFunction() throws InterruptedException { + BeanDefinition processorDefinition = new RootBeanDefinition(ScheduledAnnotationBeanPostProcessor.class); + BeanDefinition targetDefinition = new RootBeanDefinition(SuspendFunctionTest.class); + BeanDefinition testScheduleDefinition = new RootBeanDefinition(ThreadPoolTaskScheduler.class); + context.registerBeanDefinition("postProcessor", processorDefinition); + context.registerBeanDefinition("target", targetDefinition); + context.registerBeanDefinition("scheduler", testScheduleDefinition); + context.refresh(); + Thread.sleep(6000); + } static class FixedDelay { diff --git a/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/SuspendFunctionTest.kt b/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/SuspendFunctionTest.kt new file mode 100644 index 000000000000..fa81b80a7ffb --- /dev/null +++ b/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/SuspendFunctionTest.kt @@ -0,0 +1,13 @@ +package org.springframework.scheduling.annotation + +import org.springframework.stereotype.Component + +@Component + +class SuspendFunctionTest { + + @Scheduled(fixedRate = 200) + suspend fun test() { + println("as") + } +} \ No newline at end of file diff --git a/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java b/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java index b051c105e0ec..22e1cadfb874 100644 --- a/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java +++ b/spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java @@ -87,6 +87,10 @@ public static Publisher invokeSuspendingFunction(Method method, Object target } private static Object[] getSuspendedFunctionArgs(Object target, Object... args) { + // add support for suspend @Scheduled + if (args.length == 0) { + return new Object[]{target}; + } Object[] functionArgs = new Object[args.length]; functionArgs[0] = target; System.arraycopy(args, 0, functionArgs, 1, args.length - 1);