package io.zeebe.gateway;

import io.grpc.Status;
import io.zeebe.gateway.ResponseMapper;
import io.zeebe.gateway.grpc.ServerStreamObserver;
import io.zeebe.gateway.impl.broker.BrokerClient;
import io.zeebe.gateway.impl.broker.BrokerResponseConsumer;
import io.zeebe.gateway.impl.broker.RequestRetryHandler;
import io.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.zeebe.gateway.impl.job.ActivateJobsHandler;
import io.zeebe.gateway.protocol.GatewayOuterClass;
import io.zeebe.util.VersionUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/* loaded from: input_file:io/zeebe/gateway/EndpointManager.class */
/**
 * Dispatches gateway gRPC calls to the brokers.
 *
 * <p>Most endpoints follow the same shape: the gRPC request is converted into a {@link
 * BrokerRequest} by a {@code RequestMapper} function, sent through either the {@link
 * BrokerClient} or the {@link RequestRetryHandler}, and the broker response is converted back by a
 * {@code ResponseMapper} function and pushed to the caller's {@link ServerStreamObserver}. The
 * {@link #topology} endpoint is answered locally from the {@link BrokerTopologyManager}.
 */
public final class EndpointManager {
    private final BrokerClient brokerClient;
    private final BrokerTopologyManager topologyManager;
    private final ActivateJobsHandler activateJobsHandler;
    private final RequestRetryHandler requestRetryHandler;

    /**
     * Creates the endpoint manager.
     *
     * @param brokerClient client used to send requests to the brokers; also supplies the topology
     *     manager
     * @param activateJobsHandler handler that {@link #activateJobs} delegates to
     */
    public EndpointManager(BrokerClient brokerClient, ActivateJobsHandler activateJobsHandler) {
        this.brokerClient = brokerClient;
        this.topologyManager = brokerClient.getTopologyManager();
        this.activateJobsHandler = activateJobsHandler;
        this.requestRetryHandler = new RequestRetryHandler(brokerClient, this.topologyManager);
    }

    /**
     * Fills node id, host, port and version of a broker into {@code builder}.
     *
     * <p>The broker address is expected to have the form {@code host:port}. It is split at the
     * <em>last</em> colon so that hosts which themselves contain colons (for example IPv6
     * literals) keep their full host part; the host is passed on unchanged.
     *
     * @param builder the broker info builder to populate
     * @param brokerId id of the broker to describe
     * @param topology cluster state the address and version are read from
     * @throws IllegalArgumentException if the address contains no {@code ':'} separator
     * @throws NumberFormatException if the part after the last colon is not an integer
     */
    private void addBrokerInfo(GatewayOuterClass.BrokerInfo.Builder builder, Integer brokerId, BrokerClusterState topology) {
        final int nodeId = brokerId.intValue();
        final String address = topology.getBrokerAddress(nodeId);
        final int separator = address.lastIndexOf(':');
        if (separator < 0) {
            throw new IllegalArgumentException(
                "Expected address of broker " + nodeId + " to have the form host:port, but got '" + address + "'");
        }
        final String host = address.substring(0, separator);
        final int port = Integer.parseInt(address.substring(separator + 1));
        builder.setNodeId(nodeId)
            .setHost(host)
            .setPort(port)
            .setVersion(topology.getBrokerVersion(nodeId));
    }

    /**
     * Adds one partition entry to {@code builder} for every partition in which the broker has a
     * role (see {@link #setRole}); partitions where the broker has no role are skipped.
     *
     * @param builder the broker info builder to append partitions to
     * @param brokerId id of the broker being described
     * @param topology cluster state the partition roles and health are read from
     */
    private void addPartitionInfoToBrokerInfo(GatewayOuterClass.BrokerInfo.Builder builder, Integer brokerId, BrokerClusterState topology) {
        topology.getPartitions().forEach(partitionId -> {
            final GatewayOuterClass.Partition.Builder partition =
                GatewayOuterClass.Partition.newBuilder().setPartitionId(partitionId.intValue());
            if (!setRole(brokerId, partitionId, topology, partition)) {
                // the broker does not take part in this partition
                return;
            }
            final boolean healthy = topology.isPartitionHealthy(brokerId.intValue(), partitionId.intValue());
            partition.setHealth(healthy
                ? GatewayOuterClass.Partition.PartitionBrokerHealth.HEALTHY
                : GatewayOuterClass.Partition.PartitionBrokerHealth.UNHEALTHY);
            builder.addPartitions(partition);
        });
    }

