/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub.v1;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiService;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.MessageDispatcher;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.StatusUtil;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.Subscription;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

final class PollingSubscriberConnection
extends AbstractApiService
implements MessageDispatcher.AckProcessor {
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds((long)60L);
    private static final int MAX_PER_REQUEST_CHANGES = 1000;
    private static final int DEFAULT_MAX_MESSAGES = 1000;
    private static final Duration INITIAL_BACKOFF = Duration.ofMillis((long)100L);
    private static final Duration MAX_BACKOFF = Duration.ofSeconds((long)10L);
    private static final Logger logger = Logger.getLogger(PollingSubscriberConnection.class.getName());
    private final Subscription subscription;
    private final ScheduledExecutorService pollingExecutor;
    private final SubscriberGrpc.SubscriberFutureStub stub;
    private final MessageDispatcher messageDispatcher;
    private final int maxDesiredPulledMessages;

    public PollingSubscriberConnection(Subscription subscription, MessageReceiver receiver, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, SubscriberGrpc.SubscriberFutureStub stub, FlowController flowController, @Nullable Long maxDesiredPulledMessages, Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, ApiClock clock) {
        this.subscription = subscription;
        this.pollingExecutor = systemExecutor;
        this.stub = stub;
        this.messageDispatcher = new MessageDispatcher(receiver, this, ackExpirationPadding, maxAckExtensionPeriod, ackLatencyDistribution, flowController, outstandingMessageBatches, executor, systemExecutor, clock);
        this.messageDispatcher.setMessageDeadlineSeconds(subscription.getAckDeadlineSeconds());
        this.maxDesiredPulledMessages = maxDesiredPulledMessages != null ? Ints.saturatedCast((long)maxDesiredPulledMessages) : 1000;
    }

    protected void doStart() {
        logger.config("Starting subscriber.");
        this.messageDispatcher.start();
        this.pullMessages(INITIAL_BACKOFF);
        this.notifyStarted();
    }

    protected void doStop() {
        this.messageDispatcher.stop();
        this.notifyStopped();
    }

    private ListenableFuture<PullResponse> pullMessages(final Duration backoff) {
        if (!this.isAlive()) {
            return Futures.immediateCancelledFuture();
        }
        ListenableFuture pullResult = this.stub.pull(PullRequest.newBuilder().setSubscription(this.subscription.getName()).setMaxMessages(this.maxDesiredPulledMessages).setReturnImmediately(false).build());
        Futures.addCallback((ListenableFuture)pullResult, (FutureCallback)new FutureCallback<PullResponse>(){

            public void onSuccess(PullResponse pullResponse) {
                if (pullResponse.getReceivedMessagesCount() == 0) {
                    PollingSubscriberConnection.this.pollingExecutor.schedule(new Runnable(){

                        @Override
                        public void run() {
                            Duration newBackoff = backoff.multipliedBy(2L);
                            if (newBackoff.compareTo(MAX_BACKOFF) > 0) {
                                newBackoff = MAX_BACKOFF;
                            }
                            PollingSubscriberConnection.this.pullMessages(newBackoff);
                        }
                    }, backoff.toMillis(), TimeUnit.MILLISECONDS);
                    return;
                }
                PollingSubscriberConnection.this.messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList(), new Runnable(){

                    @Override
                    public void run() {
                        PollingSubscriberConnection.this.pullMessages(INITIAL_BACKOFF);
                    }
                });
            }

            public void onFailure(Throwable cause) {
                if (!PollingSubscriberConnection.this.isAlive()) {
                    logger.log(Level.FINE, "pull failure after service no longer running", cause);
                    return;
                }
                if (StatusUtil.isRetryable(cause)) {
                    logger.log(Level.WARNING, "Failed to pull messages (recoverable): ", cause);
                    PollingSubscriberConnection.this.pollingExecutor.schedule(new Runnable(){

                        @Override
                        public void run() {
                            Duration newBackoff = backoff.multipliedBy(2L);
                            if (newBackoff.compareTo(MAX_BACKOFF) > 0) {
                                newBackoff = MAX_BACKOFF;
                            }
                            PollingSubscriberConnection.this.pullMessages(newBackoff);
                        }
                    }, backoff.toMillis(), TimeUnit.MILLISECONDS);
                } else {
                    PollingSubscriberConnection.this.messageDispatcher.stop();
                    PollingSubscriberConnection.this.notifyFailed(cause);
                }
            }
        }, (Executor)this.pollingExecutor);
        return pullResult;
    }

    private boolean isAlive() {
        ApiService.State state = this.state();
        return state == ApiService.State.RUNNING || state == ApiService.State.STARTING;
    }

    @Override
    public void sendAckOperations(List<String> acksToSend, List<MessageDispatcher.PendingModifyAckDeadline> ackDeadlineExtensions) {
        for (MessageDispatcher.PendingModifyAckDeadline modifyAckDeadline : ackDeadlineExtensions) {
            for (List ackIdChunk : Lists.partition(modifyAckDeadline.ackIds, (int)1000)) {
                ((SubscriberGrpc.SubscriberFutureStub)this.stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)).modifyAckDeadline(ModifyAckDeadlineRequest.newBuilder().setSubscription(this.subscription.getName()).addAllAckIds((Iterable)ackIdChunk).setAckDeadlineSeconds(modifyAckDeadline.deadlineExtensionSeconds).build());
            }
        }
        for (List ackChunk : Lists.partition(acksToSend, (int)1000)) {
            ((SubscriberGrpc.SubscriberFutureStub)this.stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)).acknowledge(AcknowledgeRequest.newBuilder().setSubscription(this.subscription.getName()).addAllAckIds((Iterable)ackChunk).build());
        }
    }
}

