package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import io.trino.Session;
import io.trino.execution.QueryInfo;
import io.trino.plugin.kafka.util.TestUtils;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
import io.trino.testing.ResultWithQueryId;
import io.trino.testing.kafka.BasicTestingKafka;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/plugin/kafka/TestKafkaIntegrationPushDown.class */
public class TestKafkaIntegrationPushDown extends AbstractTestQueryFramework {
    private static final int MESSAGE_NUM = 1000;
    private static final int TIMESTAMP_TEST_COUNT = 6;
    private static final int TIMESTAMP_TEST_START_INDEX = 2;
    private static final int TIMESTAMP_TEST_END_INDEX = 4;
    private BasicTestingKafka testingKafka;
    private String topicNamePartition;
    private String topicNameOffset;
    private String topicNameCreateTime;
    private String topicNameLogAppend;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/kafka/TestKafkaIntegrationPushDown$RecordMessage.class */
    public static class RecordMessage {
        private final String startTime;
        private final String endTime;
        private final long startOffset;
        private final List<String> testMessageSignatures;

        public RecordMessage(String str, String str2, Long l, List<String> list) {
            this.startTime = str;
            this.endTime = str2;
            this.startOffset = l.longValue();
            this.testMessageSignatures = list;
        }

        public String getStartTime() {
            return this.startTime;
        }

        public String getEndTime() {
            return this.endTime;
        }

        public Long getStartOffset() {
            return Long.valueOf(this.startOffset);
        }

        public List<String> getTestMessageSignatures() {
            return this.testMessageSignatures;
        }
    }

    protected QueryRunner createQueryRunner() throws Exception {
        this.testingKafka = new BasicTestingKafka();
        this.topicNamePartition = "test_push_down_partition_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameOffset = "test_push_down_offset_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameCreateTime = "test_push_down_create_time_" + UUID.randomUUID().toString().replaceAll("-", "_");
        this.topicNameLogAppend = "test_push_down_log_append_" + UUID.randomUUID().toString().replaceAll("-", "_");
        DistributedQueryRunner build = KafkaQueryRunner.builder(this.testingKafka).setExtraTopicDescription(ImmutableMap.builder().put(TestUtils.createEmptyTopicDescription(this.topicNamePartition, new SchemaTableName("default", this.topicNamePartition))).put(TestUtils.createEmptyTopicDescription(this.topicNameOffset, new SchemaTableName("default", this.topicNameOffset))).put(TestUtils.createEmptyTopicDescription(this.topicNameCreateTime, new SchemaTableName("default", this.topicNameCreateTime))).put(TestUtils.createEmptyTopicDescription(this.topicNameLogAppend, new SchemaTableName("default", this.topicNameLogAppend))).build()).setExtraKafkaProperties(ImmutableMap.builder().put("kafka.messages-per-split", "100").build()).build();
        this.testingKafka.createTopicWithConfig(TIMESTAMP_TEST_START_INDEX, 1, this.topicNamePartition, false);
        this.testingKafka.createTopicWithConfig(TIMESTAMP_TEST_START_INDEX, 1, this.topicNameOffset, false);
        this.testingKafka.createTopicWithConfig(1, 1, this.topicNameCreateTime, false);
        this.testingKafka.createTopicWithConfig(1, 1, this.topicNameLogAppend, true);
        return build;
    }

    @AfterClass(alwaysRun = true)
    public void stopKafka() throws Exception {
        if (this.testingKafka != null) {
            this.testingKafka.close();
            this.testingKafka = null;
        }
    }

    @Test
    public void testPartitionPushDown() throws ExecutionException, InterruptedException {
        createMessages(this.topicNamePartition);
        Assert.assertEquals(getQueryInfo(getDistributedQueryRunner(), getDistributedQueryRunner().executeWithQueryId(getSession(), String.format("SELECT count(*) FROM default.%s WHERE _partition_id=1", this.topicNamePartition))).getQueryStats().getProcessedInputPositions(), 500L);
    }