    /**
     * Sets the role the broker has in the given partition, checking leader, then follower, then
     * inactive.
     *
     * @param brokerId id of the broker
     * @param partitionId id of the partition
     * @param topology cluster state the roles are read from
     * @param builder partition builder that receives the role
     * @return {@code true} if a role was set, {@code false} if the broker is neither leader,
     *     follower nor inactive node of the partition (the builder is left untouched then)
     */
    private boolean setRole(Integer brokerId, Integer partitionId, BrokerClusterState topology, GatewayOuterClass.Partition.Builder builder) {
        final int partition = partitionId.intValue();
        final int leader = topology.getLeaderForPartition(partition);
        final List<Integer> followers = topology.getFollowersForPartition(partition);
        final List<Integer> inactiveNodes = topology.getInactiveNodesForPartition(partition);

        if (leader == brokerId.intValue()) {
            builder.setRole(GatewayOuterClass.Partition.PartitionBrokerRole.LEADER);
            return true;
        }
        if (followers != null && followers.contains(brokerId)) {
            builder.setRole(GatewayOuterClass.Partition.PartitionBrokerRole.FOLLOWER);
            return true;
        }
        if (inactiveNodes != null && inactiveNodes.contains(brokerId)) {
            builder.setRole(GatewayOuterClass.Partition.PartitionBrokerRole.INACTIVE);
            return true;
        }
        return false;
    }

    /** Delegates job activation to the {@link ActivateJobsHandler}. */
    public void activateJobs(GatewayOuterClass.ActivateJobsRequest activateJobsRequest, ServerStreamObserver<GatewayOuterClass.ActivateJobsResponse> serverStreamObserver) {
        this.activateJobsHandler.activateJobs(activateJobsRequest, serverStreamObserver);
    }

    /** Maps and sends a cancel-workflow-instance request via the broker client. */
    public void cancelWorkflowInstance(GatewayOuterClass.CancelWorkflowInstanceRequest cancelWorkflowInstanceRequest, ServerStreamObserver<GatewayOuterClass.CancelWorkflowInstanceResponse> serverStreamObserver) {
        sendRequest(cancelWorkflowInstanceRequest, RequestMapper::toCancelWorkflowInstanceRequest, ResponseMapper::toCancelWorkflowInstanceResponse, serverStreamObserver);
    }

    /** Maps and sends a complete-job request via the broker client. */
    public void completeJob(GatewayOuterClass.CompleteJobRequest completeJobRequest, ServerStreamObserver<GatewayOuterClass.CompleteJobResponse> serverStreamObserver) {
        sendRequest(completeJobRequest, RequestMapper::toCompleteJobRequest, ResponseMapper::toCompleteJobResponse, serverStreamObserver);
    }

    /** Maps and sends a create-workflow-instance request via the request retry handler. */
    public void createWorkflowInstance(GatewayOuterClass.CreateWorkflowInstanceRequest createWorkflowInstanceRequest, ServerStreamObserver<GatewayOuterClass.CreateWorkflowInstanceResponse> serverStreamObserver) {
        sendRequestWithRetryPartitions(createWorkflowInstanceRequest, RequestMapper::toCreateWorkflowInstanceRequest, ResponseMapper::toCreateWorkflowInstanceResponse, serverStreamObserver);
    }

    /**
     * Maps and sends a create-workflow-instance-with-result request via the request retry handler.
     *
     * <p>A positive request timeout on the gRPC request is forwarded (in milliseconds) to the retry
     * handler; otherwise the handler is called without an explicit timeout.
     */
    public void createWorkflowInstanceWithResult(GatewayOuterClass.CreateWorkflowInstanceWithResultRequest createWorkflowInstanceWithResultRequest, ServerStreamObserver<GatewayOuterClass.CreateWorkflowInstanceWithResultResponse> serverStreamObserver) {
        final long requestTimeout = createWorkflowInstanceWithResultRequest.getRequestTimeout();
        if (requestTimeout > 0) {
            sendRequestWithRetryPartitions(createWorkflowInstanceWithResultRequest, RequestMapper::toCreateWorkflowInstanceWithResultRequest, ResponseMapper::toCreateWorkflowInstanceWithResultResponse, serverStreamObserver, Duration.ofMillis(requestTimeout));
        } else {
            sendRequestWithRetryPartitions(createWorkflowInstanceWithResultRequest, RequestMapper::toCreateWorkflowInstanceWithResultRequest, ResponseMapper::toCreateWorkflowInstanceWithResultResponse, serverStreamObserver);
        }
    }

    /** Maps and sends a deploy-workflow request via the broker client. */
    public void deployWorkflow(GatewayOuterClass.DeployWorkflowRequest deployWorkflowRequest, ServerStreamObserver<GatewayOuterClass.DeployWorkflowResponse> serverStreamObserver) {
        sendRequest(deployWorkflowRequest, RequestMapper::toDeployWorkflowRequest, ResponseMapper::toDeployWorkflowResponse, serverStreamObserver);
    }

