package io.aeron.samples.stress;

import io.aeron.Aeron;
import io.aeron.Counter;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.logbuffer.Header;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.Random;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.YieldingIdleStrategy;

/* loaded from: input_file:io/aeron/samples/stress/StressMdcClient.class */
public class StressMdcClient implements Agent {
    private static final long TIMEOUT_MS = 5000;
    private static final int EXPECTED_RESPONSE_COUNT = 4;
    private final String serverAddress;
    private final String clientAddress;
    private final EpochClock clock;
    private final MutableDirectBuffer msg;
    private final int maxInflight;
    private final int totalToSend;
    private final int mtu;
    private Aeron aeron;
    private Publication mdcPublication;
    private Subscription mdcSubscription1;
    private Subscription mdcSubscription2;
    private int messageLength;
    private Counter clientReceiveCount;
    private Counter clientSendCount;
    private final Long2LongCounterMap inflightMessages = new Long2LongCounterMap(-1);
    private final FragmentAssembler mdcRspAssembler1 = new FragmentAssembler(this::mdcRspHandler);
    private final FragmentAssembler mdcRspAssembler2 = new FragmentAssembler(this::mdcRspHandler);
    private final byte[] buffer = new byte[131008];
    private final CRC64 crc = new CRC64();
    private long correlationId = 0;
    private long lastMessageSent = 0;

    public StressMdcClient(String str, String str2, EpochClock epochClock, int i, int i2, int i3) {
        this.serverAddress = str;
        this.clientAddress = str2;
        this.clock = epochClock;
        this.maxInflight = i;
        this.totalToSend = i2;
        this.mtu = i3;
        new Random(42L).nextBytes(this.buffer);
        this.msg = new UnsafeBuffer(this.buffer);
    }

    public void onStart() {
        this.aeron = Aeron.connect(new Aeron.Context());
        this.mdcPublication = this.aeron.addExclusivePublication(StressUtil.mdcReqPubChannel(this.clientAddress).mtu(Integer.valueOf(this.mtu)).linger(0L).build(), StressUtil.MDC_STREAM_ID);
        this.mdcSubscription1 = this.aeron.addSubscription(StressUtil.mdcRspSubChannel1(this.serverAddress, this.clientAddress).build(), StressUtil.MDC_STREAM_ID, StressUtil::imageAvailable, StressUtil::imageUnavailable);
        this.mdcSubscription2 = this.aeron.addSubscription(StressUtil.mdcRspSubChannel2(this.serverAddress, this.clientAddress).build(), StressUtil.MDC_STREAM_ID, StressUtil::imageAvailable, StressUtil::imageUnavailable);
        this.clientReceiveCount = this.aeron.addCounter(StressUtil.CLIENT_RECV_COUNT, "Client Receive Count");
        this.clientSendCount = this.aeron.addCounter(StressUtil.CLIENT_SEND_COUNT, "Client Send Count");
    }

    public int doWork() {
        int i;
        if (!this.mdcSubscription1.isConnected() || !this.mdcSubscription2.isConnected()) {
            return 0;
        }
        if (0 == this.messageLength) {
            throw new IllegalStateException("messageLength has not been set");
        }
        int i2 = 0;
        while (this.inflightMessages.size() < this.maxInflight && this.correlationId < this.totalToSend && 0 < this.mdcPublication.offer(this.msg, 0, this.messageLength, this::currentCorrelationId)) {
            this.inflightMessages.put(this.correlationId, 0L);
            this.correlationId++;
            i2++;
            this.lastMessageSent = this.clock.time();
            this.clientSendCount.increment();
        }
        int i3 = 0;
        while (true) {
            i = i3;
            int poll = poll(this.maxInflight);
            if (0 == poll) {
                break;
            }
            i3 = i + poll;
        }
        if (0 < this.correlationId && 0 == i && !this.inflightMessages.isEmpty()) {
            long time = this.clock.time() - this.lastMessageSent;
            if (TIMEOUT_MS < time) {
                throw new RuntimeException("No response received for " + time + "ms, client=" + this);
            }
        }
        return i2;
    }

    private int poll(int i) {
        return 0 + this.mdcSubscription1.poll(this.mdcRspAssembler1, i) + this.mdcSubscription2.poll(this.mdcRspAssembler2, i);
    }

    private boolean isComplete() {
        return ((long) this.totalToSend) <= this.correlationId && this.inflightMessages.isEmpty();
    }

    private void mdcRspHandler(DirectBuffer directBuffer, int i, int i2, Header header) {
        long reservedValue = header.reservedValue();
        StressUtil.validateMessage(this.crc, directBuffer, i, i2, reservedValue);
        this.clientReceiveCount.increment();
        if (4 <= this.inflightMessages.incrementAndGet(reservedValue)) {
            this.inflightMessages.remove(reservedValue);
        }
    }

    private long currentCorrelationId(DirectBuffer directBuffer, int i, int i2) {
        return this.correlationId;
    }

    void reset(int i) {
        this.messageLength = i;
        this.correlationId = 0L;
        this.msg.putLong(0, this.crc.recalculate(this.buffer, 8, i - 8), ByteOrder.LITTLE_ENDIAN);
    }

    public void onClose() {
        CloseHelper.quietCloseAll(new AutoCloseable[]{this.aeron});
    }

    public String roleName() {
        return null;
    }

    public String toString() {
        return "StressClient{inflightMessages=" + this.inflightMessages + ", mtu=" + this.mtu + ", messageLength=" + this.messageLength + ", correlationId=" + this.correlationId + ", lastMessageSent=" + this.lastMessageSent + '}';
    }

    public static void main(String[] strArr) {
        YieldingIdleStrategy yieldingIdleStrategy = new YieldingIdleStrategy();
        Iterator<Integer> it = StressUtil.MTU_LENGTHS.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            StressMdcClient stressMdcClient = new StressMdcClient(StressUtil.serverAddress(), StressUtil.clientAddress(), new SystemEpochClock(), 20, 100, intValue);
            try {
                stressMdcClient.onStart();
                for (int i = 32; i <= 131008; i += 1024) {
                    stressMdcClient.reset(i);
                    while (!stressMdcClient.isComplete()) {
                        yieldingIdleStrategy.idle(stressMdcClient.doWork());
                    }
                }
                StressUtil.info("Complete mtu=" + intValue);
                stressMdcClient.onClose();
            } catch (Throwable th) {
                stressMdcClient.onClose();
                throw th;
            }
        }
    }
}
