package io.castled.apps.connectors.mixpanel;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.apps.connectors.customerio.client.CustomerIORestClient;
import io.castled.apps.connectors.mixpanel.MixpanelObjectFields;
import io.castled.apps.connectors.mixpanel.dto.EventAndError;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.errors.errorclassifications.UnclassifiedError;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.core.CastledOffsetListQueue;
import io.castled.schema.models.Message;
import io.castled.schema.models.Tuple;
import io.castled.utils.TimeUtils;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.backoff.ExponentialBackOff;

/**
 * Sink that pushes {@link Message} records to Mixpanel as events.
 *
 * <p>Records handed to {@link #writeRecords(List)} are queued on an internal
 * {@link CastledOffsetListQueue}; the queue hands batches to
 * {@link #processBulkEventCreation(List)}, which converts each record into a Mixpanel event payload,
 * sends the batch through {@link MixpanelRestClient} and reports per-event failures to the
 * {@link ErrorOutputStream}.
 */
@Singleton
public class MixpanelEventSink extends MixpanelObjectSink<Message> {
    private static final Logger log = LoggerFactory.getLogger(MixpanelEventSink.class);

    /**
     * Value returned by {@link #getMaxBufferedObjects()}. The decompiled source referenced Spring's
     * {@code ExponentialBackOff.DEFAULT_INITIAL_INTERVAL} (2000L), an unrelated back-off constant
     * that merely shares the numeric value.
     */
    private static final long MAX_BUFFERED_OBJECTS = 2000L;

    /** Formatter for date-only property values. */
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /**
     * Formatter for date-time property values. The {@code T} separator has to be quoted: an unquoted
     * {@code T} is not a valid pattern letter and makes {@code ofPattern} throw.
     */
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");

    /**
     * Field names that are mapped explicitly into the event payload and therefore must not be copied
     * again as custom properties. Never mutated after initialization.
     * NOTE(review): NORMAL_EVENT comes from the CustomerIO client and is presumably the "event" key -
     * confirm and replace with a Mixpanel-local constant.
     */
    private static final List<String> RESERVED_KEYWORDS =
            Lists.newArrayList(CustomerIORestClient.NORMAL_EVENT, "time", "distinct_id", "insert_id", "ip");

    private final MixpanelRestClient mixpanelRestClient;
    private final ErrorOutputStream errorOutputStream;
    private final MixpanelAppSyncConfig mixpanelAppSyncConfig;
    private final AtomicLong processedRecords = new AtomicLong(0);
    // Declared for failure accounting but never updated or read anywhere in this class.
    private final AtomicLong failedRecords = new AtomicLong(0);
    // NOTE(review): the meaning of the (10, 10, true) arguments is defined by CastledOffsetListQueue - confirm there.
    private final CastledOffsetListQueue<Message> requestsBuffer = new CastledOffsetListQueue<>(new CreateEventConsumer(), 10, 10, true);
    private long lastProcessedOffset = 0;
    private final MixpanelErrorParser mixpanelErrorParser = ObjectRegistry.getInstance(MixpanelErrorParser.class);

    /** Queue consumer that forwards every non-empty batch to {@link #processBulkEventCreation(List)}. */
    private class CreateEventConsumer implements Consumer<List<Message>> {
        private CreateEventConsumer() {
        }

        @Override
        public void accept(List<Message> list) {
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            MixpanelEventSink.this.processBulkEventCreation(list);
        }
    }

    /**
     * Creates the sink from the sync request.
     *
     * @param dataSinkRequest request carrying the Mixpanel app config (project token and API secret),
     *                        the app sync config and the error output stream
     */
    public MixpanelEventSink(DataSinkRequest dataSinkRequest) {
        MixpanelAppConfig appConfig = (MixpanelAppConfig) dataSinkRequest.getExternalApp().getConfig();
        this.mixpanelRestClient = new MixpanelRestClient(appConfig.getProjectToken(), appConfig.getApiSecret());
        this.mixpanelAppSyncConfig = (MixpanelAppSyncConfig) dataSinkRequest.getAppSyncConfig();
        this.errorOutputStream = dataSinkRequest.getErrorOutputStream();
    }

