package qunar.tc.qmq.batch;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import qunar.tc.qmq.concurrent.NamedThreadFactory;

/* loaded from: input_file:qunar/tc/qmq/batch/MultipleQueueFlusher.class */
public class MultipleQueueFlusher<Item> {
    private final LoadBalance<Item> loadBalance;
    private final Queue<Item>[] flusherQueues;
    private int flushInterval;
    private Future[] futures;
    private static final Random r = new Random();
    private final ExecutorService EXECUTORS = Executors.newCachedThreadPool(new NamedThreadFactory("MultipleQueueFlusher"));
    private final ScheduledExecutorService SCHEDULE = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("MultipleQueueFlusher-SCHED"));
    private static final int delta = 50;

    /* loaded from: input_file:qunar/tc/qmq/batch/MultipleQueueFlusher$Flusher.class */
    /**
     * Worker loop that serves a set of {@link Queue}s: it sleeps on a shared monitor until one of
     * its queues reports {@link Queue#canFlush()}, then flushes every queue that is ready.
     *
     * <p>Every queue created through {@link #create(int, String)} receives this flusher's
     * {@code notifier} object, so a queue can wake the worker by notifying it.
     *
     * <p>The worker stops when its thread is interrupted.
     */
    private static class Flusher<Item> implements Runnable {
        private final int capacityPerQueue;
        private final int batch;
        private final int flushInterval;
        private final Processor<Item> processor;
        /** Monitor the worker waits on; handed to every queue created by this flusher. */
        private final Object notifier = new Object();
        private final List<Queue<Item>> queues = new ArrayList<>();

        /**
         * @param i capacity passed to each queue this flusher creates
         * @param i2 batch size passed to each queue this flusher creates
         * @param i3 flush interval passed to each queue this flusher creates
         * @param processor receiver of the batches drained from the queues
         */
        Flusher(int i, int i2, int i3, Processor<Item> processor) {
            this.capacityPerQueue = i;
            this.batch = i2;
            this.flushInterval = i3;
            this.processor = processor;
        }

        /** Alternates between waiting for work and flushing until the thread is interrupted. */
        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                select();
                process();
            }
        }

        /**
         * Blocks until at least one queue can be flushed or the thread is interrupted.
         *
         * <p>The readiness check happens while holding {@code notifier}, the same monitor the
         * queues take before notifying, so a notification cannot slip in between the check and
         * the wait.
         */
        private void select() {
            synchronized (this.notifier) {
                // The interrupt check is required: waitAWhile() restores the flag, and wait()
                // throws immediately while the flag is set, so without it this loop would spin.
                while (!Thread.currentThread().isInterrupted() && !flushOnDemand()) {
                    waitAWhile();
                }
            }
        }

        /**
         * Repeatedly sweeps all queues, flushing the ready ones, until a full sweep flushes
         * nothing or the thread is interrupted.
         */
        private void process() {
            while (!Thread.currentThread().isInterrupted()) {
                boolean flushedAny = false;
                for (int i = 0; i < this.queues.size(); i++) {
                    Queue<Item> queue = this.queues.get(i);
                    if (queue.canFlush()) {
                        // NOTE(review): an exception thrown from flush() (i.e. from the processor)
                        // propagates out of run() and ends this worker — confirm that is intended.
                        queue.flush();
                        flushedAny = true;
                    }
                }
                if (!flushedAny) {
                    return;
                }
            }
        }

        /** Returns {@code true} if any queue of this flusher currently reports it can be flushed. */
        private boolean flushOnDemand() {
            for (int i = 0; i < this.queues.size(); i++) {
                if (this.queues.get(i).canFlush()) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Waits on {@code notifier} until notified. Must be called while holding the monitor.
         *
         * <p>If the wait is interrupted, the interrupt flag is restored so that the loops in
         * {@link #select()}, {@link #process()} and {@link #run()} observe it and terminate,
         * instead of the interrupt being silently discarded.
         */
        private void waitAWhile() {
            try {
                this.notifier.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Creates a queue bound to this flusher's notifier and processor and registers it for
         * flushing.
         *
         * <p>Not synchronized: the backing list is a plain {@code ArrayList}, so queues should be
         * created before the flusher is started.
         *
         * @param i index assigned to the new queue
         * @param str name passed to the new queue
         * @return the newly created queue
         */
        public Queue<Item> create(int i, String str) {
            Queue<Item> queue = new Queue<>(str, i, this.capacityPerQueue, this.batch, this.flushInterval, this.notifier, this.processor);
            this.queues.add(queue);
            return queue;
        }
    }

    /* loaded from: input_file:qunar/tc/qmq/batch/MultipleQueueFlusher$Group.class */
    /**
     * A batch of items together with the index of the queue it came from.
     *
     * <p>Both fields are exposed directly; the list is stored as given, without copying.
     */
    public static class Group<Item> {
        /** Index of the originating queue. */
        public final int index;
        /** The batched items, held by reference. */
        public final List<Item> items;

        /**
         * @param i index of the originating queue
         * @param list items that make up this batch
         */
        Group(int i, List<Item> list) {
            this.items = list;
            this.index = i;
        }
    }

    /* loaded from: input_file:qunar/tc/qmq/batch/MultipleQueueFlusher$LoadBalance.class */
    /**
     * Strategy that picks the queue an item is routed to.
     */
    public interface LoadBalance<Item> {
        /**
         * Returns the index of the queue that should receive {@code item}.
         *
         * <p>{@code MultipleQueueFlusher.add} uses the result directly as an array index, so it
         * must lie in {@code [0, number of queues)}.
         *
         * @param item the item being added
         * @return index of the target queue
         */
        int idx(Item item);
    }

    /* loaded from: input_file:qunar/tc/qmq/batch/MultipleQueueFlusher$Processor.class */
    /**
     * Callback that consumes the batches drained from the queues.
     */
    public interface Processor<Item> {
        /**
         * Handles one batch. Called from {@code Queue.flush()} when draining reported a count
         * greater than zero.
         *
         * @param group the drained items and the index of the queue they came from
         */
        void process(Group<Item> group);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:qunar/tc/qmq/batch/MultipleQueueFlusher$Queue.class */
    /**
     * One buffer of pending items plus the two thresholds that decide when it is flushed: a batch
     * size and a flush interval in milliseconds.
     *
     * <p>Items are stored in an {@code MpscLinkedQueue} constructed with the given capacity. When a
     * threshold is met the queue notifies the monitor it was given so that the waiting flusher
     * can drain it.
     *
     * <p>Elapsed time is measured with {@link System#nanoTime()} rather than the wall clock, so
     * adjustments of the system time cannot delay or trigger time-based flushes.
     */
    public static class Queue<Item> {
        private final int index;
        private final Processor<Item> processor;
        private volatile int flushInterval;
        private volatile int batch;
        /** {@link System#nanoTime()} reading taken at construction or at the start of the last flush. */
        private volatile long lastFlushNanos = System.nanoTime();
        private final MpscLinkedQueue<Item> queue;
        private final Object notifier;

        /**
         * @param str queue name; not used by this class
         * @param i index reported in every {@link Group} produced by this queue
         * @param i2 capacity passed to the backing {@code MpscLinkedQueue}
         * @param i3 batch size: flush threshold and maximum number of items drained per flush
         * @param i4 flush interval in milliseconds
         * @param obj monitor to notify when this queue becomes ready to flush
         * @param processor receiver of drained batches
         */
        Queue(String str, int i, int i2, int i3, int i4, Object obj, Processor<Item> processor) {
            this.index = i;
            this.batch = i3;
            this.flushInterval = i4;
            this.notifier = obj;
            this.processor = processor;
            this.queue = new MpscLinkedQueue<>(i2);
        }

        /**
         * Offers an item to the buffer and wakes the flusher if the batch threshold is reached.
         *
         * <p>The threshold check runs whether or not the offer was accepted.
         *
         * @return the result of the underlying {@code offer}
         */
        public boolean add(Item item) {
            boolean offer = this.queue.offer(item);
            flushOnDemand();
            return offer;
        }

        /**
         * Drains up to one batch of items and hands them to the processor; the processor is not
         * called when nothing was drained. The last-flush timestamp is reset either way.
         */
        public void flush() {
            this.lastFlushNanos = System.nanoTime();
            // Read the volatile batch once so the list capacity and the drain limit agree even
            // if setBatch() runs concurrently.
            final int limit = this.batch;
            List<Item> drained = new ArrayList<>(limit);
            if (this.queue.drainTo(drained, limit) > 0) {
                this.processor.process(new Group<>(this.index, drained));
            }
        }

        /** Wakes the flusher once the buffered item count has reached the batch size. */
        private void flushOnDemand() {
            if (this.queue.SIZE() < this.batch) {
                return;
            }
            start();
        }

        /** Notifies one thread waiting on the shared monitor. */
        private void start() {
            synchronized (this.notifier) {
                this.notifier.notify();
            }
        }

        /** Returns {@code true} when the flush interval has passed since the last flush. */
        private boolean intervalElapsed() {
            // Subtracting nanoTime readings is the overflow-safe way to compare them.
            return System.nanoTime() - this.lastFlushNanos >= TimeUnit.MILLISECONDS.toNanos(this.flushInterval);
        }

        /**
         * Returns {@code true} when the batch size has been reached or the flush interval has
         * elapsed since the last flush.
         */
        boolean canFlush() {
            return this.queue.SIZE() >= this.batch || intervalElapsed();
        }

        /** Periodic check: wakes the flusher if the flush interval has elapsed. */
        public void timeout() {
            if (intervalElapsed()) {
                start();
            }
        }

        /** Sets the flush interval, in milliseconds, used by subsequent checks. */
        void setFlushInterval(int i) {
            this.flushInterval = i;
        }

        /** Sets the batch size used by subsequent threshold checks and flushes. */
        public void setBatch(int i) {
            this.batch = i;
        }
    }

    /**
     * Creates the queues, assigns them round-robin to the flusher workers, starts the workers and
     * schedules the periodic timeout check for every queue.
     *
     * @param str name passed to every queue
     * @param i number of queues; must be positive
     * @param i2 capacity of each queue
     * @param i3 batch size of each queue; must be positive
     * @param i4 flush interval in milliseconds; must be positive
     * @param i5 number of flusher workers; must be positive
     * @param loadBalance chooses the queue index for each added item
     * @param processor receives every drained batch
     * @throws IllegalArgumentException if {@code i}, {@code i3}, {@code i4} or {@code i5} is not positive
     */
    public MultipleQueueFlusher(String str, int i, int i2, int i3, int i4, int i5, LoadBalance<Item> loadBalance, Processor<Item> processor) {
        // Fail fast, before any worker is submitted. Without these checks a non-positive flush
        // interval is only rejected by scheduleAtFixedRate after the workers are already running,
        // zero workers ends in a division by zero, and a non-positive batch breaks the workers.
        Preconditions.checkArgument(i > 0, "queue count must be positive: %s", i);
        Preconditions.checkArgument(i3 > 0, "batch must be positive: %s", i3);
        Preconditions.checkArgument(i4 > 0, "flush interval must be positive: %s", i4);
        Preconditions.checkArgument(i5 > 0, "flusher count must be positive: %s", i5);

        // Generic array creation is not expressible in Java; the array only ever holds Queue<Item>.
        @SuppressWarnings("unchecked")
        Queue<Item>[] queues = new Queue[i];
        this.flusherQueues = queues;
        this.futures = new Future[i];
        this.loadBalance = loadBalance;
        this.flushInterval = i4;

        List<Flusher<Item>> flushers = new ArrayList<>(i5);
        for (int worker = 0; worker < i5; worker++) {
            flushers.add(new Flusher<Item>(i2, i3, i4, processor));
        }
        // Queue q is owned by worker (q mod workerCount).
        for (int q = 0; q < i; q++) {
            this.flusherQueues[q] = flushers.get(q % i5).create(q, str);
        }
        // Workers are started only after all their queues exist.
        for (Flusher<Item> flusher : flushers) {
            this.EXECUTORS.submit(flusher);
        }
        schedule();
    }

    /**
     * Registers one fixed-rate task per queue that calls {@code Queue.timeout()} every
     * {@code flushInterval} milliseconds, and stores its future in the matching {@code futures}
     * slot. Each task starts after a random delay below {@code delta} milliseconds.
     */
    private void schedule() {
        int slot = 0;
        for (final Queue<Item> queue : this.flusherQueues) {
            Runnable timeoutCheck = new Runnable() {
                @Override
                public void run() {
                    queue.timeout();
                }
            };
            int initialDelay = r.nextInt(delta);
            this.futures[slot] = this.SCHEDULE.scheduleAtFixedRate(timeoutCheck, initialDelay, this.flushInterval, TimeUnit.MILLISECONDS);
            slot++;
        }
    }

    /**
     * Routes an item to the queue selected by the load balancer.
     *
     * @param item the item to enqueue
     * @return the result of the selected queue's {@code add}
     */
    public boolean add(Item item) {
        Queue<Item>[] queues = this.flusherQueues;
        int target = this.loadBalance.idx(item);
        Queue<Item> selected = queues[target];
        return selected.add(item);
    }

    /**
     * Changes the flush interval. If the value differs from the current one, every queue's
     * periodic timeout task is cancelled (without interrupting a running execution), the queue
     * gets the new interval, and a new task is scheduled at the new rate.
     *
     * @param i new flush interval in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void setFlushInterval(int i) {
        Preconditions.checkArgument(i > 0);
        if (this.flushInterval != i) {
            this.flushInterval = i;
            final int count = this.futures.length;
            for (int slot = 0; slot < count; slot++) {
                this.futures[slot].cancel(false);
                final Queue<Item> queue = this.flusherQueues[slot];
                queue.setFlushInterval(i);
                Runnable timeoutCheck = new Runnable() {
                    @Override
                    public void run() {
                        queue.timeout();
                    }
                };
                this.futures[slot] = this.SCHEDULE.scheduleAtFixedRate(timeoutCheck, r.nextInt(delta), i, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Applies a new batch size to every queue.
     *
     * <p>The value is validated here, as {@code setFlushInterval} validates its argument. A
     * negative batch would make the {@code ArrayList} allocation in {@code Queue.flush()} throw
     * on the flusher thread, and a non-positive batch keeps the size condition in
     * {@code Queue.canFlush()} satisfied (assuming the queue size is never negative).
     *
     * @param i new batch size; must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void setBatch(int i) {
        Preconditions.checkArgument(i > 0, "batch must be positive: %s", i);
        for (Queue<Item> queue : this.flusherQueues) {
            queue.setBatch(i);
        }
    }
}
