package io.openlineage.flink;

import io.openlineage.flink.shaded.org.apache.commons.lang3.Validate;
import io.openlineage.flink.shaded.org.apache.commons.lang3.reflect.FieldUtils;
import io.openlineage.flink.tracker.OpenLineageContinousJobTracker;
import io.openlineage.flink.tracker.OpenLineageContinousJobTrackerFactory;
import io.openlineage.flink.utils.JobTypeUtils;
import io.openlineage.flink.visitor.lifecycle.FlinkExecutionContext;
import io.openlineage.flink.visitor.lifecycle.FlinkExecutionContextFactory;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/flink/OpenLineageFlinkJobListener.class */
public class OpenLineageFlinkJobListener implements JobListener {
    /** Environment whose configuration and {@code transformations} field are read when a job is submitted. */
    private final StreamExecutionEnvironment executionEnvironment;

    /** Tracker that is started on job submission and stopped when the job has executed. */
    private final OpenLineageContinousJobTracker jobTracker;

    /** OpenLineage namespace reported for the job. */
    private final String jobNamespace;

    /** OpenLineage name reported for the job. */
    private final String jobName;

    /** Tracking interval supplied at construction time; it is stored but not read by the code in this class. */
    private final Duration jobTrackingInterval;

    /** Contexts of submitted jobs keyed by Flink job id; an entry is removed once its job finishes. */
    private final Map<JobID, FlinkExecutionContext> jobContexts = new HashMap<>();

    /** Runtime execution mode handed to {@code JobTypeUtils.extract} when a job starts. */
    private final RuntimeExecutionMode runtimeMode;

    private static final Logger log = LoggerFactory.getLogger(OpenLineageFlinkJobListener.class);

    /** Default value of {@link #OPENLINEAGE_LISTENER_CONFIG_DURATION}. */
    static final Duration DEFAULT_TRACKING_INTERVAL = Duration.ofSeconds(10);

    /** Default value of {@link #OPEN_LINEAGE_LISTENER_CONFIG_JOB_NAMESPACE}. */
    public static final String DEFAULT_JOB_NAMESPACE = "flink_jobs";

    /** Config option holding the OpenLineage job namespace. */
    public static final ConfigOption<String> OPEN_LINEAGE_LISTENER_CONFIG_JOB_NAMESPACE = ConfigOptions.key("execution.job-listener.openlineage.namespace").stringType().defaultValue(DEFAULT_JOB_NAMESPACE).withDescription("Openlineage job namespace.");

    /** Config option holding the OpenLineage job name. */
    public static final ConfigOption<String> OPEN_LINEAGE_LISTENER_CONFIG_JOB_NAME = ConfigOptions.key("execution.job-listener.openlineage.job-name").stringType().defaultValue("Flink Streaming Job").withDescription("Openlineage job name.");

    /** Config option holding the interval between checkpoint-stats REST API calls. */
    public static final ConfigOption<Duration> OPENLINEAGE_LISTENER_CONFIG_DURATION = ConfigOptions.key("execution.job-listener.openlineage.tracking-interval").durationType().defaultValue(DEFAULT_TRACKING_INTERVAL).withDescription("Duration between REST API calls for checkpoint stats");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/openlineage/flink/OpenLineageFlinkJobListener$ArchivedList.class */
    /**
     * An {@link ArrayList} that keeps a copy of its contents across {@link #clear()}.
     *
     * <p>{@link #getValue()} returns the snapshot taken either at construction time or at the
     * most recent {@code clear()}, whichever happened last. Elements added after that point are
     * not visible through {@code getValue()} until the list is cleared again.
     *
     * @param <T> element type
     */
    public static class ArchivedList<T> extends ArrayList<T> {
        /** Snapshot of the list contents captured at construction or at the last {@link #clear()}. */
        List<T> value;

        /**
         * Creates a list holding the given elements and archives them as the initial snapshot.
         *
         * @param collection elements to copy into both the live list and the snapshot
         */
        public ArchivedList(Collection<? extends T> collection) {
            super(collection);
            this.value = new ArrayList<>(collection);
        }

