package io.debezium.transforms.outbox;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.Configuration;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/transforms/outbox/AbstractEventRouterTest.class */
/**
 * Base class for integration tests of the outbox {@link EventRouter} transformation.
 * <p>
 * Each test starts the connector returned by {@link #getConnectorClass()}, inserts an outbox row
 * through {@link #databaseConnection()}, consumes the resulting change event and asserts on the
 * record obtained by applying {@link #outboxEventRouter} to it. Everything that depends on the
 * concrete database (DDL, insert syntax, column names, id representation) is delegated to the
 * abstract and overridable hooks below.
 *
 * @param <T> the connector type under test
 */
public abstract class AbstractEventRouterTest<T extends SourceConnector> extends AbstractConnectorTest {

    /** Shared JSON parser used by the payload assertions; one instance is enough for all tests. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Record timestamp (epoch milliseconds) the tests expect once the event timestamp field is
     * configured. The matching column value is presumably produced by
     * {@link #getAdditionalFieldValues(boolean)} and {@link #getAdditionalFieldValuesTimestampOnly()}.
     */
    private static final long EXPECTED_TIMESTAMP_MILLIS = 1553460779000L;

    /** Router under test; created and configured with an empty map before every test. */
    protected EventRouter<SourceRecord> outboxEventRouter;

    /**
     * @return the connector class handed to {@code start(...)} when a test starts the connector
     */
    protected abstract Class<T> getConnectorClass();

    /**
     * @return the JDBC connection on which the tests execute their insert statements
     */
    protected abstract JdbcConnection databaseConnection();

    /**
     * Supplies the connector configuration.
     *
     * @param z {@code true} when called from the "initial snapshot record" start-up path,
     *          {@code false} when called from the "no snapshot" start-up path; presumably selects
     *          the snapshot mode (confirm in the subclass)
     * @return a builder whose {@code build()} result is used to start the connector
     */
    protected abstract Configuration.Builder getConfigurationBuilder(boolean z);

    /**
     * @return the topic from which the tests pick the consumed change event
     */
    protected abstract String topicName();

    /**
     * @return the outbox table name; not referenced by this base class, presumably used by subclasses
     */
    protected abstract String tableName();

    /**
     * @return the prefix that, followed by {@code "UserEmail.Value"}, forms the expected value schema name
     */
    protected abstract String getSchemaNamePrefix();

    /**
     * @return the schema expected for the {@code payload} field of the routed envelope
     */
    protected abstract Schema getPayloadSchema();

    /**
     * Creates the outbox table; invoked before every test.
     *
     * @throws Exception if the table cannot be created
     */
    protected abstract void createTable() throws Exception;

    /**
     * Alters the outbox table before the tests that pass {@link #getAdditionalFieldValues(boolean)}
     * to the insert; presumably adds the version, timestamp, boolean and deleted columns.
     *
     * @throws Exception if the table cannot be altered
     */
    protected abstract void alterTableWithExtra4Fields() throws Exception;

    /**
     * Alters the outbox table before the test that passes
     * {@link #getAdditionalFieldValuesTimestampOnly()} to the insert; presumably adds only the
     * timestamp column.
     *
     * @throws Exception if the table cannot be altered
     */
    protected abstract void alterTableWithTimestampField() throws Exception;

    /**
     * Alters the outbox table before the test that inserts an empty-string payload; presumably
     * changes the payload column so that such a value is accepted.
     *
     * @throws Exception if the table cannot be altered
     */
    protected abstract void alterTableModifyPayload() throws Exception;

    /**
     * Supplies the trailing part of an insert statement for a table altered by
     * {@link #alterTableWithExtra4Fields()}.
     *
     * @param z {@code false} in the test that asserts the {@code deleted} envelope field to be
     *          {@code false}, {@code true} in the null-payload tests; presumably the value of the
     *          deleted column (confirm in the subclass)
     * @return the additional values, passed as last argument to
     *         {@link #createInsert(String, String, String, String, String, String)}
     */
    protected abstract String getAdditionalFieldValues(boolean z);

    /**
     * @return the additional values for a table altered by {@link #alterTableWithTimestampField()},
     *         passed as last argument to
     *         {@link #createInsert(String, String, String, String, String, String)}
     */
    protected abstract String getAdditionalFieldValuesTimestampOnly();

