-
Notifications
You must be signed in to change notification settings - Fork 13
Description
Hello,
I'm trying to merge two pipes in a custom subassembly, one coming from a new calculated pipe and the other one coming from the existing ORC file of the current data. The merge operation gives me the following error:
cascading.flow.FlowException: unhandled exception at cascading.flow.BaseFlow.complete(BaseFlow.java:1012) at com.dataartisans.flink.cascading.planner.FlinkFlow.complete(FlinkFlow.java:79) at fr.erdf.distribution.linky.lpt.support.CascadingMergeTest.trap(CascadingMergeTest.java:154) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) Caused by: org.apache.flink.optimizer.CompilerException: An error occurred while translating the optimized plan to a JobGraph: Conflicting types in union operator. at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:600) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:106) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:192) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170) at com.dataartisans.flink.cascading.planner.FlinkFlowStepJob.internalNonBlockingStart(FlinkFlowStepJob.java:154) at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:269) at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:184) at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:146) at com.dataartisans.flink.cascading.planner.FlinkFlowStepJob.call(FlinkFlowStepJob.java:281) at com.dataartisans.flink.cascading.planner.FlinkFlowStepJob.call(FlinkFlowStepJob.java:65) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.optimizer.CompilerException: Conflicting types in union operator. at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.translateChannel(JobGraphGenerator.java:690) at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:589) ... 17 more
It looks like the types are not well defined in my pipes (when I look deeper, different serializers are used for each pipe)... I tried to use the coerce function to force typing in my first pipe but it's still not working. It works fine if I use a simple CSV file instead of ORC in the second pipe...
Do you know how to get around this?