        /** Archives the current contents as the new snapshot, then empties the live list. */
        @Override
        public void clear() {
            // The snapshot must be taken before super.clear() wipes the elements.
            this.value = new ArrayList<>(this);
            super.clear();
        }

        /**
         * Returns the archived snapshot.
         *
         * @return the snapshot list itself (not a copy)
         */
        public List<T> getValue() {
            return this.value;
        }
    }

    /* loaded from: input_file:io/openlineage/flink/OpenLineageFlinkJobListener$OpenLineageFlinkJobListenerBuilder.class */
    /**
     * Fluent builder for {@link OpenLineageFlinkJobListener}.
     *
     * <p>Each setter stores its argument and returns this builder; {@link #build()} passes the
     * stored values to the listener constructor unchanged.
     */
    public static class OpenLineageFlinkJobListenerBuilder {
        private StreamExecutionEnvironment executionEnvironment;
        private OpenLineageContinousJobTracker jobTracker;
        private String jobNamespace;
        private String jobName;
        private Duration jobTrackingInterval;
        private RuntimeExecutionMode runtimeMode;

        OpenLineageFlinkJobListenerBuilder() {
        }

        /** Sets the stream execution environment the listener is attached to. */
        public OpenLineageFlinkJobListenerBuilder executionEnvironment(StreamExecutionEnvironment environment) {
            this.executionEnvironment = environment;
            return this;
        }

        /** Sets the tracker used while the job is running. */
        public OpenLineageFlinkJobListenerBuilder jobTracker(OpenLineageContinousJobTracker tracker) {
            this.jobTracker = tracker;
            return this;
        }

        /** Sets the OpenLineage job namespace. */
        public OpenLineageFlinkJobListenerBuilder jobNamespace(String namespace) {
            this.jobNamespace = namespace;
            return this;
        }

        /** Sets the OpenLineage job name. */
        public OpenLineageFlinkJobListenerBuilder jobName(String name) {
            this.jobName = name;
            return this;
        }

        /** Sets the job tracking interval. */
        public OpenLineageFlinkJobListenerBuilder jobTrackingInterval(Duration interval) {
            this.jobTrackingInterval = interval;
            return this;
        }

        /** Sets the runtime execution mode. */
        public OpenLineageFlinkJobListenerBuilder runtimeMode(RuntimeExecutionMode mode) {
            this.runtimeMode = mode;
            return this;
        }

        /** Creates the listener from the values collected so far, without any validation. */
        public OpenLineageFlinkJobListener build() {
            return new OpenLineageFlinkJobListener(
                    this.executionEnvironment,
                    this.jobTracker,
                    this.jobNamespace,
                    this.jobName,
                    this.jobTrackingInterval,
                    this.runtimeMode);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("OpenLineageFlinkJobListener.OpenLineageFlinkJobListenerBuilder(executionEnvironment=");
            text.append(this.executionEnvironment);
            text.append(", jobTracker=").append(this.jobTracker);
            text.append(", jobNamespace=").append(this.jobNamespace);
            text.append(", jobName=").append(this.jobName);
            text.append(", jobTrackingInterval=").append(this.jobTrackingInterval);
            text.append(", runtimeMode=").append(this.runtimeMode);
            return text.append(")").toString();
        }
    }

    /* loaded from: input_file:io/openlineage/flink/OpenLineageFlinkJobListener$OpenLineageFlinkJobListenerInternalBuilder.class */
    static class OpenLineageFlinkJobListenerInternalBuilder extends OpenLineageFlinkJobListenerBuilder {
        OpenLineageFlinkJobListenerInternalBuilder() {
        }

