package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.class */
/**
 * Builder that assembles a {@link StreamTaskMailboxTestHarness} around a {@link StreamTask}
 * created by a user supplied factory.
 *
 * <p>Subclasses provide the input side of the task by implementing
 * {@link #initializeInputs(StreamMockEnvironment)}, which is expected to populate
 * {@link #inputGates}. The operator (chain) is configured through exactly one call to either
 * {@code setupOutputForSingletonOperatorChain(...)} or {@code setupOperatorChain(...)}.
 *
 * @param <OUT> type of the records emitted by the task under test
 */
public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {

    /** Factory creating the task under test from the mock environment built in {@link #build()}. */
    protected final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;

    /** Serializer for the task's output type, derived from the type information given to the constructor. */
    protected final TypeSerializer<OUT> outputSerializer;

    /** Input gates of the task; must be non-null once {@link #initializeInputs} has run. */
    @Nullable
    protected StreamTestSingleInputGate[] inputGates;

    /** Optional state to restore from, keyed by checkpoint id; {@code null} when none was set. */
    protected Map<Long, TaskStateSnapshot> taskStateSnapshots;

    // NOTE: must be initialised before the constructor body runs, which uses it to create the serializer.
    protected final ExecutionConfig executionConfig = new ExecutionConfig();
    protected long memorySize = 1024 * 1024;
    protected int bufferSize = StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE;
    protected long bufferTimeout = 0;
    protected Configuration jobConfig = new Configuration();
    protected Configuration taskConfig = new Configuration();

    // NOTE: declared after taskConfig on purpose, the stream config wraps that configuration instance.
    protected StreamConfig streamConfig = new StreamConfig(this.taskConfig);
    protected LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();
    protected TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();

    /** Set once one of the operator setup methods has been invoked; guards against a second setup. */
    private boolean setupCalled = false;

    /**
     * Creates a builder for the given task factory and output type.
     *
     * @param functionWithException factory that creates the task from an {@link Environment}
     * @param typeInformation type information of the task output, used to create the output serializer
     * @throws NullPointerException if either argument is {@code null}
     */
    public StreamTaskMailboxTestHarnessBuilder(FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> functionWithException, TypeInformation<OUT> typeInformation) {
        this.taskFactory = Preconditions.checkNotNull(functionWithException);
        this.outputSerializer = Preconditions.checkNotNull(typeInformation, "typeInformation").createSerializer(this.executionConfig);
    }

    /**
     * Creates the mock environment, instantiates the task through the factory, calls
     * {@code beforeInvoke()} on it and wraps everything into a test harness.
     *
     * @return the harness wrapping the created task, its output queue, input gates and environment
     * @throws IllegalStateException if {@link #initializeInputs} did not set {@link #inputGates}
     * @throws Exception if the task factory or {@code beforeInvoke()} fails
     */
    public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
        this.streamConfig.setBufferTimeout(this.bufferTimeout);

        TestTaskStateManager taskStateManager = new TestTaskStateManager(this.localRecoveryConfig);
        if (this.taskStateSnapshots != null) {
            // The first key of the snapshot map is reported as the checkpoint id to restore from.
            long reportedCheckpointId = this.taskStateSnapshots.keySet().iterator().next();
            taskStateManager.setReportedCheckpointId(reportedCheckpointId);
            taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(this.taskStateSnapshots);
        }

        StreamMockEnvironment environment = new StreamMockEnvironment(
                this.jobConfig,
                this.taskConfig,
                this.executionConfig,
                this.memorySize,
                new MockInputSplitProvider(),
                this.bufferSize,
                taskStateManager);
        environment.setCheckpointResponder(taskStateManager.getCheckpointResponder());

        initializeInputs(environment);
        Preconditions.checkState(this.inputGates != null, "InputGates hasn't been initialised");

        StreamElementSerializer<OUT> outputElementSerializer = new StreamElementSerializer<>(this.outputSerializer);
        ArrayDeque<Object> outputQueue = new ArrayDeque<>();
        environment.addOutput(outputQueue, outputElementSerializer);
        environment.setTaskMetricGroup(this.taskMetricGroup);

        StreamTask<OUT, ?> task = this.taskFactory.apply(environment);
        task.beforeInvoke();

        return new StreamTaskMailboxTestHarness<>(task, outputQueue, this.inputGates, environment);
    }

    /**
     * Hook for subclasses to set up the task inputs on the given environment. {@link #build()}
     * requires {@link #inputGates} to be non-null after this method returns.
     *
     * @param streamMockEnvironment the environment the task will be created with
     */
    protected abstract void initializeInputs(StreamMockEnvironment streamMockEnvironment);

    /**
     * Configures a single-operator chain for the given operator using a freshly created {@link OperatorID}.
     *
     * @param streamOperator the operator to run in the task
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> streamOperator) {
        return setupOutputForSingletonOperatorChain(streamOperator, new OperatorID());
    }

    /**
     * Configures a single-operator chain for the given operator and operator id.
     *
     * @param streamOperator the operator to run in the task
     * @param operatorID the id to assign to the operator
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> streamOperator, OperatorID operatorID) {
        StreamOperatorFactory<?> factory = SimpleOperatorFactory.of(streamOperator);
        return setupOutputForSingletonOperatorChain(factory, operatorID);
    }

    /**
     * Configures a single-operator chain for the given operator factory using a freshly created {@link OperatorID}.
     *
     * @param streamOperatorFactory factory of the operator to run in the task
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperatorFactory<?> streamOperatorFactory) {
        return setupOutputForSingletonOperatorChain(streamOperatorFactory, new OperatorID());
    }

    /**
     * Configures the stream config for a chain consisting of a single operator: marks it as chain
     * start, uses event time, declares one output with the output serializer and registers a single
     * broadcast out edge between two dummy stream nodes.
     *
     * @param streamOperatorFactory factory of the operator to run in the task
     * @param operatorID the id to assign to the operator
     * @return this builder
     * @throws IllegalStateException if an operator setup method was already called on this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperatorFactory<?> streamOperatorFactory, OperatorID operatorID) {
        markSetupCalled();

        this.streamConfig.setChainStart();
        this.streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
        this.streamConfig.setOutputSelectors(Collections.emptyList());
        this.streamConfig.setNumberOfOutputs(1);
        this.streamConfig.setTypeSerializerOut(this.outputSerializer);
        this.streamConfig.setVertexID(0);

        // Placeholder operator; it only serves to construct the two dummy nodes of the out edge.
        AbstractStreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
            private static final long serialVersionUID = 1;
        };
        StreamNode sourceVertexDummy = new StreamNode(0, "group", (String) null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
        StreamNode targetVertexDummy = new StreamNode(1, "group", (String) null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

        LinkedList<StreamEdge> outEdgesInOrder = new LinkedList<>();
        outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<>(), new BroadcastPartitioner<>(), (OutputTag<?>) null));

        // The same edge list is used both as the ordered out edges and as the non-chained outputs.
        this.streamConfig.setOutEdgesInOrder(outEdgesInOrder);
        this.streamConfig.setNonChainedOutputs(outEdgesInOrder);
        this.streamConfig.setStreamOperatorFactory(streamOperatorFactory);
        this.streamConfig.setOperatorID(operatorID);
        return this;
    }

    /**
     * Starts the configuration of an operator chain headed by the given operator, using a freshly
     * created {@link OperatorID}.
     *
     * @param streamOperator the head operator of the chain
     * @return a chainer operating on this builder's stream config
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(StreamOperator<?> streamOperator) {
        return setupOperatorChain(new OperatorID(), streamOperator);
    }

    /**
     * Starts the configuration of an operator chain headed by the given operator.
     *
     * @param operatorID the id of the head operator
     * @param streamOperator the head operator of the chain
     * @return a chainer operating on this builder's stream config
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(OperatorID operatorID, StreamOperator<?> streamOperator) {
        StreamOperatorFactory<?> factory = SimpleOperatorFactory.of(streamOperator);
        return setupOperatorChain(operatorID, factory);
    }

    /**
     * Starts the configuration of an operator chain headed by the given operator factory, using a
     * freshly created {@link OperatorID}.
     *
     * @param streamOperatorFactory factory of the head operator of the chain
     * @return a chainer operating on this builder's stream config
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(StreamOperatorFactory<?> streamOperatorFactory) {
        return setupOperatorChain(new OperatorID(), streamOperatorFactory);
    }

    /**
     * Registers the head operator factory on the stream config and hands out a
     * {@link StreamConfigChainer} for configuring the rest of the chain.
     *
     * @param operatorID the id of the head operator
     * @param streamOperatorFactory factory of the head operator of the chain
     * @return a chainer operating on this builder's stream config
     * @throws IllegalStateException if an operator setup method was already called on this builder
     */
    public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(OperatorID operatorID, StreamOperatorFactory<?> streamOperatorFactory) {
        markSetupCalled();
        this.streamConfig.setStreamOperatorFactory(streamOperatorFactory);
        return new StreamConfigChainer<>(operatorID, this.streamConfig, this);
    }

    /**
     * Sets the metric group that {@link #build()} installs on the mock environment.
     *
     * @param taskMetricGroup the metric group to use
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskMetricGroup(TaskMetricGroup taskMetricGroup) {
        this.taskMetricGroup = taskMetricGroup;
        return this;
    }

    /**
     * Sets the state key serializer on the stream config, created from the given key type.
     *
     * @param typeInformation type information of the key
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setKeyType(TypeInformation<?> typeInformation) {
        this.streamConfig.setStateKeySerializer(typeInformation.createSerializer(this.executionConfig));
        return this;
    }

    /**
     * Sets the single state snapshot handed to the task state manager in {@link #build()}.
     * Any previously set snapshot is replaced.
     *
     * @param j the checkpoint id the snapshot belongs to
     * @param taskStateSnapshot the snapshot to restore from
     * @return this builder
     */
    public StreamTaskMailboxTestHarnessBuilder<OUT> setTaskStateSnapshot(long j, TaskStateSnapshot taskStateSnapshot) {
        this.taskStateSnapshots = Collections.singletonMap(j, taskStateSnapshot);
        return this;
    }

    /** Records that the operator setup has happened, failing if it already happened before. */
    private void markSetupCalled() {
        Preconditions.checkState(!this.setupCalled, "This harness was already setup.");
        this.setupCalled = true;
    }
}
