package io.hekate.metrics.statsd;

import io.hekate.core.internal.util.ArgAssert;
import io.hekate.core.internal.util.ConfigCheck;
import io.hekate.core.internal.util.HekateThreadFactory;
import io.hekate.core.internal.util.Utils;
import io.hekate.metrics.Metric;
import io.hekate.metrics.MetricFilter;
import io.hekate.util.async.AsyncUtils;
import io.hekate.util.async.Waiting;
import io.hekate.util.format.ToString;
import io.hekate.util.format.ToStringIgnore;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hekate/metrics/statsd/StatsdMetricsPublisher.class */
class StatsdMetricsPublisher {
    // Logger for this publisher. Note that the static initializer obtains it for
    // StatsdMetricsPlugin.class, not for StatsdMetricsPublisher.class.
    private static final Logger log;
    // Value of log.isDebugEnabled() captured once in the static initializer; debug logging that is
    // switched on later is therefore not picked up by the DEBUG-guarded statements.
    private static final boolean DEBUG;
    // Zero-length array passed to Collection.toArray(...) in publish() to obtain a Metric[].
    private static final Metric[] EMPTY_METRICS;
    // Matches every character other than a-z / 0-9 (compiled with flag 2, i.e. Pattern.CASE_INSENSITIVE).
    private static final Pattern UNSAFE_HOST_CHARACTERS;
    // Same as UNSAFE_HOST_CHARACTERS, but additionally permits the '.' character.
    private static final Pattern UNSAFE_CHARACTERS;
    // Sentinel entry: stopAsync() enqueues it and the worker loop exits when it polls this exact instance.
    private static final QueueEntry STOP_ENTRY;
    // Host and port of the StatsD endpoint, copied from the configuration in the constructor.
    private final String statsdHost;
    private final int statsdPort;
    // Maximum number of metrics that encode() places into a single buffer.
    private final int batchSize;
    // Capacity of the bounded queue below.
    private final int maxQueueSize;
    // Optional filter; encode() treats null as "accept every metric".
    private final MetricFilter filter;

    // Bounded hand-off queue between publish()/stopAsync() and the worker task.
    @ToStringIgnore
    private final BlockingQueue<QueueEntry> queue;

    // Monitor held whenever the 'worker' field is read or written.
    @ToStringIgnore
    private final Object mux = new Object();

    // Single-thread executor running the publishing loop; null means the publisher is not running.
    @ToStringIgnore
    private ExecutorService worker;
    // Decompiler-exposed assertion flag; true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hekate/metrics/statsd/StatsdMetricsPublisher$QueueEntry.class */
    /**
     * Unit of work placed on the publishing queue: a set of metrics together with the time at which
     * they were submitted.
     *
     * <p>The metrics array is stored and returned as-is (no defensive copy).</p>
     */
    public static class QueueEntry {
        /** Metrics to be published. */
        private final Metric[] metrics;

        /** Submission time, in the unit supplied by the creator of the entry. */
        private final long timestamp;

        /**
         * Creates a new entry.
         *
         * @param j Timestamp of this entry.
         * @param metricArr Metrics carried by this entry.
         */
        public QueueEntry(long j, Metric[] metricArr) {
            metrics = metricArr;
            timestamp = j;
        }

        /**
         * Returns the metrics carried by this entry.
         *
         * @return The same array instance that was passed to the constructor.
         */
        public Metric[] metrics() {
            return metrics;
        }

        /**
         * Returns the timestamp of this entry.
         *
         * @return Timestamp that was passed to the constructor.
         */
        public long timestamp() {
            return timestamp;
        }
    }

