package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.class */
public abstract class CheckpointBarrierAlignerTestBase {
    /** Size argument the tests in this class pass to {@code check(...)}. */
    protected static final int PAGE_SIZE = 512;
    // Not referenced in the visible part of this class; presumably used by the buffer-creating helpers — TODO confirm.
    private static final Random RND = new Random();
    // Not referenced in the visible part of this class; presumably used by the buffer-creating helpers — TODO confirm.
    private static int sizeCounter = 1;
    /** Gate under test. Each test assigns it; {@link #ensureEmpty()} checks it is drained and closes it. */
    CheckpointedInputGate inputGate;
    /** Value of {@code System.nanoTime()} captured by {@link #setUp()} before each test. */
    static long testStartTimeNanos;
    /** Mock gate created by {@code createBarrierBuffer(int, BufferOrEvent[], AbstractInvokable)}; tests query its last unblocked channels. */
    private MockInputGate mockInputGate;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase$CheckpointExceptionMatcher.class */
    /**
     * Hamcrest matcher that accepts an object only if it is exactly a {@link CheckpointException}
     * (subclasses are rejected) carrying the expected {@link CheckpointFailureReason}.
     */
    public static class CheckpointExceptionMatcher extends BaseMatcher<CheckpointException> {
        /** Failure reason a candidate exception must carry in order to match. */
        private final CheckpointFailureReason failureReason;

        /**
         * @param checkpointFailureReason the reason the matched exception is required to report
         */
        public CheckpointExceptionMatcher(CheckpointFailureReason checkpointFailureReason) {
            this.failureReason = checkpointFailureReason;
        }

        /**
         * @param obj candidate object, may be {@code null}
         * @return {@code true} iff {@code obj} is non-null, its runtime class is exactly
         *     {@code CheckpointException}, and its failure reason equals the expected one
         */
        public boolean matches(Object obj) {
            if (obj == null) {
                return false;
            }
            // Exact class comparison on purpose: subclasses of CheckpointException do not match.
            if (obj.getClass() != CheckpointException.class) {
                return false;
            }
            final CheckpointException candidate = (CheckpointException) obj;
            return candidate.getCheckpointFailureReason().equals(failureReason);
        }

        /** Appends a human-readable description of the expected exception to {@code description}. */
        public void describeTo(Description description) {
            final String text = "CheckpointException - reason = " + failureReason;
            description.appendText(text);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase$ValidatingCheckpointHandler.class */
    /**
     * Test invokable that records checkpoint triggers and aborts and, optionally, validates that
     * checkpoints are triggered with the expected (consecutive) ids.
     *
     * <p>While {@code nextExpectedCheckpointId} holds {@link #NO_EXPECTED_CHECKPOINT_ID}, any
     * checkpoint id is accepted. Once an expectation is set via
     * {@link #setNextExpectedCheckpointId(long)}, each trigger must carry exactly that id and the
     * expectation is advanced by one afterwards.
     */
    private static class ValidatingCheckpointHandler extends AbstractInvokable {
        /** Sentinel for {@code nextExpectedCheckpointId}: no expectation set, accept any id. */
        private static final long NO_EXPECTED_CHECKPOINT_ID = -1L;

        /** Failure reason of the most recent abort; {@code null} until the first abort. */
        private CheckpointFailureReason failureReason;
        /** Id passed to the most recent abort; {@code -1} until the first abort. */
        private long lastCanceledCheckpointId;
        /** Id the next trigger must carry, or {@link #NO_EXPECTED_CHECKPOINT_ID}. */
        private long nextExpectedCheckpointId;
        /** Number of calls to {@link #triggerCheckpointOnBarrier} that passed validation. */
        private long triggeredCheckpointCounter;
        /** Number of calls to {@link #abortCheckpointOnBarrier}. */
        private long abortedCheckpointCounter;

        public ValidatingCheckpointHandler() {
            super(new DummyEnvironment("test", 1, 0));
            this.lastCanceledCheckpointId = -1L;
            this.nextExpectedCheckpointId = NO_EXPECTED_CHECKPOINT_ID;
            this.triggeredCheckpointCounter = 0L;
            this.abortedCheckpointCounter = 0L;
        }

        /**
         * Sets the id the next triggered checkpoint must carry.
         *
         * @param j expected checkpoint id; {@code -1} disables the id validation
         */
        public void setNextExpectedCheckpointId(long j) {
            this.nextExpectedCheckpointId = j;
        }

        /** @return failure reason recorded by the most recent abort, or {@code null} if none */
        public CheckpointFailureReason getCheckpointFailureReason() {
            return this.failureReason;
        }

        /** @return id recorded by the most recent abort, or {@code -1} if none */
        public long getLastCanceledCheckpointId() {
            return this.lastCanceledCheckpointId;
        }

        /** @return number of checkpoints triggered so far */
        public long getTriggeredCheckpointCounter() {
            return this.triggeredCheckpointCounter;
        }

        /** @return number of checkpoints aborted so far */
        public long getAbortedCheckpointCounter() {
            return this.abortedCheckpointCounter;
        }

        /** @return id the next trigger is expected to carry, or {@code -1} if unconstrained */
        public long getNextExpectedCheckpointId() {
            return this.nextExpectedCheckpointId;
        }

        /** Not supported by this test handler. */
        public void invoke() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this test handler. */
        public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean z) {
            throw new UnsupportedOperationException("should never be called");
        }

        /**
         * Validates and counts a triggered checkpoint.
         *
         * <p>Asserts a positive timestamp and a non-negative alignment duration. The checkpoint id
         * is validated, and the expectation advanced, only when an expectation is set.
         */
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            final boolean idIsConstrained = this.nextExpectedCheckpointId != NO_EXPECTED_CHECKPOINT_ID;
            if (idIsConstrained) {
                Assert.assertEquals("wrong checkpoint id", this.nextExpectedCheckpointId, checkpointMetaData.getCheckpointId());
            }
            Assert.assertTrue(checkpointMetaData.getTimestamp() > 0);
            Assert.assertTrue(checkpointMetrics.getAlignmentDurationNanos() >= 0);
            // Only advance a real expectation. Incrementing the sentinel would turn "accept any id"
            // into "expect id 0" and make every later unconstrained trigger fail.
            if (idIsConstrained) {
                this.nextExpectedCheckpointId++;
            }
            this.triggeredCheckpointCounter++;
        }

