package io.debezium.connector.vitess;

import io.debezium.config.Configuration;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
import io.grpc.StatusRuntimeException;
import io.vitess.proto.Vtgate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/vitess/VitessConnector.class */
public class VitessConnector extends RelationalBaseSourceConnector {
    /** Class logger; assigned in the static initializer at the end of this class. */
    private static final Logger LOGGER;
    /** Read-only view of the raw connector properties, set by {@link #start(Map)}. */
    private Map<String, String> properties;
    /** Typed configuration built from {@link #properties}, set by {@link #start(Map)}. */
    private VitessConnectorConfig connectorConfig;
    // True when JVM assertions are disabled for this class; guards the explicit AssertionError checks below.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Starts the connector: stores a read-only view of the supplied properties and
     * builds the typed connector configuration from them.
     *
     * @param map the connector properties
     */
    public void start(Map<String, String> map) {
        LOGGER.info("Starting Vitess Connector");
        Map<String, String> readOnlyProperties = Collections.unmodifiableMap(map);
        this.properties = readOnlyProperties;
        this.connectorConfig = new VitessConnectorConfig(Configuration.from(readOnlyProperties));
    }

    /**
     * Returns the task implementation used by this connector.
     *
     * @return {@code VitessConnectorTask.class}
     */
    public Class<? extends Task> taskClass() {
        return VitessConnectorTask.class;
    }

    /**
     * Reads the persisted gtid of every shard through the connector context's offset storage reader.
     *
     * @param i  number of tasks whose offsets are read
     * @param i2 offset storage task key generation
     * @param z  whether a missing offset is an error
     * @return shard to gtid mapping, or {@code null} when the context is not a
     *         {@link SourceConnectorContext} or has no offset storage reader
     */
    protected Map<String, String> getGtidPerShardFromStorage(int i, int i2, boolean z) {
        // instanceof is false for null, so a missing context is rejected here as well.
        boolean storageAvailable = this.context instanceof SourceConnectorContext
                && context().offsetStorageReader() != null;
        if (!storageAvailable) {
            LOGGER.warn("Context {} is not setup for the connector, this can happen in unit tests.", this.context);
            return null;
        }
        return getGtidPerShardFromStorage(context().offsetStorageReader(), this.connectorConfig, i, i2, z);
    }

    /**
     * Collects the gtid of every shard recorded in the stored offsets of the given tasks.
     *
     * @param offsetStorageReader   reader used to look up the stored offsets
     * @param vitessConnectorConfig configuration supplying the logical name for the partitions
     * @param i                     number of tasks; offsets for task indexes {@code 0..i-1} are read
     * @param i2                    task key generation; a negative value yields an empty map
     * @param z                     when {@code true}, a task without a stored offset causes an exception
     * @return shard to gtid mapping gathered from all offsets that were found
     * @throws IllegalArgumentException if {@code z} is set and a task has no stored offset
     * @throws NullPointerException     if a stored offset has no vgtid entry
     */
    public static Map<String, String> getGtidPerShardFromStorage(OffsetStorageReader offsetStorageReader, VitessConnectorConfig vitessConnectorConfig, int i, int i2, boolean z) {
        if (i2 < 0) {
            return Collections.emptyMap();
        }
        Map<String, String> gtidByShard = new HashMap<>();
        for (int taskId = 0; taskId < i; taskId++) {
            String taskKey = getTaskKeyName(taskId, i, i2);
            VitessPartition partition = new VitessPartition(vitessConnectorConfig.getLogicalName(), taskKey);
            Map<String, ?> storedOffset = offsetStorageReader.offset(partition.getSourcePartition());
            if (storedOffset == null && i2 == 0) {
                // Generation 0 only: retry with a partition that carries no task key.
                LOGGER.info("No previous offset for partition: {}, fall back to only server key", partition);
                partition = new VitessPartition(vitessConnectorConfig.getLogicalName(), null);
                storedOffset = offsetStorageReader.offset(partition.getSourcePartition());
            }
            if (storedOffset == null) {
                if (z) {
                    throw new IllegalArgumentException(String.format("No offset found for %s", partition));
                }
                LOGGER.warn("No offset found for task key: {}", taskKey);
                continue;
            }
            String vgtid = (String) storedOffset.get(SourceInfo.VGTID_KEY);
            Objects.requireNonNull(vgtid, String.format("No vgtid from %s", storedOffset));
            for (Vgtid.ShardGtid shardGtid : Vgtid.of(vgtid).getShardGtids()) {
                gtidByShard.put(shardGtid.getShard(), shardGtid.getGtid());
            }
        }
        return gtidByShard;
    }

