package qunar.tc.qmq.consumer.pull;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.BrokerServiceImpl;
import qunar.tc.qmq.common.MapKeyBuilder;
import qunar.tc.qmq.common.StatusSource;
import qunar.tc.qmq.concurrent.NamedThreadFactory;
import qunar.tc.qmq.consumer.exception.DuplicateListenerException;
import qunar.tc.qmq.consumer.register.ConsumerRegister;
import qunar.tc.qmq.consumer.register.RegistParam;
import qunar.tc.qmq.metainfoclient.ConsumerStateChangedListener;
import qunar.tc.qmq.metainfoclient.MetaInfoService;
import qunar.tc.qmq.utils.RetrySubjectUtils;

/* loaded from: input_file:qunar/tc/qmq/consumer/pull/PullRegister.class */
/**
 * Registry of the pull-based consumers of this client.
 *
 * <p>Two kinds of consumers are tracked, both keyed by the subscribe key built with
 * {@link MapKeyBuilder#buildSubscribeKey}:
 * <ul>
 *   <li>{@link PullEntry} instances created by {@link #regist} (one for the subject and one for
 *       its retry subject), stored in {@code pullEntryMap};</li>
 *   <li>{@link DefaultPullConsumer} instances created by {@link #createDefaultPullConsumer},
 *       stored in {@code pullConsumerMap}. Their key is additionally reserved in
 *       {@code pullEntryMap} with {@link PullEntry#EMPTY_PULL_ENTRY} so that the same
 *       subject/group cannot be registered twice.</li>
 * </ul>
 *
 * <p>Every access to the two maps happens inside a {@code synchronized} method of this
 * instance, which is why plain {@link HashMap}s are used.
 */
