package io.trino.plugin.accumulo.io;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.Slice;
import io.trino.plugin.accumulo.AccumuloErrorCode;
import io.trino.plugin.accumulo.Types;
import io.trino.plugin.accumulo.index.Indexer;
import io.trino.plugin.accumulo.metadata.AccumuloTable;
import io.trino.plugin.accumulo.model.AccumuloColumnHandle;
import io.trino.plugin.accumulo.model.Field;
import io.trino.plugin.accumulo.model.Row;
import io.trino.plugin.accumulo.serializers.AccumuloRowSerializer;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeUtils;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

/* loaded from: input_file:io/trino/plugin/accumulo/io/AccumuloPageSink.class */
/**
 * {@link ConnectorPageSink} that turns every position of an incoming {@link Page} into an
 * Accumulo {@link Mutation}, hands it to a {@link BatchWriter} and, when the table is indexed,
 * also passes it to an {@link Indexer}.
 *
 * <p>Both the writer and the optional indexer are flushed every {@link #ROWS_PER_FLUSH} rows
 * and are closed by {@link #finish()}.
 */
public class AccumuloPageSink implements ConnectorPageSink {
    /** Column family and qualifier under which the serialized row ID is stored in every mutation. */
    public static final Text ROW_ID_COLUMN = new Text("___ROW___");

    /** Number of appended rows after which the writer and the indexer are flushed. */
    private static final long ROWS_PER_FLUSH = 100000;

    private final AccumuloRowSerializer serializer;
    private final BatchWriter writer;
    private final Optional<Indexer> indexer;
    private final List<AccumuloColumnHandle> columns;
    private final int rowIdOrdinal;
    private long numRows;

