package io.debezium.connector.vitess.connection;

import binlogdata.Binlogdata;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessConnector;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.util.Strings;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.vitess.client.Proto;
import io.vitess.client.grpc.StaticAuthCredentials;
import io.vitess.proto.Topodata;
import io.vitess.proto.Vtgate;
import io.vitess.proto.grpc.VitessGrpc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/vitess/connection/VitessReplicationConnection.class */
public class VitessReplicationConnection implements ReplicationConnection {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(VitessReplicationConnection.class);
    /** Decoder that buffered VStream events are handed to; assigned once in the constructor. */
    private final MessageDecoder messageDecoder;
    /** Connector configuration; read for the vtgate address, credentials, gRPC settings and table filters. */
    private final VitessConnectorConfig config;
    /**
     * Holds the gRPC channel that {@code close()} shuts down. It is only written through
     * {@code compareAndSet(null, ...)}, so it keeps the first channel registered by this instance.
     */
    private final AtomicReference<ManagedChannel> managedChannel = new AtomicReference<>();

    /**
     * Decompiled form of the compiler-generated helper that backs {@code switch} statements over enums.
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and holds the dense case number
     * used by the corresponding {@code switch}. Entries for constants that are not listed here keep the
     * array default of {@code 0}, which matches no case.
     *
     * <p>The {@code VitessTabletType} table was being populated without ever being declared or
     * allocated; the declaration and allocation are restored so the initializer is well-formed.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$binlogdata$Binlogdata$VEventType;
        static final /* synthetic */ int[] $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode;
        static final /* synthetic */ int[] $SwitchMap$io$debezium$connector$vitess$connection$VitessTabletType;