    /**
     * Creates a publisher from the given configuration.
     *
     * <p>The host, port, maximum queue size and batch size are passed through {@code ConfigCheck}
     * (non-empty / positive checks) before being copied into this instance; the queue is created
     * with a fixed capacity equal to the configured maximum queue size.</p>
     *
     * @param statsdMetricsConfig Configuration; checked with {@code ArgAssert.notNull}.
     */
    public StatsdMetricsPublisher(StatsdMetricsConfig statsdMetricsConfig) {
        ArgAssert.notNull(statsdMetricsConfig, "Configuration");

        // Validate first, in a fixed order, so that nothing is assigned from a rejected configuration.
        ConfigCheck check = ConfigCheck.get(StatsdMetricsPublisher.class);

        check.notEmpty(statsdMetricsConfig.getHost(), "host");
        check.positive(statsdMetricsConfig.getPort(), "port");
        check.positive(statsdMetricsConfig.getMaxQueueSize(), "maximum queue size");
        check.positive(statsdMetricsConfig.getBatchSize(), "batch size");

        statsdHost = statsdMetricsConfig.getHost();
        statsdPort = statsdMetricsConfig.getPort();
        maxQueueSize = statsdMetricsConfig.getMaxQueueSize();
        batchSize = statsdMetricsConfig.getBatchSize();
        filter = statsdMetricsConfig.getFilter();

        // Bounded queue: publish() drops entries instead of blocking when it is full.
        queue = new ArrayBlockingQueue<>(maxQueueSize);
    }

    /**
     * Starts the background worker that drains the queue and sends metrics to StatsD over UDP.
     *
     * <p>The worker runs on a single-thread executor and loops until it polls the stop sentinel, is
     * interrupted, or hits an unexpected runtime error. The UDP channel is opened lazily and reused
     * across queue entries; after an I/O failure it is closed and re-opened on the next entry. The
     * first I/O failure in a row is logged as a warning, subsequent ones only at debug level, until a
     * write succeeds again.</p>
     *
     * @param str Node host address; sanitized with {@link #toSafeHost(String)} and used as the first
     * part of the metric name prefix.
     * @param i Number appended to the sanitized host after {@code "__"} to complete the prefix
     * (presumably the node port - confirm against the caller).
     *
     * @throws UnknownHostException If the configured StatsD host can't be resolved.
     */
    public void start(String str, int i) throws UnknownHostException {
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError("Node host address us null.");
        }

        log.info("Starting StatsD metrics publisher [{}]", ToString.formatProperties(this));

        InetSocketAddress statsdAddr = new InetSocketAddress(this.statsdHost, this.statsdPort);

        // InetSocketAddress leaves the address null when the host name could not be resolved.
        if (statsdAddr.getAddress() == null) {
            throw new UnknownHostException(this.statsdHost);
        }

        String prefix = toSafeHost(str) + "__" + i;

