Невозможно прочитать объект при использовании пулов потоков для обработки

Мне нужно спроектировать сервер с тремя основными пулами потоков, чтобы читать данные, обрабатывать их и выводить результаты клиенту. Я пишу такой код, но всегда замечаю такую ​​ошибку:

java.io.StreamCorruptedException: неверный заголовок потока: 00000000 в java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:958) в java.base/java.io.ObjectInputStream.(ObjectInputStream.java:392) в demo.NioServer.read(NioServer.java:99) в java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) в java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) в java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) в java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) в java.base/java.lang.Thread.run(Thread.java:840)

Я не знаю почему, и вот мой код:

package demo;


import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class NioServer {
    private ServerSocketChannel server;
    private Selector selector;
    private ByteBuffer data;
    private ByteArrayInputStream bais;
    private ByteArrayOutputStream baos;
    private ObjectInputStream in;
    private ObjectOutputStream out;
    private final ExecutorService input = Executors.newCachedThreadPool();
    private final ExecutorService processor = Executors.newFixedThreadPool(5);
    private final ExecutorService output = Executors.newCachedThreadPool();
    private final BlockingQueue<SelectionKey> keyManager = new LinkedBlockingQueue<>(5);
    private final BlockingQueue<SelectionKey> destination = new LinkedBlockingQueue<>(5);
    private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>(5);
    private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(5);
    public NioServer() throws IOException {
        server = null;
        selector = null;
    }

    private void init() throws IOException {
        server = ServerSocketChannel.open();
        selector = Selector.open();
        server.socket().bind(new InetSocketAddress("127.0.0.1", 4999));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void run() throws IOException {
        init();
        try {
            while (true) {
                selector.select();
                for(SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        accept(key);
                    } else if (key.isReadable()) {
                        keyManager.put(key);
                        input.submit(this::read);
                        processor.submit(this::process);
                        output.submit(this::write);
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch(IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            for(SelectionKey key : selector.selectedKeys()) {
                key.channel().close();
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    private void read() {
        try {
            SelectionKey key = keyManager.take();
            SocketChannel client = (SocketChannel)key.channel();
            data = ByteBuffer.allocate(1024);
            int numRead = -1;
            try {
                numRead = client.read(data);
            } catch (IOException e) {
                key.cancel();
                client.close();
                return;
            }
            if (numRead == -1) {
                client.close();
                key.cancel();
                return;
            }
            data.flip();
            bais = new ByteArrayInputStream(data.array());
            in = new ObjectInputStream(bais);
            requests.put((Request)in.readObject());
            destination.put(key);
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    private void process() {
        try {
            Request request = requests.take();
            responses.put(new Response(request.getInfo() + " enrolled"));
        } catch(InterruptedException e) {
            System.out.println(e.toString());
        }
    }

    private void write() {
        try {
            Response response = responses.take();
            SelectionKey key = destination.take();
            SocketChannel client = (SocketChannel)key.channel();
            baos = new ByteArrayOutputStream();
            out = new ObjectOutputStream(baos);
            out.writeObject(response);
            out.flush();
            client.write(ByteBuffer.wrap(baos.toByteArray()));
            client.close();
        } catch (InterruptedException | IOException e) {
            System.out.println(e.toString());
        }
    }

}




Проблема в строке: in = new ObjectInputStream(bais); Я попытался это исправить и понял, что если я определю фиксированный пул потоков с размером n, то после n раз получения правильных результатов произойдет эта ошибка. Иногда ошибка появлялась раньше.

Может ли кто-нибудь объяснить мне, почему это произошло, и если можете, можете ли вы предложить мне какое-нибудь решение? Большое спасибо!

🤔 А знаете ли вы, что...
Java поддерживает лямбда-выражения, введенные в Java 8, для более удобной работы с функциями.


66
1

Ответ:

Решено

О боже, с чего начнем?

Я надеюсь, что вы делаете это в целях обучения, потому что создать правильный сервер NIO довольно сложно. Так что лучше использовать существующие. Попробуйте netti/jetty, это хорошо известная библиотека.

А теперь давайте проанализируем проблемы с вашим кодом:

  1. Не делитесь переменной ByteBuffer data (в любом случае вы делаете это неправильно, вы обращаетесь к ней не потокобезопасным способом).
  2. TCP — потоковый протокол. Нет гарантии, что client.read(data) прочитаете одно сообщение целиком. Он может прочитать его часть. Он может читать два. Он может читать вторую часть первого сообщения и первую часть второго.
  3. Вам необходимо очистить SelectionKey.OP_READ перед тем, как начать читать, и вернуть его после окончания. В противном случае два потока могут читать из одного и того же канала.
  4. Нельзя просто out.writeObject(response), нужно подождать, пока канал будет готов.

Это то, что я заметил с первого взгляда, возможно, их больше.

Чтобы просмотреть полный список проблем, разместите свой код в CodeReview.

Если вас интересует рабочий пример, вы можете попробовать изучить исходный код netty, но он очень сложен.

У меня есть проект с простой версией разъема NIO, но он далек от совершенства. Например, есть третья проблема из списка выше.