package io.zeebe.gateway.impl.broker;

import io.zeebe.gateway.cmd.BrokerErrorException;
import io.zeebe.gateway.cmd.BrokerRejectionException;
import io.zeebe.gateway.cmd.BrokerResponseException;
import io.zeebe.gateway.cmd.ClientOutOfMemoryException;
import io.zeebe.gateway.cmd.ClientResponseException;
import io.zeebe.gateway.cmd.IllegalBrokerResponseException;
import io.zeebe.gateway.cmd.NoTopologyAvailableException;
import io.zeebe.gateway.impl.ErrorResponseHandler;
import io.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManagerImpl;
import io.zeebe.gateway.impl.broker.request.BrokerPublishMessageRequest;
import io.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.zeebe.gateway.impl.broker.response.BrokerError;
import io.zeebe.gateway.impl.broker.response.BrokerRejection;
import io.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.zeebe.protocol.impl.SubscriptionUtil;
import io.zeebe.protocol.record.ErrorCode;
import io.zeebe.protocol.record.MessageHeaderDecoder;
import io.zeebe.transport.ClientTransport;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/gateway/impl/broker/BrokerRequestManager.class */
public final class BrokerRequestManager extends Actor {
    private final ClientTransport clientTransport;
    private final RequestDispatchStrategy dispatchStrategy;
    private final BrokerTopologyManagerImpl topologyManager;
    private final Duration requestTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zeebe/gateway/impl/broker/BrokerRequestManager$BrokerAddressProvider.class */
    public class BrokerAddressProvider implements Supplier<String> {
        private final Function<BrokerClusterState, Integer> nodeIdSelector;

        BrokerAddressProvider(BrokerRequestManager brokerRequestManager) {
            this((Function<BrokerClusterState, Integer>) (v0) -> {
                return v0.getRandomBroker();
            });
        }

        BrokerAddressProvider(BrokerRequestManager brokerRequestManager, int i) {
            this((Function<BrokerClusterState, Integer>) brokerClusterState -> {
                return Integer.valueOf(brokerClusterState.getLeaderForPartition(i));
            });
        }

        BrokerAddressProvider(Function<BrokerClusterState, Integer> function) {
            this.nodeIdSelector = function;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.function.Supplier
        public String get() {
            BrokerClusterState topology = BrokerRequestManager.this.topologyManager.getTopology();
            if (topology != null) {
                return topology.getBrokerAddress(this.nodeIdSelector.apply(topology).intValue());
            }
            return null;
        }
    }

    public BrokerRequestManager(ClientTransport clientTransport, BrokerTopologyManagerImpl brokerTopologyManagerImpl, RequestDispatchStrategy requestDispatchStrategy, Duration duration) {
        this.clientTransport = clientTransport;
        this.dispatchStrategy = requestDispatchStrategy;
        this.topologyManager = brokerTopologyManagerImpl;
        this.requestTimeout = duration;
    }

    private static boolean responseValidation(DirectBuffer directBuffer) {
        ErrorResponseHandler errorResponseHandler = new ErrorResponseHandler();
        MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
        messageHeaderDecoder.wrap(directBuffer, 0);
        if (!errorResponseHandler.handlesResponse(messageHeaderDecoder)) {
            return true;
        }
        errorResponseHandler.wrap(directBuffer, messageHeaderDecoder.encodedLength(), messageHeaderDecoder.blockLength(), messageHeaderDecoder.version());
        return errorResponseHandler.getErrorCode() != ErrorCode.PARTITION_LEADER_MISMATCH;
    }

    public <T> ActorFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> brokerRequest) {
        return sendRequest(brokerRequest, this.requestTimeout);
    }

