View Javadoc

1   package org.neo.swarm.util.network.tcp;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   import java.nio.ByteBuffer;
7   import java.nio.channels.Selector;
8   import java.nio.channels.SocketChannel;
9   
10  import org.neo.swarm.util.io.ByteBufferInputStream;
11  import org.neo.swarm.util.io.ByteBufferOutputStream;
12  import org.neo.swarm.util.io.PacketInputStream;
13  import org.neo.swarm.util.io.PacketOutputStream;
14  import org.neo.swarm.util.network.tcp.client.SocketReader;
15  import org.neo.swarm.util.network.tcp.client.SocketWriter;
16  
17  /***
18   * Sets up some packet, buffer streams to work with NBIO sockets.
19   * 
20   * @author navery
21   */
22  public class ObjectReader implements Runnable {
23  
24      private ListenCallback callback;
25  
26      private InputStream is;
27  
28      private OutputStream os;
29  
30      public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback, int bufferSize) {
31          this.callback = callback;
32          try {
33              this.is = new PacketInputStream(new ByteBufferInputStream(PacketInputStream.START_DATA.length + 4,
34                      PacketInputStream.END_DATA.length, ByteBuffer.allocate(bufferSize), new SocketReader(null, 8888,
35                              channel)));
36              this.os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(bufferSize),
37                      new SocketWriter(channel)));
38          } catch (IOException ex) {
39              ex.printStackTrace();
40          }
41      }
42  
43      
44  	/***
45  	 * Called in the context of a threadpool
46  	 */
47  	public void run() {
48  		this.read();
49  	}
50      /***
51       * Drain the channel
52       * 
53       * @return
54       */
55      public boolean read() {
56          try {
57  
58              // sync the IS to prevent subsequent selects from hitting it. 
59              synchronized (is) {
60                  int avail;
61                  while ((avail = is.available()) > 0) {
62                      byte[] incoming = new byte[avail];
63                      is.read(incoming);
64                      byte[] outgoing = callback.messageDataReceived(incoming);
65                      if (outgoing.length > 0) {
66                          os.write(outgoing);
67                          os.flush();
68                      }
69                  }
70              }
71              return true;
72          } catch (IOException ex) {
73              ex.printStackTrace();
74              return true;
75          }
76      }
77  }