    @Test
    public void testOffsetPushDown() throws ExecutionException, InterruptedException {
        createMessages(this.topicNameOffset);
        DistributedQueryRunner distributedQueryRunner = getDistributedQueryRunner();
        Assert.assertEquals(getQueryInfo(distributedQueryRunner, distributedQueryRunner.executeWithQueryId(getSession(), String.format("SELECT count(*) FROM default.%s WHERE _partition_offset between 2 and 10", this.topicNameOffset))).getQueryStats().getProcessedInputPositions(), 18L);
        Assert.assertEquals(getQueryInfo(distributedQueryRunner, distributedQueryRunner.executeWithQueryId(getSession(), String.format("SELECT count(*) FROM default.%s WHERE _partition_offset > 2 and _partition_offset < 10", this.topicNameOffset))).getQueryStats().getProcessedInputPositions(), 14L);
        Assert.assertEquals(getQueryInfo(distributedQueryRunner, distributedQueryRunner.executeWithQueryId(getSession(), String.format("SELECT count(*) FROM default.%s WHERE _partition_offset = 3", this.topicNameOffset))).getQueryStats().getProcessedInputPositions(), 2L);
    }

    @Test
    public void testTimestampCreateTimeModePushDown() throws Exception {
        RecordMessage createTimestampTestMessages = createTimestampTestMessages(this.topicNameCreateTime);
        DistributedQueryRunner distributedQueryRunner = getDistributedQueryRunner();
        String format = String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", this.topicNameCreateTime, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime());
        ResultWithQueryId<MaterializedResult> executeWithQueryId = distributedQueryRunner.executeWithQueryId(getSession(), format);
        Assertions.assertThat(getQueryInfo(distributedQueryRunner, executeWithQueryId).getQueryStats().getProcessedInputPositions()).describedAs("with dump:\n%s", new Object[]{buildDebugDumpString(this.topicNameCreateTime, createTimestampTestMessages, distributedQueryRunner, format, executeWithQueryId)}).isEqualTo(998L);
        ResultWithQueryId<MaterializedResult> executeWithQueryId2 = distributedQueryRunner.executeWithQueryId(Session.builder(getSession()).setSystemProperty("kafka.timestamp_upper_bound_force_push_down_enabled", "true").build(), format);
        Assertions.assertThat(getQueryInfo(distributedQueryRunner, executeWithQueryId2).getQueryStats().getProcessedInputPositions()).describedAs("with dump:\n%s", new Object[]{buildDebugDumpString(this.topicNameCreateTime, createTimestampTestMessages, distributedQueryRunner, format, executeWithQueryId2)}).isEqualTo(2L);
    }

    @Test
    public void testTimestampLogAppendModePushDown() throws Exception {
        RecordMessage createTimestampTestMessages = createTimestampTestMessages(this.topicNameLogAppend);
        DistributedQueryRunner distributedQueryRunner = getDistributedQueryRunner();
        String format = String.format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s' and _timestamp < timestamp '%s'", this.topicNameLogAppend, createTimestampTestMessages.getStartTime(), createTimestampTestMessages.getEndTime());
        ResultWithQueryId<MaterializedResult> executeWithQueryId = distributedQueryRunner.executeWithQueryId(getSession(), format);
        Assertions.assertThat(getQueryInfo(distributedQueryRunner, executeWithQueryId).getQueryStats().getProcessedInputPositions()).describedAs("with dump:\n%s", new Object[]{buildDebugDumpString(this.topicNameLogAppend, createTimestampTestMessages, distributedQueryRunner, format, executeWithQueryId)}).isEqualTo(2L);
    }

