package io.castled.apps.connectors.salesforce;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.castled.apps.DataSink;
import io.castled.apps.OAuthAppConfig;
import io.castled.apps.connectors.salesforce.client.SFDCBulkClient;
import io.castled.apps.connectors.salesforce.client.SFDCRestClient;
import io.castled.apps.connectors.salesforce.client.dtos.ContentType;
import io.castled.apps.connectors.salesforce.client.dtos.InsertJobRequest;
import io.castled.apps.connectors.salesforce.client.dtos.Job;
import io.castled.apps.connectors.salesforce.client.dtos.JobRequest;
import io.castled.apps.connectors.salesforce.client.dtos.JobState;
import io.castled.apps.connectors.salesforce.client.dtos.JobStateUpdateRequest;
import io.castled.apps.connectors.salesforce.client.dtos.PkChunking;
import io.castled.apps.connectors.salesforce.client.dtos.UpsertJobRequest;
import io.castled.apps.models.DataSinkRequest;
import io.castled.apps.models.GenericSyncObject;
import io.castled.apps.syncconfigs.AppSyncConfig;
import io.castled.apps.syncconfigs.GenericObjectRadioGroupConfig;
import io.castled.commons.models.AppSyncMode;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.exceptions.CastledException;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.SchemaUtils;
import io.castled.schema.models.Field;
import io.castled.schema.models.FieldSchema;
import io.castled.schema.models.Message;
import io.castled.schema.models.RecordSchema;
import io.castled.schema.models.Schema;
import io.castled.schema.models.Tuple;
import io.castled.utils.SizeUtils;
import io.castled.utils.ThreadUtils;
import io.castled.utils.TimeUtils;
import java.io.IOException;
import java.io.Reader;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.csv.QuoteMode;
import org.apache.commons.io.input.CharSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/apps/connectors/salesforce/SalesforceDataSink.class */
public class SalesforceDataSink implements DataSink {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SalesforceDataSink.class);
    private CSVPrinter csvPrinter;
    private List<String> trackableFields;
    private final List<Object> existingPrimaryKeyValues = Lists.newArrayList();
    private final AtomicLong skippedRecords = new AtomicLong(0);
    private final AtomicLong processedRecords = new AtomicLong(0);
    private final SalesforceSinkConfig salesforceSinkConfig;
    private final SalesforceFailedRecordsSchemaMapper salesforceFailedRecordsSchemaMapper;
    private final SalesforceErrorParser salesforceErrorParser;

    @Inject
    public SalesforceDataSink(SalesforceSinkConfig salesforceSinkConfig, SalesforceFailedRecordsSchemaMapper salesforceFailedRecordsSchemaMapper, SalesforceErrorParser salesforceErrorParser) {
        this.salesforceSinkConfig = salesforceSinkConfig;
        this.salesforceFailedRecordsSchemaMapper = salesforceFailedRecordsSchemaMapper;
        this.salesforceErrorParser = salesforceErrorParser;
    }

    @Override // io.castled.apps.DataSink
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        StringBuilder sb = null;
        OAuthAppConfig oAuthAppConfig = (OAuthAppConfig) dataSinkRequest.getExternalApp().getConfig();
        SFDCRestClient sFDCRestClient = new SFDCRestClient(oAuthAppConfig.getOAuthToken(), oAuthAppConfig.getClientConfig());
        computeExistingPrimaryKeysIfReqd(oAuthAppConfig, dataSinkRequest.getPrimaryKeys(), dataSinkRequest.getAppSyncConfig());
        long j = 0;
        String str = dataSinkRequest.getPrimaryKeys().get(0);
        HashMap newHashMap = Maps.newHashMap();
        while (true) {
            Message readMessage = dataSinkRequest.getMessageInputStream().readMessage();
            if (readMessage == null) {
                break;
            }
            if (this.csvPrinter == null) {
                sb = new StringBuilder();
                Stream<R> map = readMessage.getRecord().getFields().stream().map((v0) -> {
                    return v0.getName();
                });
                List<String> mappedFields = dataSinkRequest.getMappedFields();
                Objects.requireNonNull(mappedFields);
                this.trackableFields = (List) map.filter((v1) -> {
                    return r2.contains(v1);
                }).collect(Collectors.toList());
                this.csvPrinter = new CSVPrinter(sb, CSVFormat.DEFAULT.withHeader((String[]) this.trackableFields.toArray(new String[0])).withQuoteMode(QuoteMode.ALL));
            }
            if (appendRecordToBuffer(readMessage, dataSinkRequest.getAppSyncConfig(), dataSinkRequest.getPrimaryKeys())) {
                newHashMap.putIfAbsent(readMessage.getRecord().getValue(str), Long.valueOf(readMessage.getOffset()));
                j++;
            } else {
                this.skippedRecords.incrementAndGet();
            }
            if (j > 0 && sb.length() > SizeUtils.convertMBToBytes(this.salesforceSinkConfig.getRequestBufferThreshold())) {
                this.csvPrinter.flush();
                this.csvPrinter.close();
                uploadBufferedRecords(j, sFDCRestClient, dataSinkRequest.getAppSyncConfig(), sb, dataSinkRequest.getObjectSchema(), dataSinkRequest.getErrorOutputStream(), str, newHashMap);
                newHashMap.clear();
                this.csvPrinter = null;
                j = 0;
                sb = null;
            }
        }
        if (this.csvPrinter != null) {
            this.csvPrinter.flush();
            this.csvPrinter.close();
        }
        if (j > 0) {
            uploadBufferedRecords(j, sFDCRestClient, dataSinkRequest.getAppSyncConfig(), sb, dataSinkRequest.getObjectSchema(), dataSinkRequest.getErrorOutputStream(), str, newHashMap);
        }
    }

    @Override // io.castled.apps.DataSink
    public AppSyncStats getSyncStats() {
        return new AppSyncStats(this.processedRecords.get(), 0L, this.skippedRecords.get());
    }

    private void computeExistingPrimaryKeysIfReqd(OAuthAppConfig oAuthAppConfig, List<String> list, AppSyncConfig appSyncConfig) throws Exception {
        GenericObjectRadioGroupConfig genericObjectRadioGroupConfig = (GenericObjectRadioGroupConfig) appSyncConfig;
        GenericSyncObject object = genericObjectRadioGroupConfig.getObject();
        if (genericObjectRadioGroupConfig.getMode() == AppSyncMode.UPDATE) {
            String str = list.get(0);
            new SFDCBulkClient(oAuthAppConfig.getOAuthToken(), oAuthAppConfig.getClientConfig()).runBulkQuery(String.format("select %s from %s", str, object.getObjectName()), PkChunking.builder().chunkSize(50000).enabled(true).build(), object.getObjectName(), TimeUtils.minutesToMillis(10L), map -> {
                this.existingPrimaryKeyValues.add(map.get(str));
            });
        }
    }

    private void uploadBufferedRecords(long j, SFDCRestClient sFDCRestClient, AppSyncConfig appSyncConfig, StringBuilder sb, RecordSchema recordSchema, ErrorOutputStream errorOutputStream, String str, Map<Object, Long> map) throws Exception {
        Job createJob = sFDCRestClient.createJob(createJobRequest(appSyncConfig, str));
        sFDCRestClient.uploadCsv(createJob.getId(), sb.toString());
        sFDCRestClient.updateJobState(createJob.getId(), new JobStateUpdateRequest(JobState.UPLOAD_COMPLETE));
        long currentTimeMillis = System.currentTimeMillis();
        ThreadUtils.interruptIgnoredSleep(TimeUtils.secondsToMillis(10L));
        int i = 0;
        while (true) {
            createJob = sFDCRestClient.getJob(createJob.getId());
            if (Lists.newArrayList(JobState.JOB_COMPLETE, JobState.ABORTED, JobState.FAILED).contains(createJob.getState())) {
                processFailedReport(new CharSequenceReader(sFDCRestClient.getFailedReport(createJob.getId())), recordSchema, errorOutputStream, str, map);
                this.processedRecords.addAndGet(j);
                return;
            } else {
                if (System.currentTimeMillis() - currentTimeMillis > TimeUtils.minutesToMillis(this.salesforceSinkConfig.getUploadTimeoutMins())) {
                    throw new TimeoutException();
                }
                i++;
                ThreadUtils.interruptIgnoredSleep(i * TimeUtils.secondsToMillis(5L));
            }
        }
    }

    private void processFailedReport(Reader reader, RecordSchema recordSchema, ErrorOutputStream errorOutputStream, String str, Map<Object, Long> map) throws CastledException {
        try {
            Iterator<CSVRecord> it = new CSVParser(reader, CSVFormat.RFC4180.withHeader(new String[0]).withSkipHeaderRecord()).iterator();
            while (it.hasNext()) {
                CSVRecord next = it.next();
                String str2 = next.get("sf__Error");
                Tuple.Builder builder = Tuple.builder();
                for (FieldSchema fieldSchema : recordSchema.getFieldSchemas()) {
                    if (str.equals(fieldSchema.getName()) && next.isSet(fieldSchema.getName())) {
                        String str3 = next.get(fieldSchema.getName());
                        if (str3 != null) {
                            builder.put(fieldSchema, this.salesforceFailedRecordsSchemaMapper.transformValue(str3, fieldSchema.getSchema()));
                        }
                    }
                }
                Tuple build = builder.build();
                errorOutputStream.writeFailedRecord(new Message(((Long) Optional.ofNullable(map.get(build.getValue(str))).orElse(0L)).longValue(), build), this.salesforceErrorParser.parseSalesforceError(str2));
            }
        } catch (Exception e) {
            log.error("Process failed records failed", (Throwable) e);
            throw new CastledException(e);
        }
    }

    private JobRequest createJobRequest(AppSyncConfig appSyncConfig, String str) {
        GenericObjectRadioGroupConfig genericObjectRadioGroupConfig = (GenericObjectRadioGroupConfig) appSyncConfig;
        GenericSyncObject object = ((GenericObjectRadioGroupConfig) appSyncConfig).getObject();
        switch (genericObjectRadioGroupConfig.getMode()) {
            case UPSERT:
                return new UpsertJobRequest(object.getObjectName(), ContentType.CSV, str);
            case INSERT:
                return new InsertJobRequest(object.getObjectName(), ContentType.CSV);
            case UPDATE:
                return new UpsertJobRequest(object.getObjectName(), ContentType.CSV, str);
            default:
                throw new CastledRuntimeException("Unhandled app sync mode: " + genericObjectRadioGroupConfig.getMode());
        }
    }

    private boolean appendRecordToBuffer(Message message, AppSyncConfig appSyncConfig, List<String> list) throws IOException {
        if (((GenericObjectRadioGroupConfig) appSyncConfig).getMode() == AppSyncMode.UPDATE) {
            if (!this.existingPrimaryKeyValues.contains(message.getRecord().getValue(list.get(0)))) {
                return false;
            }
        }
        ArrayList newArrayList = Lists.newArrayList();
        for (Field field : message.getRecord().getFields()) {
            if (this.trackableFields.contains(field.getName())) {
                newArrayList.add((String) Optional.ofNullable(transformValue(message.getRecord().getValue(field.getName()), field.getSchema())).orElse("#N/A"));
            }
        }
        this.csvPrinter.printRecord(newArrayList);
        return true;
    }

    private String transformValue(Object obj, Schema schema) {
        if (obj == null) {
            return null;
        }
        return SchemaUtils.isZonedTimestamp(schema) ? ((ZonedDateTime) obj).format(DateTimeFormatter.ISO_DATE_TIME) : SchemaUtils.isDateSchema(schema) ? ((LocalDate) obj).format(DateTimeFormatter.ISO_DATE) : SchemaUtils.isTimeSchema(schema) ? ((LocalTime) obj).format(DateTimeFormatter.ISO_TIME) : obj.toString();
    }
}
