package io.trino.plugin.kafka.schema.confluent;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.assertions.Assert;
import io.trino.testing.kafka.TestingKafka;
import io.trino.testing.sql.TestTable;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.class */
public class TestKafkaWithConfluentSchemaRegistryMinimalFunctionality extends AbstractTestQueryFramework {
    private static final int MESSAGE_COUNT = 100;
    private TestingKafka testingKafka;
    private static final String RECORD_NAME = "test_record";
    private static final Schema INITIAL_SCHEMA = (Schema) SchemaBuilder.record(RECORD_NAME).fields().name("col_1").type().longType().noDefault().name("col_2").type().stringType().noDefault().endRecord();
    private static final Schema EVOLVED_SCHEMA = (Schema) ((SchemaBuilder.FieldAssembler) SchemaBuilder.record(RECORD_NAME).fields().name("col_1").type().longType().noDefault().name("col_2").type().stringType().noDefault().name("col_3").type().optional().doubleType()).endRecord();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality$JsonValue.class */
    public static class JsonValue {
        private final int id;
        private final String value;

        @JsonCreator
        public JsonValue(@JsonProperty("id") int i, @JsonProperty("value") String str) {
            this.id = i;
            this.value = (String) Objects.requireNonNull(str, "value is null");
        }

        @JsonProperty("id")
        public int getId() {
            return this.id;
        }

        @JsonProperty("value")
        public String getValue() {
            return this.value;
        }
    }

    protected QueryRunner createQueryRunner() throws Exception {
        this.testingKafka = closeAfterClass(TestingKafka.createWithSchemaRegistry());
        return KafkaWithConfluentSchemaRegistryQueryRunner.builder(this.testingKafka).setExtraKafkaProperties(ImmutableMap.builder().put("kafka.confluent-subjects-cache-refresh-interval", "1ms").build()).build();
    }

    @Test
    public void testBasicTopic() {
        String str = "topic-basic-MixedCase-" + TestTable.randomTableSuffix();
        assertTopic(this.testingKafka, str, String.format("SELECT col_1, col_2 FROM %s", toDoubleQuoted(str)), String.format("SELECT col_1, col_2, col_3 FROM %s", toDoubleQuoted(str)), false, schemaRegistryAwareProducer(this.testingKafka).put("key.serializer", LongSerializer.class.getName()).put("value.serializer", KafkaAvroSerializer.class.getName()).build());
    }

    @Test
    public void testTopicWithKeySubject() {
        String str = "topic-Key-Subject-" + TestTable.randomTableSuffix();
        assertTopic(this.testingKafka, str, String.format("SELECT \"%s-key\", col_1, col_2 FROM %s", str, toDoubleQuoted(str)), String.format("SELECT \"%s-key\", col_1, col_2, col_3 FROM %s", str, toDoubleQuoted(str)), true, schemaRegistryAwareProducer(this.testingKafka).put("key.serializer", KafkaAvroSerializer.class.getName()).put("value.serializer", KafkaAvroSerializer.class.getName()).build());
    }

    @Test
    public void testTopicWithRecordNameStrategy() {
        String str = "topic-Record-Name-Strategy-" + TestTable.randomTableSuffix();
        assertTopic(this.testingKafka, str, String.format("SELECT \"%1$s-key\", col_1, col_2 FROM \"%1$s&value-subject=%2$s\"", str, RECORD_NAME), String.format("SELECT \"%1$s-key\", col_1, col_2, col_3 FROM \"%1$s&value-subject=%2$s\"", str, RECORD_NAME), true, schemaRegistryAwareProducer(this.testingKafka).put("key.serializer", KafkaAvroSerializer.class.getName()).put("value.serializer", KafkaAvroSerializer.class.getName()).put("value.subject.name.strategy", RecordNameStrategy.class.getName()).build());
    }