    /**
     * Creates a page sink for the given table.
     *
     * @param connector Accumulo connector used to create the batch writer (and the indexer, if any)
     * @param accumuloTable table metadata; supplies the columns, the row ID column name, the
     *     serializer and the full table name
     * @param str user name whose authorizations are looked up when the table is indexed
     * @throws TrinoException if no column matches the table's row ID, if the table does not
     *     exist, or if Accumulo reports an error while the writer or indexer is created
     */
    public AccumuloPageSink(Connector connector, AccumuloTable accumuloTable, String str) {
        Objects.requireNonNull(accumuloTable, "table is null");
        Objects.requireNonNull(connector, "connector is null");
        this.columns = accumuloTable.getColumns();
        this.rowIdOrdinal = this.columns.stream()
                .filter(column -> column.getName().equals(accumuloTable.getRowId()))
                .map(AccumuloColumnHandle::getOrdinal)
                .findAny()
                .orElseThrow(() -> new TrinoException(StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR, "Row ID ordinal not found"));
        this.serializer = accumuloTable.getSerializerInstance();

        BatchWriter batchWriter = null;
        boolean initialized = false;
        try {
            BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
            batchWriter = connector.createBatchWriter(accumuloTable.getFullTableName(), batchWriterConfig);
            if (accumuloTable.isIndexed()) {
                this.indexer = Optional.of(new Indexer(connector, connector.securityOperations().getUserAuthorizations(str), accumuloTable, batchWriterConfig));
            } else {
                this.indexer = Optional.empty();
            }
            this.writer = batchWriter;
            initialized = true;
        } catch (TableNotFoundException e) {
            throw new TrinoException(AccumuloErrorCode.ACCUMULO_TABLE_DNE, "Accumulo error when creating BatchWriter and/or Indexer, table does not exist", e);
        } catch (AccumuloException | AccumuloSecurityException e2) {
            throw new TrinoException(AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Accumulo error when creating BatchWriter and/or Indexer", e2);
        } finally {
            // The writer was created but the indexer was not: nobody else holds a reference to
            // the writer at this point, so it has to be released here.
            if (!initialized && batchWriter != null) {
                closeQuietly(batchWriter);
            }
        }
    }

    /**
     * Converts a row into a single Accumulo mutation.
     *
     * <p>The field at ordinal {@code i} becomes the mutation's row ID and is additionally stored
     * under {@link #ROW_ID_COLUMN}. Every other column in {@code list} whose field is non-null is
     * stored under that column's family and qualifier.
     *
     * @param row row to convert
     * @param i ordinal of the field that is mapped to the Accumulo row ID
     * @param list column handles describing the remaining fields
     * @param accumuloRowSerializer serializer used to encode each field value
     * @return the mutation for {@code row}
     * @throws TrinoException if the row ID field is null
     */
    public static Mutation toMutation(Row row, int i, List<AccumuloColumnHandle> list, AccumuloRowSerializer accumuloRowSerializer) {
        // Scratch buffer that is overwritten for every field; its bytes are copied before reuse.
        Text text = new Text();
        Field rowIdField = row.getField(i);
        if (rowIdField.isNull()) {
            throw new TrinoException(StandardErrorCode.INVALID_FUNCTION_ARGUMENT, "Column mapped as the Accumulo row ID cannot be null");
        }
        setText(rowIdField, text, accumuloRowSerializer);
        Mutation mutation = new Mutation(text);
        mutation.put(ROW_ID_COLUMN, ROW_ID_COLUMN, new Value(text.copyBytes()));
        for (AccumuloColumnHandle column : list) {
            if (column.getOrdinal() == i) {
                continue;
            }
            Field field = row.getField(column.getOrdinal());
            if (field.isNull()) {
                continue;
            }
            setText(field, text, accumuloRowSerializer);
            mutation.put(column.getFamily().get(), column.getQualifier().get(), new Value(text.copyBytes()));
        }
        return mutation;
    }

    /**
     * Serializes {@code field} into {@code target} by dispatching on the field's Trino type.
     *
     * @throws UnsupportedOperationException if the type is not one of the handled types
     */
    private static void setText(Field field, Text target, AccumuloRowSerializer rowSerializer) {
        Type type = field.getType();
        if (Types.isArrayType(type)) {
            rowSerializer.setArray(target, type, field.getArray());
        } else if (Types.isMapType(type)) {
            rowSerializer.setMap(target, type, field.getMap());
        } else if (type.equals(BigintType.BIGINT)) {
            rowSerializer.setLong(target, field.getLong());
        } else if (type.equals(BooleanType.BOOLEAN)) {
            rowSerializer.setBoolean(target, field.getBoolean());
        } else if (type.equals(DateType.DATE)) {
            rowSerializer.setDate(target, field.getDate());
        } else if (type.equals(DoubleType.DOUBLE)) {
            rowSerializer.setDouble(target, field.getDouble());
        } else if (type.equals(IntegerType.INTEGER)) {
            rowSerializer.setInt(target, field.getInt());
        } else if (type.equals(RealType.REAL)) {
            rowSerializer.setFloat(target, field.getFloat());
        } else if (type.equals(SmallintType.SMALLINT)) {
            rowSerializer.setShort(target, field.getShort());
        } else if (type.equals(TimeType.TIME)) {
            rowSerializer.setTime(target, field.getTime());
        } else if (type.equals(TinyintType.TINYINT)) {
            rowSerializer.setByte(target, field.getByte());
        } else if (type.equals(TimestampType.TIMESTAMP_MILLIS)) {
            rowSerializer.setTimestamp(target, field.getTimestamp());
        } else if (type.equals(VarbinaryType.VARBINARY)) {
            rowSerializer.setVarbinary(target, field.getVarbinary());
        } else if (type instanceof VarcharType) {
            rowSerializer.setVarchar(target, field.getVarchar());
        } else {
            throw new UnsupportedOperationException("Unsupported type " + type);
        }
    }

    /**
     * Writes every position of {@code page} as one mutation.
     *
     * <p>Channel {@code n} of the page is read with the type of column {@code n}. The writer and
     * the indexer are flushed each time the running row count reaches a multiple of
     * {@link #ROWS_PER_FLUSH}.
     *
     * @param page page to append
     * @return {@code NOT_BLOCKED}
     * @throws TrinoException if the server rejects a mutation
     */
    @Override
    public CompletableFuture<?> appendPage(Page page) {
        for (int position = 0; position < page.getPositionCount(); position++) {
            Row row = new Row();
            for (int channel = 0; channel < page.getChannelCount(); channel++) {
                Type type = this.columns.get(channel).getType();
                row.addField(TypeUtils.readNativeValue(type, page.getBlock(channel), position), type);
            }
            try {
                Mutation mutation = toMutation(row, this.rowIdOrdinal, this.columns, this.serializer);
                this.writer.addMutation(mutation);
                if (this.indexer.isPresent()) {
                    this.indexer.get().index(mutation);
                }
                this.numRows++;
                if (this.numRows % ROWS_PER_FLUSH == 0) {
                    flush();
                }
            } catch (MutationsRejectedException e) {
                throw new TrinoException(AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Mutation rejected by server", e);
            }
        }
        return NOT_BLOCKED;
    }

    /**
     * Flushes and closes the writer, then closes the indexer if one is present.
     *
     * <p>If any step fails, the resources whose close had not been attempted yet are still
     * closed on a best-effort basis; exceptions from that cleanup are attached to the original
     * failure as suppressed exceptions.
     *
     * @return a completed future holding an empty collection
     * @throws TrinoException if the server rejects mutations while flushing or closing
     */
    @Override
    public CompletableFuture<Collection<Slice>> finish() {
        boolean writerCloseStarted = false;
        boolean indexerCloseStarted = false;
        try {
            this.writer.flush();
            writerCloseStarted = true;
            this.writer.close();
            if (this.indexer.isPresent()) {
                indexerCloseStarted = true;
                this.indexer.get().close();
            }
            return CompletableFuture.completedFuture(ImmutableList.of());
        } catch (MutationsRejectedException e) {
            TrinoException failure = new TrinoException(AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Mutation rejected by server on flush", e);
            releaseAfterFailure(failure, writerCloseStarted, indexerCloseStarted);
            throw failure;
        } catch (RuntimeException e) {
            releaseAfterFailure(e, writerCloseStarted, indexerCloseStarted);
            throw e;
        }
    }

    /**
     * Aborts the sink by running {@link #finish()} and waiting for its result.
     *
     * <p>Note that this still flushes pending mutations before closing.
     */
    @Override
    public void abort() {
        MoreFutures.getFutureValue(finish());
    }

    /** Flushes the indexer (if present) and then the writer. */
    private void flush() {
        try {
            if (this.indexer.isPresent()) {
                this.indexer.get().flush();
            }
            this.writer.flush();
        } catch (MutationsRejectedException e) {
            throw new TrinoException(AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR, "Mutation rejected by server on flush", e);
        }
    }

    /**
     * Closes whichever of the writer and indexer has not had a close attempt yet, recording any
     * cleanup failure on {@code primary} instead of letting it replace the original error.
     * A close that was already attempted is not retried.
     */
    private void releaseAfterFailure(Throwable primary, boolean writerCloseStarted, boolean indexerCloseStarted) {
        if (!writerCloseStarted) {
            try {
                this.writer.close();
            } catch (MutationsRejectedException | RuntimeException e) {
                primary.addSuppressed(e);
            }
        }
        if (!indexerCloseStarted && this.indexer.isPresent()) {
            try {
                this.indexer.get().close();
            } catch (Exception e) {
                primary.addSuppressed(e);
            }
        }
    }

    /** Closes a writer during failed construction; the construction error is the one reported. */
    private static void closeQuietly(BatchWriter batchWriter) {
        try {
            batchWriter.close();
        } catch (MutationsRejectedException | RuntimeException ignored) {
            // No mutation has been added to this writer yet, and the exception that aborted
            // construction is already propagating; a close failure adds nothing useful.
        }
    }
}
