package io.openlineage.flink.tracker;

import io.openlineage.flink.client.CheckpointFacet;
import io.openlineage.flink.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
import io.openlineage.flink.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.openlineage.flink.shaded.org.apache.hc.client5.http.classic.methods.HttpGet;
import io.openlineage.flink.shaded.org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import io.openlineage.flink.shaded.org.apache.hc.client5.http.impl.classic.HttpClients;
import io.openlineage.flink.shaded.org.apache.hc.core5.http.ClassicHttpRequest;
import io.openlineage.flink.shaded.org.apache.hc.core5.http.ParseException;
import io.openlineage.flink.shaded.org.apache.hc.core5.http.io.entity.EntityUtils;
import io.openlineage.flink.tracker.restapi.Checkpoints;
import io.openlineage.flink.visitor.lifecycle.FlinkExecutionContext;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/flink/tracker/OpenLineageContinousJobTracker.class */
public class OpenLineageContinousJobTracker {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageContinousJobTracker.class);
    private final ReadableConfig config;
    private final Duration trackingInterval;
    private Thread trackingThread;
    private Optional<Checkpoints> latestCheckpoints = Optional.empty();
    private boolean shouldContinue = true;

    public OpenLineageContinousJobTracker(ReadableConfig readableConfig, Duration duration) {
        this.config = readableConfig;
        this.trackingInterval = duration;
    }

    public void startTracking(FlinkExecutionContext flinkExecutionContext) {
        CloseableHttpClient createDefault = HttpClients.createDefault();
        HttpGet httpGet = new HttpGet(String.format("http://%s:%s/jobs/%s/checkpoints", this.config.get(RestOptions.ADDRESS), this.config.get(RestOptions.PORT), flinkExecutionContext.getJobId().toString()));
        this.trackingThread = new Thread(() -> {
            try {
                Thread.sleep(this.trackingInterval.toMillis());
            } catch (InterruptedException e) {
                log.warn("Tracking thread interrupted", e);
            }
            while (this.shouldContinue) {
                try {
                    Optional.of((Checkpoints) new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).readValue(EntityUtils.toString(createDefault.execute((ClassicHttpRequest) httpGet).getEntity()), Checkpoints.class)).filter(checkpoints -> {
                        return this.latestCheckpoints.isEmpty() || this.latestCheckpoints.get().getCounts().getTotal() != checkpoints.getCounts().getTotal();
                    }).ifPresentOrElse(checkpoints2 -> {
                        emitNewCheckpointEvent(flinkExecutionContext, checkpoints2);
                    }, () -> {
                        log.info("no new checkpoint found");
                    });
                } catch (ParseException | IOException e2) {
                    log.error("Connecting REST API failed", e2);
                } catch (Exception e3) {
                    log.error("tracker thread failed due not unknown exception", e3);
                    this.shouldContinue = false;
                }
                try {
                    Thread.sleep(this.trackingInterval.toMillis());
                } catch (InterruptedException e4) {
                    log.warn("Tracking thread interrupted", e4);
                    this.shouldContinue = false;
                }
            }
        });
        log.info("Starting tracking thread for jobId={}", flinkExecutionContext.getJobId().toString());
        this.trackingThread.start();
    }

    private void emitNewCheckpointEvent(FlinkExecutionContext flinkExecutionContext, Checkpoints checkpoints) {
        log.info("New checkpoint encountered total-checkpoint:{}", Integer.valueOf(checkpoints.getCounts().getTotal()));
        this.latestCheckpoints = Optional.of(checkpoints);
        flinkExecutionContext.onJobCheckpoint(new CheckpointFacet(checkpoints.getCounts().getCompleted(), checkpoints.getCounts().getFailed(), checkpoints.getCounts().getIn_progress(), checkpoints.getCounts().getRestored(), checkpoints.getCounts().getTotal()));
    }

    public void stopTracking() {
        log.info("stop tracking");
        this.shouldContinue = false;
    }
}
