1
2
3
4 package org.neo.swarm.util.network.tcp;
5
6 import java.io.InputStream;
7 import java.io.OutputStream;
8 import java.net.InetAddress;
9 import java.nio.ByteBuffer;
10
11 import junit.framework.TestCase;
12
13 import org.neo.swarm.util.io.ByteBufferInputStream;
14 import org.neo.swarm.util.io.ByteBufferOutputStream;
15 import org.neo.swarm.util.io.PacketInputStream;
16 import org.neo.swarm.util.io.PacketOutputStream;
17 import org.neo.swarm.util.network.tcp.ListenCallback;
18 import org.neo.swarm.util.network.tcp.NIOTcpServer;
19 import org.neo.swarm.util.network.tcp.TcpServiceAPI;
20 import org.neo.swarm.util.network.tcp.client.SocketReader;
21 import org.neo.swarm.util.network.tcp.client.SocketWriter;
22 import org.neo.swarm.util.threads.OswegoThreadPool;
23
24 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
25
26 /***
27 * Tests the ability of SocketStreams that support ByteBuffer
28 * @author navery
29 */
30 public class SocketStreamsTest extends TestCase {
31 private static String PAD = "BytePaddingMessageThatDoesStuffXXXXYYYZZZAABBCCAADDEEFFQQWWEERRTTYY";
32 TcpServiceAPI server;
33 int dataSize;
34 long startTime;
35 int numIterations;
36 int callbackCount;
37
38 boolean verbose = false;
39 String host = "localhost";
40 int port = 8888;
41 int threadpoolsize = 1;
42 int timeout = 1000;
43
44
45
46
47
48
49 protected void setUp() throws Exception {
50 this.startTime = System.currentTimeMillis();
51 }
52
53 protected void tearDown() throws Exception {
54 long endTime = System.currentTimeMillis();
55 double timeSecs = (endTime - startTime) / 1000.0;
56 double rate = numIterations/ timeSecs;
57 double tp = rate * dataSize;
58 System.out.println("\nDone elapsed time:" + timeSecs + " pktSize:" + dataSize + " rate:" + rate + " throughput:" + tp / 1000.0 + "kb/s");
59 this.server.stopServer();
60 }
61
62 public void _testSimpleClientServer() throws Exception {
63 verbose = true;
64 MyListenerCallbackOneWay cb = new MyListenerCallbackOneWay();
65 String expectedDataString = "0123456789";
66 byte[] expectedData = expectedDataString.getBytes();
67
68 server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 1024, InetAddress.getByName(host), port, timeout);
69 server.startServer();
70
71
72 OutputStream os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(100), new SocketWriter(InetAddress.getByName(host), port, true)));
73 os.write(expectedData);
74 os.flush();
75
76 int wait = 0;
77 while (!cb.called) {
78 Thread.sleep(100);
79 }
80 }
81
82 public void _testSimpleClientServerwithReply() throws Exception {
83 verbose = true;
84 ListenCallback cb = new MyListenerCallbackWithReply();
85 String expectedDataString = "myTestMessagemyTestMessagemyTestMessagemyTestMessage";
86 byte[] expectedData = expectedDataString.getBytes();
87
88 this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 1024, InetAddress.getByName(host), port, timeout);
89 this.server.startServer();
90
91 SocketWriter sw = new SocketWriter(InetAddress.getByName(host), port, true);
92 OutputStream os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(100), sw ));
93 InputStream is = new PacketInputStream (new ByteBufferInputStream(PacketInputStream.START_DATA.length + 4, PacketInputStream.END_DATA.length, ByteBuffer.allocate(100), new SocketReader(null, 8888, sw.getSocketChannel())));
94
95 os.write(expectedData);
96 os.flush();
97
98 int available = is.available();
99 byte[] returnBuffer = new byte[available];
100 int count = is.read(returnBuffer);
101 System.out.println("reponse:" + new String(returnBuffer));
102 assertEquals(10, count);
103 }
104
105
106 public void testOneWayPerformance() throws Exception {
107 verbose = false;
108 ListenCallback cb = new MyListenerCallbackOneWay();
109
110 byte[] expectedData = makeMessage(1024);
111
112 this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 4096, InetAddress.getByName(host), port, timeout);
113 this.server.startServer();
114
115 OutputStream os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(4096), new SocketWriter(InetAddress.getByName(host), port, true)));
116
117 this.numIterations = 100000;
118 this.callbackCount = 0;
119 this.startTime = System.currentTimeMillis();
120 this.dataSize = expectedData.length;
121
122
123 for (int i = 0; i < numIterations; i++) {
124 os.write(expectedData);
125 os.flush();
126 }
127
128 while (this.callbackCount < this.numIterations) {
129 Thread.sleep(1000);
130 }
131 }
132
133 public void testTwoWayPerformance() throws Exception {
134 verbose = false;
135 int dataSize = 1024;
136 int bufferSize = 4096;
137 ListenCallback cb = new MyListenerCallbackWithReply();
138 byte[] expectedData = makeMessage(dataSize);
139
140
141 this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), bufferSize, InetAddress.getByName(host), port, timeout);
142 this.server.startServer();
143
144 SocketWriter sw = new SocketWriter(InetAddress.getByName(host), port, true);
145 OutputStream os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(bufferSize), sw));
146 InputStream is = new PacketInputStream (new ByteBufferInputStream(PacketInputStream.START_DATA.length + 4, PacketInputStream.END_DATA.length, ByteBuffer.allocate(bufferSize), new SocketReader(null, 8888, sw.getSocketChannel())));
147
148
149
150 this.numIterations = 100000;
151 this.callbackCount = 0;
152 this.startTime = System.currentTimeMillis();
153 this.dataSize = expectedData.length;
154
155 int avail;
156 for (int i = 0; i < numIterations; i++) {
157 os.write(expectedData);
158 os.flush();
159 avail = is.available();
160 byte[] reply = new byte[avail];
161 is.read(reply);
162 }
163
164 while (this.callbackCount < this.numIterations) {
165 Thread.sleep(500);
166 }
167 }
168
169 /***
170 * Simple callback class that allows the server to call out onto user app.
171 */
172 class MyListenerCallbackOneWay implements ListenCallback {
173 public boolean called;
174 public byte[] messageDataReceived(byte[] data) {
175 called = true;
176 callbackCount++;
177 if (verbose) System.out.println(callbackCount + " datasize: "+ data.length);
178 if (callbackCount % 5000 == 0)
179 System.out.println(callbackCount + "[" + data.length + "]");
180 return new byte[0];
181 }
182 }
183 /***
184 * Simple callback class that allows the server to call out onto user app.
185 */
186 class MyListenerCallbackWithReply implements ListenCallback {
187
188 public byte[] messageDataReceived(byte[] data) {
189 if (verbose) System.out.println(callbackCount + " MyListenerCallbackWithReply responding..:" + new String(data));
190 if (callbackCount % 5000 == 0) {
191 System.out.print(callbackCount + "v");
192 }
193
194 callbackCount++;
195 return "0123456789".getBytes();
196 }
197 }
198
199 private byte[] makeMessage(int size) throws Exception {
200 byte[] sendMessage = new byte[size];
201
202 byte[] padding = PAD.getBytes();
203
204
205 int pos = 0;
206 while (pos < size) {
207 sendMessage[pos] = padding[pos++ % padding.length];
208 }
209 return sendMessage;
210 }
211
212 }