        @Override // io.openlineage.flink.OpenLineageFlinkJobListener.OpenLineageFlinkJobListenerBuilder
        public OpenLineageFlinkJobListener build() {
            Validate.notNull(((OpenLineageFlinkJobListenerBuilder) this).executionEnvironment, "StreamExecutionEnvironment has to be provided", new Object[0]);
            if (((OpenLineageFlinkJobListenerBuilder) this).jobNamespace == null) {
                super.jobNamespace((String) ((OpenLineageFlinkJobListenerBuilder) this).executionEnvironment.getConfiguration().get(OpenLineageFlinkJobListener.OPEN_LINEAGE_LISTENER_CONFIG_JOB_NAMESPACE));
            }
            if (((OpenLineageFlinkJobListenerBuilder) this).jobName == null) {
                super.jobName((String) ((OpenLineageFlinkJobListenerBuilder) this).executionEnvironment.getConfiguration().get(OpenLineageFlinkJobListener.OPEN_LINEAGE_LISTENER_CONFIG_JOB_NAME));
            }
            if (((OpenLineageFlinkJobListenerBuilder) this).jobTrackingInterval == null) {
                super.jobTrackingInterval((Duration) ((OpenLineageFlinkJobListenerBuilder) this).executionEnvironment.getConfiguration().get(OpenLineageFlinkJobListener.OPENLINEAGE_LISTENER_CONFIG_DURATION));
            }
            if (((OpenLineageFlinkJobListenerBuilder) this).jobTracker == null) {
                super.jobTracker(OpenLineageContinousJobTrackerFactory.getTracker(((OpenLineageFlinkJobListenerBuilder) this).executionEnvironment.getConfiguration(), ((OpenLineageFlinkJobListenerBuilder) this).jobTrackingInterval));
            }
            super.runtimeMode((RuntimeExecutionMode) ((OpenLineageFlinkJobListenerBuilder) this).executionEnvironment.getConfiguration().get(ExecutionOptions.RUNTIME_MODE));
            return super.build();
        }

        @Override // io.openlineage.flink.OpenLineageFlinkJobListener.OpenLineageFlinkJobListenerBuilder
        public OpenLineageFlinkJobListenerBuilder executionEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) {
            super.executionEnvironment(streamExecutionEnvironment);
            makeTransformationsArchivedList(streamExecutionEnvironment);
            return this;
        }

