1
2
3
4
5
6
7 package org.neo.swarm.util.network.tcp;
8
9 import java.net.InetAddress;
10
11 import junit.framework.TestCase;
12
13 import org.neo.swarm.util.network.tcp.ListenCallback;
14 import org.neo.swarm.util.network.tcp.NIOTcpServer;
15 import org.neo.swarm.util.network.tcp.TcpServiceAPI;
16 import org.neo.swarm.util.network.tcp.client.TcpSender;
17 import org.neo.swarm.util.network.tcp.client.TcpSenderFactory;
18 import org.neo.swarm.util.threads.OswegoThreadPool;
19
20 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
21
22 /***
23 * Test client/server invocations between client and server using TCPIP.
24 *
25 * @author neil.avery
26 */
27 public class TcpSocketsBehaviouralTest extends TestCase {
28
29 private static String PAD = "BytePaddingMessageThatDoesStuffXXXXYYYZZZAABBCCAADDEEFFQQWWEERRTTYY";
30 boolean verbose = false;
31 String host = "localhost";
32 int port = 8888;
33 int calls;
34 int threadpoolsize = 1;
35 int timeout = 1000;
36
37 TcpServiceAPI server;
38 int dataSize;
39 long startTime;
40 int numIterations;
41 int callbackCount;
42
43 protected void tearDown() throws Exception {
44 long endTime = System.currentTimeMillis();
45 double timeSecs = (endTime - startTime) / 1000.0;
46 double rate = numIterations / timeSecs;
47 double tp = rate * dataSize;
48 System.out.println("\nDone elapsed time:" + timeSecs + " pktSize:" + dataSize + " rate:" + rate
49 + " throughput:" + tp / 1000.0 + "kb/s");
50 this.server.stopServer();
51 }
52
53 /***
54 * This test shows a bug in the JDK nio socket libs - When sending large sets of data with pauses, then the
55 * repeating select fails to trigger and pickup the following messages - resulting in them being ignored completely.
56 *
57 * @throws Exception
58 */
59 public void _testStaggeredClientServerWithManyMessages() throws Exception {
60 verbose = false;
61 MyListenerCallback cb = new MyListenerCallback();
62
63 TcpSender client = new TcpSenderFactory().getTcpSender(1024, InetAddress.getByName(host), port);
64
65
66 this.server = new NIOTcpServer(cb,
67 new OswegoThreadPool(new PooledExecutor(), 1, 5), 1024,
68 InetAddress.getByName(host), port, timeout);
69 this.server.startServer();
70
71 String expectedDataString = "0123456789";
72 byte[] expectedData = expectedDataString.getBytes();
73
74
75 this.numIterations = 100000;
76 this.callbackCount = 0;
77 this.startTime = System.currentTimeMillis();
78 this.dataSize = expectedData.length;
79 int stallPeriod = 1000;
80 int batchSize = 100;
81 int numBatchs = 10;
82
83 for (int j = 0; j < numBatchs; j++) {
84 for (int i = 0; i < batchSize; i++) {
85 client.send(new String(expectedDataString + ":" + i).getBytes());
86 }
87 System.out.println("stalling.... sent batch:" + j + " total msgs:" + batchSize * j);
88 Thread.sleep(stallPeriod);
89 }
90
91 System.out.println("sent all messages");
92
93 while (calls != numBatchs * batchSize) {
94 Thread.sleep(1000);
95 }
96 assertEquals(calls, numBatchs * batchSize);
97 }
98
99 /***
100 * @throws Exception
101 */
102 public void testThroughputWith10b() throws Exception {
103 sendDataOneWay(10, 10000);
104 }
105
106 public void _testThroughputWith512K() throws Exception {
107 sendDataOneWay(512, 5000);
108 }
109
110 public void _testThroughputWith1024K() throws Exception {
111 sendDataOneWay(1024, 2000);
112 }
113
114 public void testThroughputWith4024K() throws Exception {
115 sendDataOneWay(4 * 1024, 2000);
116 }
117
118 public void testTwoWayThroughputWith10() throws Exception {
119 sendDataTwoWay(10, 10000);
120 }
121
122 public void _testTwoWayThroughputWith1024() throws Exception {
123 sendDataTwoWay(1024, 2000);
124 }
125
126 public void testTwoWayThroughputWith4K() throws Exception {
127 sendDataTwoWay(4 * 1024, 200);
128 }
129
130
131
132
133 public void testMaxDataSetSize() throws Exception {
134 int size = 1024 * 2 * 2;
135 this.verbose = true;
136 for (int i = 0; i < 8; i++) {
137 calls = 0;
138 System.out.println("xxxxxxxxxxxxxxx testing size:" + size);
139 sendDataTwoWay(size, 10);
140 size *= 2;
141 }
142 this.verbose = false;
143 }
144
145 /***
146 * Find largest amount of data possable
147 */
148 private void sendDataOneWay(int size, int totalCalls) throws Exception {
149 ;
150
151 verbose = false;
152 MyListenerCallback cb = new MyListenerCallback();
153
154
155 this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 4 * 1024, InetAddress
156 .getByName(host), port, timeout);
157 this.server.startServer();
158
159 TcpSender client = new TcpSenderFactory().getTcpSender(4 * 1024, InetAddress.getByName(host), port);
160
161 byte[] sendMessage = makeMessage(size);
162
163
164 this.numIterations = totalCalls;
165 this.callbackCount = 0;
166 this.startTime = System.currentTimeMillis();
167 this.dataSize = size;
168
169 calls = 0;
170
171 for (int j = 0; j < this.numIterations; j++) {
172 client.send(new String(j + ":" + new String(sendMessage)).getBytes());
173 }
174
175
176 int wait = 0;
177 while (calls < this.numIterations && wait++ < 10) {
178 Thread.sleep(1000);
179 }
180 assertEquals(calls, this.numIterations);
181 }
182
183 private void sendDataTwoWay(int size, int totalCalls) throws Exception {
184 ;
185
186 MyListenerCallbackWithReply cb = new MyListenerCallbackWithReply();
187
188
189 if (this.server == null) {
190
191 this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 4 * 1048, InetAddress
192 .getByName(host), port, timeout);
193 this.server.startServer();
194 }
195
196 TcpSender client = new TcpSenderFactory().getTcpSender(4 * 1048, InetAddress.getByName(host), port);
197
198 byte[] sendMessage = makeMessage(size);
199
200
201 this.numIterations = totalCalls;
202 this.callbackCount = 0;
203 this.startTime = System.currentTimeMillis();
204 this.dataSize = size;
205 calls = 0;
206
207 for (int j = 0; j < this.numIterations; j++) {
208 byte[] blah = client.sendAndReceiveMessage(new String(j + ":" + new String(sendMessage)).getBytes());
209 }
210
211
212 int wait = 0;
213 while (calls < this.numIterations && wait++ < 5) {
214 Thread.sleep(1000);
215 }
216 assertEquals(calls, this.numIterations);
217 }
218
219 /***
220 * Simple callback class that allows the server to call out onto use app.
221 */
222 class MyListenerCallback implements ListenCallback {
223
224 public byte[] messageDataReceived(byte[] data) {
225
226 calls++;
227 if (verbose || (calls % 5000) == 0) {
228 System.out.println("cb:" + calls + " msgSize" + data.length);
229 }
230 return new byte[0];
231 }
232 }
233
234 /***
235 * Simple callback class that allows the server to call out onto use app.
236 */
237 class MyListenerCallbackWithReply implements ListenCallback {
238
239 public long expected = 0;
240
241 public byte[] messageDataReceived(byte[] data) {
242 calls++;
243 if (verbose || (calls % 5000) == 0) {
244 System.out.println("cb:" + calls + " msgSize" + data.length);
245 }
246
247 return "datacallbackreceived".getBytes();
248 }
249 }
250
251 private byte[] makeMessage(int size) throws Exception {
252 byte[] sendMessage = new byte[size];
253
254 byte[] padding = PAD.getBytes();
255
256
257 int pos = 0;
258 while (pos < size) {
259 sendMessage[pos] = padding[pos++ % padding.length];
260 }
261 return sendMessage;
262 }
263
264 }