package io.mantisrx.master.api.akka.route.handlers;

import akka.actor.ActorRef;
import akka.pattern.PatternsCS;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.master.api.akka.route.proto.JobClusterInfo;
import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto;
import io.mantisrx.master.api.akka.route.utils.JobDiscoveryHeartbeats;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.domain.JobId;
import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:io/mantisrx/master/api/akka/route/handlers/JobDiscoveryRouteHandlerAkkaImpl.class */
/**
 * {@link JobDiscoveryRouteHandler} implementation that answers job-discovery requests by
 * asking the job clusters manager actor and turning the reply into a streaming response.
 *
 * <p>Replies from the actor are cached per request object for a short time (see
 * {@link #RESPONSE_CACHE_TTL_SECS}) so that identical requests arriving close together share
 * one actor round trip. Each successful response carries an {@link Observable} that merges the
 * subject returned by the actor with a periodic heartbeat stream.
 *
 * <p>Only failures raised synchronously while assembling the response pipeline are converted
 * into {@code SERVER_ERROR} responses by the public methods. A failure of the underlying ask
 * itself (for example a timeout) is not intercepted here and reaches the caller as an
 * exceptionally completed stage.
 */
public class JobDiscoveryRouteHandlerAkkaImpl implements JobDiscoveryRouteHandler {
    private static final Logger logger = LoggerFactory.getLogger(JobDiscoveryRouteHandlerAkkaImpl.class);

    /** Seconds a cached actor reply stays valid after it was written. */
    private static final long RESPONSE_CACHE_TTL_SECS = 5L;
    /** Upper bound on the number of entries kept in each response cache. */
    private static final long RESPONSE_CACHE_MAX_ENTRIES = 500L;
    /** Seconds before the first heartbeat tick is emitted on a stream. */
    private static final long HEARTBEAT_INITIAL_DELAY_SECS = 5L;

    private final ActorRef jobClustersManagerActor;
    private final Duration serverIdleConnectionTimeout;
    private final Counter schedInfoStreamErrors;
    private final Counter lastSubmittedJobIdStreamErrors;
    private final Counter lastLaunchedJobIdStreamErrors;

    // The configured value is boxed from a primitive in the original code, so it can never be
    // null; the former Optional/orElse(1000L) fallback was unreachable and has been removed.
    private final Duration askTimeout =
        Duration.ofMillis(ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs());

    private final AsyncLoadingCache<JobClusterManagerProto.GetJobSchedInfoRequest, JobClusterManagerProto.GetJobSchedInfoResponse> schedInfoCache =
        Caffeine.newBuilder()
            .expireAfterWrite(RESPONSE_CACHE_TTL_SECS, TimeUnit.SECONDS)
            .maximumSize(RESPONSE_CACHE_MAX_ENTRIES)
            .buildAsync(this::jobSchedInfo);

    private final AsyncLoadingCache<JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest, JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse> lastSubmittedJobIdStreamRespCache =
        Caffeine.newBuilder()
            .expireAfterWrite(RESPONSE_CACHE_TTL_SECS, TimeUnit.SECONDS)
            .maximumSize(RESPONSE_CACHE_MAX_ENTRIES)
            .buildAsync(this::lastSubmittedJobId);

    private final AsyncLoadingCache<JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest, JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse> lastLaunchedJobIdStreamRespCache =
        Caffeine.newBuilder()
            .expireAfterWrite(RESPONSE_CACHE_TTL_SECS, TimeUnit.SECONDS)
            .maximumSize(RESPONSE_CACHE_MAX_ENTRIES)
            .buildAsync(this::lastLaunchedJobId);

