package io.openlineage.spark.agent;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.client.OpenLineageClient;
import io.openlineage.spark.agent.lifecycle.ContextFactory;
import io.openlineage.spark.agent.lifecycle.ExecutionContext;
import io.openlineage.spark.agent.lifecycle.SparkSQLExecutionContext;
import io.openlineage.spark.agent.lifecycle.plan.ScalaConversionUtils;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.WeakHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:io/openlineage/spark/agent/OpenLineageSparkListener.class */
/**
 * Spark listener that forwards SQL-execution and job lifecycle events to OpenLineage execution
 * contexts.
 *
 * <p>All state is held in static fields: the {@link ContextFactory} used to create contexts, a
 * registry of SQL execution contexts keyed by execution id, a registry of contexts keyed by Spark
 * job id, and a weak map from output RDDs to the Hadoop {@link Configuration} they were written
 * with. The factory is installed either explicitly through {@link #init(ContextFactory)} or on
 * {@link #onApplicationStart(SparkListenerApplicationStart)} from the Spark configuration.
 *
 * <p>If no factory is installed, start/end events are ignored instead of failing.
 */
public class OpenLineageSparkListener extends SparkListener {
    /** Spark conf key holding a full OpenLineage URL; when set it takes precedence over the other keys. */
    public static final String SPARK_CONF_URL_KEY = "openlineage.url";
    public static final String SPARK_CONF_HOST_KEY = "openlineage.host";
    public static final String SPARK_CONF_API_VERSION_KEY = "openlineage.version";
    public static final String SPARK_CONF_NAMESPACE_KEY = "openlineage.namespace";
    public static final String SPARK_CONF_JOB_NAME_KEY = "openlineage.parentJobName";
    public static final String SPARK_CONF_PARENT_RUN_ID_KEY = "openlineage.parentRunId";
    public static final String SPARK_CONF_API_KEY = "openlineage.apiKey";
    private static ContextFactory contextFactory;
    private static final Logger log = LoggerFactory.getLogger(OpenLineageSparkListener.class);
    /** SQL execution contexts keyed by Spark SQL execution id. */
    private static final Map<Long, SparkSQLExecutionContext> sparkSqlExecutionRegistry =
            Collections.synchronizedMap(new HashMap<>());
    /** Execution contexts (RDD or SQL) keyed by Spark job id. */
    private static final Map<Integer, ExecutionContext> rddExecutionRegistry =
            Collections.synchronizedMap(new HashMap<>());
    /**
     * Output RDDs and the configuration they were registered with. Weak keys let RDDs be collected;
     * the synchronized wrapper matches the other registries because WeakHashMap itself is not
     * thread-safe.
     */
    private static final Map<RDD<?>, Configuration> outputs =
            Collections.synchronizedMap(new WeakHashMap<>());

    /**
     * Installs the factory used to create execution contexts and drops all registered state.
     *
     * @param contextFactory2 factory to use from now on
     */
    public static void init(ContextFactory contextFactory2) {
        contextFactory = contextFactory2;
        clear();
    }

    /**
     * Creates a new listener instance and adds it to the given Spark context.
     *
     * @param sparkContext context to attach the listener to
     */
    public static void instrument(SparkContext sparkContext) {
        log.info("Initializing OpenLineage SparkContext listener...");
        OpenLineageSparkListener listener = new OpenLineageSparkListener();
        log.debug(
                "Initialized OpenLineage listener with \nspark version: {}\njava.version: {}\nconfiguration: {}",
                sparkContext.version(),
                System.getProperty("java.version"),
                sparkContext.conf());
        sparkContext.addSparkListener(listener);
    }