        static {
            // Every NoSuchFieldError below is ignored on purpose: it only occurs when the enum was
            // recompiled without that constant, in which case the table entry simply stays 0.
            $SwitchMap$io$debezium$connector$vitess$connection$VitessTabletType = new int[VitessTabletType.values().length];
            try {
                $SwitchMap$io$debezium$connector$vitess$connection$VitessTabletType[VitessTabletType.MASTER.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$io$debezium$connector$vitess$connection$VitessTabletType[VitessTabletType.REPLICA.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$io$debezium$connector$vitess$connection$VitessTabletType[VitessTabletType.RDONLY.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }

            $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode = new int[CommonConnectorConfig.EventProcessingFailureHandlingMode.values().length];
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.SKIP.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.IGNORE.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }

            $SwitchMap$binlogdata$Binlogdata$VEventType = new int[Binlogdata.VEventType.values().length];
            try {
                $SwitchMap$binlogdata$Binlogdata$VEventType[Binlogdata.VEventType.ROW.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$binlogdata$Binlogdata$VEventType[Binlogdata.VEventType.VGTID.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$binlogdata$Binlogdata$VEventType[Binlogdata.VEventType.BEGIN.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$binlogdata$Binlogdata$VEventType[Binlogdata.VEventType.COMMIT.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$binlogdata$Binlogdata$VEventType[Binlogdata.VEventType.DDL.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
            try {
                $SwitchMap$binlogdata$Binlogdata$VEventType[Binlogdata.VEventType.OTHER.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; leave entry at 0
            }
        }
    }

    /**
     * Creates a replication connection for the given connector configuration.
     *
     * @param vitessConnectorConfig connector configuration retained by this connection
     * @param vitessDatabaseSchema schema handed to the {@link VStreamOutputMessageDecoder} created here
     */
    public VitessReplicationConnection(VitessConnectorConfig vitessConnectorConfig, VitessDatabaseSchema vitessDatabaseSchema) {
        config = vitessConnectorConfig;
        messageDecoder = new VStreamOutputMessageDecoder(vitessDatabaseSchema);
    }

    /**
     * Executes one SQL statement against vtgate through a blocking gRPC stub.
     *
     * <p>A new channel is built for every call. The channel is registered for {@link #close()} only
     * when no channel has been registered yet; otherwise it is shut down as soon as the blocking call
     * returns or throws, because {@code close()} would never reach it.
     *
     * @param str the SQL statement to run; it is bound with an empty bind-variable map
     * @return the response produced by the blocking {@code execute} call
     */
    public Vtgate.ExecuteResponse execute(String str) {
        LOGGER.info("Executing sqlStament {}", str);
        ManagedChannel channel = newChannel(this.config.getVtgateHost(), this.config.getVtgatePort(), this.config.getGrpcMaxInboundMessageSize());
        boolean registeredForClose = this.managedChannel.compareAndSet(null, channel);
        try {
            Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder()
                    .setQuery(Proto.bindQuery(str, Collections.emptyMap()))
                    .build();
            return newBlockingStub(channel).execute(request);
        } finally {
            if (!registeredForClose) {
                // Another channel already occupies the slot; release this one here to avoid leaking it.
                channel.shutdown();
            }
        }
    }

    /**
     * Opens a VStream against vtgate starting at the given position and dispatches the received
     * events to the message decoder.
     *
     * <p>Events are buffered until a BEGIN..COMMIT pair (possibly spread over several responses) or a
     * DDL/OTHER event has been seen; the whole buffer is then decoded in order and the buffer is
     * cleared. The call itself returns right after the asynchronous stream has been requested.
     *
     * @param vgtid position to start streaming from; must not be {@code null}
     * @param replicationMessageProcessor processor passed through to the message decoder for every event
     * @param atomicReference receives the first error raised by the stream or by event handling;
     *        later errors do not overwrite it
     */
    @Override // io.debezium.connector.vitess.connection.ReplicationConnection
    public void startStreaming(Vgtid vgtid, final ReplicationMessageProcessor replicationMessageProcessor, final AtomicReference<Throwable> atomicReference) {
        Objects.requireNonNull(vgtid);
        ManagedChannel channel = newChannel(this.config.getVtgateHost(), this.config.getVtgatePort(), this.config.getGrpcMaxInboundMessageSize());
        // Only stored when no channel is registered yet; close() shuts down the registered channel.
        this.managedChannel.compareAndSet(null, channel);

        VitessGrpc.VitessStub stub = newStub(channel);
        Map<String, String> grpcHeaders = this.config.getGrpcHeaders();
        if (!grpcHeaders.isEmpty()) {
            LOGGER.info("Setting VStream gRPC headers: {}", grpcHeaders);
            Metadata metadata = new Metadata();
            for (Map.Entry<String, String> header : grpcHeaders.entrySet()) {
                metadata.put(Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER), header.getValue());
            }
            stub = MetadataUtils.attachHeaders(stub, metadata);
        }

        StreamObserver<Vtgate.VStreamResponse> streamObserver = new StreamObserver<Vtgate.VStreamResponse>() {
            /** Events received since the last reset, in arrival order. */
            private final List<Binlogdata.VEvent> bufferedEvents = new ArrayList<>();
            /** Latest VGTID seen since the last reset, or {@code null} when none was seen. */
            private Vgtid newVgtid;
            private boolean beginEventSeen;
            private boolean commitEventSeen;
            /** Number of ROW events buffered since the last reset. */
            private int numOfRowEvents;
            /** Number of responses received since the last reset. */
            private int numResponses;

            @Override
            public void onNext(Vtgate.VStreamResponse vStreamResponse) {
                LOGGER.debug("Received {} VEvents in the VStreamResponse:", vStreamResponse.getEventsCount());
                // Set by DDL/OTHER events, which are dispatched without waiting for BEGIN/COMMIT.
                boolean sendNow = false;
                for (Binlogdata.VEvent vEvent : vStreamResponse.getEventsList()) {
                    LOGGER.debug("VEvent: {}", vEvent);
                    switch (vEvent.getType()) {
                        case ROW:
                            numOfRowEvents++;
                            break;
                        case VGTID:
                            if (newVgtid != null) {
                                // Previous VGTID carried no table PKs on its first shard while the new one does.
                                boolean copyOperation = newVgtid.getRawVgtid().getShardGtidsList().stream()
                                        .findFirst()
                                        .map(shardGtid -> shardGtid.getTablePKsCount() == 0)
                                        .orElse(false)
                                        && vEvent.getVgtid().getShardGtidsList().stream()
                                                .findFirst()
                                                .map(shardGtid -> 0 < shardGtid.getTablePKsCount())
                                                .orElse(false);
                                if (copyOperation) {
                                    LOGGER.info("Received more than one VGTID events during a copy operation and the previous one is {}. Using the latest: {}", newVgtid.toString(), vEvent.getVgtid().toString());
                                } else {
                                    LOGGER.warn("Received more than one VGTID events and the previous one is {}. Using the latest: {}", newVgtid.toString(), vEvent.getVgtid().toString());
                                }
                            }
                            newVgtid = Vgtid.of(vEvent.getVgtid());
                            break;
                        case BEGIN:
                            if (commitEventSeen) {
                                setError("Received BEGIN event after receiving COMMIT event");
                                return;
                            }
                            if (beginEventSeen) {
                                String eventTypes = bufferedEventTypes();
                                if (!eventTypes.equals("BEGIN, FIELD") && !eventTypes.equals("BEGIN, FIELD, VGTID")) {
                                    setError("Received duplicate BEGIN events");
                                    return;
                                }
                                LOGGER.info("Received duplicate BEGIN events during a copy operation. No harm to skip the buffered events. Buffered event types: {}", eventTypes);
                                reset();
                            }
                            beginEventSeen = true;
                            break;
                        case COMMIT:
                            if (!beginEventSeen) {
                                setError("Received COMMIT event before receiving BEGIN event");
                                return;
                            }
                            if (commitEventSeen) {
                                setError("Received duplicate COMMIT events");
                                return;
                            }
                            commitEventSeen = true;
                            break;
                        case DDL:
                        case OTHER:
                            sendNow = true;
                            break;
                        default:
                            // Every other event type is only buffered.
                            break;
                    }
                    bufferedEvents.add(vEvent);
                }

                numResponses++;
                boolean transactionComplete = beginEventSeen && commitEventSeen;
                if (!transactionComplete && !sendNow) {
                    LOGGER.info("Received partial transaction: number of responses so far is {}", numResponses);
                    return;
                }
                if (numResponses > 1) {
                    LOGGER.info("Processing multi-response transaction: number of responses is {}", numResponses);
                }
                if (newVgtid == null) {
                    LOGGER.warn("Skipping because no vgtid is found in buffered event types: {}", bufferedEventTypes());
                    reset();
                    return;
                }

                // The buffer must stay intact until every event has been dispatched, so the reset
                // happens once, after the loop, and not per event.
                try {
                    int rowEventsSeen = 0;
                    for (Binlogdata.VEvent bufferedEvent : bufferedEvents) {
                        if (bufferedEvent.getType() == Binlogdata.VEventType.ROW) {
                            rowEventsSeen++;
                        }
                        // newVgtid is known to be non-null at this point.
                        boolean lastRowEventOfTransaction = numOfRowEvents != 0 && rowEventsSeen == numOfRowEvents;
                        messageDecoder.processMessage(bufferedEvent, replicationMessageProcessor, newVgtid, lastRowEventOfTransaction);
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("Message processing is interrupted", e);
                    atomicReference.compareAndSet(null, e);
                    Thread.currentThread().interrupt();
                } finally {
                    reset();
                }
            }

            @Override
            public void onError(Throwable th) {
                Status status = Status.fromThrowable(th);
                // The description may be absent; a missing description is treated as "not the size error".
                String description = status.getDescription();
                boolean messageTooLarge = status.getCode() == Status.Code.RESOURCE_EXHAUSTED
                        && description != null
                        && description.matches("gRPC message exceeds maximum size.*");
                if (!messageTooLarge) {
                    LOGGER.error("VStream streaming onError. Status: {}", status, th);
                    atomicReference.compareAndSet(null, th);
                    reset();
                    return;
                }
                // Oversized messages follow the configured event-processing failure handling mode.
                switch (config.getEventProcessingFailureHandlingMode()) {
                    case FAIL:
                        LOGGER.error("VStream streaming onError. Status: {}", status, th);
                        atomicReference.compareAndSet(null, th);
                        reset();
                        return;
                    case WARN:
                        LOGGER.warn("VStream streaming onError. Status: {}", status, th);
                        return;
                    case SKIP:
                    case IGNORE:
                        LOGGER.debug("VStream streaming onError. Status: {}", status, th);
                        return;
                    default:
                        return;
                }
            }

            @Override
            public void onCompleted() {
                LOGGER.info("VStream streaming completed.");
                reset();
            }

            /** Clears the buffer and all per-transaction bookkeeping. */
            private void reset() {
                bufferedEvents.clear();
                newVgtid = null;
                beginEventSeen = false;
                commitEventSeen = false;
                numOfRowEvents = 0;
                numResponses = 0;
            }

            /** Logs the problem, records it as the stream error if none is recorded yet, and resets the buffer. */
            private void setError(String str) {
                String message = str + ". Buffered event types: " + bufferedEventTypes();
                LOGGER.error(message);
                atomicReference.compareAndSet(null, new DebeziumException(message));
                reset();
            }

            /** Returns the types of the buffered events joined with {@code ", "}. */
            private String bufferedEventTypes() {
                return bufferedEvents.stream()
                        .map(Binlogdata.VEvent::getType)
                        .map(Objects::toString)
                        .collect(Collectors.joining(", "));
            }
        };

        Vtgate.VStreamFlags vStreamFlags = Vtgate.VStreamFlags.newBuilder().setStopOnReshard(this.config.getStopOnReshard()).build();

        // One "select *" rule per included table; no filter is sent when no include list is configured.
        Binlogdata.Filter.Builder filterBuilder = Binlogdata.Filter.newBuilder();
        if (!Strings.isNullOrEmpty(this.config.tableIncludeList())) {
            for (String table : VitessConnector.getIncludedTables(this.config.getKeyspace(), this.config.tableIncludeList(), VitessConnector.getKeyspaceTables(this.config))) {
                Binlogdata.Rule rule = Binlogdata.Rule.newBuilder().setMatch(table).setFilter("select * from " + table).build();
                LOGGER.info("Add vstream table filtering: {}", rule.getMatch());
                filterBuilder.addRules(rule);
            }
        }

        Vtgate.VStreamRequest.Builder requestBuilder = Vtgate.VStreamRequest.newBuilder()
                .setVgtid(vgtid.getRawVgtid())
                .setTabletType(toTopodataTabletType(VitessTabletType.valueOf(this.config.getTabletType())))
                .setFlags(vStreamFlags);
        if (filterBuilder.getRulesCount() > 0) {
            requestBuilder.setFilter(filterBuilder);
        }
        stub.vStream(requestBuilder.build(), streamObserver);
        LOGGER.info("Started VStream");
    }

    /**
     * Builds an asynchronous Vitess stub on the given channel and passes it through
     * {@code withCredentials}.
     *
     * @param channel channel the stub issues its calls on
     * @return the stub returned by {@code withCredentials}
     */
    private VitessGrpc.VitessStub newStub(ManagedChannel channel) {
        VitessGrpc.VitessStub plainStub = VitessGrpc.newStub(channel);
        return withCredentials(plainStub);
    }

    /**
     * Builds a blocking Vitess stub on the given channel and passes it through
     * {@code withCredentials}.
     *
     * @param channel channel the stub issues its calls on
     * @return the stub returned by {@code withCredentials}
     */
    private VitessGrpc.VitessBlockingStub newBlockingStub(ManagedChannel channel) {
        VitessGrpc.VitessBlockingStub plainStub = VitessGrpc.newBlockingStub(channel);
        return withCredentials(plainStub);
    }

    /**
     * Adds static username/password call credentials to a stub when both values are configured.
     *
     * @param t the stub to decorate
     * @param <T> concrete stub type
     * @return a stub carrying the credentials, or {@code t} itself when the username or the password
     *         is not configured
     */
    private <T extends AbstractStub<T>> T withCredentials(T t) {
        if (this.config.getVtgateUsername() == null || this.config.getVtgatePassword() == null) {
            return t;
        }
        LOGGER.info("Use authenticated vtgate grpc.");
        StaticAuthCredentials credentials = new StaticAuthCredentials(this.config.getVtgateUsername(), this.config.getVtgatePassword());
        return t.withCallCredentials(credentials);
    }

    /**
     * Builds a plaintext gRPC channel to the given address, using the keep-alive interval from the
     * connector configuration.
     *
     * @param host address to connect to
     * @param port port to connect to
     * @param maxInboundMessageSize maximum inbound message size configured on the channel
     * @return the newly built channel
     */
    private ManagedChannel newChannel(String host, int port, int maxInboundMessageSize) {
        ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .maxInboundMessageSize(maxInboundMessageSize);
        long keepAliveMillis = this.config.getKeepaliveInterval().toMillis();
        return builder.keepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS).build();
    }

    /**
     * Shuts down the registered gRPC channel and waits up to five seconds for it to terminate.
     *
     * <p>Does nothing beyond logging when no channel has been registered, for example when the
     * connection is closed before any request was made.
     *
     * @throws Exception if waiting for the channel to terminate is interrupted
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOGGER.info("Closing replication connection");
        // Read the reference once so the null check and the calls below act on the same channel.
        ManagedChannel channel = this.managedChannel.get();
        if (channel == null) {
            LOGGER.trace("No VStream GRPC channel was opened. Nothing to shut down.");
            return;
        }
        channel.shutdownNow();
        LOGGER.trace("VStream GRPC channel shutdownNow is invoked.");
        if (channel.awaitTermination(5L, TimeUnit.SECONDS)) {
            LOGGER.info("VStream GRPC channel is shutdown in time.");
        } else {
            LOGGER.warn("VStream GRPC channel is not shutdown in time. Give up waiting.");
        }
    }

    /**
     * Builds a VGTID for a keyspace from parallel lists of shards and GTIDs.
     *
     * <p>When {@code list} is {@code null} or empty, the result holds a single entry for the keyspace
     * with no shard set and {@code Vgtid.CURRENT_GTID} as its GTID. Otherwise one entry is added per
     * shard, pairing {@code list.get(i)} with {@code list2.get(i)}.
     *
     * @param str keyspace name set on every entry
     * @param list shard names; may be {@code null} or empty
     * @param list2 GTIDs, read by the same index as {@code list}; only read when {@code list} is non-empty
     * @return the assembled VGTID
     */
    public static Vgtid buildVgtid(String str, List<String> list, List<String> list2) {
        Binlogdata.VGtid.Builder vgtidBuilder = Binlogdata.VGtid.newBuilder();
        boolean noShardsGiven = list == null || list.isEmpty();
        if (noShardsGiven) {
            Binlogdata.ShardGtid keyspaceWide = Binlogdata.ShardGtid.newBuilder()
                    .setKeyspace(str)
                    .setGtid(Vgtid.CURRENT_GTID)
                    .build();
            vgtidBuilder.addShardGtids(keyspaceWide);
        } else {
            int index = 0;
            for (String shard : list) {
                Binlogdata.ShardGtid.Builder shardGtid = Binlogdata.ShardGtid.newBuilder()
                        .setKeyspace(str)
                        .setShard(shard)
                        .setGtid(list2.get(index));
                vgtidBuilder.addShardGtids(shardGtid.build());
                index++;
            }
        }
        Vgtid result = Vgtid.of(vgtidBuilder.build());
        LOGGER.info("Default VGTID '{}' for keyspace {}, shards: {}, gtids {}", result, str, list, list2);
        return result;
    }

    /**
     * Derives the starting VGTID from the connector configuration.
     *
     * <ul>
     *   <li>With per-task offset storage, the task VGTID from the configuration is returned as is.</li>
     *   <li>With shards configured, a VGTID is built from those shards and the configured GTIDs; when
     *       the GTID list is one of the configuration's sentinel lists, its first element is repeated
     *       once per shard.</li>
     *   <li>Without shards, the VGTID covers either every shard reported by
     *       {@code VitessConnector.getVitessShards} (GTID list is the empty sentinel) or the keyspace
     *       as a whole.</li>
     * </ul>
     *
     * @param vitessConnectorConfig connector configuration to read from
     * @return the VGTID to start from
     */
    public static Vgtid defaultVgtid(VitessConnectorConfig vitessConnectorConfig) {
        if (vitessConnectorConfig.offsetStoragePerTask()) {
            List<String> taskShards = vitessConnectorConfig.getVitessTaskKeyShards();
            Vgtid taskVgtid = vitessConnectorConfig.getVitessTaskVgtid();
            LOGGER.info("VGTID '{}' is set for the keyspace: {} shards: {}", taskVgtid, vitessConnectorConfig.getKeyspace(), taskShards);
            return taskVgtid;
        }

        if (vitessConnectorConfig.getShard() != null && !vitessConnectorConfig.getShard().isEmpty()) {
            List<String> shards = vitessConnectorConfig.getShard();
            List<String> gtids = vitessConnectorConfig.getGtid();
            // The sentinel lists are recognised by reference, not by content.
            boolean sentinelGtids = gtids == VitessConnectorConfig.DEFAULT_GTID_LIST || gtids == VitessConnectorConfig.EMPTY_GTID_LIST;
            if (sentinelGtids) {
                gtids = Collections.nCopies(shards.size(), gtids.get(0));
            }
            Vgtid shardVgtid = buildVgtid(vitessConnectorConfig.getKeyspace(), shards, gtids);
            LOGGER.info("VGTID '{}' is set to the GTID {} for keyspace: {} shard: {}", shardVgtid, gtids, vitessConnectorConfig.getKeyspace(), shards);
            return shardVgtid;
        }

        Vgtid keyspaceVgtid;
        if (vitessConnectorConfig.getGtid() == VitessConnectorConfig.EMPTY_GTID_LIST) {
            List<String> allShards = VitessConnector.getVitessShards(vitessConnectorConfig);
            String keyspace = vitessConnectorConfig.getKeyspace();
            List<String> repeatedGtid = Collections.nCopies(allShards.size(), vitessConnectorConfig.getGtid().get(0));
            keyspaceVgtid = buildVgtid(keyspace, allShards, repeatedGtid);
        } else {
            keyspaceVgtid = buildVgtid(vitessConnectorConfig.getKeyspace(), Collections.emptyList(), Collections.emptyList());
        }
        LOGGER.info("Default VGTID '{}' is set to the current gtid of all shards from keyspace: {}", keyspaceVgtid, vitessConnectorConfig.getKeyspace());
        return keyspaceVgtid;
    }

    /**
     * Describes the vtgate endpoint this connection targets.
     *
     * @return {@code "vtgate gRPC connection <host>:<port>"} built from the configured host and port
     */
    public String connectionString() {
        String host = this.config.getVtgateHost();
        int port = this.config.getVtgatePort();
        return String.format("vtgate gRPC connection %s:%s", host, port);
    }

    /**
     * Returns the vtgate username taken from the connector configuration.
     *
     * @return the configured vtgate username, exactly as the configuration reports it
     */
    public String username() {
        final String vtgateUsername = this.config.getVtgateUsername();
        return vtgateUsername;
    }

    /**
     * Maps the connector's tablet type onto the equally named {@code Topodata.TabletType} constant.
     *
     * @param vitessTabletType connector-side tablet type
     * @return the matching {@code Topodata.TabletType}, or {@code null} (after logging a warning)
     *         for a value that has no case here
     */
    private static Topodata.TabletType toTopodataTabletType(VitessTabletType vitessTabletType) {
        Topodata.TabletType mapped;
        switch (vitessTabletType) {
            case MASTER:
                mapped = Topodata.TabletType.MASTER;
                break;
            case REPLICA:
                mapped = Topodata.TabletType.REPLICA;
                break;
            case RDONLY:
                mapped = Topodata.TabletType.RDONLY;
                break;
            default:
                LOGGER.warn("Unknown tabletType {}", vitessTabletType);
                mapped = null;
                break;
        }
        return mapped;
    }
}
