package io.digdag.core.schedule;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigFactory;
import io.digdag.core.BackgroundExecutor;
import io.digdag.core.ErrorReporter;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.core.repository.ResourceLimitExceededException;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.repository.StoredWorkflowDefinitionWithProject;
import io.digdag.core.session.AttemptStateFlags;
import io.digdag.core.session.DelayedAttemptControlStore;
import io.digdag.core.session.ImmutableStoredSessionAttempt;
import io.digdag.core.session.ResumingTask;
import io.digdag.core.session.Session;
import io.digdag.core.session.SessionMonitor;
import io.digdag.core.session.SessionStore;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredDelayedSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.workflow.AttemptBuilder;
import io.digdag.core.workflow.AttemptRequest;
import io.digdag.core.workflow.SessionAttemptConflictException;
import io.digdag.core.workflow.WorkflowExecutor;
import io.digdag.spi.ScheduleTime;
import io.digdag.spi.Scheduler;
import io.digdag.spi.metrics.DigdagMetrics;
import io.digdag.util.DurationParam;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/digdag/core/schedule/ScheduleExecutor.class */
public class ScheduleExecutor implements BackgroundExecutor {
    private static final Logger logger = LoggerFactory.getLogger(ScheduleExecutor.class);
    public static final List<String> BUILT_IN_SCHEDULE_PARAMS = Arrays.asList("skip_on_overtime", "skip_delayed_by");
    private final ProjectStoreManager rm;
    private final ScheduleStoreManager sm;
    private final SchedulerManager srm;
    private final TransactionManager tm;
    private final SessionStoreManager sessionStoreManager;
    private final AttemptBuilder attemptBuilder;
    private final WorkflowExecutor workflowExecutor;
    private final ConfigFactory cf;
    private final ScheduleConfig scheduleConfig;
    private ScheduledExecutorService executor;

    @Inject(optional = true)
    private ErrorReporter errorReporter = ErrorReporter.empty();

    @Inject
    private DigdagMetrics metrics;

    @Inject
    public ScheduleExecutor(ProjectStoreManager projectStoreManager, ScheduleStoreManager scheduleStoreManager, SchedulerManager schedulerManager, TransactionManager transactionManager, SessionStoreManager sessionStoreManager, AttemptBuilder attemptBuilder, WorkflowExecutor workflowExecutor, ConfigFactory configFactory, ScheduleConfig scheduleConfig) {
        this.rm = projectStoreManager;
        this.sm = scheduleStoreManager;
        this.srm = schedulerManager;
        this.tm = transactionManager;
        this.sessionStoreManager = sessionStoreManager;
        this.attemptBuilder = attemptBuilder;
        this.workflowExecutor = workflowExecutor;
        this.cf = configFactory;
        this.scheduleConfig = scheduleConfig;
    }

    @VisibleForTesting
    boolean isStarted() {
        return this.executor != null;
    }