    /**
     * Builds the SQL statement inserting one outbox row. The parameter meanings below are derived
     * from how the tests call this method and what they assert afterwards.
     *
     * @param str  event id; expected back (through {@link #getId(String)}) in the {@code id} header
     * @param str2 event type; expected back when placed into a header or the envelope
     * @param str3 aggregate type; expected as suffix of the routed topic {@code outbox.event.<type>}
     * @param str4 aggregate id; expected as the routed record key
     * @param str5 payload; the tests pass JSON text, an empty string or {@code null}
     * @param str6 additional values (see {@link #getAdditionalFieldValues(boolean)}); the tests also
     *             pass an empty string or {@code null}
     * @return the statement to execute
     */
    protected abstract String createInsert(String str, String str2, String str3, String str4, String str5, String str6);

    /**
     * Blocks until the connector's snapshot has completed.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected abstract void waitForSnapshotCompleted() throws InterruptedException;

    /**
     * Blocks until the connector has started streaming.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected abstract void waitForStreamingStarted() throws InterruptedException;

    /**
     * Creates the outbox table and a router configured with an empty map.
     *
     * @throws Exception if the table cannot be created
     */
    @Before
    public void beforeEach() throws Exception {
        createTable();
        this.outboxEventRouter = new EventRouter<>();
        this.outboxEventRouter.configure(Collections.emptyMap());
    }

    /**
     * Stops the connector, verifies nothing is left to consume and closes the router.
     *
     * @throws Exception if stopping the connector or the verification fails
     */
    @After
    public void afterEach() throws Exception {
        try {
            stopConnector();
            assertNoRecordsToConsume();
        } finally {
            // JUnit runs @After even when @Before failed, so the router may never have been
            // created; and it must be closed even when the checks above fail.
            if (this.outboxEventRouter != null) {
                this.outboxEventRouter.close();
            }
        }
    }

    /** A row with an empty JSON object payload is routed with a string key and a string value. */
    @Test
    @FixFor({"DBZ-1169", "DBZ-3940"})
    public void shouldConsumeRecordsFromInsert() throws Exception {
        startConnectorWithInitialSnapshotRecord();
        doInsert(createInsert("59a42efd-b015-44a9-9dde-cb36d9002425", "UserCreated", "User", "10711fa5", "{}", ""));

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.User");
        assertStringKey(routed, "10711fa5");
        Assertions.assertThat(routed.value()).isInstanceOf(String.class);
        Assertions.assertThat(MAPPER.readTree((String) routed.value()).get("email")).isNull();
    }