    /** Maps and sends a fail-job request via the broker client. */
    public void failJob(GatewayOuterClass.FailJobRequest failJobRequest, ServerStreamObserver<GatewayOuterClass.FailJobResponse> serverStreamObserver) {
        sendRequest(failJobRequest, RequestMapper::toFailJobRequest, ResponseMapper::toFailJobResponse, serverStreamObserver);
    }

    /** Maps and sends a throw-error request via the broker client. */
    public void throwError(GatewayOuterClass.ThrowErrorRequest throwErrorRequest, ServerStreamObserver<GatewayOuterClass.ThrowErrorResponse> serverStreamObserver) {
        sendRequest(throwErrorRequest, RequestMapper::toThrowErrorRequest, ResponseMapper::toThrowErrorResponse, serverStreamObserver);
    }

    /** Maps and sends a publish-message request via the broker client. */
    public void publishMessage(GatewayOuterClass.PublishMessageRequest publishMessageRequest, ServerStreamObserver<GatewayOuterClass.PublishMessageResponse> serverStreamObserver) {
        // Kept as an explicit lambda: the mapper's parameter types are declared elsewhere and may
        // need adapting to the functional interface.
        sendRequest(
            publishMessageRequest,
            RequestMapper::toPublishMessageRequest,
            (key, response) -> ResponseMapper.toPublishMessageResponse(key, response),
            serverStreamObserver);
    }

    /** Maps and sends a resolve-incident request via the broker client. */
    public void resolveIncident(GatewayOuterClass.ResolveIncidentRequest resolveIncidentRequest, ServerStreamObserver<GatewayOuterClass.ResolveIncidentResponse> serverStreamObserver) {
        sendRequest(resolveIncidentRequest, RequestMapper::toResolveIncidentRequest, ResponseMapper::toResolveIncidentResponse, serverStreamObserver);
    }

    /** Maps and sends a set-variables request via the broker client. */
    public void setVariables(GatewayOuterClass.SetVariablesRequest setVariablesRequest, ServerStreamObserver<GatewayOuterClass.SetVariablesResponse> serverStreamObserver) {
        sendRequest(setVariablesRequest, RequestMapper::toSetVariablesRequest, ResponseMapper::toSetVariablesResponse, serverStreamObserver);
    }

    /**
     * Answers a topology request from the currently known cluster state.
     *
     * <p>If no topology is known, the observer is failed with {@code UNAVAILABLE}. Otherwise a
     * single response is emitted containing cluster size, partition count, replication factor, the
     * gateway version (only when it is non-blank) and one entry per broker, and the stream is
     * completed.
     *
     * @param serverStreamObserver observer that receives the topology response or the error
     */
    public void topology(ServerStreamObserver<GatewayOuterClass.TopologyResponse> serverStreamObserver) {
        final BrokerClusterState topology = this.topologyManager.getTopology();
        if (topology == null) {
            serverStreamObserver.onError(Status.UNAVAILABLE.augmentDescription("No brokers available").asRuntimeException());
            return;
        }

        final GatewayOuterClass.TopologyResponse.Builder response = GatewayOuterClass.TopologyResponse.newBuilder();
        response.setClusterSize(topology.getClusterSize())
            .setPartitionsCount(topology.getPartitionsCount())
            .setReplicationFactor(topology.getReplicationFactor());

        final String gatewayVersion = VersionUtil.getVersion();
        if (gatewayVersion != null && !gatewayVersion.isBlank()) {
            response.setGatewayVersion(gatewayVersion);
        }

        final List<GatewayOuterClass.BrokerInfo> brokers = new ArrayList<>();
        topology.getBrokers().forEach(brokerId -> {
            final GatewayOuterClass.BrokerInfo.Builder brokerInfo = GatewayOuterClass.BrokerInfo.newBuilder();
            addBrokerInfo(brokerInfo, brokerId, topology);
            addPartitionInfoToBrokerInfo(brokerInfo, brokerId, topology);
            brokers.add(brokerInfo.build());
        });
        response.addAllBrokers(brokers);

        serverStreamObserver.onNext(response.build());
        serverStreamObserver.onCompleted();
    }

    /** Maps and sends an update-job-retries request via the broker client. */
    public void updateJobRetries(GatewayOuterClass.UpdateJobRetriesRequest updateJobRetriesRequest, ServerStreamObserver<GatewayOuterClass.UpdateJobRetriesResponse> serverStreamObserver) {
        sendRequest(updateJobRetriesRequest, RequestMapper::toUpdateJobRetriesRequest, ResponseMapper::toUpdateJobRetriesResponse, serverStreamObserver);
    }

