package qunar.tc.qmq.batch;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import qunar.tc.qmq.concurrent.NamedThreadFactory;

/* loaded from: input_file:qunar/tc/qmq/batch/BatchExecutor.class */
public class BatchExecutor<Item> implements Runnable {
    private final String name;
    private final int batchSize;
    private final Processor<Item> processor;
    private static final int DEFAULT_QUEUE_SIZE = 1000;
    private static final int DEFAULT_PROCESS_THREADS = Runtime.getRuntime().availableProcessors() + 1;
    private int queueSize;
    private int threads;
    private BlockingQueue<Item> queue;
    private ThreadPoolExecutor executor;

    public BatchExecutor(String str, int i, Processor<Item> processor) {
        this(str, i, processor, DEFAULT_PROCESS_THREADS);
    }

    public BatchExecutor(String str, int i, Processor<Item> processor, int i2) {
        this.queueSize = DEFAULT_QUEUE_SIZE;
        Preconditions.checkNotNull(processor);
        this.name = str;
        this.batchSize = i;
        this.processor = processor;
        this.threads = i2;
    }

    @PostConstruct
    public void init() {
        this.queue = new LinkedBlockingQueue(this.queueSize);
        if (this.executor == null) {
            this.executor = new ThreadPoolExecutor(1, this.threads, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue(1), new NamedThreadFactory("batch-" + this.name + "-task", true));
            this.executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }

    public boolean addItem(Item item) {
        boolean offer = this.queue.offer(item);
        if (offer) {
            this.executor.execute(this);
        }
        return offer;
    }

    public boolean addItem(Item item, long j, TimeUnit timeUnit) throws InterruptedException {
        boolean offer = this.queue.offer(item, j, timeUnit);
        if (offer) {
            this.executor.execute(this);
        }
        return offer;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.queue.isEmpty()) {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(this.batchSize);
            if (this.queue.drainTo(newArrayListWithCapacity, this.batchSize) > 0) {
                this.processor.process(newArrayListWithCapacity);
            }
        }
    }

    public void setQueueSize(int i) {
        this.queueSize = i;
    }

    public void setThreads(int i) {
        this.threads = i;
    }

    public void setExecutor(ThreadPoolExecutor threadPoolExecutor) {
        this.executor = threadPoolExecutor;
    }

    @PreDestroy
    public void destroy() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }
}
