package io.datakernel.dataflow.server;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.common.MemSize;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.binary.ByteBufSerializer;
import io.datakernel.csp.net.MessagingWithBinaryStreaming;
import io.datakernel.csp.queue.ChannelQueue;
import io.datakernel.csp.queue.ChannelZeroBuffer;
import io.datakernel.dataflow.graph.StreamId;
import io.datakernel.dataflow.graph.TaskContext;
import io.datakernel.dataflow.node.Node;
import io.datakernel.dataflow.server.command.DatagraphCommand;
import io.datakernel.dataflow.server.command.DatagraphCommandDownload;
import io.datakernel.dataflow.server.command.DatagraphCommandExecute;
import io.datakernel.dataflow.server.command.DatagraphResponse;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.net.AbstractServer;
import io.datakernel.net.AsyncTcpSocket;
import java.net.InetAddress;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/* loaded from: input_file:io/datakernel/dataflow/server/DatagraphServer.class */
/**
 * TCP server that receives {@link DatagraphCommand}s and dispatches each one to the
 * {@link CommandHandler} registered for the command's concrete class.
 *
 * <p>Uploads (see {@link #upload(StreamId, Class)}) and download commands are paired through
 * {@code pendingStreams}: whichever side shows up first for a given {@link StreamId} registers a
 * fresh {@link ChannelZeroBuffer}, and the other side removes that entry and attaches to the
 * opposite end of the same queue.
 */
public final class DatagraphServer extends AbstractServer<DatagraphServer> {
    /** Environment passed to the constructor, extended with a reference to this server. */
    private final DatagraphEnvironment environment;

    /** Queues registered by an upload or a download that is still waiting for its counterpart. */
    private final Map<StreamId, ChannelQueue<ByteBuf>> pendingStreams = new HashMap<>();

    /** JSON-based serializer used for the command/response messaging on every connection. */
    private final ByteBufSerializer<DatagraphCommand, DatagraphResponse> serializer;

    /** Command handlers keyed by the exact runtime class of the command they accept. */
    private final Map<Class<?>, CommandHandler<?, ?>> handlers = new HashMap<>();

    /**
     * Handler for one kind of incoming command.
     *
     * @param <I> type of the incoming command
     * @param <O> type of the outgoing response
     */
    public interface CommandHandler<I, O> {
        /**
         * Processes a single command received over the given messaging channel.
         *
         * @param messagingWithBinaryStreaming channel the command arrived on
         * @param i the received command
         */
        void onCommand(MessagingWithBinaryStreaming<I, O> messagingWithBinaryStreaming, I i);
    }

    /** Streams the bytes of the requested stream back to the peer over the messaging channel. */
    private class DownloadCommandHandler implements CommandHandler<DatagraphCommandDownload, DatagraphResponse> {
        @Override // io.datakernel.dataflow.server.DatagraphServer.CommandHandler
        public void onCommand(MessagingWithBinaryStreaming<DatagraphCommandDownload, DatagraphResponse> messagingWithBinaryStreaming, DatagraphCommandDownload datagraphCommandDownload) {
            StreamId streamId = datagraphCommandDownload.getStreamId();
            ChannelQueue<ByteBuf> queue = DatagraphServer.this.pendingStreams.remove(streamId);
            if (queue != null) {
                DatagraphServer.this.logger.info("onDownload: transferring {}, pending downloads: {}", streamId, DatagraphServer.this.pendingStreams.size());
            } else {
                DatagraphServer.this.logger.info("onDownload: waiting {}, pending downloads: {}", streamId, DatagraphServer.this.pendingStreams.size());
                // No upload registered yet: park a queue so that a later upload() can find it.
                queue = new ChannelZeroBuffer<>();
                DatagraphServer.this.pendingStreams.put(streamId, queue);
            }
            // withAcknowledgement() returns a wrapping consumer. The data has to be piped into
            // that wrapper; previously the wrapper was discarded and the raw consumer was used,
            // so the callback below (and the close of the channel) was never triggered.
            ChannelConsumer<ByteBuf> consumer = messagingWithBinaryStreaming.sendBinaryStream()
                    .withAcknowledgement(ack -> ack.whenComplete((result, e) -> {
                        if (e != null) {
                            DatagraphServer.this.logger.warn("Exception occurred while trying to send data", e);
                        }
                        messagingWithBinaryStreaming.close();
                    }));
            queue.getSupplier().streamTo(consumer);
        }
    }