    private String buildDebugDumpString(String str, RecordMessage recordMessage, DistributedQueryRunner distributedQueryRunner, String str2, ResultWithQueryId<MaterializedResult> resultWithQueryId) {
        StringBuilder sb = new StringBuilder();
        sb.append("Main messages:").append("\n");
        for (int i = 0; i < recordMessage.getTestMessageSignatures().size(); i++) {
            sb.append("  ").append(recordMessage.getTestMessageSignatures().get(i));
            if (i == TIMESTAMP_TEST_START_INDEX) {
                sb.append(", startTime:").append(recordMessage.getStartTime());
            } else if (i == TIMESTAMP_TEST_END_INDEX) {
                sb.append(", endTime:").append(recordMessage.getEndTime());
            }
            sb.append("\n");
        }
        sb.append(String.format("test sql:%s\n", str2));
        sb.append("test sql result:").append("\n").append(buildResultsDebugDumpString((MaterializedResult) resultWithQueryId.getResult()));
        sb.append("data check result:").append("\n").append(buildResultsDebugDumpString(distributedQueryRunner.execute(getSession(), String.format("SELECT _partition_id,_partition_offset,_timestamp FROM default.%s WHERE _partition_offset between %s and %s order by _partition_id, _timestamp", str, recordMessage.getStartOffset(), Long.valueOf(recordMessage.getStartOffset().longValue() + 1000))))).append("\n");
        return sb.toString();
    }

    private static String buildResultsDebugDumpString(MaterializedResult materializedResult) {
        StringBuilder sb = new StringBuilder();
        Iterator it = materializedResult.iterator();
        while (it.hasNext()) {
            sb.append(String.format("  dump values:%s\n", ((MaterializedRow) it.next()).toString()));
        }
        return sb.toString();
    }

    private static QueryInfo getQueryInfo(DistributedQueryRunner distributedQueryRunner, ResultWithQueryId<MaterializedResult> resultWithQueryId) {
        return distributedQueryRunner.getCoordinator().getQueryManager().getFullQueryInfo(resultWithQueryId.getQueryId());
    }

    private RecordMessage createTimestampTestMessages(String str) throws Exception {
        String str2 = null;
        String str3 = null;
        long j = -1;
        ImmutableList.Builder builder = ImmutableList.builder();
        Future immediateFuture = Futures.immediateFuture((Object) null);
        long j2 = -1;
        Thread.sleep(100L);
        KafkaProducer createProducer = this.testingKafka.createProducer();
        for (long j3 = 0; j3 < 1000; j3++) {
            try {
                immediateFuture = createProducer.send(new ProducerRecord(str, Long.valueOf(j3), Long.valueOf(j3)));
                if (j3 < 6) {
                    RecordMetadata recordMetadata = (RecordMetadata) immediateFuture.get();
                    Assert.assertTrue(j2 != recordMetadata.timestamp());
                    j2 = recordMetadata.timestamp();
                    builder.add(String.format("timestamp:%s: partitionId:%s, offset:%s", Long.valueOf(recordMetadata.timestamp()), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())));
                    if (j3 == 0) {
                        j = recordMetadata.offset();
                    } else if (j3 == 2) {
                        str2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.ofInstant(Instant.ofEpochMilli(recordMetadata.timestamp()), ZoneId.of("UTC")));
                    } else if (j3 == 4) {
                        str3 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.ofInstant(Instant.ofEpochMilli(recordMetadata.timestamp()), ZoneId.of("UTC")));
                    }
                    Thread.sleep(100L);
                }
            } catch (Throwable th) {
                if (createProducer != null) {
                    try {
                        createProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (createProducer != null) {
            createProducer.close();
        }
        immediateFuture.get();
        Objects.requireNonNull(str2, "startTime result is none");
        Objects.requireNonNull(str3, "endTime result is none");
        return new RecordMessage(str2, str3, Long.valueOf(j), builder.build());
    }

    private void createMessages(String str) throws ExecutionException, InterruptedException {
        Future immediateFuture = Futures.immediateFuture((Object) null);
        KafkaProducer createProducer = this.testingKafka.createProducer();
        for (long j = 0; j < 1000; j++) {
            try {
                immediateFuture = createProducer.send(new ProducerRecord(str, Long.valueOf(j), Long.valueOf(j)));
            } catch (Throwable th) {
                if (createProducer != null) {
                    try {
                        createProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (createProducer != null) {
            createProducer.close();
        }
        immediateFuture.get();
    }
}