public class PullRegister implements ConsumerRegister, ConsumerStateChangedListener {
    /** Last global state requested through {@link #online()} / {@link #offline()}; applied to newly registered entries. */
    private volatile boolean isOnline = false;
    /** Subscribe key -> pull entry (or {@link PullEntry#EMPTY_PULL_ENTRY} when the key belongs to a default pull consumer). */
    private final Map<String, PullEntry> pullEntryMap = new HashMap<String, PullEntry>();
    /** Subscribe key -> default pull consumer. */
    private final Map<String, DefaultPullConsumer> pullConsumerMap = new HashMap<String, DefaultPullConsumer>();
    /** Runs every pull entry and default pull consumer submitted by this register. */
    private final ExecutorService pullExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("qmq-pull"));
    private final MetaInfoService metaInfoService = new MetaInfoService();
    private final BrokerService brokerService = new BrokerServiceImpl(this.metaInfoService);
    private final PullService pullService = new PullService();
    private final AckService ackService = new AckService(this.brokerService);
    private String clientId;
    private String metaServer;
    private int destroyWaitInSeconds;

    /**
     * Pushes the configured meta server, client id and destroy-wait setting into the
     * collaborating services and initialises the meta info service.
     *
     * <p>Must be called after {@link #setClientId}, {@link #setMetaServer} and
     * {@link #setDestroyWaitInSeconds}, since it reads those fields.
     */
    public void init() {
        this.metaInfoService.setMetaServer(this.metaServer);
        this.metaInfoService.setClientId(this.clientId);
        this.metaInfoService.init();
        this.ackService.setDestroyWaitInSeconds(this.destroyWaitInSeconds);
        this.ackService.setClientId(this.clientId);
        // NOTE(review): the listener is installed after metaInfoService.init(); if init() can
        // already deliver state changes, early notifications would be missed - confirm.
        this.metaInfoService.setConsumerStateChangedListener(this);
    }

    /**
     * Registers a subscription (see {@link ConsumerRegister}): one pull entry for the subject
     * itself and one for the matching retry subject.
     *
     * @param str         the subject
     * @param str2        the consumer group
     * @param registParam registration parameters; also supplies the status source used for the
     *                    initial online/offline call
     * @throws DuplicateListenerException if either key is already taken by a default pull consumer
     */
    @Override
    public synchronized void regist(String str, String str2, RegistParam registParam) {
        registPullEntry(str, str2, registParam, new AlwaysPullStrategy());
        registPullEntry(RetrySubjectUtils.buildRetrySubject(str, str2), str2, registParam, new WeightPullStrategy());
    }

    /**
     * Looks up, or creates and starts, the pull entry for the given subject/group and aligns it
     * with the current global online flag. Callers must hold this instance's monitor.
     */
    private void registPullEntry(String subject, String group, RegistParam registParam, PullStrategy pullStrategy) {
        String subscribeKey = MapKeyBuilder.buildSubscribeKey(subject, group);
        PullEntry entry = this.pullEntryMap.get(subscribeKey);
        // The placeholder marks a key that is owned by a DefaultPullConsumer.
        if (entry == PullEntry.EMPTY_PULL_ENTRY) {
            throw new DuplicateListenerException(subscribeKey);
        }
        if (entry == null) {
            entry = createAndSubmitPullEntry(subject, group, registParam, pullStrategy);
        }
        // An already existing entry is re-used and only has its state re-applied.
        if (this.isOnline) {
            entry.online(registParam.getActionSrc());
        } else {
            entry.offline(registParam.getActionSrc());
        }
    }

    /** Creates a pull entry, stores it under its subscribe key and submits it to the pull executor. */
    private PullEntry createAndSubmitPullEntry(String subject, String group, RegistParam registParam, PullStrategy pullStrategy) {
        PushConsumerImpl pushConsumer = new PushConsumerImpl(subject, group, registParam);
        PullEntry entry = new PullEntry(pushConsumer, this.pullService, this.ackService, this.brokerService, pullStrategy);
        this.pullEntryMap.put(MapKeyBuilder.buildSubscribeKey(subject, group), entry);
        this.pullExecutor.submit(entry);
        return entry;
    }

    /**
     * Creates a {@link DefaultPullConsumer} wired to this register's services, registers it and
     * submits it to the pull executor.
     *
     * <p>The decompiler reported that this method was package-private in the original source.
     *
     * @param str  the subject
     * @param str2 the consumer group
     * @param z    flag forwarded unchanged to the {@link DefaultPullConsumer} constructor
     * @return the newly created consumer
     * @throws DuplicateListenerException if the subject/group key is already registered
     */
    public DefaultPullConsumer createDefaultPullConsumer(String str, String str2, boolean z) {
        DefaultPullConsumer consumer = new DefaultPullConsumer(str, str2, z, this.clientId, this.pullService, this.ackService, this.brokerService);
        registerDefaultPullConsumer(consumer);
        return consumer;
    }

    /**
     * Records the consumer under its subscribe key, reserves the same key in the pull entry map
     * with the placeholder entry, and starts the consumer on the pull executor.
     */
    private synchronized void registerDefaultPullConsumer(DefaultPullConsumer consumer) {
        String subscribeKey = MapKeyBuilder.buildSubscribeKey(consumer.subject(), consumer.group());
        if (this.pullEntryMap.containsKey(subscribeKey)) {
            throw new DuplicateListenerException(subscribeKey);
        }
        this.pullEntryMap.put(subscribeKey, PullEntry.EMPTY_PULL_ENTRY);
        this.pullConsumerMap.put(subscribeKey, consumer);
        // NOTE(review): unlike registPullEntry, the new consumer is not aligned with isOnline
        // here - confirm that DefaultPullConsumer handles its initial state itself.
        this.pullExecutor.submit(consumer);
    }

    /**
     * Takes the subscription offline with {@link StatusSource#CODE}. The entries stay in the
     * registry; nothing is removed.
     *
     * @param str  the subject (a retry subject is mapped back to its real subject)
     * @param str2 the consumer group
     */
    @Override
    public void unregist(String str, String str2) {
        changeOnOffline(str, str2, false, StatusSource.CODE);
    }

    /**
     * Brings the subscription online with {@link StatusSource#OPS}
     * (see {@link ConsumerStateChangedListener}).
     *
     * @param str  the subject
     * @param str2 the consumer group
     */
    @Override
    public void online(String str, String str2) {
        changeOnOffline(str, str2, true, StatusSource.OPS);
    }

    /**
     * Takes the subscription offline with {@link StatusSource#OPS}
     * (see {@link ConsumerStateChangedListener}).
     *
     * @param str  the subject
     * @param str2 the consumer group
     */
    @Override
    public void offline(String str, String str2) {
        changeOnOffline(str, str2, false, StatusSource.OPS);
    }

    /**
     * Applies the requested state to everything registered for the subject/group: the pull
     * entry of the real subject, the pull entry of the retry subject and, if present, the
     * default pull consumer. Keys that are not registered are silently skipped.
     */
    private synchronized void changeOnOffline(String subject, String group, boolean goOnline, StatusSource statusSource) {
        String realSubject = RetrySubjectUtils.getRealSubject(subject);
        String retrySubject = RetrySubjectUtils.buildRetrySubject(realSubject, group);
        String subscribeKey = MapKeyBuilder.buildSubscribeKey(realSubject, group);
        String retrySubscribeKey = MapKeyBuilder.buildSubscribeKey(retrySubject, group);

        changeOnOffline(this.pullEntryMap.get(subscribeKey), goOnline, statusSource);
        changeOnOffline(this.pullEntryMap.get(retrySubscribeKey), goOnline, statusSource);

        DefaultPullConsumer consumer = this.pullConsumerMap.get(subscribeKey);
        if (consumer == null) {
            return;
        }
        if (goOnline) {
            consumer.online(statusSource);
        } else {
            consumer.offline(statusSource);
        }
    }

    /** Switches a single pull entry online or offline; a {@code null} entry is ignored. */
    private void changeOnOffline(PullEntry entry, boolean goOnline, StatusSource statusSource) {
        if (entry == null) {
            return;
        }
        if (goOnline) {
            entry.online(statusSource);
        } else {
            entry.offline(statusSource);
        }
    }

    /**
     * Switches every registered consumer online or offline by delegating to {@link #online()}
     * or {@link #offline()}, which also record the new global state.
     *
     * @param z {@code true} to go online, {@code false} to go offline
     */
    @Override
    public synchronized void setAutoOnline(boolean z) {
        if (z) {
            online();
        } else {
            offline();
        }
    }

    /**
     * Marks the register offline, takes every pull entry and default pull consumer offline
     * with {@link StatusSource#HEALTHCHECKER}, then calls {@code ackService.tryCleanAck()}.
     *
     * @return always {@code true}
     */
    public synchronized boolean offline() {
        this.isOnline = false;
        for (PullEntry entry : this.pullEntryMap.values()) {
            entry.offline(StatusSource.HEALTHCHECKER);
        }
        for (DefaultPullConsumer consumer : this.pullConsumerMap.values()) {
            consumer.offline(StatusSource.HEALTHCHECKER);
        }
        this.ackService.tryCleanAck();
        return true;
    }

    /**
     * Marks the register online and brings every pull entry and default pull consumer online
     * with {@link StatusSource#HEALTHCHECKER}.
     *
     * @return always {@code true}
     */
    public synchronized boolean online() {
        this.isOnline = true;
        for (PullEntry entry : this.pullEntryMap.values()) {
            entry.online(StatusSource.HEALTHCHECKER);
        }
        for (DefaultPullConsumer consumer : this.pullConsumerMap.values()) {
            consumer.online(StatusSource.HEALTHCHECKER);
        }
        return true;
    }

    /**
     * Sets the client id handed to the collaborating services by {@link #init()} and to
     * default pull consumers created afterwards.
     *
     * @param str the client id
     */
    public void setClientId(String str) {
        this.clientId = str;
    }

    /**
     * Sets the meta server value handed to the meta info service by {@link #init()}.
     *
     * @param str the meta server value
     */
    public void setMetaServer(String str) {
        this.metaServer = str;
    }

    /**
     * Destroys every entry in the pull entry map and then the ack service.
     *
     * <p>NOTE(review): default pull consumers in {@code pullConsumerMap} and the pull executor
     * are not touched here - confirm whether they need an explicit shutdown.
     */
    @Override
    public synchronized void destroy() {
        for (PullEntry entry : this.pullEntryMap.values()) {
            entry.destroy();
        }
        this.ackService.destroy();
    }

    /**
     * Sets the destroy-wait value (in seconds, per the name) handed to the ack service by
     * {@link #init()}.
     *
     * @param i the wait value
     */
    public void setDestroyWaitInSeconds(int i) {
        this.destroyWaitInSeconds = i;
    }
}
