package io.castled.apps.connectors.mixpanel;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.apps.connectors.mixpanel.MixpanelObjectFields;
import io.castled.apps.connectors.mixpanel.dto.UserProfileAndError;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.errors.errorclassifications.UnclassifiedError;
import io.castled.commons.models.DataSinkMessage;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.core.CastledOffsetListQueue;
import io.castled.schema.models.Tuple;
import io.castled.utils.TimeUtils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.snowflake.client.core.arrow.AbstractArrowVectorConverter;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/castled/apps/connectors/mixpanel/MixpanelUserProfileSink.class */
public class MixpanelUserProfileSink extends MixpanelObjectSink<DataSinkMessage> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MixpanelUserProfileSink.class);
    private final MixpanelRestClient mixpanelRestClient;
    private final ErrorOutputStream errorOutputStream;
    private final MixpanelAppConfig mixpanelAppConfig;
    private final AtomicLong processedRecords = new AtomicLong(0);
    private long lastProcessedOffset = 0;
    private final AtomicLong failedRecords = new AtomicLong(0);
    private final CastledOffsetListQueue<DataSinkMessage> requestsBuffer = new CastledOffsetListQueue<>(new UpsertUserProfileConsumer(), 10, 10, true);
    private final MixpanelErrorParser mixpanelErrorParser = (MixpanelErrorParser) ObjectRegistry.getInstance(MixpanelErrorParser.class);

    /* loaded from: input_file:io/castled/apps/connectors/mixpanel/MixpanelUserProfileSink$UpsertUserProfileConsumer.class */
    private class UpsertUserProfileConsumer implements Consumer<List<DataSinkMessage>> {
        private UpsertUserProfileConsumer() {
        }

        @Override // java.util.function.Consumer
        public void accept(List<DataSinkMessage> list) {
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            MixpanelUserProfileSink.this.processBulkUserProfileUpdate(list);
        }
    }

    public MixpanelUserProfileSink(DataSinkRequest dataSinkRequest) {
        this.mixpanelRestClient = new MixpanelRestClient(((MixpanelAppConfig) dataSinkRequest.getExternalApp().getConfig()).getProjectToken(), ((MixpanelAppConfig) dataSinkRequest.getExternalApp().getConfig()).getApiSecret());
        this.errorOutputStream = dataSinkRequest.getErrorOutputStream();
        this.mixpanelAppConfig = (MixpanelAppConfig) dataSinkRequest.getExternalApp().getConfig();
    }

    @Override // io.castled.apps.BufferedObjectSink
    protected void writeRecords(List<DataSinkMessage> list) {
        try {
            this.requestsBuffer.writePayload((List<DataSinkMessage>) Lists.newArrayList(list), 5, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            log.error("Unable to publish records to records queue", (Throwable) e);
            Iterator<DataSinkMessage> it = list.iterator();
            while (it.hasNext()) {
                this.errorOutputStream.writeFailedRecord(it.next(), new UnclassifiedError("Internal error!! Unable to publish records to records queue. Please contact support"));
            }
        }
    }

    private Object getDistinctID(Tuple tuple) {
        return tuple.getValue(MixpanelObjectFields.USER_PROFILE_FIELDS.DISTINCT_ID.getFieldName());
    }

    private Map<String, Object> constructUserProfileDetails(Tuple tuple) {
        Object value = tuple.getValue(MixpanelObjectFields.USER_PROFILE_FIELDS.DISTINCT_ID.getFieldName());
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("$token", this.mixpanelAppConfig.getProjectToken());
        newHashMap.put("$distinct_id", value);
        newHashMap.put("$set", constructPropertyMap(tuple));
        return newHashMap;
    }

    private Map<String, Object> constructPropertyMap(Tuple tuple) {
        HashMap newHashMap = Maps.newHashMap();
        Map map = (Map) tuple.getFields().stream().filter(field -> {
            return isMixpanelReservedKeyword(field.getName());
        }).collect(Collectors.toMap(field2 -> {
            return "$" + field2.getName();
        }, (v0) -> {
            return v0.getValue();
        }));
        if (!map.isEmpty()) {
            newHashMap.putAll(map);
        }
        Map map2 = (Map) tuple.getFields().stream().filter(field3 -> {
            return !isMixpanelReservedKeyword(field3.getName());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, (v0) -> {
            return v0.getValue();
        }));
        if (!map2.isEmpty()) {
            newHashMap.putAll(map2);
        }
        return newHashMap;
    }

    private boolean isMixpanelReservedKeyword(String str) {
        return getReservedKeywords().contains(str);
    }

    private List<String> getReservedKeywords() {
        return Lists.newArrayList("region", AbstractArrowVectorConverter.FIELD_NAME_TIME_ZONE_INDEX, "country_code", "last_seen", "city", "first_name", "last_name", "email");
    }

    @Override // io.castled.apps.connectors.mixpanel.MixpanelObjectSink
    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(this.processedRecords.get(), this.lastProcessedOffset);
    }

    @Override // io.castled.apps.BufferedObjectSink
    public long getMaxBufferedObjects() {
        return 200L;
    }

    @Override // io.castled.apps.BufferedObjectSink
    public void flushRecords() throws Exception {
        super.flushRecords();
        this.requestsBuffer.flush(TimeUtils.minutesToMillis(10L));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processBulkUserProfileUpdate(List<DataSinkMessage> list) {
        List<UserProfileAndError> upsertUserProfileDetails = this.mixpanelRestClient.upsertUserProfileDetails((List) list.stream().map((v0) -> {
            return v0.getRecord();
        }).map(this::constructUserProfileDetails).collect(Collectors.toList()));
        Map map = (Map) list.stream().filter(dataSinkMessage -> {
            return getDistinctID(dataSinkMessage.getRecord()) != null;
        }).collect(Collectors.toMap(dataSinkMessage2 -> {
            return getDistinctID(dataSinkMessage2.getRecord());
        }, Function.identity()));
        upsertUserProfileDetails.forEach(userProfileAndError -> {
            userProfileAndError.getFailureReasons().forEach(str -> {
                this.errorOutputStream.writeFailedRecord((DataSinkMessage) map.get(userProfileAndError.getDistinctID()), this.mixpanelErrorParser.getPipelineError(str));
            });
        });
        this.processedRecords.addAndGet(list.size());
        this.lastProcessedOffset = Math.max(this.lastProcessedOffset, ((DataSinkMessage) Iterables.getLast(list)).getOffset());
    }
}