    @Test
    public void testTopicWithTopicRecordNameStrategy() {
        String str = "topic-Topic-Record-Name-Strategy-" + TestTable.randomTableSuffix();
        assertTopic(this.testingKafka, str, String.format("SELECT \"%1$s-key\", col_1, col_2 FROM \"%1$s&value-subject=%1$s-%2$s\"", str, RECORD_NAME), String.format("SELECT \"%1$s-key\", col_1, col_2, col_3 FROM \"%1$s&value-subject=%1$s-%2$s\"", str, RECORD_NAME), true, schemaRegistryAwareProducer(this.testingKafka).put("key.serializer", KafkaAvroSerializer.class.getName()).put("value.serializer", KafkaAvroSerializer.class.getName()).put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName()).build());
    }

    @Test
    public void testUnsupportedInsert() {
        String str = "topic-unsupported-insert-" + TestTable.randomTableSuffix();
        assertNotExists(str);
        this.testingKafka.sendMessages(createMessages(str, MESSAGE_COUNT, true).stream(), schemaRegistryAwareProducer(this.testingKafka).put("key.serializer", KafkaAvroSerializer.class.getName()).put("value.serializer", KafkaAvroSerializer.class.getName()).build());
        waitUntilTableExists(str);
        Assertions.assertThatThrownBy(() -> {
            getQueryRunner().execute(String.format("INSERT INTO %s VALUES(0, 0, '')", toDoubleQuoted(str)));
        }).hasMessage("Insert not supported");
    }

    @Test
    public void testUnsupportedFormat() {
        String str = "topic-unsupported-format-" + TestTable.randomTableSuffix();
        assertNotExists(str);
        this.testingKafka.sendMessages(IntStream.range(0, MESSAGE_COUNT).mapToObj(i -> {
            return new ProducerRecord(str, Long.valueOf(i), new JsonValue(i, "value_" + i));
        }), schemaRegistryAwareProducer(this.testingKafka).put("key.serializer", LongSerializer.class.getName()).put("value.serializer", KafkaJsonSchemaSerializer.class.getName()).build());
        String str2 = "Not supported schema: JSON";
        Assert.assertEventually(Duration.succinctDuration(10.0d, TimeUnit.SECONDS), () -> {
            Assertions.assertThatThrownBy(() -> {
                tableExists(str);
            }).isInstanceOf(RuntimeException.class).hasMessage(str2);
        });
        Assertions.assertThatThrownBy(() -> {
            getQueryRunner().execute("SHOW COLUMNS FROM " + toDoubleQuoted(str));
        }).hasMessage("Not supported schema: JSON");
        Assertions.assertThatThrownBy(() -> {
            getQueryRunner().execute("SELECT * FROM " + toDoubleQuoted(str));
        }).hasMessage("Not supported schema: JSON");
        Assertions.assertThatThrownBy(() -> {
            getQueryRunner().execute(String.format("INSERT INTO %s VALUES(0, 0, '')", toDoubleQuoted(str)));
        }).hasMessage("Not supported schema: JSON");
    }

    private static ImmutableMap.Builder<String, String> schemaRegistryAwareProducer(TestingKafka testingKafka) {
        return ImmutableMap.builder().put("schema.registry.url", testingKafka.getSchemaRegistryConnectString());
    }

    private void assertTopic(TestingKafka testingKafka, String str, String str2, String str3, boolean z, Map<String, String> map) {
        assertNotExists(str);
        List<ProducerRecord<Long, GenericRecord>> createMessages = createMessages(str, MESSAGE_COUNT, true);
        testingKafka.sendMessages(createMessages.stream(), map);
        waitUntilTableExists(str);
        assertCount(str, 100L);
        QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner());
        ((QueryAssertions.QueryAssert) queryAssertions.query(str2).assertThat()).containsAll(getExpectedValues(createMessages, INITIAL_SCHEMA, z));
        testingKafka.sendMessages(createMessages(str, MESSAGE_COUNT, false).stream(), map);
        assertCount(str, ImmutableList.builder().addAll(createMessages).addAll(r0).build().size());
        ((QueryAssertions.QueryAssert) queryAssertions.query(str3).assertThat()).containsAll(getExpectedValues(createMessages, EVOLVED_SCHEMA, z));
    }

    private static String getExpectedValues(List<ProducerRecord<Long, GenericRecord>> list, Schema schema, boolean z) {
        StringBuilder sb = new StringBuilder("VALUES ");
        ImmutableList.Builder builder = ImmutableList.builder();
        for (ProducerRecord<Long, GenericRecord> producerRecord : list) {
            ImmutableList.Builder builder2 = ImmutableList.builder();
            if (z) {
                builder2.add(String.format("CAST(%s as bigint)", producerRecord.key()));
            }
            addExpectedColumns(schema, (GenericRecord) producerRecord.value(), builder2);
            builder.add(String.format("(%s)", String.join(", ", (Iterable<? extends CharSequence>) builder2.build())));
        }
        sb.append(String.join(", ", (Iterable<? extends CharSequence>) builder.build()));
        return sb.toString();
    }

    private static void addExpectedColumns(Schema schema, GenericRecord genericRecord, ImmutableList.Builder<String> builder) {
        for (Schema.Field field : schema.getFields()) {
            Object obj = genericRecord.get(field.name());
            if (obj == null && field.schema().getType().equals(Schema.Type.UNION) && field.schema().getTypes().contains(Schema.create(Schema.Type.NULL))) {
                if (!field.schema().getTypes().contains(Schema.create(Schema.Type.DOUBLE))) {
                    throw new IllegalArgumentException("Unsupported field: " + field);
                }
                builder.add("CAST(null AS double)");
            } else if (field.schema().getType().equals(Schema.Type.STRING)) {
                builder.add(String.format("VARCHAR '%s'", obj));
            } else {
                if (!field.schema().getType().equals(Schema.Type.LONG)) {
                    throw new IllegalArgumentException("Unsupported field: " + field);
                }
                builder.add(String.format("CAST(%s AS bigint)", obj));
            }
        }
    }

    private void assertNotExists(String str) {
        if (schemaExists()) {
            Assertions.assertThat(getQueryRunner().execute("SHOW TABLES LIKE " + toSingleQuoted(str)).getRowCount()).isZero();
        }
    }

    private void waitUntilTableExists(String str) {
        Failsafe.with(new RetryPolicy[]{new RetryPolicy().withMaxAttempts(10).withDelay(java.time.Duration.ofMillis(100L))}).run(() -> {
            org.testng.Assert.assertTrue(schemaExists());
        });
        Failsafe.with(new RetryPolicy[]{new RetryPolicy().withMaxAttempts(10).withDelay(java.time.Duration.ofMillis(100L))}).run(() -> {
            org.testng.Assert.assertTrue(tableExists(str));
        });
    }

    private boolean schemaExists() {
        return getQueryRunner().execute(String.format("SHOW SCHEMAS FROM %s LIKE '%s'", getSession().getCatalog().orElseThrow(), getSession().getSchema().orElseThrow())).getRowCount() == 1;
    }

    private boolean tableExists(String str) {
        return getQueryRunner().execute(String.format("SHOW TABLES LIKE '%s'", str.toLowerCase(Locale.ENGLISH))).getRowCount() == 1;
    }

    private void assertCount(String str, long j) {
        Assertions.assertThat(getQueryRunner().execute("SELECT count(*) FROM " + toDoubleQuoted(str)).getOnlyValue()).isEqualTo(Long.valueOf(j));
    }

    private static String toDoubleQuoted(String str) {
        return String.format("\"%s\"", str);
    }

    private static String toSingleQuoted(Object obj) {
        Objects.requireNonNull(obj, "value is null");
        return String.format("'%s'", obj);
    }

    private static List<ProducerRecord<Long, GenericRecord>> createMessages(String str, int i, boolean z) {
        ImmutableList.Builder builder = ImmutableList.builder();
        if (!z) {
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= i) {
                    break;
                }
                builder.add(new ProducerRecord(str, Long.valueOf(j2), createRecordWithEvolvedSchema(j2)));
                j = j2 + 1;
            }
        } else {
            long j3 = 0;
            while (true) {
                long j4 = j3;
                if (j4 >= i) {
                    break;
                }
                builder.add(new ProducerRecord(str, Long.valueOf(j4), createRecordWithInitialSchema(j4)));
                j3 = j4 + 1;
            }
        }
        return builder.build();
    }

    private static GenericRecord createRecordWithInitialSchema(long j) {
        return new GenericRecordBuilder(INITIAL_SCHEMA).set("col_1", Long.valueOf(Math.multiplyExact(j, MESSAGE_COUNT))).set("col_2", String.format("string-%s", Long.valueOf(j))).build();
    }

    private static GenericRecord createRecordWithEvolvedSchema(long j) {
        return new GenericRecordBuilder(EVOLVED_SCHEMA).set("col_1", Long.valueOf(Math.multiplyExact(j, MESSAGE_COUNT))).set("col_2", String.format("string-%s", Long.valueOf(j))).set("col_3", Double.valueOf((j + 10.1d) / 10.0d)).build();
    }
}
