package io.knotx.fragments.task.engine;

import io.knotx.fragments.api.FragmentResult;
import io.knotx.fragments.task.api.Node;
import io.knotx.fragments.task.api.NodeType;
import io.knotx.fragments.task.api.composite.CompositeNode;
import io.knotx.fragments.task.api.single.SingleNode;
import io.knotx.reactivex.fragments.api.FragmentOperation;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.reactivex.RxHelper;

/* loaded from: input_file:io/knotx/fragments/task/engine/TaskEngine.class */
/**
 * Drives a {@link Node} graph for a single fragment event.
 *
 * <p>Processing is a loop expressed recursively over {@link TaskExecutionContext}: while the
 * context reports {@code hasNext()}, the current node is evaluated (directly for single nodes,
 * by fanning out over the children for composite nodes), the context is updated with the
 * resulting {@link FragmentResult}, and processing continues with the same context.
 */
class TaskEngine {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskEngine.class);

    private final Vertx vertx;

    /**
     * Creates an engine bound to the given Vert.x instance.
     *
     * @param vertx Vert.x instance used to obtain the blocking scheduler on which single-node
     *              operations are observed
     */
    public TaskEngine(Vertx vertx) {
        this.vertx = vertx;
    }

    /**
     * Starts processing of a task graph.
     *
     * @param str                  first argument handed to the root {@link TaskExecutionContext}
     *                             (NOTE(review): presumably the task name - confirm against
     *                             {@code TaskExecutionContext})
     * @param node                 root node of the graph
     * @param fragmentEventContext context carrying the fragment event to process
     * @return a {@link Single} emitting the fragment event read from the execution context once
     *         processing of the graph has finished
     */
    public Single<FragmentEvent> start(String str, Node node, FragmentEventContext fragmentEventContext) {
        TaskExecutionContext rootContext = new TaskExecutionContext(str, node, fragmentEventContext);
        return processTask(rootContext)
            .map(finished -> finished.getFragmentEventContext().getFragmentEvent());
    }

    /**
     * Evaluates the current node of {@code context} and keeps going, with the same context
     * instance, until the context reports that there is nothing left to process.
     */
    private Single<TaskExecutionContext> processTask(TaskExecutionContext context) {
        traceEvent(context);
        if (!context.hasNext()) {
            return Single.just(context);
        }
        return getResult(context).flatMap(fragmentResult -> {
            context.updateResult(fragmentResult);
            return processTask(context);
        });
    }

    /**
     * Processes {@code node} in a new execution context derived from {@code parent}.
     */
    private Single<TaskExecutionContext> processTask(TaskExecutionContext parent, Node node) {
        return processTask(new TaskExecutionContext(parent, node));
    }

    /**
     * Dispatches on the type of the current node: composite nodes are map-reduced over their
     * children, every other node type is executed as a single node.
     */
    private Single<FragmentResult> getResult(TaskExecutionContext context) {
        if (NodeType.COMPOSITE == context.getCurrentNode().getType()) {
            return mapReduce(context);
        }
        return execute(context);
    }

    /**
     * Executes the current node as a {@link SingleNode}.
     *
     * <p>The operation is invoked downstream of {@code observeOn} with the Vert.x blocking
     * scheduler. A successful result is reported to {@code context.handleSuccess}; an error is
     * routed through {@code context.handleError}, whose returned {@link Single} replaces the
     * failed one.
     */
    private Single<FragmentResult> execute(TaskExecutionContext context) {
        return Single.just(context.getCurrentNode())
            // Fails with ClassCastException (delivered as an Rx error) if the node is not a SingleNode.
            .map(SingleNode.class::cast)
            .observeOn(RxHelper.blockingScheduler(this.vertx))
            .flatMap(singleNode -> invokeOperation(singleNode, context))
            .doOnSuccess(context::handleSuccess)
            .onErrorResumeNext(context::handleError);
    }

    /**
     * Marks the operation as started on the context and then applies the node, wrapped as a
     * {@link FragmentOperation}, to the fragment context instance supplied by the execution
     * context.
     */
    private Single<FragmentResult> invokeOperation(SingleNode singleNode, TaskExecutionContext context) {
        return Single.just(context)
            .doOnSuccess(this::operationStarted)
            .flatMap(started ->
                FragmentOperation.newInstance(singleNode).rxApply(started.fragmentContextInstance()));
    }

    /**
     * Notifies the execution context that processing of its current node has started.
     */
    private void operationStarted(TaskExecutionContext context) {
        context.handleStarted();
    }

    /**
     * Processes every child of the current composite node in its own execution context, merges
     * the finished child contexts into {@code context} and converts the merged context into a
     * {@link FragmentResult}.
     *
     * <p>Note that the "started" notification is issued eagerly, when this method is called,
     * not when the returned {@link Single} is subscribed. Children are combined with
     * {@code flatMap}, which does not guarantee that child results arrive in declaration order.
     */
    private Single<FragmentResult> mapReduce(TaskExecutionContext context) {
        // getResult only routes here for NodeType.COMPOSITE, hence the downcast.
        CompositeNode composite = (CompositeNode) context.getCurrentNode();
        operationStarted(context);
        return Observable.fromIterable(composite.getNodes())
            .flatMap(child -> processTask(context, child).toObservable())
            .reduce(context, TaskExecutionContext::merge)
            .map(TaskExecutionContext::toFragmentResult);
    }

    /**
     * Logs, at TRACE level only, which graph node the fragment event is currently processed by.
     */
    private void traceEvent(TaskExecutionContext context) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Fragment event [{}] is processed via graph node [{}].",
                context.getFragmentEventContext().getFragmentEvent(),
                context.getCurrentNode());
        }
    }
}
