package org.copperengine.core.persistent.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.LoggingRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.NullArgumentException;
import org.copperengine.core.CopperRuntimeException;
import org.copperengine.core.ProcessingState;
import org.copperengine.core.WaitMode;
import org.copperengine.core.monitoring.RuntimeStatisticsCollector;
import org.copperengine.core.persistent.SerializedWorkflow;
import org.copperengine.core.persistent.hybrid.HybridDBStorageAccessor;
import org.copperengine.core.persistent.hybrid.Storage;
import org.copperengine.core.persistent.hybrid.WorkflowInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/copperengine/core/persistent/cassandra/CassandraStorage.class */
public class CassandraStorage implements Storage {
    private static final Logger logger = LoggerFactory.getLogger(CassandraStorage.class);

    // --- CQL statements. The CQL text itself is also the key under which the
    // --- prepared statement is stored in preparedStatements.

    // Full update of a workflow instance row without the wait-related columns.
    private static final String CQL_UPD_WORKFLOW_INSTANCE_NOT_WAITING = "UPDATE COP_WORKFLOW_INSTANCE SET PPOOL_ID=?, PRIO=?, CREATION_TS=?, DATA=?, OBJECT_STATE=?, STATE=? WHERE ID=?";
    // Full update of a workflow instance row including WAIT_MODE, TIMEOUT and RESPONSE_MAP_JSON.
    private static final String CQL_UPD_WORKFLOW_INSTANCE_WAITING = "UPDATE COP_WORKFLOW_INSTANCE SET PPOOL_ID=?, PRIO=?, CREATION_TS=?, DATA=?, OBJECT_STATE=?, WAIT_MODE=?, TIMEOUT=?, RESPONSE_MAP_JSON=?, STATE=? WHERE ID=?";
    // Updates only the STATE column of a workflow instance.
    private static final String CQL_UPD_WORKFLOW_INSTANCE_STATE = "UPDATE COP_WORKFLOW_INSTANCE SET STATE=? WHERE ID=?";
    // Updates STATE and RESPONSE_MAP_JSON together; used by resume().
    private static final String CQL_UPD_WORKFLOW_INSTANCE_STATE_AND_RESPONSE_MAP = "UPDATE COP_WORKFLOW_INSTANCE SET STATE=?, RESPONSE_MAP_JSON=?  WHERE ID=?";
    // Deletes a workflow instance row by ID.
    private static final String CQL_DEL_WORKFLOW_INSTANCE_WAITING = "DELETE FROM COP_WORKFLOW_INSTANCE WHERE ID=?";
    // Reads all columns of one workflow instance row.
    private static final String CQL_SEL_WORKFLOW_INSTANCE = "SELECT * FROM COP_WORKFLOW_INSTANCE WHERE ID=?";
    // Stores an early response with a TTL (third bind parameter, in seconds).
    private static final String CQL_INS_EARLY_RESPONSE = "INSERT INTO COP_EARLY_RESPONSE (CORRELATION_ID, RESPONSE) VALUES (?,?) USING TTL ?";
    // Deletes an early response by correlation ID.
    private static final String CQL_DEL_EARLY_RESPONSE = "DELETE FROM COP_EARLY_RESPONSE WHERE CORRELATION_ID=?";
    // Reads the RESPONSE column of an early response by correlation ID.
    private static final String CQL_SEL_EARLY_RESPONSE = "SELECT RESPONSE FROM COP_EARLY_RESPONSE WHERE CORRELATION_ID=?";
    // Registers a workflow instance ID in COP_WFI_ID.
    private static final String CQL_INS_WFI_ID = "INSERT INTO COP_WFI_ID (ID) VALUES (?)";
    // Removes a workflow instance ID from COP_WFI_ID.
    private static final String CQL_DEL_WFI_ID = "DELETE FROM COP_WFI_ID WHERE ID=?";
    // Reads every registered workflow instance ID; iterated by initialize().
    private static final String CQL_SEL_WFI_ID_ALL = "SELECT * FROM COP_WFI_ID";

