package io.digdag.core.workflow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.client.config.ConfigFactory;
import io.digdag.core.Limits;
import io.digdag.core.agent.AgentId;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.repository.StoredProject;
import io.digdag.core.repository.StoredRevision;
import io.digdag.core.repository.WorkflowDefinition;
import io.digdag.core.session.ParameterUpdate;
import io.digdag.core.session.ResumingTask;
import io.digdag.core.session.Session;
import io.digdag.core.session.SessionAttempt;
import io.digdag.core.session.SessionControlStore;
import io.digdag.core.session.SessionMonitor;
import io.digdag.core.session.SessionStore;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.session.StoredTask;
import io.digdag.core.session.Task;
import io.digdag.core.session.TaskAttemptSummary;
import io.digdag.core.session.TaskStateCode;
import io.digdag.core.session.TaskStateFlags;
import io.digdag.metrics.DigdagTimed;
import io.digdag.spi.ImmutableTaskQueueRequest;
import io.digdag.spi.TaskConflictException;
import io.digdag.spi.TaskExecutionException;
import io.digdag.spi.TaskNotFoundException;
import io.digdag.spi.TaskQueueLock;
import io.digdag.spi.TaskQueueRequest;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import io.digdag.spi.metrics.DigdagMetrics;
import io.digdag.util.RetryControl;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/digdag/core/workflow/WorkflowExecutor.class */
public class WorkflowExecutor {
    // Collaborators and settings; each is assigned exactly once in the constructor.
    private final ProjectStoreManager rm;
    private final SessionStoreManager sm;
    private final TransactionManager tm;
    private final WorkflowCompiler compiler;
    private final TaskQueueDispatcher dispatcher;
    private final ConfigFactory cf;
    // Mapper handed to SessionAttemptControl.archiveTasks when a finished attempt is archived.
    private final ObjectMapper archiveMapper;
    private final Config systemConfig;
    private final Limits limits;
    private final DigdagMetrics metrics;
    // Lock and condition on which the runWhile loop sleeps between passes; the notice* methods signal it.
    private final Lock propagatorLock = new ReentrantLock();
    private final Condition propagatorCondition = this.propagatorLock.newCondition();
    // Set (under propagatorLock) by noticeStatusPropagate(); runWhile clears it and resets its wait interval.
    private volatile boolean propagatorNotice = false;
    // Values of "executor.enqueue_random_fetch" and "executor.enqueue_fetch_size"; both are passed to
    // SessionStoreManager.findAllReadyTaskIds by enqueueReadyTasks.
    private final boolean enqueueRandomFetch;
    private final Integer enqueueFetchSize;
    // Starting wait (milliseconds) of the runWhile loop when a pass made no progress.
    private static final int INITIAL_INTERVAL = 100;
    // Upper bound (milliseconds) of the runWhile wait, which doubles after each idle pass.
    private static final int MAX_INTERVAL = 5000;
    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecutor.class);
    // Formats the session time (ISO-like, with zone offset) for the "Starting a new session" log line.
    private static final DateTimeFormatter SESSION_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssxxx", Locale.ENGLISH);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/digdag/core/workflow/WorkflowExecutor$TaskQueuer.class */
    /**
     * Per-loop resource created by {@code runWhile} and passed to {@code enqueueReadyTasks}.
     *
     * <p>Owns a cached pool of daemon threads named {@code task-queuer-N}; closing the queuer
     * shuts that pool down.
     */
    public class TaskQueuer implements AutoCloseable {
        private final Map<Long, Future<Void>> waiting = new ConcurrentHashMap<>();
        private final ExecutorService executor = Executors.newCachedThreadPool(
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("task-queuer-%d").build());

        public TaskQueuer() {
        }

        /**
         * Initiates an orderly shutdown of the executor. {@link ExecutorService#shutdown()} does
         * not wait for already-submitted work to finish.
         */
        @Override // java.lang.AutoCloseable
        public void close() {
            this.executor.shutdown();
        }
    }

    /* loaded from: input_file:io/digdag/core/workflow/WorkflowExecutor$WorkflowSubmitterAction.class */
    /**
     * Callback run by {@code submitTransaction} with a {@link WorkflowSubmitter} bound to the
     * current session transaction.
     *
     * @param <T> type of the value produced by the callback
     */
    @FunctionalInterface
    public interface WorkflowSubmitterAction<T> {
        /**
         * Performs submissions through the given submitter.
         *
         * @param workflowSubmitter submitter scoped to the surrounding transaction
         * @return the callback's result, returned unchanged by {@code submitTransaction}
         */
        T call(WorkflowSubmitter workflowSubmitter) throws ResourceNotFoundException, AttemptLimitExceededException, SessionAttemptConflictException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/digdag/core/workflow/WorkflowExecutor$WorkflowTaskLimitExceededException.class */
    /**
     * Unchecked carrier for a {@link TaskLimitExceededException}. It lets the checked exception
     * leave a store callback; the caller catches this type and rethrows {@link #getCause()}.
     */
    public static class WorkflowTaskLimitExceededException extends RuntimeException {
        public WorkflowTaskLimitExceededException(TaskLimitExceededException cause) {
            super(cause);
        }

        /** Returns the wrapped exception, narrowed to its concrete type. */
        @Override // java.lang.Throwable
        public TaskLimitExceededException getCause() {
            Throwable wrapped = super.getCause();
            return (TaskLimitExceededException) wrapped;
        }
    }

    /**
     * Creates an executor wired to the given stores, compiler, dispatcher and settings.
     *
     * <p>Two settings are read from {@code config}:
     * {@code executor.enqueue_random_fetch} (default {@code false}) and
     * {@code executor.enqueue_fetch_size} (default {@code 100}).
     */
    @Inject
    public WorkflowExecutor(ProjectStoreManager projectStoreManager, SessionStoreManager sessionStoreManager, TransactionManager transactionManager, TaskQueueDispatcher taskQueueDispatcher, WorkflowCompiler workflowCompiler, ConfigFactory configFactory, ObjectMapper objectMapper, Config config, Limits limits, DigdagMetrics digdagMetrics) {
        this.rm = projectStoreManager;
        this.sm = sessionStoreManager;
        this.tm = transactionManager;
        this.compiler = workflowCompiler;
        this.dispatcher = taskQueueDispatcher;
        this.cf = configFactory;
        this.archiveMapper = objectMapper;
        this.systemConfig = config;
        this.limits = limits;
        this.metrics = digdagMetrics;
        this.enqueueRandomFetch = (Boolean) config.get("executor.enqueue_random_fetch", Boolean.class, false);
        // The default fetch size is a task count; it is spelled out here instead of borrowing
        // the loop's wait-interval constant, which only shares the value 100 by coincidence.
        this.enqueueFetchSize = (Integer) config.get("executor.enqueue_fetch_size", Integer.class, 100);
    }

    /**
     * Compiles the workflow definition (by name and config) and submits the resulting task list
     * through {@code submitTasks}.
     */
    public StoredSessionAttemptWithSession submitWorkflow(int i, AttemptRequest attemptRequest, WorkflowDefinition workflowDefinition) throws ResourceNotFoundException, AttemptLimitExceededException, TaskLimitExceededException, SessionAttemptConflictException {
        WorkflowTaskList compiledTasks = this.compiler.compile(workflowDefinition.getName(), workflowDefinition.getConfig()).getTasks();
        return submitTasks(i, attemptRequest, compiledTasks);
    }

    /**
     * Runs {@code workflowSubmitterAction} inside a session transaction of site {@code i},
     * passing it a {@link WorkflowSubmitter} bound to that transaction.
     *
     * <p>The three declared checked exceptions are rethrown as-is; any other exception is
     * propagated via {@code Throwables.propagate}.
     *
     * @return whatever the action returns
     */
    public <T> T submitTransaction(int i, WorkflowSubmitterAction<T> workflowSubmitterAction) throws ResourceNotFoundException, AttemptLimitExceededException, SessionAttemptConflictException {
        try {
            return (T) this.sm.getSessionStore(i).sessionTransaction(transaction -> {
                WorkflowSubmitter submitter = new WorkflowSubmitter(i, transaction, this.rm.getProjectStore(i), this.sm.getSessionStore(i), this.tm, this.limits);
                return workflowSubmitterAction.call(submitter);
            });
        } catch (Exception failure) {
            Throwables.propagateIfInstanceOf(failure, ResourceNotFoundException.class);
            Throwables.propagateIfInstanceOf(failure, AttemptLimitExceededException.class);
            Throwables.propagateIfInstanceOf(failure, SessionAttemptConflictException.class);
            throw Throwables.propagate(failure);
        }
    }

    /**
     * Creates a new session attempt for {@code attemptRequest} and stores its tasks.
     *
     * <p>Steps: validate the attempt, resolve the tasks to resume (if a resuming attempt id is
     * given), check the active-attempt limit, then under the session lock insert the attempt and
     * its tasks and monitors. On success the status-propagation loop is notified.
     *
     * @param i site id used to look up the session and project stores
     * @param attemptRequest description of the attempt to start
     * @param workflowTaskList compiled tasks; index 0 is treated as the root task
     * @return the stored attempt together with its session
     * @throws ResourceNotFoundException if the project is already marked deleted
     * @throws AttemptLimitExceededException if one more attempt would exceed {@code limits.maxAttempts()}
     * @throws TaskLimitExceededException if storing the tasks hits the task limit
     * @throws SessionAttemptConflictException if the store reports a conflict; carries the existing attempt
     */
    public StoredSessionAttemptWithSession submitTasks(int i, AttemptRequest attemptRequest, WorkflowTaskList workflowTaskList) throws ResourceNotFoundException, AttemptLimitExceededException, TaskLimitExceededException, SessionAttemptConflictException {
        if (logger.isTraceEnabled()) {
            for (WorkflowTask task : workflowTaskList) {
                logger.trace("  Step[{}]: {}", task.getIndex(), task.getName());
                logger.trace("    parent: {}", task.getParentIndex().transform(index -> Integer.toString(index.intValue())).or("(root)"));
                logger.trace("    upstreams: {}", task.mo93getUpstreamIndexes().stream()
                        .map(index -> Integer.toString(index.intValue()))
                        .collect(Collectors.joining(", ")));
                logger.trace("    config: {}", task.getConfig());
            }
        }

        int projectId = attemptRequest.getStored().getProjectId();
        Session session = Session.of(projectId, attemptRequest.getWorkflowName(), attemptRequest.getSessionTime());
        SessionAttempt attempt = SessionAttempt.of(attemptRequest.getRetryAttemptName(), attemptRequest.getSessionParams(), attemptRequest.getTimeZone(), Optional.of(Long.valueOf(attemptRequest.getStored().getWorkflowDefinitionId())));
        TaskConfig.validateAttempt(attempt);

        // Assigned exactly once on each path so the store callback below can capture it.
        final List<ResumingTask> resumingTasks;
        if (attemptRequest.getResumingAttemptId().isPresent()) {
            WorkflowTask rootTask = workflowTaskList.get(0);
            resumingTasks = TaskControl.buildResumingTaskMap(this.sm.getSessionStore(i), ((Long) attemptRequest.getResumingAttemptId().get()).longValue(), attemptRequest.mo91getResumingTasks());
            for (ResumingTask resuming : resumingTasks) {
                if (resuming.getFullName().equals(rootTask.getFullName())) {
                    throw new IllegalResumeException("Resuming root task is not allowed");
                }
            }
        } else {
            resumingTasks = ImmutableList.of();
        }

        try {
            SessionStore sessionStore = this.sm.getSessionStore(i);
            long activeAttemptCount = sessionStore.getActiveAttemptCount();
            if (activeAttemptCount + 1 > this.limits.maxAttempts()) {
                throw new AttemptLimitExceededException("Too many attempts running. Limit: " + this.limits.maxAttempts() + ", Current: " + activeAttemptCount);
            }

            StoredSessionAttemptWithSession submitted = (StoredSessionAttemptWithSession) sessionStore.putAndLockSession(session, (store, storedSession) -> {
                StoredProject project = this.rm.getProjectStore(i).getProjectById(projectId);
                if (project.getDeletedAt().isPresent()) {
                    // String.format needs %-style specifiers; "{}" placeholders are only understood by SLF4J.
                    throw new ResourceNotFoundException(String.format(Locale.ENGLISH, "Project id=%d name=%s is already deleted", project.getId(), project.getName()));
                }
                StoredSessionAttempt storedAttempt = store.insertAttempt(storedSession.getId(), projectId, attempt);
                logger.info("Starting a new session project id={} workflow name={} session_time={}",
                        projectId,
                        attemptRequest.getWorkflowName(),
                        SESSION_TIME_FORMATTER.withZone(attemptRequest.getTimeZone()).format(attemptRequest.getSessionTime()));
                StoredSessionAttemptWithSession attemptWithSession = StoredSessionAttemptWithSession.of(i, storedSession, storedAttempt);
                try {
                    storeTasks(store, attemptWithSession, workflowTaskList, resumingTasks, attemptRequest.mo92getSessionMonitors());
                    return attemptWithSession;
                } catch (TaskLimitExceededException e) {
                    // Wrapped so it can cross the callback boundary; unwrapped by the outer catch.
                    throw new WorkflowTaskLimitExceededException(e);
                }
            });
            noticeStatusPropagate();
            return submitted;
        } catch (ResourceConflictException e) {
            this.tm.reset();
            // Report the attempt that already occupies this session: the named retry attempt
            // when a retry name was given, otherwise the session's last attempt.
            throw new SessionAttemptConflictException("Session already exists", e,
                    attemptRequest.getRetryAttemptName().isPresent()
                            ? this.sm.getSessionStore(i).getAttemptByName(session.getProjectId(), session.getWorkflowName(), session.getSessionTime(), (String) attemptRequest.getRetryAttemptName().get())
                            : this.sm.getSessionStore(i).getLastAttemptByName(session.getProjectId(), session.getWorkflowName(), session.getSessionTime()));
        } catch (WorkflowTaskLimitExceededException e) {
            throw e.getCause();
        }
    }

    /**
     * Convenience overload: compiles {@code workflowDefinition} (name and config) and delegates
     * to the {@link WorkflowTaskList} overload.
     */
    public void storeTasks(SessionControlStore sessionControlStore, StoredSessionAttemptWithSession storedSessionAttemptWithSession, WorkflowDefinition workflowDefinition, List<ResumingTask> list, List<SessionMonitor> list2) throws TaskLimitExceededException {
        WorkflowTaskList compiledTasks = this.compiler.compile(workflowDefinition.getName(), workflowDefinition.getConfig()).getTasks();
        storeTasks(sessionControlStore, storedSessionAttemptWithSession, compiledTasks, list, list2);
    }

    /**
     * Inserts the root task (element 0 of {@code workflowTaskList}) for the attempt, adds the
     * remaining initial tasks under it, and finally stores the session monitors if any exist.
     *
     * <p>The root task starts as PLANNED when its type is grouping-only and as READY otherwise.
     *
     * @throws TaskLimitExceededException if adding the initial tasks exceeds the task limit
     */
    public void storeTasks(SessionControlStore sessionControlStore, StoredSessionAttemptWithSession storedSessionAttemptWithSession, WorkflowTaskList workflowTaskList, List<ResumingTask> list, List<SessionMonitor> list2) throws TaskLimitExceededException {
        WorkflowTask rootDefinition = workflowTaskList.get(0);
        try {
            TaskStateCode initialState;
            if (rootDefinition.getTaskType().isGroupingOnly()) {
                initialState = TaskStateCode.PLANNED;
            } else {
                initialState = TaskStateCode.READY;
            }
            Task rootTask = Task.taskBuilder()
                    .parentId(Optional.absent())
                    .fullName(rootDefinition.getFullName())
                    .config(TaskConfig.validate(rootDefinition.getConfig()))
                    .taskType(rootDefinition.getTaskType())
                    .state(initialState)
                    .stateFlags(TaskStateFlags.empty().withInitialTask())
                    .build();

            sessionControlStore.insertRootTask(storedSessionAttemptWithSession.getId(), rootTask, (store, rootTaskId) -> {
                try {
                    TaskControl.addInitialTasksExceptingRootTask(store, storedSessionAttemptWithSession.getId(), rootTaskId, workflowTaskList, list, this.limits);
                    return null;
                } catch (TaskLimitExceededException limitExceeded) {
                    // Tunnel the checked exception out of the callback; unwrapped below.
                    throw new WorkflowTaskLimitExceededException(limitExceeded);
                }
            });

            if (!list2.isEmpty()) {
                for (SessionMonitor monitor : list2) {
                    logger.debug("Using session monitor: {}", monitor);
                }
                sessionControlStore.insertMonitors(storedSessionAttemptWithSession.getId(), list2);
            }
        } catch (WorkflowTaskLimitExceededException wrapped) {
            throw wrapped.getCause();
        }
    }

    /**
     * Requests cancellation of attempt {@code j}, looked up through the session store of site
     * {@code i}. The propagation loop is woken only if the request was accepted.
     *
     * @return the result of {@code requestCancelAttempt}
     * @throws ResourceNotFoundException if the attempt lookup fails
     */
    public boolean killAttemptById(int i, long j) throws ResourceNotFoundException {
        boolean cancelRequested = this.sm.requestCancelAttempt(this.sm.getSessionStore(i).getAttemptById(j).getId());
        if (!cancelRequested) {
            return false;
        }
        noticeStatusPropagate();
        return true;
    }

    /**
     * Raises the {@code propagatorNotice} flag and signals every waiter on the propagator
     * condition, so the {@code runWhile} loop resets its wait interval.
     */
    private void noticeStatusPropagate() {
        final Lock lock = this.propagatorLock;
        lock.lock();
        try {
            this.propagatorNotice = true;
            this.propagatorCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Signals every waiter on the propagator condition without raising the notice flag, so a
     * sleeping {@code runWhile} loop wakes up and re-evaluates its condition.
     */
    public void noticeRunWhileConditionChange() {
        final Lock lock = this.propagatorLock;
        lock.lock();
        try {
            this.propagatorCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Runs the executor loop with a condition that is always true. */
    public void run() throws InterruptedException {
        runWhile(() -> true);
    }

    /**
     * Runs the executor loop until the state flags of attempt {@code j} report done, then loads
     * and returns the attempt with its session.
     *
     * <p>A {@link ResourceNotFoundException} raised inside the loop condition is wrapped in a
     * runtime exception there; it is unwrapped and rethrown here.
     */
    public StoredSessionAttemptWithSession runUntilDone(long j) throws ResourceNotFoundException, InterruptedException {
        BooleanSupplier attemptNotDone = () -> {
            try {
                return !this.sm.getAttemptStateFlags(j).isDone();
            } catch (ResourceNotFoundException notFound) {
                throw Throwables.propagate(notFound);
            }
        };
        try {
            runWhile(attemptNotDone);
            return (StoredSessionAttemptWithSession) this.tm.begin(() -> this.sm.getAttemptWithSessionById(j), ResourceNotFoundException.class);
        } catch (RuntimeException wrapped) {
            Throwables.propagateIfInstanceOf(wrapped.getCause(), ResourceNotFoundException.class);
            throw wrapped;
        }
    }

    /** Runs the executor loop for as long as the session store reports any not-done attempt. */
    public void runUntilAllDone() throws InterruptedException {
        runWhile(this.sm::isAnyNotDoneAttempts);
    }

    /**
     * Main executor loop. Performs one unconditional propagation pass, then repeats passes for
     * as long as {@code booleanSupplier} (evaluated through {@code tm.begin}) returns true.
     *
     * <p>Each pass promotes blocked children, retries retry-waiting tasks, enqueues ready tasks,
     * completes planned tasks and archives finished sessions. When a pass completes no planned
     * task, the loop waits on the propagator condition: the wait starts at
     * {@code INITIAL_INTERVAL} ms, doubles after every idle wait up to {@code MAX_INTERVAL} ms,
     * and resets as soon as a status notice is observed.
     *
     * @param booleanSupplier loop condition; the loop ends when it returns false
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void runWhile(BooleanSupplier booleanSupplier) throws InterruptedException {
        try (TaskQueuer taskQueuer = new TaskQueuer()) {
            propagateBlockedChildrenToReady();
            retryRetryWaitingTasks();
            enqueueReadyTasks(taskQueuer);
            propagateAllPlannedToDone();
            propagateSessionArchive();

            int waitMsec = INITIAL_INTERVAL;
            while ((Boolean) this.tm.begin(() -> booleanSupplier.getAsBoolean())) {
                this.metrics.increment(DigdagMetrics.Category.EXECUTOR, "loopCount");
                propagateBlockedChildrenToReady();
                retryRetryWaitingTasks();
                enqueueReadyTasks(taskQueuer);
                boolean plannedTasksCompleted = propagateAllPlannedToDone();
                propagateSessionArchive();
                if (plannedTasksCompleted) {
                    // Progress was made, so run the next pass immediately.
                    continue;
                }

                this.propagatorLock.lock();
                try {
                    if (this.propagatorNotice) {
                        // A notice arrived during the pass: consume it and skip the wait.
                        this.propagatorNotice = false;
                        waitMsec = INITIAL_INTERVAL;
                    } else {
                        this.metrics.summary(DigdagMetrics.Category.EXECUTOR, "loopWaitMsec", waitMsec);
                        boolean signalled = this.propagatorCondition.await(waitMsec, TimeUnit.MILLISECONDS);
                        if (signalled && this.propagatorNotice) {
                            this.propagatorNotice = false;
                            waitMsec = INITIAL_INTERVAL;
                        } else {
                            // Timed out, or woken without a status notice: back off.
                            waitMsec = Math.min(waitMsec * 2, MAX_INTERVAL);
                        }
                    }
                } finally {
                    this.propagatorLock.unlock();
                }
            }
        }
    }

    /**
     * Runs {@code supplier} and returns its result, or {@code r} if it throws an
     * {@link Exception}. This keeps one failing item from aborting a propagation pass.
     *
     * <p>On failure the exception is passed to {@link #catchingNotify(Exception)}, the
     * {@code errorInRunWhile} metric is incremented, and {@code str} is logged at WARN together
     * with the exception.
     *
     * @param supplier work to run
     * @param r value returned when the supplier fails
     * @param str log message describing the failed operation
     * @return the supplier's result, or {@code r} on failure
     */
    protected <R> R catching(Supplier<R> supplier, R r, String str) {
        try {
            return supplier.get();
        } catch (Exception e) {
            catchingNotify(e);
            this.metrics.increment(DigdagMetrics.Category.EXECUTOR, "errorInRunWhile");
            // Log the exception itself; the message alone only identifies the item.
            logger.warn(str, e);
            return r;
        }
    }

    /**
     * Hook invoked by {@code catching} with every exception it swallows. The implementation here
     * is intentionally empty; presumably subclasses or tests override it to observe failures.
     *
     * @param exc the exception caught by {@code catching}
     */
    public void catchingNotify(Exception exc) {
    }

    /**
     * Builds the per-parent step used by {@code propagateBlockedChildrenToReady}.
     *
     * <p>Inside a transaction the function tries to lock the given parent task and, under that
     * lock, calls {@code trySetChildrenBlockedToReadyOrShortCircuitPlannedOrCanceled}; the
     * callback yields true when that call reports a count greater than zero. The function
     * returns the Optional produced by {@code lockTaskIfNotLocked}.
     */
    @VisibleForTesting
    protected Function<Long, Optional<Boolean>> funcPropagateBlockedChildrenToReady() {
        return parentId -> (Optional) this.tm.begin(() ->
                this.sm.lockTaskIfNotLocked(parentId.longValue(), store ->
                        store.trySetChildrenBlockedToReadyOrShortCircuitPlannedOrCanceled(parentId.longValue()) > 0));
    }

    /**
     * Pages through the direct parents of blocked tasks and applies
     * {@code funcPropagateBlockedChildrenToReady()} to each one.
     *
     * <p>Each page is fetched in its own transaction, using the last id of the previous page as
     * the cursor; the scan stops at the first empty page. A failure on one parent is absorbed by
     * {@code catching} and counted as "no change".
     *
     * @return true if the step reported a change for at least one parent
     */
    @DigdagTimed(category = "executor", appendMethodName = true)
    protected boolean propagateBlockedChildrenToReady() {
        boolean anyChanged = false;
        long lastParentId = 0;
        while (true) {
            final long cursor = lastParentId;
            List<Long> parentIds = (List<Long>) this.tm.begin(() -> this.sm.findDirectParentsOfBlockedTasks(cursor));
            if (parentIds.isEmpty()) {
                return anyChanged;
            }
            // Every parent is visited, even after a change has already been seen.
            for (Long parentId : parentIds) {
                boolean changed = catching(
                        () -> funcPropagateBlockedChildrenToReady().apply(parentId),
                        Optional.<Boolean>absent(),
                        "Failed to set children to ready. parentId:" + parentId).or(false);
                anyChanged = anyChanged || changed;
            }
            lastParentId = parentIds.get(parentIds.size() - 1);
        }
    }

    /**
     * Builds the per-task step used by {@code propagateAllPlannedToDone}: inside a transaction,
     * try to lock the task and run {@code setDoneFromDoneChildren} on it. The function returns
     * the Optional produced by {@code lockTaskIfNotLocked}.
     */
    protected Function<Long, Optional<Boolean>> funcSetDoneFromDoneChildren() {
        return taskId -> (Optional) this.tm.begin(() ->
                this.sm.lockTaskIfNotLocked(taskId.longValue(), (store, lockedTask) ->
                        setDoneFromDoneChildren(new TaskControl(store, lockedTask, this.limits))));
    }

    /**
     * Pages through all tasks in state PLANNED and applies
     * {@code funcSetDoneFromDoneChildren()} to each one.
     *
     * <p>Each page is fetched in its own transaction, using the last id of the previous page as
     * the cursor; the scan stops at the first empty page. A failure on one task is absorbed by
     * {@code catching} and counted as "no change".
     *
     * @return true if the step reported a state change for at least one task
     */
    @DigdagTimed(category = "executor", appendMethodName = true)
    protected boolean propagateAllPlannedToDone() {
        boolean anyChanged = false;
        long lastTaskId = 0;
        while (true) {
            final long cursor = lastTaskId;
            List<Long> taskIds = (List<Long>) this.tm.begin(() -> this.sm.findTasksByState(TaskStateCode.PLANNED, cursor));
            if (taskIds.isEmpty()) {
                return anyChanged;
            }
            // Every task is visited, even after a change has already been seen.
            for (Long taskId : taskIds) {
                boolean changed = catching(
                        () -> funcSetDoneFromDoneChildren().apply(taskId),
                        Optional.<Boolean>absent(),
                        "Failed to call setDoneFromDoneChildren. taskId:" + taskId).or(false);
                anyChanged = anyChanged || changed;
            }
            lastTaskId = taskIds.get(taskIds.size() - 1);
        }
    }

    /**
     * Moves a PLANNED task whose children can no longer progress to its next state.
     *
     * <p>Checked in this order: cancel requested → canceled; delayed error → ERROR; delayed
     * group error → GROUP_ERROR; no failed child → SUCCESS. If a child failed, the task either
     * goes to group-retry-waiting (when {@code checkRetry} allows a retry) or gets its error
     * follow-up tasks added. With follow-up tasks it stays PLANNED with a delayed group error;
     * without any it becomes GROUP_ERROR directly.
     *
     * @return false if the task is not PLANNED or still has a progressible child; otherwise the
     *         result of the attempted state transition
     */
    private boolean setDoneFromDoneChildren(TaskControl taskControl) {
        if (taskControl.getState() != TaskStateCode.PLANNED || taskControl.isAnyProgressibleChild()) {
            return false;
        }
        logger.trace("setDoneFromDoneChildren {}", taskControl.get());
        StoredTask storedTask = taskControl.get();

        if (storedTask.getStateFlags().isCancelRequested()) {
            return taskControl.setToCanceled();
        }
        if (storedTask.getStateFlags().isDelayedError()) {
            boolean updated = taskControl.setPlannedToError();
            if (!updated) {
                logger.warn("Unexpected state change failure from PLANNED to ERROR: {}", storedTask);
            }
            return updated;
        }
        if (storedTask.getStateFlags().isDelayedGroupError()) {
            boolean updated = taskControl.setPlannedToGroupError();
            if (!updated) {
                logger.warn("Unexpected state change failure from PLANNED to GROUP_ERROR: {}", storedTask);
            }
            return updated;
        }
        if (!taskControl.isAnyErrorChild()) {
            boolean updated = taskControl.setPlannedToSuccess();
            if (!updated) {
                logger.warn("Unexpected state change failure from PLANNED to SUCCESS: {}", storedTask);
            }
            return updated;
        }

        // At least one child failed: retry the group if its retry settings allow it.
        Optional<RetryControl> retry = checkRetry(storedTask);
        if (retry.isPresent()) {
            RetryControl retryControl = (RetryControl) retry.get();
            return taskControl.setPlannedToGroupRetryWaiting(retryControl.getNextRetryStateParams(), retryControl.getNextRetryInterval());
        }

        List<Long> errorTaskIds = new ArrayList<>();
        if (!taskControl.get().getParentId().isPresent()) {
            // No parent id means this is the root task, so add the attempt-failure alert task.
            errorTaskIds.add(Long.valueOf(addAttemptFailureAlertTask(taskControl)));
        }
        try {
            Optional<Long> errorTaskId = addErrorTasksIfAny(taskControl, true, config -> {
                collectErrorParams(config, taskControl.get());
                return config;
            });
            if (errorTaskId.isPresent()) {
                errorTaskIds.add(errorTaskId.get());
            }
        } catch (ConfigException e) {
            logger.warn("Found a broken _error task in attempt {} task {}. Skipping this task.", storedTask.getAttemptId(), storedTask.getId(), e);
        } catch (TaskLimitExceededException e) {
            this.tm.reset();
            logger.warn("Failed to add error tasks because of task limit");
        }

        // With follow-up tasks pending, stay PLANNED and record the error as delayed.
        return errorTaskIds.isEmpty()
                ? taskControl.setPlannedToGroupError()
                : taskControl.setPlannedToPlannedWithDelayedGroupError();
    }

    /**
     * Prepares a {@link RetryControl} from the task's merged config and state params.
     *
     * @return the retry control when its {@code evaluate()} returns true; absent otherwise, or
     *         when the retry configuration is invalid (logged at WARN and ignored)
     */
    Optional<RetryControl> checkRetry(StoredTask storedTask) {
        try {
            RetryControl retryControl = RetryControl.prepare(storedTask.getConfig().getMerged(), storedTask.getStateParams(), false);
            if (retryControl.evaluate()) {
                return Optional.of(retryControl);
            }
            return Optional.absent();
        } catch (ConfigException invalidRetryConfig) {
            logger.warn("Ignore retry parameter because of invalid retry configuration. attempt_id:{} config:{}", storedTask.getAttemptId(), storedTask.getConfig());
            return Optional.absent();
        }
    }

    /**
     * Fills {@code config} with data gathered from every recursive child of {@code storedTask}:
     * each child's stored parameter update is applied to it, and the children's error configs
     * are merged into a fresh config that is set under the key {@code "error"}.
     */
    private void collectErrorParams(Config config, StoredTask storedTask) {
        TaskTree attemptTree = new TaskTree(this.sm.getTaskRelations(storedTask.getAttemptId()));
        List<Long> childIds = attemptTree.getRecursiveChildrenIdList(storedTask.getId());

        for (ParameterUpdate storedParams : this.sm.getStoreParams(childIds)) {
            storedParams.applyTo(config);
        }

        Config mergedErrors = this.cf.create();
        for (Config childError : this.sm.getErrors(childIds)) {
            mergedErrors.merge(childError);
        }
        config.set("error", mergedErrors);
    }

    /**
     * Builds the per-root-task step used by {@code propagateSessionArchive}.
     *
     * <p>Inside a transaction the function locks the task's attempt if it exists. If the
     * attempt's state flags already report done, the callback yields false. Otherwise the
     * attempt's tasks are archived, passing whether the root task state is SUCCESS, and the
     * callback yields true.
     */
    @VisibleForTesting
    protected Function<TaskAttemptSummary, Optional<Boolean>> funcArchiveTasks() {
        return rootTask -> (Optional) this.tm.begin(() ->
                this.sm.lockAttemptIfExists(rootTask.getAttemptId(), (store, attemptSummary) -> {
                    if (attemptSummary.getStateFlags().isDone()) {
                        return false;
                    }
                    SessionAttemptControl attemptControl = new SessionAttemptControl(store, rootTask.getAttemptId());
                    boolean rootSucceeded = rootTask.getState() == TaskStateCode.SUCCESS;
                    attemptControl.archiveTasks(this.archiveMapper, rootSucceeded);
                    return true;
                }));
    }

    /**
     * Pages through root tasks that are in a done state and applies {@code funcArchiveTasks()}
     * to each one.
     *
     * <p>Each page is fetched in its own transaction, using the id of the previous page's last
     * task as the cursor; the scan stops at the first empty page. A failure on one task is
     * absorbed by {@code catching} and counted as "not archived".
     *
     * @return true if the step reported an archive for at least one attempt
     */
    @DigdagTimed(category = "executor", appendMethodName = true)
    protected boolean propagateSessionArchive() {
        boolean anyArchived = false;
        long lastTaskId = 0;
        while (true) {
            final long cursor = lastTaskId;
            List<TaskAttemptSummary> rootTasks = (List<TaskAttemptSummary>) this.tm.begin(() -> this.sm.findRootTasksByStates(TaskStateCode.doneStates(), cursor));
            if (rootTasks.isEmpty()) {
                return anyArchived;
            }
            // Every root task is visited, even after an archive has already been seen.
            for (TaskAttemptSummary rootTask : rootTasks) {
                boolean archived = catching(
                        () -> funcArchiveTasks().apply(rootTask),
                        Optional.<Boolean>absent(),
                        "Failed to call archiveTasks. taskId:" + rootTask.getId()).or(false);
                anyArchived = anyArchived || archived;
            }
            lastTaskId = rootTasks.get(rootTasks.size() - 1).getId();
        }
    }

    /**
     * Calls {@code trySetRetryWaitingToReady()} on the session store inside a transaction.
     *
     * @return true if the call reported a count greater than zero
     */
    @DigdagTimed(category = "executor", appendMethodName = true)
    protected boolean retryRetryWaitingTasks() {
        Boolean anyMovedToReady = (Boolean) this.tm.begin(() -> this.sm.trySetRetryWaitingToReady() > 0);
        return anyMovedToReady.booleanValue();
    }

    /**
     * Builds the per-task step used by {@code enqueueReadyTasks}: run {@code enqueueTask} for
     * the given task id inside a transaction. The function always yields true.
     */
    @VisibleForTesting
    protected Function<Long, Boolean> funcEnqueueTask() {
        return taskId -> (Boolean) this.tm.begin(() -> {
            enqueueTask(this.dispatcher, taskId.longValue());
            return true;
        });
    }

    /**
     * Fetches one batch of ready task ids, sized and ordered by the enqueue fetch settings, and
     * runs {@code funcEnqueueTask()} for each id. A failure on one task is absorbed by
     * {@code catching}, so the remaining ids are still processed.
     *
     * @param taskQueuer the queuer of the current loop; not referenced by this implementation
     */
    @DigdagTimed(category = "executor", appendMethodName = true)
    protected void enqueueReadyTasks(TaskQueuer taskQueuer) {
        List<Long> readyTaskIds = (List<Long>) this.tm.begin(() -> this.sm.findAllReadyTaskIds(this.enqueueFetchSize.intValue(), this.enqueueRandomFetch));
        logger.trace("readyTaskIds:{}", readyTaskIds);
        for (Long readyTaskId : readyTaskIds) {
            long taskId = readyTaskId.longValue();
            catching(() -> funcEnqueueTask().apply(Long.valueOf(taskId)), true, "Failed to call enqueueTask. taskId:" + taskId);
        }
    }

    /**
     * Tries to lock task {@code j} and, if it is still READY, pushes it towards execution.
     *
     * <p>Grouping-only tasks are handed to {@code retryGroupingTask}. Any other task gets a
     * queue request (priority 0, unique name derived from task id and retry count, no data)
     * dispatched through {@code taskQueueDispatcher} and is then moved from READY to RUNNING.
     * The boolean computed under the lock is discarded; this method returns nothing.
     *
     * @param taskQueueDispatcher dispatcher that receives the queue request
     * @param j id of the task to enqueue
     */
    @DigdagTimed(category = "executor", appendMethodName = true)
    protected void enqueueTask(TaskQueueDispatcher taskQueueDispatcher, long j) {
        this.sm.lockTaskIfNotLocked(j, (taskControlStore, storedTask) -> {
            TaskControl taskControl = new TaskControl(taskControlStore, storedTask, this.limits);
            // Re-check the state under the lock; the id list was read before locking.
            if (taskControl.getState() != TaskStateCode.READY) {
                return false;
            }
            if (storedTask.getTaskType().isGroupingOnly()) {
                return Boolean.valueOf(retryGroupingTask(taskControl));
            }
            try {
                int siteIdOfTask = this.sm.getSiteIdOfTask(j);
                try {
                    // No queue name is specified; the conflict log below prints "<shared>" for this case.
                    Optional<String> absent = Optional.absent();
                    String encodeUniqueQueuedTaskName = encodeUniqueQueuedTaskName(taskControl.get());
                    ImmutableTaskQueueRequest build = TaskQueueRequest.builder().priority(0).uniqueName(encodeUniqueQueuedTaskName).data(Optional.absent()).build();
                    logger.debug("Queuing task of attempt_id={}: id={} {}", new Object[]{Long.valueOf(storedTask.getAttemptId()), Long.valueOf(storedTask.getId()), storedTask.getFullName()});
                    try {
                        taskQueueDispatcher.dispatch(siteIdOfTask, absent, build);
                    } catch (TaskConflictException e) {
                        // The same unique name is already queued: log it and still fall through
                        // to the READY -> RUNNING transition below.
                        this.tm.reset();
                        logger.warn("Task name {} is already queued in queue={} of site id={}. Skipped enqueuing", new Object[]{encodeUniqueQueuedTaskName, absent.or("<shared>"), Integer.valueOf(siteIdOfTask)});
                    }
                    boolean readyToRunning = taskControl.setReadyToRunning();
                    if (!readyToRunning) {
                        logger.warn("Unexpected state change failure from READY to RUNNING: {}", storedTask);
                    }
                    return Boolean.valueOf(readyToRunning);
                } catch (Exception e2) {
                    // Any other failure while enqueuing marks the task itself as failed, with
                    // an error config built from the exception.
                    this.tm.reset();
                    logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Enqueue error, making this task failed: {}", storedTask, e2);
                    return Boolean.valueOf(taskFailed(taskControl, TaskExecutionException.buildExceptionErrorConfig(e2).toConfig(this.cf)));
                }
            } catch (ResourceNotFoundException e3) {
                // Logged only; the task is left untouched and the callback yields false.
                this.tm.reset();
                logger.error("Database state error enqueuing task.", new IllegalStateException("Task id=" + j + " is ready to run but associated session attempt does not exist.", e3));
                return false;
            }
        }).or(false);
    }

    /**
     * Builds the unique name under which a task is queued.
     *
     * <p>The name is the decimal task id; when the retry count is non-zero,
     * {@code ".r"} followed by the retry count is appended (for example {@code "12.r3"}).
     *
     * @param storedTask task whose id and retry count are encoded
     * @return the encoded queue name
     */
    private static String encodeUniqueQueuedTaskName(StoredTask storedTask) {
        int retries = storedTask.getRetryCount();
        String encodedId = Long.toString(storedTask.getId());
        if (retries == 0) {
            return encodedId;
        }
        return encodedId + ".r" + Integer.toString(retries);
    }

    /**
     * Extracts the task id from a queued task name.
     *
     * <p>Everything before the first {@code '.'} is parsed as the id; a name without a
     * dot is parsed as a whole.
     *
     * @param str encoded queue name such as {@code "12"} or {@code "12.r3"}
     * @return the task id
     * @throws NumberFormatException if the id part is not a valid long
     */
    private static long parseTaskIdFromEncodedQueuedTaskName(String str) {
        int separator = str.indexOf('.');
        String idPart = (separator < 0) ? str : str.substring(0, separator);
        return Long.parseLong(idPart);
    }

    /**
     * Converts task queue locks into task requests.
     *
     * <p>For each lock the task id is parsed from the lock's unique name and a request is
     * built for it. When no request can be built, the lock is reported to the dispatcher
     * via {@code deleteInconsistentTask}. A {@link RuntimeException} raised while handling
     * one lock is logged and that lock is skipped; the remaining locks are still processed.
     *
     * @param list locks acquired from the task queue
     * @return the requests that could be built, in the order of the given locks
     */
    public List<TaskRequest> getTaskRequests(List<TaskQueueLock> list) {
        ImmutableList.Builder<TaskRequest> requests = ImmutableList.builder();
        for (TaskQueueLock lock : list) {
            try {
                long taskId = parseTaskIdFromEncodedQueuedTaskName(lock.getUniqueName());
                Optional<TaskRequest> request = getTaskRequest(taskId, lock.getLockId());
                if (!request.isPresent()) {
                    dispatcher.deleteInconsistentTask(lock.getLockId());
                } else {
                    requests.add(request.get());
                }
            } catch (RuntimeException unexpected) {
                tm.reset();
                logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Invalid association of task queue lock id: {}", lock, unexpected);
            }
        }
        return requests.build();
    }

    /**
     * Builds the {@link TaskRequest} for a queued task while holding the task's lock.
     *
     * <p>Returns absent, after logging a "Database state error", when the task itself, its
     * session attempt, its workflow definition, or its project cannot be found.
     *
     * <p>The request's {@code config} starts from the {@code digdag.defaultParams} system
     * setting, then merges the revision's default params (when a workflow definition is
     * attached), the attempt params, and whatever {@code collectParams} contributes.
     * {@code _check} and {@code _error} are removed from both the merged params and the
     * copy of the task's local config.
     *
     * @param j id of the task
     * @param str lock id to carry in the request
     * @return the request, or absent when the stored state is inconsistent
     */
    private Optional<TaskRequest> getTaskRequest(long j, String str) {
        return sm.<Optional<TaskRequest>>lockTaskIfExists(j, (store, task) -> {
            StoredSessionAttemptWithSession attempt;
            try {
                attempt = sm.getAttemptWithSessionById(task.getAttemptId());
            } catch (ResourceNotFoundException notFound) {
                tm.reset();
                logger.error("Database state error enqueuing task.", new IllegalStateException("Task id=" + j + " is in the task queue but associated session attempt does not exist.", notFound));
                return Optional.absent();
            }

            // The revision is optional: it is only looked up when a workflow definition is attached.
            Optional<StoredRevision> revision = Optional.absent();
            if (attempt.getWorkflowDefinitionId().isPresent()) {
                try {
                    revision = Optional.of(rm.getRevisionOfWorkflowDefinition(attempt.getWorkflowDefinitionId().get()));
                } catch (ResourceNotFoundException notFound) {
                    tm.reset();
                    logger.error("Database state error enqueuing task.", new IllegalStateException("Task id=" + j + " is in the task queue but associated workflow definition does not exist.", notFound));
                    return Optional.absent();
                }
            }

            StoredProject project;
            try {
                project = rm.getProjectByIdInternal(attempt.getSession().getProjectId());
            } catch (ResourceNotFoundException notFound) {
                tm.reset();
                logger.error("Database state error enqueuing task.", new IllegalStateException("Task id=" + j + " is in the task queue but associated project does not exist.", notFound));
                return Optional.absent();
            }

            // Later merges are applied on top of earlier ones.
            Config params = cf.fromJsonString(systemConfig.get("digdag.defaultParams", String.class, "{}"));
            if (revision.isPresent()) {
                params.merge(revision.get().getDefaultParams());
            }
            params.merge(attempt.getParams());
            collectParams(params, task, attempt);

            Config localConfig = task.getConfig().getLocal().deepCopy();
            params.remove("_check");
            params.remove("_error");
            localConfig.remove("_check");
            localConfig.remove("_error");

            TaskRequest request = TaskRequest.builder()
                    .siteId(attempt.getSiteId())
                    .projectId(attempt.getSession().getProjectId())
                    .projectName(project.getName())
                    .workflowName(attempt.getSession().getWorkflowName())
                    .revision(revision.transform(StoredRevision::getName))
                    .taskId(task.getId())
                    .attemptId(attempt.getId())
                    .sessionId(attempt.getSessionId())
                    .retryAttemptName(attempt.getRetryAttemptName())
                    .isCancelRequested(task.getStateFlags().isCancelRequested())
                    .taskName(task.getFullName())
                    .lockId(str)
                    .timeZone(attempt.getTimeZone())
                    .sessionUuid(attempt.getSessionUuid())
                    .sessionTime(attempt.getSession().getSessionTime())
                    .createdAt(Instant.now())
                    .localConfig(localConfig)
                    .config(params)
                    .lastStateParams(task.getStateParams())
                    .workflowDefinitionId(attempt.getWorkflowDefinitionId())
                    .build();
            return Optional.of(request);
        }).or(() -> {
            logger.error("Database state error enqueuing task.", new IllegalStateException("Task id=" + j + " is in the task queue but associated task is deleted."));
            return Optional.<TaskRequest>absent();
        });
    }

    /**
     * Re-plans a grouping task for retry.
     *
     * <p>Copies the initial tasks of every recursive child of the task (looked up from the
     * attempt's task relations) and then applies the group-retry READY-to-PLANNED
     * transition.
     *
     * @param taskControl control of the locked grouping task
     * @return always {@code true}
     */
    private boolean retryGroupingTask(TaskControl taskControl) {
        StoredTask groupTask = taskControl.get();
        String groupName = groupTask.getFullName();
        TaskTree attemptTree = new TaskTree(sm.getTaskRelations(groupTask.getAttemptId()));
        taskControl.copyInitialTasksForRetry(groupName, attemptTree.getRecursiveChildrenIdList(groupTask.getId()));
        taskControl.setGroupRetryReadyToPlanned();
        return true;
    }

    /**
     * Records a task failure and, when the task state was changed, reports the task to the
     * dispatcher as finished.
     *
     * <p>Missing or preempted queue entries reported by the dispatcher are logged as
     * warnings and otherwise ignored.
     *
     * @param i site id passed to the dispatcher
     * @param j id of the failed task
     * @param str lock id passed to the dispatcher
     * @param agentId agent passed to the dispatcher
     * @param config error config describing the failure
     * @return {@code true} if the task state was changed; {@code false} otherwise,
     *         including when the task does not exist
     */
    public boolean taskFailed(int i, long j, String str, AgentId agentId, Config config) {
        boolean changed = sm.lockTaskIfExists(j, (store, task) ->
                taskFailed(new TaskControl(store, task, limits), config)
        ).or(false);
        if (!changed) {
            return false;
        }
        try {
            dispatcher.taskFinished(i, str, agentId);
        } catch (TaskNotFoundException missing) {
            tm.reset();
            logger.warn("Ignoring missing task entry error", missing);
        } catch (TaskConflictException preempted) {
            tm.reset();
            logger.warn("Ignoring preempted task entry error", preempted);
        }
        return true;
    }

    /**
     * Records a task success and, when the task state was changed, reports the task to the
     * dispatcher as finished.
     *
     * <p>Missing or preempted queue entries reported by the dispatcher are logged as
     * warnings and otherwise ignored.
     *
     * @param i site id passed to the dispatcher
     * @param j id of the succeeded task
     * @param str lock id passed to the dispatcher
     * @param agentId agent passed to the dispatcher
     * @param taskResult result produced by the task
     * @return {@code true} if the task state was changed; {@code false} otherwise,
     *         including when the task does not exist
     */
    public boolean taskSucceeded(int i, long j, String str, AgentId agentId, TaskResult taskResult) {
        boolean changed = sm.lockTaskIfExists(j, (store, task) ->
                taskSucceeded(new TaskControl(store, task, limits), taskResult)
        ).or(false);
        if (!changed) {
            return false;
        }
        try {
            dispatcher.taskFinished(i, str, agentId);
        } catch (TaskNotFoundException missing) {
            tm.reset();
            logger.warn("Ignoring missing task entry error", missing);
        } catch (TaskConflictException preempted) {
            tm.reset();
            logger.warn("Ignoring preempted task entry error", preempted);
        }
        return true;
    }

    /**
     * Schedules a retry of a task and, when the task state was changed, reports the
     * current run to the dispatcher as finished.
     *
     * <p>Missing or preempted queue entries reported by the dispatcher are logged as
     * warnings and otherwise ignored.
     *
     * @param i site id passed to the dispatcher
     * @param j id of the task to retry
     * @param str lock id passed to the dispatcher
     * @param agentId agent passed to the dispatcher
     * @param i2 retry interval (logged as seconds)
     * @param config state params handed to the retry-waiting transition
     * @param optional error that caused the retry, if any; used for trace logging
     * @return {@code true} if the task state was changed; {@code false} otherwise,
     *         including when the task does not exist
     */
    public boolean retryTask(int i, long j, String str, AgentId agentId, int i2, Config config, Optional<Config> optional) {
        boolean changed = sm.lockTaskIfExists(j, (store, task) ->
                retryTask(new TaskControl(store, task, limits), i2, config, optional)
        ).or(false);
        if (!changed) {
            return false;
        }
        try {
            dispatcher.taskFinished(i, str, agentId);
        } catch (TaskNotFoundException missing) {
            tm.reset();
            logger.warn("Ignoring missing task entry error", missing);
        } catch (TaskConflictException preempted) {
            tm.reset();
            logger.warn("Ignoring preempted task entry error", preempted);
        }
        return true;
    }

    /**
     * Applies a failure to a locked task.
     *
     * <p>Nothing happens unless the task is RUNNING. A task with a pending cancel request
     * is moved to canceled. Otherwise error tasks are generated from the task's error
     * config, with the failure stored under {@code error} in their {@code _export}; if
     * any were added the task goes to planned-with-delayed-error, otherwise straight to
     * the short-circuit error transition.
     *
     * @param taskControl control of the locked task
     * @param config error config describing the failure
     * @return whether the state transition was applied
     */
    private boolean taskFailed(TaskControl taskControl, Config config) {
        logger.trace("Task failed with error {} with no retry: {}", config, taskControl.get());

        if (taskControl.getState() != TaskStateCode.RUNNING) {
            logger.trace("Skipping taskFailed callback to a {} task", taskControl.getState());
            return false;
        }
        if (taskControl.get().getStateFlags().isCancelRequested()) {
            return taskControl.setToCanceled();
        }

        // Failing to generate error tasks must not prevent the task itself from being marked failed.
        boolean errorTaskAdded = false;
        try {
            errorTaskAdded = addErrorTasksIfAny(taskControl, false, export -> export.set("error", config)).isPresent();
        } catch (TaskLimitExceededException limitExceeded) {
            tm.reset();
            errorTaskAdded = false;
            logger.warn("Failed to add error tasks because of task limit");
        } catch (ConfigException brokenErrorTask) {
            errorTaskAdded = false;
            logger.warn("Found a broken _error task in attempt {} task {}. Skipping this task.", taskControl.get().getAttemptId(), taskControl.get().getId(), brokenErrorTask);
        }

        boolean updated;
        if (!errorTaskAdded) {
            updated = taskControl.setRunningToShortCircuitError(config);
        } else {
            logger.trace("Added an error task");
            updated = taskControl.setRunningToPlannedWithDelayedError(config);
        }

        noticeStatusPropagate();
        if (!updated) {
            logger.warn("Unexpected state change failure from RUNNING to RETRY, PLANNED or ERROR: {}", taskControl.get());
        }
        return updated;
    }

    /* JADX WARN: Removed duplicated region for block: B:20:0x00d1  */
    /* JADX WARN: Removed duplicated region for block: B:23:0x00eb  */
    /* JADX WARN: Removed duplicated region for block: B:26:0x00db  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Private success handler taking a {@code TaskControl} and a {@code TaskResult}.
     *
     * <p>NOTE(review): the decompiler could not recover this method's body (see the JADX
     * notes above), so the real logic is not available in this file. As written here the
     * method unconditionally throws and never returns a value. Restore the body from the
     * original sources or bytecode before relying on it.
     *
     * @param r9 task control of the task being completed
     * @param r10 task result passed in by the caller
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    private boolean taskSucceeded(io.digdag.core.workflow.TaskControl r9, io.digdag.spi.TaskResult r10) {
        /*
            Method dump skipped, instructions count: 253
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler in place of the unrecovered body.
        throw new UnsupportedOperationException("Method not decompiled: io.digdag.core.workflow.WorkflowExecutor.taskSucceeded(io.digdag.core.workflow.TaskControl, io.digdag.spi.TaskResult):boolean");
    }

    /**
     * Moves a locked RUNNING task to the retry-waiting state.
     *
     * <p>Tasks in any other state are left untouched.
     *
     * @param taskControl control of the locked task
     * @param i retry interval (logged as seconds)
     * @param config state params handed to the retry-waiting transition
     * @param optional error that caused the retry, if any; used only for trace logging
     * @return whether the state transition was applied
     */
    private boolean retryTask(TaskControl taskControl, int i, Config config, Optional<Config> optional) {
        if (taskControl.getState() != TaskStateCode.RUNNING) {
            logger.trace("Skipping retryTask callback to a {} task", taskControl.getState());
            return false;
        }

        if (optional.isPresent()) {
            logger.trace("Task failed with error {} with retrying after {} seconds: {}", optional.get(), i, taskControl.get());
        }

        boolean updated = taskControl.setRunningToRetryWaiting(config, i);
        noticeStatusPropagate();
        if (updated) {
            return true;
        }
        logger.warn("Unexpected state change failure from RUNNING to RETRY: {}", taskControl.get());
        return false;
    }

    /**
     * Merges the parameters visible to a task into {@code config}, in place.
     *
     * <p>Walks the ids returned by {@code getRecursiveParentsUpstreamChildrenIdListFromFar}
     * in order. For each id that is also one of the task's parents, that parent's export
     * params are merged first; then the stored parameter update at the same position is
     * applied. Finally the task's own export config is merged.
     *
     * @param config params to update
     * @param storedTask task the params are collected for
     * @param storedSessionAttempt attempt whose task relations are inspected
     */
    private void collectParams(Config config, StoredTask storedTask, StoredSessionAttempt storedSessionAttempt) {
        TaskTree tree = new TaskTree(sm.getTaskRelations(storedSessionAttempt.getId()));
        List<Long> parentIds = tree.getRecursiveParentIdListFromRoot(storedTask.getId());
        List<Long> upstreamIds = tree.getRecursiveParentsUpstreamChildrenIdListFromFar(storedTask.getId());
        List<Config> parentExports = sm.getExportParams(parentIds);
        List<ParameterUpdate> storedUpdates = sm.getStoreParams(upstreamIds);

        int count = upstreamIds.size();
        for (int position = 0; position < count; position++) {
            ParameterUpdate update = storedUpdates.get(position);
            long upstreamId = upstreamIds.get(position);
            int parentPosition = parentIds.indexOf(upstreamId);
            if (parentPosition >= 0) {
                // This upstream id is a parent of the task: its exports go in before its stored params.
                config.merge(parentExports.get(parentPosition));
            }
            update.applyTo(config);
        }

        config.merge(storedTask.getConfig().getExport());
    }

    /**
     * Compiles {@code config} into sub tasks named under {@code ^sub} and attaches them to
     * the task.
     *
     * @param taskControl control of the task that receives the sub tasks
     * @param config sub task definition; nothing is added when it is empty
     * @return the value returned by {@code addGeneratedSubtasks}, or absent when the
     *         config is empty or compiles to no tasks
     * @throws TaskLimitExceededException declared for callers; thrown by the add step
     */
    private Optional<Long> addSubtasksIfNotEmpty(TaskControl taskControl, Config config) throws TaskLimitExceededException {
        if (!config.isEmpty()) {
            WorkflowTaskList subTasks = compiler.compileTasks(taskControl.get().getFullName(), "^sub", config);
            if (!subTasks.isEmpty()) {
                logger.trace("Adding sub tasks: {}", subTasks);
                long rootTaskId = taskControl.addGeneratedSubtasks(subTasks, ImmutableList.of(), true, true);
                return Optional.of(rootTaskId);
            }
        }
        return Optional.absent();
    }

    /**
     * Compiles the task's error config into tasks named under {@code ^error} and attaches
     * them to the task.
     *
     * <p>Before compiling, the error config's {@code _export} section is replaced by the
     * result of applying {@code function} to the current section (or to an empty one).
     *
     * @param taskControl control of the task that receives the error tasks
     * @param z not used by this method
     * @param function transformation applied to the {@code _export} section
     * @return the value returned by {@code addGeneratedSubtasks}, or absent when there is
     *         no error config or it compiles to no tasks
     * @throws TaskLimitExceededException declared for callers; thrown by the add step
     */
    private Optional<Long> addErrorTasksIfAny(TaskControl taskControl, boolean z, Function<Config, Config> function) throws TaskLimitExceededException {
        Config errorConfig = taskControl.get().getConfig().getErrorConfig();
        if (errorConfig.isEmpty()) {
            return Optional.absent();
        }

        Config export = function.apply(errorConfig.getNestedOrGetEmpty("_export"));
        errorConfig.setNested("_export", export);

        WorkflowTaskList errorTasks = compiler.compileTasks(taskControl.get().getFullName(), "^error", errorConfig);
        if (errorTasks.isEmpty()) {
            return Optional.absent();
        }

        logger.trace("Adding error tasks: {}", errorTasks);
        long rootTaskId = taskControl.addGeneratedSubtasks(errorTasks, ImmutableList.of(), false);
        return Optional.of(rootTaskId);
    }

    /**
     * Attaches a {@code notify} task named under {@code ^failure-alert} with the command
     * "Workflow session attempt failed" to the given task. The task is added through the
     * {@code WithoutLimit} variant of the add call.
     *
     * @param taskControl control of the task that receives the alert task
     * @return the value returned by {@code addGeneratedSubtasksWithoutLimit}
     */
    private long addAttemptFailureAlertTask(TaskControl taskControl) {
        Config alertConfig = cf.create();
        alertConfig.set("_type", "notify");
        alertConfig.set("_command", "Workflow session attempt failed");

        String parentName = taskControl.get().getFullName();
        WorkflowTaskList alertTasks = compiler.compileTasks(parentName, "^failure-alert", alertConfig);
        return taskControl.addGeneratedSubtasksWithoutLimit(alertTasks, ImmutableList.of(), false);
    }

    /**
     * Compiles the task's check config into tasks named under {@code ^check} and attaches
     * them to the task.
     *
     * @param taskControl control of the task that receives the check tasks
     * @param optional id handed to {@code addGeneratedSubtasks} as the single entry of its
     *        id list; when absent an empty list is passed
     * @return the value returned by {@code addGeneratedSubtasks}, or absent when there is
     *         no check config or it compiles to no tasks
     * @throws TaskLimitExceededException declared for callers; thrown by the add step
     */
    private Optional<Long> addCheckTasksIfAny(TaskControl taskControl, Optional<Long> optional) throws TaskLimitExceededException {
        Config checkConfig = taskControl.get().getConfig().getCheckConfig();
        if (checkConfig.isEmpty()) {
            return Optional.absent();
        }

        WorkflowTaskList checkTasks = compiler.compileTasks(taskControl.get().getFullName(), "^check", checkConfig);
        if (checkTasks.isEmpty()) {
            return Optional.absent();
        }

        // Pass the task list as a log argument: concatenating it left the "{}" placeholder
        // unformatted and built the string even when trace logging was disabled.
        logger.trace("Adding check tasks: {}", checkTasks);

        List<Long> upstreamIds = optional.isPresent()
                ? ImmutableList.of(optional.get())
                : ImmutableList.<Long>of();
        return Optional.of(taskControl.addGeneratedSubtasks(checkTasks, upstreamIds, false));
    }

    /**
     * Attaches monitor tasks of the given type to a task.
     *
     * <p>Only the {@code "sla"} type is supported. For it, the {@code time} and
     * {@code duration} keys are removed from {@code config}, the alert/fail tasks are added
     * via {@code addSlaMonitorTasks}, and the remaining config is compiled into tasks named
     * under {@code ^sla}, which are added through the {@code WithoutLimit} variant of the
     * add call. Note that {@code config} is modified in place.
     *
     * @param taskControl control of the task that receives the monitor tasks
     * @param str monitor type; must be {@code "sla"}
     * @param config monitor configuration
     * @return the value returned by {@code addGeneratedSubtasksWithoutLimit}, or absent
     *         when the remaining config compiles to no tasks
     * @throws UnsupportedOperationException if {@code str} is not a supported monitor type
     * @throws NullPointerException if {@code str} is null
     */
    public Optional<Long> addMonitorTask(TaskControl taskControl, String str, Config config) {
        // Plain string switch; the decompiled hashCode/boolean two-step switch was not valid Java.
        switch (str) {
            case "sla":
                config.remove("time");
                config.remove("duration");
                addSlaMonitorTasks(taskControl, str, config);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported monitor task type: " + str);
        }

        WorkflowTaskList monitorTasks = compiler.compileTasks(taskControl.get().getFullName(), "^" + str, config);
        if (monitorTasks.isEmpty()) {
            return Optional.absent();
        }
        logger.trace("Adding {} tasks: {}", str, monitorTasks);
        return Optional.of(taskControl.addGeneratedSubtasksWithoutLimit(monitorTasks, ImmutableList.of(), false));
    }

    /**
     * Adds the built-in SLA violation tasks selected by the {@code alert} and {@code fail}
     * options, and removes both options from {@code config}.
     *
     * <p>{@code alert} defaults to {@code true} and adds a {@code notify} task;
     * {@code fail} defaults to {@code false} and adds a {@code fail} task. Both use the
     * command "SLA violation". An option that cannot be read as a boolean is logged as a
     * warning and its default is used.
     *
     * @param taskControl control of the task that receives the generated tasks
     * @param str monitor type, used in the generated task names and in log output
     * @param config monitor configuration; modified in place
     */
    private void addSlaMonitorTasks(TaskControl taskControl, String str, Config config) {
        boolean alertEnabled = true;
        try {
            alertEnabled = config.get("alert", boolean.class, true);
        } catch (ConfigException invalidAlert) {
            logger.warn("sla configuration error: ", invalidAlert);
        }
        config.remove("alert");
        if (alertEnabled) {
            Config alertConfig = cf.create();
            alertConfig.set("_type", "notify");
            alertConfig.set("_command", "SLA violation");
            String alertName = "^" + str + "^alert";
            WorkflowTaskList alertTasks = compiler.compileTasks(taskControl.get().getFullName(), alertName, alertConfig);
            logger.trace("Adding {} tasks: {}", str, alertTasks);
            taskControl.addGeneratedSubtasksWithoutLimit(alertTasks, ImmutableList.of(), false);
        }

        boolean failEnabled = false;
        try {
            failEnabled = config.get("fail", boolean.class, false);
        } catch (ConfigException invalidFail) {
            logger.warn("sla configuration error: ", invalidFail);
        }
        config.remove("fail");
        if (failEnabled) {
            Config failConfig = cf.create();
            failConfig.set("_type", "fail");
            failConfig.set("_command", "SLA violation");
            String failName = "^" + str + "^fail";
            WorkflowTaskList failTasks = compiler.compileTasks(taskControl.get().getFullName(), failName, failConfig);
            logger.trace("Adding {} tasks: {}", str, failTasks);
            taskControl.addGeneratedSubtasksWithoutLimit(failTasks, ImmutableList.of(), false);
        }
    }
}
