package io.trino.plugin.pinot;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.net.HostAndPort;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.JsonResponseHandler;
import io.airlift.http.client.Request;
import io.airlift.http.client.StaticBodyGenerator;
import io.airlift.http.client.jetty.JettyHttpClient;
import io.airlift.json.JsonCodec;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/trino/plugin/pinot/TestingPinotCluster.class */
public class TestingPinotCluster implements Closeable {
    private static final String BASE_IMAGE = "apachepinot/pinot:0.6.0";
    private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper";
    private static final JsonCodec<List<String>> LIST_JSON_CODEC = JsonCodec.listJsonCodec(String.class);
    private static final JsonCodec<PinotSuccessResponse> PINOT_SUCCESS_RESPONSE_JSON_CODEC = JsonCodec.jsonCodec(PinotSuccessResponse.class);
    public static final int CONTROLLER_PORT = 9000;
    public static final int BROKER_PORT = 8099;
    public static final int SERVER_ADMIN_PORT = 8097;
    public static final int SERVER_PORT = 8098;
    private final GenericContainer<?> controller;
    private final GenericContainer<?> broker;
    private final GenericContainer<?> server;
    private final GenericContainer<?> zookeeper;
    private final Closer closer = Closer.create();
    private final HttpClient httpClient = this.closer.register(new JettyHttpClient());

    /* loaded from: input_file:io/trino/plugin/pinot/TestingPinotCluster$PinotSuccessResponse.class */
    public static class PinotSuccessResponse {
        private final String status;

        @JsonCreator
        public PinotSuccessResponse(@JsonProperty("status") String str) {
            this.status = (String) Objects.requireNonNull(str, "status is null");
        }

        @JsonProperty
        public String getStatus() {
            return this.status;
        }
    }

    public TestingPinotCluster(Network network) {
        this.zookeeper = new GenericContainer(DockerImageName.parse("zookeeper:3.5.6")).withNetwork(network).withNetworkAliases(new String[]{ZOOKEEPER_INTERNAL_HOST}).withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(2181)).withExposedPorts(new Integer[]{2181});
        Closer closer = this.closer;
        GenericContainer<?> genericContainer = this.zookeeper;
        Objects.requireNonNull(genericContainer);
        closer.register(genericContainer::stop);
        this.controller = new GenericContainer(DockerImageName.parse(BASE_IMAGE)).withNetwork(network).withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY).withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins").withCommand(new String[]{"StartController", "-configFileName", "/var/pinot/controller/config/pinot-controller.conf"}).withNetworkAliases(new String[]{"pinot-controller", "localhost"}).withExposedPorts(new Integer[]{Integer.valueOf(CONTROLLER_PORT)});
        Closer closer2 = this.closer;
        GenericContainer<?> genericContainer2 = this.controller;
        Objects.requireNonNull(genericContainer2);
        closer2.register(genericContainer2::stop);
        this.broker = new GenericContainer(DockerImageName.parse(BASE_IMAGE)).withNetwork(network).withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY).withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins").withCommand(new String[]{"StartBroker", "-clusterName", PinotQueryRunner.PINOT_CATALOG, "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/broker/config/pinot-broker.conf"}).withNetworkAliases(new String[]{"pinot-broker", "localhost"}).withExposedPorts(new Integer[]{Integer.valueOf(BROKER_PORT)});
        Closer closer3 = this.closer;
        GenericContainer<?> genericContainer3 = this.broker;
        Objects.requireNonNull(genericContainer3);
        closer3.register(genericContainer3::stop);
        this.server = new GenericContainer(DockerImageName.parse(BASE_IMAGE)).withNetwork(network).withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY).withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins").withCommand(new String[]{"StartServer", "-clusterName", PinotQueryRunner.PINOT_CATALOG, "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf"}).withNetworkAliases(new String[]{"pinot-server", "localhost"}).withExposedPorts(new Integer[]{Integer.valueOf(SERVER_PORT), Integer.valueOf(SERVER_ADMIN_PORT)});
        Closer closer4 = this.closer;
        GenericContainer<?> genericContainer4 = this.server;
        Objects.requireNonNull(genericContainer4);
        closer4.register(genericContainer4::stop);
    }

    public void start() {
        this.zookeeper.start();
        this.controller.start();
        this.broker.start();
        this.server.start();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closer.close();
    }

    private static String getZookeeperInternalHostPort() {
        return String.format("%s:%s", ZOOKEEPER_INTERNAL_HOST, 2181);
    }

    public String getControllerConnectString() {
        return this.controller.getContainerIpAddress() + ":" + this.controller.getMappedPort(CONTROLLER_PORT);
    }

    public HostAndPort getBrokerHostAndPort() {
        return HostAndPort.fromParts(this.broker.getContainerIpAddress(), this.broker.getMappedPort(BROKER_PORT).intValue());
    }

    public HostAndPort getServerHostAndPort() {
        return HostAndPort.fromParts(this.server.getContainerIpAddress(), this.server.getMappedPort(SERVER_PORT).intValue());
    }

    public void createSchema(InputStream inputStream, String str) throws Exception {
        Request build = Request.Builder.preparePost().setUri(getControllerUri("schemas")).setHeader("Accept", "application/json").setHeader("Content-Type", "application/json").setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(ByteStreams.toByteArray(inputStream))).build();
        PinotSuccessResponse pinotSuccessResponse = (PinotSuccessResponse) doWithRetries(() -> {
            return (PinotSuccessResponse) this.httpClient.execute(build, JsonResponseHandler.createJsonResponseHandler(PINOT_SUCCESS_RESPONSE_JSON_CODEC));
        }, 10);
        Preconditions.checkState(pinotSuccessResponse.getStatus().equals(String.format("%s successfully added", str)), "Unexpected response: '%s'", pinotSuccessResponse.getStatus());
        verifySchema(str);
    }

    private URI getControllerUri(String str) {
        return URI.create(String.format("http://%s/%s", getControllerConnectString(), str));
    }

    private void verifySchema(String str) throws Exception {
        Request build = Request.Builder.prepareGet().setUri(getControllerUri("schemas")).setHeader("Accept", "application/json").build();
        doWithRetries(() -> {
            Preconditions.checkState(((List) this.httpClient.execute(build, JsonResponseHandler.createJsonResponseHandler(LIST_JSON_CODEC))).contains(str), String.format("Schema for '%s' not found", str));
            return null;
        }, 10);
    }

    public void addRealTimeTable(InputStream inputStream, String str) throws Exception {
        Request build = Request.Builder.preparePost().setUri(getControllerUri("tables")).setHeader("Accept", "application/json").setHeader("Content-Type", "application/json").setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(ByteStreams.toByteArray(inputStream))).build();
        PinotSuccessResponse pinotSuccessResponse = (PinotSuccessResponse) doWithRetries(() -> {
            return (PinotSuccessResponse) this.httpClient.execute(build, JsonResponseHandler.createJsonResponseHandler(PINOT_SUCCESS_RESPONSE_JSON_CODEC));
        }, 10);
        Preconditions.checkState(pinotSuccessResponse.getStatus().equals(String.format("Table %s_REALTIME succesfully added", str)), "Unexpected response: '%s'", pinotSuccessResponse.getStatus());
    }

    private static <T> T doWithRetries(Supplier<T> supplier, int i) throws Exception {
        Exception exc = null;
        for (int i2 = 0; i2 < i; i2++) {
            try {
                return supplier.get();
            } catch (Exception e) {
                exc = e;
                Thread.sleep(1000L);
            }
        }
        throw exc;
    }
}
