package io.debezium.connector.db2as400;

import io.debezium.DebeziumException;
import io.debezium.connector.db2as400.As400RpcConnection;
import io.debezium.data.Envelope;
import io.debezium.ibmi.db2.journal.retrieve.JournalEntryType;
import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition;
import io.debezium.ibmi.db2.journal.retrieve.exception.FatalException;
import io.debezium.ibmi.db2.journal.retrieve.exception.InvalidPositionException;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.io.IOException;
import java.sql.SQLNonTransientConnectionException;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/db2as400/As400StreamingChangeEventSource.class */
public class As400StreamingChangeEventSource implements StreamingChangeEventSource<As400Partition, As400OffsetContext> {
    private static final String NO_TRANSACTION_ID = "00000000000000000000";
    private final As400RpcConnection dataConnection;
    private final As400JdbcConnection jdbcConnection;
    private final EventDispatcher<As400Partition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final As400DatabaseSchema schema;
    private final Duration pollInterval;
    private final As400ConnectorConfig connectorConfig;
    private final String database;
    private static final Logger log = LoggerFactory.getLogger(As400StreamingChangeEventSource.class);
    private static Set<Character> alwaysProcess = (Set) Stream.of((Object[]) new Character[]{'J', 'C'}).collect(Collectors.toCollection(HashSet::new));
    private long connectionTime = -1;
    private final long MIN_DISCONNECT_TIME_MS = 30000;
    private final HashMap<String, Object[]> beforeMap = new HashMap<>();
    private final Map<String, TransactionContext> txMap = new HashMap();

