package io.digdag.core.database;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.digdag.core.ErrorReporter;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.spi.ImmutableTaskQueueLock;
import io.digdag.spi.TaskConflictException;
import io.digdag.spi.TaskNotFoundException;
import io.digdag.spi.TaskQueueLock;
import io.digdag.spi.TaskQueueRequest;
import io.digdag.spi.TaskQueueServer;
import io.digdag.spi.metrics.DigdagMetrics;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.sqlobject.Bind;
import org.skife.jdbi.v2.sqlobject.GetGeneratedKeys;
import org.skife.jdbi.v2.sqlobject.SqlQuery;
import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

/* loaded from: input_file:io/digdag/core/database/DatabaseTaskQueueServer.class */
public class DatabaseTaskQueueServer extends BasicDatabaseStoreManager<Dao> implements TaskQueueServer {
    // Queue configuration; read in tryLockSharedAgentTasks for the per-site max concurrency.
    private final DatabaseTaskQueueConfig queueConfig;
    // Object mapper supplied to the constructor; not otherwise referenced in this class body.
    private final ObjectMapper taskObjectMapper;
    // Initial delay and delay (in seconds) between runs of the lock-expiration job scheduled by start().
    private final int expireLockInterval;
    // In-process lock taken per site id (with a 500 ms timeout) while trying to lock tasks for that site.
    private final LocalLockMap localLockMap;
    // Single daemon thread ("lock-expire-%d") that periodically runs expireLocks().
    private final ScheduledExecutorService expireExecutor;
    // Used by start() to run expireLocks() through TransactionManager.begin.
    private final TransactionManager transactionManager;

    // Optionally injected; the constructor initializes it to ErrorReporter.empty().
    @Inject(optional = true)
    private ErrorReporter errorReporter;

    // Injected metrics sink; incremented by expireLocks() when an uncaught error is swallowed.
    @Inject
    private DigdagMetrics metrics;
    // Monitor object used by sleepForEnqueue (wait) and interruptLocalWait (notifyAll).
    private final Object localTaskNoticeHelper;

    /* loaded from: input_file:io/digdag/core/database/DatabaseTaskQueueServer$Dao.class */
    /**
     * SQL object interface for the {@code queues}, {@code queued_tasks} and
     * {@code queued_task_locks} tables used by the task queue.
     */
    public interface Dao {
        /** Returns the {@code shared_site_id} column of the queue with the given id. */
        @SqlQuery("select shared_site_id from queues where id = :queueId")
        Integer getSharedSiteId(@Bind("queueId") long queueId);

        /**
         * Returns the distinct non-null site ids, in ascending order, that have at least one
         * row in {@code queued_task_locks} whose {@code lock_expire_time} is null.
         */
        @SqlQuery("with recursive t (site_id) as ((select site_id from queued_task_locks where lock_expire_time is null and site_id is not null order by site_id limit 1) union all select (select site_id from queued_task_locks where lock_expire_time is null and site_id is not null and site_id > t.site_id order by site_id limit 1) from t where t.site_id is not null) select site_id as id from t where site_id is not null")
        List<Integer> getActiveSiteIdList();

        /** Inserts a row into {@code queued_tasks} and returns its generated key. */
        @SqlUpdate("insert into queued_tasks (site_id, queue_id, unique_name, data, created_at) values (:siteId, :queueId, :uniqueName, :data, now())")
        @GetGeneratedKeys
        long insertQueuedTask(@Bind("siteId") Integer siteId, @Bind("queueId") Integer queueId, @Bind("uniqueName") String uniqueName, @Bind("data") byte[] data);

        /** Inserts the {@code queued_task_locks} row for an already inserted task id. */
        @SqlUpdate("insert into queued_task_locks (id, site_id, queue_id, priority) values (:id, :siteId, :queueId, :priority)")
        void insertQueuedTaskLock(@Bind("id") long id, @Bind("siteId") Integer siteId, @Bind("queueId") Integer queueId, @Bind("priority") int priority);

        /** Reads {@code unique_name} and {@code data} of the queued task with the given id. */
        @SqlQuery("select unique_name, data from queued_tasks where id = :taskLockId")
        ImmutableTaskQueueLock getTaskData(@Bind("taskLockId") long taskLockId);

        /** Deletes the lock row only if it is held by the given agent; returns the affected row count. */
        @SqlUpdate("delete from queued_task_locks where id = :taskLockId and lock_agent_id = :agentId")
        int deleteQueuedTaskLock(@Bind("taskLockId") long taskLockId, @Bind("agentId") String agentId);

