package io.openlineage.spark.agent;

import io.openlineage.client.Environment;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.package$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.Option;

/* loaded from: input_file:io/openlineage/spark/agent/OpenLineageSparkListener.class */
public class OpenLineageSparkListener extends SparkListener {
    private static ContextFactory contextFactory;
    private final Function1<SparkSession, SparkContext> sparkContextFromSession = ScalaConversionUtils.toScalaFn((v0) -> {
        return v0.sparkContext();
    });
    private final Function0<Option<SparkContext>> activeSparkContext;
    String sparkVersion;
    private static final Logger log = LoggerFactory.getLogger(OpenLineageSparkListener.class);
    private static final Map<Long, ExecutionContext> sparkSqlExecutionRegistry = Collections.synchronizedMap(new HashMap());
    private static final Map<Integer, ExecutionContext> rddExecutionRegistry = Collections.synchronizedMap(new HashMap());
    private static WeakHashMap<RDD<?>, Configuration> outputs = new WeakHashMap<>();
    private static JobMetricsHolder jobMetrics = JobMetricsHolder.getInstance();
    private static final boolean isDisabled = checkIfDisabled();

    public OpenLineageSparkListener() {
        SparkContext$ sparkContext$ = SparkContext$.MODULE$;
        sparkContext$.getClass();
        this.activeSparkContext = ScalaConversionUtils.toScalaFn(sparkContext$::getActive);
        this.sparkVersion = package$.MODULE$.SPARK_VERSION();
    }