        private void makeTransformationsArchivedList(StreamExecutionEnvironment streamExecutionEnvironment) {
            try {
                Field field = FieldUtils.getField(StreamExecutionEnvironment.class, "transformations", true);
                FieldUtils.writeField(field, (Object) streamExecutionEnvironment, (Object) new ArchivedList((Collection) Optional.ofNullable((ArrayList) FieldUtils.readField(field, (Object) streamExecutionEnvironment, true)).orElse(new ArrayList())), true);
            } catch (IllegalAccessException e) {
                OpenLineageFlinkJobListener.log.error("Failed to rewrite transformations");
            }
        }
    }

    /**
     * Entry point for creating a listener.
     *
     * @return a fresh {@link OpenLineageFlinkJobListenerInternalBuilder}, exposed through the
     *     public builder type
     */
    public static OpenLineageFlinkJobListenerBuilder builder() {
        OpenLineageFlinkJobListenerBuilder internalBuilder = new OpenLineageFlinkJobListenerInternalBuilder();
        return internalBuilder;
    }

    /**
     * Job-submission callback: logs the event and, when a job client is available, starts
     * lineage tracking for the job.
     *
     * <p>Any exception, {@link NoClassDefFoundError} or {@link NoSuchFieldError} raised while
     * starting is logged and not propagated.
     *
     * @param jobClient client of the submitted job; nothing beyond logging happens when null
     * @param th submission failure, not used by this method
     */
    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
        log.info("onJobSubmitted event triggered for {}.{}", this.jobNamespace, this.jobName);
        if (jobClient != null) {
            try {
                start(jobClient);
            } catch (Exception | NoClassDefFoundError | NoSuchFieldError startFailure) {
                log.error("Failed to notify OpenLineage about start", startFailure);
            }
        }
    }

    /**
     * Creates the execution context for a submitted job, registers it under the job id, emits
     * the submitted event and starts the tracker.
     *
     * <p>The transformations are read from the environment's {@code transformations} field,
     * which is cast to {@link ArchivedList}; its archived snapshot is what gets passed on.
     * An {@link IllegalAccessException} from the reflective read is logged and swallowed.
     *
     * @param jobClient client of the submitted job
     */
    void start(JobClient jobClient) {
        try {
            Field transformationsField = FieldUtils.getField(StreamExecutionEnvironment.class, "transformations", true);
            ArchivedList archived = (ArchivedList) transformationsField.get(this.executionEnvironment);
            List transformations = archived.getValue();

            FlinkExecutionContext executionContext = FlinkExecutionContextFactory.getContext(
                    this.executionEnvironment.getConfiguration(),
                    this.jobNamespace,
                    this.jobName,
                    jobClient.getJobID(),
                    JobTypeUtils.extract(this.runtimeMode, transformations),
                    transformations);

            // Register before emitting so the context can be found when the job finishes.
            this.jobContexts.put(jobClient.getJobID(), executionContext);
            executionContext.onJobSubmitted();
            this.jobTracker.startTracking(executionContext);
        } catch (IllegalAccessException accessFailure) {
            log.error("Can't access the field. ", accessFailure);
        }
    }

    /**
     * Job-execution callback: logs the event, stops the tracker and then finalizes the job's
     * context via {@code finish}.
     *
     * <p>Any exception, {@link NoClassDefFoundError} or {@link NoSuchFieldError} raised along
     * the way is logged and not propagated.
     *
     * @param jobExecutionResult result of the job, or null
     * @param th failure cause, or null
     */
    public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        log.info("onJobExecuted event triggered for {}.{}", this.jobNamespace, this.jobName);
        try {
            // Tracking is stopped first, so it ends even when finish(...) throws.
            this.jobTracker.stopTracking();
            finish(jobExecutionResult, th);
        } catch (Exception | NoClassDefFoundError | NoSuchFieldError completionFailure) {
            log.error("Failed to notify OpenLineage about complete", completionFailure);
        }
    }

    /**
     * Removes the finished job's context and emits the matching terminal event.
     *
     * <ul>
     *   <li>Detached result: the context is dropped and a warning is logged; no event is emitted.
     *   <li>Any other non-null result: the context for its job id gets {@code onJobCompleted}.
     *   <li>Null result (failure): no job id is available, so the failure is attributed to the
     *       tracked context only when exactly one context exists.
     * </ul>
     *
     * <p>A missing context is logged and skipped instead of causing a
     * {@link NullPointerException}.
     *
     * @param jobExecutionResult result of the job, or null when it failed
     * @param th failure cause passed to {@code onJobFailed}
     */
    void finish(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        if (jobExecutionResult instanceof DetachedJobExecutionResult) {
            this.jobContexts.remove(jobExecutionResult.getJobID());
            log.warn("Job running in detached mode. Set execution.attached to true if you want to emit completed events.");
            return;
        }

        if (jobExecutionResult != null) {
            FlinkExecutionContext completed = this.jobContexts.remove(jobExecutionResult.getJobID());
            if (completed == null) {
                // Map.remove returns null when the job id was never registered.
                log.warn("No OpenLineage context found for job {}; completed event not emitted", jobExecutionResult.getJobID());
                return;
            }
            completed.onJobCompleted(jobExecutionResult);
            return;
        }

        if (this.jobContexts.size() == 1) {
            JobID onlyJobId = this.jobContexts.keySet().iterator().next();
            FlinkExecutionContext failed = this.jobContexts.remove(onlyJobId);
            if (failed != null) {
                failed.onJobFailed(th);
            }
        } else {
            log.warn("Cannot attribute job failure to a single job: {} job contexts are tracked; failed event not emitted", this.jobContexts.size());
        }
    }

    /**
     * Package-private constructor used by the builder; stores every argument as given, without
     * validation.
     *
     * @param environment stream execution environment being observed
     * @param tracker tracker used while the job runs
     * @param namespace OpenLineage job namespace
     * @param name OpenLineage job name
     * @param trackingInterval job tracking interval
     * @param executionMode runtime execution mode
     */
    OpenLineageFlinkJobListener(
            StreamExecutionEnvironment environment,
            OpenLineageContinousJobTracker tracker,
            String namespace,
            String name,
            Duration trackingInterval,
            RuntimeExecutionMode executionMode) {
        this.jobNamespace = namespace;
        this.jobName = name;
        this.runtimeMode = executionMode;
        this.jobTrackingInterval = trackingInterval;
        this.jobTracker = tracker;
        this.executionEnvironment = environment;
    }
}
