package io.castled.apps.connectors.marketo;

import com.google.api.client.util.Lists;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.marketo.dtos.BatchLeadUpdateRequest;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.errors.errorclassifications.ExternallyCategorizedError;
import io.castled.commons.models.AppSyncMode;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.functionalinterfaces.ThrowingConsumer;
import io.castled.schema.models.Tuple;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/apps/connectors/marketo/MarketoLeadSink.class */
public class MarketoLeadSink extends BufferedObjectSink<DataSinkMessage> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MarketoLeadSink.class);
    private static final long BULK_REQUEST_BYTES_SIZE_MAX = 10485760;
    private static final long BULK_REQUEST_NUM_MAX = 30000;
    private static final long BATCH_REQUEST_NUM_MAX = 300;
    private final MarketoBulkClient marketoBulkClient;
    private final ErrorOutputStream errorOutputStream;
    private final MarketoAppSyncConfig syncConfig;
    private final AppSyncStats syncStats = new AppSyncStats(0, 0, 0);
    private final String pkDisplayName;
    private final List<String> mappedFields;

    public MarketoLeadSink(DataSinkRequest dataSinkRequest) {
        this.marketoBulkClient = new MarketoBulkClient((MarketoAppConfig) dataSinkRequest.getExternalApp().getConfig());
        this.errorOutputStream = dataSinkRequest.getErrorOutputStream();
        this.syncConfig = (MarketoAppSyncConfig) dataSinkRequest.getAppSyncConfig();
        if (CollectionUtils.size(dataSinkRequest.getPrimaryKeys()) > 1) {
            log.error("Only 1 primary key allowed, we have more => " + dataSinkRequest.getPrimaryKeys());
        }
        this.pkDisplayName = dataSinkRequest.getPrimaryKeys().stream().findFirst().get();
        this.mappedFields = dataSinkRequest.getMappedFields();
    }

    @Override // io.castled.apps.BufferedObjectSink
    protected void writeRecords(List<DataSinkMessage> list) {
        List<MarketoSyncError> errors;
        long j = 0;
        if (this.syncConfig.getMode() == AppSyncMode.UPSERT) {
            errors = this.marketoBulkClient.bulkUploadLeads(constructLeadFormData(list), getPrimaryKeyName(this.pkDisplayName, list.stream().findFirst().get().getRecord()), Integer.valueOf(list.size()));
        } else {
            if (this.syncConfig.getMode() != AppSyncMode.UPDATE) {
                throw new CastledRuntimeException(String.format("Invalid sync mode %s", this.syncConfig.getMode()));
            }
            BatchSyncStats batchUpdateLeads = this.marketoBulkClient.batchUpdateLeads(constructLeadUpdateRequest(list));
            errors = batchUpdateLeads.getErrors();
            j = batchUpdateLeads.getSkipped();
        }
        errors.forEach(marketoSyncError -> {
            this.errorOutputStream.writeFailedRecord((DataSinkMessage) list.get(marketoSyncError.getMsgIdx().intValue()), new ExternallyCategorizedError(marketoSyncError.getErrorCode(), marketoSyncError.getMessage()));
        });
        updateStats(list.size(), ((DataSinkMessage) Iterables.getLast(list)).getOffset(), j);
    }

    @Override // io.castled.apps.BufferedObjectSink
    public long getMaxBufferedObjects() {
        switch (this.syncConfig.getMode()) {
            case UPDATE:
                return 300L;
            case UPSERT:
                return BULK_REQUEST_NUM_MAX;
            default:
                throw new CastledRuntimeException(String.format("Invalid sync mode %s!", this.syncConfig.getMode().name()));
        }
    }

    private ByteArrayOutputStream constructLeadFormData(List<DataSinkMessage> list) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(byteArrayOutputStream));
        Tuple record = list.stream().findFirst().get().getRecord();
        List list2 = (List) this.mappedFields.stream().map(str -> {
            return (String) record.getField(str).getParams().get("name");
        }).collect(Collectors.toList());
        List list3 = (List) record.getFields().stream().map(field -> {
            return field.getName();
        }).collect(Collectors.toList());
        try {
            CSVPrinter cSVPrinter = new CSVPrinter(bufferedWriter, CSVFormat.DEFAULT.withHeader((String[]) list2.stream().toArray(i -> {
                return new String[i];
            })));
            ThrowingConsumer throwingConsumer = tuple -> {
                cSVPrinter.printRecord(((List) list3.stream().map(str2 -> {
                    return MarketoUtils.formatValue(tuple.getValue(str2), tuple.getField(str2).getSchema());
                }).collect(Collectors.toList())).toArray());
            };
            Iterator<DataSinkMessage> it = list.iterator();
            while (it.hasNext()) {
                throwingConsumer.accept(it.next().getRecord());
            }
            cSVPrinter.close();
            return byteArrayOutputStream;
        } catch (Exception e) {
            throw new CastledRuntimeException(e);
        }
    }

    BatchLeadUpdateRequest constructLeadUpdateRequest(List<DataSinkMessage> list) {
        BatchLeadUpdateRequest batchLeadUpdateRequest = new BatchLeadUpdateRequest();
        batchLeadUpdateRequest.setAction(getMarketoSyncMode());
        batchLeadUpdateRequest.setLookupField(getDedupeKey(list));
        ArrayList newArrayList = Lists.newArrayList();
        for (DataSinkMessage dataSinkMessage : list) {
            HashMap newHashMap = Maps.newHashMap();
            dataSinkMessage.getRecord().getFields().forEach(field -> {
                newHashMap.put((String) field.getParams().get("name"), MarketoUtils.formatValue(field.getValue(), field.getSchema()));
            });
            newArrayList.add(newHashMap);
        }
        batchLeadUpdateRequest.setInput(newArrayList);
        return batchLeadUpdateRequest;
    }

    private String getDedupeKey(List<DataSinkMessage> list) {
        return (String) list.stream().findFirst().orElseThrow(() -> {
            return new CastledRuntimeException("Records empty!");
        }).getRecord().getField(this.pkDisplayName).getParams().get("name");
    }

    private String getMarketoSyncMode() {
        switch (this.syncConfig.getMode()) {
            case UPDATE:
                return MarketoSyncMode.UPDATE.getName();
            case UPSERT:
                return MarketoSyncMode.UPSERT.getName();
            case INSERT:
                return MarketoSyncMode.INSERT.getName();
            default:
                throw new CastledRuntimeException(String.format("Invalid sync mode %s!", this.syncConfig.getMode().name()));
        }
    }

    private String getPrimaryKeyName(String str, Tuple tuple) {
        return (String) this.mappedFields.stream().filter(str2 -> {
            return str.equals(str2);
        }).map(str3 -> {
            return (String) tuple.getField(str3).getParams().get("name");
        }).findFirst().get();
    }

    private void updateStats(long j, long j2, long j3) {
        this.syncStats.setRecordsProcessed(this.syncStats.getRecordsProcessed() + j);
        this.syncStats.setOffset(Math.max(this.syncStats.getOffset(), j2));
        this.syncStats.setRecordsSkipped(this.syncStats.getRecordsSkipped() + j3);
    }

    public AppSyncStats getSyncStats() {
        return this.syncStats;
    }
}