    // Executor on which the completion listeners of asynchronous statements run (see createSettableFuture).
    private final Executor executor;
    // Session and cluster, both obtained from the CassandraSessionManager passed to the constructor.
    private final Session session;
    private final Cluster cluster;
    // Prepared statements keyed by their CQL text; filled by prepare(...).
    private final Map<String, PreparedStatement> preparedStatements;
    // Converts the correlation-id/response map to and from the RESPONSE_MAP_JSON column.
    private final JsonMapper jsonMapper;
    // Consistency level set on every statement prepared by prepare(...).
    private final ConsistencyLevel consistencyLevel;
    // Receives the measured duration of the storage operations.
    private final RuntimeStatisticsCollector runtimeStatisticsCollector;
    // Retry policy used by prepare(String): a LoggingRetryPolicy wrapping an AlwaysRetryPolicy.
    private final RetryPolicy alwaysRetry;
    // TTL bound to CQL_INS_EARLY_RESPONSE; the constructor sets it to 86400.
    private int ttlEarlyResponseSeconds;
    // Maximum time initialize() waits for the resume tasks; the constructor sets it to 86400.
    private int initializationTimeoutSeconds;
    // When false, createSchema(...) does nothing; the constructor sets it to true.
    private boolean createSchemaOnStartup;

    /**
     * Creates a storage that uses {@link ConsistencyLevel#LOCAL_QUORUM} for its statements.
     *
     * @param cassandraSessionManager provides the Cassandra session and cluster
     * @param executor executor used for the completion listeners of asynchronous statements
     * @param runtimeStatisticsCollector receives the measured operation durations
     */
    public CassandraStorage(
            CassandraSessionManager cassandraSessionManager,
            Executor executor,
            RuntimeStatisticsCollector runtimeStatisticsCollector) {
        this(cassandraSessionManager, executor, runtimeStatisticsCollector, ConsistencyLevel.LOCAL_QUORUM);
    }

    /**
     * Creates a storage with an explicit consistency level.
     *
     * @param cassandraSessionManager provides the Cassandra session and cluster
     * @param executor executor used for the completion listeners of asynchronous statements
     * @param runtimeStatisticsCollector receives the measured operation durations
     * @param consistencyLevel consistency level set on every prepared statement
     * @throws NullArgumentException if any argument is {@code null}
     */
    public CassandraStorage(CassandraSessionManager cassandraSessionManager, Executor executor, RuntimeStatisticsCollector runtimeStatisticsCollector, ConsistencyLevel consistencyLevel) {
        // Collaborators and defaults that do not depend on the arguments.
        this.preparedStatements = new HashMap<>();
        this.jsonMapper = new JsonMapperImpl();
        this.alwaysRetry = new LoggingRetryPolicy(new AlwaysRetryPolicy());
        this.ttlEarlyResponseSeconds = 24 * 60 * 60; // one day
        this.initializationTimeoutSeconds = 24 * 60 * 60; // one day
        this.createSchemaOnStartup = true;

        // Argument validation; the first offending argument (in this order) is reported.
        if (null == cassandraSessionManager) {
            throw new NullArgumentException("sessionManager");
        }
        if (null == consistencyLevel) {
            throw new NullArgumentException("consistencyLevel");
        }
        if (null == executor) {
            throw new NullArgumentException("executor");
        }
        if (null == runtimeStatisticsCollector) {
            throw new NullArgumentException("runtimeStatisticsCollector");
        }

        this.session = cassandraSessionManager.getSession();
        this.cluster = cassandraSessionManager.getCluster();
        this.executor = executor;
        this.consistencyLevel = consistencyLevel;
        this.runtimeStatisticsCollector = runtimeStatisticsCollector;
    }

    /**
     * Enables or disables the schema creation performed by {@code createSchema(...)}.
     *
     * @param z {@code true} to let {@code createSchema(...)} create missing tables,
     *          {@code false} to make it a no-op
     */
    public void setCreateSchemaOnStartup(boolean z) {
        createSchemaOnStartup = z;
    }