        /** Deletes the lock row regardless of the holding agent; returns the affected row count. */
        @SqlUpdate("delete from queued_task_locks where id = :taskLockId")
        int forceDeleteQueuedTaskLock(@Bind("taskLockId") long taskLockId);

        /** Deletes the task row only if it belongs to the given site; returns the affected row count. */
        @SqlUpdate("delete from queued_tasks where id = :taskLockId and site_id = :siteId")
        int deleteQueuedTask(@Bind("siteId") int siteId, @Bind("taskLockId") long taskLockId);

        /** Deletes the task row regardless of its site; returns the affected row count. */
        @SqlUpdate("delete from queued_tasks where id = :taskLockId")
        int forceDeleteQueuedTask(@Bind("taskLockId") long taskLockId);
    }

    /* loaded from: input_file:io/digdag/core/database/DatabaseTaskQueueServer$ImmutableTaskQueueLockMapper.class */
    static class ImmutableTaskQueueLockMapper implements ResultSetMapper<ImmutableTaskQueueLock> {
        /* renamed from: map, reason: merged with bridge method [inline-methods] */
        public ImmutableTaskQueueLock m60map(int i, ResultSet resultSet, StatementContext statementContext) throws SQLException {
            return ImmutableTaskQueueLock.builder().lockId("").uniqueName(resultSet.getString("unique_name")).data(BasicDatabaseStoreManager.getOptionalBytes(resultSet, "data")).build();
        }
    }

