package io.castled.apps.connectors.hubspot.objectsinks;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.apps.BufferedObjectSink;
import io.castled.apps.OAuthAppConfig;
import io.castled.apps.connectors.hubspot.HubspotErrorParser;
import io.castled.apps.connectors.hubspot.HubspotSyncObject;
import io.castled.apps.connectors.hubspot.client.HubspotRestClient;
import io.castled.apps.connectors.hubspot.client.dtos.BatchUpdateRequest;
import io.castled.apps.connectors.hubspot.client.dtos.ObjectUpdateRequest;
import io.castled.apps.connectors.hubspot.client.exception.BatchObjectException;
import io.castled.apps.connectors.hubspot.schemaMappers.HubspotApiSchemaMapper;
import io.castled.commons.errors.CastledError;
import io.castled.commons.errors.errorclassifications.UnclassifiedError;
import io.castled.commons.models.MessageSyncStats;
import io.castled.commons.models.ObjectIdAndMessage;
import io.castled.commons.streams.ErrorOutputStream;
import io.castled.core.CastledOffsetListQueue;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.IncompatibleValueException;
import io.castled.schema.SchemaMapper;
import io.castled.schema.models.Field;
import io.castled.schema.models.Tuple;
import io.castled.utils.TimeUtils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/castled/apps/connectors/hubspot/objectsinks/HubspotObjectSink.class */
/**
 * Buffered sink that writes records to a single Hubspot object type through
 * {@link HubspotRestClient}.
 *
 * <p>Records passed to {@link #writeRecords(List)} are placed on an internal
 * {@link CastledOffsetListQueue}; the queue hands them to {@link ObjectUpdateConsumer}, which
 * issues the batch calls. Records that cannot be queued in time, or whose batch call fails with a
 * {@link BatchObjectException}, are reported to the supplied {@link ErrorOutputStream}.
 */
public class HubspotObjectSink extends BufferedObjectSink<ObjectIdAndMessage> {
    private static final Logger log = LoggerFactory.getLogger(HubspotObjectSink.class);

    /** Time value, in minutes, passed to the queue when publishing records. */
    private static final int ENQUEUE_TIMEOUT_MINUTES = 5;

    /** Time value, in minutes, passed (converted to milliseconds) to the queue on flush. */
    private static final long FLUSH_TIMEOUT_MINUTES = 10L;

    // NOTE(review): the meaning of (5, 500, true) is defined by CastledOffsetListQueue, which is
    // not visible here - presumably consumer parallelism, batch size and an offset-tracking
    // flag; confirm before changing.
    private final CastledOffsetListQueue<ObjectIdAndMessage> requestsBuffer =
            new CastledOffsetListQueue<>(new ObjectUpdateConsumer(), 5, 500, true);

    /** Number of records the queue consumer has handled, whether or not their batch call succeeded. */
    private final AtomicLong processedRecords = new AtomicLong(0);

    private final HubspotRestClient hubspotRestClient;
    private final ErrorOutputStream errorOutputStream;
    private final HubspotSyncObject hubspotObject;

    /**
     * Queue consumer that splits each list it receives by whether the record carries an id and
     * sends each non-empty part to Hubspot as one batch request.
     */
    private class ObjectUpdateConsumer implements Consumer<List<ObjectIdAndMessage>> {

        /**
         * Sends the records without an id first (flag {@code true}), then the records with an id
         * (flag {@code false}), and finally adds the size of the whole list to the
         * processed-record counter.
         *
         * @param records the records handed over by the queue
         */
        @Override
        public void accept(List<ObjectIdAndMessage> records) {
            // partitioningBy always yields both keys, so neither lookup below returns null.
            Map<Boolean, List<ObjectIdAndMessage>> byIdPresence = records.stream()
                    .collect(Collectors.partitioningBy(item -> item.getId() != null));
            updateRecords(byIdPresence.get(false), true);
            updateRecords(byIdPresence.get(true), false);
            processedRecords.addAndGet(records.size());
        }

