package com.dataartisans.streamingledger.runtime.serial;

import com.dataartisans.streamingledger.sdk.api.StreamingLedger;
import com.dataartisans.streamingledger.sdk.common.reflection.ByteBuddyProcessFunctionInvoker;
import com.dataartisans.streamingledger.sdk.common.reflection.ProcessFunctionInvoker;
import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec;
import java.util.Objects;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.util.OutputTag;

/* loaded from: input_file:com/dataartisans/streamingledger/runtime/serial/SingleStreamSerialTransactor.class */
/**
 * Runs the process function described by a {@link StreamingLedgerSpec} for one input element
 * at a time.
 *
 * <p>For every state binding of the spec, a {@link SerialStateAccess} backed by a Flink
 * {@link MapState} is created once at construction time. Each call to {@code apply} prepares
 * those accesses for the element, invokes the function, commits the accesses, and finally asks
 * the {@link SideOutputContext} to emit its changes.
 *
 * @param <InT> type of the input elements handed to the process function
 * @param <ResultT> type of the results routed through the side output
 */
final class SingleStreamSerialTransactor<InT, ResultT> {
    private final ProcessFunctionInvoker<InT, ResultT> userFunction;
    private final SerialStateAccess<InT, ?, ?>[] accesses;
    private final OutputTag<ResultT> sideOutputTag;
    private final SideOutputContext<ResultT> context;

    /**
     * Creates a transactor for the given spec.
     *
     * @param streamingLedgerSpec spec providing the state bindings and the process function
     * @param outputTag tag that is set on the context before every invocation
     * @param sideOutputContext context passed to the process function on every invocation
     * @param runtimeContext Flink runtime context used to obtain the map state of each binding
     * @throws NullPointerException if any argument is {@code null}
     */
    public SingleStreamSerialTransactor(StreamingLedgerSpec<InT, ResultT> streamingLedgerSpec, OutputTag<ResultT> outputTag, SideOutputContext<ResultT> sideOutputContext, RuntimeContext runtimeContext) {
        Objects.requireNonNull(streamingLedgerSpec, "streamingLedgerSpec");
        Objects.requireNonNull(outputTag, "outputTag");
        // apply() dereferences the context unconditionally, so reject null here instead of
        // failing on the first processed element.
        Objects.requireNonNull(sideOutputContext, "sideOutputContext");
        Objects.requireNonNull(runtimeContext, "runtimeContext");
        this.context = sideOutputContext;
        this.sideOutputTag = outputTag;
        this.accesses = createStateAccessesFromSpec(streamingLedgerSpec, runtimeContext);
        this.userFunction = ByteBuddyProcessFunctionInvoker.create(streamingLedgerSpec);
    }

    /**
     * Allocates an array of state accesses of the requested length.
     *
     * <p>Java cannot instantiate an array of a parameterized type directly, so the raw array is
     * created here and the unchecked conversion is confined to this helper.
     */
    @SuppressWarnings("unchecked")
    private static <T> SerialStateAccess<T, ?, ?>[] newStateAccessArray(int i) {
        return new SerialStateAccess[i];
    }

    /**
     * Obtains the Flink map state for a single state binding, using the name, key type and
     * value type declared by the binding's state.
     */
    private static <K, V> MapState<K, V> fromSpec(StreamingLedger.StateAccessSpec<?, K, V> stateAccessSpec, RuntimeContext runtimeContext) {
        MapStateDescriptor<K, V> descriptor = new MapStateDescriptor<>(
                stateAccessSpec.state.getName(),
                stateAccessSpec.state.getKeyType(),
                stateAccessSpec.state.getValueType());
        return runtimeContext.getMapState(descriptor);
    }

    /**
     * Processes one input element.
     *
     * <p>Steps, in order: set the side output tag on the context, prepare every state access
     * for the element, prepare the context, invoke the process function, commit every state
     * access (passing along whether the context reports an abort), and emit the context's
     * changes.
     *
     * @param input the element to process
     * @throws Exception propagated from the state accesses, the context, or the process function
     */
    public void apply(InT input) throws Exception {
        this.context.setOutputTag(this.sideOutputTag);
        for (SerialStateAccess<InT, ?, ?> access : this.accesses) {
            access.prepare(input);
        }
        this.context.prepare();
        this.userFunction.invoke(input, this.context, this.accesses);
        for (SerialStateAccess<InT, ?, ?> access : this.accesses) {
            access.commit(this.context.wasAborted());
        }
        this.context.emitChanges();
    }

    /**
     * Builds one {@link SerialStateAccess} per state binding of the spec, in binding order.
     *
     * <p>The key and value types of each binding are not known statically at this point, so the
     * binding is handled as a raw type in order to pair it with the map state created from it.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private SerialStateAccess<InT, ?, ?>[] createStateAccessesFromSpec(StreamingLedgerSpec<InT, ResultT> streamingLedgerSpec, RuntimeContext runtimeContext) {
        SerialStateAccess<InT, ?, ?>[] stateAccesses = newStateAccessArray(streamingLedgerSpec.stateBindings.size());
        for (int i = 0; i < stateAccesses.length; i++) {
            StreamingLedger.StateAccessSpec stateAccessSpec = (StreamingLedger.StateAccessSpec) streamingLedgerSpec.stateBindings.get(i);
            stateAccesses[i] = new SerialStateAccess<>(stateAccessSpec, fromSpec(stateAccessSpec, runtimeContext));
        }
        return stateAccesses;
    }
}
