package io.debezium.connector.db2as400;

import io.debezium.bean.StandardBeanNames;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.db2as400.As400OffsetContext;
import io.debezium.connector.db2as400.As400Partition;
import io.debezium.connector.db2as400.metrics.As400ChangeEventSourceMetricsFactory;
import io.debezium.connector.db2as400.metrics.As400StreamingChangeEventSourceMetrics;
import io.debezium.document.DocumentReader;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/db2as400/As400ConnectorTask.class */
/**
 * Source task of the db2as400 connector.
 *
 * <p>{@link #start(Configuration)} wires together the schema, the change event queue, the
 * event dispatcher and the change event source coordinator; {@link #doPoll()} drains the
 * queue and hands the contained source records back to the caller.
 */
public class As400ConnectorTask extends BaseSourceTask<As400Partition, As400OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(As400ConnectorTask.class);
    private static final String CONTEXT_NAME = "db2as400-server-connector-task";

    /** Queue filled during {@link #start(Configuration)} and drained by {@link #doPoll()}. */
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private As400DatabaseSchema schema;

    /**
     * @return the version string reported by {@code Module.version()}
     */
    public String version() {
        return Module.version();
    }

    /**
     * Builds every component the task needs and starts the change event source coordinator.
     *
     * @param configuration the raw connector configuration
     * @return the coordinator, already started
     */
    protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start(Configuration configuration) {
        LOGGER.info("starting connector task {}", version());
        As400ConnectorConfig connectorConfig = new As400ConnectorConfig(configuration);
        TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(As400ConnectorConfig.TOPIC_NAMING_STRATEGY, true);
        SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.AVRO;

        DefaultMainConnectionProvidingConnectionFactory<As400JdbcConnection> connectionFactory =
                new DefaultMainConnectionProvidingConnectionFactory<>(() -> new As400JdbcConnection(connectorConfig.getJdbcConfig()));
        As400JdbcConnection jdbcConnection = connectionFactory.mainConnection();
        this.schema = new As400DatabaseSchema(connectorConfig, jdbcConnection, topicNamingStrategy, schemaNameAdjuster);

        CdcSourceTaskContext loggingTaskContext = new CdcSourceTaskContext(
                connectorConfig.getContextName(),
                connectorConfig.getLogicalName(),
                connectorConfig.getCustomMetricTags(),
                this.schema::tableIds);

        // Publish the configuration objects and the schema through the bean registry.
        connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, configuration);
        connectorConfig.getBeanRegistry().add("ConnectorConfig", connectorConfig);
        connectorConfig.getBeanRegistry().add("Schema", this.schema);
        registerServiceProviders(connectorConfig.getServiceRegistry());

        this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(connectorConfig.getPollInterval())
                .maxBatchSize(connectorConfig.getMaxBatchSize())
                .maxQueueSize(connectorConfig.getMaxQueueSize())
                .loggingContextSupplier(() -> loggingTaskContext.configureLoggingContext(CONTEXT_NAME))
                .build();
        ErrorHandler errorHandler = new ErrorHandler(As400RpcConnector.class, connectorConfig, this.queue, (ErrorHandler) null);

        Offsets<As400Partition, As400OffsetContext> previousOffsets =
                getPreviousOffsets(new As400Partition.Provider(connectorConfig), new As400OffsetContext.Loader(connectorConfig));
        As400OffsetContext offsetContext = previousOffsets.getTheOnlyOffset();
        if (offsetContext == null) {
            LOGGER.info("previous offsets not found creating from config");
            offsetContext = new As400OffsetContext(connectorConfig);
        }

        SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
        As400EventMetadataProvider metadataProvider = new As400EventMetadataProvider();
        As400TaskContext taskContext = new As400TaskContext(connectorConfig, this.schema, connectorConfig.getCustomMetricTags());
        As400ConnectorConfig taskConfig = taskContext.getConfig();
        As400StreamingChangeEventSourceMetrics streamingMetrics =
                new As400StreamingChangeEventSourceMetrics(taskContext, this.queue, metadataProvider);
        As400RpcConnection rpcConnection = new As400RpcConnection(
                connectorConfig,
                streamingMetrics,
                jdbcConnection.shortIncludes(this.schema.getSchemaName(), taskConfig.tableIncludeList()));

        // Second configuration handed to the change event source factory: the plain connector
        // config, or one rebuilt with the tables that were added since the stored offset.
        As400ConnectorConfig sourceFactoryConfig = connectorConfig;
        Set<String> newTables = additionalTablesInConfigTables(connectorConfig, offsetContext, taskConfig);
        if (newTables.isEmpty()) {
            LOGGER.info("no new tables to stream");
        } else {
            String newTableList = String.join(",", newTables);
            LOGGER.info("found new tables to stream {}", newTableList);
            sourceFactoryConfig = new As400ConnectorConfig(configuration, newTableList);
            offsetContext.hasNewTables(true);
        }

        EventDispatcher eventDispatcher = new EventDispatcher(
                connectorConfig,
                topicNamingStrategy,
                this.schema,
                this.queue,
                taskConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                metadataProvider,
                schemaNameAdjuster);
        As400ChangeEventSourceFactory changeEventSourceFactory = new As400ChangeEventSourceFactory(
                taskConfig,
                sourceFactoryConfig,
                rpcConnection,
                connectionFactory,
                errorHandler,
                eventDispatcher,
                Clock.system(),
                this.schema,
                snapshotterService);
        SignalProcessor signalProcessor = new SignalProcessor(
                As400RpcConnector.class,
                connectorConfig,
                Map.of(),
                getAvailableSignalChannels(),
                DocumentReader.defaultReader(),
                previousOffsets);
        List notificationChannels = getNotificationChannels();
        SchemaFactory schemaFactory = SchemaFactory.get();

        ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
                previousOffsets,
                errorHandler,
                As400RpcConnector.class,
                taskConfig,
                changeEventSourceFactory,
                new As400ChangeEventSourceMetricsFactory(streamingMetrics),
                eventDispatcher,
                this.schema,
                signalProcessor,
                new NotificationService(notificationChannels, connectorConfig, schemaFactory, eventDispatcher::enqueueNotification),
                snapshotterService);
        coordinator.start(taskContext, this.queue, metadataProvider);
        return coordinator;
    }

    /**
     * Determines which entries of the configured table include list are missing from the
     * include list recorded in the offset context.
     *
     * @param as400ConnectorConfig unused; kept so the signature stays as it was
     * @param as400OffsetContext offset context supplying the previously recorded include list
     * @param as400ConnectorConfig2 configuration supplying the current table include list
     * @return the entries present only in the current list; empty when either list is
     *         {@code null}
     */
    private Set<String> additionalTablesInConfigTables(As400ConnectorConfig as400ConnectorConfig, As400OffsetContext as400OffsetContext, As400ConnectorConfig as400ConnectorConfig2) {
        String configuredIncludes = as400ConnectorConfig2.tableIncludeList();
        String previousIncludes = as400OffsetContext.getIncludeTables();
        LOGGER.info("previous includes {} , new includes {}", previousIncludes, configuredIncludes);
        if (previousIncludes == null || configuredIncludes == null) {
            return Collections.emptySet();
        }
        Set<String> added = splitTableList(configuredIncludes);
        added.removeAll(splitTableList(previousIncludes));
        return added;
    }

    /**
     * Splits a comma-separated list into a mutable set of trimmed, non-empty entries.
     *
     * <p>Both include lists go through this helper so they are normalised the same way:
     * surrounding whitespace, stray or trailing commas and repeated entries neither produce
     * phantom entries nor raise an exception.
     */
    private static Set<String> splitTableList(String commaSeparated) {
        return Stream.of(commaSeparated.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Polls the queue and unwraps each change event into its source record.
     *
     * @return the records of the polled batch
     * @throws InterruptedException if polling the queue is interrupted
     */
    protected List<SourceRecord> doPoll() throws InterruptedException {
        return this.queue.poll().stream()
                .map(DataChangeEvent::getRecord)
                .collect(Collectors.toList());
    }

    /**
     * Performs no work.
     *
     * <p>NOTE(review): the JDBC connection and the schema created in
     * {@link #start(Configuration)} are not released here; confirm they are closed elsewhere.
     */
    protected void doStop() {
    }

    /**
     * @return {@code As400ConnectorConfig.ALL_FIELDS}
     */
    protected Iterable<Field> getAllConfigurationFields() {
        return As400ConnectorConfig.ALL_FIELDS;
    }

    /**
     * Registers a {@link PostProcessorRegistryServiceProvider} with the given registry.
     *
     * @param serviceRegistry the registry receiving the provider
     */
    protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
        serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
    }
}