    public <T> ActorFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> brokerRequest, Duration duration) {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        sendRequest(brokerRequest, (brokerResponse, th) -> {
            if (th == null) {
                completableActorFuture.complete(brokerResponse);
            } else {
                completableActorFuture.completeExceptionally(th);
            }
        }, duration);
        return completableActorFuture;
    }

    public <T> void sendRequest(BrokerRequest<T> brokerRequest, BrokerResponseConsumer<T> brokerResponseConsumer, Consumer<Throwable> consumer) {
        sendRequest(brokerRequest, brokerResponseConsumer, consumer, this.requestTimeout);
    }

    public <T> void sendRequest(BrokerRequest<T> brokerRequest, BrokerResponseConsumer<T> brokerResponseConsumer, Consumer<Throwable> consumer, Duration duration) {
        sendRequest(brokerRequest, brokerResponseConsumer, brokerRejection -> {
            consumer.accept(new BrokerRejectionException(brokerRejection));
        }, brokerError -> {
            consumer.accept(new BrokerErrorException(brokerError));
        }, consumer, duration);
    }

    private <T> void sendRequest(BrokerRequest<T> brokerRequest, BrokerResponseConsumer<T> brokerResponseConsumer, Consumer<BrokerRejection> consumer, Consumer<BrokerError> consumer2, Consumer<Throwable> consumer3, Duration duration) {
        sendRequest(brokerRequest, (brokerResponse, th) -> {
            try {
                if (th != null) {
                    consumer3.accept(th);
                } else if (brokerResponse.isResponse()) {
                    brokerResponseConsumer.accept(brokerResponse.getKey(), brokerResponse.getResponse());
                } else if (brokerResponse.isRejection()) {
                    consumer.accept(brokerResponse.getRejection());
                } else if (brokerResponse.isError()) {
                    consumer2.accept(brokerResponse.getError());
                } else {
                    consumer3.accept(new IllegalBrokerResponseException("Expected broker response to be either response, rejection, or error, but is neither of them"));
                }
            } catch (RuntimeException e) {
                consumer3.accept(new BrokerResponseException(e));
            }
        }, duration);
    }

    private <T> void sendRequest(BrokerRequest<T> brokerRequest, BiConsumer<BrokerResponse<T>, Throwable> biConsumer, Duration duration) {
        brokerRequest.serializeValue();
        this.actor.run(() -> {
            sendRequestInternal(brokerRequest, biConsumer, duration);
        });
    }

    private <T> void sendRequestInternal(BrokerRequest<T> brokerRequest, BiConsumer<BrokerResponse<T>, Throwable> biConsumer, Duration duration) {
        ActorFuture sendRequestWithRetry = this.clientTransport.sendRequestWithRetry(determineBrokerNodeIdProvider(brokerRequest), BrokerRequestManager::responseValidation, brokerRequest, duration);
        if (sendRequestWithRetry != null) {
            this.actor.runOnCompletion(sendRequestWithRetry, (directBuffer, th) -> {
                try {
                    if (th == null) {
                        biConsumer.accept(brokerRequest.getResponse(directBuffer), null);
                    } else {
                        biConsumer.accept(null, th);
                    }
                } catch (RuntimeException e) {
                    biConsumer.accept(null, new ClientResponseException(e));
                }
            });
        } else {
            biConsumer.accept(null, new ClientOutOfMemoryException());
        }
    }

    private BrokerAddressProvider determineBrokerNodeIdProvider(BrokerRequest<?> brokerRequest) {
        if (brokerRequest.addressesSpecificPartition()) {
            return new BrokerAddressProvider(this, brokerRequest.getPartitionId());
        }
        if (!brokerRequest.requiresPartitionId()) {
            return new BrokerAddressProvider(this);
        }
        if (brokerRequest instanceof BrokerPublishMessageRequest) {
            determinePartitionIdForPublishMessageRequest((BrokerPublishMessageRequest) brokerRequest);
        } else {
            int determinePartition = this.dispatchStrategy.determinePartition();
            if (determinePartition == -3) {
                determinePartition = 1;
            }
            brokerRequest.setPartitionId(determinePartition);
        }
        return new BrokerAddressProvider(this, brokerRequest.getPartitionId());
    }

    private void determinePartitionIdForPublishMessageRequest(BrokerPublishMessageRequest brokerPublishMessageRequest) {
        BrokerClusterState topology = this.topologyManager.getTopology();
        if (topology == null) {
            throw new NoTopologyAvailableException(String.format("Expected to pick partition for message with correlation key '%s', but no topology is available", brokerPublishMessageRequest.getCorrelationKey()));
        }
        brokerPublishMessageRequest.setPartitionId(SubscriptionUtil.getSubscriptionPartitionId(brokerPublishMessageRequest.getCorrelationKey(), topology.getPartitionsCount()));
    }
}