    /**
     * Prepares every CQL statement used by this storage and caches it by its CQL text.
     * All statements get the always-retry policy, except the "select all IDs" statement,
     * which is prepared with the driver's {@link DefaultRetryPolicy}.
     *
     * @throws Exception if a statement cannot be prepared
     */
    protected void prepareStatements() throws Exception {
        final String[] statementsWithAlwaysRetry = {
            CQL_UPD_WORKFLOW_INSTANCE_NOT_WAITING,
            CQL_UPD_WORKFLOW_INSTANCE_WAITING,
            CQL_DEL_WORKFLOW_INSTANCE_WAITING,
            CQL_SEL_WORKFLOW_INSTANCE,
            CQL_UPD_WORKFLOW_INSTANCE_STATE,
            CQL_INS_EARLY_RESPONSE,
            CQL_DEL_EARLY_RESPONSE,
            CQL_SEL_EARLY_RESPONSE,
            CQL_UPD_WORKFLOW_INSTANCE_STATE_AND_RESPONSE_MAP,
            CQL_INS_WFI_ID,
            CQL_DEL_WFI_ID,
        };
        for (final String cql : statementsWithAlwaysRetry) {
            prepare(cql);
        }
        prepare(CQL_SEL_WFI_ID_ALL, DefaultRetryPolicy.INSTANCE);
    }