    public static void init(ContextFactory contextFactory2) {
        contextFactory = contextFactory2;
        clear();
    }

    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        if (isDisabled) {
            return;
        }
        initializeContextFactoryIfNotInitialized();
        if (sparkListenerEvent instanceof SparkListenerSQLExecutionStart) {
            sparkSQLExecStart((SparkListenerSQLExecutionStart) sparkListenerEvent);
        } else if (sparkListenerEvent instanceof SparkListenerSQLExecutionEnd) {
            sparkSQLExecEnd((SparkListenerSQLExecutionEnd) sparkListenerEvent);
        }
    }

    private static void sparkSQLExecStart(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
        getSparkSQLExecutionContext(sparkListenerSQLExecutionStart.executionId()).ifPresent(executionContext -> {
            executionContext.start(sparkListenerSQLExecutionStart);
        });
    }

    private static void sparkSQLExecEnd(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
        ExecutionContext remove = sparkSqlExecutionRegistry.remove(Long.valueOf(sparkListenerSQLExecutionEnd.executionId()));
        if (remove != null) {
            remove.end(sparkListenerSQLExecutionEnd);
        } else {
            contextFactory.createSparkSQLExecutionContext(sparkListenerSQLExecutionEnd).ifPresent(executionContext -> {
                executionContext.end(sparkListenerSQLExecutionEnd);
            });
        }
    }

    public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        if (isDisabled) {
            return;
        }
        initializeContextFactoryIfNotInitialized();
        Optional flatMap = ScalaConversionUtils.asJavaOptional(SparkSession.getDefaultSession().map(this.sparkContextFromSession).orElse(this.activeSparkContext)).flatMap(sparkContext -> {
            return Optional.ofNullable(sparkContext.dagScheduler()).map(dAGScheduler -> {
                return dAGScheduler.jobIdToActiveJob().get(Integer.valueOf(sparkListenerJobStart.jobId()));
            });
        }).flatMap(ScalaConversionUtils::asJavaOptional);
        Stream stream = ScalaConversionUtils.fromSeq(sparkListenerJobStart.stageIds()).stream();
        Class<Integer> cls = Integer.class;
        Integer.class.getClass();
        Set<Integer> set = (Set) stream.map(cls::cast).collect(Collectors.toSet());
        if (this.sparkVersion.startsWith("3")) {
            jobMetrics.addJobStages(sparkListenerJobStart.jobId(), set);
        }
        ((Optional) ((Optional) Optional.ofNullable(getSqlExecutionId(sparkListenerJobStart.properties())).map((v0) -> {
            return Optional.of(v0);
        }).orElseGet(() -> {
            return ScalaConversionUtils.asJavaOptional(SparkSession.getDefaultSession().map(this.sparkContextFromSession).orElse(this.activeSparkContext)).flatMap(sparkContext2 -> {
                return Optional.ofNullable(sparkContext2.dagScheduler()).map(dAGScheduler -> {
                    return dAGScheduler.jobIdToActiveJob().get(Integer.valueOf(sparkListenerJobStart.jobId()));
                }).flatMap(ScalaConversionUtils::asJavaOptional);
            }).map(activeJob -> {
                return getSqlExecutionId(activeJob.properties());
            });
        })).map(Long::parseLong).map(l -> {
            return getExecutionContext(sparkListenerJobStart.jobId(), l.longValue());
        }).orElseGet(() -> {
            return getExecutionContext(sparkListenerJobStart.jobId());
        })).ifPresent(executionContext -> {
            executionContext.getClass();
            flatMap.ifPresent(executionContext::setActiveJob);
            executionContext.start(sparkListenerJobStart);
        });
    }

    private String getSqlExecutionId(Properties properties) {
        return properties.getProperty("spark.sql.execution.id");
    }

    public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        if (isDisabled) {
            return;
        }
        ExecutionContext remove = rddExecutionRegistry.remove(Integer.valueOf(sparkListenerJobEnd.jobId()));
        if (remove != null) {
            remove.end(sparkListenerJobEnd);
        }
        if (this.sparkVersion.startsWith("3")) {
            jobMetrics.cleanUp(sparkListenerJobEnd.jobId());
        }
    }

    public void onTaskEnd(SparkListenerTaskEnd sparkListenerTaskEnd) {
        if (isDisabled || this.sparkVersion.startsWith("2")) {
            return;
        }
        jobMetrics.addMetrics(sparkListenerTaskEnd.stageId(), sparkListenerTaskEnd.taskMetrics());
    }

    public static Optional<ExecutionContext> getSparkSQLExecutionContext(long j) {
        return Optional.ofNullable(sparkSqlExecutionRegistry.computeIfAbsent(Long.valueOf(j), l -> {
            return contextFactory.createSparkSQLExecutionContext(j).orElse(null);
        }));
    }

    public static Optional<ExecutionContext> getExecutionContext(int i) {
        return Optional.ofNullable(rddExecutionRegistry.computeIfAbsent(Integer.valueOf(i), num -> {
            return contextFactory.createRddExecutionContext(i);
        }));
    }

    public static Optional<ExecutionContext> getExecutionContext(int i, long j) {
        Optional<ExecutionContext> sparkSQLExecutionContext = getSparkSQLExecutionContext(j);
        sparkSQLExecutionContext.ifPresent(executionContext -> {
            rddExecutionRegistry.put(Integer.valueOf(i), executionContext);
        });
        return sparkSQLExecutionContext;
    }

    public static Configuration getConfigForRDD(RDD<?> rdd) {
        return outputs.get(rdd);
    }

    public static void emitError(Exception exc) {
        OpenLineage openLineage = new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI);
        try {
            contextFactory.openLineageEventEmitter.emit(buildErrorLineageEvent(openLineage, errorRunFacet(exc, openLineage)));
        } catch (Exception e) {
            log.error("Could not emit open lineage on error", exc);
        }
    }

    private static OpenLineage.RunFacets errorRunFacet(Exception exc, OpenLineage openLineage) {
        OpenLineage.RunFacet newRunFacet = openLineage.newRunFacet();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        exc.printStackTrace(new PrintWriter((OutputStream) byteArrayOutputStream, true));
        newRunFacet.getAdditionalProperties().put("exception", byteArrayOutputStream.toString());
        OpenLineage.RunFacetsBuilder newRunFacetsBuilder = openLineage.newRunFacetsBuilder();
        newRunFacetsBuilder.put("lineage.error", newRunFacet);
        return newRunFacetsBuilder.build();
    }

    public static OpenLineage.RunEvent buildErrorLineageEvent(OpenLineage openLineage, OpenLineage.RunFacets runFacets) {
        return openLineage.newRunEventBuilder().eventTime(ZonedDateTime.now()).run(openLineage.newRun(contextFactory.openLineageEventEmitter.getParentRunId().orElse(null), runFacets)).job(openLineage.newJobBuilder().namespace(contextFactory.openLineageEventEmitter.getJobNamespace()).name(contextFactory.openLineageEventEmitter.getParentJobName()).build()).build();
    }

    private static void clear() {
        sparkSqlExecutionRegistry.clear();
        rddExecutionRegistry.clear();
        outputs.clear();
    }

    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        close();
        super.onApplicationEnd(sparkListenerApplicationEnd);
    }

    public static void close() {
        clear();
    }

    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        initializeContextFactoryIfNotInitialized();
    }

    private void initializeContextFactoryIfNotInitialized() {
        if (contextFactory != null || isDisabled) {
            return;
        }
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv == null) {
            log.warn("Open lineage listener instantiated, but no configuration could be found. Lineage events will not be collected");
            return;
        }
        try {
            contextFactory = new ContextFactory(new EventEmitter(ArgumentParser.parse(sparkEnv.conf())));
        } catch (URISyntaxException e) {
            log.error("Unable to parse open lineage endpoint. Lineage events will not be collected", e);
        }
    }

    private static boolean checkIfDisabled() {
        return Boolean.parseBoolean(Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED"));
    }
}