    /* renamed from: io.debezium.connector.db2as400.As400StreamingChangeEventSource$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/connector/db2as400/As400StreamingChangeEventSource$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType = new int[JournalEntryType.values().length];

        static {
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.START_COMMIT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.END_COMMIT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.FILE_CHANGE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.FILE_CREATED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.BEFORE_IMAGE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.AFTER_IMAGE.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.ADD_ROW1.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.ADD_ROW2.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.DELETE_ROW1.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[JournalEntryType.DELETE_ROW2.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    public As400StreamingChangeEventSource(As400ConnectorConfig as400ConnectorConfig, As400RpcConnection as400RpcConnection, As400JdbcConnection as400JdbcConnection, EventDispatcher<As400Partition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, As400DatabaseSchema as400DatabaseSchema) {
        this.connectorConfig = as400ConnectorConfig;
        this.dataConnection = as400RpcConnection;
        this.jdbcConnection = as400JdbcConnection;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = as400DatabaseSchema;
        this.pollInterval = as400ConnectorConfig.getPollInterval();
        this.database = as400JdbcConnection.getRealDatabaseName();
    }

    private void cacheBefore(TableId tableId, Object[] objArr) {
        this.beforeMap.put(String.format("%s-%s", tableId.schema(), tableId.table()), objArr);
    }

    private Object[] getBefore(TableId tableId) {
        String format = String.format("%s-%s", tableId.schema(), tableId.table());
        Object[] remove = this.beforeMap.remove(format);
        if (remove == null) {
            log.debug("before image not found for {}", format);
        } else {
            log.debug("found before image for {}", format);
        }
        return remove;
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, As400Partition as400Partition, As400OffsetContext as400OffsetContext) throws InterruptedException {
        Metronome sleeper = Metronome.sleeper(this.pollInterval, this.clock);
        int i = 0;
        WatchDog watchDog = new WatchDog(Thread.currentThread(), this.connectorConfig.getMaxRetrievalTimeout().intValue());
        watchDog.start();
        while (changeEventSourceContext.isRunning()) {
            try {
                try {
                    try {
                        JournalProcessedPosition journalProcessedPosition = new JournalProcessedPosition(as400OffsetContext.getPosition());
                        if (!this.dataConnection.getJournalEntries(changeEventSourceContext, as400OffsetContext, processJournalEntries(as400Partition, as400OffsetContext), watchDog)) {
                            sleeper.pause();
                        }
                        if (!as400OffsetContext.getPosition().equals(journalProcessedPosition)) {
                            this.dispatcher.dispatchHeartbeatEvent(as400Partition, as400OffsetContext);
                        }
                        i = 0;
                    } catch (InterruptedException e) {
                        if (changeEventSourceContext.isRunning()) {
                            log.debug("Interrupted", e);
                        }
                    }
                } catch (FatalException e2) {
                    log.error("Unable to process offset {}", as400OffsetContext.getPosition(), e2);
                    throw new DebeziumException("Unable to process offset " + as400OffsetContext.getPosition(), e2);
                    break;
                } catch (InvalidPositionException e3) {
                    log.error("Invalid position resetting offsets to beginning", e3);
                    as400OffsetContext.setPosition(new JournalProcessedPosition());
                } catch (IOException | SQLNonTransientConnectionException e4) {
                    log.error("Connection failed offset {} retry {}", new Object[]{as400OffsetContext.getPosition(), Integer.valueOf(i), e4});
                    closeAndReconnect();
                    i++;
                    sleeper.pause();
                } catch (InterruptedException e5) {
                    if (changeEventSourceContext.isRunning()) {
                        log.error("Interrupted processing offset {} retry {}", as400OffsetContext.getPosition(), Integer.valueOf(i));
                        closeAndReconnect();
                        i++;
                        sleeper.pause();
                    }
                } catch (Exception e6) {
                    log.error("Failed to process offset {} retry {}", new Object[]{as400OffsetContext.getPosition(), Integer.valueOf(i), e6});
                    i++;
                    sleeper.pause();
                }
            } finally {
                watchDog.stop();
            }
        }
    }

    public void rateLimittedClose() {
        if (System.currentTimeMillis() - this.connectionTime > 30000) {
            closeAndReconnect();
        } else {
            log.debug("Only connected since {} ignoring disconnect", new Date(this.connectionTime));
        }
    }

    public void closeAndReconnect() {
        try {
            this.dataConnection.close();
            this.dataConnection.m11connection();
        } catch (Exception e) {
            log.error("Failure reconnecting command", e);
        }
        try {
            this.jdbcConnection.close();
            this.jdbcConnection.connect();
        } catch (Exception e2) {
            log.error("Failure reconnecting sql", e2);
        }
        this.connectionTime = System.currentTimeMillis();
    }

    private As400RpcConnection.BlockingReceiverConsumer processJournalEntries(As400Partition as400Partition, As400OffsetContext as400OffsetContext) throws IOException, SQLNonTransientConnectionException {
        return (bigInteger, retrieveJournal, entryHeader) -> {
            try {
                try {
                    JournalEntryType journalEntryType = entryHeader.getJournalEntryType();
                    if (journalEntryType == null || ignore(journalEntryType)) {
                        log.debug("excluding table {} entry type {}", entryHeader.getFile(), entryHeader.getEntryType());
                        return;
                    }
                    String file = entryHeader.getFile();
                    try {
                        file = this.jdbcConnection.getLongName(entryHeader.getLibrary(), entryHeader.getFile());
                    } catch (IllegalStateException e) {
                        log.error("failed to look up long name", e);
                    }
                    TableId tableId = new TableId(this.database, entryHeader.getLibrary(), file);
                    boolean isIncluded = this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId);
                    if (!alwaysProcess.contains(Character.valueOf(entryHeader.getJournalCode())) && !isIncluded) {
                        log.debug("excluding table {} journal code {}", tableId, Character.valueOf(entryHeader.getJournalCode()));
                        return;
                    }
                    log.debug("next event: {} - {} type: {} table: {}", new Object[]{entryHeader.getTime(), entryHeader.getSequenceNumber(), entryHeader.getEntryType(), tableId.table()});
                    switch (AnonymousClass1.$SwitchMap$io$debezium$ibmi$db2$journal$retrieve$JournalEntryType[journalEntryType.ordinal()]) {
                        case 1:
                            String bigInteger = entryHeader.getCommitCycle().toString();
                            log.debug("begin transaction: {}", bigInteger);
                            TransactionContext transactionContext = new TransactionContext();
                            transactionContext.beginTransaction(bigInteger);
                            this.txMap.put(bigInteger, transactionContext);
                            log.debug("start transaction id {} tx {} table {}", new Object[]{bigInteger, bigInteger, tableId});
                            this.dispatcher.dispatchTransactionStartedEvent(as400Partition, bigInteger, as400OffsetContext, entryHeader.getTime());
                            break;
                        case 2:
                            String bigInteger2 = entryHeader.getCommitCycle().toString();
                            TransactionContext remove = this.txMap.remove(bigInteger2);
                            log.debug("commit transaction id {} tx {} table {}", new Object[]{bigInteger, bigInteger2, tableId});
                            if (remove != null) {
                                remove.endTransaction();
                                this.dispatcher.dispatchTransactionCommittedEvent(as400Partition, as400OffsetContext, entryHeader.getTime());
                            }
                            break;
                        case 3:
                        case 4:
                            this.schema.clearCache(tableId.table(), tableId.schema());
                            this.schema.getRecordFormat(tableId.table(), tableId.schema());
                            break;
                        case 5:
                            tableId.schema();
                            cacheBefore(tableId, (Object[]) retrieveJournal.decode(this.schema.getFileDecoder()));
                            break;
                        case 6:
                            Object[] before = getBefore(tableId);
                            Object[] objArr = (Object[]) retrieveJournal.decode(this.schema.getFileDecoder());
                            as400OffsetContext.setSourceTime(entryHeader.getTime());
                            String bigInteger3 = entryHeader.getCommitCycle().toString();
                            as400OffsetContext.setTransaction(this.txMap.get(bigInteger3));
                            log.debug("update event id {} tx {} table {}", new Object[]{bigInteger, bigInteger3, tableId});
                            this.dispatcher.dispatchDataChangeEvent(as400Partition, tableId, new As400ChangeRecordEmitter(as400Partition, as400OffsetContext, Envelope.Operation.UPDATE, before, objArr, this.clock, this.connectorConfig));
                            break;
                        case 7:
                        case 8:
                            Object[] objArr2 = (Object[]) retrieveJournal.decode(this.schema.getFileDecoder());
                            as400OffsetContext.setSourceTime(entryHeader.getTime());
                            String bigInteger4 = entryHeader.getCommitCycle().toString();
                            TransactionContext transactionContext2 = this.txMap.get(bigInteger4);
                            as400OffsetContext.setTransaction(transactionContext2);
                            if (transactionContext2 != null) {
                                transactionContext2.event(tableId);
                            }
                            log.debug("insert event id {} tx {} table {}", new Object[]{as400OffsetContext.getPosition(), bigInteger4, tableId});
                            this.dispatcher.dispatchDataChangeEvent(as400Partition, tableId, new As400ChangeRecordEmitter(as400Partition, as400OffsetContext, Envelope.Operation.CREATE, null, objArr2, this.clock, this.connectorConfig));
                            break;
                        case 9:
                        case 10:
                            Object[] objArr3 = (Object[]) retrieveJournal.decode(this.schema.getFileDecoder());
                            as400OffsetContext.setSourceTime(entryHeader.getTime());
                            String bigInteger5 = entryHeader.getCommitCycle().toString();
                            TransactionContext transactionContext3 = this.txMap.get(bigInteger5);
                            as400OffsetContext.setTransaction(transactionContext3);
                            if (transactionContext3 != null) {
                                transactionContext3.event(tableId);
                            }
                            log.debug("delete event id {} tx {} table {}", new Object[]{as400OffsetContext.getPosition(), bigInteger5, tableId});
                            this.dispatcher.dispatchDataChangeEvent(as400Partition, tableId, new As400ChangeRecordEmitter(as400Partition, as400OffsetContext, Envelope.Operation.DELETE, objArr3, null, this.clock, this.connectorConfig));
                            break;
                    }
                } catch (Exception e2) {
                    log.error("Failed to process record", e2);
                }
            } catch (IOException | SQLNonTransientConnectionException e3) {
                throw e3;
            }
        };
    }

    private boolean ignore(JournalEntryType journalEntryType) {
        return journalEntryType == JournalEntryType.OPEN || journalEntryType == JournalEntryType.CLOSE;
    }
}
