Oozie是个针对Hadoop的工作流,有些自己的语法. 这两天碰到一个异常,查看源码才明白Oozie的join只允许承接fork下来的任务,否则会报以下错误.整个异常如下:
WARN CallableQueueService$CallableWrapper:528 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] exception callable [signal], E0720: Fork/join mismatch, node [join_node_name]
org.apache.oozie.command.CommandException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]
at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:213)
at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:305)
at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:59)
at org.apache.oozie.command.Command.call(Command.java:202)
at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:128)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.oozie.workflow.WorkflowException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]
at org.apache.oozie.workflow.lite.JoinNodeDef$JoinNodeHandler.loopDetection(JoinNodeDef.java:44)
at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:203)
at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:284)
at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:120)
... 7 more
源码来自org.apache.oozie.workflow.lite.JoinNodeDef,检测这个语法的代码如下:
public void loopDetection(Context context) throws WorkflowException { String flag = getLoopFlag(context.getNodeDef().getName()); if (context.getVar(flag) != null) { throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName()); } String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath); if (forkCount == null) { throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); } int count = Integer.parseInt(forkCount) - 1; if (count == 0) { context.setVar(flag, "true"); } } public boolean enter(Context context) throws WorkflowException { String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath); if (forkCount == null) { throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); } int count = Integer.parseInt(forkCount) - 1; if (count > 0) { context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, "" + count); context.deleteExecutionPath(); } else { context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, null); } return (count == 0); }