    /**
     * Creates the COPPER tables by executing the statements of the classpath resource
     * {@code copper-schema.cql}, unless schema creation is disabled or the table
     * {@code COP_WORKFLOW_INSTANCE} already exists in the session's logged keyspace.
     *
     * <p>The resource is read line by line: blank lines and lines starting with {@code --}
     * are skipped, the remaining lines are concatenated (separated by a blank) until a line
     * ends with {@code ;}, at which point the accumulated statement is executed without the
     * trailing semicolon.
     *
     * @param session session used to execute the schema statements
     * @param cluster cluster whose metadata is inspected for the existing table
     * @throws IllegalStateException if the schema resource cannot be found
     * @throws Exception if reading the resource or executing a statement fails
     */
    protected void createSchema(Session session, Cluster cluster) throws Exception {
        if (!this.createSchemaOnStartup) {
            return;
        }
        if (cluster.getMetadata().getKeyspace(session.getLoggedKeyspace()).getTable("COP_WORKFLOW_INSTANCE") != null) {
            logger.info("skipping schema creation");
            return;
        }
        logger.info("Creating tables...");
        final InputStream schemaStream = CassandraStorage.class.getResourceAsStream("copper-schema.cql");
        if (schemaStream == null) {
            // Fail with a meaningful message instead of a bare NullPointerException from the reader.
            throw new IllegalStateException("Schema resource copper-schema.cql not found for " + CassandraStorage.class.getName());
        }
        // Decode explicitly as UTF-8 so the result does not depend on the platform default charset.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(schemaStream, StandardCharsets.UTF_8))) {
            final StringBuilder statement = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                final String trimmed = line.trim();
                if (trimmed.isEmpty() || trimmed.startsWith("--")) {
                    continue;
                }
                if (!trimmed.endsWith(";")) {
                    // Statement continues on the next line.
                    statement.append(trimmed).append(" ");
                    continue;
                }
                // Last line of a statement: append it without the terminating semicolon.
                statement.append(trimmed, 0, trimmed.length() - 1);
                final String cql = statement.toString();
                statement.setLength(0);
                logger.info("Executing CQL {}", cql);
                session.execute(cql);
            }
        }
    }

    /**
     * Sets the time-to-live, in seconds, that is bound to the insert statement for early responses.
     *
     * @param i the TTL in seconds; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is zero or negative
     */
    public void setTtlEarlyResponseSeconds(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("ttlEarlyResponseSeconds must be > 0, but was " + i);
        }
        this.ttlEarlyResponseSeconds = i;
    }

    /**
     * Sets how long, in seconds, {@code initialize(...)} waits for its resume tasks to finish.
     *
     * @param i the timeout in seconds; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is zero or negative
     */
    public void setInitializationTimeoutSeconds(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("initializationTimeoutSeconds must be > 0, but was " + i);
        }
        this.initializationTimeoutSeconds = i;
    }

    /**
     * Writes a workflow instance to Cassandra inside a {@code CassandraOperation}.
     *
     * <p>If {@code z} is set, the instance ID is first inserted into {@code COP_WFI_ID}.
     * Afterwards the instance row is updated: without the wait columns when the instance has
     * no (or an empty) correlation-id/response map, otherwise including wait mode, timeout and
     * the JSON-serialized response map. Each executed statement is timed and reported to the
     * statistics collector.
     *
     * @param workflowInstance the instance to persist
     * @param z whether the ID row in {@code COP_WFI_ID} has to be inserted as well
     * @throws Exception if the operation fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public void safeWorkflowInstance(final WorkflowInstance workflowInstance, final boolean z) throws Exception {
        logger.debug("safeWorkflow({})", workflowInstance);
        final CassandraOperation<Void> saveOperation = new CassandraOperation<Void>(logger) { // from class: org.copperengine.core.persistent.cassandra.CassandraStorage.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.copperengine.core.persistent.cassandra.CassandraOperation
            public Void execute() throws Exception {
                if (z) {
                    final PreparedStatement insertId = CassandraStorage.this.preparedStatements.get(CassandraStorage.CQL_INS_WFI_ID);
                    final long insertStart = System.nanoTime();
                    CassandraStorage.this.session.execute(insertId.bind(workflowInstance.id));
                    CassandraStorage.this.runtimeStatisticsCollector.submit("wfii.ins", 1, System.nanoTime() - insertStart, TimeUnit.NANOSECONDS);
                }

                final boolean hasResponseMap = workflowInstance.cid2ResponseMap != null && !workflowInstance.cid2ResponseMap.isEmpty();
                if (hasResponseMap) {
                    final PreparedStatement updateWaiting = CassandraStorage.this.preparedStatements.get(CassandraStorage.CQL_UPD_WORKFLOW_INSTANCE_WAITING);
                    final String responseMapJson = CassandraStorage.this.jsonMapper.toJSON(workflowInstance.cid2ResponseMap);
                    final long updateStart = System.nanoTime();
                    CassandraStorage.this.session.execute(updateWaiting.bind(
                            workflowInstance.ppoolId,
                            workflowInstance.prio,
                            workflowInstance.creationTS,
                            workflowInstance.serializedWorkflow.getData(),
                            workflowInstance.serializedWorkflow.getObjectState(),
                            workflowInstance.waitMode.name(),
                            workflowInstance.timeout,
                            responseMapJson,
                            workflowInstance.state.name(),
                            workflowInstance.id));
                    CassandraStorage.this.runtimeStatisticsCollector.submit("wfi.update.wait", 1, System.nanoTime() - updateStart, TimeUnit.NANOSECONDS);
                } else {
                    final PreparedStatement updateNotWaiting = CassandraStorage.this.preparedStatements.get(CassandraStorage.CQL_UPD_WORKFLOW_INSTANCE_NOT_WAITING);
                    final long updateStart = System.nanoTime();
                    CassandraStorage.this.session.execute(updateNotWaiting.bind(
                            workflowInstance.ppoolId,
                            workflowInstance.prio,
                            workflowInstance.creationTS,
                            workflowInstance.serializedWorkflow.getData(),
                            workflowInstance.serializedWorkflow.getObjectState(),
                            workflowInstance.state.name(),
                            workflowInstance.id));
                    CassandraStorage.this.runtimeStatisticsCollector.submit("wfi.update.nowait", 1, System.nanoTime() - updateStart, TimeUnit.NANOSECONDS);
                }
                return null;
            }
        };
        saveOperation.run();
    }

    /**
     * Asynchronously deletes a workflow instance.
     *
     * <p>Two statements are submitted: the delete of the ID row in {@code COP_WFI_ID}, whose
     * result is not tracked, and the delete of the instance row, whose completion is reported
     * through the returned future and timed as {@code wfi.delete}.
     *
     * @param str the workflow instance ID
     * @return a future that completes when the instance row delete has finished
     * @throws Exception if submitting the statements fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public ListenableFuture<Void> deleteWorkflowInstance(String str) throws Exception {
        logger.debug("deleteWorkflowInstance({})", str);

        // Fire and forget: the future of the ID delete is intentionally not observed.
        final PreparedStatement deleteId = this.preparedStatements.get(CQL_DEL_WFI_ID);
        this.session.executeAsync(deleteId.bind(str));

        final PreparedStatement deleteInstance = this.preparedStatements.get(CQL_DEL_WORKFLOW_INSTANCE_WAITING);
        final ResultSetFuture pendingDelete = this.session.executeAsync(deleteInstance.bind(str));
        return createSettableFuture(pendingDelete, "wfi.delete", System.nanoTime());
    }

    /**
     * Adapts a driver {@link ResultSetFuture} to a {@code SettableFuture<Void>} and reports the
     * elapsed time to the statistics collector once the driver future is done.
     *
     * <p>The listener runs on this storage's executor. The returned future is always completed:
     * with {@code null} on success, with the cause of the failure if the statement failed, and
     * with the thrown exception if the driver future was cancelled or the listener itself failed.
     *
     * @param resultSetFuture the driver future to observe
     * @param str the measure point name passed to the statistics collector
     * @param j the start time of the operation as returned by {@link System#nanoTime()}
     * @return a future mirroring the outcome of {@code resultSetFuture}
     */
    private SettableFuture<Void> createSettableFuture(final ResultSetFuture resultSetFuture, final String str, final long j) {
        final SettableFuture<Void> result = SettableFuture.create();
        resultSetFuture.addListener(new Runnable() { // from class: org.copperengine.core.persistent.cassandra.CassandraStorage.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    CassandraStorage.this.runtimeStatisticsCollector.submit(str, 1, System.nanoTime() - j, TimeUnit.NANOSECONDS);
                    resultSetFuture.get();
                    result.set(null);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for the executor thread before reporting the failure.
                    Thread.currentThread().interrupt();
                    result.setException(e);
                } catch (ExecutionException e) {
                    // setException rejects null, so fall back to the wrapper if there is no cause.
                    final Throwable cause = e.getCause();
                    result.setException(cause != null ? cause : e);
                } catch (RuntimeException e) {
                    // E.g. CancellationException from get() or a failing statistics collector:
                    // without this the returned future would never complete.
                    result.setException(e);
                }
            }
        }, this.executor);
        return result;
    }

    /**
     * Reads a workflow instance row and maps it to a {@link WorkflowInstance}.
     *
     * <p>The read runs inside a {@code CassandraOperation}. The duration is reported as
     * {@code wfi.read} only when a row was found.
     *
     * @param str the workflow instance ID
     * @return the workflow instance, or {@code null} if no row exists for the ID
     * @throws Exception if the operation fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public WorkflowInstance readWorkflowInstance(final String str) throws Exception {
        logger.debug("readCassandraWorkflow({})", str);
        final CassandraOperation<WorkflowInstance> readOperation = new CassandraOperation<WorkflowInstance>(logger) { // from class: org.copperengine.core.persistent.cassandra.CassandraStorage.3
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.copperengine.core.persistent.cassandra.CassandraOperation
            public WorkflowInstance execute() throws Exception {
                final PreparedStatement select = CassandraStorage.this.preparedStatements.get(CassandraStorage.CQL_SEL_WORKFLOW_INSTANCE);
                final long startNanos = System.nanoTime();
                final ResultSet rows = CassandraStorage.this.session.execute(select.bind(str));
                final Row row = rows.one();
                if (row == null) {
                    return null;
                }

                final WorkflowInstance instance = new WorkflowInstance();
                instance.id = str;
                instance.ppoolId = row.getString("PPOOL_ID");
                instance.prio = row.getInt("PRIO");
                instance.creationTS = row.getDate("CREATION_TS");
                instance.timeout = row.getDate("TIMEOUT");
                instance.waitMode = CassandraStorage.this.toWaitMode(row.getString("WAIT_MODE"));

                instance.serializedWorkflow = new SerializedWorkflow();
                instance.serializedWorkflow.setData(row.getString("DATA"));
                instance.serializedWorkflow.setObjectState(row.getString("OBJECT_STATE"));

                instance.cid2ResponseMap = CassandraStorage.this.toResponseMap(row.getString("RESPONSE_MAP_JSON"));
                instance.state = ProcessingState.valueOf(row.getString("STATE"));

                final long elapsedNanos = System.nanoTime() - startNanos;
                CassandraStorage.this.runtimeStatisticsCollector.submit("wfi.read", 1, elapsedNanos, TimeUnit.NANOSECONDS);
                return instance;
            }
        };
        return readOperation.run();
    }

    /**
     * Asynchronously stores an early response with the configured TTL.
     *
     * @param str the correlation ID of the response
     * @param str2 the response payload
     * @return a future that completes when the insert has finished; timed as {@code ear.insert}
     * @throws Exception if submitting the statement fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public ListenableFuture<Void> safeEarlyResponse(String str, String str2) throws Exception {
        logger.debug("safeEarlyResponse({})", str);
        final PreparedStatement insert = this.preparedStatements.get(CQL_INS_EARLY_RESPONSE);
        final ResultSetFuture pendingInsert = this.session.executeAsync(insert.bind(str, str2, this.ttlEarlyResponseSeconds));
        return createSettableFuture(pendingInsert, "ear.insert", System.nanoTime());
    }

    /**
     * Reads the early response stored for a correlation ID.
     *
     * <p>The read runs inside a {@code CassandraOperation} and is always reported as
     * {@code ear.read}, whether or not a row was found.
     *
     * @param str the correlation ID
     * @return the stored response, or {@code null} if there is none
     * @throws Exception if the operation fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public String readEarlyResponse(final String str) throws Exception {
        logger.debug("readEarlyResponse({})", str);
        final CassandraOperation<String> readOperation = new CassandraOperation<String>(logger) { // from class: org.copperengine.core.persistent.cassandra.CassandraStorage.4
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.copperengine.core.persistent.cassandra.CassandraOperation
            public String execute() throws Exception {
                final long startNanos = System.nanoTime();
                final PreparedStatement select = CassandraStorage.this.preparedStatements.get(CassandraStorage.CQL_SEL_EARLY_RESPONSE);
                final Row row = CassandraStorage.this.session.execute(select.bind(str)).one();
                CassandraStorage.this.runtimeStatisticsCollector.submit("ear.read", 1, System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
                if (row != null) {
                    CassandraStorage.logger.debug("early response with correlationId {} found!", str);
                    return row.getString("RESPONSE");
                }
                return null;
            }
        };
        return readOperation.run();
    }

    /**
     * Asynchronously deletes the early response stored for a correlation ID.
     *
     * @param str the correlation ID
     * @return a future that completes when the delete has finished; timed as {@code ear.delete}
     * @throws Exception if submitting the statement fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public ListenableFuture<Void> deleteEarlyResponse(String str) throws Exception {
        logger.debug("deleteEarlyResponse({})", str);
        final PreparedStatement delete = this.preparedStatements.get(CQL_DEL_EARLY_RESPONSE);
        final ResultSetFuture pendingDelete = this.session.executeAsync(delete.bind(str));
        return createSettableFuture(pendingDelete, "ear.delete", System.nanoTime());
    }

    /**
     * Initializes the storage: creates the schema if required, prepares all statements and
     * resumes every workflow instance whose ID is registered in {@code COP_WFI_ID}.
     *
     * <p>The IDs are read with a fetch size of 500 at consistency level ONE, and each ID is
     * handed to a fixed-size thread pool that calls {@code resume(...)}. A failing resume is
     * logged and does not abort the initialization. The pool is shut down in all cases, so its
     * threads are released even if reading the IDs fails.
     *
     * @param hybridDBStorageAccessor callback used by the resume tasks to enqueue instances and
     *        register correlation IDs
     * @param i the number of resume threads; values below 1 are treated as 1
     * @throws CopperRuntimeException if the resume tasks do not finish within the configured
     *         initialization timeout
     * @throws Exception if schema creation, statement preparation or reading the IDs fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public void initialize(final HybridDBStorageAccessor hybridDBStorageAccessor, int i) throws Exception {
        createSchema(this.session, this.cluster);
        prepareStatements();
        final int numberOfThreads = Math.max(1, i);
        logger.info("Starting to initialize with {} threads ...", numberOfThreads);
        final ExecutorService resumePool = Executors.newFixedThreadPool(numberOfThreads);
        final long startMillis = System.currentTimeMillis();
        int rowCount = 0;
        try {
            final ResultSet ids = this.session.execute(this.preparedStatements.get(CQL_SEL_WFI_ID_ALL).bind().setFetchSize(500).setConsistencyLevel(ConsistencyLevel.ONE));
            Row row;
            while ((row = ids.one()) != null) {
                rowCount++;
                final String workflowInstanceId = row.getString("ID");
                resumePool.execute(new Runnable() { // from class: org.copperengine.core.persistent.cassandra.CassandraStorage.5
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            CassandraStorage.this.resume(workflowInstanceId, hybridDBStorageAccessor);
                        } catch (Exception e) {
                            CassandraStorage.logger.error("resume failed", e);
                        }
                    }
                });
            }
            logger.info("Read {} IDs in {} msec", rowCount, System.currentTimeMillis() - startMillis);
        } finally {
            // Always stop accepting tasks so the pool threads terminate once the queue is drained,
            // also when reading the IDs failed half-way.
            resumePool.shutdown();
        }
        if (!resumePool.awaitTermination((long) this.initializationTimeoutSeconds, TimeUnit.SECONDS)) {
            throw new CopperRuntimeException("initialize timed out!");
        }
        final long elapsedMillis = System.currentTimeMillis() - startMillis;
        logger.info("Finished initialization - read {} rows in {} msec", rowCount, elapsedMillis);
        this.runtimeStatisticsCollector.submit("storage.init", rowCount, System.currentTimeMillis() - startMillis, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Restores the in-memory bookkeeping for one persisted workflow instance.
     *
     * <ul>
     * <li>No instance row: the ID row in {@code COP_WFI_ID} is deleted asynchronously.</li>
     * <li>State ERROR or INVALID: nothing is done.</li>
     * <li>State ENQUEUED: the instance is enqueued.</li>
     * <li>Otherwise, if a response map exists: every correlation ID is registered, missing
     *     responses are looked up among the early responses, and if something changed or the
     *     timeout has expired the new state and response map are written back. The instance is
     *     enqueued when the timeout has expired, when all responses are available, or when the
     *     wait mode is FIRST and at least one response is available.</li>
     * </ul>
     *
     * @param str the workflow instance ID
     * @param hybridDBStorageAccessor callback used to enqueue the instance and register
     *        correlation IDs
     * @throws Exception if reading or updating the instance fails
     */
    public void resume(String str, HybridDBStorageAccessor hybridDBStorageAccessor) throws Exception {
        logger.trace("resume(wfId={})", str);
        final Row row = this.session.execute(this.preparedStatements.get(CQL_SEL_WORKFLOW_INSTANCE).bind(str)).one();
        if (row == null) {
            logger.warn("No workflow instance {} found - deleting row in COP_WFI_ID", str);
            this.session.executeAsync(this.preparedStatements.get(CQL_DEL_WFI_ID).bind(str));
            return;
        }

        final String ppoolId = row.getString("PPOOL_ID");
        final int prio = row.getInt("PRIO");
        final WaitMode waitMode = toWaitMode(row.getString("WAIT_MODE"));
        final Map<String, String> responseMap = toResponseMap(row.getString("RESPONSE_MAP_JSON"));
        final ProcessingState state = ProcessingState.valueOf(row.getString("STATE"));
        final Date timeout = row.getDate("TIMEOUT");
        final boolean timedOut = timeout != null && timeout.getTime() <= System.currentTimeMillis();

        if (state == ProcessingState.ERROR || state == ProcessingState.INVALID) {
            return;
        }
        if (state == ProcessingState.ENQUEUED) {
            hybridDBStorageAccessor.enqueue(str, ppoolId, prio);
            return;
        }
        if (responseMap == null) {
            return;
        }

        // Register all correlation IDs and collect those that still lack a response.
        int availableResponses = 0;
        final ArrayList<String> missingCorrelationIds = new ArrayList<>();
        for (Map.Entry<String, String> entry : responseMap.entrySet()) {
            final String correlationId = entry.getKey();
            hybridDBStorageAccessor.registerCorrelationId(correlationId, str);
            if (entry.getValue() != null) {
                availableResponses++;
            } else {
                missingCorrelationIds.add(correlationId);
            }
        }

        // A response may have arrived before the wait was persisted; pick up such early responses.
        boolean modified = false;
        for (String correlationId : missingCorrelationIds) {
            final String earlyResponse = readEarlyResponse(correlationId);
            if (earlyResponse != null) {
                responseMap.put(correlationId, earlyResponse);
                availableResponses++;
                modified = true;
            }
        }

        if (!modified && !timedOut) {
            return;
        }

        // With wait mode FIRST any available response is sufficient. Comparing with "== 1" would
        // leave the instance WAITING when several (but not all) responses are available.
        final boolean firstResponseArrived = waitMode == WaitMode.FIRST && availableResponses >= 1;
        final boolean runnable = timedOut || availableResponses == responseMap.size() || firstResponseArrived;
        final ProcessingState newState = runnable ? ProcessingState.ENQUEUED : ProcessingState.WAITING;
        this.session.execute(this.preparedStatements.get(CQL_UPD_WORKFLOW_INSTANCE_STATE_AND_RESPONSE_MAP).bind(newState.name(), this.jsonMapper.toJSON(responseMap), str));
        if (newState == ProcessingState.ENQUEUED) {
            hybridDBStorageAccessor.enqueue(str, ppoolId, prio);
        }
    }

    /**
     * Asynchronously updates the STATE column of a workflow instance.
     *
     * @param str the workflow instance ID
     * @param processingState the new state; its enum name is stored
     * @return a future that completes when the update has finished; timed as
     *         {@code wfi.update.state}
     * @throws Exception if submitting the statement fails
     */
    @Override // org.copperengine.core.persistent.hybrid.Storage
    public ListenableFuture<Void> updateWorkflowInstanceState(String str, ProcessingState processingState) throws Exception {
        logger.debug("updateWorkflowInstanceState({}, {})", str, processingState);
        final PreparedStatement update = this.preparedStatements.get(CQL_UPD_WORKFLOW_INSTANCE_STATE);
        final ResultSetFuture pendingUpdate = this.session.executeAsync(update.bind(processingState.name(), str));
        return createSettableFuture(pendingUpdate, "wfi.update.state", System.nanoTime());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parses the JSON stored in the RESPONSE_MAP_JSON column.
     *
     * @param str the JSON text, may be {@code null}
     * @return the parsed map, or {@code null} if {@code str} is {@code null}
     */
    public Map<String, String> toResponseMap(String str) {
        return str == null ? null : (HashMap) this.jsonMapper.fromJSON(str, HashMap.class);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts the text stored in the WAIT_MODE column to a {@link WaitMode}.
     *
     * @param str the enum constant name, may be {@code null}
     * @return the matching wait mode, or {@code null} if {@code str} is {@code null}
     */
    public WaitMode toWaitMode(String str) {
        return str != null ? WaitMode.valueOf(str) : null;
    }

    /**
     * Prepares a statement with the always-retry policy of this storage.
     *
     * @param str the CQL text to prepare
     */
    private void prepare(String str) {
        final RetryPolicy defaultPolicy = this.alwaysRetry;
        prepare(str, defaultPolicy);
    }

    /**
     * Prepares a statement, applies this storage's consistency level and the given retry
     * policy, and caches it under its CQL text.
     *
     * @param str the CQL text to prepare; also used as the cache key
     * @param retryPolicy the retry policy to set on the prepared statement
     */
    private void prepare(String str, RetryPolicy retryPolicy) {
        logger.info("Preparing cql stmt {}", str);
        final PreparedStatement statement = session.prepare(str);
        statement.setConsistencyLevel(consistencyLevel);
        statement.setRetryPolicy(retryPolicy);
        preparedStatements.put(str, statement);
    }
}
