package io.castled.apps.connectors.customerio;

import io.castled.apps.DataSink;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.models.AppSyncStats;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.models.Message;
import java.util.List;
import java.util.Optional;

/* loaded from: input_file:io/castled/apps/connectors/customerio/CustomerIODataSink.class */
/**
 * {@link DataSink} that drains a sync request's message stream into a
 * Customer.io object sink chosen from the request's app sync config.
 *
 * <p>The active sink is held in a {@code volatile} field so that
 * {@link #getSyncStats()} can observe it while {@link #syncRecords} is running;
 * the skipped-record counter is {@code volatile} for the same reason.
 * NOTE(review): this presumes stats may be polled from a thread other than the
 * one running the sync, and that only one thread runs {@code syncRecords} on a
 * given instance — confirm against the caller.
 */
public class CustomerIODataSink implements DataSink {
    /** Sink selected for the current sync; {@code null} until {@link #syncRecords} starts. */
    private volatile CustomerIOObjectSink<String> customerIOObjectSink;

    /**
     * Number of messages {@link #writeRecord} reported as not written.
     * Volatile so a reader on another thread never sees a stale or torn
     * {@code long}; it is only ever incremented from the syncing thread.
     */
    private volatile long skippedRecords = 0;

    /**
     * Reads messages from the request's input stream until it returns
     * {@code null}, handing each one to the object sink, then flushes the sink.
     *
     * @param dataSinkRequest the sync request supplying the message stream,
     *                        primary keys and app sync config
     * @throws Exception if reading, writing or flushing fails
     */
    @Override
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        this.customerIOObjectSink = getObjectSink(dataSinkRequest);

        Message message;
        while ((message = dataSinkRequest.getMessageInputStream().readMessage()) != null) {
            if (!writeRecord(message, dataSinkRequest.getPrimaryKeys())) {
                this.skippedRecords++;
            }
        }
        // A null message marks the end of the stream: push out whatever is still pending.
        this.customerIOObjectSink.flushRecords();
    }

    /**
     * Builds the object sink matching the Customer.io object named in the
     * request's {@link CustomerIOAppSyncConfig}.
     *
     * @param dataSinkRequest the sync request carrying the app sync config
     * @return an event sink or a person sink, depending on the configured object
     * @throws CastledRuntimeException if the configured object is not a supported type
     */
    private CustomerIOObjectSink<String> getObjectSink(DataSinkRequest dataSinkRequest) {
        String objectName = ((CustomerIOAppSyncConfig) dataSinkRequest.getAppSyncConfig()).getObject().getObjectName();
        CustomerIOObject customerIOObject = CustomerIOObject.getObjectByName(objectName);
        if (customerIOObject == null) {
            // Guard against a lookup miss: switching on null would raise a bare
            // NullPointerException instead of the descriptive error below.
            throw new CastledRuntimeException(String.format("Invalid object type %s!", objectName));
        }
        switch (customerIOObject) {
            case EVENT:
                return new CustomerIOEventSink(dataSinkRequest);
            case PERSON:
                return new CustomerIOPersonSink(dataSinkRequest);
            default:
                throw new CastledRuntimeException(String.format("Invalid object type %s!", customerIOObject.getName()));
        }
    }

    /**
     * Reports progress of the current sync.
     *
     * @return the sink's processed-record count and offset combined with this
     *         class's skipped-record count, or all-zero stats when no sink has
     *         been created yet (or the sink has no stats to report)
     */
    @Override
    public AppSyncStats getSyncStats() {
        return Optional.ofNullable(this.customerIOObjectSink)
                .map(CustomerIOObjectSink::getSyncStats)
                .map(sinkStats -> new AppSyncStats(sinkStats.getRecordsProcessed(), sinkStats.getOffset(), this.skippedRecords))
                .orElseGet(() -> new AppSyncStats(0L, 0L, 0L));
    }

    /**
     * Hands a single message to the active object sink.
     *
     * @param message     the message to create or update in Customer.io
     * @param primaryKeys primary keys of the request; currently unused
     * @return {@code true} always — this implementation never reports a skip
     */
    private boolean writeRecord(Message message, List<String> primaryKeys) {
        this.customerIOObjectSink.createOrUpdateObject(message);
        return true;
    }
}
