package io.zeebe.gateway.impl.job;

import com.google.rpc.Status;
import io.grpc.protobuf.StatusProto;
import io.zeebe.gateway.Loggers;
import io.zeebe.gateway.grpc.ServerStreamObserver;
import io.zeebe.gateway.impl.broker.BrokerClient;
import io.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.zeebe.gateway.metrics.LongPollingMetrics;
import io.zeebe.gateway.protocol.GatewayOuterClass;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.class */
public final class LongPollingActivateJobsHandler extends Actor implements ActivateJobsHandler {
    private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
    private static final Logger LOG = Loggers.GATEWAY_LOGGER;
    private static final String ERROR_MSG_ACTIVATED_EXHAUSTED = "Expected to activate jobs of type '%s', but no jobs available and at least one broker returned 'RESOURCE_EXHAUSTED'. Please try again later.";
    private final RoundRobinActivateJobsHandler activateJobsHandler;
    private final BrokerClient brokerClient;
    private final Duration longPollingTimeout;
    private final long probeTimeoutMillis;
    private final int failedAttemptThreshold;
    private final Map<String, InFlightLongPollingActivateJobsRequestsState> jobTypeState = new HashMap();
    private final LongPollingMetrics metrics = new LongPollingMetrics();

    /* loaded from: input_file:io/zeebe/gateway/impl/job/LongPollingActivateJobsHandler$Builder.class */
    public static class Builder {
        private static final long DEFAULT_LONG_POLLING_TIMEOUT = 10000;
        private static final long DEFAULT_PROBE_TIMEOUT = 10000;
        private static final int EMPTY_RESPONSE_THRESHOLD = 3;
        private BrokerClient brokerClient;
        private long longPollingTimeout = 10000;
        private long probeTimeoutMillis = 10000;
        private int minEmptyResponses = EMPTY_RESPONSE_THRESHOLD;

        public Builder setBrokerClient(BrokerClient brokerClient) {
            this.brokerClient = brokerClient;
            return this;
        }

        public Builder setLongPollingTimeout(long j) {
            this.longPollingTimeout = j;
            return this;
        }

        public Builder setProbeTimeoutMillis(long j) {
            this.probeTimeoutMillis = j;
            return this;
        }

        public Builder setMinEmptyResponses(int i) {
            this.minEmptyResponses = i;
            return this;
        }

        public LongPollingActivateJobsHandler build() {
            Objects.requireNonNull(this.brokerClient, "brokerClient");
            return new LongPollingActivateJobsHandler(this.brokerClient, this.longPollingTimeout, this.probeTimeoutMillis, this.minEmptyResponses);
        }
    }

