package io.castled.apps.connectors.googlepubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.castled.apps.DataSink;
import io.castled.apps.models.DataSinkRequest;
import io.castled.commons.models.AppSyncStats;
import io.castled.schema.models.Message;
import io.castled.utils.MessageUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/* loaded from: input_file:io/castled/apps/connectors/googlepubsub/GooglePubSubDataSink.class */
public class GooglePubSubDataSink implements DataSink {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GooglePubSubDataSink.class);
    public static final long REQUEST_BYTES_THRESHOLD = 10485760;
    public static final long MESSAGE_COUNT_BATCH_SIZE = 1000;
    public static final int PUBLISH_DELAY_THRESHOLD = 1000;
    private static final long FLUSH_BATCH_SIZE = 10000;
    private final AtomicLong recordsProcessed = new AtomicLong(0);
    private final Set<Long> pendingMessageIds = Sets.newConcurrentHashSet();
    private long lastBufferedOffset = 0;
    private volatile Exception exception;

    /* loaded from: input_file:io/castled/apps/connectors/googlepubsub/GooglePubSubDataSink$DataSinkCallback.class */
    public class DataSinkCallback implements ApiFutureCallback<String> {
        private final long messageOffset;

        public DataSinkCallback(long j) {
            this.messageOffset = j;
        }

        @Override // com.google.api.core.ApiFutureCallback
        public void onFailure(Throwable th) {
            GooglePubSubDataSink.this.exception = (Exception) th;
        }

        @Override // com.google.api.core.ApiFutureCallback
        public void onSuccess(String str) {
            GooglePubSubDataSink.this.recordsProcessed.incrementAndGet();
            GooglePubSubDataSink.this.pendingMessageIds.remove(Long.valueOf(this.messageOffset));
        }
    }

    @Override // io.castled.apps.DataSink
    public void syncRecords(DataSinkRequest dataSinkRequest) throws Exception {
        GooglePubSubAppConfig googlePubSubAppConfig = (GooglePubSubAppConfig) dataSinkRequest.getExternalApp().getConfig();
        TopicName of = TopicName.of(googlePubSubAppConfig.getProjectID(), ((GooglePubSubAppSyncConfig) dataSinkRequest.getAppSyncConfig()).getObject().getTopicId());
        Publisher publisher = null;
        ArrayList arrayList = new ArrayList();
        try {
            publisher = Publisher.newBuilder(of).setBatchingSettings(BatchingSettings.newBuilder().setElementCountThreshold(1000L).setRequestByteThreshold(10485760L).setDelayThreshold(Duration.ofMillis(1000L)).build()).setCredentialsProvider(new GooglePubSubCredentialsProvider(googlePubSubAppConfig.getServiceAccountDetails())).build();
            while (true) {
                Message readMessage = dataSinkRequest.getMessageInputStream().readMessage();
                if (readMessage == null) {
                    break;
                }
                arrayList.add(publishMessage(publisher, readMessage));
                if (arrayList.size() == 10000) {
                    publishOutstanding(publisher, arrayList);
                    arrayList.clear();
                }
            }
            publishOutstanding(publisher, arrayList);
            if (publisher != null) {
                publisher.shutdown();
                publisher.awaitTermination(1L, TimeUnit.MINUTES);
            }
        } catch (Throwable th) {
            if (publisher != null) {
                publisher.shutdown();
                publisher.awaitTermination(1L, TimeUnit.MINUTES);
            }
            throw th;
        }
    }

    private void publishOutstanding(Publisher publisher, List<ApiFuture<String>> list) throws Exception {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        publisher.publishAllOutstanding();
        ApiFutures.allAsList(list).get();
        validateAndThrow();
    }

    private ApiFuture<String> publishMessage(Publisher publisher, Message message) throws Exception {
        this.pendingMessageIds.add(Long.valueOf(message.getOffset()));
        ApiFuture<String> publish = publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom(MessageUtils.messageToBytes(message))).build());
        this.lastBufferedOffset = message.getOffset();
        ApiFutures.addCallback(publish, new DataSinkCallback(message.getOffset()), MoreExecutors.directExecutor());
        validateAndThrow();
        return publish;
    }

    @Override // io.castled.apps.DataSink
    public AppSyncStats getSyncStats() {
        return new AppSyncStats(this.recordsProcessed.get(), getProcessedOffset(), 0L);
    }

    public long getProcessedOffset() {
        try {
            return ((Long) Collections.min(this.pendingMessageIds)).longValue() - 1;
        } catch (NoSuchElementException e) {
            return this.lastBufferedOffset;
        }
    }

    private void validateAndThrow() throws Exception {
        if (this.exception != null) {
            throw this.exception;
        }
    }
}
