package io.openlineage.flink.visitor.lifecycle;

import io.micrometer.core.instrument.MeterRegistry;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.circuitBreaker.CircuitBreaker;
import io.openlineage.flink.SinkLineage;
import io.openlineage.flink.TransformationUtils;
import io.openlineage.flink.api.OpenLineageContext;
import io.openlineage.flink.client.CheckpointFacet;
import io.openlineage.flink.client.EventEmitter;
import io.openlineage.flink.client.FlinkOpenLineageConfig;
import io.openlineage.flink.shaded.org.apache.commons.lang3.exception.ExceptionUtils;
import io.openlineage.flink.visitor.Visitor;
import io.openlineage.flink.visitor.VisitorFactory;
import io.openlineage.flink.visitor.VisitorFactoryImpl;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.dag.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

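/**
 * {@link ExecutionContext} implementation that reports the lifecycle of a Flink job as OpenLineage
 * run events.
 *
 * <p>Each lifecycle callback builds a run event for the configured job name, namespace and run id
 * and hands it to the {@link EventEmitter} from inside the {@link CircuitBreaker}. A Micrometer
 * counter is incremented immediately before and immediately after every emit.
 *
 * <p>Decompiled from io/openlineage/flink/visitor/lifecycle/FlinkExecutionContext.class.
 */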
public class FlinkExecutionContext implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(FlinkExecutionContext.class);
    public static final String FLINK_INTEGRATION = "FLINK";
    public static final String FLINK_JOB_TYPE = "JOB";
    private final JobID jobId;
    protected final UUID runId;
    protected final EventEmitter eventEmitter;
    protected final OpenLineageContext openLineageContext;
    private final String jobName;
    private final String jobNamespace;
    private final String processingType;
    private final CircuitBreaker circuitBreaker;
    private final FlinkOpenLineageConfig config;
    private final MeterRegistry meterRegistry;
    private final List<Transformation<?>> transformations;

    /**
     * Fluent builder for {@link FlinkExecutionContext}.
     *
     * <p>Every setter stores its argument unchanged and returns this builder. No validation is
     * performed, so a value that was never set reaches the constructor as {@code null}.
     */
    public static class FlinkExecutionContextBuilder {
        private JobID jobId;
        private UUID runId;
        private EventEmitter eventEmitter;
        private OpenLineageContext openLineageContext;
        private String jobName;
        private String jobNamespace;
        private String processingType;
        private CircuitBreaker circuitBreaker;
        private FlinkOpenLineageConfig config;
        private MeterRegistry meterRegistry;
        private List<Transformation<?>> transformations;

        /** Sets the Flink job id. */
        public FlinkExecutionContextBuilder jobId(JobID jobId) {
            this.jobId = jobId;
            return this;
        }

        /** Sets the run id used for every emitted run event. */
        public FlinkExecutionContextBuilder runId(UUID runId) {
            this.runId = runId;
            return this;
        }

        /** Sets the emitter that run events are handed to. */
        public FlinkExecutionContextBuilder eventEmitter(EventEmitter eventEmitter) {
            this.eventEmitter = eventEmitter;
            return this;
        }

        /** Sets the OpenLineage context. */
        public FlinkExecutionContextBuilder openLineageContext(OpenLineageContext openLineageContext) {
            this.openLineageContext = openLineageContext;
            return this;
        }

        /** Sets the job name reported in events. */
        public FlinkExecutionContextBuilder jobName(String jobName) {
            this.jobName = jobName;
            return this;
        }

        /** Sets the job namespace reported in events. */
        public FlinkExecutionContextBuilder jobNamespace(String jobNamespace) {
            this.jobNamespace = jobNamespace;
            return this;
        }

        /** Sets the processing type reported in the job-type facet. */
        public FlinkExecutionContextBuilder processingType(String processingType) {
            this.processingType = processingType;
            return this;
        }

        /** Sets the circuit breaker that wraps event building and emission. */
        public FlinkExecutionContextBuilder circuitBreaker(CircuitBreaker circuitBreaker) {
            this.circuitBreaker = circuitBreaker;
            return this;
        }

        /** Sets the integration configuration (may be {@code null}). */
        public FlinkExecutionContextBuilder config(FlinkOpenLineageConfig config) {
            this.config = config;
            return this;
        }

        /** Sets the meter registry used for counters and timers. */
        public FlinkExecutionContextBuilder meterRegistry(MeterRegistry meterRegistry) {
            this.meterRegistry = meterRegistry;
            return this;
        }

        /** Sets the transformations that datasets are extracted from. */
        public FlinkExecutionContextBuilder transformations(List<Transformation<?>> transformations) {
            this.transformations = transformations;
            return this;
        }

        /** Creates the context from the values collected so far. */
        public FlinkExecutionContext build() {
            return new FlinkExecutionContext(
                    this.jobId,
                    this.runId,
                    this.eventEmitter,
                    this.openLineageContext,
                    this.jobName,
                    this.jobNamespace,
                    this.processingType,
                    this.circuitBreaker,
                    this.config,
                    this.meterRegistry,
                    this.transformations);
        }

        /** Returns a string listing every builder field as {@code name=value}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("FlinkExecutionContext.FlinkExecutionContextBuilder(");
            text.append("jobId=").append(this.jobId);
            text.append(", runId=").append(this.runId);
            text.append(", eventEmitter=").append(this.eventEmitter);
            text.append(", openLineageContext=").append(this.openLineageContext);
            text.append(", jobName=").append(this.jobName);
            text.append(", jobNamespace=").append(this.jobNamespace);
            text.append(", processingType=").append(this.processingType);
            text.append(", circuitBreaker=").append(this.circuitBreaker);
            text.append(", config=").append(this.config);
            text.append(", meterRegistry=").append(this.meterRegistry);
            text.append(", transformations=").append(this.transformations);
            return text.append(")").toString();
        }
    }

    /**
     * Emits a {@code START} run event for this job.
     *
     * <p>The event is built and emitted inside the circuit breaker; the
     * {@code openlineage.flink.event.submitted.start} / {@code .end} counters are incremented
     * right before and right after the emit call.
     */
    @Override
    public void onJobSubmitted() {
        log.debug("JobClient - jobId: {}", this.jobId);
        this.circuitBreaker.run(
                () -> {
                    OpenLineage.RunEventBuilder eventBuilder =
                            buildEventForEventType(OpenLineage.RunEvent.EventType.START);
                    OpenLineage.Run run = new OpenLineage.RunBuilder().runId(this.runId).build();
                    OpenLineage.RunEvent startEvent = eventBuilder.run(run).build();
                    log.debug("Posting event for onJobSubmitted {}: {}", this.jobId, startEvent);

                    this.meterRegistry.counter("openlineage.flink.event.submitted.start").increment();
                    this.eventEmitter.emit(startEvent);
                    this.meterRegistry.counter("openlineage.flink.event.submitted.end").increment();
                    return null;
                });
    }

    /**
     * Emits a {@code RUNNING} run event that carries the given checkpoint facet.
     *
     * <p>The facet is attached to the run under the {@code "checkpoints"} key. The
     * {@code openlineage.flink.event.checkpoint.start} / {@code .end} counters are incremented
     * right before and right after the emit call.
     *
     * @param checkpointFacet facet stored under the {@code "checkpoints"} run-facet key
     */
    @Override
    public void onJobCheckpoint(CheckpointFacet checkpointFacet) {
        log.debug("JobClient - jobId: {}", this.jobId);
        this.circuitBreaker.run(
                () -> {
                    OpenLineage.RunEventBuilder eventBuilder =
                            buildEventForEventType(OpenLineage.RunEvent.EventType.RUNNING);
                    OpenLineage.RunFacets runFacets =
                            new OpenLineage.RunFacetsBuilder().put("checkpoints", checkpointFacet).build();
                    OpenLineage.Run run =
                            new OpenLineage.RunBuilder().runId(this.runId).facets(runFacets).build();
                    OpenLineage.RunEvent runningEvent = eventBuilder.run(run).build();
                    log.debug("Posting event for onJobCheckpoint {}: {}", this.jobId, runningEvent);

                    this.meterRegistry.counter("openlineage.flink.event.checkpoint.start").increment();
                    this.eventEmitter.emit(runningEvent);
                    this.meterRegistry.counter("openlineage.flink.event.checkpoint.end").increment();
                    return null;
                });
    }

    /**
     * Builds a run-event builder of the given type, populated with the input and output datasets
     * extracted from this context's transformations.
     *
     * <p>Sources of all sink lineages are de-duplicated through a set before the input visitors
     * run, so a source shared by several sinks is visited once. Output datasets are collected per
     * sink, in sink-lineage order.
     *
     * @param eventType event type to set on the returned builder
     * @return builder with job, event time, inputs, outputs and event type set; the run is left
     *     for the caller to fill in
     */
    public OpenLineage.RunEventBuilder buildEventForEventType(OpenLineage.RunEvent.EventType eventType) {
        List<SinkLineage> sinkLineages = new TransformationUtils().convertToVisitable(this.transformations);
        VisitorFactory visitorFactory = new VisitorFactoryImpl();

        List<OpenLineage.OutputDataset> outputs = new ArrayList<>();
        // A hash-based set de-duplicates sources that feed more than one sink.
        Collection<Object> sources = new HashSet<>();
        for (SinkLineage lineage : sinkLineages) {
            sources.addAll(lineage.getSources());
            outputs.addAll(getOutputDatasets(visitorFactory, lineage.getSink()));
        }

        List<OpenLineage.InputDataset> inputs =
                new ArrayList<>(getInputDatasets(visitorFactory, new ArrayList<>(sources)));
        return commonEventBuilder().inputs(inputs).outputs(outputs).eventType(eventType);
    }

    /**
     * Emits a {@code COMPLETE} run event for this job.
     *
     * <p>The event carries no run facets and no datasets. The
     * {@code openlineage.flink.event.completed.start} / {@code .end} counters are incremented
     * before the event is built and after it is emitted.
     *
     * @param jobExecutionResult result of the finished job; not read by this method
     */
    @Override
    public void onJobCompleted(JobExecutionResult jobExecutionResult) {
        this.circuitBreaker.run(
                () -> {
                    OpenLineage openLineage = this.openLineageContext.getOpenLineage();
                    this.meterRegistry.counter("openlineage.flink.event.completed.start").increment();

                    OpenLineage.RunEventBuilder eventBuilder = commonEventBuilder();
                    OpenLineage.Run run = openLineage.newRun(this.runId, null);
                    OpenLineage.RunEvent completeEvent =
                            eventBuilder.run(run).eventType(OpenLineage.RunEvent.EventType.COMPLETE).build();
                    this.eventEmitter.emit(completeEvent);

                    this.meterRegistry.counter("openlineage.flink.event.completed.end").increment();
                    return null;
                });
    }

    /**
     * Emits a {@code FAIL} run event carrying an error-message facet built from the given throwable.
     *
     * <p>The facet holds the throwable's message, the language tag {@code "JAVA"} and the full
     * stack trace. When the throwable has no message, {@link Throwable#toString()} is used instead
     * so the facet never carries a {@code null} message. The
     * {@code openlineage.flink.event.failed.start} / {@code .end} counters are incremented before
     * the event is built and after it is emitted.
     *
     * @param th the failure that terminated the job
     */
    @Override
    public void onJobFailed(Throwable th) {
        this.circuitBreaker.run(
                () -> {
                    OpenLineage openLineage = this.openLineageContext.getOpenLineage();
                    this.meterRegistry.counter("openlineage.flink.event.failed.start").increment();

                    OpenLineage.RunEventBuilder eventBuilder = commonEventBuilder();
                    // Many throwables are created without a message; fall back to toString()
                    // (class name) rather than emitting a facet with a null message.
                    String message = th.getMessage() != null ? th.getMessage() : th.toString();
                    OpenLineage.RunFacets runFacets =
                            openLineage
                                    .newRunFacetsBuilder()
                                    .errorMessage(
                                            openLineage.newErrorMessageRunFacet(
                                                    message, "JAVA", ExceptionUtils.getStackTrace(th)))
                                    .build();
                    OpenLineage.RunEvent failEvent =
                            eventBuilder
                                    .run(openLineage.newRun(this.runId, runFacets))
                                    .eventType(OpenLineage.RunEvent.EventType.FAIL)
                                    // Replaces the timestamp set by commonEventBuilder() with one
                                    // taken after the error facet has been built.
                                    .eventTime(ZonedDateTime.now())
                                    .build();
                    this.eventEmitter.emit(failEvent);

                    this.meterRegistry.counter("openlineage.flink.event.failed.end").increment();
                    return null;
                });
    }

    /**
     * Creates a run-event builder pre-populated with the job description and the current time.
     *
     * <p>The job uses this context's namespace and name. Its facets are the ownership facet (when
     * configured, see {@link #buildOwnershipFacet}) and a job-type facet with job type
     * {@link #FLINK_JOB_TYPE}, integration {@link #FLINK_INTEGRATION} and this context's
     * processing type.
     *
     * @return builder with {@code job} and {@code eventTime} set
     */
    private OpenLineage.RunEventBuilder commonEventBuilder() {
        OpenLineage openLineage = this.openLineageContext.getOpenLineage();
        OpenLineage.RunEventBuilder eventBuilder = openLineage.newRunEventBuilder();

        OpenLineage.JobFacetsBuilder facetsBuilder = buildOwnershipFacet(new OpenLineage.JobFacetsBuilder());
        OpenLineage.JobTypeJobFacet jobTypeFacet =
                openLineage
                        .newJobTypeJobFacetBuilder()
                        .jobType(FLINK_JOB_TYPE)
                        .processingType(this.processingType)
                        .integration(FLINK_INTEGRATION)
                        .build();
        OpenLineage.JobFacets jobFacets = facetsBuilder.jobType(jobTypeFacet).build();
        OpenLineage.Job job = openLineage.newJob(this.jobNamespace, this.jobName, jobFacets);

        return eventBuilder.job(job).eventTime(ZonedDateTime.now());
    }

    /**
     * Adds an ownership facet to the given builder when owners are configured.
     *
     * <p>Owners are read from {@code config.getJob().getOwners().getAdditionalProperties()}. If
     * the config or any link of that chain is {@code null}, the builder is returned untouched.
     * Otherwise each map entry becomes one owner whose {@code type} is the entry key and whose
     * {@code name} is the entry value.
     *
     * @param jobFacetsBuilder builder to decorate
     * @return the same builder instance that was passed in
     */
    private OpenLineage.JobFacetsBuilder buildOwnershipFacet(OpenLineage.JobFacetsBuilder jobFacetsBuilder) {
        // Optional.map already yields an empty Optional for a null result, so no extra
        // null filter is needed after the last step.
        Optional.ofNullable(this.config)
                .map(cfg -> cfg.getJob())
                .map(job -> job.getOwners())
                .map(owners -> owners.getAdditionalProperties())
                .ifPresent(
                        ownerProperties ->
                                jobFacetsBuilder.ownership(
                                        this.openLineageContext
                                                .getOpenLineage()
                                                .newOwnershipJobFacetBuilder()
                                                .owners(
                                                        ownerProperties.entrySet().stream()
                                                                .map(
                                                                        entry ->
                                                                                this.openLineageContext
                                                                                        .getOpenLineage()
                                                                                        .newOwnershipJobFacetOwnersBuilder()
                                                                                        .name(entry.getValue())
                                                                                        .type(entry.getKey())
                                                                                        .build())
                                                                .collect(Collectors.toList()))
                                                .build()));
        return jobFacetsBuilder;
    }

    /**
     * Extracts input datasets from the given source objects.
     *
     * <p>For every source, each input visitor that reports {@code isDefinedAt(source)} is applied;
     * the call is timed under {@code openlineage.flink.dataset.input.extraction.time}, tagged with
     * the visitor's class name. Results are concatenated in source order, then visitor order.
     *
     * <p>The source is handed to the logger as an argument rather than pre-rendered with
     * {@code toString()}, so nothing is formatted when debug logging is off and a {@code null}
     * source does not throw at the log statement.
     *
     * @param visitorFactory factory supplying the input visitors
     * @param list source objects to inspect
     * @return a new mutable list with all extracted input datasets
     */
    private List<OpenLineage.InputDataset> getInputDatasets(VisitorFactory visitorFactory, List<Object> list) {
        List<OpenLineage.InputDataset> datasets = new ArrayList<>();
        List<Visitor<OpenLineage.InputDataset>> inputVisitors = visitorFactory.getInputVisitors(this.openLineageContext);
        for (Object source : list) {
            log.debug("Getting input dataset from source {}", source);
            datasets.addAll(
                    inputVisitors.stream()
                            .filter(visitor -> visitor.isDefinedAt(source))
                            .map(
                                    visitor ->
                                            this.meterRegistry
                                                    .timer(
                                                            "openlineage.flink.dataset.input.extraction.time",
                                                            "visitor",
                                                            visitor.getClass().getName())
                                                    .record(() -> visitor.apply(source)))
                            .flatMap(List::stream)
                            .collect(Collectors.toList()));
        }
        return datasets;
    }

    /**
     * Extracts output datasets from a single sink object.
     *
     * <p>Each output visitor that reports {@code isDefinedAt(obj)} is applied; the call is timed
     * under {@code openlineage.flink.dataset.output.extraction.time}, tagged with the visitor's
     * class name. Results are concatenated in visitor order.
     *
     * <p>The sink is handed to the logger as an argument rather than pre-rendered with
     * {@code toString()}, so nothing is formatted when debug logging is off and a {@code null}
     * sink does not throw at the log statement.
     *
     * @param visitorFactory factory supplying the output visitors
     * @param obj sink object to inspect
     * @return list with all extracted output datasets
     */
    private List<OpenLineage.OutputDataset> getOutputDatasets(VisitorFactory visitorFactory, Object obj) {
        List<Visitor<OpenLineage.OutputDataset>> outputVisitors = visitorFactory.getOutputVisitors(this.openLineageContext);
        log.debug("Getting output dataset from sink {}", obj);
        return outputVisitors.stream()
                .filter(visitor -> visitor.isDefinedAt(obj))
                .map(
                        visitor ->
                                this.meterRegistry
                                        .timer(
                                                "openlineage.flink.dataset.output.extraction.time",
                                                "visitor",
                                                visitor.getClass().getName())
                                        .record(() -> visitor.apply(obj)))
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    /**
     * Creates a context from already-resolved collaborators; arguments are stored as given,
     * without validation or copying. Normally invoked through {@link #builder()}.
     *
     * @param jobId Flink job id
     * @param runId run id used for every emitted run event
     * @param eventEmitter emitter that run events are handed to
     * @param openLineageContext OpenLineage context
     * @param jobName job name reported in events
     * @param jobNamespace job namespace reported in events
     * @param processingType processing type reported in the job-type facet
     * @param circuitBreaker circuit breaker wrapping event building and emission
     * @param config integration configuration; may be {@code null}
     * @param meterRegistry registry used for counters and timers
     * @param transformations transformations that datasets are extracted from
     */
    FlinkExecutionContext(
            JobID jobId,
            UUID runId,
            EventEmitter eventEmitter,
            OpenLineageContext openLineageContext,
            String jobName,
            String jobNamespace,
            String processingType,
            CircuitBreaker circuitBreaker,
            FlinkOpenLineageConfig config,
            MeterRegistry meterRegistry,
            List<Transformation<?>> transformations) {
        this.jobId = jobId;
        this.runId = runId;
        this.eventEmitter = eventEmitter;
        this.openLineageContext = openLineageContext;
        this.jobName = jobName;
        this.jobNamespace = jobNamespace;
        this.processingType = processingType;
        this.circuitBreaker = circuitBreaker;
        this.config = config;
        this.meterRegistry = meterRegistry;
        this.transformations = transformations;
    }

    /**
     * Starts building a {@link FlinkExecutionContext}.
     *
     * @return a new, empty builder
     */
    public static FlinkExecutionContextBuilder builder() {
        FlinkExecutionContextBuilder emptyBuilder = new FlinkExecutionContextBuilder();
        return emptyBuilder;
    }

    /**
     * Returns the Flink job id this context was created with.
     *
     * @return the job id as passed to the constructor
     */
    public JobID getJobId() {
        return jobId;
    }

    /**
     * Returns the meter registry used for this context's counters and timers.
     *
     * @return the registry as passed to the constructor
     */
    public MeterRegistry getMeterRegistry() {
        return meterRegistry;
    }

    /**
     * Returns the transformations that datasets are extracted from.
     *
     * @return the same list instance that was passed to the constructor (not a copy)
     */
    public List<Transformation<?>> getTransformations() {
        return transformations;
    }
}