        /**
         * Sends one batch to Hubspot; on a {@link BatchObjectException} every record of the batch
         * is written to the error stream with the parsed error.
         *
         * @param batch      records to send; an empty or null list is a no-op
         * @param missingIds {@code true} when the batch holds the records that have no id; the
         *                   value is forwarded unchanged to the REST client (presumably selecting
         *                   create rather than update - confirm against
         *                   {@code HubspotRestClient.updateObjects})
         */
        private void updateRecords(List<ObjectIdAndMessage> batch, boolean missingIds) {
            if (CollectionUtils.isEmpty(batch)) {
                return;
            }
            try {
                hubspotRestClient.updateObjects(
                        hubspotObject.getTypeId(),
                        new BatchUpdateRequest(batch.stream()
                                .map(item -> new ObjectUpdateRequest(
                                        item.getId(),
                                        createObjectProperties(item.getMessage().getRecord())))
                                .collect(Collectors.toList())),
                        missingIds);
            } catch (BatchObjectException e) {
                // Only failures without a usable error category are logged here; every failure,
                // categorised or not, is still reported per record below.
                if (e.getBatchObjectError() == null || e.getBatchObjectError().getCategory() == null) {
                    log.error("Hubspot records update failed", e);
                }
                CastledError parsedError = ((HubspotErrorParser) ObjectRegistry.getInstance(HubspotErrorParser.class))
                        .parseError(e.getBatchObjectError());
                for (ObjectIdAndMessage failed : batch) {
                    errorOutputStream.writeFailedRecord(failed.getMessage(), parsedError);
                }
            }
        }
    }

    /**
     * Creates a sink for the given Hubspot object type.
     *
     * @param oAuthAppConfig    supplies the OAuth token and client configuration used to build the REST client
     * @param errorOutputStream receives every record that could not be written
     * @param hubspotSyncObject the Hubspot object whose type id is targeted by the batch calls
     */
    public HubspotObjectSink(OAuthAppConfig oAuthAppConfig, ErrorOutputStream errorOutputStream, HubspotSyncObject hubspotSyncObject) {
        this.hubspotObject = hubspotSyncObject;
        this.hubspotRestClient = new HubspotRestClient(oAuthAppConfig.getOAuthToken(), oAuthAppConfig.getClientConfig());
        this.errorOutputStream = errorOutputStream;
    }

    /**
     * Builds the property map sent to Hubspot for one record.
     *
     * <p>Every field of the tuple becomes one entry keyed by the field name. Non-null values are
     * converted by the {@link HubspotApiSchemaMapper}; null values are sent as an empty string.
     *
     * @param tuple the record to convert
     * @return a new mutable map from field name to converted value
     * @throws CastledRuntimeException if a value is incompatible with its field schema
     */
    protected Map<String, Object> createObjectProperties(Tuple tuple) {
        SchemaMapper schemaMapper = (SchemaMapper) ObjectRegistry.getInstance(HubspotApiSchemaMapper.class);
        try {
            Map<String, Object> properties = new HashMap<>();
            for (Field field : tuple.getFields()) {
                String fieldName = field.getName();
                Object value = tuple.getValue(fieldName);
                if (value != null) {
                    properties.put(fieldName, schemaMapper.transformValue(value, field.getSchema()));
                } else {
                    properties.put(fieldName, "");
                }
            }
            return properties;
        } catch (IncompatibleValueException e) {
            throw new CastledRuntimeException(e);
        }
    }

    /**
     * Publishes a copy of the given records to the request queue. If the queue does not accept
     * them within the timeout, the failure is logged and every record is written to the error
     * stream as an unclassified error.
     *
     * @param list the records to publish
     */
    @Override
    protected void writeRecords(List<ObjectIdAndMessage> list) {
        try {
            // The queue receives its own copy, independent of the caller's list instance.
            requestsBuffer.writePayload(Lists.newArrayList(list), ENQUEUE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            log.error("Unable to publish records to records queue", e);
            for (ObjectIdAndMessage rejected : list) {
                errorOutputStream.writeFailedRecord(rejected.getMessage(),
                        new UnclassifiedError("Internal error!! Unable to publish records to records queue. Please contact support"));
            }
        }
    }

    /**
     * Returns a snapshot of the sync progress.
     *
     * @return stats built from the processed-record counter and the queue's processed offset
     */
    public MessageSyncStats getSyncStats() {
        return new MessageSyncStats(processedRecords.get(), requestsBuffer.getProcessedOffset());
    }

    /**
     * Flushes the base-class buffer first and then the request queue.
     *
     * @throws Exception if either flush fails
     */
    @Override
    public void flushRecords() throws Exception {
        super.flushRecords();
        requestsBuffer.flush(TimeUtils.minutesToMillis(FLUSH_TIMEOUT_MINUTES));
    }

    /**
     * Returns the buffer limit reported to {@link BufferedObjectSink}.
     *
     * @return always {@code 10} (presumably the number of records buffered before
     *         {@link #writeRecords(List)} is invoked - confirm against the base class)
     */
    @Override
    public long getMaxBufferedObjects() {
        return 10L;
    }
}