        /**
         * Records an aborted checkpoint.
         *
         * @param j id of the aborted checkpoint
         * @param th cause of the abort; must be a {@link CheckpointException}, anything else
         *     fails with a {@link ClassCastException}
         */
        public void abortCheckpointOnBarrier(long j, Throwable th) {
            this.lastCanceledCheckpointId = j;
            this.failureReason = ((CheckpointException) th).getCheckpointFailureReason();
            this.abortedCheckpointCounter++;
        }

        /** Not supported by this test handler. */
        public Future<Void> notifyCheckpointCompleteAsync(long j) {
            throw new UnsupportedOperationException("should never be called");
        }
    }

    /** Captures the current {@code System.nanoTime()} as the start time of the test about to run. */
    @Before
    public void setUp() {
        final long now = System.nanoTime();
        testStartTimeNanos = now;
    }

    /**
     * Builds the gate under test on top of a fresh {@link MockInputGate} that serves the given
     * sequence, and remembers that mock gate in {@code mockInputGate}.
     *
     * @param i first constructor argument of the mock gate (the tests pass the number of channels)
     * @param bufferOrEventArr elements the mock gate is created with, in order
     * @param abstractInvokable invokable handed to the subclass-specific factory
     * @return the gate created by the subclass-specific factory
     * @throws IOException if the subclass-specific factory fails
     */
    protected CheckpointedInputGate createBarrierBuffer(int i, BufferOrEvent[] bufferOrEventArr, AbstractInvokable abstractInvokable) throws IOException {
        final MockInputGate gate = new MockInputGate(i, Arrays.asList(bufferOrEventArr));
        this.mockInputGate = gate;
        return createBarrierBuffer(gate, abstractInvokable);
    }

    /**
     * Convenience overload that uses a new {@link DummyCheckpointInvokable} as the invokable.
     *
     * @param i first constructor argument of the mock gate (the tests pass the number of channels)
     * @param bufferOrEventArr elements the mock gate is created with, in order
     * @return the gate created by the three-argument overload
     * @throws IOException if gate creation fails
     */
    protected CheckpointedInputGate createBarrierBuffer(int i, BufferOrEvent[] bufferOrEventArr) throws IOException {
        final DummyCheckpointInvokable dummyInvokable = new DummyCheckpointInvokable();
        return createBarrierBuffer(i, bufferOrEventArr, dummyInvokable);
    }

    /**
     * Factory hook for concrete test classes: creates the {@link CheckpointedInputGate} under test
     * on top of the given input gate. The {@code createBarrierBuffer(int, BufferOrEvent[],
     * AbstractInvokable)} overload delegates here with a {@code MockInputGate}.
     *
     * @param inputGate gate the created instance reads from
     * @param abstractInvokable invokable passed through from the caller
     * @return the gate under test
     * @throws IOException if the gate cannot be created
     */
    abstract CheckpointedInputGate createBarrierBuffer(InputGate inputGate, AbstractInvokable abstractInvokable) throws IOException;

    /**
     * Runs after every test: verifies that the gate under test has no further element and reports
     * itself as finished, then closes it.
     *
     * <p>The gate is closed in a {@code finally} block so that it is released even when one of
     * the assertions fails.
     *
     * @throws Exception if polling or closing the gate fails
     */
    @After
    public void ensureEmpty() throws Exception {
        try {
            Assert.assertFalse(this.inputGate.pollNext().isPresent());
            Assert.assertTrue(this.inputGate.isFinished());
        } finally {
            this.inputGate.close();
        }
    }

