package io.openlineage.spark.agent.lifecycle;

import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.EventEmitter;
import io.openlineage.spark.agent.client.OpenLineageClient;
import io.openlineage.spark.agent.util.PlanUtils;
import io.openlineage.spark.api.OpenLineageContext;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.WholeStageCodegenExec;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.class */
public class SparkSQLExecutionContext implements ExecutionContext {
    private static final Logger log = LoggerFactory.getLogger(SparkSQLExecutionContext.class);
    private final long executionId;
    private final OpenLineageContext olContext;
    private final EventEmitter eventEmitter;
    private final OpenLineageRunEventBuilder runEventBuilder;
    private final OpenLineage openLineage = new OpenLineage(OpenLineageClient.OPEN_LINEAGE_CLIENT_URI);
    private AtomicBoolean started = new AtomicBoolean(false);
    private AtomicBoolean finished = new AtomicBoolean(false);
    private Optional<Integer> jobId = Optional.empty();

    public SparkSQLExecutionContext(long j, EventEmitter eventEmitter, OpenLineageContext openLineageContext, OpenLineageRunEventBuilder openLineageRunEventBuilder) {
        this.executionId = j;
        this.eventEmitter = eventEmitter;
        this.olContext = openLineageContext;
        this.runEventBuilder = openLineageRunEventBuilder;
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart) {
        log.debug("SparkListenerSQLExecutionStart - executionId: {}", Long.valueOf(sparkListenerSQLExecutionStart.executionId()));
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info("No execution info {}", this.olContext);
            return;
        }
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(toZonedTime(sparkListenerSQLExecutionStart.time())), buildJob(this.olContext.getQueryExecution().get()), sparkListenerSQLExecutionStart);
        log.debug("Posting event for start {}: {}", Long.valueOf(this.executionId), buildRun);
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd) {
        log.debug("SparkListenerSQLExecutionEnd - executionId: {}", Long.valueOf(sparkListenerSQLExecutionEnd.executionId()));
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info("No execution info {}", this.olContext);
            return;
        }
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(toZonedTime(sparkListenerSQLExecutionEnd.time())), buildJob(this.olContext.getQueryExecution().get()), sparkListenerSQLExecutionEnd);
        log.debug("Posting event for end {}: {}", Long.valueOf(this.executionId), buildRun);
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerStageSubmitted sparkListenerStageSubmitted) {
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info("No execution info {}", this.olContext);
            return;
        }
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(ZonedDateTime.now(ZoneOffset.UTC)), buildJob(this.olContext.getQueryExecution().get()), sparkListenerStageSubmitted);
        log.debug("Posting event for stage submitted {}: {}", Long.valueOf(this.executionId), buildRun);
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerStageCompleted sparkListenerStageCompleted) {
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info("No execution info {}", this.olContext);
            return;
        }
        OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(ZonedDateTime.now(ZoneOffset.UTC)), buildJob(this.olContext.getQueryExecution().get()), sparkListenerStageCompleted);
        log.debug("Posting event for stage completed {}: {}", Long.valueOf(this.executionId), buildRun);
        this.eventEmitter.emit(buildRun);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void setActiveJob(ActiveJob activeJob) {
        this.runEventBuilder.registerJob(activeJob);
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void start(SparkListenerJobStart sparkListenerJobStart) {
        log.debug("SparkListenerJobStart - executionId: " + this.executionId);
        this.jobId = Optional.of(Integer.valueOf(sparkListenerJobStart.jobId()));
        if (!this.olContext.getQueryExecution().isPresent()) {
            log.info("No execution info {}", this.olContext);
        } else {
            log.debug("Posting event for start {}: {}", Long.valueOf(this.executionId), this.runEventBuilder.buildRun(buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(toZonedTime(sparkListenerJobStart.time())), buildJob(this.olContext.getQueryExecution().get()), sparkListenerJobStart));
        }
    }

    @Override // io.openlineage.spark.agent.lifecycle.ExecutionContext
    public void end(SparkListenerJobEnd sparkListenerJobEnd) {
        log.debug("SparkListenerJobEnd - executionId: " + this.executionId);
        this.jobId = Optional.of(Integer.valueOf(sparkListenerJobEnd.jobId()));
        if (!this.finished.compareAndSet(false, true)) {
            log.debug("Event already finished, returning");
        } else {
            if (!this.olContext.getQueryExecution().isPresent()) {
                log.info("No execution info {}", this.olContext);
                return;
            }
            OpenLineage.RunEvent buildRun = this.runEventBuilder.buildRun(buildParentFacet(), this.openLineage.newRunEventBuilder().eventTime(toZonedTime(sparkListenerJobEnd.time())), buildJob(this.olContext.getQueryExecution().get()), sparkListenerJobEnd);
            log.debug("Posting event for end {}: {}", Long.valueOf(this.executionId), buildRun);
            this.eventEmitter.emit(buildRun);
        }
    }

    private Optional<OpenLineage.ParentRunFacet> buildParentFacet() {
        return this.eventEmitter.getParentRunId().map(uuid -> {
            return PlanUtils.parentRunFacet(uuid, this.eventEmitter.getParentJobName(), this.eventEmitter.getJobNamespace());
        });
    }

    protected ZonedDateTime toZonedTime(long j) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneOffset.UTC);
    }

    protected OpenLineage.JobBuilder buildJob(QueryExecution queryExecution) {
        SparkContext sparkContext = queryExecution.executedPlan().sparkContext();
        SparkPlan executedPlan = queryExecution.executedPlan();
        if (executedPlan instanceof WholeStageCodegenExec) {
            executedPlan = ((WholeStageCodegenExec) executedPlan).child();
        }
        return this.openLineage.newJobBuilder().namespace(this.eventEmitter.getJobNamespace()).name(sparkContext.appName().replaceAll(ExecutionContext.CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT) + "." + executedPlan.nodeName().replaceAll(ExecutionContext.CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT));
    }
}
