1 package org.neo.swarm.util.network.tcp;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.OutputStream;
6 import java.nio.ByteBuffer;
7 import java.nio.channels.Selector;
8 import java.nio.channels.SocketChannel;
9
10 import org.neo.swarm.util.io.ByteBufferInputStream;
11 import org.neo.swarm.util.io.ByteBufferOutputStream;
12 import org.neo.swarm.util.io.PacketInputStream;
13 import org.neo.swarm.util.io.PacketOutputStream;
14 import org.neo.swarm.util.network.tcp.client.SocketReader;
15 import org.neo.swarm.util.network.tcp.client.SocketWriter;
16
17 /***
18 * Sets up some packet, buffer streams to work with NBIO sockets.
19 *
20 * @author navery
21 */
22 public class ObjectReader implements Runnable {
23
24 private ListenCallback callback;
25
26 private InputStream is;
27
28 private OutputStream os;
29
30 public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback, int bufferSize) {
31 this.callback = callback;
32 try {
33 this.is = new PacketInputStream(new ByteBufferInputStream(PacketInputStream.START_DATA.length + 4,
34 PacketInputStream.END_DATA.length, ByteBuffer.allocate(bufferSize), new SocketReader(null, 8888,
35 channel)));
36 this.os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(bufferSize),
37 new SocketWriter(channel)));
38 } catch (IOException ex) {
39 ex.printStackTrace();
40 }
41 }
42
43
44 /***
45 * Called in the context of a threadpool
46 */
47 public void run() {
48 this.read();
49 }
50 /***
51 * Drain the channel
52 *
53 * @return
54 */
55 public boolean read() {
56 try {
57
58
59 synchronized (is) {
60 int avail;
61 while ((avail = is.available()) > 0) {
62 byte[] incoming = new byte[avail];
63 is.read(incoming);
64 byte[] outgoing = callback.messageDataReceived(incoming);
65 if (outgoing.length > 0) {
66 os.write(outgoing);
67 os.flush();
68 }
69 }
70 }
71 return true;
72 } catch (IOException ex) {
73 ex.printStackTrace();
74 return true;
75 }
76 }
77 }