package io.datakernel.dataflow.server;

import io.datakernel.csp.binary.ByteBufSerializer;
import io.datakernel.csp.net.MessagingWithBinaryStreaming;
import io.datakernel.dataflow.graph.StreamId;
import io.datakernel.dataflow.node.Node;
import io.datakernel.dataflow.server.command.DatagraphCommand;
import io.datakernel.dataflow.server.command.DatagraphCommandDownload;
import io.datakernel.dataflow.server.command.DatagraphCommandExecute;
import io.datakernel.dataflow.server.command.DatagraphResponse;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.eventloop.net.SocketSettings;
import io.datakernel.net.AsyncTcpSocketImpl;
import io.datakernel.promise.Promise;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;

/* loaded from: input_file:io/datakernel/dataflow/server/DatagraphClient.class */
/**
 * Client side of the datagraph messaging protocol.
 *
 * <p>Each call opens a new TCP connection to the given address, wraps it in a
 * {@link MessagingWithBinaryStreaming} channel and sends a single
 * {@link DatagraphCommand} over it.
 */
public final class DatagraphClient {
    /** Source of the JSON codecs for commands/responses and of the per-type item serializers. */
    private final DatagraphSerialization serialization;
    /** Message serializer built from the response and command codecs of {@link #serialization}. */
    private final ByteBufSerializer<DatagraphResponse, DatagraphCommand> serializer;
    /** Socket settings passed to every outgoing connection. */
    private final SocketSettings socketSettings = SocketSettings.create();

    /**
     * Creates a client that encodes commands and decodes responses with the codecs
     * supplied by the given serialization.
     *
     * @param datagraphSerialization provider of the command/response codecs and item serializers
     */
    public DatagraphClient(DatagraphSerialization datagraphSerialization) {
        this.serialization = datagraphSerialization;
        this.serializer = ByteBufSerializer.ofJsonCodec(
                datagraphSerialization.getResponseCodec(),
                datagraphSerialization.getCommandCodec());
    }

    /**
     * Connects to the given address, sends a {@link DatagraphCommandDownload} for the
     * given stream and exposes the binary stream received on that connection as a
     * supplier of deserialized items.
     *
     * <p>The messaging channel is closed when the end-of-stream promise of the
     * returned supplier completes.
     *
     * @param inetSocketAddress address to connect to
     * @param streamId          identifier of the stream to download
     * @param cls               item class used to look up the binary serializer
     * @param <T>               item type
     * @return promise of a late-binding supplier of the downloaded items
     */
    public <T> Promise<StreamSupplier<T>> download(InetSocketAddress inetSocketAddress, StreamId streamId, Class<T> cls) {
        // NOTE(review): the 0L argument is presumably "no connect timeout" - confirm against AsyncTcpSocketImpl.connect.
        return AsyncTcpSocketImpl.connect(inetSocketAddress, 0L, this.socketSettings)
                .then(socket -> {
                    MessagingWithBinaryStreaming<DatagraphResponse, DatagraphCommand> messaging =
                            MessagingWithBinaryStreaming.create(socket, this.serializer);
                    DatagraphCommandDownload command = new DatagraphCommandDownload(streamId);

                    return messaging.send(command)
                            .map(ignored -> messaging.receiveBinaryStream()
                                    .transformWith(ChannelDeserializer.create(this.serialization.getSerializer(cls)))
                                    // Close the connection once the stream has ended.
                                    .withEndOfStream(endOfStream -> endOfStream.whenComplete(messaging::close))
                                    .withLateBinding());
                });
    }

    /**
     * Connects to the given address, sends a {@link DatagraphCommandExecute} carrying a
     * copy of the given nodes, and then sends end-of-stream on the messaging channel.
     *
     * @param inetSocketAddress address to connect to
     * @param collection        nodes to send; copied into a new list before sending
     * @return promise of the end-of-stream send that follows the command
     */
    public Promise<Void> execute(InetSocketAddress inetSocketAddress, Collection<Node> collection) {
        return AsyncTcpSocketImpl.connect(inetSocketAddress, 0L, this.socketSettings)
                .then(socket -> {
                    MessagingWithBinaryStreaming<DatagraphResponse, DatagraphCommand> messaging =
                            MessagingWithBinaryStreaming.create(socket, this.serializer);
                    DatagraphCommandExecute command = new DatagraphCommandExecute(new ArrayList<>(collection));

                    return messaging.send(command)
                            .then(ignored -> messaging.sendEndOfStream());
                });
    }
}