    private LongPollingActivateJobsHandler(BrokerClient brokerClient, long j, long j2, int i) {
        this.brokerClient = brokerClient;
        this.activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient);
        this.longPollingTimeout = Duration.ofMillis(j);
        this.probeTimeoutMillis = j2;
        this.failedAttemptThreshold = i;
    }

    public String getName() {
        return "GatewayLongPollingJobHandler";
    }

    protected void onActorStarted() {
        this.brokerClient.subscribeJobAvailableNotification(JOBS_AVAILABLE_TOPIC, this::onNotification);
        this.actor.runAtFixedRate(Duration.ofMillis(this.probeTimeoutMillis), this::probe);
    }

    @Override // io.zeebe.gateway.impl.job.ActivateJobsHandler
    public void activateJobs(GatewayOuterClass.ActivateJobsRequest activateJobsRequest, ServerStreamObserver<GatewayOuterClass.ActivateJobsResponse> serverStreamObserver) {
        activateJobs(new LongPollingActivateJobsRequest(activateJobsRequest, serverStreamObserver));
    }

    public void activateJobs(LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        this.actor.run(() -> {
            InFlightLongPollingActivateJobsRequestsState jobTypeState = getJobTypeState(longPollingActivateJobsRequest.getType());
            if (jobTypeState.getFailedAttempts() < this.failedAttemptThreshold) {
                activateJobsUnchecked(jobTypeState, longPollingActivateJobsRequest);
            } else {
                completeOrEnqueueRequest(jobTypeState, longPollingActivateJobsRequest);
            }
        });
    }

    private InFlightLongPollingActivateJobsRequestsState getJobTypeState(String str) {
        return this.jobTypeState.computeIfAbsent(str, str2 -> {
            return new InFlightLongPollingActivateJobsRequestsState(str2, this.metrics);
        });
    }

    private void activateJobsUnchecked(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        if (topology != null) {
            inFlightLongPollingActivateJobsRequestsState.addActiveRequest(longPollingActivateJobsRequest);
            this.activateJobsHandler.activateJobs(topology.getPartitionsCount(), longPollingActivateJobsRequest.getRequest(), longPollingActivateJobsRequest.getMaxJobsToActivate(), longPollingActivateJobsRequest.getType(), activateJobsResponse -> {
                onResponse(longPollingActivateJobsRequest, activateJobsResponse);
            }, th -> {
                onError(longPollingActivateJobsRequest, th);
            }, (num, bool) -> {
                onCompleted(inFlightLongPollingActivateJobsRequestsState, longPollingActivateJobsRequest, num.intValue(), bool.booleanValue());
            });
        }
    }

    private void onNotification(String str) {
        LOG.trace("Received jobs available notification for type {}.", str);
        this.actor.run(() -> {
            resetFailedAttemptsAndHandlePendingRequests(str);
        });
    }

    private void onCompleted(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, LongPollingActivateJobsRequest longPollingActivateJobsRequest, int i, boolean z) {
        if (i != longPollingActivateJobsRequest.getMaxJobsToActivate()) {
            this.actor.submit(() -> {
                longPollingActivateJobsRequest.complete();
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(longPollingActivateJobsRequest);
                resetFailedAttemptsAndHandlePendingRequests(longPollingActivateJobsRequest.getType());
            });
        } else if (z) {
            this.actor.submit(() -> {
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(longPollingActivateJobsRequest);
                longPollingActivateJobsRequest.getResponseObserver().onError(StatusProto.toStatusException(Status.newBuilder().setCode(8).setMessage(String.format(ERROR_MSG_ACTIVATED_EXHAUSTED, longPollingActivateJobsRequest.getType())).build()));
            });
        } else {
            this.actor.submit(() -> {
                inFlightLongPollingActivateJobsRequestsState.incrementFailedAttempts(ActorClock.currentTimeMillis());
                boolean shouldBeRepeated = inFlightLongPollingActivateJobsRequestsState.shouldBeRepeated(longPollingActivateJobsRequest);
                inFlightLongPollingActivateJobsRequestsState.removeActiveRequest(longPollingActivateJobsRequest);
                if (shouldBeRepeated) {
                    activateJobs(longPollingActivateJobsRequest);
                } else {
                    completeOrEnqueueRequest(getJobTypeState(longPollingActivateJobsRequest.getType()), longPollingActivateJobsRequest);
                }
            });
        }
    }

    private void onResponse(LongPollingActivateJobsRequest longPollingActivateJobsRequest, GatewayOuterClass.ActivateJobsResponse activateJobsResponse) {
        this.actor.submit(() -> {
            longPollingActivateJobsRequest.onResponse(activateJobsResponse);
        });
    }

    private void onError(LongPollingActivateJobsRequest longPollingActivateJobsRequest, Throwable th) {
        this.actor.submit(() -> {
            longPollingActivateJobsRequest.onError(th);
        });
    }

    private void resetFailedAttemptsAndHandlePendingRequests(String str) {
        InFlightLongPollingActivateJobsRequestsState jobTypeState = getJobTypeState(str);
        jobTypeState.resetFailedAttempts();
        Queue<LongPollingActivateJobsRequest> pendingRequests = jobTypeState.getPendingRequests();
        if (!pendingRequests.isEmpty()) {
            pendingRequests.forEach(longPollingActivateJobsRequest -> {
                LOG.trace("Unblocking ActivateJobsRequest {}", longPollingActivateJobsRequest.getRequest());
                activateJobs(longPollingActivateJobsRequest);
            });
        } else {
            if (jobTypeState.hasActiveRequests()) {
                return;
            }
            this.jobTypeState.remove(str);
        }
    }

    private void completeOrEnqueueRequest(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        if (longPollingActivateJobsRequest.isLongPollingDisabled()) {
            longPollingActivateJobsRequest.complete();
            return;
        }
        if (longPollingActivateJobsRequest.isTimedOut()) {
            return;
        }
        LOG.trace("Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will be kept open until a new job of this type is created or until timeout of '{}'.", new Object[]{longPollingActivateJobsRequest.getWorker(), Integer.valueOf(longPollingActivateJobsRequest.getMaxJobsToActivate()), longPollingActivateJobsRequest.getType(), longPollingActivateJobsRequest.getLongPollingTimeout(this.longPollingTimeout)});
        inFlightLongPollingActivateJobsRequestsState.enqueueRequest(longPollingActivateJobsRequest);
        if (longPollingActivateJobsRequest.hasScheduledTimer()) {
            return;
        }
        addTimeOut(inFlightLongPollingActivateJobsRequestsState, longPollingActivateJobsRequest);
    }

    private void addTimeOut(InFlightLongPollingActivateJobsRequestsState inFlightLongPollingActivateJobsRequestsState, LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        ActorClock.currentTimeMillis();
        Duration longPollingTimeout = longPollingActivateJobsRequest.getLongPollingTimeout(this.longPollingTimeout);
        longPollingActivateJobsRequest.setScheduledTimer(this.actor.runDelayed(longPollingTimeout, () -> {
            LOG.debug("Remove blocking request {} for job type {} after timeout of {}", new Object[]{longPollingActivateJobsRequest.getRequest(), longPollingActivateJobsRequest.getType(), longPollingTimeout});
            inFlightLongPollingActivateJobsRequestsState.removeRequest(longPollingActivateJobsRequest);
            longPollingActivateJobsRequest.timeout();
        }));
    }

    private void probe() {
        long currentTimeMillis = ActorClock.currentTimeMillis();
        this.jobTypeState.forEach((str, inFlightLongPollingActivateJobsRequestsState) -> {
            if (inFlightLongPollingActivateJobsRequestsState.getLastUpdatedTime() < currentTimeMillis - this.probeTimeoutMillis) {
                LongPollingActivateJobsRequest nextPendingRequest = inFlightLongPollingActivateJobsRequestsState.getNextPendingRequest();
                if (nextPendingRequest != null) {
                    activateJobsUnchecked(inFlightLongPollingActivateJobsRequestsState, nextPendingRequest);
                } else if (inFlightLongPollingActivateJobsRequestsState.getFailedAttempts() >= this.failedAttemptThreshold) {
                    inFlightLongPollingActivateJobsRequestsState.setFailedAttempts(this.failedAttemptThreshold - 1);
                }
            }
        });
    }

    public static Builder newBuilder() {
        return new Builder();
    }
}