    /** Builds and wires the nodes carried by an execute command inside a fresh task context. */
    private class ExecuteCommandHandler implements CommandHandler<DatagraphCommandExecute, DatagraphResponse> {
        @Override // io.datakernel.dataflow.server.DatagraphServer.CommandHandler
        public void onCommand(MessagingWithBinaryStreaming<DatagraphCommandExecute, DatagraphResponse> messagingWithBinaryStreaming, DatagraphCommandExecute datagraphCommandExecute) {
            // The channel is closed right away; nothing is sent back for this command.
            messagingWithBinaryStreaming.close();
            TaskContext taskContext = new TaskContext(DatagraphServer.this.eventloop, DatagraphEnvironment.extend(DatagraphServer.this.environment));
            for (Node node : datagraphCommandExecute.getNodes()) {
                node.createAndBind(taskContext);
            }
            taskContext.wireAll();
        }
    }

    /**
     * Creates a server with handlers for download and execute commands.
     *
     * @param eventloop event loop handed to the superclass
     * @param datagraphEnvironment environment that must provide a {@link DatagraphSerialization}
     *                             instance; it is extended with a reference to this server
     */
    public DatagraphServer(Eventloop eventloop, DatagraphEnvironment datagraphEnvironment) {
        super(eventloop);
        this.handlers.put(DatagraphCommandDownload.class, new DownloadCommandHandler());
        this.handlers.put(DatagraphCommandExecute.class, new ExecuteCommandHandler());
        this.environment = DatagraphEnvironment.extend(datagraphEnvironment).with(DatagraphServer.class, this);
        DatagraphSerialization serialization = (DatagraphSerialization) datagraphEnvironment.getInstance(DatagraphSerialization.class);
        this.serializer = ByteBufSerializer.ofJsonCodec(serialization.getCommandCodec(), serialization.getResponseCodec());
    }

    /**
     * Returns a consumer whose items are serialized and pushed into the queue associated with
     * {@code streamId}. If a download for that id is already waiting, its queue is reused;
     * otherwise a new queue is registered for a later download to pick up.
     *
     * @param streamId identifier of the stream being uploaded
     * @param cls item class used to look up the serializer
     * @param <T> item type
     * @return stream consumer accepting the items to upload
     */
    public <T> StreamConsumer<T> upload(StreamId streamId, Class<T> cls) {
        DatagraphSerialization serialization = (DatagraphSerialization) this.environment.getInstance(DatagraphSerialization.class);
        ChannelSerializer<T> streamSerializer = ChannelSerializer.create(serialization.getSerializer(cls))
                .withInitialBufferSize(MemSize.kilobytes(256L))
                .withAutoFlushInterval(Duration.ofSeconds(1L));
        ChannelQueue<ByteBuf> queue = this.pendingStreams.remove(streamId);
        if (queue == null) {
            this.logger.info("onUpload: waiting {}, pending downloads: {}", streamId, this.pendingStreams.size());
            queue = new ChannelZeroBuffer<>();
            this.pendingStreams.put(streamId, queue);
        } else {
            this.logger.info("onUpload: transferring {}, pending downloads: {}", streamId, this.pendingStreams.size());
        }
        streamSerializer.getOutput().set(queue.getConsumer());
        return streamSerializer;
    }

    /**
     * Reads one command from the socket and dispatches it; the channel is closed when the read
     * yields {@code null} or fails.
     *
     * @param asyncTcpSocket socket of the connection
     * @param inetAddress address passed in alongside the socket (not used here)
     */
    protected void serve(AsyncTcpSocket asyncTcpSocket, InetAddress inetAddress) {
        MessagingWithBinaryStreaming<DatagraphCommand, DatagraphResponse> messaging =
                MessagingWithBinaryStreaming.create(asyncTcpSocket, this.serializer);
        messaging.receive()
                .whenResult(command -> {
                    if (command != null) {
                        doRead(messaging, command);
                    } else {
                        this.logger.warn("unexpected end of stream");
                        messaging.close();
                    }
                })
                .whenException(e -> {
                    this.logger.error("received error while trying to read", e);
                    messaging.close();
                });
    }

    /**
     * Looks up the handler registered for the command's exact class and invokes it; closes the
     * channel and logs an error when no handler is registered.
     */
    @SuppressWarnings("unchecked") // handlers are registered under the command class they accept
    private void doRead(MessagingWithBinaryStreaming<DatagraphCommand, DatagraphResponse> messagingWithBinaryStreaming, DatagraphCommand datagraphCommand) {
        CommandHandler<DatagraphCommand, DatagraphResponse> handler =
                (CommandHandler<DatagraphCommand, DatagraphResponse>) this.handlers.get(datagraphCommand.getClass());
        if (handler != null) {
            handler.onCommand(messagingWithBinaryStreaming, datagraphCommand);
        } else {
            messagingWithBinaryStreaming.close();
            this.logger.error("missing handler for {}", datagraphCommand);
        }
    }
}