    /**
     * Records the configuration associated with the RDD wrapped by a {@link PairRDDFunctions}.
     *
     * <p>The wrapped RDD is found reflectively: every declared field whose name ends with
     * {@code "self"} and whose type is assignable to {@link RDD} is read and registered. Any failure
     * is logged and reported through {@link #emitError(Exception)}; nothing is thrown to the caller.
     *
     * @param pairRDDFunctions wrapper whose underlying RDD should be registered
     * @param configuration configuration to associate with that RDD
     */
    public static void registerOutput(PairRDDFunctions<?, ?> pairRDDFunctions, Configuration configuration) {
        try {
            log.info("Initializing OpenLineage PairRDDFunctions listener...");
            for (Field field : pairRDDFunctions.getClass().getDeclaredFields()) {
                boolean holdsWrappedRdd =
                        field.getName().endsWith("self") && RDD.class.isAssignableFrom(field.getType());
                if (!holdsWrappedRdd) {
                    continue;
                }
                field.setAccessible(true);
                try {
                    outputs.put((RDD<?>) field.get(pairRDDFunctions), configuration);
                } catch (IllegalAccessException | IllegalArgumentException fieldError) {
                    // Best effort: keep scanning the remaining fields, but report through the logger
                    // rather than dumping a stack trace on stdout.
                    log.warn("Could not read field {} of PairRDDFunctions", field.getName(), fieldError);
                }
            }
        } catch (Exception e2) {
            log.error("Could not initialize OpenLineage PairRDDFunctions listener", e2);
            emitError(e2);
        }
    }

