package io.datakernel.stream.processor;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.AbstractStreamTransformer_1_N;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;

/* loaded from: input_file:io/datakernel/stream/processor/StreamSharder.class */
public final class StreamSharder<K, T> extends AbstractStreamTransformer_1_N<T> implements StreamDataReceiver<T>, StreamSharderMBean {
    private final Sharder<K> sharder;
    private final Function<T, K> keyFunction;
    private StreamDataReceiver<T>[] dataReceivers;
    private long jmxItems;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/datakernel/stream/processor/StreamSharder$InternalProducer.class */
    public class InternalProducer extends AbstractStreamProducer<T> {
        public InternalProducer(Eventloop eventloop) {
            super(eventloop);
        }

        @Override // io.datakernel.stream.AbstractStreamProducer, io.datakernel.stream.StreamProducer
        public void bindDataReceiver() {
            super.bindDataReceiver();
            StreamSharder.this.dataReceivers[StreamSharder.this.outputs.indexOf(this)] = this.downstreamDataReceiver;
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onSuspended() {
            StreamSharder.this.suspendUpstream();
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onResumed() {
            if (StreamSharder.this.allOutputsResumed()) {
                StreamSharder.this.resumeUpstream();
            }
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onClosed() {
            StreamSharder.this.closeUpstream();
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onClosedWithError(Exception exc) {
            StreamSharder.this.onError(exc);
            this.downstreamConsumer.onError(exc);
        }
    }

    public StreamSharder(Eventloop eventloop, Sharder<K> sharder, Function<T, K> function) {
        super(eventloop);
        this.dataReceivers = new StreamDataReceiver[0];
        this.sharder = (Sharder) Preconditions.checkNotNull(sharder);
        this.keyFunction = (Function) Preconditions.checkNotNull(function);
    }

    public StreamProducer<T> newOutput() {
        InternalProducer internalProducer = new InternalProducer(this.eventloop);
        addOutput(internalProducer);
        StreamDataReceiver<T>[] streamDataReceiverArr = new StreamDataReceiver[this.dataReceivers.length + 1];
        System.arraycopy(this.dataReceivers, 0, streamDataReceiverArr, 0, this.dataReceivers.length);
        this.dataReceivers = streamDataReceiverArr;
        return internalProducer;
    }

    @Override // io.datakernel.stream.StreamConsumer
    public void onEndOfStream() {
        sendEndOfStreamToDownstreams();
    }

    @Override // io.datakernel.stream.StreamConsumer
    public StreamDataReceiver<T> getDataReceiver() {
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.datakernel.stream.StreamDataReceiver
    public void onData(T t) {
        if (!$assertionsDisabled) {
            long j = this.jmxItems;
            long j2 = this.jmxItems + 1;
            this.jmxItems = j2;
            if (j == j2) {
                throw new AssertionError();
            }
        }
        this.dataReceivers[this.sharder.shard(this.keyFunction.apply(t))].onData(t);
    }

    @Override // io.datakernel.stream.processor.StreamSharderMBean
    public long getItems() {
        return this.jmxItems;
    }

    @Override // io.datakernel.stream.AbstractStreamConsumer
    public String toString() {
        String str = "?";
        if (!$assertionsDisabled) {
            String str2 = "" + this.jmxItems;
            str = str2;
            if (str2 == null) {
                throw new AssertionError();
            }
        }
        return '{' + super.toString() + " items:" + str + '}';
    }

    static {
        $assertionsDisabled = !StreamSharder.class.desiredAssertionStatus();
    }
}
