package io.castled.apps.connectors.Iterable;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.connectors.Iterable.client.IterableRestClient;
import io.castled.apps.connectors.Iterable.client.IterableSyncErrors;
import io.castled.apps.connectors.Iterable.client.dtos.CatalogItemField;
import io.castled.apps.connectors.Iterable.client.dtos.EventField;
import io.castled.apps.connectors.Iterable.client.dtos.FieldConsts;
import io.castled.apps.connectors.Iterable.client.dtos.UserPrimaryKey;
import io.castled.commons.models.AppSyncStats;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.models.Field;
import io.castled.schema.models.Tuple;
import io.castled.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/* loaded from: input_file:io/castled/apps/connectors/Iterable/IterableBufferedObjectSink.class */
/**
 * Buffered sink that pushes batches of {@link DataSinkMessage}s to Iterable through
 * {@link IterableRestClient}. The target object (users, events or catalogs) is taken
 * from the {@link IterableSyncConfig} supplied at construction time.
 *
 * <p>Progress (records processed and highest offset seen) is accumulated in an
 * {@link AppSyncStats} instance exposed through {@link #getSyncStats()}.
 */
public class IterableBufferedObjectSink extends BufferedObjectSink<DataSinkMessage> {
    /** Value returned by {@link #getMaxBufferedObjects()}. */
    private static final long BATCH_SIZE = 1000;

    private final IterableSyncConfig iterableSyncConfig;
    private final IterableRestClient iterableRestClient;
    private final AppSyncStats appSyncStats = new AppSyncStats();
    private final ErrorOutputStream errorOutputStream;

    /**
     * Creates a sink bound to one Iterable app and one sync configuration.
     *
     * @param iterableAppConfig  app configuration used to build the REST client
     * @param iterableSyncConfig sync configuration (object type, catalog name, campaign/template ids)
     * @param errorOutputStream  destination for records reported as failed
     */
    public IterableBufferedObjectSink(IterableAppConfig iterableAppConfig, IterableSyncConfig iterableSyncConfig, ErrorOutputStream errorOutputStream) {
        this.iterableSyncConfig = iterableSyncConfig;
        this.iterableRestClient = new IterableRestClient(iterableAppConfig);
        this.errorOutputStream = errorOutputStream;
    }

    /**
     * Sends one batch to Iterable using the bulk call matching the configured object
     * type, then updates the sync stats with the batch size and the offset of the last
     * message in the batch.
     *
     * @param list batch to write; a {@code null} or empty batch is a no-op
     * @throws CastledRuntimeException if the configured object type is not supported
     */
    @Override // io.castled.apps.BufferedObjectSink
    protected void writeRecords(List<DataSinkMessage> list) {
        // Nothing to send; also avoids Iterables.getLast throwing on an empty batch.
        if (list == null || list.isEmpty()) {
            return;
        }
        switch (IterableSchemaUtils.getIterableObject(this.iterableSyncConfig.getObject())) {
            case USERS:
                processErrors(this.iterableRestClient.bulkUserUpdate(getUserData(list)), list);
                break;
            // NOTE(review): the results of the event and catalog bulk calls are not inspected
            // here, so nothing is routed to errorOutputStream for those object types — confirm
            // whether the client surfaces failures another way.
            case EVENTS:
                this.iterableRestClient.bulkEventUpdate(getEventData(list));
                break;
            case CATALOGS:
                this.iterableRestClient.bulkCatalogItemsUpdate(this.iterableSyncConfig.getCatalogName(), getCatalogData(list));
                break;
            default:
                throw new CastledRuntimeException("Not implemented!");
        }
        updateStats(list.size(), Iterables.getLast(list).getOffset());
    }

    /**
     * Writes every record of the batch whose email or user id is listed as failed in
     * {@code iterableSyncErrors} to the error output stream.
     *
     * <p>All records sharing a failed key are reported. Records that do not carry the
     * key (null value) are ignored, and failed keys with no matching record are skipped.
     *
     * @param iterableSyncErrors failed emails / user ids returned by the bulk user update
     * @param list               the batch that was sent
     */
    public void processErrors(IterableSyncErrors iterableSyncErrors, List<DataSinkMessage> list) {
        IterableErrorParser iterableErrorParser = (IterableErrorParser) ObjectRegistry.getInstance(IterableErrorParser.class);
        if (!iterableSyncErrors.getFailedEmails().isEmpty()) {
            Map<String, List<DataSinkMessage>> recordsByEmail = groupByPrimaryKey(list, UserPrimaryKey.EMAIL);
            iterableSyncErrors.getFailedEmails().forEach(failedEmail ->
                    writeFailedRecords(recordsByEmail.get(failedEmail), UserPrimaryKey.EMAIL, iterableErrorParser));
        }
        if (!iterableSyncErrors.getFailedUserIds().isEmpty()) {
            Map<String, List<DataSinkMessage>> recordsByUserId = groupByPrimaryKey(list, UserPrimaryKey.USER_ID);
            // Iterate the failed user ids (not the failed emails) when looking up by user id.
            iterableSyncErrors.getFailedUserIds().forEach(failedUserId ->
                    writeFailedRecords(recordsByUserId.get(failedUserId), UserPrimaryKey.USER_ID, iterableErrorParser));
        }
    }

    /** Groups the batch by the value of the given primary-key field, skipping records without it. */
    private Map<String, List<DataSinkMessage>> groupByPrimaryKey(List<DataSinkMessage> list, UserPrimaryKey primaryKey) {
        Map<String, List<DataSinkMessage>> grouped = new HashMap<>();
        for (DataSinkMessage message : list) {
            String keyValue = (String) message.getRecord().getValue(primaryKey.getName());
            if (keyValue != null) {
                grouped.computeIfAbsent(keyValue, ignored -> new ArrayList<>()).add(message);
            }
        }
        return grouped;
    }