    /**
     * Creates the handler and registers its error counters.
     *
     * @param actorRef actor that receives the discovery requests
     * @param duration server idle-connection timeout; the heartbeat period is derived from it
     *                 as {@code duration.getSeconds() - 1}
     */
    public JobDiscoveryRouteHandlerAkkaImpl(ActorRef actorRef, Duration duration) {
        this.jobClustersManagerActor = actorRef;
        this.serverIdleConnectionTimeout = duration;
        Metrics metrics = new Metrics.Builder()
            .id("JobDiscoveryRouteHandlerAkkaImpl", new Tag[0])
            .addCounter("schedInfoStreamErrors")
            .addCounter("lastSubmittedJobIdStreamErrors")
            .addCounter("lastLaunchedJobIdStreamErrors")
            .build();
        this.schedInfoStreamErrors = metrics.getCounter("schedInfoStreamErrors");
        this.lastSubmittedJobIdStreamErrors = metrics.getCounter("lastSubmittedJobIdStreamErrors");
        this.lastLaunchedJobIdStreamErrors = metrics.getCounter("lastLaunchedJobIdStreamErrors");
    }

    /**
     * Sends {@code request} to the job clusters manager actor using {@link #askTimeout} and
     * casts the reply to {@code responseType}.
     *
     * @param request      message to send to the actor
     * @param responseType expected class of the reply; a reply of another class fails the
     *                     returned future with a {@link ClassCastException}
     * @param <T>          reply type
     * @return future completed with the typed reply, or completed exceptionally if the ask fails
     */
    private <T> CompletableFuture<T> askJobClustersManager(Object request, Class<T> responseType) {
        return PatternsCS.ask(this.jobClustersManagerActor, request, this.askTimeout)
            .thenApply(responseType::cast)
            .toCompletableFuture();
    }

    /**
     * Produces the raw tick stream that drives heartbeats: first tick after
     * {@link #HEARTBEAT_INITIAL_DELAY_SECS}, then one tick every
     * {@code serverIdleConnectionTimeout.getSeconds() - 1} seconds.
     */
    private Observable<Long> heartbeatTicks() {
        return Observable.interval(
            HEARTBEAT_INITIAL_DELAY_SECS,
            this.serverIdleConnectionTimeout.getSeconds() - 1,
            TimeUnit.SECONDS);
    }

    /**
     * Cache loader for {@link #schedInfoCache}.
     *
     * @param getJobSchedInfoRequest cache key, forwarded to the actor as the message
     * @param executor               supplied by the cache; unused because the ask is already asynchronous
     */
    private CompletableFuture<JobClusterManagerProto.GetJobSchedInfoResponse> jobSchedInfo(JobClusterManagerProto.GetJobSchedInfoRequest getJobSchedInfoRequest, Executor executor) {
        return askJobClustersManager(getJobSchedInfoRequest, JobClusterManagerProto.GetJobSchedInfoResponse.class);
    }