    /**
     * Computes the task configurations for at most {@code i} tasks.
     * <p>
     * When offsets are stored per task, the shard list comes from the connector
     * configuration, falling back to querying Vitess when none is configured;
     * otherwise no shard list is passed on.
     *
     * @param i the maximum number of tasks
     * @return one configuration map per task
     */
    public List<Map<String, String>> taskConfigs(int i) {
        LOGGER.info("Calculating taskConfigs for {} tasks", i);
        if (!this.connectorConfig.offsetStoragePerTask()) {
            return taskConfigs(i, null);
        }
        List<String> configuredShards = this.connectorConfig.getShard();
        List<String> shards = configuredShards != null ? configuredShards : getVitessShards(this.connectorConfig);
        return taskConfigs(i, shards);
    }

    /**
     * Tells whether two shard collections have the same size and the same distinct elements.
     * Two {@code null} collections are considered the same; {@code null} never matches a
     * non-null collection.
     *
     * @param collection  first shard collection, may be {@code null}
     * @param collection2 second shard collection, may be {@code null}
     * @return {@code true} if both are {@code null}, or both have equal size and equal element sets
     */
    public static boolean hasSameShards(Collection<String> collection, Collection<String> collection2) {
        if (collection == null || collection2 == null) {
            // Only equal when both sides are null.
            return collection == collection2;
        }
        return collection.size() == collection2.size()
                && new HashSet<>(collection).equals(new HashSet<>(collection2));
    }

