package weka.knowledgeflow;

import java.util.List;

import weka.core.WekaException;
import weka.knowledgeflow.steps.Step;

/* loaded from: input_file:lib/weka-dev-3.9.6.jar:weka/knowledgeflow/StepInjectorFlowRunner.class */
/**
 * A {@link FlowRunner} that hands {@link Data} objects directly to a chosen
 * {@link Step} of the flow, either as a single batch injection or as a
 * sequence of streaming injections.
 */
public class StepInjectorFlowRunner extends FlowRunner {
    /** True when the next streaming injection must (re)initialize the flow. */
    protected boolean m_reset = true;

    /** True while a streaming injection sequence is in progress. */
    protected boolean m_streaming;

    /**
     * Returns the runner to its initial state, so that the next call to
     * {@link #injectStreaming(Data, Step, boolean)} initializes the flow again.
     */
    public void reset() {
        this.m_reset = true;
        this.m_streaming = false;
    }

    /**
     * Injects a batch data object into the target step and registers a
     * callback on this runner.
     *
     * <p>The data is handed to the step from a task submitted to the
     * execution environment, after which the executor shutdown thread is
     * launched.
     *
     * @param data the batch data to inject
     * @param executionFinishedCallback the callback to register; it is only
     *        registered once the data and target step have been validated,
     *        so a rejected injection leaves no stale callback behind
     * @param step the step that should receive the data
     * @throws WekaException if the data is incremental, if the step does not
     *         currently accept the data's connection type, or if a problem
     *         occurs while initializing the flow
     */
    public void injectWithExecutionFinishedCallback(final Data data, ExecutionFinishedCallback executionFinishedCallback, final Step step) throws WekaException {
        if (StepManagerImpl.connectionIsIncremental(data)) {
            throw new WekaException("Only batch data can be injected via this method.");
        }
        checkStepAcceptsConnection(step, data.getConnectionName());

        // Register only after validation has succeeded; otherwise a failed
        // injection would leave the callback attached to this runner.
        addExecutionFinishedCallback(executionFinishedCallback);

        initializeFlow();
        this.m_execEnv.submitTask(new StepTask<Void>(null) {
            private static final long serialVersionUID = 663985401825979869L;

            @Override // weka.knowledgeflow.StepTask
            public void process() throws Exception {
                step.processIncoming(data);
            }
        });
        this.m_logHandler.logDebug("StepInjectorFlowRunner: Launching shutdown monitor");
        launchExecutorShutdownThread();
    }

    /**
     * Looks up a named step in the current flow and checks that it is usable
     * as an injection target.
     *
     * <p>Note that the class check is an exact match: a step whose runtime
     * class is a subclass of {@code cls} is rejected.
     *
     * @param str the name of the step to find
     * @param cls the exact class the step is expected to have
     * @return the managed step registered under the given name
     * @throws WekaException if no flow is set, the step is not part of the
     *         flow, the step's class is not {@code cls}, or the step reports
     *         no incoming connection types
     */
    public Step findStep(String str, Class<?> cls) throws WekaException {
        if (this.m_flow == null) {
            throw new WekaException("No flow set!");
        }
        StepManagerImpl manager = this.m_flow.findStep(str);
        if (manager == null) {
            throw new WekaException("Step '" + str + "' does not seem to be part of the flow!");
        }
        Step managedStep = manager.getManagedStep();
        if (managedStep.getClass() != cls) {
            throw new WekaException("Step '" + str + "' is not an instance of " + cls.getCanonicalName());
        }
        List<String> accepted = managedStep.getIncomingConnectionTypes();
        if (accepted == null || accepted.isEmpty()) {
            throw new WekaException("Step '" + str + "' cannot process any incoming data!");
        }
        return managedStep;
    }

    /**
     * Injects one element of a data stream into the target step on the
     * calling thread.
     *
     * <p>On the first call after construction or {@link #reset()}, the target
     * is validated, the flow is initialized and the data is flagged as not
     * being the end of the stream. When {@code z} is true the data is flagged
     * as the end of the stream and, once the step has processed it, the
     * client execution service is stopped and this runner is reset.
     *
     * @param data the streaming data element to inject
     * @param step the step that should receive the data
     * @param z true if this is the last element of the stream
     * @throws WekaException if the step does not currently accept the data's
     *         connection type, or if a problem occurs during initialization
     *         or processing
     */
    public void injectStreaming(Data data, Step step, boolean z) throws WekaException {
        if (this.m_reset) {
            // NOTE(review): reset() clears m_streaming and this method only sets
            // m_streaming together with clearing m_reset, so within this class
            // the branch below is reachable only if a subclass manipulates the
            // protected fields directly - confirm intent.
            if (this.m_streaming) {
                this.m_execEnv.stopClientExecutionService();
            }
            checkStepAcceptsConnection(step, data.getConnectionName());
            initializeFlow();
            data.setPayloadElement(StepManager.CON_AUX_DATA_INCREMENTAL_STREAM_END, false);
            this.m_streaming = true;
            this.m_reset = false;
        }
        if (z) {
            data.setPayloadElement(StepManager.CON_AUX_DATA_INCREMENTAL_STREAM_END, true);
        }
        step.processIncoming(data);
        if (z) {
            this.m_logHandler.logDebug("StepInjectorFlowRunner: Shutting down executor service");
            this.m_execEnv.stopClientExecutionService();
            reset();
        }
    }

    /**
     * Verifies that the step currently lists the given connection type among
     * its incoming connection types. A null list is treated the same as a
     * list that does not contain the connection type.
     *
     * @param step the prospective target step
     * @param connectionName the connection type of the data to inject
     * @throws WekaException if the step does not accept the connection type
     */
    private static void checkStepAcceptsConnection(Step step, String connectionName) throws WekaException {
        List<String> accepted = step.getIncomingConnectionTypes();
        if (accepted == null || !accepted.contains(connectionName)) {
            throw new WekaException("Step '" + step.getName() + "' can't accept a " + connectionName + " input at present!");
        }
    }
}
