package bi.deep.flink.connector.source;

import bi.deep.flink.connector.source.reader.JdbcReaderTask;
import bi.deep.flink.connector.source.utils.Result;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:bi/deep/flink/connector/source/JdbcSourceFunction.class */
public class JdbcSourceFunction<T> implements SourceFunction<T> {
    private final JdbcSourceConfig<T> config;
    private transient Logger logger;
    private transient ScheduledExecutorService queryService;
    private transient ScheduledFuture<?> queryTask;
    private boolean running;

    public JdbcSourceFunction(JdbcSourceConfig<T> jdbcSourceConfig) {
        this.config = jdbcSourceConfig;
    }

    private void pollLoop(BlockingQueue<Result<T>> blockingQueue, SourceFunction.SourceContext<T> sourceContext) throws InterruptedException {
        while (this.running) {
            Result<T> poll = blockingQueue.poll(this.config.getPollInterval().toMillis(), TimeUnit.MILLISECONDS);
            if (poll != null) {
                sourceContext.collect(poll.get());
            }
        }
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) {
        this.queryService = Executors.newSingleThreadScheduledExecutor();
        this.logger = LoggerFactory.getLogger(JdbcSourceFunction.class);
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        this.queryTask = this.queryService.scheduleAtFixedRate(new JdbcReaderTask(linkedBlockingQueue, this.config), this.config.getInitialDiscoveryOffset().toMillis(), this.config.getDiscoveryInterval().toMillis(), TimeUnit.MILLISECONDS);
        this.running = true;
        try {
            pollLoop(linkedBlockingQueue, sourceContext);
        } catch (InterruptedException e) {
            this.running = false;
            this.logger.info("Interrupted JDBC Source Function");
        }
    }

    public void cancel() {
        this.queryTask.cancel(true);
        this.queryService.shutdown();
        try {
            this.queryService.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            this.logger.warn("Didn't manage to stop threading services in time", e);
        }
    }
}
