/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiClock;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.MessageWaiter;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.TemporalAmount;

class MessageDispatcher {
    private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());

    /** Initial delay and fixed delay of the periodic task that flushes queued acks, nacks and receipts. */
    @InternalApi
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100L);

    /** Executor on which the user's {@link MessageReceiver} is invoked. */
    private final Executor executor;
    /** Executor used to schedule the two periodic alarms created in {@link #start()}. */
    private final ScheduledExecutorService systemExecutor;
    /** Time source for expiration and latency computations. */
    private final ApiClock clock;
    /** Subtracted from the message deadline to obtain the period of the deadline-extension alarm. */
    private final Duration ackExpirationPadding;
    /** Added to the receive time of a batch; past that instant a message is no longer extended. */
    private final Duration maxAckExtensionPeriod;
    private final MessageReceiver receiver;
    /** Sink to which acks and modify-ack-deadline operations are handed. */
    private final AckProcessor ackProcessor;
    /** One element plus the message's serialized size is reserved per dispatched message. */
    private final FlowController flowController;
    /** Counts messages that were received but not yet acked or nacked; {@link #stop()} waits on it. */
    private final MessageWaiter messagesWaiter;
    /** Ack ID of every unanswered message, mapped to the instant after which it stops being extended. */
    private final ConcurrentMap<String, Instant> pendingMessages = new ConcurrentHashMap<String, Instant>();
    /** Ack IDs waiting to be sent as acks. */
    private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<String>();
    /** Ack IDs waiting to be sent as nacks (modify-ack-deadline with an extension of 0 seconds). */
    private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<String>();
    /** Ack IDs of freshly received messages whose deadline is to be set to the current message deadline. */
    private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<String>();
    /** Held while the two alarm fields below are assigned or cleared. */
    private final Lock alarmsLock;
    /** Deadline, in seconds, used for modify-ack-deadline operations; starts at 10. */
    private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(10);
    private ScheduledFuture<?> ackDeadlineExtensionAlarm;
    private ScheduledFuture<?> pendingAcksAlarm;
    /** Batches waiting for dispatch; every access in this class synchronizes on the deque itself. */
    private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
    /** Records, in whole seconds rounded up, the time between handler creation and an ack. */
    private final Distribution ackLatencyDistribution;

    /**
     * Creates a dispatcher; no background work is scheduled until {@link #start()} is called.
     *
     * @param receiver callback that is handed every dispatched message
     * @param ackProcessor sink for acks and modify-ack-deadline operations
     * @param ackExpirationPadding subtracted from the message deadline to get the extension alarm period
     * @param maxAckExtensionPeriod how long after receipt a message keeps being extended
     * @param ackLatencyDistribution receives the ack latency, in seconds, of every acked message
     * @param flowController consulted before each message is handed to the receiver
     * @param outstandingMessageBatches deque holding the batches that still await dispatch
     * @param executor runs the receiver callback
     * @param systemExecutor schedules the periodic alarms
     * @param clock time source
     */
    MessageDispatcher(MessageReceiver receiver, AckProcessor ackProcessor, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, FlowController flowController, Deque<OutstandingMessageBatch> outstandingMessageBatches, Executor executor, ScheduledExecutorService systemExecutor, ApiClock clock) {
        // Collaborators that process messages and their replies.
        this.receiver = receiver;
        this.ackProcessor = ackProcessor;
        this.flowController = flowController;
        this.outstandingMessageBatches = outstandingMessageBatches;
        this.ackLatencyDistribution = ackLatencyDistribution;

        // Timing configuration.
        this.ackExpirationPadding = ackExpirationPadding;
        this.maxAckExtensionPeriod = maxAckExtensionPeriod;
        this.clock = clock;

        // Execution resources.
        this.executor = executor;
        this.systemExecutor = systemExecutor;

        // Internally owned state.
        this.messagesWaiter = new MessageWaiter();
        this.alarmsLock = new ReentrantLock();
    }

    /**
     * Schedules the two periodic background tasks on the system executor: one that flushes queued
     * acks, nacks and receipts every {@link #PENDING_ACKS_SEND_DELAY}, and one that extends the
     * deadlines of pending messages every (message deadline - ack expiration padding).
     *
     * <p>Any {@link Throwable} raised by a task is logged at WARNING and not rethrown, so a failing
     * run does not cancel the periodic schedule.
     */
    public void start() {
        final Runnable flushAckOperations = new Runnable() {
            @Override
            public void run() {
                try {
                    processOutstandingAckOperations();
                } catch (Throwable t) {
                    logger.log(Level.WARNING, "failed to send acks/nacks", t);
                }
            }
        };
        final Runnable extendPendingDeadlines = new Runnable() {
            @Override
            public void run() {
                try {
                    extendDeadlines();
                } catch (Throwable t) {
                    logger.log(Level.WARNING, "failed to send extend deadlines", t);
                }
            }
        };

        alarmsLock.lock();
        try {
            long ackFlushDelayMillis = PENDING_ACKS_SEND_DELAY.toMillis();
            pendingAcksAlarm = systemExecutor.scheduleWithFixedDelay(
                    flushAckOperations, ackFlushDelayMillis, ackFlushDelayMillis, TimeUnit.MILLISECONDS);

            // The deadline is sampled once here; later changes do not reschedule the alarm.
            Duration messageDeadline = Duration.ofSeconds((long) messageDeadlineSeconds.get());
            long extensionDelayMillis = messageDeadline.minus(ackExpirationPadding).toMillis();
            ackDeadlineExtensionAlarm = systemExecutor.scheduleWithFixedDelay(
                    extendPendingDeadlines, extensionDelayMillis, extensionDelayMillis, TimeUnit.MILLISECONDS);
        } finally {
            alarmsLock.unlock();
        }
    }

    /**
     * Waits until no message is pending, cancels both periodic alarms (if they were scheduled) and
     * then flushes whatever acks, nacks and receipts are still queued.
     */
    public void stop() {
        messagesWaiter.waitNoMessages();

        alarmsLock.lock();
        try {
            ScheduledFuture<?> extensionAlarm = ackDeadlineExtensionAlarm;
            if (extensionAlarm != null) {
                // A running extension task may be interrupted.
                extensionAlarm.cancel(true);
                ackDeadlineExtensionAlarm = null;
            }
            ScheduledFuture<?> acksAlarm = pendingAcksAlarm;
            if (acksAlarm != null) {
                // A running flush is allowed to finish.
                acksAlarm.cancel(false);
                pendingAcksAlarm = null;
            }
        } finally {
            alarmsLock.unlock();
        }

        // Final flush, performed on the calling thread.
        processOutstandingAckOperations();
    }

    /**
     * Replaces the deadline used for subsequent receipt and extension operations.
     *
     * @param messageDeadlineSeconds the new deadline, in seconds
     */
    public void setMessageDeadlineSeconds(int messageDeadlineSeconds) {
        AtomicInteger deadlineHolder = this.messageDeadlineSeconds;
        deadlineHolder.set(messageDeadlineSeconds);
    }

    /**
     * Returns the deadline currently used for receipt and extension operations.
     *
     * @return the deadline, in seconds
     */
    public int getMessageDeadlineSeconds() {
        int currentDeadlineSeconds = messageDeadlineSeconds.get();
        return currentDeadlineSeconds;
    }

    /**
     * Registers a newly received batch of messages and attempts to dispatch it.
     *
     * <p>For an empty list the callback is run immediately and nothing else happens. Otherwise
     * every message is counted as pending, queued for a receipt, and recorded with the instant
     * (now + max ack extension period) after which its deadline is no longer extended. The
     * messages are then appended as one batch to the outstanding-batch deque and
     * {@link #processOutstandingBatches()} is invoked on the calling thread.
     *
     * @param messages the messages that were received
     * @param doneCallback run immediately for an empty list; otherwise run by
     *     {@link #processOutstandingBatches()} once the last message of this batch has been
     *     handed to the executor
     */
    public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
        if (messages.isEmpty()) {
            doneCallback.run();
            return;
        }
        this.messagesWaiter.incrementPendingMessages(messages.size());
        Instant totalExpiration = this.now().plus(this.maxAckExtensionPeriod);
        for (ReceivedMessage message : messages) {
            this.pendingReceipts.add(message.getAckId());
            this.pendingMessages.put(message.getAckId(), totalExpiration);
        }
        OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
        for (ReceivedMessage message : messages) {
            AckHandler ackHandler = new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
            outstandingBatch.addMessage(message, ackHandler);
        }
        // The deque is guarded by its own monitor everywhere in this class.
        synchronized (this.outstandingMessageBatches) {
            this.outstandingMessageBatches.add(outstandingBatch);
        }
        this.processOutstandingBatches();
    }

    /**
     * Dispatches queued messages to the receiver until the queue is drained or flow control
     * refuses another message.
     *
     * <p>Each iteration looks at the first message of the first outstanding batch, reserves one
     * element plus the message's serialized size with the flow controller, removes the message
     * from its batch and submits a task to the executor that invokes the receiver. The method
     * returns as soon as there is no batch, the first batch has no message, or the flow controller
     * reports that its element or byte limit has been reached. When the last message of a batch
     * has been submitted, the batch is removed from the deque and its done-callback is run.
     *
     * @throws IllegalStateException if the flow controller fails with any other flow-control
     *     exception
     */
    public void processOutstandingBatches() {
        while (true) {
            OutstandingMessageBatch.OutstandingMessage outstandingMessage;
            boolean batchDone = false;
            Runnable batchCallback = null;
            synchronized (this.outstandingMessageBatches) {
                OutstandingMessageBatch nextBatch = this.outstandingMessageBatches.peek();
                if (nextBatch == null) {
                    return;
                }
                outstandingMessage = nextBatch.messages.peek();
                if (outstandingMessage == null) {
                    return;
                }
                try {
                    this.flowController.reserve(1L, (long) outstandingMessage.receivedMessage().getMessage().getSerializedSize());
                }
                catch (FlowController.MaxOutstandingElementCountReachedException | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
                    // Limit reached: leave the message queued; a later call picks it up.
                    return;
                }
                catch (FlowController.FlowControlException unexpectedException) {
                    throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
                }
                // Only remove the message once the reservation has succeeded.
                nextBatch.messages.poll();
                batchDone = nextBatch.messages.isEmpty();
                if (batchDone) {
                    this.outstandingMessageBatches.poll();
                    batchCallback = nextBatch.doneCallback;
                }
            }
            final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
            AckHandler ackHandler = outstandingMessage.ackHandler();
            // Completed by the receiver through the consumer below; the ack handler reacts to it.
            final SettableFuture<AckReply> response = SettableFuture.create();
            final AckReplyConsumer consumer = new AckReplyConsumer() {

                @Override
                public void ack() {
                    response.set(AckReply.ACK);
                }

                @Override
                public void nack() {
                    response.set(AckReply.NACK);
                }
            };
            Futures.addCallback(response, ackHandler);
            this.executor.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        MessageDispatcher.this.receiver.receiveMessage(message, consumer);
                    }
                    catch (Exception e) {
                        // A receiver that throws fails the future, which makes the handler nack.
                        response.setException(e);
                    }
                }
            });
            // The callback runs outside the monitor of the batch deque.
            if (batchDone) {
                batchCallback.run();
            }
        }
    }

    /**
     * Sends one modify-ack-deadline operation, using the current message deadline, that covers
     * every pending message whose total expiration has not passed yet.
     *
     * <p>Entries whose expiration instant lies before the current time are removed from the
     * pending map and are therefore not extended again. The operation is handed to the ack
     * processor even when it contains no ack IDs; the list of acks passed along is always empty.
     */
    @InternalApi
    void extendDeadlines() {
        List<String> acksToSend = Collections.<String>emptyList();
        PendingModifyAckDeadline modack = new PendingModifyAckDeadline(this.getMessageDeadlineSeconds());
        Instant now = this.now();
        Iterator<Map.Entry<String, Instant>> it = this.pendingMessages.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> entry = it.next();
            if (entry.getValue().isBefore(now)) {
                // Past the maximum extension period: stop tracking this message.
                it.remove();
                continue;
            }
            modack.ackIds.add(entry.getKey());
        }
        this.ackProcessor.sendAckOperations(acksToSend, Collections.singletonList(modack));
    }

    /**
     * Drains the queued acks, nacks and receipts and hands them to the ack processor in one call.
     *
     * <p>Nacks become a modify-ack-deadline operation with an extension of 0 seconds, receipts
     * become one with the current message deadline. An operation whose ack ID list is empty is
     * left out, but the ack processor is called even when there is nothing to send.
     */
    @InternalApi
    void processOutstandingAckOperations() {
        List<String> acks = new ArrayList<String>();
        pendingAcks.drainTo(acks);
        logger.log(Level.FINER, "Sending {0} acks", acks.size());

        List<PendingModifyAckDeadline> deadlineChanges = new ArrayList<PendingModifyAckDeadline>();

        PendingModifyAckDeadline nacks = new PendingModifyAckDeadline(0);
        pendingNacks.drainTo(nacks.ackIds);
        logger.log(Level.FINER, "Sending {0} nacks", nacks.ackIds.size());
        boolean hasNacks = !nacks.ackIds.isEmpty();
        if (hasNacks) {
            deadlineChanges.add(nacks);
        }

        PendingModifyAckDeadline receipts = new PendingModifyAckDeadline(getMessageDeadlineSeconds());
        pendingReceipts.drainTo(receipts.ackIds);
        logger.log(Level.FINER, "Sending {0} receipts", receipts.ackIds.size());
        boolean hasReceipts = !receipts.ackIds.isEmpty();
        if (hasReceipts) {
            deadlineChanges.add(receipts);
        }

        ackProcessor.sendAckOperations(acks, deadlineChanges);
    }

    /** Returns the current time of the injected clock as an {@link Instant}. */
    private Instant now() {
        long epochMillis = clock.millisTime();
        return Instant.ofEpochMilli(epochMillis);
    }

    /**
     * A group of received messages that share one completion callback, kept in arrival order.
     */
    static class OutstandingMessageBatch {
        /** Messages of this batch that still await processing, oldest first. */
        private final Deque<OutstandingMessage> messages = new LinkedList<OutstandingMessage>();
        /** Callback associated with the batch as a whole. */
        private final Runnable doneCallback;

        /**
         * @param doneCallback callback to associate with this batch
         */
        public OutstandingMessageBatch(Runnable doneCallback) {
            this.doneCallback = doneCallback;
        }

        /**
         * Appends a message, paired with its ack handler, to the end of the batch.
         *
         * @param receivedMessage the message to enqueue
         * @param ackHandler the handler that reacts to the receiver's reply for that message
         */
        public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
            OutstandingMessage queued = new OutstandingMessage(receivedMessage, ackHandler);
            messages.add(queued);
        }

        /** Returns the live (not copied) deque of messages in this batch. */
        public Deque<OutstandingMessage> messages() {
            return messages;
        }

        /** Immutable pair of a received message and the handler for its reply. */
        static class OutstandingMessage {
            private final ReceivedMessage receivedMessage;
            private final AckHandler ackHandler;

            /**
             * @param receivedMessage the received message
             * @param ackHandler the handler for the reply to that message
             */
            public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
                this.ackHandler = ackHandler;
                this.receivedMessage = receivedMessage;
            }

            /** Returns the received message. */
            public ReceivedMessage receivedMessage() {
                return receivedMessage;
            }

            /** Returns the handler for the reply to the message. */
            public AckHandler ackHandler() {
                return ackHandler;
            }
        }
    }

    public static interface AckProcessor {
        public void sendAckOperations(List<String> var1, List<PendingModifyAckDeadline> var2);
    }

    private class AckHandler
    implements FutureCallback<AckReply> {
        private final String ackId;
        private final int outstandingBytes;
        private final long receivedTimeMillis;

        AckHandler(String ackId, int outstandingBytes) {
            this.ackId = ackId;
            this.outstandingBytes = outstandingBytes;
            this.receivedTimeMillis = MessageDispatcher.this.clock.millisTime();
        }

        private void onBoth(LinkedBlockingQueue<String> destination) {
            MessageDispatcher.this.pendingMessages.remove(this.ackId);
            destination.add(this.ackId);
            MessageDispatcher.this.flowController.release(1L, (long)this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
            MessageDispatcher.this.processOutstandingBatches();
        }

        public void onFailure(Throwable t) {
            logger.log(Level.WARNING, "MessageReceiver failed to processes ack ID: " + this.ackId + ", the message will be nacked.", t);
            this.onBoth(MessageDispatcher.this.pendingNacks);
        }

        public void onSuccess(AckReply reply) {
            LinkedBlockingQueue destination;
            switch (reply) {
                case ACK: {
                    destination = MessageDispatcher.this.pendingAcks;
                    MessageDispatcher.this.ackLatencyDistribution.record(Ints.saturatedCast((long)((long)Math.ceil((double)(MessageDispatcher.this.clock.millisTime() - this.receivedTimeMillis) / 1000.0))));
                    break;
                }
                case NACK: {
                    destination = MessageDispatcher.this.pendingNacks;
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("AckReply: %s not supported", new Object[]{reply}));
                }
            }
            this.onBoth(destination);
        }
    }

    public static enum AckReply {
        ACK,
        NACK;

    }

    /** A modify-ack-deadline operation: one extension value applied to a list of ack IDs. */
    static class PendingModifyAckDeadline {
        /** Ack IDs the extension applies to; mutable and appended to directly by the dispatcher. */
        final List<String> ackIds = new ArrayList<String>();
        /** Extension to apply, in seconds. */
        final int deadlineExtensionSeconds;

        /**
         * @param deadlineExtensionSeconds extension to apply, in seconds
         * @param ackIds initial ack IDs, added in the order given
         */
        PendingModifyAckDeadline(int deadlineExtensionSeconds, String ... ackIds) {
            this.deadlineExtensionSeconds = deadlineExtensionSeconds;
            for (int i = 0; i < ackIds.length; i++) {
                addAckId(ackIds[i]);
            }
        }

        /**
         * Appends one ack ID to this operation.
         *
         * @param ackId the ack ID to add
         */
        public void addAckId(String ackId) {
            ackIds.add(ackId);
        }

        /** Returns a description containing the extension and the ack IDs. */
        @Override
        public String toString() {
            String pattern = "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}";
            return String.format(pattern, deadlineExtensionSeconds, ackIds);
        }
    }
}

