package bi.deep.flink.connector.source.reader;

import bi.deep.flink.connector.source.JdbcSourceConfig;
import bi.deep.flink.connector.source.split.JdbcSplit;
import bi.deep.flink.connector.source.utils.Result;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:bi/deep/flink/connector/source/reader/JdbcReader.class */
/**
 * Source reader that runs a {@link JdbcReaderTask} on a dedicated single-thread executor and
 * forwards the {@link Result}s that the task places on {@link #results} to the reader output.
 *
 * <p>Lifecycle as implemented here: {@link #start()} creates the executor, {@link #addSplits(List)}
 * submits one task per non-empty call, {@link #pollNext(ReaderOutput)} drains the queue and
 * {@link #close()} stops the executor.
 *
 * <p>The reader callbacks are not synchronised with each other; this class assumes they are all
 * invoked from a single thread (NOTE(review): confirm against the Flink runtime in use). The only
 * state shared with the executor thread is the thread-safe {@link #results} queue and the task
 * future.
 *
 * @param <T> type of the records emitted by this reader
 */
public class JdbcReader<T> implements SourceReader<T, JdbcSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcReader.class);

    /** Upper bound, in seconds, that {@link #close()} waits for an orderly executor shutdown. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /** Executor running the reader task; {@code null} until {@link #start()} has been called. */
    private ExecutorService service;

    private final JdbcSourceConfig<T> config;

    /** Hand-over queue: filled by the submitted task, drained by {@link #pollNext(ReaderOutput)}. */
    public final BlockingQueue<Result<T>> results = new LinkedBlockingQueue<>();

    /**
     * Future returned by {@link #isAvailable()}. Initialised eagerly so that it is never
     * {@code null}, even when a callback is invoked before {@link #start()}.
     */
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    /** Most recently submitted task; {@code null} until the first non-empty split assignment. */
    private CompletableFuture<Void> submittedTask;

    /**
     * Creates a reader for the given configuration.
     *
     * @param jdbcSourceConfig configuration handed to every {@link JdbcReaderTask} and used for
     *     the poll interval
     */
    public JdbcReader(JdbcSourceConfig<T> jdbcSourceConfig) {
        this.config = jdbcSourceConfig;
    }

    /**
     * Switches the availability signal.
     *
     * @param z {@code true} completes the current availability future; {@code false} replaces it
     *     with a fresh, not yet completed one
     */
    private void setAvailability(boolean z) {
        if (z) {
            this.availability.complete(null);
        } else {
            this.availability = new CompletableFuture<>();
        }
    }

    /**
     * Fails fast when the submitted task has completed exceptionally.
     *
     * @throws RuntimeException wrapping the failure reported by the task future
     */
    private void validateTask() {
        if (this.submittedTask == null || !this.submittedTask.isCompletedExceptionally()) {
            return;
        }
        // The future is already complete here, so join() returns immediately and, unlike get(),
        // declares no checked exceptions that would have to be caught (and possibly swallowed).
        Throwable failure = this.submittedTask.<Throwable>handle((ignored, error) -> error).join();
        LOG.error("Task completed exceptionally", failure);
        throw new RuntimeException(failure);
    }

    /** Creates the single-thread executor and marks the reader as not yet available. */
    public void start() {
        this.service = Executors.newSingleThreadExecutor();
        setAvailability(false);
    }

    /**
     * Emits at most one record, waiting up to the configured poll interval for one to arrive.
     *
     * @param readerOutput output that receives the record
     * @return {@link InputStatus#MORE_AVAILABLE} when a record was emitted or is already queued,
     *     {@link InputStatus#NOTHING_AVAILABLE} otherwise
     * @throws Exception if the wait is interrupted or unwrapping the result fails
     * @throws RuntimeException if the submitted task has failed
     */
    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        validateTask();
        long timeoutMillis = this.config.getPollInterval().toMillis();
        Result<T> next = this.results.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (next != null) {
            readerOutput.collect(next.get());
            return InputStatus.MORE_AVAILABLE;
        }
        validateTask();
        if (this.submittedTask != null && !this.submittedTask.isDone()) {
            // The task can still enqueue results, but it has no handle on the availability
            // future. Resetting the future here would strand those results until the next split
            // arrives, so leave the signal as addSplits() set it and let the caller poll again;
            // the timed poll above keeps that from turning into a busy loop.
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (!this.results.isEmpty()) {
            // The task finished between the timed poll and the isDone() check; whatever it
            // produced is already queued, so report it instead of going idle.
            return InputStatus.MORE_AVAILABLE;
        }
        setAvailability(false);
        return InputStatus.NOTHING_AVAILABLE;
    }

    /**
     * Returns the state to checkpoint. This reader keeps no split state, so the list is always
     * empty.
     *
     * @param j checkpoint id (unused)
     * @return a new empty list
     */
    public List<JdbcSplit> snapshotState(long j) {
        return new LinkedList<>();
    }

    /**
     * Returns the future that signals when polling is worthwhile again.
     *
     * @return the current availability future
     * @throws RuntimeException if the submitted task has failed
     */
    public CompletableFuture<Void> isAvailable() {
        validateTask();
        return this.availability;
    }

    /**
     * Submits a new {@link JdbcReaderTask} and marks the reader as available. The contents of the
     * splits are not inspected: any non-empty assignment triggers exactly one task built from the
     * reader configuration.
     *
     * @param list splits assigned to this reader; an empty list is logged and ignored
     */
    public void addSplits(List<JdbcSplit> list) {
        if (list.isEmpty()) {
            LOG.warn("Added empty splits");
            return;
        }
        if (list.size() > 1) {
            LOG.warn("Added more than one split. Executing only one");
        }
        this.submittedTask = CompletableFuture.runAsync(new JdbcReaderTask(this.results, this.config), this.service);
        setAvailability(true);
    }

    /** No-op: this reader does not react to the end of split assignment. */
    public void notifyNoMoreSplits() {
    }

    /**
     * Stops the executor. Waits up to {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds for the running
     * task to finish and then forces the shutdown so the worker thread is not left behind.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public void close() throws Exception {
        if (this.service == null) {
            // start() was never called, so there is nothing to release.
            return;
        }
        this.service.shutdown();
        try {
            if (this.service.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.info("Closed execution service");
            } else {
                LOG.warn("Timeout occurred when closing execution service");
                // Interrupt the still-running task instead of leaking its thread.
                this.service.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.service.shutdownNow();
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}