    /**
     * Computes the task configurations for at most {@code i} tasks over the given shards.
     * <p>
     * Without per-task offset storage a single task receives the connector properties
     * unchanged. With per-task offset storage the method:
     * <ol>
     * <li>caps the task count at the number of shards,</li>
     * <li>validates the requested layout against the offsets of the previous generation,</li>
     * <li>picks the shards to assign (current shards, or the shards found in persisted
     * offsets when those are not covered by the current shards),</li>
     * <li>sorts them and distributes them round-robin over the tasks.</li>
     * </ol>
     * The supplied list is never modified; sorting happens on a private copy.
     *
     * @param i    the maximum number of tasks
     * @param list the current shards; must not be {@code null} when per-task offset storage is enabled
     * @return one configuration per task, ordered by task index; each per-task configuration adds
     *         {@code vitess.task.key}, {@code vitess.task.shards} and {@code vitess.total.tasks}
     * @throws IllegalArgumentException if more than one task is requested without per-task offset
     *                                  storage, if the layout is identical to the previous generation,
     *                                  if previous shards would be lost, or if shards would have to be
     *                                  distributed over zero tasks
     */
    public List<Map<String, String>> taskConfigs(int i, List<String> list) {
        LOGGER.info("Calculating taskConfigs for {} tasks and shards: {}", Integer.valueOf(i), list);
        if (!this.connectorConfig.offsetStoragePerTask()) {
            if (i > 1) {
                throw new IllegalArgumentException("Only a single connector task may be started");
            }
            return Collections.singletonList(this.properties);
        }
        int prevNumTasks = this.connectorConfig.getPrevNumTasks();
        int taskKeyGen = this.connectorConfig.getOffsetStorageTaskKeyGen();
        int numTasks = Math.min(i, list == null ? Integer.MAX_VALUE : list.size());
        LOGGER.info("There are {} vitess shards for maxTasks: {}, we will use {} tasks",
                list == null ? null : Integer.valueOf(list.size()), Integer.valueOf(i), Integer.valueOf(numTasks));

        // Offsets of the previous generation are mandatory once the generation has been bumped.
        Map<String, String> prevGtidPerShard = taskKeyGen > 0 ? getGtidPerShardFromStorage(prevNumTasks, taskKeyGen - 1, true) : null;
        LOGGER.info("Previous gtids Per shard: {}", prevGtidPerShard);
        Set<String> prevShards = prevGtidPerShard != null ? prevGtidPerShard.keySet() : null;
        if (taskKeyGen > 0 && numTasks == prevNumTasks && hasSameShards(prevShards, list)) {
            throw new IllegalArgumentException(String.format("Previous num.tasks: %s and current num.tasks: %s are the same. And previous shards: %s and current shards: %s are the same. Please choose different tasks.max or have different number of vitess shards if you want to change the task parallelism.  Otherwise please reset the offset.storage.task.key.gen config to its original value", Integer.valueOf(prevNumTasks), Integer.valueOf(numTasks), prevShards, list));
        }
        if (prevGtidPerShard != null && !hasSameShards(prevGtidPerShard.keySet(), list)) {
            LOGGER.warn("Some shards for the previous generation {} are not persisted.  Expected shards: {}", prevGtidPerShard.keySet(), list);
            if (prevGtidPerShard.keySet().containsAll(list)) {
                throw new IllegalArgumentException(String.format("Previous shards: %s is the superset of current shards: %s.  We will lose gtid positions for some shards if we continue", prevGtidPerShard.keySet(), list));
            }
        }
        // NOTE(review): the result is unused; the call is retained unchanged in case the getter
        // has side effects not visible here. Confirm and remove.
        this.connectorConfig.getKeyspace();

        Map<String, String> currentGtidPerShard = getGtidPerShardFromStorage(numTasks, taskKeyGen, false);
        if (currentGtidPerShard != null && !hasSameShards(currentGtidPerShard.keySet(), list)) {
            LOGGER.warn("Some shards for the current generation {} are not persisted.  Expected shards: {}", currentGtidPerShard.keySet(), list);
            if (!list.containsAll(currentGtidPerShard.keySet())) {
                LOGGER.warn("Shards from persisted offset: {} not contained within current db shards: {}", currentGtidPerShard.keySet(), list);
                // Re-read strictly: every task must have an offset when persisted shards are unknown to the db.
                currentGtidPerShard = getGtidPerShardFromStorage(numTasks, taskKeyGen, true);
            }
        }

        Collection<String> selectedShards;
        if (currentGtidPerShard == null || currentGtidPerShard.size() != 0) {
            if (currentGtidPerShard == null || list.containsAll(currentGtidPerShard.keySet())) {
                LOGGER.warn("Current db shards is the superset of persisted offset, using current shards from db: {}", list);
                selectedShards = list;
            } else {
                LOGGER.info("Persisted offset has different shards, Using shards from persisted offset: {}", currentGtidPerShard.keySet());
                selectedShards = currentGtidPerShard.keySet();
            }
        } else if (prevGtidPerShard == null || prevGtidPerShard.size() == 0 || list.containsAll(prevGtidPerShard.keySet())) {
            LOGGER.warn("No persisted offset for current or previous gen, using current shards from db: {}", list);
            selectedShards = list;
        } else {
            LOGGER.info("Using shards from persisted offset from prev gen: {}", prevGtidPerShard.keySet());
            selectedShards = prevGtidPerShard.keySet();
        }

        // Sort a private copy so the caller's list is neither reordered nor required to be modifiable.
        List<String> shardsToAssign = new ArrayList<>(selectedShards);
        shardsToAssign.sort(Comparator.naturalOrder());

        if (numTasks <= 0 && !shardsToAssign.isEmpty()) {
            // The round-robin below would otherwise fail with a bare "/ by zero".
            throw new IllegalArgumentException(String.format("Cannot distribute shards %s over %d tasks", shardsToAssign, Integer.valueOf(numTasks)));
        }

        // Round-robin assignment: task indexes are handed out as 0, 1, 2, ... so the keys are contiguous.
        Map<Integer, List<String>> shardsByTask = new HashMap<>();
        int nextTask = 0;
        for (String shard : shardsToAssign) {
            shardsByTask.computeIfAbsent(Integer.valueOf(nextTask), taskId -> new ArrayList<>()).add(shard);
            nextTask = (nextTask + 1) % numTasks;
        }
        LOGGER.info("Shards task distribution: {}", shardsByTask);

        // Walk the task indexes in ascending order instead of relying on HashMap iteration order.
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int taskId = 0; taskId < shardsByTask.size(); taskId++) {
            List<String> taskShards = shardsByTask.get(Integer.valueOf(taskId));
            Map<String, String> taskProperties = new HashMap<>(this.properties);
            taskProperties.put("vitess.task.key", getTaskKeyName(taskId, numTasks, taskKeyGen));
            taskProperties.put("vitess.task.shards", String.join(VitessConnectorConfig.CSV_DELIMITER, taskShards));
            taskProperties.put("vitess.total.tasks", Integer.toString(numTasks));
            taskConfigs.add(taskProperties);
        }
        LOGGER.info("taskConfigs are: {}", taskConfigs);
        return taskConfigs;
    }

    /**
     * Builds the task key in the form {@code task<index>_<numTasks>_<generation>}.
     *
     * @param i  task index
     * @param i2 total number of tasks
     * @param i3 offset storage task key generation
     * @return the formatted task key
     */
    protected static final String getTaskKeyName(int i, int i2, int i3) {
        final String pattern = "task%d_%d_%d";
        return String.format(pattern, i, i2, i3);
    }

    /** Stops the connector; this implementation performs no work. */
    public void stop() {
    }

    /**
     * Returns the configuration definition of this connector.
     *
     * @return the result of {@code VitessConnectorConfig.configDef()}
     */
    public ConfigDef config() {
        return VitessConnectorConfig.configDef();
    }

    /**
     * Returns the connector version.
     *
     * @return the result of {@code Module.version()}
     */
    public String version() {
        return Module.version();
    }

    /**
     * Validates connectivity by opening a replication connection and running
     * {@code SHOW DATABASES}. Failures are reported as error messages on the hostname
     * config value instead of being thrown.
     * <p>
     * The connection is closed in a {@code finally} block so that it is released even when
     * the query fails with something other than a {@link StatusRuntimeException}.
     *
     * @param map           validation results keyed by field name; the hostname entry receives errors
     * @param configuration the configuration to validate
     */
    protected void validateConnection(Map<String, ConfigValue> map, Configuration configuration) {
        ConfigValue hostnameValue = map.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
        try {
            VitessReplicationConnection connection = new VitessReplicationConnection(new VitessConnectorConfig(configuration), null);
            try {
                connection.execute("SHOW DATABASES");
                LOGGER.info("Successfully tested connection for {} with user '{}'", connection.connectionString(), connection.username());
            } catch (StatusRuntimeException e) {
                LOGGER.info("Failed testing connection for {} with user '{}'", connection.connectionString(), connection.username());
                hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());
            } finally {
                // Always release the connection, including on unexpected exceptions.
                connection.close();
            }
        } catch (Exception e2) {
            LOGGER.error("Unexpected error validating the database connection", e2);
            hostnameValue.addErrorMessage("Unable to validate connection: " + e2.getMessage());
        }
    }

    /**
     * Runs a query over a fresh replication connection and returns the values of each
     * result row as a UTF-8 string.
     * <p>
     * The connection is closed in a {@code finally} block, so it is released on query
     * failures and assertion failures as well as on success.
     *
     * @param vitessConnectorConfig configuration used to open the connection
     * @param str                   the query to execute
     * @return one string per result row
     * @throws RuntimeException if opening the connection, executing the query or closing fails
     * @throws AssertionError   if assertions are enabled and the response is missing, carries an
     *                          error, has no result, or has no rows
     */
    private static List<String> getRowsFromQuery(VitessConnectorConfig vitessConnectorConfig, String str) {
        try {
            VitessReplicationConnection connection = new VitessReplicationConnection(vitessConnectorConfig, null);
            try {
                Vtgate.ExecuteResponse response = connection.execute(str);
                LOGGER.info("Got response: {} for query: {}", response, str);
                if (!$assertionsDisabled && (response == null || response.hasError() || !response.hasResult())) {
                    throw new AssertionError(String.format("Error response: %s", response));
                }
                if (!$assertionsDisabled && response.getResult().getRowsList().isEmpty()) {
                    throw new AssertionError(String.format("Empty response: %s", response));
                }
                // Stream straight off the typed getter so the row type is inferred rather than erased.
                return response.getResult().getRowsList().stream()
                        .map(row -> row.getValues().toStringUtf8())
                        .collect(Collectors.toList());
            } finally {
                connection.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(String.format("Unexpected error while running query: %s", str), e);
        }
    }

    /**
     * Filters table names down to those whose {@code <keyspace>.<table>} name is matched
     * by at least one of the given regular expressions.
     *
     * @param str  the keyspace used as the qualifier
     * @param str2 the regular expression list, parsed with {@code Strings.listOfRegex} using
     *             {@link Pattern#CASE_INSENSITIVE}
     * @param list the candidate table names
     * @return the matching table names, in their original order
     */
    public static List<String> getIncludedTables(String str, String str2, List<String> list) {
        List<Pattern> includePatterns = Strings.listOfRegex(str2, Pattern.CASE_INSENSITIVE);
        List<String> included = new ArrayList<>();
        for (String table : list) {
            String qualifiedName = String.format("%s.%s", str, table);
            for (Pattern includePattern : includePatterns) {
                if (includePattern.asPredicate().test(qualifiedName)) {
                    included.add(table);
                    // One match is enough; avoid adding the same table twice.
                    break;
                }
            }
        }
        return included;
    }

    /**
     * Lists the tables of the configured keyspace by running {@code SHOW TABLES FROM <keyspace>}.
     *
     * @param vitessConnectorConfig configuration supplying the keyspace and the connection settings
     * @return the table names returned by the query
     */
    public static List<String> getKeyspaceTables(VitessConnectorConfig vitessConnectorConfig) {
        String showTablesQuery = String.format("SHOW TABLES FROM %s", vitessConnectorConfig.getKeyspace());
        List<String> tables = getRowsFromQuery(vitessConnectorConfig, showTablesQuery);
        LOGGER.info("All tables from keyspace {} are: {}", vitessConnectorConfig.getKeyspace(), tables);
        return tables;
    }

    /**
     * Lists the shards of the configured keyspace by running
     * {@code SHOW VITESS_SHARDS LIKE '<keyspace>/%'} and keeping the part after the slash
     * of each returned row.
     *
     * @param vitessConnectorConfig configuration supplying the keyspace and the connection settings
     * @return the shard names, in the order returned by the query
     * @throws AssertionError if assertions are enabled and a row does not split into exactly two parts
     */
    public static List<String> getVitessShards(VitessConnectorConfig vitessConnectorConfig) {
        String showShardsQuery = String.format("SHOW VITESS_SHARDS LIKE '%s/%%'", vitessConnectorConfig.getKeyspace());
        List<String> shards = new ArrayList<>();
        for (String row : getRowsFromQuery(vitessConnectorConfig, showShardsQuery)) {
            String[] parts = row.split("/");
            if (!$assertionsDisabled && (parts == null || parts.length != 2)) {
                throw new AssertionError(String.format("Wrong field format: %s", row));
            }
            shards.add(parts[1]);
        }
        LOGGER.info("Shards: {}", shards);
        return shards;
    }

    /**
     * Validates all connector fields and adds cross-field checks:
     * <ul>
     * <li>{@code tasks.max > 1} requires per-task offset storage,</li>
     * <li>per-task offset storage requires a non-negative task key generation,</li>
     * <li>a non-negative task key generation requires a positive previous task count.</li>
     * </ul>
     *
     * @param configuration the configuration to validate
     * @return validation results keyed by field name, including any cross-field errors
     */
    protected Map<String, ConfigValue> validateAllFields(Configuration configuration) {
        LOGGER.info("Validating config: {}", configuration);
        Map<String, ConfigValue> results = configuration.validate(VitessConnectorConfig.ALL_FIELDS);
        Integer maxTasks = configuration.getInteger("tasks.max");
        VitessConnectorConfig config = new VitessConnectorConfig(configuration);

        if (maxTasks != null && maxTasks.intValue() > 1 && !config.offsetStoragePerTask()) {
            String perTaskField = VitessConnectorConfig.OFFSET_STORAGE_PER_TASK.name();
            results.computeIfAbsent(perTaskField, ConfigValue::new)
                    .addErrorMessage(String.format("%s needs to be enabled when %s > 1", perTaskField, "tasks.max"));
        }

        if (config.offsetStoragePerTask() && config.getOffsetStorageTaskKeyGen() < 0) {
            String keyGenField = VitessConnectorConfig.OFFSET_STORAGE_TASK_KEY_GEN.name();
            results.computeIfAbsent(keyGenField, ConfigValue::new)
                    .addErrorMessage(String.format("%s needs to be enabled when %s is specified", keyGenField, VitessConnectorConfig.OFFSET_STORAGE_PER_TASK.name()));
        }

        if (config.getOffsetStorageTaskKeyGen() >= 0 && config.getPrevNumTasks() <= 0) {
            String prevTasksField = VitessConnectorConfig.PREV_NUM_TASKS.name();
            results.computeIfAbsent(prevTasksField, ConfigValue::new)
                    .addErrorMessage(String.format("%s needs to be enabled when %s is specified", prevTasksField, VitessConnectorConfig.OFFSET_STORAGE_TASK_KEY_GEN.name()));
        }
        return results;
    }

    /**
     * Returns the tables of the configured keyspace that match the table include list.
     *
     * @param configuration the connector configuration
     * @return one {@link TableId} per matching table, with the keyspace as first component
     *         and {@code null} as second
     */
    public List<TableId> getMatchingCollections(Configuration configuration) {
        VitessConnectorConfig config = new VitessConnectorConfig(configuration);
        String keyspace = config.getKeyspace();
        String includeList = config.tableIncludeList();
        List<String> keyspaceTables = getKeyspaceTables(config);
        List<TableId> matching = new ArrayList<>();
        for (String table : getIncludedTables(keyspace, includeList, keyspaceTables)) {
            matching.add(new TableId(keyspace, (String) null, table));
        }
        return matching;
    }

    // Static initializer: assigns the two static final fields declared at the top of the class.
    static {
        // Assertions are "disabled" unless the JVM enables them for this class (e.g. -ea).
        $assertionsDisabled = !VitessConnector.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(VitessConnector.class);
    }
}
