package io.digdag.core.agent;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.digdag.core.ErrorReporter;
import io.digdag.core.database.TransactionManager;
import io.digdag.spi.TaskRequest;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/digdag/core/agent/MultiThreadAgent.class */
/**
 * Agent loop that acquires tasks from a {@link TaskServerApi} and runs each of them on a
 * thread pool through an {@link OperatorManager}.
 *
 * <p>{@link #run()} keeps polling for tasks until {@link #shutdown(Optional)} is called.
 * Task acquisition and executor shutdown are both performed while holding the same internal
 * lock, so no task is submitted to the executor after it has been shut down.
 */
public class MultiThreadAgent implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MultiThreadAgent.class);

    /** Upper bound on the number of tasks requested from the task server in one poll. */
    private static final int MAX_ACQUIRE_PER_POLL = 10;

    /** How long the polling loop waits on the lock when no execution slot is free. */
    private static final long NO_CAPACITY_WAIT_MILLIS = 500L;

    /** Pause before the polling loop retries after an unexpected failure. */
    private static final long ERROR_BACKOFF_MILLIS = 1000L;

    private final AgentConfig config;
    private final AgentId agentId;
    private final TaskServerApi taskServer;
    private final OperatorManager runner;
    private final TransactionManager transactionManager;
    private final ErrorReporter errorReporter;
    private final BlockingQueue<Runnable> executorQueue;
    private final ThreadPoolExecutor executor;

    /** Guards task acquisition/submission against a concurrent executor shutdown. */
    private final Object addActiveTaskLock = new Object();

    /** Number of tasks submitted to the executor that have not finished yet. */
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);

    private volatile boolean stop = false;

    /**
     * Creates the agent and its task executor.
     *
     * <p>When {@code config.getMaxThreads()} is positive, the executor is a fixed-size pool of
     * that many threads backed by an unbounded queue. Otherwise it has no core threads, may
     * grow up to {@link Integer#MAX_VALUE} threads, hands tasks off directly through a
     * {@link SynchronousQueue}, and lets idle threads expire after 60 seconds.
     * Worker threads are non-daemon and named {@code task-thread-%d}.
     */
    public MultiThreadAgent(AgentConfig config, AgentId agentId, TaskServerApi taskServer, OperatorManager runner, TransactionManager transactionManager, ErrorReporter errorReporter) {
        this.agentId = agentId;
        this.config = config;
        this.taskServer = taskServer;
        this.runner = runner;
        this.transactionManager = transactionManager;
        this.errorReporter = errorReporter;

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setDaemon(false)
                .setNameFormat("task-thread-%d")
                .build();

        int maxThreads = config.getMaxThreads();
        if (maxThreads > 0) {
            this.executorQueue = new LinkedBlockingQueue<>();
            this.executor = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.SECONDS, this.executorQueue, threadFactory);
        } else {
            this.executorQueue = new SynchronousQueue<>();
            this.executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, this.executorQueue, threadFactory);
        }
    }

    /**
     * Stops the polling loop, shuts the executor down and waits for submitted tasks to finish.
     *
     * @param maximumCompletionWait longest time to wait for running tasks; when absent this
     *     method waits until the executor has terminated. The full duration is honoured,
     *     including any sub-second part.
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown(Optional<Duration> maximumCompletionWait) throws InterruptedException {
        stop = true;
        taskServer.interruptLocalWait();

        final int running;
        synchronized (addActiveTaskLock) {
            // Shut down under the lock so run() never submits to a terminated executor.
            executor.shutdown();
            running = activeTaskCount.get();
            // Wake up run() if it is currently waiting for a free execution slot.
            addActiveTaskLock.notifyAll();
        }
        if (running > 0) {
            logger.info("Waiting for completion of {} running tasks...", running);
        }

        if (!maximumCompletionWait.isPresent()) {
            // No limit given: keep waiting until every task has finished.
            while (!executor.awaitTermination(24L, TimeUnit.HOURS)) {
                // still running; wait again
            }
            return;
        }

        Duration maxWait = maximumCompletionWait.get();
        // Wait on the nanosecond value: Duration.getSeconds() alone would truncate the
        // sub-second part and turn e.g. a 500 ms limit into no wait at all.
        if (!executor.awaitTermination(toSaturatedNanos(maxWait), TimeUnit.NANOSECONDS)) {
            logger.warn("Some tasks didn't finish within maximum wait time ({} seconds)", maxWait.getSeconds());
        }
    }

    /**
     * Polls the task server and submits acquired tasks to the executor until the agent is
     * stopped or the executor is shut down.
     *
     * <p>Any failure while acquiring tasks is logged, reported to the {@link ErrorReporter},
     * and followed by a short pause before polling resumes.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!stop) {
            try {
                synchronized (addActiveTaskLock) {
                    if (executor.isShutdown()) {
                        return;
                    }
                    final int maxAcquire = Math.min(executor.getMaximumPoolSize() - activeTaskCount.get(), MAX_ACQUIRE_PER_POLL);
                    if (maxAcquire > 0) {
                        transactionManager.begin(() -> {
                            // NOTE(review): the trailing 1000 is presumably a wait limit in
                            // milliseconds - confirm against TaskServerApi.
                            for (TaskRequest request : taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000L)) {
                                submitTask(request);
                            }
                            return null;
                        });
                    } else {
                        // No free slot: wait until shutdown() notifies or the timeout elapses.
                        addActiveTaskLock.wait(NO_CAPACITY_WAIT_MILLIS);
                    }
                }
            } catch (Throwable t) {
                // NOTE(review): this also catches an InterruptedException thrown by wait();
                // the interrupt is then only logged and the loop continues.
                logger.error("Uncaught exception during acquiring tasks from a server. Ignoring. Agent thread will be retried.", t);
                errorReporter.reportUncaughtError(t);
                try {
                    Thread.sleep(ERROR_BACKOFF_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Hands one task to the executor and counts it as active until its execution ends. */
    private void submitTask(TaskRequest request) {
        executor.submit(() -> {
            try {
                runner.run(request);
            } catch (Throwable t) {
                logger.error("Uncaught exception. Task queue will detect this failure and this task will be retried later.", t);
                errorReporter.reportUncaughtError(t);
            } finally {
                activeTaskCount.decrementAndGet();
            }
        });
        // Counted only after submit() returned, so a failed submit does not occupy a slot.
        activeTaskCount.incrementAndGet();
    }

    /** Converts a duration to nanoseconds, clamping negatives to 0 and overflow to Long.MAX_VALUE. */
    private static long toSaturatedNanos(Duration duration) {
        if (duration.isNegative()) {
            return 0L;
        }
        try {
            return duration.toNanos();
        } catch (ArithmeticException tooLarge) {
            // Longer than a long of nanoseconds can express: wait as long as possible.
            return Long.MAX_VALUE;
        }
    }
}
