View Javadoc

1   package org.neo.swarm.util.network.tcp.client;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   import java.net.InetAddress;
7   import java.nio.ByteBuffer;
8   
9   import org.neo.swarm.util.io.ByteBufferInputStream;
10  import org.neo.swarm.util.io.ByteBufferOutputStream;
11  import org.neo.swarm.util.io.PacketInputStream;
12  import org.neo.swarm.util.io.PacketOutputStream;
13  
14  public class TcpAsyncSender implements TcpSender {
15  
16      private InetAddress address;
17      private int port;
18      private boolean isSocketConnected = false;
19      private int bufferSize;
20      OutputStream os;
21      InputStream is;
22  
23      boolean blocking;
24  
25      public TcpAsyncSender(int bufferSize, InetAddress host, int port, boolean blocking) {
26          this.bufferSize = bufferSize;
27          this.address = host;
28          this.port = port;
29          this.blocking = blocking;
30      }
31  
32      public void connect() throws java.io.IOException {
33          // create an OutputSocketStream.
34          SocketWriter sw = new SocketWriter(address, port, blocking);
35          os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(this.bufferSize), sw));
36          is =         this.is = new PacketInputStream (new ByteBufferInputStream(PacketInputStream.START_DATA.length + 4, PacketInputStream.END_DATA.length, ByteBuffer.allocate(bufferSize), new SocketReader(null, 8888, sw.getSocketChannel())));
37          isSocketConnected = true;
38      }
39  
40      public void disconnect() {
41          try {
42              os.close();
43              is.close();
44          } catch (Exception x) {
45          }
46          isSocketConnected = false;
47      }
48  
49      public boolean isConnected() {
50          return isSocketConnected;
51      }
52  
53      /***
54       * Send without reply
55       */
56      public synchronized void send(byte[] data) throws IOException {
57          if (!isConnected()) {
58              connect();
59          }
60          os.write(data);
61          os.flush();
62      }
63  
64      /***
65       * Send message and wait for expected reply
66       */
67      public synchronized byte[] sendAndReceiveMessage(byte[] data) throws IOException {
68          if (!isConnected()) {
69              connect();
70          }
71          this.send(data);
72          int avail = 0;
73  
74          // polling support for NBIO
75          while ((avail = is.available()) == 0) {
76              Thread.yield();
77          }
78          byte[] buffer = new byte[avail];
79          is.read(buffer);
80          return buffer;
81      }
82  }