    /** The event type column can be placed into a header named {@code eventType}. */
    @Test
    @FixFor({"DBZ-1385", "DBZ-3940"})
    public void shouldSendEventTypeAsHeader() throws Exception {
        startConnectorWithInitialSnapshotRecord();
        doInsert(createInsert("59a42efd-b015-44a9-9dde-cb36d9002425", "UserCreated", "User", "10711fa5", "{\"email\": \"gh@mefi.in\"}", ""));

        final Map<String, String> config = new HashMap<>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), header(getFieldEventType(), "eventType"));
        this.outboxEventRouter.configure(config);

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.User");
        final Object value = routed.value();
        Assertions.assertThat(routed.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
        Assertions.assertThat(value).isInstanceOf(String.class);
        Assertions.assertThat(MAPPER.readTree((String) value).get("email").asText()).isEqualTo("gh@mefi.in");
    }

    /** The event type column can be placed into the envelope, turning the value into a struct. */
    @Test
    @FixFor({"DBZ-2014", "DBZ-3940"})
    public void shouldSendEventTypeAsValue() throws Exception {
        startConnectorWithInitialSnapshotRecord();
        doInsert(createInsert("d4da2428-8b19-11ea-bc55-0242ac130003", "UserCreated", "User", "9948fcad", "{\"email\": \"gh@mefi.in\"}", ""));

        final Map<String, String> config = new HashMap<>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), envelope(getFieldEventType(), "eventType"));
        this.outboxEventRouter.configure(config);

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.User");
        final Struct valueStruct = Requirements.requireStruct(routed.value(), "test payload");
        Assertions.assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
        Assertions.assertThat(MAPPER.readTree(valueStruct.getString("payload")).get("email").asText()).isEqualTo("gh@mefi.in");
    }

    /** With the default configuration the JSON payload is emitted as a string value. */
    @Test
    @FixFor({"DBZ-2014", "DBZ-3940"})
    public void shouldRespectJsonFormatAsString() throws Exception {
        startConnectorWithInitialSnapshotRecord();
        doInsert(createInsert("f9171eb6-19f3-4579-9206-0e179d2ebad7", "UserCreated", "User", "7bdf2e9e", "{\"email\": \"gh@mefi.in\"}", ""));

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        Assertions.assertThat(routed.value()).isInstanceOf(String.class);
        Assertions.assertThat(MAPPER.readTree((String) routed.value()).get("email").asText()).isEqualTo("gh@mefi.in");
    }

    /** Schema version, event timestamp and additional header/envelope placements work together. */
    @Test
    @FixFor({"DBZ-1169", "DBZ-3940"})
    public void shouldSupportAllFeatures() throws Exception {
        startConnectorWithNoSnapshot();
        // This test reconfigures the router created in beforeEach instead of replacing it.
        this.outboxEventRouter.configure(allFeaturesConfig());
        alterTableWithExtra4Fields();
        doInsert(createInsert("f9171eb6-19f3-4579-9206-0e179d2ebad7", "UserUpdated", "UserEmail", "7bdf2e9e", "{\"email\": \"gh@mefi.in\"}", getAdditionalFieldValues(false)));

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        final Schema expectedSchema = SchemaBuilder.struct()
                .version(1)
                .name(getSchemaNamePrefix() + "UserEmail.Value")
                .field("payload", getPayloadSchema())
                .field("eventVersion", Schema.INT32_SCHEMA)
                .field("aggregateType", Schema.STRING_SCHEMA)
                .field("someBoolType", Schema.BOOLEAN_SCHEMA)
                .field("deleted", SchemaBuilder.bool().optional().defaultValue(false).build())
                .build();
        VerifyRecord.assertConnectSchemasAreEqual((String) null, routed.valueSchema(), expectedSchema);
        Assertions.assertThat(routed.timestamp()).isEqualTo(EXPECTED_TIMESTAMP_MILLIS);
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.UserEmail");
        assertIdAndBoolTypeHeaders(routed.headers(), "f9171eb6-19f3-4579-9206-0e179d2ebad7");
        assertStringKey(routed, "7bdf2e9e");

        final Struct valueStruct = Requirements.requireStruct(routed.value(), "test envelope");
        Assertions.assertThat(valueStruct.getString("aggregateType")).isEqualTo("UserEmail");
        Assertions.assertThat(valueStruct.getInt32("eventVersion")).isEqualTo(1);
        Assertions.assertThat(valueStruct.get("someBoolType")).isEqualTo(true);
        Assertions.assertThat(valueStruct.get("deleted")).isEqualTo(false);
    }

    /** The configured event timestamp column ends up as the record timestamp in milliseconds. */
    @Test
    @FixFor({"DBZ-1707", "DBZ-3940"})
    public void shouldConvertMicrosecondsTimestampToMilliseconds() throws Exception {
        startConnectorWithNoSnapshot();

        final Map<String, String> config = new HashMap<>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), getFieldEventTimestamp());
        replaceRouter(config);

        alterTableWithTimestampField();
        doInsert(createInsert("f9171eb6-19f3-4579-9206-0e179d2ebad7", "UserUpdated", "UserEmail", "7bdf2e9e", "{\"email\": \"gh@mefi.in\"}", getAdditionalFieldValuesTimestampOnly()));

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);
        Assertions.assertThat(routeFirstRecord(records).timestamp()).isEqualTo(EXPECTED_TIMESTAMP_MILLIS);
    }

    /** Without the tombstone option a null payload still yields a value whose payload field is null. */
    @Test
    @FixFor({"DBZ-1320", "DBZ-3940"})
    public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
        startConnectorWithNoSnapshot();
        replaceRouter(allFeaturesConfig());
        alterTableWithExtra4Fields();
        doInsert(createInsert("a9d76f78-bda6-48d3-97ed-13a146163218", "UserUpdated", "UserEmail", "a9d76f78", null, getAdditionalFieldValues(true)));

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.topics()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        Assertions.assertThat(routed.valueSchema()).isNotNull();
        Assertions.assertThat(routed.timestamp()).isEqualTo(EXPECTED_TIMESTAMP_MILLIS);
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.UserEmail");
        assertIdAndBoolTypeHeaders(routed.headers(), "a9d76f78-bda6-48d3-97ed-13a146163218");
        assertStringKey(routed, "a9d76f78");
        Assertions.assertThat(routed.value()).isNotNull();
        Assertions.assertThat(Requirements.requireStruct(routed.value(), "test envelope").get("payload")).isNull();
    }

    /** With the tombstone option a null payload yields a record with null value and null value schema. */
    @Test
    @FixFor({"DBZ-1320", "DBZ-3940"})
    public void shouldProduceTombstoneEventForNullPayload() throws Exception {
        startConnectorWithNoSnapshot();

        final Map<String, String> config = allFeaturesConfig();
        config.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
        replaceRouter(config);

        alterTableWithExtra4Fields();
        doInsert(createInsert("a9d76f78-bda6-48d3-97ed-13a146163218", "UserUpdated", "UserEmail", "a9d76f78", null, getAdditionalFieldValues(true)));

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.topics()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        Assertions.assertThat(routed.valueSchema()).isNull();
        Assertions.assertThat(routed.timestamp()).isEqualTo(EXPECTED_TIMESTAMP_MILLIS);
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.UserEmail");
        assertIdAndBoolTypeHeaders(routed.headers(), "a9d76f78-bda6-48d3-97ed-13a146163218");
        assertStringKey(routed, "a9d76f78");
        Assertions.assertThat(routed.value()).isNull();
    }

    /** With the tombstone option an empty-string payload yields a record with null value as well. */
    @Test
    @FixFor({"DBZ-1320", "DBZ-3940"})
    public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
        startConnectorWithNoSnapshot();

        final Map<String, String> config = new HashMap<>();
        config.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
        replaceRouter(config);

        alterTableModifyPayload();
        doInsert(createInsert("a9d76f78-bda6-48d3-97ed-13a146163218", "UserUpdated", "UserEmail", "a9d76f78", "", null));

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.topics()).hasSize(1);
        final SourceRecord routed = routeFirstRecord(records);

        Assertions.assertThat(routed.valueSchema()).isNull();
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.UserEmail");
        final Headers headers = routed.headers();
        Assertions.assertThat(headers.size()).isEqualTo(1);
        assertIdHeader(headers, "a9d76f78-bda6-48d3-97ed-13a146163218");
        assertStringKey(routed, "a9d76f78");
        Assertions.assertThat(routed.value()).isNull();
    }

    /**
     * @return the name of the event type column; defaults to the router's configured default
     */
    protected String getFieldEventType() {
        return EventRouterConfigDefinition.FIELD_EVENT_TYPE.defaultValueAsString();
    }

    /**
     * @return the name of the column configured as schema version field
     */
    protected String getFieldSchemaVersion() {
        return "version";
    }

    /**
     * @return the name of the column configured as event timestamp field
     */
    protected String getFieldEventTimestamp() {
        return "createdat";
    }

    /**
     * @return the name of the aggregate type column placed into the envelope
     */
    protected String getFieldAggregateType() {
        return "aggregatetype";
    }

    /**
     * @return the name of the boolean column placed into both the envelope and a header
     */
    protected String getSomeBoolType() {
        return "somebooltype";
    }

    /**
     * @return the name of the column placed into the envelope under the alias {@code deleted}
     */
    protected String getIsDeleted() {
        return "is_deleted";
    }

    /**
     * @return the schema expected for the {@code id} header; a string schema by default
     */
    protected Schema getIdSchema() {
        return SchemaBuilder.STRING_SCHEMA;
    }

    /**
     * Converts the id used in the insert statement into the value expected in the {@code id} header.
     *
     * @param str the id as passed to {@link #createInsert(String, String, String, String, String, String)}
     * @return the expected header value; the id itself by default
     */
    protected Object getId(String str) {
        return str;
    }

    /** Builds an additional-placement entry that puts {@code column} into the envelope as {@code alias}. */
    private static String envelope(String column, String alias) {
        return column + ":envelope:" + alias;
    }

    /** Builds an additional-placement entry that puts {@code column} into a header; the alias is optional. */
    private static String header(String column, String alias) {
        if (alias == null || alias.isEmpty()) {
            return column + ":header";
        }
        return column + ":header:" + alias;
    }

    /** Builds the comma-separated placement list shared by the "all features" and null-payload tests. */
    private String allFieldPlacements() {
        return envelope(getFieldSchemaVersion(), "eventVersion") + ","
                + envelope(getFieldAggregateType(), "aggregateType") + ","
                + envelope(getSomeBoolType(), "someBoolType") + ","
                + header(getSomeBoolType(), null) + ","
                + envelope(getIsDeleted(), "deleted");
    }

    /** Builds a fresh, mutable router configuration with schema version, event timestamp and all placements. */
    private Map<String, String> allFeaturesConfig() {
        final Map<String, String> config = new HashMap<>();
        config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), getFieldSchemaVersion());
        config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), getFieldEventTimestamp());
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), allFieldPlacements());
        return config;
    }

    /** Closes the current router, if any, and installs a new one configured with {@code config}. */
    private void replaceRouter(Map<String, String> config) {
        if (this.outboxEventRouter != null) {
            this.outboxEventRouter.close();
        }
        this.outboxEventRouter = new EventRouter<>();
        this.outboxEventRouter.configure(config);
    }

    /** Applies the router to the first record consumed from {@link #topicName()}. */
    private SourceRecord routeFirstRecord(AbstractConnectorTest.SourceRecords records) {
        return this.outboxEventRouter.apply(records.recordsForTopic(topicName()).get(0));
    }

    /** Asserts that the record has a string key schema and the given key. */
    private static void assertStringKey(SourceRecord record, String expectedKey) {
        Assertions.assertThat(record.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
        Assertions.assertThat(record.key()).isEqualTo(expectedKey);
    }

    /** Asserts schema and value of the {@code id} header against {@link #getIdSchema()} and {@link #getId(String)}. */
    private void assertIdHeader(Headers headers, String eventId) {
        final Header idHeader = headers.lastWithName("id");
        Assertions.assertThat(idHeader.schema()).isEqualTo(getIdSchema());
        Assertions.assertThat(idHeader.value()).isEqualTo(getId(eventId));
    }

    /** Asserts that exactly the {@code id} header and the boolean-column header (value {@code true}) are present. */
    private void assertIdAndBoolTypeHeaders(Headers headers, String eventId) {
        Assertions.assertThat(headers.size()).isEqualTo(2);
        assertIdHeader(headers, eventId);
        final Header boolHeader = headers.lastWithName(getSomeBoolType());
        Assertions.assertThat(boolHeader.schema()).isEqualTo(Schema.BOOLEAN_SCHEMA);
        Assertions.assertThat(boolHeader.value()).isEqualTo(true);
    }

    /**
     * Inserts one row, starts the connector with {@code getConfigurationBuilder(true)}, waits for the
     * snapshot to complete and consumes the single record it produced, so that the calling test only
     * sees the records of its own inserts.
     */
    private void startConnectorWithInitialSnapshotRecord() throws Exception {
        doInsert(createInsert("70f52ae3-f671-4bac-ae62-1b9be6e73700", "UserCreated", "User", "10711faf", "{}", ""));
        start(getConnectorClass(), getConfigurationBuilder(true).build());
        assertConnectorIsRunning();
        waitForSnapshotCompleted();

        final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(snapshotRecords.allRecordsInOrder()).hasSize(1);
        Assertions.assertThat(snapshotRecords.recordsForTopic(topicName())).hasSize(1);
    }

    /**
     * Starts the connector with {@code getConfigurationBuilder(false)}, waits until streaming has
     * started and verifies that no record is pending.
     */
    private void startConnectorWithNoSnapshot() throws Exception {
        start(getConnectorClass(), getConfigurationBuilder(false).build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        assertNoRecordsToConsume();
    }

    /** Executes a single SQL statement on {@link #databaseConnection()}. */
    private void doInsert(String sql) throws SQLException {
        databaseConnection().execute(sql);
    }
}
