Мне нужно спроектировать сервер с тремя основными пулами потоков, чтобы читать данные, обрабатывать их и выводить результаты клиенту. Я пишу такой код, но всегда замечаю такую ошибку:
java.io.StreamCorruptedException: неверный заголовок потока: 00000000 в java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:958) в java.base/java.io.ObjectInputStream.(ObjectInputStream.java:392) в demo.NioServer.read(NioServer.java:99) в java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) в java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) в java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) в java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) в java.base/java.lang.Thread.run(Thread.java:840)
Я не знаю почему, и вот мой код:
package demo;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class NioServer {
private ServerSocketChannel server;
private Selector selector;
private ByteBuffer data;
private ByteArrayInputStream bais;
private ByteArrayOutputStream baos;
private ObjectInputStream in;
private ObjectOutputStream out;
private final ExecutorService input = Executors.newCachedThreadPool();
private final ExecutorService processor = Executors.newFixedThreadPool(5);
private final ExecutorService output = Executors.newCachedThreadPool();
private final BlockingQueue<SelectionKey> keyManager = new LinkedBlockingQueue<>(5);
private final BlockingQueue<SelectionKey> destination = new LinkedBlockingQueue<>(5);
private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>(5);
private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(5);
public NioServer() throws IOException {
server = null;
selector = null;
}
private void init() throws IOException {
server = ServerSocketChannel.open();
selector = Selector.open();
server.socket().bind(new InetSocketAddress("127.0.0.1", 4999));
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
}
public void run() throws IOException {
init();
try {
while (true) {
selector.select();
for(SelectionKey key : selector.selectedKeys()) {
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
keyManager.put(key);
input.submit(this::read);
processor.submit(this::process);
output.submit(this::write);
}
}
selector.selectedKeys().clear();
}
} catch(IOException | InterruptedException e) {
e.printStackTrace();
} finally {
for(SelectionKey key : selector.selectedKeys()) {
key.channel().close();
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
private void read() {
try {
SelectionKey key = keyManager.take();
SocketChannel client = (SocketChannel)key.channel();
data = ByteBuffer.allocate(1024);
int numRead = -1;
try {
numRead = client.read(data);
} catch (IOException e) {
key.cancel();
client.close();
return;
}
if (numRead == -1) {
client.close();
key.cancel();
return;
}
data.flip();
bais = new ByteArrayInputStream(data.array());
in = new ObjectInputStream(bais);
requests.put((Request)in.readObject());
destination.put(key);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private void process() {
try {
Request request = requests.take();
responses.put(new Response(request.getInfo() + " enrolled"));
} catch(InterruptedException e) {
System.out.println(e.toString());
}
}
private void write() {
try {
Response response = responses.take();
SelectionKey key = destination.take();
SocketChannel client = (SocketChannel)key.channel();
baos = new ByteArrayOutputStream();
out = new ObjectOutputStream(baos);
out.writeObject(response);
out.flush();
client.write(ByteBuffer.wrap(baos.toByteArray()));
client.close();
} catch (InterruptedException | IOException e) {
System.out.println(e.toString());
}
}
}
Проблема в строке: in = new ObjectInputStream(bais);
Я попытался это исправить и понял, что если я определю фиксированный пул потоков с размером n
, то после n
раз получения правильных результатов произойдет эта ошибка. Иногда ошибка появлялась раньше.
Может ли кто-нибудь объяснить мне, почему это произошло, и если можете, можете ли вы предложить мне какое-нибудь решение? Большое спасибо!
🤔 А знаете ли вы, что...
Java поддерживает лямбда-выражения, введенные в Java 8, для более удобной работы с функциями.
О боже, с чего начнем?
Я надеюсь, что вы делаете это в целях обучения, потому что создать правильный сервер NIO довольно сложно. Так что лучше использовать существующие. Попробуйте netti/jetty, это хорошо известная библиотека.
А теперь давайте проанализируем проблемы с вашим кодом:
ByteBuffer data
(в любом случае вы делаете это неправильно, вы обращаетесь к ней не потокобезопасным способом).client.read(data)
прочитаете одно сообщение целиком. Он может прочитать его часть. Он может читать два. Он может читать вторую часть первого сообщения и первую часть второго.SelectionKey.OP_READ
перед тем, как начать читать, и вернуть его после окончания. В противном случае два потока могут читать из одного и того же канала.out.writeObject(response)
, нужно подождать, пока канал будет готов.Это то, что я заметил с первого взгляда, возможно, их больше.
Чтобы просмотреть полный список проблем, разместите свой код в CodeReview.
Если вас интересует рабочий пример, вы можете попробовать изучить исходный код netty, но он очень сложен.
У меня есть проект с простой версией разъема NIO, но он далек от совершенства. Например, есть третья проблема из списка выше.