package io.datakernel.stream.processor;

import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamTransformer_M_1;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import java.util.Iterator;

/* loaded from: input_file:io/datakernel/stream/processor/StreamUnion.class */
/**
 * Stream transformer with many inputs and a single output: every item received
 * on any input created through {@link #newInput()} is handed to the one
 * downstream data receiver of this transformer.
 *
 * <p>The class itself also acts as a {@link StreamDataReceiver}; inputs fall
 * back to it while no downstream receiver is set (see
 * {@code InputImpl#getDataReceiver()}).
 *
 * @param <T> type of the items passing through the union
 */
public final class StreamUnion<T> extends AbstractStreamTransformer_M_1<T> implements StreamDataReceiver<T> {

    /**
     * Creates a union with no inputs yet.
     *
     * @param eventloop event loop passed on to the base transformer and to
     *                  every input created later
     */
    public StreamUnion(Eventloop eventloop) {
        super(eventloop);
    }

    /**
     * Creates a fresh input and registers it via {@code addInput}.
     *
     * @return the consumer that {@code addInput} returns for the new input
     */
    public StreamConsumer<T> newInput() {
        InputImpl input = new InputImpl(eventloop);
        return addInput(input);
    }

    /**
     * Asks the upstream of every registered input to bind its data receiver,
     * then ends the output stream immediately if there are no inputs at all.
     */
    @Override
    protected void onProducerStarted() {
        for (AbstractStreamConsumer<?> input : inputs) {
            input.getUpstream().bindDataReceiver();
        }
        if (inputs.isEmpty()) {
            sendEndOfStream();
        }
    }

    /** Delegates suspension to {@code suspendAllUpstreams()}. */
    @Override
    protected void onSuspended() {
        suspendAllUpstreams();
    }

    /** Delegates resumption to {@code resumeAllUpstreams()}. */
    @Override
    public void onResumed() {
        resumeAllUpstreams();
    }

    /**
     * Forwards a single item to the downstream data receiver unchanged.
     *
     * @param item the item received from one of the inputs
     */
    @Override
    public void onData(T item) {
        downstreamDataReceiver.onData(item);
    }

    /**
     * One input of the union. Calls on the enclosing instance stay explicitly
     * qualified so they can never resolve to members of
     * {@link AbstractStreamConsumer} instead.
     */
    private class InputImpl extends AbstractStreamConsumer<T> {

        /**
         * @param eventloop event loop handed to the base consumer
         */
        protected InputImpl(Eventloop eventloop) {
            super(eventloop);
        }

        /**
         * Picks the receiver for this input's items.
         *
         * @return the enclosing union's downstream receiver when one is set,
         *         otherwise the enclosing union itself
         */
        @Override
        public StreamDataReceiver<T> getDataReceiver() {
            if (StreamUnion.this.downstreamDataReceiver == null) {
                return StreamUnion.this;
            }
            return StreamUnion.this.downstreamDataReceiver;
        }

        /**
         * Ends the union's output stream, but only when
         * {@code allUpstreamsEndOfStream()} reports true.
         */
        @Override
        public void onEndOfStream() {
            if (!StreamUnion.this.allUpstreamsEndOfStream()) {
                return;
            }
            StreamUnion.this.sendEndOfStream();
        }

        /**
         * Closes this input's upstream producer with the error first, then
         * closes the enclosing union with the same error.
         *
         * @param cause the error reported to this input
         */
        @Override
        public void onError(Exception cause) {
            this.upstreamProducer.closeWithError(cause);
            StreamUnion.this.closeWithError(cause);
        }
    }
}
