package com.tc.async.impl;

import com.tc.async.api.Source;
import com.tc.async.impl.AbstractStageQueueImpl;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLoggerProvider;
import com.tc.util.Assert;
import com.tc.util.concurrent.QueueFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/tc/async/impl/SingletonStageQueueImpl.class */
/**
 * Stage queue that funnels every event into exactly one underlying source queue.
 *
 * <p>{@link #getSources()} always returns a one-element array and {@link #getSource(int)} only
 * answers for index {@code 0}.
 *
 * @param <EC> type of the event context objects accepted by {@link #addToSink(Object)}
 */
public class SingletonStageQueueImpl<EC> extends AbstractStageQueueImpl<EC> {
    /** The single queue backing this stage. */
    private final SingletonStageQueueImpl<EC>.SourceQueueImpl sourceQueue;

    /**
     * Adapter exposing a {@link BlockingQueue} of events through the {@code SourceQueue} contract.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public final class SourceQueueImpl implements AbstractStageQueueImpl.SourceQueue {
        private final BlockingQueue<Event> queue;

        /**
         * @param blockingQueue the queue all operations of this source delegate to
         */
        public SourceQueueImpl(BlockingQueue<Event> blockingQueue) {
            this.queue = blockingQueue;
        }

        @Override
        public String toString() {
            return "SourceQueueImpl{Singleton size=" + this.queue.size() + '}';
        }

        /**
         * Removes queued events one at a time, without blocking, until a poll returns
         * {@code null}.
         *
         * @return the number of events removed
         * @throws TCRuntimeException if polling is interrupted; the thread's interrupt flag is
         *     restored before the exception is thrown
         */
        @Override // com.tc.async.impl.AbstractStageQueueImpl.SourceQueue
        public int clear() {
            int removed = 0;
            try {
                // The poll must be inside the try: it is the only call that declares
                // InterruptedException.
                while (poll(0L) != null) {
                    removed++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new TCRuntimeException(e);
            }
            return removed;
        }

        @Override // com.tc.async.impl.AbstractStageQueueImpl.SourceQueue, com.tc.async.api.Source
        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        /**
         * Retrieves the next event.
         *
         * @param j timeout in milliseconds; {@code 0} means "do not wait at all"
         * @return the next event, or {@code null} if none became available in time
         * @throws InterruptedException if interrupted while waiting (only possible when
         *     {@code j != 0})
         */
        @Override // com.tc.async.impl.AbstractStageQueueImpl.SourceQueue, com.tc.async.api.Source
        public Event poll(long j) throws InterruptedException {
            return j == 0 ? this.queue.poll() : this.queue.poll(j, TimeUnit.MILLISECONDS);
        }

        /**
         * Enqueues an event, blocking while the queue has no free capacity.
         *
         * @param event the event to enqueue
         * @return the queue size observed right after the insert
         * @throws InterruptedException if interrupted while waiting for space
         */
        @Override // com.tc.async.impl.AbstractStageQueueImpl.SourceQueue
        public int put(Event event) throws InterruptedException {
            this.queue.put(event);
            return this.queue.size();
        }

        @Override // com.tc.async.impl.AbstractStageQueueImpl.SourceQueue, com.tc.async.api.Source
        public int size() {
            return this.queue.size();
        }

        @Override // com.tc.async.impl.AbstractStageQueueImpl.SourceQueue, com.tc.async.api.Source
        public String getSourceName() {
            return "Singleton";
        }
    }

    /**
     * Creates the stage queue and its single backing source queue.
     *
     * @param queueFactory factory used to create the backing queue
     * @param cls event context class handed to the factory
     * @param eventCreator passed through to the superclass constructor
     * @param tCLoggerProvider passed through to the superclass constructor
     * @param str passed through to the superclass constructor (presumably the stage name —
     *     confirm against {@code AbstractStageQueueImpl})
     * @param i size argument handed to the factory; must be {@code >= 0}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public SingletonStageQueueImpl(QueueFactory queueFactory, Class<EC> cls, EventCreator<EC> eventCreator, TCLoggerProvider tCLoggerProvider, String str, int i) {
        super(tCLoggerProvider, str, eventCreator);
        this.sourceQueue = createWorkerQueue(queueFactory, cls, i);
    }

    /** Builds the backing source queue after asserting that the requested size is not negative. */
    private SingletonStageQueueImpl<EC>.SourceQueueImpl createWorkerQueue(QueueFactory queueFactory, Class<EC> cls, int i) {
        Assert.eval(i >= 0);
        return new SourceQueueImpl(queueFactory.createInstance(cls, i));
    }

    /**
     * Returns the single source for index {@code 0}.
     *
     * @param i index of the requested source
     * @return the backing source queue when {@code i == 0}, otherwise {@code null}
     */
    @Override // com.tc.async.impl.StageQueue
    public Source getSource(int i) {
        if (i != 0) {
            return null;
        }
        return this.sourceQueue;
    }

    /** Returns a fresh one-element array holding the single backing source queue. */
    @Override // com.tc.async.impl.AbstractStageQueueImpl
    AbstractStageQueueImpl.SourceQueue[] getSources() {
        return new AbstractStageQueueImpl.SourceQueue[]{this.sourceQueue};
    }

    /**
     * Wraps the context in an event and enqueues it, unless {@code isClosed()} reports true
     * or {@code createEvent} yields {@code null}.
     *
     * @param ec the context to enqueue; asserted to be non-null
     */
    @Override // com.tc.async.api.Sink
    public void addToSink(EC ec) {
        Assert.assertNotNull(ec);
        if (isClosed()) {
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added:" + ec + " to:" + this.stageName);
        }
        Event createEvent = createEvent(ec);
        if (createEvent != null) {
            deliverToQueue(createEvent);
        }
    }

    /**
     * Enqueues the event, retrying across interrupts until the put succeeds.
     *
     * <p>The caller's interrupt status is cleared for the duration of the retries and restored
     * exactly once on the way out. Restoring it inside the retry loop would make every
     * subsequent interruptible {@code put} fail immediately and spin forever.
     */
    private void deliverToQueue(Event event) {
        // Clear (and remember) any pending interrupt so the blocking put can actually wait.
        boolean interrupted = Thread.interrupted();
        try {
            while (true) {
                try {
                    updateDepth(this.sourceQueue.put(event));
                    return;
                } catch (InterruptedException e) {
                    this.logger.debug("StageQueue Add: " + e);
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override // com.tc.async.impl.StageQueue
    public String toString() {
        return "StageQueue(" + this.stageName + ")";
    }

    /**
     * Drains the backing queue and logs how many events were discarded.
     *
     * @return the number of events removed
     */
    @Override // com.tc.async.impl.StageQueue
    public int clear() {
        int clear = this.sourceQueue.clear();
        this.logger.info("Cleared " + clear);
        return clear;
    }
}
