package io.reactivex.netty.examples.http.logtail;

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/netty/examples/http/logtail/LogProducer.class */
/**
 * Example HTTP server that answers every request with a stream of server-sent
 * events, each carrying one randomly generated log event in CSV form.
 *
 * <p>Run it via {@link #main(String[])} with two arguments: the port to listen
 * on and the interval, in milliseconds, between two consecutive events.
 */
public class LogProducer {
    /** TCP port the HTTP server is created for. */
    private final int port;
    /** Delay between two consecutive events, in milliseconds. */
    private final long interval;
    /** Source label passed to {@code LogEvent.randomLogEvent}; built as {@code "localhost:<port>"}. */
    private final String source;

    /**
     * Creates a producer description; no server is created until {@link #createServer()} is called.
     *
     * @param port     port number the HTTP server will be built for
     * @param interval delay between two events, in milliseconds
     */
    public LogProducer(int port, int interval) {
        this.port = port;
        this.interval = interval;
        this.source = "localhost:" + port;
    }

    /**
     * Builds the HTTP server with the server-sent-events pipeline configurator
     * and DEBUG-level wire logging enabled.
     *
     * <p>The server is only built here; starting it is left to the caller
     * (see {@link #main(String[])}, which calls {@code startAndWait()}).
     *
     * @return the built server whose request handler replies with the event stream
     */
    public HttpServer<ByteBuf, ServerSentEvent> createServer() {
        // Every incoming request, whatever its content, is answered with the periodic event stream.
        RequestHandler<ByteBuf, ServerSentEvent> handler = new RequestHandler<ByteBuf, ServerSentEvent>() {
            @Override
            public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                           HttpServerResponse<ServerSentEvent> response) {
                return LogProducer.this.createReplyHandlerObservable(response);
            }
        };
        // The raw-type casts are kept exactly as found in this (decompiled) source, since the
        // builder's generic signatures are not visible from this file.
        HttpServer<ByteBuf, ServerSentEvent> server = ((HttpServerBuilder) ((HttpServerBuilder) RxNetty.newHttpServerBuilder(this.port, handler)
                .pipelineConfigurator(PipelineConfigurators.serveSseConfigurator()))
                .enableWireLogging(LogLevel.DEBUG))
                .build();
        System.out.println("Started log producer on port " + this.port);
        return server;
    }

    /**
     * Creates the reply stream for one response: on every tick of the
     * configured interval a new server-sent event is written and flushed.
     *
     * <p>The event id is the tick counter written as a {@code long}; the event
     * data is the CSV form of a random log event, encoded as UTF-8.
     *
     * <p>NOTE(review): a decompiler remark on this method indicated it was
     * originally {@code private}; it is left {@code public} so any existing
     * callers keep compiling.
     *
     * @param httpServerResponse response the events are written to
     * @return observable merging the results of the individual write-and-flush calls
     */
    public Observable<Void> createReplyHandlerObservable(final HttpServerResponse<ServerSentEvent> httpServerResponse) {
        return Observable.interval(this.interval, TimeUnit.MILLISECONDS).flatMap(new Func1<Long, Observable<Void>>() {
            @Override
            public Observable<Void> call(Long tick) {
                // Build the payload before allocating any buffer, so a failure while generating
                // the event cannot strand an already-allocated buffer. The charset is explicit
                // so the output does not depend on the platform default encoding.
                byte[] payload = LogEvent.randomLogEvent(LogProducer.this.source)
                        .toCSV()
                        .getBytes(StandardCharsets.UTF_8);
                ByteBuf eventId = httpServerResponse.getAllocator().buffer().writeLong(tick.longValue());
                ByteBuf data = httpServerResponse.getAllocator().buffer().writeBytes(payload);
                return httpServerResponse.writeAndFlush(ServerSentEvent.withEventId(eventId, data));
            }
        });
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code args[0]} is the port number, {@code args[1]} the
     *             interval between events in milliseconds; both must be integers
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("ERROR: specify log producer's port number and a message sending interval");
            return;
        }
        int port;
        int interval;
        try {
            port = Integer.parseInt(args[0]);
            interval = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            // Report malformed arguments the same way as missing ones instead of a stack trace.
            System.err.println("ERROR: port number and message sending interval must be integers ("
                    + e.getMessage() + ")");
            return;
        }
        new LogProducer(port, interval).createServer().startAndWait();
        System.out.println("LogProducer service terminated");
    }
}