    /**
     * Returns a stage yielding the scheduling-info stream for the requested job.
     *
     * <p>On a {@code SUCCESS} reply that carries a subject, the response observable merges:
     * <ul>
     *   <li>the subject itself, with every emitted value remembered as the latest value, and</li>
     *   <li>a periodic stream that re-emits the latest remembered value while the subject has
     *       not completed, and emits {@code JobDiscoveryHeartbeats.SCHED_INFO_HB_INSTANCE}
     *       once it has.</li>
     * </ul>
     * Until the subject emits, the latest value is an empty {@link JobSchedulingInfo} for the job.
     * Any other reply increments {@code schedInfoStreamErrors} and is returned without a stream.
     *
     * @param getJobSchedInfoRequest request identifying the job
     * @param sendHeartbeats         when {@code false} the periodic stream terminates at its first tick
     * @return stage completing with the response
     */
    @Override
    public CompletionStage<JobDiscoveryRouteProto.SchedInfoResponse> schedulingInfoStream(JobClusterManagerProto.GetJobSchedInfoRequest getJobSchedInfoRequest, boolean sendHeartbeats) {
        try {
            final String jobId = getJobSchedInfoRequest.getJobId().getId();
            final AtomicBoolean subjectCompleted = new AtomicBoolean(false);
            final AtomicReference<JobSchedulingInfo> latestSchedInfo =
                new AtomicReference<>(new JobSchedulingInfo(jobId, new HashMap<>()));

            return this.schedInfoCache.get(getJobSchedInfoRequest).thenApply(resp -> {
                Optional<BehaviorSubject<JobSchedulingInfo>> subjectO = resp.getJobSchedInfoSubject();
                boolean succeeded = resp.responseCode.equals(BaseResponse.ResponseCode.SUCCESS);
                if (!succeeded || !subjectO.isPresent()) {
                    logger.info("Failed to get Sched info stream for {}", jobId);
                    this.schedInfoStreamErrors.increment();
                    return new JobDiscoveryRouteProto.SchedInfoResponse(resp.requestId, resp.responseCode, resp.message);
                }

                Observable<JobSchedulingInfo> heartbeats = heartbeatTicks()
                    .map(tick -> subjectCompleted.get()
                        ? JobDiscoveryHeartbeats.SCHED_INFO_HB_INSTANCE
                        : latestSchedInfo.get())
                    .takeWhile(ignored -> sendHeartbeats);

                // Record completion first, then remember each value, in the same operator
                // order as before so the side effects observe events identically.
                Observable<JobSchedulingInfo> updates = subjectO.get()
                    .doOnCompleted(() -> subjectCompleted.set(true))
                    .doOnNext(latestSchedInfo::set);

                return new JobDiscoveryRouteProto.SchedInfoResponse(
                    resp.requestId, resp.responseCode, resp.message, Observable.merge(updates, heartbeats));
            });
        } catch (Exception e) {
            logger.error("caught exception fetching sched info stream for {}", getJobSchedInfoRequest.getJobId().getId(), e);
            this.schedInfoStreamErrors.increment();
            return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.SchedInfoResponse(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get SchedulingInfo stream for jobId " + getJobSchedInfoRequest.getJobId().getId() + " error: " + e.getMessage()));
        }
    }

    /**
     * Cache loader for {@link #lastSubmittedJobIdStreamRespCache}.
     *
     * @param getLastSubmittedJobIdStreamRequest cache key, forwarded to the actor as the message
     * @param executor                           supplied by the cache; unused because the ask is already asynchronous
     */
    private CompletableFuture<JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse> lastSubmittedJobId(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest getLastSubmittedJobIdStreamRequest, Executor executor) {
        return askJobClustersManager(getLastSubmittedJobIdStreamRequest, JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse.class);
    }

    /**
     * Returns a stage yielding the stream of last-submitted job ids for a cluster.
     *
     * @param getLastSubmittedJobIdStreamRequest request naming the cluster
     * @param sendHeartbeats                     when {@code false} the heartbeat stream terminates at its first tick
     * @return stage completing with the response; synchronous failures yield a {@code SERVER_ERROR} response
     */
    @Override
    public CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastSubmittedJobIdStream(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest getLastSubmittedJobIdStreamRequest, boolean sendHeartbeats) {
        try {
            return this.lastSubmittedJobIdStreamRespCache.get(getLastSubmittedJobIdStreamRequest)
                .thenApply(resp -> streamJobIdBehaviorSubject(
                    resp, resp.getjobIdBehaviorSubject(), sendHeartbeats, this.lastSubmittedJobIdStreamErrors));
        } catch (Exception e) {
            logger.error("caught exception fetching lastSubmittedJobId stream for {}", getLastSubmittedJobIdStreamRequest.getClusterName(), e);
            this.lastSubmittedJobIdStreamErrors.increment();
            return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.JobClusterInfoResponse(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get last submitted jobId stream for " + getLastSubmittedJobIdStreamRequest.getClusterName() + " error: " + e.getMessage()));
        }
    }

    /**
     * Cache loader for {@link #lastLaunchedJobIdStreamRespCache}.
     *
     * @param getLastLaunchedJobIdStreamRequest cache key, forwarded to the actor as the message
     * @param executor                          supplied by the cache; unused because the ask is already asynchronous
     */
    private CompletableFuture<JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse> lastLaunchedJobId(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest getLastLaunchedJobIdStreamRequest, Executor executor) {
        return askJobClustersManager(getLastLaunchedJobIdStreamRequest, JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse.class);
    }

    /**
     * Returns a stage yielding the stream of last-launched job ids for a cluster.
     *
     * @param getLastLaunchedJobIdStreamRequest request naming the cluster
     * @param sendHeartbeats                    when {@code false} the heartbeat stream terminates at its first tick
     * @return stage completing with the response; synchronous failures yield a {@code SERVER_ERROR} response
     */
    @Override
    public CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastLaunchedJobIdStream(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest getLastLaunchedJobIdStreamRequest, boolean sendHeartbeats) {
        try {
            return this.lastLaunchedJobIdStreamRespCache.get(getLastLaunchedJobIdStreamRequest)
                .thenApply(resp -> streamJobIdBehaviorSubject(
                    resp, resp.getjobIdBehaviorSubject(), sendHeartbeats, this.lastLaunchedJobIdStreamErrors));
        } catch (Exception e) {
            // Messages previously said "lastSubmittedJobId" here (copy-paste from the sibling
            // method), which mislabelled last-launched failures in logs and responses.
            logger.error("caught exception fetching lastLaunchedJobId stream for {}", getLastLaunchedJobIdStreamRequest.getClusterName(), e);
            this.lastLaunchedJobIdStreamErrors.increment();
            return CompletableFuture.completedFuture(new JobDiscoveryRouteProto.JobClusterInfoResponse(0L, BaseResponse.ResponseCode.SERVER_ERROR, "Failed to get last launched jobId stream for " + getLastLaunchedJobIdStreamRequest.getClusterName() + " error: " + e.getMessage()));
        }
    }

    /**
     * Converts an actor reply carrying an optional job-id subject into a route response.
     *
     * <p>On {@code SUCCESS} with a subject present, the response observable merges the subject
     * (each {@link JobId} mapped to a {@link JobClusterInfo} of its cluster and id) with a
     * periodic stream of {@code JobDiscoveryHeartbeats.JOB_CLUSTER_INFO_HB_INSTANCE}. Otherwise
     * {@code counter} is incremented and the reply's fields are returned without a stream.
     *
     * @param baseResponse   reply whose request id, response code and message are propagated
     * @param optional       subject of job ids, if the actor supplied one
     * @param sendHeartbeats when {@code false} the heartbeat stream terminates at its first tick
     * @param counter        error counter to bump when no stream can be produced
     * @return the route response
     */
    private JobDiscoveryRouteProto.JobClusterInfoResponse streamJobIdBehaviorSubject(BaseResponse baseResponse, Optional<BehaviorSubject<JobId>> optional, boolean sendHeartbeats, Counter counter) {
        boolean succeeded = baseResponse.responseCode.equals(BaseResponse.ResponseCode.SUCCESS);
        if (!succeeded || !optional.isPresent()) {
            counter.increment();
            return new JobDiscoveryRouteProto.JobClusterInfoResponse(baseResponse.requestId, baseResponse.responseCode, baseResponse.message);
        }

        Observable<JobClusterInfo> jobIds = optional.get()
            .map(jobId -> new JobClusterInfo(jobId.getCluster(), jobId.getId()));
        Observable<JobClusterInfo> heartbeats = heartbeatTicks()
            .map(tick -> JobDiscoveryHeartbeats.JOB_CLUSTER_INFO_HB_INSTANCE)
            .takeWhile(ignored -> sendHeartbeats);

        return new JobDiscoveryRouteProto.JobClusterInfoResponse(
            baseResponse.requestId, baseResponse.responseCode, baseResponse.message, Observable.merge(jobIds, heartbeats));
    }
}