    /** Writes each given message to the error stream with the pipeline error for the given key; null-safe. */
    private void writeFailedRecords(List<DataSinkMessage> failedMessages, UserPrimaryKey primaryKey, IterableErrorParser iterableErrorParser) {
        if (failedMessages == null) {
            return;
        }
        for (DataSinkMessage failedMessage : failedMessages) {
            this.errorOutputStream.writeFailedRecord(failedMessage, iterableErrorParser.getPipelineError(primaryKey.getName()));
        }
    }

    /**
     * @return the batch size constant of this sink
     */
    @Override // io.castled.apps.BufferedObjectSink
    public long getMaxBufferedObjects() {
        return BATCH_SIZE;
    }

    /**
     * Builds one payload map per message for the bulk user update. Primary-key fields
     * are placed at the top level; all other fields are nested under
     * {@link FieldConsts#DATA_FIELDS} (omitted when there are none). The
     * {@link FieldConsts#MERGE_NESTED_OBJECTS} and {@link FieldConsts#PREFER_USER_ID}
     * entries are always set to {@code true}.
     *
     * @param list batch of messages
     * @return payload maps in the same order as {@code list}
     */
    List<Map<String, Object>> getUserData(List<DataSinkMessage> list) {
        List<Map<String, Object>> users = new ArrayList<>(list.size());
        for (DataSinkMessage message : list) {
            Tuple record = message.getRecord();
            Map<String, Object> user = new HashMap<>();
            Map<String, Object> dataFields = new HashMap<>();
            for (Field field : record.getFields()) {
                if (IterableSchemaUtils.isUserPrimaryKey(field.getName())) {
                    user.put(field.getName(), field.getValue());
                } else {
                    dataFields.put(field.getName(), field.getValue());
                }
            }
            if (!dataFields.isEmpty()) {
                user.put(FieldConsts.DATA_FIELDS, dataFields);
            }
            user.put(FieldConsts.MERGE_NESTED_OBJECTS, true);
            user.put(FieldConsts.PREFER_USER_ID, true);
            users.add(user);
        }
        return users;
    }

    /**
     * Builds the catalog payload: a map from item id to that item's non-primary-key
     * fields. The item id is validated with {@link IterableValidationUtils#validateValue}
     * before being used as a key. A later message with the same item id replaces an
     * earlier one.
     *
     * @param list batch of messages
     * @return catalog items keyed by item id
     */
    Map<String, Map<String, Object>> getCatalogData(List<DataSinkMessage> list) {
        Map<String, Map<String, Object>> catalogItems = new HashMap<>();
        for (DataSinkMessage message : list) {
            Tuple record = message.getRecord();
            Map<String, Object> itemFields = new HashMap<>();
            for (Field field : record.getFields()) {
                if (!IterableSchemaUtils.isCatalogPrimaryKey(field.getName())) {
                    itemFields.put(field.getName(), field.getValue());
                }
            }
            String itemId = (String) record.getValue(CatalogItemField.ITEM_ID.getName());
            IterableValidationUtils.validateValue(itemId, CatalogItemField.ITEM_ID.getName());
            catalogItems.put(itemId, itemFields);
        }
        return catalogItems;
    }

    /**
     * Builds one payload map per message for the bulk event update. Custom event fields
     * are nested under {@link FieldConsts#DATA_FIELDS} (omitted when there are none);
     * all other fields are placed at the top level. When a record has no campaign id or
     * template id of its own and the sync config provides a non-empty one, the configured
     * value is parsed as an integer and added.
     *
     * @param list batch of messages
     * @return payload maps in the same order as {@code list}
     * @throws NumberFormatException if a configured campaign/template id is used and is not an integer
     */
    List<Map<String, Object>> getEventData(List<DataSinkMessage> list) {
        List<Map<String, Object>> events = new ArrayList<>(list.size());
        for (DataSinkMessage message : list) {
            Tuple record = message.getRecord();
            Map<String, Object> event = new HashMap<>();
            Map<String, Object> dataFields = new HashMap<>();
            for (Field field : record.getFields()) {
                if (IterableSchemaUtils.isCustomEventField(field.getName())) {
                    dataFields.put(field.getName(), field.getValue());
                } else {
                    event.put(field.getName(), field.getValue());
                }
            }
            if (!dataFields.isEmpty()) {
                event.put(FieldConsts.DATA_FIELDS, dataFields);
            }
            // Fall back to the sync-level ids only when the record does not supply its own.
            if (!event.containsKey(EventField.CAMPAIGN_ID.getName()) && !StringUtils.isEmpty(this.iterableSyncConfig.getCampaignId())) {
                event.put(EventField.CAMPAIGN_ID.getName(), Integer.parseInt(this.iterableSyncConfig.getCampaignId()));
            }
            if (!event.containsKey(EventField.TEMPLATE_ID.getName()) && !StringUtils.isEmpty(this.iterableSyncConfig.getTemplateId())) {
                event.put(EventField.TEMPLATE_ID.getName(), Integer.parseInt(this.iterableSyncConfig.getTemplateId()));
            }
            events.add(event);
        }
        return events;
    }

    /**
     * Adds {@code processed} to the processed-record count and raises the stored offset
     * to {@code lastOffset} if it is larger than the current one.
     */
    private void updateStats(long processed, long lastOffset) {
        this.appSyncStats.setRecordsProcessed(this.appSyncStats.getRecordsProcessed() + processed);
        this.appSyncStats.setOffset(Math.max(this.appSyncStats.getOffset(), lastOffset));
    }

    /**
     * @return the stats accumulated by this sink so far
     */
    public AppSyncStats getSyncStats() {
        return this.appSyncStats;
    }
}