    @PostConstruct
    public synchronized void start() {
        if (!this.scheduleConfig.getEnabled()) {
            logger.debug("Scheduler is disabled.");
            return;
        }
        if (this.executor == null) {
            this.executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("scheduler-%d").build());
        }
        this.executor.scheduleWithFixedDelay(() -> {
            runSchedules();
        }, 1L, 1L, TimeUnit.SECONDS);
        this.executor.scheduleWithFixedDelay(() -> {
            runDelayedAttempts();
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    @PreDestroy
    public synchronized void shutdown() {
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
    }

    @Override // io.digdag.core.BackgroundExecutor
    public void eagerShutdown() {
        shutdown();
    }

    private void runSchedules() {
        runSchedules(Instant.now());
    }

    private void runSchedules(Instant instant) {
        do {
            try {
            } catch (Throwable th) {
                logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "An uncaught exception is ignored. Scheduling will be retried.", th);
                this.errorReporter.reportUncaughtError(th);
                this.metrics.increment(DigdagMetrics.Category.DEFAULT, "uncaughtErrors");
                return;
            }
        } while (runScheduleOnce(instant));
    }

    @VisibleForTesting
    boolean runScheduleOnce(Instant instant) {
        return ((Integer) this.tm.begin(() -> {
            return Integer.valueOf(this.sm.lockReadySchedules(instant, 1, (scheduleControlStore, storedSchedule) -> {
                runSchedule(new ScheduleControl(scheduleControlStore, storedSchedule), instant);
            }));
        })).intValue() > 0;
    }

    private void runDelayedAttempts() {
        runDelayedAttempts(Instant.now());
    }

    @VisibleForTesting
    void runDelayedAttempts(Instant instant) {
        try {
            this.tm.begin(() -> {
                this.sessionStoreManager.lockReadyDelayedAttempts(instant, (delayedAttemptControlStore, storedDelayedSessionAttempt) -> {
                    runDelayedAttempt(delayedAttemptControlStore, storedDelayedSessionAttempt);
                });
                return null;
            });
        } catch (Throwable th) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "An uncaught exception is ignored. Submitting delayed attempts will be retried.", th);
            this.errorReporter.reportUncaughtError(th);
            this.metrics.increment(DigdagMetrics.Category.DEFAULT, "uncaughtErrors");
        }
    }

    private void runSchedule(ScheduleControl scheduleControl, Instant instant) {
        ScheduleTime of;
        StoredSchedule storedSchedule = scheduleControl.get();
        Instant instant2 = null;
        try {
            StoredWorkflowDefinitionWithProject workflowDetailsById = this.rm.getWorkflowDetailsById(storedSchedule.getWorkflowDefinitionId());
            List<StoredSessionAttemptWithSession> activeAttemptsOfWorkflow = this.sessionStoreManager.getSessionStore(workflowDetailsById.getProject().getSiteId()).getActiveAttemptsOfWorkflow(workflowDetailsById.getProject().getId(), workflowDetailsById.getName(), 1, Optional.absent());
            Scheduler scheduler = this.srm.getScheduler(workflowDetailsById);
            Config scheduleConfig = SchedulerManager.getScheduleConfig(workflowDetailsById);
            boolean booleanValue = ((Boolean) scheduleConfig.get("skip_on_overtime", Boolean.TYPE, false)).booleanValue();
            Optional optional = scheduleConfig.getOptional("skip_delayed_by", DurationParam.class);
            if (optional.isPresent() && instant.isAfter(storedSchedule.getNextRunTime().plusSeconds(((DurationParam) optional.get()).getDuration().getSeconds()))) {
                logger.info("Now={} is too late from scheduled time={}. It's over skip_delayed_by={}. Skipping this schedule: {}", new Object[]{instant, storedSchedule.getNextScheduleTime(), optional.get(), storedSchedule});
                of = scheduler.nextScheduleTime(storedSchedule.getNextScheduleTime());
            } else if (activeAttemptsOfWorkflow.isEmpty() || !booleanValue) {
                try {
                    of = startSchedule(storedSchedule, scheduler, workflowDetailsById);
                    instant2 = storedSchedule.getNextScheduleTime();
                } catch (ResourceConflictException e) {
                    logger.error("Database state error during scheduling. Skipping this schedule: {}", storedSchedule, new IllegalStateException("Detected duplicated excution of a scheduled workflow for the same scheduling time.", e));
                    of = scheduler.nextScheduleTime(storedSchedule.getNextScheduleTime());
                } catch (ResourceLimitExceededException e2) {
                    logger.info("Number of attempts or tasks exceed limit. Pending this schedule for 10 minutes: {}", storedSchedule, e2);
                    of = ScheduleTime.of(storedSchedule.getNextScheduleTime(), ScheduleTime.alignedNow().plusSeconds(600L));
                }
            } else {
                logger.info("An attempt of the scheduled workflow is still running and skip_on_overtime = true. Skipping this schedule: {}", storedSchedule);
                of = scheduler.nextScheduleTime(storedSchedule.getNextScheduleTime());
            }
        } catch (ResourceNotFoundException e3) {
            logger.error("Database state error during scheduling. Pending this schedule for 1 hour: {}", storedSchedule, e3);
            of = ScheduleTime.of(storedSchedule.getNextScheduleTime(), storedSchedule.getNextRunTime().plusSeconds(3600L));
        } catch (RuntimeException e4) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Error during scheduling. Pending this schedule for 1 hour: {}", storedSchedule, e4);
            of = ScheduleTime.of(storedSchedule.getNextScheduleTime(), ScheduleTime.alignedNow().plusSeconds(3600L));
        }
        try {
            logger.info("Updating next schedule time: sched={}, next={}, lastSessionTime={}", new Object[]{storedSchedule, of, instant2});
            if (instant2 != null) {
                scheduleControl.updateNextScheduleTimeAndLastSessionTime(of, instant2);
            } else {
                scheduleControl.updateNextScheduleTime(of);
            }
        } catch (ResourceNotFoundException e5) {
            throw new IllegalStateException("Workflow for a schedule id=" + storedSchedule.getId() + " is scheduled but does not exist.", e5);
        }
    }

    @VisibleForTesting
    ScheduleTime startSchedule(StoredSchedule storedSchedule, Scheduler scheduler, StoredWorkflowDefinitionWithProject storedWorkflowDefinitionWithProject) throws ResourceNotFoundException, ResourceConflictException, ResourceLimitExceededException {
        Instant nextScheduleTime = storedSchedule.getNextScheduleTime();
        try {
            this.workflowExecutor.submitWorkflow(storedWorkflowDefinitionWithProject.getProject().getSiteId(), newAttemptRequest(storedWorkflowDefinitionWithProject, ScheduleTime.of(nextScheduleTime, storedSchedule.getNextRunTime()), Optional.absent(), storedSchedule.getLastSessionTime()), storedWorkflowDefinitionWithProject);
        } catch (SessionAttemptConflictException e) {
            logger.debug("Scheduled attempt {} is already executed. Skipping", e.getConflictedSession());
        }
        return scheduler.nextScheduleTime(nextScheduleTime);
    }

    public StoredSchedule skipScheduleToTime(int i, int i2, Instant instant, Optional<Instant> optional, boolean z) throws ResourceNotFoundException, ResourceConflictException {
        return (StoredSchedule) this.sm.getScheduleStore(i).updateScheduleById(i2, (scheduleControlStore, storedSchedule) -> {
            ScheduleControl scheduleControl = new ScheduleControl(scheduleControlStore, storedSchedule);
            ScheduleTime firstScheduleTime = getSchedulerOfSchedule(storedSchedule).getFirstScheduleTime(instant);
            if (!storedSchedule.getNextScheduleTime().isBefore(firstScheduleTime.getTime())) {
                throw new ResourceConflictException("Specified time to skip schedules is already past");
            }
            if (optional.isPresent()) {
                firstScheduleTime = ScheduleTime.of(firstScheduleTime.getTime(), (Instant) optional.get());
            }
            StoredSchedule copyWithUpdatedScheduleTime = copyWithUpdatedScheduleTime(storedSchedule, firstScheduleTime);
            if (!z) {
                scheduleControl.updateNextScheduleTime(firstScheduleTime);
            }
            return copyWithUpdatedScheduleTime;
        });
    }

    public StoredSchedule skipScheduleByCount(int i, int i2, Instant instant, int i3, Optional<Instant> optional, boolean z) throws ResourceNotFoundException, ResourceConflictException {
        return (StoredSchedule) this.sm.getScheduleStore(i).updateScheduleById(i2, (scheduleControlStore, storedSchedule) -> {
            ScheduleControl scheduleControl = new ScheduleControl(scheduleControlStore, storedSchedule);
            Scheduler schedulerOfSchedule = getSchedulerOfSchedule(storedSchedule);
            ScheduleTime firstScheduleTime = schedulerOfSchedule.getFirstScheduleTime(instant);
            for (int i4 = 0; i4 < i3; i4++) {
                firstScheduleTime = schedulerOfSchedule.nextScheduleTime(firstScheduleTime.getTime());
            }
            if (!storedSchedule.getNextScheduleTime().isBefore(firstScheduleTime.getTime())) {
                throw new ResourceConflictException("Specified time to skip schedules is already past");
            }
            if (optional.isPresent()) {
                firstScheduleTime = ScheduleTime.of(firstScheduleTime.getTime(), (Instant) optional.get());
            }
            StoredSchedule copyWithUpdatedScheduleTime = copyWithUpdatedScheduleTime(storedSchedule, firstScheduleTime);
            if (!z) {
                scheduleControl.updateNextScheduleTime(firstScheduleTime);
            }
            return copyWithUpdatedScheduleTime;
        });
    }

    private static StoredSchedule copyWithUpdatedScheduleTime(StoredSchedule storedSchedule, ScheduleTime scheduleTime) {
        return ImmutableStoredSchedule.builder().from(storedSchedule).nextRunTime(scheduleTime.getRunTime()).nextScheduleTime(scheduleTime.getTime()).build();
    }

    private Scheduler getSchedulerOfSchedule(StoredSchedule storedSchedule) throws ResourceNotFoundException {
        return this.srm.getScheduler(this.rm.getWorkflowDetailsById(storedSchedule.getWorkflowDefinitionId()));
    }

    public List<StoredSessionAttemptWithSession> backfill(int i, int i2, Instant instant, String str, Optional<Integer> optional, boolean z) throws ResourceNotFoundException, ResourceConflictException, ResourceLimitExceededException {
        SessionStore sessionStore = this.sessionStoreManager.getSessionStore(i);
        return (List) this.sm.getScheduleStore(i).lockScheduleById(i2, (scheduleControlStore, storedSchedule) -> {
            StoredWorkflowDefinitionWithProject workflowDetailsById = this.rm.getWorkflowDetailsById(storedSchedule.getWorkflowDefinitionId());
            Scheduler scheduler = this.srm.getScheduler(workflowDetailsById);
            boolean isPresent = optional.isPresent();
            int intValue = ((Integer) optional.or(0)).intValue();
            ArrayList<Instant> arrayList = new ArrayList();
            Instant time = scheduler.getFirstScheduleTime(instant).getTime();
            while (true) {
                Instant instant2 = time;
                if (!instant2.isBefore(storedSchedule.getNextScheduleTime())) {
                    break;
                }
                if (isPresent) {
                    if (intValue <= 0) {
                        break;
                    }
                    intValue--;
                }
                arrayList.add(instant2);
                time = scheduler.nextScheduleTime(instant2).getTime();
            }
            if (isPresent && intValue > 0) {
                throw new IllegalArgumentException(String.format(Locale.ENGLISH, "count is set to %d but there are only %d attempts until the next schedule time", optional.get(), Integer.valueOf(((Integer) optional.get()).intValue() - intValue)));
            }
            for (Instant instant3 : arrayList) {
                try {
                    sessionStore.getAttemptByName(workflowDetailsById.getProject().getId(), workflowDetailsById.getName(), instant3, str);
                    throw new ResourceConflictException(String.format(Locale.ENGLISH, "Attempt of project id=%d workflow=%s instant=%s attempt name=%s already exists", Integer.valueOf(workflowDetailsById.getProject().getId()), workflowDetailsById.getName(), instant3, str));
                    break;
                } catch (ResourceNotFoundException e) {
                }
            }
            return (ImmutableList) this.workflowExecutor.submitTransaction(i, workflowSubmitter -> {
                ImmutableList.Builder builder = ImmutableList.builder();
                Optional absent = Optional.absent();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    Instant instant4 = (Instant) it.next();
                    if (z) {
                        builder.add(StoredSessionAttemptWithSession.dryRunDummy(i, Session.of(workflowDetailsById.getProject().getId(), workflowDetailsById.getName(), instant4), ImmutableStoredSessionAttempt.builder().retryAttemptName(Optional.of(str)).workflowDefinitionId(Optional.of(Long.valueOf(workflowDetailsById.getId()))).timeZone(workflowDetailsById.getTimeZone()).id(0L).index(0).params(workflowDetailsById.getConfig().getFactory().create()).stateFlags(AttemptStateFlags.empty()).sessionId(0L).createdAt(Instant.now()).finishedAt(Optional.absent()).build()));
                    } else {
                        Optional<Instant> transform = absent.transform(storedSessionAttemptWithSession -> {
                            return storedSessionAttemptWithSession.getSession().getSessionTime();
                        });
                        if (!transform.isPresent()) {
                            transform = workflowSubmitter.getLastExecutedSessionTime(storedSchedule.getProjectId(), storedSchedule.getWorkflowName(), instant4);
                        }
                        StoredSessionAttemptWithSession submitDelayedAttempt = workflowSubmitter.submitDelayedAttempt(newAttemptRequest(workflowDetailsById, ScheduleTime.of(instant4, storedSchedule.getNextScheduleTime()), Optional.of(str), transform), absent.transform(storedSessionAttemptWithSession2 -> {
                            return Long.valueOf(storedSessionAttemptWithSession2.getSessionId());
                        }));
                        absent = Optional.of(submitDelayedAttempt);
                        builder.add(submitDelayedAttempt);
                    }
                }
                return builder.build();
            });
        });
    }

    public void runDelayedAttempt(DelayedAttemptControlStore delayedAttemptControlStore, StoredDelayedSessionAttempt storedDelayedSessionAttempt) {
        try {
            delayedAttemptControlStore.lockSessionOfAttempt(storedDelayedSessionAttempt.getAttemptId(), (sessionControlStore, storedSessionAttemptWithSession) -> {
                if (!storedSessionAttemptWithSession.getWorkflowDefinitionId().isPresent()) {
                    throw new ResourceNotFoundException("Delayed attempt must have a stored workflow");
                }
                this.workflowExecutor.storeTasks(sessionControlStore, storedSessionAttemptWithSession, this.rm.getProjectStore(storedSessionAttemptWithSession.getSiteId()).getWorkflowDefinitionById(((Long) storedSessionAttemptWithSession.getWorkflowDefinitionId().get()).longValue()), (List<ResumingTask>) ImmutableList.of(), (List<SessionMonitor>) ImmutableList.of());
                return true;
            });
        } catch (ResourceConflictException e) {
            logger.warn("Delayed attempt conflicted: {}", storedDelayedSessionAttempt, e);
        } catch (ResourceLimitExceededException e2) {
            this.tm.reset();
            logger.warn("Failed to start delayed attempt Due to too many active tasks. Will be retried after 5 minutes.", e2);
            delayedAttemptControlStore.delayDelayedAttempt(storedDelayedSessionAttempt.getAttemptId(), Instant.now().plusSeconds(300L));
            return;
        } catch (ResourceNotFoundException e3) {
            logger.warn("Invalid delayed attempt: {}", storedDelayedSessionAttempt, e3);
        }
        delayedAttemptControlStore.completeDelayedAttempt(storedDelayedSessionAttempt.getAttemptId());
    }

    private AttemptRequest newAttemptRequest(StoredWorkflowDefinitionWithProject storedWorkflowDefinitionWithProject, ScheduleTime scheduleTime, Optional<String> optional, Optional<Instant> optional2) {
        return this.attemptBuilder.buildFromStoredWorkflow(storedWorkflowDefinitionWithProject, this.cf.create(), scheduleTime, optional, Optional.absent(), (List<Long>) ImmutableList.of(), optional2);
    }
}
