package io.castled.apps.connectors.sendgrid;

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.sendgrid.dtos.ContactAttribute;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.schema.models.Message;
import io.castled.schema.models.Tuple;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.util.backoff.ExponentialBackOff;

/* loaded from: input_file:io/castled/apps/connectors/sendgrid/SendgridContactSink.class */
public class SendgridContactSink extends BufferedObjectSink<Message> {
    private static final long UPSERT_BATCH_NUM_SIZE_MAX = 30000;
    private static final long UPSERT_BATCH_BYTES_SIZE_MAX = 6291456;
    private static final long UPSERT_BATCH_NUM_SIZE_MAX_CHOSEN = 30000;
    private final SendgridRestClient sendgridRestClient;
    private final ErrorOutputStream errorOutputStream;
    private final SendgridAppSyncConfig syncConfig;
    private final SendgridErrorParser errorParser = (SendgridErrorParser) ObjectRegistry.getInstance(SendgridErrorParser.class);
    private final AppSyncStats syncStats = new AppSyncStats(0, 0, 0);

    public SendgridContactSink(SendgridAppConfig sendgridAppConfig, SendgridAppSyncConfig sendgridAppSyncConfig, ErrorOutputStream errorOutputStream) {
        this.sendgridRestClient = new SendgridRestClient(sendgridAppConfig);
        this.errorOutputStream = errorOutputStream;
        this.syncConfig = sendgridAppSyncConfig;
    }

    @Override // io.castled.apps.BufferedObjectSink
    protected void writeRecords(List<Message> list) {
        List<SendgridUpsertError> upsertContacts = this.sendgridRestClient.upsertContacts((List) list.stream().map(message -> {
            return constructContactProperties(message.getRecord());
        }).collect(Collectors.toList()), this.syncConfig.getListIds());
        Map map = (Map) list.stream().collect(Collectors.toMap(message2 -> {
            return getEmail(message2.getRecord());
        }, Function.identity()));
        upsertContacts.forEach(sendgridUpsertError -> {
            this.errorOutputStream.writeFailedRecord((Message) map.get(sendgridUpsertError.getEmail()), this.errorParser.getPipelineError(sendgridUpsertError));
        });
        updateStats(list.size(), ((Message) Iterables.getLast(list)).getOffset());
    }

    @Override // io.castled.apps.BufferedObjectSink
    public long getMaxBufferedObjects() {
        return ExponentialBackOff.DEFAULT_MAX_INTERVAL;
    }

    private String getEmail(Tuple tuple) {
        return (String) tuple.getValue("email");
    }

    private Map<String, Object> constructContactProperties(Tuple tuple) {
        HashMap newHashMap = Maps.newHashMap();
        tuple.getFields().stream().filter(field -> {
            return !((Boolean) field.getParams().get("custom")).booleanValue();
        }).forEach(field2 -> {
            newHashMap.put(field2.getName(), SendgridRequestFormatterUtils.formatValue(field2.getValue(), field2.getSchema()));
        });
        HashMap newHashMap2 = Maps.newHashMap();
        tuple.getFields().stream().filter(field3 -> {
            return ((Boolean) field3.getParams().get("custom")).booleanValue();
        }).forEach(field4 -> {
            newHashMap2.put(field4.getName(), SendgridRequestFormatterUtils.formatValue(field4.getValue(), field4.getSchema()));
        });
        newHashMap.put(ContactAttribute.CUSTOM_FIELDS, newHashMap2);
        return newHashMap;
    }

    private void updateStats(long j, long j2) {
        this.syncStats.setRecordsProcessed(this.syncStats.getRecordsProcessed() + j);
        this.syncStats.setOffset(Math.max(this.syncStats.getOffset(), j2));
    }

    public AppSyncStats getSyncStats() {
        return this.syncStats;
    }
}
