package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.inject.Scopes;
import io.airlift.configuration.ConditionalModule;
import io.airlift.json.JsonCodec;
import io.airlift.log.Level;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.airlift.units.Duration;
import io.trino.decoder.DecoderModule;
import io.trino.plugin.kafka.encoder.EncoderModule;
import io.trino.plugin.kafka.schema.ContentSchemaProvider;
import io.trino.plugin.kafka.schema.MapBasedTableDescriptionSupplier;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.plugin.kafka.schema.file.FileReadContentSchemaProvider;
import io.trino.plugin.kafka.util.CodecSupplier;
import io.trino.plugin.kafka.util.TestUtils;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TypeManager;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.kafka.TestingKafka;
import io.trino.tpch.TpchTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/trino/plugin/kafka/KafkaQueryRunner.class */
public final class KafkaQueryRunner {
    private static final Logger log;
    private static final String TPCH_SCHEMA = "tpch";
    private static final String TEST = "test";

    /* loaded from: input_file:io/trino/plugin/kafka/KafkaQueryRunner$Builder.class */
    public static class Builder extends KafkaQueryRunnerBuilder {
        private List<TpchTable<?>> tables;
        private Map<SchemaTableName, KafkaTopicDescription> extraTopicDescription;

        protected Builder(TestingKafka testingKafka) {
            super(testingKafka, "kafka", KafkaQueryRunner.TPCH_SCHEMA);
            this.tables = ImmutableList.of();
            this.extraTopicDescription = ImmutableMap.of();
        }

        protected Builder(TestingKafka testingKafka, String str) {
            super(testingKafka, str, KafkaQueryRunner.TPCH_SCHEMA);
            this.tables = ImmutableList.of();
            this.extraTopicDescription = ImmutableMap.of();
        }

        public Builder setTables(Iterable<TpchTable<?>> iterable) {
            this.tables = ImmutableList.copyOf((Iterable) Objects.requireNonNull(iterable, "tables is null"));
            return this;
        }

        public Builder setExtraTopicDescription(Map<SchemaTableName, KafkaTopicDescription> map) {
            this.extraTopicDescription = ImmutableMap.copyOf((Map) Objects.requireNonNull(map, "extraTopicDescription is null"));
            return this;
        }

        @Override // io.trino.plugin.kafka.KafkaQueryRunnerBuilder
        public void preInit(DistributedQueryRunner distributedQueryRunner) throws Exception {
            distributedQueryRunner.installPlugin(new TpchPlugin());
            distributedQueryRunner.createCatalog(KafkaQueryRunner.TPCH_SCHEMA, KafkaQueryRunner.TPCH_SCHEMA);
            Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions = KafkaQueryRunner.createTpchTopicDescriptions(distributedQueryRunner.getCoordinator().getTypeManager(), this.tables);
            ArrayList<SchemaTableName> arrayList = new ArrayList();
            arrayList.add(new SchemaTableName("read_test", "all_datatypes_json"));
            arrayList.add(new SchemaTableName("write_test", "all_datatypes_avro"));
            arrayList.add(new SchemaTableName("write_test", "all_datatypes_csv"));
            arrayList.add(new SchemaTableName("write_test", "all_datatypes_raw"));
            arrayList.add(new SchemaTableName("write_test", "all_datatypes_json"));
            JsonCodec jsonCodec = new CodecSupplier(KafkaTopicDescription.class, distributedQueryRunner.getCoordinator().getTypeManager()).get();
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (SchemaTableName schemaTableName : arrayList) {
                builder.put(schemaTableName, KafkaQueryRunner.createTable(schemaTableName, jsonCodec));
            }
            ImmutableMap buildOrThrow = ImmutableMap.builder().putAll(this.extraTopicDescription).putAll(createTpchTopicDescriptions).putAll(builder.buildOrThrow()).buildOrThrow();
            addExtension(ConditionalModule.conditionalModule(KafkaConfig.class, kafkaConfig -> {
                return kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(KafkaQueryRunner.TEST);
            }, binder -> {
                binder.bind(TableDescriptionSupplier.class).toInstance(new MapBasedTableDescriptionSupplier(buildOrThrow));
            }));
            addExtension(binder2 -> {
                binder2.bind(ContentSchemaProvider.class).to(FileReadContentSchemaProvider.class).in(Scopes.SINGLETON);
            });
            addExtension(new DecoderModule());
            addExtension(new EncoderModule());
            HashMap hashMap = new HashMap(this.extraKafkaProperties);
            hashMap.putIfAbsent("kafka.table-description-supplier", KafkaQueryRunner.TEST);
            setExtraKafkaProperties(hashMap);
        }