    @Inject
    public DatabaseTaskQueueServer(DatabaseConfig databaseConfig, TransactionManager transactionManager, ConfigMapper configMapper, DatabaseTaskQueueConfig databaseTaskQueueConfig, ObjectMapper objectMapper) {
        super(databaseConfig.getType(), Dao.class, transactionManager, configMapper);
        this.localLockMap = new LocalLockMap();
        this.errorReporter = ErrorReporter.empty();
        this.localTaskNoticeHelper = new Object();
        this.queueConfig = databaseTaskQueueConfig;
        this.taskObjectMapper = objectMapper;
        this.transactionManager = transactionManager;
        this.expireLockInterval = databaseConfig.getExpireLockInterval();
        this.expireExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("lock-expire-%d").build());
    }

    /**
     * Wakes up every thread currently blocked in {@code sleepForEnqueue} on this instance.
     */
    @SuppressFBWarnings({"NN_NAKED_NOTIFY"})
    public void interruptLocalWait() {
        final Object monitor = this.localTaskNoticeHelper;
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }

    /**
     * Blocks the calling thread on the local notice monitor until it is notified by
     * {@link #interruptLocalWait()} or the timeout elapses. If the thread is interrupted,
     * the interrupt flag is restored and the method returns.
     *
     * @param j timeout in milliseconds passed to {@link Object#wait(long)}; note that a value
     *          of 0 means waiting without a time limit
     */
    private void sleepForEnqueue(long j) {
        final Object monitor = this.localTaskNoticeHelper;
        try {
            synchronized (monitor) {
                monitor.wait(j);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules the periodic lock-expiration job on the expire executor. The job runs
     * {@link #expireLocks()} through the transaction manager, first after
     * {@code expireLockInterval} seconds and then with the same delay between runs.
     *
     * @throws Exception declared for compatibility with callers; not thrown by this body
     */
    @PostConstruct
    public void start() throws Exception {
        Runnable expireTask = () -> {
            try {
                this.transactionManager.begin(() -> {
                    expireLocks();
                    return null;
                });
            } catch (Throwable th) {
                // scheduleWithFixedDelay suppresses all subsequent executions once a run throws,
                // so a failure raised by the transaction manager itself (outside expireLocks' own
                // catch block) must not escape, or lock expiration would silently stop.
                this.logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Failed to run the lock expiration transaction. It will be retried at the next interval.", th);
                this.errorReporter.reportUncaughtError(th);
            }
        };
        this.expireExecutor.scheduleWithFixedDelay(expireTask, this.expireLockInterval, this.expireLockInterval, TimeUnit.SECONDS);
    }

    /**
     * Stops the lock-expiration executor from accepting further runs. Does not wait for
     * a run that is already in progress.
     */
    @PreDestroy
    public void shutdown() {
        expireExecutor.shutdown();
    }

    /**
     * Tells whether the configured database type is the embedded one ({@code "h2"}).
     * Replaces the decompiled string-switch scaffolding, which assigned {@code -1} to a
     * {@code boolean} and switched on it, and therefore did not compile.
     *
     * @return {@code true} only when the database type equals {@code "h2"}; {@code false}
     *         otherwise, including when the type is {@code null}
     */
    private boolean isEmbededDatabase() {
        return "h2".equals(this.databaseType);
    }

    /**
     * Returns the SQL expression that is spliced into statements to obtain the database's
     * current time as epoch seconds (used on the non-embedded code paths).
     */
    private String statementUnixTimestampSql() {
        final String epochNowExpression = "extract(epoch from now())";
        return epochNowExpression;
    }

    /**
     * Enqueues a task for the given site without binding it to a specific queue
     * (the queue id is stored as null).
     *
     * @param i site id the task belongs to
     * @param taskQueueRequest priority, unique name and optional payload of the task
     * @throws TaskConflictException if the insert reports a resource conflict
     */
    public void enqueueDefaultQueueTask(int i, TaskQueueRequest taskQueueRequest) throws TaskConflictException {
        try {
            Integer siteId = Integer.valueOf(i);
            int priority = taskQueueRequest.getPriority();
            String uniqueName = taskQueueRequest.getUniqueName();
            byte[] payload = (byte[]) taskQueueRequest.getData().orNull();
            enqueue(siteId, null, priority, uniqueName, payload);
        } catch (ResourceConflictException conflict) {
            throw new TaskConflictException(conflict);
        }
    }

    /**
     * Enqueues a task bound to the given queue. The site id stored with the task is the
     * queue's {@code shared_site_id}, looked up first (it may be null).
     *
     * @param i id of the queue the task is bound to
     * @param taskQueueRequest priority, unique name and optional payload of the task
     * @throws TaskConflictException if the insert reports a resource conflict
     */
    public void enqueueQueueBoundTask(int i, TaskQueueRequest taskQueueRequest) throws TaskConflictException {
        try {
            Integer sharedSiteId = (Integer) autoCommit((handle, dao) -> {
                return dao.getSharedSiteId(i);
            });
            Integer queueId = Integer.valueOf(i);
            int priority = taskQueueRequest.getPriority();
            String uniqueName = taskQueueRequest.getUniqueName();
            byte[] payload = (byte[]) taskQueueRequest.getData().orNull();
            enqueue(sharedSiteId, queueId, priority, uniqueName, payload);
        } catch (ResourceConflictException conflict) {
            throw new TaskConflictException(conflict);
        }
    }

    /**
     * Inserts the task row and its lock row in one transaction, then wakes up local
     * waiters so they can pick up the new task.
     *
     * @param num site id, or null
     * @param num2 queue id, or null
     * @param i priority stored on the lock row
     * @param str unique name of the task
     * @param bArr task payload, or null
     * @return the generated id shared by the task row and the lock row
     * @throws ResourceConflictException if inserting the task row reports a conflict
     */
    private long enqueue(@Nullable Integer num, @Nullable Integer num2, int i, String str, @Nullable byte[] bArr) throws ResourceConflictException {
        Long enqueuedId = (Long) transaction((handle, dao) -> {
            Long insertedId = (Long) catchConflict(() -> {
                return Long.valueOf(dao.insertQueuedTask(num, num2, str, bArr));
            }, "lock of task name=%s in site id = %d and queue id=%d", str, num, num2);
            long taskId = insertedId.longValue();
            dao.insertQueuedTaskLock(taskId, num, num2, i);
            return Long.valueOf(taskId);
        }, ResourceConflictException.class);
        final long result = enqueuedId.longValue();
        // Notify agents sleeping in this process that a task became available.
        interruptLocalWait();
        return result;
    }

    /**
     * Formats the external lock id of a shared task: the letter {@code s} followed by the
     * decimal task lock id.
     */
    private static String formatSharedTaskLockId(long j) {
        StringBuilder lockId = new StringBuilder("s");
        lockId.append(j);
        return lockId.toString();
    }

    /**
     * Formats the external lock id of a queue-bound task as
     * {@code q<taskLockId>.<queueId>}.
     */
    private static String formatQueueBoundTaskLockId(long j, int i) {
        StringBuilder lockId = new StringBuilder("q");
        lockId.append(j).append('.').append(i);
        return lockId.toString();
    }

    /**
     * Tells whether a lock id denotes a shared task, i.e. whether it starts with the
     * letter {@code s}.
     */
    private static boolean isSharedTaskLockId(String str) {
        if (str.isEmpty()) {
            return false;
        }
        return str.charAt(0) == 's';
    }

    /**
     * Extracts the numeric task lock id from a lock id string: the text before the first
     * {@code '.'} (or the whole string if there is none), minus its one-letter prefix.
     *
     * @throws NumberFormatException if the remaining text is not a decimal long
     * @throws StringIndexOutOfBoundsException if there is no prefix character to strip
     */
    private static long parseTaskLockId(String str) {
        int dotAt = str.indexOf('.');
        String head = (dotAt < 0) ? str : str.substring(0, dotAt);
        // Drop the leading type letter ("s" or "q").
        String digits = head.substring(1);
        return Long.parseLong(digits);
    }

    /**
     * Extracts the queue id from a queue-bound lock id of the form
     * {@code q<taskLockId>.<queueId>}.
     *
     * @throws IllegalArgumentException if splitting on {@code '.'} does not give exactly two
     *         parts, or if the second part is not a decimal int
     */
    private static int parseQueueId(String str) {
        String[] parts = str.split("\\.");
        if (parts.length == 2) {
            return Integer.parseInt(parts[1]);
        }
        throw new IllegalArgumentException("Invalid queue-bound task lock id: " + str);
    }

    /**
     * Deletes a task and its lock on behalf of an agent.
     *
     * @param i site id the task must belong to
     * @param str external lock id (as produced by the lock id formatters)
     * @param str2 id of the agent that must currently hold the lock
     * @throws TaskNotFoundException if no task row exists for the lock id and site
     * @throws TaskConflictException if the lock row is missing or held by another agent
     */
    public void deleteTask(int i, String str, String str2) throws TaskNotFoundException, TaskConflictException {
        long taskLockId = parseTaskLockId(str);
        deleteTask0(i, taskLockId, str2);
    }

    /**
     * Deletes the task row and then the lock row in a single transaction; either missing
     * row aborts the transaction with an exception.
     *
     * @param i site id the task row must match
     * @param j numeric task lock id
     * @param str id of the agent that must hold the lock
     * @throws TaskNotFoundException if the task row was not deleted
     * @throws TaskConflictException if the lock row was not deleted
     */
    private void deleteTask0(int i, long j, String str) throws TaskNotFoundException, TaskConflictException {
        transaction((handle, dao) -> {
            int deletedTasks = dao.deleteQueuedTask(i, j);
            if (deletedTasks == 0) {
                throw new TaskNotFoundException("Deleting lock does not exist: lock id=" + j + " site id=" + i);
            }

            int deletedLocks = dao.deleteQueuedTaskLock(j, str);
            if (deletedLocks == 0) {
                throw new TaskConflictException("Deleting lock does not exist or preempted by another agent: lock id=" + j + " agent id=" + str);
            }

            return Boolean.TRUE;
        }, TaskNotFoundException.class, TaskConflictException.class);
    }

    /**
     * Deletes a task regardless of its site or of which agent holds its lock.
     *
     * @param str external lock id
     * @return whatever {@code forceDeleteTask0} reports for the parsed task lock id
     */
    public boolean forceDeleteTask(String str) {
        long taskLockId = parseTaskLockId(str);
        return forceDeleteTask0(taskLockId);
    }

    /**
     * Deletes both the task row and the lock row for the given id in one transaction,
     * ignoring site and agent ownership.
     *
     * <p>Both deletes are always executed. The previous form combined them with a
     * short-circuit {@code ||}, so the lock row was never deleted when the task row existed,
     * leaving an orphan row in {@code queued_task_locks}.
     *
     * @param j numeric task lock id
     * @return {@code true} if at least one of the two rows was deleted
     */
    private boolean forceDeleteTask0(long j) {
        return ((Boolean) transaction((handle, dao) -> {
            boolean taskDeleted = dao.forceDeleteQueuedTask(j) > 0;
            boolean lockDeleted = dao.forceDeleteQueuedTaskLock(j) > 0;
            return Boolean.valueOf(taskDeleted || lockDeleted);
        })).booleanValue();
    }

    /**
     * Extends the lock expiration of each given lock and reports the ones that could not
     * be extended.
     *
     * @param i site id the locks must belong to
     * @param list external lock ids to refresh
     * @param str id of the agent that must hold the locks
     * @param i2 number of seconds to extend the lock by
     * @return the lock ids for which no lock row was updated, in input order
     */
    public List<String> taskHeartbeat(int i, List<String> list, String str, int i2) {
        ImmutableList.Builder<String> failedLockIds = ImmutableList.builder();
        for (String lockId : list) {
            // Shared locks carry no queue id; queue-bound ones encode it after the dot.
            Integer queueId = isSharedTaskLockId(lockId) ? null : Integer.valueOf(parseQueueId(lockId));
            long taskLockId = parseTaskLockId(lockId);
            boolean extended = taskHeartbeat0(i, queueId, taskLockId, str, i2);
            if (!extended) {
                failedLockIds.add(lockId);
            }
        }
        return failedLockIds.build();
    }

    /**
     * Pushes the expiration time of one lock row forward, provided the row is held by the
     * given agent and resolves to the given site.
     *
     * <p>On the embedded database the new expiration time is computed from the local clock;
     * otherwise it is computed by the database from its own clock. The statement previously
     * also bound an {@code expireTime} parameter that does not occur in the SQL text; that
     * dead binding has been removed.
     *
     * @param i site id the lock must resolve to
     * @param num queue id used to resolve the site when the lock row has no site id; may be null
     * @param j numeric task lock id
     * @param str id of the agent that must hold the lock
     * @param i2 number of seconds from now at which the lock should expire
     * @return {@code true} if at least one row was updated
     */
    private boolean taskHeartbeat0(int i, Integer num, long j, String str, int i2) {
        return ((Integer) autoCommit((handle, dao) -> {
            // Both branches yield numeric text built from long/int values only.
            String expireTimeSql = isEmbededDatabase()
                    ? Long.toString(Instant.now().getEpochSecond() + i2)
                    : statementUnixTimestampSql() + " + " + Integer.toString(i2);
            return Integer.valueOf(handle.createStatement("update queued_task_locks set lock_expire_time = " + expireTimeSql + " where id = :id and lock_agent_id = :agentId and coalesce(site_id, (select site_id from queue_settings where id = :queueId)) = :siteId")
                    .bind("id", j)
                    .bind("agentId", str)
                    .bind("queueId", num)
                    .bind("siteId", i)
                    .execute());
        })).intValue() > 0;
    }

    /**
     * Tries to lock queued tasks for an agent. Sites with unlocked tasks are visited in
     * random order; the first site for which any task could be locked determines the result.
     *
     * <p>If no task could be locked for any site, the method optionally sleeps until a task
     * is enqueued locally or the timeout elapses, and then returns an empty list.
     *
     * @param i maximum number of tasks to lock
     * @param str id of the agent acquiring the locks
     * @param i2 lock lifetime in seconds
     * @param j maximum time to sleep in milliseconds when nothing was locked; values
     *          {@code <= 0} disable the sleep
     * @return the locked tasks whose data could still be read, possibly empty
     */
    public List<TaskQueueLock> lockSharedAgentTasks(int i, String str, int i2, long j) {
        List<Integer> siteIds = (List<Integer>) autoCommit((handle, dao) -> {
            return dao.getActiveSiteIdList();
        });
        // Randomize the visiting order of the sites.
        Collections.shuffle(siteIds);
        for (Integer siteId : siteIds) {
            List<Long> lockedIds = tryLockSharedAgentTasks(siteId.intValue(), i, str, i2);
            if (lockedIds.isEmpty()) {
                continue;
            }
            ImmutableList.Builder<TaskQueueLock> locks = ImmutableList.builder();
            for (Long lockedId : lockedIds) {
                long taskLockId = lockedId.longValue();
                ImmutableTaskQueueLock taskData = (ImmutableTaskQueueLock) autoCommit((handle, dao) -> {
                    return dao.getTaskData(taskLockId);
                });
                // The task row may be gone by now; such locks are skipped.
                if (taskData != null) {
                    locks.add(taskData.withLockId(formatSharedTaskLockId(taskLockId)));
                }
            }
            return locks.build();
        }
        // Object.wait(0) blocks until notified rather than returning immediately, so a zero
        // timeout must skip the sleep instead of being passed through.
        if (j > 0) {
            sleepForEnqueue(j);
        }
        return ImmutableList.of();
    }

    /**
     * Tries to lock up to {@code i2} unlocked tasks of one site for the given agent.
     *
     * <p>A per-site local lock is acquired first (waiting at most 500 ms) and always released
     * afterwards. On the embedded database, candidates are selected and marked as locked in
     * one transaction; otherwise the work is delegated to the {@code lock_shared_tasks} SQL
     * function (presumably defined by the database schema — verify against the migrations).
     *
     * @param i site id
     * @param i2 maximum number of tasks to lock
     * @param str id of the agent acquiring the locks
     * @param i3 lock lifetime in seconds
     * @return ids of the locked tasks; empty if the local lock could not be taken, the thread
     *         was interrupted while waiting for it, or nothing was lockable
     */
    private List<Long> tryLockSharedAgentTasks(int i, int i2, String str, int i3) {
        int siteMaxConcurrency = this.queueConfig.getSiteMaxConcurrency(i);
        try {
            if (!this.localLockMap.tryLock(i, 500L)) {
                return ImmutableList.of();
            }
            try {
                if (isEmbededDatabase()) {
                    return (List<Long>) transaction((handle, dao) -> {
                        List<Long> candidateIds = handle.createQuery("select id from queued_task_locks where lock_expire_time is null and site_id = :siteId and not exists (select * from (select queue_id, count(*) as count from queued_task_locks where lock_expire_time is not null and site_id = :siteId group by queue_id) runnings join queues on queues.id = runnings.queue_id where runnings.count >= queues.max_concurrency and runnings.queue_id = queued_task_locks.queue_id) and not exists (select count(*) from queued_task_locks where lock_expire_time is not null and site_id = :siteId having count(*) >= :siteMaxConcurrency) order by queue_id, priority desc, id limit :limit")
                                .bind("siteId", i)
                                .bind("siteMaxConcurrency", siteMaxConcurrency)
                                .bind("limit", i2)
                                .mapTo(Long.TYPE)
                                .list();
                        // Nothing selected: skip the update instead of issuing "where id in ()".
                        if (candidateIds.isEmpty()) {
                            return candidateIds;
                        }
                        // The ids come straight from the query above, so splicing them in is safe.
                        String idList = candidateIds.stream()
                                .map(id -> Long.toString(id.longValue()))
                                .collect(Collectors.joining(", "));
                        handle.createStatement("update queued_task_locks set lock_expire_time = :expireTime, lock_agent_id = :agentId where id in (" + idList + ")")
                                .bind("expireTime", Instant.now().getEpochSecond() + i3)
                                .bind("agentId", str)
                                .execute();
                        return candidateIds;
                    });
                }
                return (List<Long>) autoCommit((handle, dao) -> {
                    return handle.createQuery("select lock_shared_tasks(:siteId, :siteMaxConcurrency, :limit, :lockExpireSeconds, :agentId)")
                            .bind("siteId", i)
                            .bind("siteMaxConcurrency", siteMaxConcurrency)
                            .bind("limit", i2)
                            .bind("lockExpireSeconds", i3)
                            .bind("agentId", str)
                            .mapTo(Long.TYPE)
                            .list();
                });
            } finally {
                // Release the per-site local lock on every exit path.
                this.localLockMap.unlock(i);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return ImmutableList.of();
        }
    }

    /**
     * Releases every lock whose expiration time has passed: clears its expiration time and
     * agent id and increments its retry count. On the embedded database the current time
     * comes from the local clock; otherwise the database's own clock is used.
     *
     * <p>Any throwable is logged, reported and counted, and never propagates to the caller.
     */
    @VisibleForTesting
    void expireLocks() {
        try {
            Integer updatedRows = (Integer) autoCommit((handle, dao) -> {
                if (isEmbededDatabase()) {
                    return Integer.valueOf(handle.createStatement("update queued_task_locks set lock_expire_time = NULL, lock_agent_id = NULL, retry_count = retry_count + 1 where lock_expire_time is not null and lock_expire_time < :expireTime")
                            .bind("expireTime", Instant.now().getEpochSecond())
                            .execute());
                }
                return Integer.valueOf(handle.createStatement("update queued_task_locks set lock_expire_time = NULL, lock_agent_id = NULL, retry_count = retry_count + 1 where lock_expire_time is not null and lock_expire_time < " + statementUnixTimestampSql())
                        .execute());
            });
            int expiredCount = updatedRows.intValue();
            if (expiredCount > 0) {
                this.logger.warn("{} task locks are expired. Tasks will be retried.", Integer.valueOf(expiredCount));
            }
        } catch (Throwable failure) {
            this.logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "An uncaught exception is ignored. This lock expiration thread will be restarted.", failure);
            this.errorReporter.reportUncaughtError(failure);
            this.metrics.increment(DigdagMetrics.Category.DB, "uncaughtErrors");
        }
    }
}