    /**
     * Converts the gRPC request and sends it with {@link BrokerClient#sendRequestWithRetry}.
     *
     * <p>A successful broker response is mapped and delivered through {@link #consumeResponse};
     * failures reported by the client, as well as any exception thrown while mapping or sending,
     * are passed to {@code serverStreamObserver.onError}.
     *
     * @param grpcrequestt the incoming gRPC request
     * @param function converts the gRPC request into a broker request
     * @param brokerResponseMapper converts the broker response into the gRPC response
     * @param serverStreamObserver observer that receives the response or the error
     */
    private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(GrpcRequestT grpcrequestt, Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> function, ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> brokerResponseMapper, ServerStreamObserver<GrpcResponseT> serverStreamObserver) {
        try {
            final BrokerRequest<BrokerResponseT> brokerRequest = function.apply(grpcrequestt);
            final BrokerResponseConsumer<BrokerResponseT> responseConsumer =
                (key, response) -> consumeResponse(brokerResponseMapper, serverStreamObserver, key, response);
            this.brokerClient.sendRequestWithRetry(brokerRequest, responseConsumer, serverStreamObserver::onError);
        } catch (Exception e) {
            serverStreamObserver.onError(e);
        }
    }

    /**
     * Converts the gRPC request and sends it through the {@link RequestRetryHandler} without an
     * explicit timeout.
     *
     * <p>Response and error handling are the same as in {@link #sendRequest}. NOTE(review): the
     * name suggests the handler retries the request on other partitions — confirm against
     * {@code RequestRetryHandler}.
     *
     * @param grpcrequestt the incoming gRPC request
     * @param function converts the gRPC request into a broker request
     * @param brokerResponseMapper converts the broker response into the gRPC response
     * @param serverStreamObserver observer that receives the response or the error
     */
    private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequestWithRetryPartitions(GrpcRequestT grpcrequestt, Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> function, ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> brokerResponseMapper, ServerStreamObserver<GrpcResponseT> serverStreamObserver) {
        try {
            final BrokerRequest<BrokerResponseT> brokerRequest = function.apply(grpcrequestt);
            final BrokerResponseConsumer<BrokerResponseT> responseConsumer =
                (key, response) -> consumeResponse(brokerResponseMapper, serverStreamObserver, key, response);
            this.requestRetryHandler.sendRequest(brokerRequest, responseConsumer, serverStreamObserver::onError);
        } catch (Exception e) {
            serverStreamObserver.onError(e);
        }
    }

    /**
     * Same as the four-argument overload, but forwards {@code duration} to the {@link
     * RequestRetryHandler} as the request timeout.
     *
     * @param grpcrequestt the incoming gRPC request
     * @param function converts the gRPC request into a broker request
     * @param brokerResponseMapper converts the broker response into the gRPC response
     * @param serverStreamObserver observer that receives the response or the error
     * @param duration timeout handed to the retry handler
     */
    private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequestWithRetryPartitions(GrpcRequestT grpcrequestt, Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> function, ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> brokerResponseMapper, ServerStreamObserver<GrpcResponseT> serverStreamObserver, Duration duration) {
        try {
            final BrokerRequest<BrokerResponseT> brokerRequest = function.apply(grpcrequestt);
            final BrokerResponseConsumer<BrokerResponseT> responseConsumer =
                (key, response) -> consumeResponse(brokerResponseMapper, serverStreamObserver, key, response);
            this.requestRetryHandler.sendRequest(brokerRequest, responseConsumer, serverStreamObserver::onError, duration);
        } catch (Exception e) {
            serverStreamObserver.onError(e);
        }
    }

    /**
     * Maps a broker response to its gRPC counterpart, emits it and completes the stream.
     *
     * @param brokerResponseMapper converts the broker response into the gRPC response
     * @param serverStreamObserver observer that receives the mapped response
     * @param j key passed by the broker client alongside the response
     * @param brokerresponset the broker response to map
     */
    private <BrokerResponseT, GrpcResponseT> void consumeResponse(ResponseMapper.BrokerResponseMapper<BrokerResponseT, GrpcResponseT> brokerResponseMapper, ServerStreamObserver<GrpcResponseT> serverStreamObserver, long j, BrokerResponseT brokerresponset) {
        final GrpcResponseT grpcResponse = brokerResponseMapper.apply(j, brokerresponset);
        serverStreamObserver.onNext(grpcResponse);
        serverStreamObserver.onCompleted();
    }
}