        @Override // io.trino.plugin.kafka.KafkaQueryRunnerBuilder
        public void postInit(DistributedQueryRunner distributedQueryRunner) {
            KafkaQueryRunner.log.info("Loading data...");
            long nanoTime = System.nanoTime();
            for (TpchTable<?> tpchTable : this.tables) {
                long nanoTime2 = System.nanoTime();
                KafkaQueryRunner.log.info("Running import for %s", new Object[]{tpchTable.getTableName()});
                distributedQueryRunner.execute(String.format("INSERT INTO %1$s SELECT * FROM tpch.tiny.%1$s", tpchTable.getTableName()));
                KafkaQueryRunner.log.info("Imported %s in %s", new Object[]{tpchTable.getTableName(), Duration.nanosSince(nanoTime2).convertToMostSuccinctTimeUnit()});
            }
            KafkaQueryRunner.log.info("Loading complete in %s", new Object[]{Duration.nanosSince(nanoTime).toString(TimeUnit.SECONDS)});
        }
    }

    private KafkaQueryRunner() {
    }

    public static Builder builder(TestingKafka testingKafka) {
        return new Builder(testingKafka);
    }

    private static KafkaTopicDescription createTable(SchemaTableName schemaTableName, JsonCodec<KafkaTopicDescription> jsonCodec) throws IOException {
        KafkaTopicDescription kafkaTopicDescription = (KafkaTopicDescription) jsonCodec.fromJson(ByteStreams.toByteArray(KafkaQueryRunner.class.getResourceAsStream(String.format("/%s/%s.json", schemaTableName.getSchemaName(), schemaTableName.getTableName()))));
        return new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.toString(), kafkaTopicDescription.getKey().map(kafkaTopicFieldGroup -> {
            return new KafkaTopicFieldGroup(kafkaTopicFieldGroup.getDataFormat(), kafkaTopicFieldGroup.getDataSchema().map(str -> {
                return KafkaQueryRunner.class.getResource(str).getPath();
            }), Optional.empty(), kafkaTopicFieldGroup.getFields());
        }), kafkaTopicDescription.getMessage().map(kafkaTopicFieldGroup2 -> {
            return new KafkaTopicFieldGroup(kafkaTopicFieldGroup2.getDataFormat(), kafkaTopicFieldGroup2.getDataSchema().map(str -> {
                return KafkaQueryRunner.class.getResource(str).getPath();
            }), Optional.empty(), kafkaTopicFieldGroup2.getFields());
        }));
    }

    private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions(TypeManager typeManager, Iterable<TpchTable<?>> iterable) throws Exception {
        JsonCodec jsonCodec = new CodecSupplier(KafkaTopicDescription.class, typeManager).get();
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator<TpchTable<?>> it = iterable.iterator();
        while (it.hasNext()) {
            SchemaTableName schemaTableName = new SchemaTableName(TPCH_SCHEMA, it.next().getTableName());
            builder.put(TestUtils.loadTpchTopicDescription(jsonCodec, schemaTableName.toString(), schemaTableName));
        }
        return builder.buildOrThrow();
    }

    public static void main(String[] strArr) throws Exception {
        Logging.initialize();
        DistributedQueryRunner build = ((KafkaQueryRunnerBuilder) builder(TestingKafka.create()).setTables(TpchTable.getTables()).setCoordinatorProperties(ImmutableMap.of("http-server.http.port", "8080"))).build();
        Logger logger = Logger.get(KafkaQueryRunner.class);
        logger.info("======== SERVER STARTED ========");
        logger.info("\n====\n%s\n====", new Object[]{build.getCoordinator().getBaseUrl()});
    }

    static {
        Logging.initialize().setLevel("org.apache.kafka", Level.OFF);
        log = Logger.get(KafkaQueryRunner.class);
    }
}