    /**
     * Sends one batch of messages to Mixpanel and records the outcome.
     *
     * <p>Every failure returned by the REST client is matched back to its message through the
     * insert id and written to the error stream, once per failure reason. Afterwards the processed
     * counter is increased by the batch size and the last processed offset is advanced to the
     * offset of the last message in the batch, if that is larger.
     *
     * @param list non-empty batch of messages to send
     */
    public void processBulkEventCreation(List<Message> list) {
        List<Map<String, Object>> eventPayloads = list.stream()
                .map(Message::getRecord)
                .map(this::constructEventDetails)
                .collect(Collectors.toList());
        List<EventAndError> failures = this.mixpanelRestClient.insertEventDetails(eventPayloads);

        // The merge function keeps the first message for a repeated insert id. Without it a
        // duplicate id makes toMap throw after the batch was already sent, so neither the
        // counters nor the offset would be updated.
        Map<String, Message> messagesByInsertId = list.stream()
                .filter(message -> getEventID(message.getRecord()) != null)
                .collect(Collectors.toMap(
                        message -> getEventID(message.getRecord()),
                        Function.identity(),
                        (first, duplicate) -> first));

        failures.forEach(failure -> {
            Message failedMessage = messagesByInsertId.get(failure.getInsertId());
            if (failedMessage == null) {
                // No message carries this insert id, so there is no record to attach the error to.
                log.warn("Mixpanel reported a failure for insert id {} that matches no message in the batch", failure.getInsertId());
                return;
            }
            failure.getFailureReasons().forEach(reason ->
                    this.errorOutputStream.writeFailedRecord(failedMessage, this.mixpanelErrorParser.getPipelineError(reason)));
        });

        this.processedRecords.addAndGet(list.size());
        this.lastProcessedOffset = Math.max(this.lastProcessedOffset, Iterables.getLast(list).getOffset());
    }

