package net.dempsy.transport.blockingqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.dempsy.Infrastructure;
import net.dempsy.transport.Listener;
import net.dempsy.transport.MessageTransportException;
import net.dempsy.transport.NodeAddress;
import net.dempsy.transport.Receiver;
import net.dempsy.util.SafeString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/dempsy/transport/blockingqueue/BlockingQueueReceiver.class */
public class BlockingQueueReceiver implements Runnable, Receiver {
    private final BlockingQueueAddress address;
    private final BlockingQueue<Object> queue;
    private Listener<Object> listener = null;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread currentThread = null;
    private boolean shutdown;
    private static Logger LOGGER = LoggerFactory.getLogger(BlockingQueueReceiver.class);
    private static final AtomicLong guidGenerator = new AtomicLong(0);

    public BlockingQueueReceiver(BlockingQueue<Object> blockingQueue) {
        this.queue = blockingQueue;
        this.address = new BlockingQueueAddress(blockingQueue, "BlockingQueue_" + guidGenerator.getAndIncrement());
    }

    @Override // java.lang.Runnable
    public void run() {
        synchronized (this) {
            this.currentThread = Thread.currentThread();
            if (this.shutdown) {
                return;
            }
            this.running.set(true);
            Listener<Object> listener = this.listener;
            while (!this.shutdown) {
                try {
                    listener.onMessage(this.queue.take());
                } catch (MessageTransportException e) {
                    LOGGER.error("Exception while handling message.", e);
                } catch (InterruptedException e2) {
                    synchronized (this) {
                        if (!this.shutdown) {
                            LOGGER.warn("Superfluous interrupt.", e2);
                        }
                    }
                }
            }
            this.running.set(false);
        }
    }

    public synchronized void start(Listener listener, Infrastructure infrastructure) {
        if (listener == null) {
            throw new IllegalArgumentException("Cannot pass null to " + BlockingQueueReceiver.class.getSimpleName() + ".setListener");
        }
        if (this.listener != null) {
            throw new IllegalStateException("Cannot set a new Listener (" + SafeString.objectDescription(listener) + ") on a " + BlockingQueueReceiver.class.getSimpleName() + " when there's one already set (" + SafeString.objectDescription(this.listener) + ")");
        }
        this.listener = listener;
        infrastructure.getThreadingModel().runDaemon(this, "BQReceiver-" + this.address.toString());
    }

    public void close() {
        synchronized (this) {
            this.shutdown = true;
        }
        while (this.running.get()) {
            if (this.currentThread != null) {
                this.currentThread.interrupt();
            }
            Thread.yield();
        }
        this.address.close();
    }

    public NodeAddress getAddress() {
        return this.address;
    }
}