        synchronized (this.mux) {
            this.worker = Executors.newSingleThreadExecutor(new HekateThreadFactory("StatsdMetrics"));

            this.worker.execute(() -> {
                DatagramChannel udp = null;

                try {
                    // True while we are in a run of consecutive I/O failures that was already reported.
                    boolean errorLogged = false;

                    while (true) {
                        QueueEntry entry = this.queue.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

                        if (entry == null || entry == STOP_ENTRY) {
                            break;
                        }

                        long timestamp = entry.timestamp();
                        int metricsSize = entry.metrics().length;

                        if (DEBUG) {
                            log.debug("Publishing metrics [timestamp={}, metrics-size={}]", timestamp, metricsSize);
                        }

                        try {
                            // Open lazily and keep the channel for subsequent entries.
                            if (udp == null) {
                                udp = DatagramChannel.open();

                                udp.connect(statsdAddr);
                            }

                            List<ByteBuffer> packets = encode(prefix, entry.metrics());

                            if (packets != null && !packets.isEmpty()) {
                                doWrite(udp, packets);

                                // Successful write ends the current failure run.
                                errorLogged = false;
                            }

                            if (DEBUG) {
                                log.debug("Published metrics [timestamp={}, metrics-size={}]", timestamp, metricsSize);
                            }
                        } catch (IOException e) {
                            // Drop the channel so that the next entry starts with a fresh one.
                            close(udp);

                            udp = null;

                            if (!errorLogged) {
                                errorLogged = true;

                                if (log.isWarnEnabled()) {
                                    log.warn("Got an error while publishing metrics to StatsD (will ignore subsequent errors).", e);
                                }
                            } else if (DEBUG) {
                                log.debug("Throttled error during metrics publishing.", e);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Interruption is treated as a request to stop; cleanup happens in 'finally'.
                } catch (Error | RuntimeException e) {
                    log.error("Got an unexpected runtime error while publishing metrics to StatsD.", e);

                    // Mark the publisher as stopped so that publish() starts rejecting metrics.
                    synchronized (this.mux) {
                        if (this.worker != null) {
                            this.worker.shutdown();

                            this.worker = null;
                        }
                    }
                } finally {
                    close(udp);

                    log.info("Stopped StatsD metrics publisher.");
                }
            });
        }
    }

    /**
     * Stops this publisher and waits, ignoring interruptions, on the handle returned by
     * {@link #stopAsync()}.
     */
    public void stop() {
        Waiting termination = stopAsync();

        termination.awaitUninterruptedly();
    }

    /**
     * Tells whether this publisher currently has no worker.
     *
     * @return {@code true} if the worker reference is {@code null} (never started, stopped, or shut
     * down after a runtime error); {@code false} otherwise.
     */
    public boolean isStopped() {
        synchronized (mux) {
            return worker == null;
        }
    }

    /**
     * Schedules the given metrics for asynchronous publishing.
     *
     * <p>The metrics are copied into an array, stamped with the current system time and offered to
     * the bounded queue without blocking.</p>
     *
     * @param collection Metrics to publish.
     *
     * @return {@code true} if the metrics were queued; {@code false} if the collection is
     * {@code null} or empty, the publisher is stopped, or the queue is full.
     */
    public boolean publish(Collection<Metric> collection) {
        if (collection == null || collection.isEmpty()) {
            return false;
        }

        synchronized (mux) {
            if (worker == null) {
                if (DEBUG) {
                    log.debug("Skipped asynchronous metrics publishing since publisher is stopped.");
                }

                return false;
            }

            QueueEntry entry = new QueueEntry(System.currentTimeMillis(), collection.toArray(EMPTY_METRICS));

            // Non-blocking: a full queue means the metrics are dropped rather than delaying the caller.
            boolean queued = queue.offer(entry);

            if (!queued) {
                if (DEBUG) {
                    log.debug("Skipped asynchronous metrics publishing due to queue overflow [max-queue-size={}]", maxQueueSize);
                }

                return false;
            }

            if (DEBUG) {
                log.debug("Scheduled asynchronous metrics publishing [metrics-size={}]", collection.size());
            }

            return true;
        }
    }

    /**
     * Converts a metric name to a form that is safe to send to StatsD.
     *
     * <p>The name is trimmed and every character matched by {@code UNSAFE_CHARACTERS} (anything
     * except ASCII letters, digits and {@code '.'}) is replaced with an underscore.</p>
     *
     * @param str Metric name.
     *
     * @return Sanitized name.
     */
    static String toSafeName(String str) {
        String trimmed = str.trim();

        return UNSAFE_CHARACTERS.matcher(trimmed).replaceAll("_");
    }

    /**
     * Converts a host name or address to a form that is safe to use inside a StatsD metric name.
     *
     * <p>The value is trimmed and every character matched by {@code UNSAFE_HOST_CHARACTERS}
     * (anything except ASCII letters and digits, so dots are replaced too) becomes an underscore.</p>
     *
     * @param str Host name or address.
     *
     * @return Sanitized host.
     */
    static String toSafeHost(String str) {
        String trimmed = str.trim();

        return UNSAFE_HOST_CHARACTERS.matcher(trimmed).replaceAll("_");
    }

    /**
     * Returns the number of entries that are currently waiting in the publishing queue.
     *
     * @return Current queue size.
     */
    int queueSize() {
        return queue.size();
    }

    /**
     * Writes each buffer to the channel, in list order, with one {@code write} call per buffer.
     *
     * @param datagramChannel Channel to write to.
     * @param list Buffers to send.
     *
     * @throws IOException If any write fails; remaining buffers are not sent.
     */
    void doWrite(DatagramChannel datagramChannel, List<ByteBuffer> list) throws IOException {
        for (ByteBuffer packet : list) {
            datagramChannel.write(packet);
        }
    }

    /**
     * Initiates shutdown of the worker without waiting for it to finish.
     *
     * <p>If a worker exists, the stop sentinel is placed on the queue (clearing the queue as many
     * times as needed to make room, which discards pending entries), the executor is handed to
     * {@code AsyncUtils.shutdown} and the worker reference is cleared.</p>
     *
     * @return Handle returned by {@code AsyncUtils.shutdown}, or {@code Waiting.NO_WAIT} if there
     * was no worker to stop.
     */
    Waiting stopAsync() {
        synchronized (mux) {
            if (worker == null) {
                return Waiting.NO_WAIT;
            }

            log.info("Stopping StatsD metrics publisher...");

            // The sentinel must get in: if the queue is full, sacrifice pending entries.
            boolean sentinelQueued = queue.offer(STOP_ENTRY);

            while (!sentinelQueued) {
                queue.clear();

                sentinelQueued = queue.offer(STOP_ENTRY);
            }

            Waiting termination = AsyncUtils.shutdown(worker);

            worker = null;

            return termination;
        }
    }

    /**
     * Encodes metrics into StatsD gauge lines ({@code <prefix>.<name>:<value>|g}), grouped into
     * buffers of at most {@code batchSize} newline-separated lines each.
     *
     * <p>Metrics rejected by the filter (when one is configured) are skipped.</p>
     *
     * @param prefix Prefix that is prepended (followed by a dot) to every metric name.
     * @param metrics Metrics to encode.
     *
     * @return Encoded buffers, or {@code null} if no metric passed the filter.
     */
    private List<ByteBuffer> encode(String prefix, Metric[] metrics) {
        // Both are allocated lazily, on the first metric that passes the filter.
        List<ByteBuffer> packets = null;
        StringBuilder batch = null;

        int linesInBatch = 0;

        for (int idx = 0; idx < metrics.length; idx++) {
            Metric metric = metrics[idx];

            if (filter != null && !filter.accept(metric)) {
                continue;
            }

            if (packets == null) {
                packets = new ArrayList<>(metrics.length / batchSize + 1);
                batch = new StringBuilder();
            }

            if (batch.length() > 0) {
                batch.append('\n');
            }

            batch.append(prefix)
                .append('.')
                .append(toSafeName(metric.name()))
                .append(':')
                .append(Long.toString(metric.value()))
                .append("|g");

            linesInBatch++;

            // Batch is full: flush it into its own buffer and start a new one.
            if (linesInBatch == batchSize) {
                packets.add(toBytes(batch));

                batch.setLength(0);

                linesInBatch = 0;
            }
        }

        // Flush the trailing, partially filled batch.
        if (batch != null && batch.length() > 0) {
            packets.add(toBytes(batch));
        }

        return packets;
    }

    /**
     * Converts the builder's current content to bytes using {@code Utils.UTF_8} and wraps them in a
     * buffer.
     *
     * @param text Text to convert.
     *
     * @return Buffer wrapping the encoded bytes.
     */
    private ByteBuffer toBytes(StringBuilder text) {
        byte[] encoded = text.toString().getBytes(Utils.UTF_8);

        return ByteBuffer.wrap(encoded);
    }

    /**
     * Closes the channel if it is not {@code null}, ignoring any I/O error.
     *
     * @param channel Channel to close (can be {@code null}).
     */
    private void close(DatagramChannel channel) {
        if (channel == null) {
            return;
        }

        try {
            channel.close();
        } catch (IOException ignored) {
            // Best-effort cleanup: a failure to close is deliberately not reported.
        }
    }

    /**
     * Trims the given string and replaces every match of the pattern with an underscore.
     *
     * @param unsafe Pattern that matches the characters to be replaced.
     * @param raw String to sanitize.
     *
     * @return Sanitized string.
     */
    private static String safeName(Pattern unsafe, String raw) {
        String trimmed = raw.trim();

        return unsafe.matcher(trimmed).replaceAll("_");
    }

    /**
     * Returns the string form produced by {@code ToString.format} for this instance.
     *
     * @return String representation of this publisher.
     */
    @Override
    public String toString() {
        return ToString.format(this);
    }

    // Initializes the class-level constants. 'log' must be assigned before DEBUG, which reads it.
    static {
        $assertionsDisabled = !StatsdMetricsPublisher.class.desiredAssertionStatus();

        // Logging goes under the StatsdMetricsPlugin category.
        log = LoggerFactory.getLogger(StatsdMetricsPlugin.class);
        DEBUG = log.isDebugEnabled();

        // Host names may contain only letters and digits; metric names may also contain dots.
        UNSAFE_HOST_CHARACTERS = Pattern.compile("[^a-z0-9]", Pattern.CASE_INSENSITIVE);
        UNSAFE_CHARACTERS = Pattern.compile("[^a-z0-9.]", Pattern.CASE_INSENSITIVE);

        EMPTY_METRICS = new Metric[0];

        // Sentinel that is recognized by reference equality in the worker loop.
        STOP_ENTRY = new QueueEntry(0L, new Metric[0]);
    }
}
