package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.OpenLineageContext;
import io.openlineage.spark.agent.client.OpenLineageClient;
import io.openlineage.spark.agent.facets.ErrorFacet;
import io.openlineage.spark.agent.facets.LogicalPlanFacet;
import io.openlineage.spark.agent.facets.UnknownEntryFacet;
import io.openlineage.spark.agent.lifecycle.plan.PlanTraversal;
import io.openlineage.spark.agent.lifecycle.plan.PlanUtils;
import io.openlineage.spark.agent.lifecycle.plan.UnknownEntryFacetListener;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.JobResult;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.JavaConversions;

/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.class */
/**
 * {@link ExecutionContext} for a single Spark SQL execution.
 *
 * <p>The {@link QueryExecution} is looked up once, at construction time, from the SQL execution
 * id. When a Spark job belonging to that execution starts or ends, an OpenLineage run event is
 * built from the execution's plans and emitted through the {@link OpenLineageContext}. Every
 * event produced by one instance carries the same run id.
 */
public class SparkSQLExecutionContext implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(SparkSQLExecutionContext.class);

    private final long executionId;
    private final QueryExecution queryExecution;
    /** Run id shared by every event emitted from this context. */
    private final UUID runUuid = UUID.randomUUID();
    /** Registered as the visited-node listener of every plan traversal created by this context. */
    private final UnknownEntryFacetListener unknownEntryFacetListener = new UnknownEntryFacetListener();
    private final OpenLineageContext sparkContext;
    private final List<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> outputDatasetSupplier;
    private final List<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> inputDatasetSupplier;

    /**
     * Creates the context and resolves the {@link QueryExecution} for {@code executionId}.
     *
     * @param executionId Spark SQL execution id passed to {@link SQLExecution#getQueryExecution}
     * @param openLineageContext context used for namespace/parent lookups and for emitting events
     * @param outputDatasetSupplier partial functions that map a logical plan to output datasets
     * @param inputDatasetSupplier partial functions that map a logical plan node to input datasets
     */
    public SparkSQLExecutionContext(
            long executionId,
            OpenLineageContext openLineageContext,
            List<PartialFunction<LogicalPlan, List<OpenLineage.OutputDataset>>> outputDatasetSupplier,
            List<PartialFunction<LogicalPlan, List<OpenLineage.InputDataset>>> inputDatasetSupplier) {
        this.executionId = executionId;
        this.sparkContext = openLineageContext;
        // May be null; start/end check for that and skip event emission.
        this.queryExecution = SQLExecution.getQueryExecution(executionId);
        this.outputDatasetSupplier = outputDatasetSupplier;
        this.inputDatasetSupplier = inputDatasetSupplier;
    }

    /** No-op: nothing is emitted on the SQL execution start event. */
    public void start(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
    }

    /** No-op: nothing is emitted on the SQL execution end event. */
    public void end(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
    }

    /** No-op: the active job is not used by this context. */
    @Override
    public void setActiveJob(ActiveJob activeJob) {
    }

    /**
     * Builds and emits a {@code START} run event for a job of this SQL execution.
     *
     * <p>Does nothing beyond logging when no {@link QueryExecution} was found for the execution id.
     *
     * @param jobStart the Spark job-start event; supplies the job id and the event time
     */
    @Override
    public void start(SparkListenerJobStart jobStart) {
        log.info("Starting job as part of spark-sql:{}", jobStart.jobId());
        if (this.queryExecution == null) {
            log.info("No execution info {}", this.queryExecution);
            return;
        }
        LogicalPlan optimizedPlan = this.queryExecution.optimizedPlan();

        // Order matters: both traversals report to unknownEntryFacetListener, so inputs and
        // outputs are resolved before the listener's facet is built.
        List<OpenLineage.InputDataset> inputs = getInputDatasets();
        List<OpenLineage.OutputDataset> outputs = getOutputDatasets();
        OpenLineage.RunFacets runFacets =
                buildRunFacets(
                        buildLogicalPlanFacet(optimizedPlan),
                        null,
                        this.unknownEntryFacetListener.build(optimizedPlan),
                        buildParentFacet());

        OpenLineage.RunEvent event =
                new OpenLineage(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI)
                        .newRunEventBuilder()
                        .eventTime(toZonedTime(jobStart.time()))
                        .eventType("START")
                        .inputs(inputs)
                        .outputs(outputs)
                        .run(buildRun(runFacets))
                        .job(buildJob(this.queryExecution))
                        .build();
        log.debug("Posting event for start {}: {}", jobStart, event);
        this.sparkContext.emit(event);
    }

    /**
     * Collects input datasets by applying the merged input visitors to the optimized plan via
     * {@code collect}; {@code null} results are dropped and the per-node lists are flattened.
     */
    private List<OpenLineage.InputDataset> getInputDatasets() {
        PlanTraversal<LogicalPlan, List<OpenLineage.InputDataset>> traversal =
                getPlanTraversal(PlanUtils.merge(this.inputDatasetSupplier));
        List<List<OpenLineage.InputDataset>> perNode =
                JavaConversions.seqAsJavaList(this.queryExecution.optimizedPlan().collect(traversal));
        return perNode.stream()
                .filter(Objects::nonNull)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    /**
     * Resolves output datasets by applying the merged output visitors to the optimized plan itself;
     * returns an empty list when the traversal is not defined for that plan.
     */
    private List<OpenLineage.OutputDataset> getOutputDatasets() {
        PlanTraversal<LogicalPlan, List<OpenLineage.OutputDataset>> traversal =
                getPlanTraversal(PlanUtils.merge(this.outputDatasetSupplier));
        LogicalPlan optimizedPlan = this.queryExecution.optimizedPlan();
        if (!traversal.isDefinedAt(optimizedPlan)) {
            return Collections.emptyList();
        }
        // NOTE(review): `mo345apply` looks like a decompiler-assigned alias of `apply`; the name
        // is kept because it must match the PlanTraversal declaration this file is built against.
        return traversal.mo345apply(optimizedPlan);
    }

    /** Wraps {@code processor} in a traversal that reports to {@link #unknownEntryFacetListener}. */
    private <T> PlanTraversal<LogicalPlan, List<T>> getPlanTraversal(PartialFunction<LogicalPlan, List<T>> processor) {
        // Explicit type arguments: inference does not flow back through a chained call, so
        // without them the builder chain would not be typed to this method's return type.
        return PlanTraversal.<LogicalPlan, List<T>>builder()
                .processor(processor)
                .visitedNodeListener(this.unknownEntryFacetListener)
                .build();
    }

    /** Builds the parent-run facet when the context has a parent run id; otherwise empty. */
    private Optional<OpenLineage.ParentRunFacet> buildParentFacet() {
        return this.sparkContext
                .getParentRunId()
                .map(parentRunId ->
                        PlanUtils.parentRunFacet(
                                parentRunId,
                                this.sparkContext.getParentJobName(),
                                this.sparkContext.getJobNamespace()));
    }

    /**
     * Builds and emits the terminal run event ({@code COMPLETE} or {@code FAIL}) for a job of this
     * SQL execution, attaching an error facet when the job failed with an exception.
     *
     * <p>Does nothing beyond logging when no {@link QueryExecution} was found for the execution id.
     *
     * @param jobEnd the Spark job-end event; supplies the job id, the event time and the result
     */
    @Override
    public void end(SparkListenerJobEnd jobEnd) {
        log.info("Ending job as part of spark-sql:{}", jobEnd.jobId());
        if (this.queryExecution == null) {
            log.info("No execution info {}", this.queryExecution);
            return;
        }
        // Guarded because serialising the plans to JSON is only needed for debug output.
        if (log.isDebugEnabled()) {
            log.debug("Traversing optimized plan {}", this.queryExecution.optimizedPlan().toJSON());
            log.debug("Physical plan executed {}", this.queryExecution.executedPlan().toJSON());
        }

        // Order matters: both traversals report to unknownEntryFacetListener, so inputs and
        // outputs are resolved before the listener's facet is built.
        List<OpenLineage.InputDataset> inputs = getInputDatasets();
        List<OpenLineage.OutputDataset> outputs = getOutputDatasets();
        // NOTE(review): the logical-plan facet here uses logical() while start() uses
        // optimizedPlan(); kept as-is - confirm whether the difference is intentional.
        OpenLineage.RunFacets runFacets =
                buildRunFacets(
                        buildLogicalPlanFacet(this.queryExecution.logical()),
                        buildJobErrorFacet(jobEnd.jobResult()),
                        this.unknownEntryFacetListener.build(this.queryExecution.optimizedPlan()),
                        buildParentFacet());

        OpenLineage.RunEvent event =
                new OpenLineage(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI)
                        .newRunEventBuilder()
                        .eventTime(toZonedTime(jobEnd.time()))
                        .eventType(getEventType(jobEnd.jobResult()))
                        .inputs(inputs)
                        .outputs(outputs)
                        .run(buildRun(runFacets))
                        .job(buildJob(this.queryExecution))
                        .build();
        log.debug("Posting event for end {}: {}", jobEnd, event);
        this.sparkContext.emit(event);
    }

    /**
     * Converts epoch milliseconds to a UTC {@link ZonedDateTime}.
     *
     * @param epochMillis milliseconds since the epoch
     * @return the same instant in the UTC zone
     */
    protected ZonedDateTime toZonedTime(long epochMillis) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    /**
     * Maps a job result to an event type.
     *
     * @param jobResult result reported by the job-end event
     * @return {@code "COMPLETE"} when the result's simple class name starts with
     *     {@code "JobSucceeded"}, otherwise {@code "FAIL"}
     */
    protected String getEventType(JobResult jobResult) {
        boolean succeeded = jobResult.getClass().getSimpleName().startsWith("JobSucceeded");
        return succeeded ? "COMPLETE" : "FAIL";
    }

    /**
     * Builds the run for this context's run id.
     *
     * @param runFacets facets to attach to the run
     * @return a run identified by this context's UUID
     */
    protected OpenLineage.Run buildRun(OpenLineage.RunFacets runFacets) {
        return new OpenLineage.RunBuilder().runId(this.runUuid).facets(runFacets).build();
    }

    /**
     * Assembles run facets, skipping every facet that is absent.
     *
     * @param logicalPlanFacet stored under {@code "spark.logicalPlan"}; may be {@code null}
     * @param errorFacet stored under {@code "spark.exception"}; may be {@code null}
     * @param unknownEntryFacet stored under {@code "spark_unknown"} when present
     * @param parentRunFacet set as the parent facet when present
     * @return the built facets
     */
    protected OpenLineage.RunFacets buildRunFacets(
            LogicalPlanFacet logicalPlanFacet,
            ErrorFacet errorFacet,
            Optional<UnknownEntryFacet> unknownEntryFacet,
            Optional<OpenLineage.ParentRunFacet> parentRunFacet) {
        OpenLineage.RunFacetsBuilder facets = new OpenLineage.RunFacetsBuilder();
        parentRunFacet.ifPresent(facets::parent);
        unknownEntryFacet.ifPresent(facet -> facets.put("spark_unknown", facet));
        if (logicalPlanFacet != null) {
            facets.put("spark.logicalPlan", logicalPlanFacet);
        }
        if (errorFacet != null) {
            facets.put("spark.exception", errorFacet);
        }
        return facets.build();
    }

    /**
     * Wraps a logical plan in a {@link LogicalPlanFacet}.
     *
     * @param logicalPlan plan to expose in the facet
     * @return the facet
     */
    protected LogicalPlanFacet buildLogicalPlanFacet(LogicalPlan logicalPlan) {
        return LogicalPlanFacet.builder().plan(logicalPlan).build();
    }

    /**
     * Builds an error facet for a failed job.
     *
     * @param jobResult result reported by the job-end event
     * @return a facet carrying the failure's exception, or {@code null} when the result is not a
     *     {@link JobFailed} or carries no exception
     */
    protected ErrorFacet buildJobErrorFacet(JobResult jobResult) {
        if (!(jobResult instanceof JobFailed)) {
            return null;
        }
        Exception failure = ((JobFailed) jobResult).exception();
        return failure == null ? null : ErrorFacet.builder().exception(failure).build();
    }

    /**
     * Builds the job descriptor: namespace from the OpenLineage context, name formed as
     * {@code <app name>.<plan node name>} with both parts converted by {@link #toSnakeCase}.
     *
     * @param queryExecution execution whose executed plan supplies the Spark context and node name
     * @return the job
     */
    protected OpenLineage.Job buildJob(QueryExecution queryExecution) {
        SparkPlan executedPlan = queryExecution.executedPlan();
        SparkContext context = executedPlan.sparkContext();
        // Name the job after the wrapped operator rather than the codegen wrapper itself.
        SparkPlan namedNode =
                executedPlan instanceof WholeStageCodegenExec
                        ? ((WholeStageCodegenExec) executedPlan).child()
                        : executedPlan;
        String jobName = toSnakeCase(context.appName()) + "." + toSnakeCase(namedNode.nodeName());
        return new OpenLineage.JobBuilder()
                .namespace(this.sparkContext.getJobNamespace())
                .name(jobName)
                .build();
    }

    /** Applies {@link ExecutionContext#CAMEL_TO_SNAKE_CASE} with {@code "_$1"} and lower-cases. */
    private static String toSnakeCase(String value) {
        return value.replaceAll(ExecutionContext.CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT);
    }
}