    /**
     * Dispatches SQL execution start/end events; every other event type is ignored.
     *
     * @param sparkListenerEvent event delivered by Spark
     */
    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        if (sparkListenerEvent instanceof SparkListenerSQLExecutionStart) {
            sparkSQLExecStart((SparkListenerSQLExecutionStart) sparkListenerEvent);
        } else if (sparkListenerEvent instanceof SparkListenerSQLExecutionEnd) {
            sparkSQLExecEnd((SparkListenerSQLExecutionEnd) sparkListenerEvent);
        }
    }

    /** Starts (creating if necessary) the SQL execution context for the event's execution id. */
    private static void sparkSQLExecStart(SparkListenerSQLExecutionStart startEvent) {
        if (contextFactory == null) {
            // No factory means lineage collection is disabled; creating a context would fail.
            log.debug("No OpenLineage context factory available; ignoring SQL execution start");
            return;
        }
        getSparkSQLExecutionContext(startEvent.executionId()).start(startEvent);
    }

    /** Removes the SQL execution context for the event's execution id and ends it, if one exists. */
    private static void sparkSQLExecEnd(SparkListenerSQLExecutionEnd endEvent) {
        SparkSQLExecutionContext context = sparkSqlExecutionRegistry.remove(endEvent.executionId());
        if (context != null) {
            context.end(endEvent);
        }
    }

    /**
     * Starts the execution context for a Spark job.
     *
     * <p>The Spark context is taken from the active {@link SparkSession}, falling back to the active
     * {@link SparkContext}. The active job is then looked up by id in the DAG scheduler; if it is not
     * found nothing happens. A job carrying the {@code spark.sql.execution.id} property is attached
     * to the matching SQL execution context, otherwise an RDD execution context is used.
     *
     * @param sparkListenerJobStart job start event delivered by Spark
     */
    public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        if (contextFactory == null) {
            // No factory means lineage collection is disabled; creating a context would fail.
            log.debug("No OpenLineage context factory available; ignoring job start");
            return;
        }
        ScalaConversionUtils.asJavaOptional(
                        SparkSession.getActiveSession()
                                .map(ScalaConversionUtils.toScalaFn(sparkSession -> sparkSession.sparkContext()))
                                .orElse(ScalaConversionUtils.toScalaFn(() -> SparkContext$.MODULE$.getActive())))
                .flatMap(sparkContext ->
                        ScalaConversionUtils.asJavaOptional(
                                sparkContext
                                        .dagScheduler()
                                        .jobIdToActiveJob()
                                        .get(Integer.valueOf(sparkListenerJobStart.jobId()))))
                .ifPresent(activeJob -> {
                    String sqlExecutionId = activeJob.properties().getProperty("spark.sql.execution.id");
                    ExecutionContext context =
                            sqlExecutionId != null
                                    ? getExecutionContext(activeJob.jobId(), Long.parseLong(sqlExecutionId))
                                    : getExecutionContext(activeJob.jobId());
                    context.setActiveJob(activeJob);
                    context.start(sparkListenerJobStart);
                });
    }

    /**
     * Removes the execution context registered for the finished job and ends it.
     *
     * <p>A job may have no registered context (for example when its start event found no active
     * job), in which case the event is ignored.
     *
     * @param sparkListenerJobEnd job end event delivered by Spark
     */
    public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        ExecutionContext context = rddExecutionRegistry.remove(sparkListenerJobEnd.jobId());
        if (context != null) {
            context.end(sparkListenerJobEnd);
        }
    }

    /**
     * Returns the SQL execution context for an execution id, creating and registering it on first use.
     *
     * @param j Spark SQL execution id
     * @return the registered context for that id
     */
    public static SparkSQLExecutionContext getSparkSQLExecutionContext(long j) {
        return sparkSqlExecutionRegistry.computeIfAbsent(
                j, executionId -> contextFactory.createSparkSQLExecutionContext(j));
    }

    /**
     * Returns the execution context for a job id, creating and registering an RDD context on first use.
     *
     * @param i Spark job id
     * @return the registered context for that job
     */
    public static ExecutionContext getExecutionContext(int i) {
        return rddExecutionRegistry.computeIfAbsent(
                i, jobId -> contextFactory.createRddExecutionContext(i));
    }

    /**
     * Associates a job with a SQL execution: the SQL execution context is looked up (or created) and
     * also registered under the job id, replacing any context previously stored for that job.
     *
     * @param i Spark job id
     * @param j Spark SQL execution id
     * @return the SQL execution context now registered for the job
     */
    public static ExecutionContext getExecutionContext(int i, long j) {
        SparkSQLExecutionContext sqlContext = getSparkSQLExecutionContext(j);
        rddExecutionRegistry.put(i, sqlContext);
        return sqlContext;
    }

    /**
     * Returns the configuration registered for an RDD through {@link #registerOutput}.
     *
     * @param rdd RDD to look up
     * @return the registered configuration, or {@code null} if none is registered
     */
    public static Configuration getConfigForRDD(RDD<?> rdd) {
        return outputs.get(rdd);
    }

    /**
     * Emits a lineage event describing the given error. Failures while building or emitting the
     * event (including a missing context factory) are logged and never propagated.
     *
     * @param exc error to report
     */
    public static void emitError(Exception exc) {
        OpenLineage openLineage = new OpenLineage(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI);
        try {
            contextFactory.sparkContext.emit(buildErrorLineageEvent(openLineage, errorRunFacet(exc, openLineage)));
        } catch (Exception emitFailure) {
            log.error("Could not emit open lineage on error", exc);
            // Also record why emission failed; previously this cause was silently dropped.
            log.error("Emitting the open lineage error event failed", emitFailure);
        }
    }

    /** Builds run facets containing a {@code lineage.error} custom facet that carries the exception. */
    private static OpenLineage.RunFacets errorRunFacet(Exception exc, OpenLineage openLineage) {
        OpenLineage.CustomFacetBuilder errorFacet = openLineage.newCustomFacetBuilder();
        errorFacet.put("exception", exc);
        OpenLineage.RunFacetsBuilder runFacets = openLineage.newRunFacetsBuilder();
        runFacets.put("lineage.error", errorFacet.build());
        return runFacets.build();
    }

    /**
     * Builds a run event stamped with the current time that carries the given facets, using the
     * parent run id, job namespace and parent job name of the current context.
     *
     * @param openLineage factory for OpenLineage model objects
     * @param runFacets facets to attach to the run
     * @return the assembled run event
     */
    public static OpenLineage.RunEvent buildErrorLineageEvent(OpenLineage openLineage, OpenLineage.RunFacets runFacets) {
        return openLineage
                .newRunEventBuilder()
                .eventTime(ZonedDateTime.now())
                .run(openLineage.newRun(contextFactory.sparkContext.getParentRunId().orElse(null), runFacets))
                .job(openLineage
                        .newJobBuilder()
                        .namespace(contextFactory.sparkContext.getJobNamespace())
                        .name(contextFactory.sparkContext.getParentJobName())
                        .build())
                .build();
    }

    /** Empties both execution registries and the output map. */
    private static void clear() {
        sparkSqlExecutionRegistry.clear();
        rddExecutionRegistry.clear();
        outputs.clear();
    }

    /** Drops all registered state and closes the context factory, if one was installed. */
    public static void close() {
        clear();
        if (contextFactory != null) {
            contextFactory.close();
        }
    }

    /**
     * Creates the context factory from the Spark configuration unless one is already installed.
     *
     * <p>If no {@link SparkEnv} is available, or the context cannot be created because of a
     * {@link URISyntaxException}, the problem is logged and no factory is installed.
     *
     * @param sparkListenerApplicationStart application start event delivered by Spark
     */
    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        if (contextFactory != null) {
            return;
        }
        SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
        if (sparkEnv == null) {
            log.warn("Open lineage listener instantiated, but no configuration could be found. Lineage events will not be collected");
            return;
        }
        try {
            contextFactory = new ContextFactory(new OpenLineageContext(parseConf(sparkEnv.conf())));
        } catch (URISyntaxException e) {
            log.error("Unable to parse open lineage endpoint. Lineage events will not be collected", e);
        }
    }

    /**
     * Builds the listener arguments from the Spark configuration. A configured URL is parsed as a
     * whole; otherwise each setting is read individually with {@code ArgumentParser.DEFAULTS} as the
     * fallback, and an empty API key is treated as absent.
     */
    private ArgumentParser parseConf(SparkConf sparkConf) {
        Optional<String> url = findSparkConfigKey(sparkConf, SPARK_CONF_URL_KEY);
        if (url.isPresent()) {
            return ArgumentParser.parse(url.get());
        }
        String host = findSparkConfigKey(sparkConf, SPARK_CONF_HOST_KEY, ArgumentParser.DEFAULTS.getHost());
        String version = findSparkConfigKey(sparkConf, SPARK_CONF_API_VERSION_KEY, ArgumentParser.DEFAULTS.getVersion());
        String namespace = findSparkConfigKey(sparkConf, SPARK_CONF_NAMESPACE_KEY, ArgumentParser.DEFAULTS.getNamespace());
        String jobName = findSparkConfigKey(sparkConf, SPARK_CONF_JOB_NAME_KEY, ArgumentParser.DEFAULTS.getJobName());
        String parentRunId = findSparkConfigKey(sparkConf, SPARK_CONF_PARENT_RUN_ID_KEY, ArgumentParser.DEFAULTS.getParentRunId());
        Optional<String> apiKey = findSparkConfigKey(sparkConf, SPARK_CONF_API_KEY).filter(key -> !key.isEmpty());
        return new ArgumentParser(host, version, namespace, jobName, parentRunId, apiKey);
    }

    /** Reads a configuration value, returning {@code str2} when the key is not set. */
    private String findSparkConfigKey(SparkConf sparkConf, String str, String str2) {
        return findSparkConfigKey(sparkConf, str).orElse(str2);
    }

    /**
     * Reads a configuration value by its plain name, falling back to the same name prefixed with
     * {@code "spark."}.
     *
     * <p>The fallback is done with an explicit emptiness check: calling {@code Option.getOrElse}
     * with an Option-producing default yields the raw String when the plain key is present, which
     * then fails the cast back to {@code Option}.
     */
    private Optional<String> findSparkConfigKey(SparkConf sparkConf, String str) {
        Option<String> value = sparkConf.getOption(str);
        if (value.isEmpty()) {
            value = sparkConf.getOption("spark." + str);
        }
        return ScalaConversionUtils.asJavaOptional(value);
    }
}