    /**
     * One channel, buffers only plus end-of-partition: every element must come out of the gate
     * unchanged and in order, and the reported alignment duration must be zero.
     */
    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createEndOfPartition(0)
        };
        inputGate = createBarrierBuffer(1, sequence);

        for (int idx = 0; idx < sequence.length; idx++) {
            Assert.assertEquals(sequence[idx], inputGate.pollNext().get());
        }

        Assert.assertEquals(0L, inputGate.getAlignmentDurationNanos());
    }

    /**
     * Four channels, buffers and end-of-partition events only: every element must come out of the
     * gate unchanged and in order, and the reported alignment duration must be zero.
     */
    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(2),
            createBuffer(2),
            createBuffer(0),
            createBuffer(1),
            createBuffer(0),
            createEndOfPartition(0),
            createBuffer(3),
            createBuffer(1),
            createEndOfPartition(3),
            createBuffer(1),
            createEndOfPartition(1),
            createBuffer(2),
            createEndOfPartition(2)
        };
        inputGate = createBarrierBuffer(4, sequence);

        for (int idx = 0; idx < sequence.length; idx++) {
            Assert.assertEquals(sequence[idx], inputGate.pollNext().get());
        }

        Assert.assertEquals(0L, inputGate.getAlignmentDurationNanos());
    }

    /**
     * One channel with barriers for checkpoints 1 through 6 interleaved with buffers: every
     * element must come out of the gate unchanged and in order, while the handler validates that
     * checkpoints are triggered with consecutive ids starting at 1.
     */
    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(2L, 0),
            createBarrier(3L, 0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(4L, 0),
            createBarrier(5L, 0),
            createBarrier(6L, 0),
            createBuffer(0),
            createEndOfPartition(0)
        };
        final ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        inputGate = createBarrierBuffer(1, sequence, handler);
        handler.setNextExpectedCheckpointId(1L);

        for (int idx = 0; idx < sequence.length; idx++) {
            Assert.assertEquals(sequence[idx], inputGate.pollNext().get());
        }
    }

    /**
     * Three channels with complete barrier sets for checkpoints 1 through 4. Every element of the
     * sequence is expected from the gate in order. After the last barrier of each checkpoint the
     * handler's next expected id must have advanced by one and the mock gate must report channels
     * 0, 1 and 2 as last unblocked.
     */
    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBarrier(2L, 1), createBarrier(2L, 2), createBuffer(2), createBuffer(2), createBarrier(3L, 2), createBuffer(0), createBuffer(0), createBarrier(3L, 0), createBarrier(3L, 1), createBarrier(4L, 1), createBarrier(4L, 2), createBarrier(4L, 0), createBuffer(0), createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)};
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createBarrierBuffer(3, bufferOrEventArr, validatingCheckpointHandler);
        validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
        // Indices 0-2: plain buffers before any barrier; no checkpoint triggered yet.
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(1L, validatingCheckpointHandler.getNextExpectedCheckpointId());
        // Checkpoint 1: barriers on channels 1 and 2 (indices 3, 4), then channel 0 (index 6).
        long nanoTime = System.nanoTime();
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[4], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[5], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(2L, validatingCheckpointHandler.getNextExpectedCheckpointId());
        validateAlignmentTime(nanoTime, this.inputGate);
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Indices 7-11: buffers between checkpoints 1 and 2; expected id stays at 2.
        check(bufferOrEventArr[7], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[8], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[10], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[11], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(2L, validatingCheckpointHandler.getNextExpectedCheckpointId());
        // Checkpoint 2: three back-to-back barriers (indices 12-14).
        long nanoTime2 = System.nanoTime();
        check(bufferOrEventArr[12], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[13], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[14], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(3L, validatingCheckpointHandler.getNextExpectedCheckpointId());
        validateAlignmentTime(nanoTime2, this.inputGate);
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 3: barriers at indices 17 (channel 2), 20 (channel 0) and 21 (channel 1).
        check(bufferOrEventArr[15], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[16], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[17], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[18], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[19], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[20], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[21], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(4L, validatingCheckpointHandler.getNextExpectedCheckpointId());
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 4: three back-to-back barriers (indices 22-24).
        check(bufferOrEventArr[22], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[23], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[24], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(5L, validatingCheckpointHandler.getNextExpectedCheckpointId());
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Trailing buffer and the end-of-partition events of all three channels.
        check(bufferOrEventArr[25], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[26], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[27], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[28], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
    }

    /**
     * Three channels where only checkpoint 1 receives barriers from all channels. Checkpoints 2, 3
     * and 4 each start but are overtaken by a barrier with a higher id, or by the end of the
     * input, before all their barriers arrive. The test expects exactly one triggered and three
     * aborted checkpoints on the handler.
     */
    @Test
    public void testMultiChannelJumpingOverCheckpoint() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(0), createBarrier(1L, 0), createBuffer(1), createBuffer(0), createBarrier(2L, 1), createBuffer(2), createBarrier(2L, 0), createBuffer(2), createBarrier(3L, 2), createBuffer(1), createBuffer(0), createBarrier(3L, 0), createBarrier(4L, 1), createBuffer(2), createBuffer(0), createEndOfPartition(0), createBuffer(2), createEndOfPartition(2), createBuffer(1), createEndOfPartition(1)};
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createBarrierBuffer(3, bufferOrEventArr, validatingCheckpointHandler);
        validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
        // Checkpoint 1: the first barrier (index 3) already makes 1 the latest checkpoint id.
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(1L, this.inputGate.getLatestCheckpointId());
        check(bufferOrEventArr[4], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[5], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 2: barriers arrive on channels 1 and 0 only (indices 9, 11).
        check(bufferOrEventArr[7], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[8], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(2L, this.inputGate.getLatestCheckpointId());
        check(bufferOrEventArr[10], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[11], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[12], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Index 13 is the barrier for checkpoint 3 on channel 2: latest id jumps to 3 and
        // channels 0 and 1 are reported as unblocked.
        check(bufferOrEventArr[13], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(3L, this.inputGate.getLatestCheckpointId());
        Assert.assertArrayEquals(new Integer[]{0, 1}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[14], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[15], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[16], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Index 17 is the barrier for checkpoint 4 on channel 1: latest id jumps to 4 and
        // channels 0 and 2 are reported as unblocked.
        check(bufferOrEventArr[17], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(4L, this.inputGate.getLatestCheckpointId());
        Assert.assertArrayEquals(new Integer[]{0, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[18], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[19], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Index 20 is end-of-partition on channel 0; afterwards channel 1 is reported as unblocked.
        check(bufferOrEventArr[20], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertArrayEquals(new Integer[]{1}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[21], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[22], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[23], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[24], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Only checkpoint 1 was triggered; three aborts were recorded.
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        Assert.assertEquals(3L, validatingCheckpointHandler.getAbortedCheckpointCounter());
    }

    /**
     * Two channels receiving a barrier followed by cancellation markers for later checkpoints.
     * Cancellation markers are not expected to be handed out by the gate; every other element
     * must come out unchanged and in order. Finally the mock gate must report channel 0 as last
     * unblocked.
     */
    @Test
    public void testMissingCancellationBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBarrier(1L, 0),
            createCancellationBarrier(3L, 1),
            createCancellationBarrier(2L, 0),
            createCancellationBarrier(3L, 0),
            createBuffer(0)
        };
        inputGate = createBarrierBuffer(2, sequence, new ValidatingCheckpointHandler());

        for (int idx = 0; idx < sequence.length; idx++) {
            final BufferOrEvent expected = sequence[idx];
            final boolean isCancelMarker =
                    !expected.isBuffer() && expected.getEvent().getClass() == CancelCheckpointMarker.class;
            if (isCancelMarker) {
                // Not polled from the gate, so there is nothing to compare against.
                continue;
            }
            Assert.assertEquals(expected, inputGate.pollNext().get());
        }

        final Object[] unblocked = mockInputGate.getAndResetLastUnblockedChannels().toArray();
        Assert.assertArrayEquals(new Integer[]{0}, unblocked);
    }

    /**
     * Four channels of which channels 2 and 1 deliver end-of-partition before any barrier arrives.
     * Checkpoints 2 and 3 then receive barriers only from channels 0 and 3, and checkpoint 4 only
     * from channel 3 after channel 0 has ended as well. Every element of the sequence is expected
     * from the gate in order.
     */
    @Test
    public void testStartAlignmentWithClosedChannels() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createEndOfPartition(2), createEndOfPartition(1), createBuffer(0), createBuffer(0), createBuffer(3), createBarrier(2L, 3), createBarrier(2L, 0), createBarrier(3L, 0), createBarrier(3L, 3), createBuffer(0), createBuffer(0), createBuffer(3), createEndOfPartition(0), createBuffer(3), createBarrier(4L, 3), createBuffer(3), createEndOfPartition(3)};
        this.inputGate = createBarrierBuffer(4, bufferOrEventArr);
        // Indices 0-1: channels 2 and 1 end; indices 2-4: buffers on the remaining channels.
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[4], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Checkpoint 2: barriers from channels 3 and 0 (indices 5, 6).
        check(bufferOrEventArr[5], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(2L, this.inputGate.getLatestCheckpointId());
        Assert.assertArrayEquals(new Integer[]{0, 3}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 3: barriers from channels 0 and 3 (indices 7, 8).
        check(bufferOrEventArr[7], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[8], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(3L, this.inputGate.getLatestCheckpointId());
        Assert.assertArrayEquals(new Integer[]{0, 3}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Index 12: channel 0 ends; index 14: barrier for checkpoint 4 on channel 3.
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[10], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[11], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[12], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[13], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[14], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(4L, this.inputGate.getLatestCheckpointId());
        Assert.assertArrayEquals(new Integer[]{3}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Trailing buffer and end-of-partition on channel 3.
        check(bufferOrEventArr[15], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[16], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
    }

    /**
     * Three channels: checkpoint 1 receives barriers from all channels, checkpoint 2 only from
     * channels 2 and 0 before channel 1 delivers end-of-partition. Every element of the sequence
     * is expected from the gate in order.
     */
    @Test
    public void testEndOfStreamWhileCheckpoint() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBarrier(1L, 0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(0), createBuffer(0), createBuffer(2), createBarrier(2L, 2), createBarrier(2L, 0), createBuffer(1), createEndOfPartition(1), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)};
        this.inputGate = createBarrierBuffer(3, bufferOrEventArr);
        // Checkpoint 1: three back-to-back barriers (indices 0-2).
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[4], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[5], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(1L, this.inputGate.getLatestCheckpointId());
        // Checkpoint 2: the first barrier (index 6, channel 2) already makes 2 the latest id.
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(2L, this.inputGate.getLatestCheckpointId());
        check(bufferOrEventArr[7], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[8], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Index 9 is end-of-partition on channel 1; afterwards channels 0 and 2 are reported as unblocked.
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertArrayEquals(new Integer[]{0, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[10], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[11], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[12], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
    }

    /**
     * One channel receiving barriers for checkpoints 1, 2 and 5 and cancellation markers for
     * checkpoints 4 and 6. The cancellation markers (indices 5 and 8) are never expected from the
     * gate; instead the handler must record the abort with reason
     * {@code CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER}. In total three checkpoints are expected
     * to be triggered and two aborted, with an alignment duration of zero throughout.
     */
    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBarrier(2L, 0), createBuffer(0), createCancellationBarrier(4L, 0), createBarrier(5L, 0), createBuffer(0), createCancellationBarrier(6L, 0), createBuffer(0)};
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createBarrierBuffer(1, bufferOrEventArr, validatingCheckpointHandler);
        // Checkpoint 1 (barrier at index 1).
        validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
        Assert.assertArrayEquals(new Integer[]{0}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 2 (barrier at index 3).
        validatingCheckpointHandler.setNextExpectedCheckpointId(2L);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertArrayEquals(new Integer[]{0}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Index 5 (cancellation of checkpoint 4) is skipped: the next element expected from the
        // gate after index 4 is the barrier for checkpoint 5 at index 6.
        validatingCheckpointHandler.setNextExpectedCheckpointId(5L);
        check(bufferOrEventArr[4], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(5L, this.inputGate.getLatestCheckpointId());
        Assert.assertEquals(4L, validatingCheckpointHandler.getLastCanceledCheckpointId());
        Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, validatingCheckpointHandler.getCheckpointFailureReason());
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
        Assert.assertArrayEquals(new Integer[]{0}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Index 8 (cancellation of checkpoint 6) is skipped as well; 6 becomes both the latest
        // checkpoint id and the last canceled id.
        check(bufferOrEventArr[7], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(6L, this.inputGate.getLatestCheckpointId());
        Assert.assertEquals(6L, validatingCheckpointHandler.getLastCanceledCheckpointId());
        Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, validatingCheckpointHandler.getCheckpointFailureReason());
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
        Assert.assertEquals(3L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        Assert.assertEquals(2L, validatingCheckpointHandler.getAbortedCheckpointCounter());
    }

    /**
     * Three channels where checkpoints 1, 3 and 5 receive barriers from all channels, while
     * checkpoints 2, 4 and 6 are cancelled by cancellation markers. The cancellation markers
     * (indices 12, 20, 32 and 33) are never expected from the gate; each cancellation must be
     * recorded on the handler with reason {@code CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER}.
     * In total three checkpoints are expected to be triggered and three aborted.
     */
    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBuffer(2), createBarrier(2L, 0), createBarrier(2L, 2), createBuffer(1), createCancellationBarrier(2L, 1), createBuffer(2), createBuffer(1), createBarrier(3L, 1), createBarrier(3L, 2), createBarrier(3L, 0), createBuffer(0), createBuffer(1), createCancellationBarrier(4L, 1), createBarrier(4L, 2), createBuffer(2), createBarrier(4L, 0), createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(5L, 2), createBarrier(5L, 1), createBarrier(5L, 0), createBuffer(0), createBuffer(1), createCancellationBarrier(6L, 1), createCancellationBarrier(6L, 2), createBarrier(6L, 0), createBuffer(0)};
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createBarrierBuffer(3, bufferOrEventArr, validatingCheckpointHandler);
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Checkpoint 1: barriers at indices 3, 4 and 6 cover all three channels.
        long nanoTime = System.nanoTime();
        validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[4], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[5], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        validateAlignmentTime(nanoTime, this.inputGate);
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 2: barriers on channels 0 and 2 (indices 9, 10), then a cancellation marker
        // on channel 1 (index 12), which is skipped in the expected output.
        check(bufferOrEventArr[7], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[8], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[10], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[11], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[13], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(2L, validatingCheckpointHandler.getLastCanceledCheckpointId());
        Assert.assertArrayEquals(new Integer[]{0, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, validatingCheckpointHandler.getCheckpointFailureReason());
        check(bufferOrEventArr[14], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Checkpoint 3: three back-to-back barriers (indices 15-17).
        long nanoTime2 = System.nanoTime();
        validatingCheckpointHandler.setNextExpectedCheckpointId(3L);
        check(bufferOrEventArr[15], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[16], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[17], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        validateAlignmentTime(nanoTime2, this.inputGate);
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 4: the cancellation marker on channel 1 (index 20) is skipped; the barriers
        // for checkpoint 4 on channels 2 and 0 (indices 21, 23) are still handed out.
        check(bufferOrEventArr[18], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[19], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[21], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(4L, validatingCheckpointHandler.getLastCanceledCheckpointId());
        Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, validatingCheckpointHandler.getCheckpointFailureReason());
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
        Assert.assertArrayEquals(new Integer[]{2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[22], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[23], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertArrayEquals(new Integer[]{0}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[24], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[25], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[26], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Checkpoint 5: three back-to-back barriers (indices 27-29).
        long nanoTime3 = System.nanoTime();
        validatingCheckpointHandler.setNextExpectedCheckpointId(5L);
        check(bufferOrEventArr[27], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[28], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[29], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        validateAlignmentTime(nanoTime3, this.inputGate);
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Checkpoint 6: cancellation markers on channels 1 and 2 (indices 32, 33) are skipped;
        // the barrier on channel 0 (index 34) is still handed out.
        check(bufferOrEventArr[30], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[31], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[34], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        Assert.assertEquals(6L, validatingCheckpointHandler.getLastCanceledCheckpointId());
        Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, validatingCheckpointHandler.getCheckpointFailureReason());
        Assert.assertEquals(0L, this.inputGate.getAlignmentDurationNanos());
        Assert.assertArrayEquals(new Integer[]{0}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[35], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Checkpoints 1, 3 and 5 were triggered; 2, 4 and 6 were aborted.
        Assert.assertEquals(3L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        Assert.assertEquals(3L, validatingCheckpointHandler.getAbortedCheckpointCounter());
    }

    /**
     * Polls a scripted sequence in which checkpoint 1 is cancelled by a cancellation marker while
     * checkpoint 2 later receives a barrier on every channel that appears in the script.
     *
     * <p>The script uses channel indices 0, 1 and 2; the {@code 3} passed to
     * {@code createBarrierBuffer} is presumably the channel count (TODO confirm against that
     * helper). The test expects the cancellation marker at index 4 not to be returned by
     * {@code pollNext()}, while regular barriers are returned like buffers.
     */
    @Test
    public void testAbortOnCanceledBarriers() throws Exception {
        // Script indices: [1] barrier cp1/ch1, [4] cancel cp1/ch0, [6] barrier cp2/ch1,
        // [9] barrier cp1/ch2, [12] barrier cp2/ch0, [13] barrier cp2/ch2; the rest are buffers.
        BufferOrEvent[] bufferOrEventArr = {createBuffer(1), createBarrier(1L, 1), createBuffer(2), createBuffer(0), createCancellationBarrier(1L, 0), createBuffer(1), createBarrier(2L, 1), createBuffer(2), createBuffer(0), createBarrier(1L, 2), createBuffer(0), createBuffer(2), createBarrier(2L, 0), createBarrier(2L, 2), createBuffer(0), createBuffer(1), createBuffer(2)};
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createBarrierBuffer(3, bufferOrEventArr, validatingCheckpointHandler);
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Index 4 (the cancellation marker) is deliberately skipped: the next polled element
        // is expected to be the buffer at index 5.
        check(bufferOrEventArr[5], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // By now the handler must have seen checkpoint 1 aborted because of the marker.
        Assert.assertEquals(1L, validatingCheckpointHandler.getLastCanceledCheckpointId());
        Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, validatingCheckpointHandler.getCheckpointFailureReason());
        // Channel 1 delivered the only checkpoint-1 barrier so far; it is expected to be reported as unblocked.
        Assert.assertArrayEquals(new Integer[]{1}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        // Reference timestamp taken before the first checkpoint-2 barrier (index 6) is polled;
        // used below as the upper bound for the reported alignment duration.
        long nanoTime = System.nanoTime();
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[7], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[8], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Index 9 is a barrier for the already-cancelled checkpoint 1 arriving on channel 2.
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Only channel 2 is expected in the unblocked list after that late barrier.
        Assert.assertArrayEquals(new Integer[]{2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[10], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[11], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Tell the handler which checkpoint id to expect before the remaining checkpoint-2
        // barriers (channels 0 and 2) are polled.
        validatingCheckpointHandler.setNextExpectedCheckpointId(2L);
        check(bufferOrEventArr[12], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[13], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        validateAlignmentTime(nanoTime, this.inputGate);
        // All three channels are expected to be reported as unblocked once checkpoint 2 has all its barriers.
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[14], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[15], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[16], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Overall: exactly one checkpoint triggered (id 2) and exactly one aborted (id 1).
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        Assert.assertEquals(1L, validatingCheckpointHandler.getAbortedCheckpointCounter());
    }

    /**
     * Polls a scripted sequence in which checkpoint 3 is superseded by a barrier for checkpoint 5
     * before a cancellation marker for checkpoint 3 shows up.
     *
     * <p>The test expects checkpoint 3 to be reported as aborted with
     * {@code CHECKPOINT_DECLINED_SUBSUMED} once the first checkpoint-5 barrier is polled, and the
     * later cancellation marker (index 7) neither to be returned by {@code pollNext()} nor to
     * increase the aborted counter beyond one.
     */
    @Test
    public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
        // Script indices: [1] barrier cp3/ch1, [2] barrier cp3/ch0, [4] barrier cp5/ch2,
        // [7] cancel cp3/ch0, [10] barrier cp5/ch0, [11] barrier cp5/ch1; the rest are buffers.
        BufferOrEvent[] bufferOrEventArr = {createBuffer(2), createBarrier(3L, 1), createBarrier(3L, 0), createBuffer(2), createBarrier(5L, 2), createBuffer(1), createBuffer(0), createCancellationBarrier(3L, 0), createBuffer(0), createBuffer(1), createBarrier(5L, 0), createBarrier(5L, 1), createBuffer(0), createBuffer(1), createBuffer(2)};
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        this.inputGate = createBarrierBuffer(3, bufferOrEventArr, validatingCheckpointHandler);
        check(bufferOrEventArr[0], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[1], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[2], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[3], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Reference timestamp taken before the first checkpoint-5 barrier (index 4) is polled;
        // used below as the upper bound for the reported alignment duration.
        long nanoTime = System.nanoTime();
        check(bufferOrEventArr[4], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // The checkpoint-5 barrier arrived while checkpoint 3 was still missing channel 2,
        // so checkpoint 3 is expected to be aborted as subsumed.
        Assert.assertEquals(3L, validatingCheckpointHandler.getLastCanceledCheckpointId());
        Assert.assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED, validatingCheckpointHandler.getCheckpointFailureReason());
        // Channels 0 and 1 had delivered checkpoint-3 barriers; both are expected to be reported as unblocked.
        Assert.assertArrayEquals(new Integer[]{0, 1}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[5], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[6], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Index 7 (cancellation marker for the already-subsumed checkpoint 3) is skipped:
        // the next polled element is expected to be the buffer at index 8.
        check(bufferOrEventArr[8], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[9], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Tell the handler which checkpoint id to expect before the remaining checkpoint-5
        // barriers (channels 0 and 1) are polled.
        validatingCheckpointHandler.setNextExpectedCheckpointId(5L);
        check(bufferOrEventArr[10], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[11], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        validateAlignmentTime(nanoTime, this.inputGate);
        // All three channels are expected to be reported as unblocked once checkpoint 5 has all its barriers.
        Assert.assertArrayEquals(new Integer[]{0, 1, 2}, this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        check(bufferOrEventArr[12], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[13], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        check(bufferOrEventArr[14], (BufferOrEvent) this.inputGate.pollNext().get(), PAGE_SIZE);
        // Overall: exactly one checkpoint triggered (id 5) and exactly one abort recorded (id 3).
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        Assert.assertEquals(1L, validatingCheckpointHandler.getAbortedCheckpointCounter());
    }

    /**
     * Wraps a freshly created {@link CheckpointBarrier} into a {@link BufferOrEvent}.
     *
     * <p>The barrier is stamped with the current wall-clock time and uses the options returned by
     * {@link CheckpointOptions#forCheckpointWithDefaultLocation()}.
     *
     * @param j checkpoint id handed to the barrier
     * @param i second argument of the {@link InputChannelInfo} (the first is fixed to 0);
     *     used by the tests as the channel index
     * @return the barrier event attributed to that channel
     */
    private static BufferOrEvent createBarrier(long j, int i) {
        final long timestamp = System.currentTimeMillis();
        final CheckpointOptions options = CheckpointOptions.forCheckpointWithDefaultLocation();
        final CheckpointBarrier barrier = new CheckpointBarrier(j, timestamp, options);
        final InputChannelInfo origin = new InputChannelInfo(0, i);
        return new BufferOrEvent(barrier, origin);
    }

    /**
     * Wraps a {@link CancelCheckpointMarker} into a {@link BufferOrEvent}.
     *
     * @param j checkpoint id handed to the cancellation marker
     * @param i second argument of the {@link InputChannelInfo} (the first is fixed to 0);
     *     used by the tests as the channel index
     * @return the cancellation event attributed to that channel
     */
    private static BufferOrEvent createCancellationBarrier(long j, int i) {
        final CancelCheckpointMarker marker = new CancelCheckpointMarker(j);
        final InputChannelInfo origin = new InputChannelInfo(0, i);
        return new BufferOrEvent(marker, origin);
    }

    /**
     * Creates a data buffer with random content, wrapped into a {@link BufferOrEvent}.
     *
     * <p>Each call uses the current value of the shared {@code sizeCounter} as the payload length
     * and then increments the counter, so successive buffers differ in size by one byte. The
     * payload is written at offset 0 of a newly allocated segment of {@code PAGE_SIZE} bytes.
     *
     * <p>NOTE(review): nothing in this method bounds {@code sizeCounter}; a payload longer than
     * {@code PAGE_SIZE} would not fit the segment. Confirm the counter is reset or that the
     * total number of buffers created stays below that limit.
     *
     * @param i second argument of the {@link InputChannelInfo} (the first is fixed to 0);
     *     used by the tests as the channel index
     * @return the buffer attributed to that channel
     */
    private static BufferOrEvent createBuffer(int i) {
        final int payloadLength = sizeCounter++;

        final byte[] payload = new byte[payloadLength];
        RND.nextBytes(payload);

        final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
        segment.put(0, payload);

        final NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
        buffer.setSize(payloadLength);
        // Extra retain: presumably keeps the buffer alive after the gate under test releases
        // it, so that check() can still read its memory — TODO confirm.
        buffer.retainBuffer();

        return new BufferOrEvent(buffer, new InputChannelInfo(0, i));
    }

    /**
     * Wraps the shared {@link EndOfPartitionEvent#INSTANCE} into a {@link BufferOrEvent}.
     *
     * @param i second argument of the {@link InputChannelInfo} (the first is fixed to 0);
     *     used by the tests as the channel index
     * @return the end-of-partition event attributed to that channel
     */
    private static BufferOrEvent createEndOfPartition(int i) {
        final EndOfPartitionEvent endOfPartition = EndOfPartitionEvent.INSTANCE;
        final InputChannelInfo origin = new InputChannelInfo(0, i);
        return new BufferOrEvent(endOfPartition, origin);
    }

    /**
     * Asserts that two {@link BufferOrEvent} instances are equivalent.
     *
     * <p>Both must be non-null and of the same kind. Events are compared with {@code equals};
     * buffers are compared by maximum capacity, size, and the first {@code i} bytes of their
     * memory segments.
     *
     * @param bufferOrEvent the expected element
     * @param bufferOrEvent2 the element actually received
     * @param i number of bytes of the backing memory segments to compare, starting at offset 0
     */
    private static void check(BufferOrEvent bufferOrEvent, BufferOrEvent bufferOrEvent2, int i) {
        final BufferOrEvent expected = bufferOrEvent;
        final BufferOrEvent actual = bufferOrEvent2;

        Assert.assertNotNull(expected);
        Assert.assertNotNull(actual);
        Assert.assertEquals(Boolean.valueOf(expected.isBuffer()), Boolean.valueOf(actual.isBuffer()));

        if (expected.isBuffer()) {
            Assert.assertEquals(expected.getBuffer().getMaxCapacity(), actual.getBuffer().getMaxCapacity());
            Assert.assertEquals(expected.getBuffer().getSize(), actual.getBuffer().getSize());

            final MemorySegment expectedMemory = expected.getBuffer().getMemorySegment();
            final MemorySegment actualMemory = actual.getBuffer().getMemorySegment();
            final int comparison = expectedMemory.compare(actualMemory, 0, 0, i);
            Assert.assertTrue("memory contents differs", comparison == 0);
        } else {
            Assert.assertEquals(expected.getEvent(), actual.getEvent());
        }
    }

    /**
     * Asserts upper bounds on the timing metrics reported by the gate.
     *
     * <p>The alignment duration must not exceed the time elapsed since {@code j}, and the
     * checkpoint start delay must not exceed the time elapsed since {@code testStartTimeNanos}
     * plus 1,000,000 ns of slack.
     *
     * @param j {@link System#nanoTime()} reading taken by the caller before alignment could start
     * @param checkpointedInputGate gate whose reported metrics are verified
     */
    private static void validateAlignmentTime(long j, CheckpointedInputGate checkpointedInputGate) {
        // Both clock reads happen before the gate is queried, as in the original ordering.
        final long elapsedSinceReference = System.nanoTime() - j;
        final long elapsedSinceTestStart = System.nanoTime() - testStartTimeNanos;

        final Long alignmentBound = Long.valueOf(elapsedSinceReference);
        final Long startDelayBound = Long.valueOf(elapsedSinceTestStart + 1000000L);

        final Long reportedAlignment = Long.valueOf(checkpointedInputGate.getAlignmentDurationNanos());
        Assert.assertThat(reportedAlignment, Matchers.lessThanOrEqualTo(alignmentBound));

        final Long reportedStartDelay = Long.valueOf(checkpointedInputGate.getCheckpointStartDelayNanos());
        Assert.assertThat(reportedStartDelay, Matchers.lessThanOrEqualTo(startDelayBound));
    }
}
