1   /*
2    * Created on Nov 5, 2003
3    * 
4    * Copyright neotechnologies.org
5    */
6   
7   package org.neo.swarm.util.network.tcp;
8   
9   import java.net.InetAddress;
10  
11  import junit.framework.TestCase;
12  
13  import org.neo.swarm.util.network.tcp.ListenCallback;
14  import org.neo.swarm.util.network.tcp.NIOTcpServer;
15  import org.neo.swarm.util.network.tcp.TcpServiceAPI;
16  import org.neo.swarm.util.network.tcp.client.TcpSender;
17  import org.neo.swarm.util.network.tcp.client.TcpSenderFactory;
18  import org.neo.swarm.util.threads.OswegoThreadPool;
19  
20  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
21  
22  /***
23   * Test client/server invocations between client and server using TCPIP.
24   * 
25   * @author neil.avery
26   */
27  public class TcpSocketsBehaviouralTest extends TestCase {
28  
29  	private static String PAD = "BytePaddingMessageThatDoesStuffXXXXYYYZZZAABBCCAADDEEFFQQWWEERRTTYY";
30  	boolean verbose = false;
31  	String host = "localhost";
32  	int port = 8888;
33  	int calls;
34  	int threadpoolsize = 1;
35  	int timeout = 1000; // 0 blocks indefinately (othewise time in ms)
36  
37  	TcpServiceAPI server;
38  	int dataSize;
39  	long startTime;
40  	int numIterations;
41  	int callbackCount;
42  
43  	protected void tearDown() throws Exception {
44  		long endTime = System.currentTimeMillis();
45  		double timeSecs = (endTime - startTime) / 1000.0;
46  		double rate = numIterations / timeSecs;
47  		double tp = rate * dataSize;
48  		System.out.println("\nDone elapsed time:" + timeSecs + " pktSize:" + dataSize + " rate:" + rate
49  						+ " throughput:" + tp / 1000.0 + "kb/s");
50  		this.server.stopServer();
51  	}
52  
53  	/***
54  	 * This test shows a bug in the JDK nio socket libs - When sending large sets of data with pauses, then the
55  	 * repeating select fails to trigger and pickup the following messages - resulting in them being ignored completely.
56  	 * 
57  	 * @throws Exception
58  	 */
59  	public void _testStaggeredClientServerWithManyMessages() throws Exception {
60  		verbose = false;
61  		MyListenerCallback cb = new MyListenerCallback();
62  		// create the client and send a String.
63  		TcpSender client = new TcpSenderFactory().getTcpSender(1024, InetAddress.getByName(host), port);
64  
65  		// create the tcp server which will callback on our object.
66  		this.server = new NIOTcpServer(cb, 
67                          new OswegoThreadPool(new PooledExecutor(), 1, 5), 1024, 
68                          InetAddress.getByName(host), port, timeout);
69  		this.server.startServer();
70  
71  		String expectedDataString = "0123456789";
72  		byte[] expectedData = expectedDataString.getBytes();
73  
74  		//setup stats
75  		this.numIterations = 100000;
76  		this.callbackCount = 0;
77  		this.startTime = System.currentTimeMillis();
78  		this.dataSize = expectedData.length;
79  		int stallPeriod = 1000;
80  		int batchSize = 100;
81  		int numBatchs = 10;
82  
83  		for (int j = 0; j < numBatchs; j++) {
84  			for (int i = 0; i < batchSize; i++) {
85  				client.send(new String(expectedDataString + ":" + i).getBytes());
86  			}
87  			System.out.println("stalling.... sent batch:" + j + " total msgs:" + batchSize * j);
88  			Thread.sleep(stallPeriod);
89  		}
90  
91  		System.out.println("sent all messages");
92  
93  		while (calls != numBatchs * batchSize) {
94  			Thread.sleep(1000);
95  		}
96  		assertEquals(calls, numBatchs * batchSize);
97  	}
98  
99  	/***
100 	 * @throws Exception
101 	 */
102 	public void testThroughputWith10b() throws Exception {
103 		sendDataOneWay(10, 10000);
104 	}
105 
106 	public void _testThroughputWith512K() throws Exception {
107 		sendDataOneWay(512, 5000);
108 	}
109 
110 	public void _testThroughputWith1024K() throws Exception {
111 		sendDataOneWay(1024, 2000);
112 	}
113 
114 	public void testThroughputWith4024K() throws Exception {
115 		sendDataOneWay(4 * 1024, 2000);
116 	}
117 
118 	public void testTwoWayThroughputWith10() throws Exception {
119 		sendDataTwoWay(10, 10000);
120 	}
121 
122 	public void _testTwoWayThroughputWith1024() throws Exception {
123 		sendDataTwoWay(1024, 2000);
124 	}
125 
126 	public void testTwoWayThroughputWith4K() throws Exception {
127 		sendDataTwoWay(4 * 1024, 200);
128 	}
129 
130 	/*
131 	 * Increment datasize until we pass or throw an exception.
132 	 */
133 	public void testMaxDataSetSize() throws Exception {
134 		int size = 1024 * 2 * 2;
135 		this.verbose = true;
136 		for (int i = 0; i < 8; i++) {
137 			calls = 0;
138 			System.out.println("xxxxxxxxxxxxxxx testing size:" + size);
139 			sendDataTwoWay(size, 10);
140 			size *= 2;
141 		}
142 		this.verbose = false;
143 	}
144 
145 	/***
146 	 * Find largest amount of data possable
147 	 */
148 	private void sendDataOneWay(int size, int totalCalls) throws Exception {
149 		;
150 
151 		verbose = false;
152 		MyListenerCallback cb = new MyListenerCallback();
153 
154 		// setup the server and forget about it.
155 		this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 4 * 1024, InetAddress
156 						.getByName(host), port, timeout);
157 		this.server.startServer();
158 		// create the client and send a String.
159 		TcpSender client = new TcpSenderFactory().getTcpSender(4 * 1024, InetAddress.getByName(host), port);
160 
161 		byte[] sendMessage = makeMessage(size);
162 
163 		//setup stats
164 		this.numIterations = totalCalls;
165 		this.callbackCount = 0;
166 		this.startTime = System.currentTimeMillis();
167 		this.dataSize = size;
168 
169 		calls = 0;
170 
171 		for (int j = 0; j < this.numIterations; j++) {
172 			client.send(new String(j + ":" + new String(sendMessage)).getBytes());
173 		}
174 
175 		// wait until the cb object has received the data
176 		int wait = 0;
177 		while (calls < this.numIterations && wait++ < 10) {
178 			Thread.sleep(1000);
179 		}
180 		assertEquals(calls, this.numIterations);
181 	}
182 
183 	private void sendDataTwoWay(int size, int totalCalls) throws Exception {
184 		;
185 
186 		MyListenerCallbackWithReply cb = new MyListenerCallbackWithReply();
187 
188 		// setup the server and forget about it.
189 		if (this.server == null) {
190 
191 			this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 4 * 1048, InetAddress
192 							.getByName(host), port, timeout);
193 			this.server.startServer();
194 		}
195 		// create the client and send a String.
196 		TcpSender client = new TcpSenderFactory().getTcpSender(4 * 1048, InetAddress.getByName(host), port);
197 
198 		byte[] sendMessage = makeMessage(size);
199 
200 		//setup stats
201 		this.numIterations = totalCalls;
202 		this.callbackCount = 0;
203 		this.startTime = System.currentTimeMillis();
204 		this.dataSize = size;
205 		calls = 0;
206 
207 		for (int j = 0; j < this.numIterations; j++) {
208 			byte[] blah = client.sendAndReceiveMessage(new String(j + ":" + new String(sendMessage)).getBytes());
209 		}
210 
211 		// wait until the cb object has received the data
212 		int wait = 0;
213 		while (calls < this.numIterations && wait++ < 5) {
214 			Thread.sleep(1000);
215 		}
216 		assertEquals(calls, this.numIterations);
217 	}
218 
219 	/***
220 	 * Simple callback class that allows the server to call out onto use app.
221 	 */
222 	class MyListenerCallback implements ListenCallback {
223 
224 		public byte[] messageDataReceived(byte[] data) {
225 
226 			calls++;
227 			if (verbose || (calls % 5000) == 0) {
228 				System.out.println("cb:" + calls + " msgSize" + data.length);
229 			}
230 			return new byte[0];
231 		}
232 	}
233 
234 	/***
235 	 * Simple callback class that allows the server to call out onto use app.
236 	 */
237 	class MyListenerCallbackWithReply implements ListenCallback {
238 
239 		public long expected = 0;
240 
241 		public byte[] messageDataReceived(byte[] data) {
242 			calls++;
243 			if (verbose || (calls % 5000) == 0) {
244 				System.out.println("cb:" + calls + " msgSize" + data.length);
245 			}
246 
247 			return "datacallbackreceived".getBytes();
248 		}
249 	}
250 
251 	private byte[] makeMessage(int size) throws Exception {
252 		byte[] sendMessage = new byte[size];
253 
254 		byte[] padding = PAD.getBytes();
255 
256 		// fill the buffer
257 		int pos = 0;
258 		while (pos < size) {
259 			sendMessage[pos] = padding[pos++ % padding.length];
260 		}
261 		return sendMessage;
262 	}
263 
264 }