    /**
     * Queues a copy of the given records for asynchronous delivery. If the queue does not accept the
     * payload within five minutes, every record is written to the error stream instead.
     *
     * @param list records to publish
     */
    @Override
    protected void writeRecords(List<Message> list) {
        try {
            this.requestsBuffer.writePayload(Lists.newArrayList(list), 5, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            log.error("Unable to publish records to records queue", e);
            for (Message message : list) {
                this.errorOutputStream.writeFailedRecord(message, new UnclassifiedError("Internal error!! Unable to publish records to records queue. Please contact support"));
            }
        }
    }

    /** Returns the insert id stored in the record, or {@code null} when the record has none. */
    private String getEventID(Tuple tuple) {
        return (String) tuple.getValue(MixpanelObjectFields.EVENT_FIELDS.INSERT_ID.getFieldName());
    }

    /**
     * Builds the payload for a single event: the configured event name plus a {@code properties} map
     * holding the insert id (prefixed with {@code $}), distinct id (empty string when absent),
     * event time as epoch milliseconds, geo IP and every non-reserved field of the record.
     *
     * @param tuple record to convert
     * @return mutable map representing the event
     */
    private Map<String, Object> constructEventDetails(Tuple tuple) {
        String insertIdField = MixpanelObjectFields.EVENT_FIELDS.INSERT_ID.getFieldName();
        String distinctIdField = MixpanelObjectFields.EVENT_FIELDS.DISTINCT_ID.getFieldName();
        String timestampField = MixpanelObjectFields.EVENT_FIELDS.EVENT_TIMESTAMP.getFieldName();
        String geoIpField = MixpanelObjectFields.EVENT_FIELDS.GEO_IP.getFieldName();

        Map<String, Object> properties = Maps.newHashMap();
        properties.put("$" + insertIdField, tuple.getValue(insertIdField));
        properties.put(distinctIdField, Optional.ofNullable(tuple.getValue(distinctIdField)).orElse(""));
        properties.put(timestampField, convertTimeStampToEpoch(tuple.getValue(timestampField)));
        properties.put(geoIpField, tuple.getValue(geoIpField));
        // Custom properties are added last, so a custom field whose name equals one of the keys
        // above (and is not reserved) overrides it, exactly as before.
        properties.putAll(tuple.getFields().stream()
                .filter(field -> !isMixpanelReservedKeyword(field.getName()))
                .collect(Collectors.toMap(field -> field.getName(), field -> transformFieldValue(field.getValue()))));

        Map<String, Object> event = Maps.newHashMap();
        event.put(CustomerIORestClient.NORMAL_EVENT, this.mixpanelAppSyncConfig.getEventName());
        event.put("properties", properties);
        return event;
    }

    /**
     * Renders a property value as a string.
     *
     * <p>Integers, longs and booleans use their standard string form, strings are passed through,
     * dates are formatted as {@code yyyy-MM-dd} and date-times as {@code yyyy-MM-dd'T'HH:mm:ssZ}.
     * A {@link LocalDateTime} carries no offset, so it is interpreted in the system default zone
     * (the same choice {@link #convertTimeStampToEpoch(Object)} makes) before formatting.
     * Any other value, including {@code null}, becomes the empty string.
     *
     * @param obj value to render
     * @return string form of the value, never {@code null}
     */
    private String transformFieldValue(Object obj) {
        if (obj instanceof Integer || obj instanceof Long) {
            return String.valueOf(obj);
        }
        if (obj instanceof String) {
            return (String) obj;
        }
        if (obj instanceof LocalDate) {
            return ((LocalDate) obj).format(DATE_FORMATTER);
        }
        if (obj instanceof LocalDateTime) {
            return ((LocalDateTime) obj).atZone(ZoneId.systemDefault()).format(DATE_TIME_FORMATTER);
        }
        if (obj instanceof ZonedDateTime) {
            return ((ZonedDateTime) obj).format(DATE_TIME_FORMATTER);
        }
        if (obj instanceof Boolean) {
            return Boolean.toString((Boolean) obj);
        }
        return "";
    }

    /**
     * Converts a timestamp value to epoch milliseconds.
     *
     * @param obj a {@link LocalDateTime} (interpreted in the system default zone) or a {@link ZonedDateTime}
     * @return epoch milliseconds, or {@code null} for any other type
     */
    private Long convertTimeStampToEpoch(Object obj) {
        if (obj instanceof LocalDateTime) {
            return ((LocalDateTime) obj).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        }
        if (obj instanceof ZonedDateTime) {
            return ((ZonedDateTime) obj).toInstant().toEpochMilli();
        }
        return null;
    }

    /** Returns whether the field name is one of the keys mapped explicitly into the payload. */
    private boolean isMixpanelReservedKeyword(String str) {
        return getReservedKeywords().contains(str);
    }

    /** Returns the shared list of reserved field names; callers must not modify it. */
    private List<String> getReservedKeywords() {
        return RESERVED_KEYWORDS;
    }

    /**
     * Flushes the parent's buffer first and then the request queue, passing the queue a value of
     * ten minutes in milliseconds.
     *
     * @throws Exception if either flush fails
     */
    @Override
    public void flushRecords() throws Exception {
        super.flushRecords();
        this.requestsBuffer.flush(TimeUtils.minutesToMillis(10L));
    }

    /** Returns the number of processed records together with the last processed offset. */
    @Override
    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(this.processedRecords.get(), this.lastProcessedOffset);
    }

    /** Returns the buffer limit reported to the parent sink (2000). */
    @Override
    public long getMaxBufferedObjects() {
        return MAX_BUFFERED_OBJECTS;
    }
}
