1   /*
2    * Created on Apr 16, 2004
3    */
4   package org.neo.swarm.util.network.tcp;
5   
6   import java.io.InputStream;
7   import java.io.OutputStream;
8   import java.net.InetAddress;
9   import java.nio.ByteBuffer;
10  
11  import junit.framework.TestCase;
12  
13  import org.neo.swarm.util.io.ByteBufferInputStream;
14  import org.neo.swarm.util.io.ByteBufferOutputStream;
15  import org.neo.swarm.util.io.PacketInputStream;
16  import org.neo.swarm.util.io.PacketOutputStream;
17  import org.neo.swarm.util.network.tcp.ListenCallback;
18  import org.neo.swarm.util.network.tcp.NIOTcpServer;
19  import org.neo.swarm.util.network.tcp.TcpServiceAPI;
20  import org.neo.swarm.util.network.tcp.client.SocketReader;
21  import org.neo.swarm.util.network.tcp.client.SocketWriter;
22  import org.neo.swarm.util.threads.OswegoThreadPool;
23  
24  import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
25  
26  /***
27   * Tests the ability of SocketStreams that support ByteBuffer
28   *  @author navery
29   */
30  public class SocketStreamsTest extends TestCase {
31  	private static String PAD = "BytePaddingMessageThatDoesStuffXXXXYYYZZZAABBCCAADDEEFFQQWWEERRTTYY";
32      TcpServiceAPI server;
33      int dataSize;
34      long startTime;
35  	int numIterations;
36  	int callbackCount;
37  	
38  	boolean verbose = false;
39  	String host = "localhost";
40  	int port = 8888;
41  	int threadpoolsize = 1;
42  	int timeout = 1000; // 0 blocks indefinately (othewise time in ms)
43  
44  
45  	
46      /* (non-Javadoc)
47       * @see junit.framework.TestCase#setUp()
48       */
49      protected void setUp() throws Exception {
50       	this.startTime = System.currentTimeMillis();
51      }
52      
53      protected void tearDown() throws Exception {
54  		long endTime = System.currentTimeMillis();
55  		double timeSecs = (endTime - startTime) / 1000.0;
56  		double rate = numIterations/ timeSecs;
57  		double tp = rate * dataSize;
58  		System.out.println("\nDone elapsed time:" 	+ timeSecs + " pktSize:" + dataSize + " rate:" + rate + " throughput:" + tp / 1000.0 + "kb/s");		
59  		this.server.stopServer();
60      }
61      
62  	public void _testSimpleClientServer() throws Exception {
63  		verbose = true;
64  		MyListenerCallbackOneWay cb = new MyListenerCallbackOneWay();
65  		String expectedDataString = "0123456789";
66  		byte[] expectedData = expectedDataString.getBytes();
67  		// create the tcp server which will callback on our object.
68  		 server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5),  1024, InetAddress.getByName(host), port, timeout);
69  		server.startServer();
70  		
71  		// create an OutputSocketStream.
72  		OutputStream os =  new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(100), new SocketWriter(InetAddress.getByName(host), port, true)));
73  		os.write(expectedData);
74  		os.flush();
75  		// wait until the cb object has received the data
76  		int wait = 0;
77  		while (!cb.called) {
78  			Thread.sleep(100);
79  		}
80  	}
81  
82  	public void _testSimpleClientServerwithReply() throws Exception {
83  		verbose = true;
84  		ListenCallback cb = new MyListenerCallbackWithReply();
85  		String expectedDataString = "myTestMessagemyTestMessagemyTestMessagemyTestMessage";
86  		byte[] expectedData = expectedDataString.getBytes();
87  		// create the tcp server which will callback on our object.
88  		this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 1024, InetAddress.getByName(host), port, timeout);
89  		this.server.startServer();
90  		
91  		SocketWriter sw = new SocketWriter(InetAddress.getByName(host),	port, true);
92  		OutputStream os =  new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(100), sw ));		
93          InputStream is = new PacketInputStream (new ByteBufferInputStream(PacketInputStream.START_DATA.length + 4, PacketInputStream.END_DATA.length, ByteBuffer.allocate(100), new SocketReader(null, 8888, sw.getSocketChannel())));		
94  		
95  		os.write(expectedData);
96  		os.flush();
97  		
98  		int available = is.available();
99  		byte[] returnBuffer = new byte[available];
100 		int count = is.read(returnBuffer);
101 		System.out.println("reponse:" + new String(returnBuffer));
102 		assertEquals(10, count);
103 	}
104 	
105 	
106 	public void testOneWayPerformance() throws Exception {
107 		verbose = false;
108 		ListenCallback cb = new MyListenerCallbackOneWay();
109 
110 		byte[] expectedData = makeMessage(1024);
111 		// create the tcp server which will callback on our object.
112 		this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), 4096, InetAddress.getByName(host), port, timeout);
113 		this.server.startServer();
114 		
115 		OutputStream os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(4096), new SocketWriter(InetAddress.getByName(host),	port, true)));		
116 
117 		this.numIterations = 100000;
118 		this.callbackCount = 0;
119 		this.startTime = System.currentTimeMillis();
120 		this.dataSize = expectedData.length;
121 		
122 		// pump through the messages 
123 		for (int i = 0; i < numIterations; i++) {
124 			os.write(expectedData);
125 			os.flush();
126 		}
127 			
128 		while (this.callbackCount < this.numIterations) {
129 			Thread.sleep(1000);
130 		}
131 	}
132 		
133 	public void testTwoWayPerformance() throws Exception {
134 		verbose = false;
135 		int dataSize = 1024;
136 		int bufferSize = 4096;
137 		ListenCallback cb = new MyListenerCallbackWithReply();
138 		byte[] expectedData = makeMessage(dataSize);
139 
140 		// create the tcp server which will callback on our object.
141 		this.server = new NIOTcpServer(cb, new OswegoThreadPool(new PooledExecutor(), 1, 5), bufferSize, InetAddress.getByName(host), port, timeout);
142 		this.server.startServer();
143 		
144 		SocketWriter sw = new SocketWriter(InetAddress.getByName(host),	port, true);
145 		OutputStream os = new PacketOutputStream(new ByteBufferOutputStream(ByteBuffer.allocate(bufferSize), sw));		
146 		InputStream is = new PacketInputStream (new ByteBufferInputStream(PacketInputStream.START_DATA.length + 4, PacketInputStream.END_DATA.length, ByteBuffer.allocate(bufferSize), new SocketReader(null, 8888, sw.getSocketChannel())));		
147 
148 
149 		//setup stats
150 		this.numIterations = 100000;
151 		this.callbackCount = 0;	
152 		this.startTime = System.currentTimeMillis();
153 		this.dataSize = expectedData.length;
154 		
155 		int avail;
156 		for (int i = 0; i < numIterations; i++) {
157 			os.write(expectedData);
158 			os.flush();
159 			avail = is.available();
160 			byte[] reply = new byte[avail];
161 			is.read(reply);
162 		}
163 
164 		while (this.callbackCount < this.numIterations) {
165 			Thread.sleep(500);
166 		}
167 	}
168 
169 	/***
170 	 * Simple callback class that allows the server to call out onto user app.
171 	 */
172 	class MyListenerCallbackOneWay implements ListenCallback {
173 		public boolean called;
174 		public byte[] messageDataReceived(byte[] data) {
175 			called = true;
176 			callbackCount++;
177 			if (verbose) System.out.println(callbackCount + " datasize: "+ data.length);
178 			if (callbackCount % 5000 == 0) 
179 				System.out.println(callbackCount + "[" + data.length + "]");
180 			return new byte[0];
181 		}
182 	}
183 	/***
184 	 * Simple callback class that allows the server to call out onto user app.
185 	 */
186 	class MyListenerCallbackWithReply implements ListenCallback {
187 		
188 		public byte[] messageDataReceived(byte[] data) {
189 			if (verbose) System.out.println(callbackCount + " MyListenerCallbackWithReply responding..:" + new String(data));
190 			if (callbackCount % 5000 == 0) {
191 				System.out.print(callbackCount + "v");
192 			}
193 
194 			callbackCount++;
195 			return "0123456789".getBytes();
196 		}
197 	}
198 
199 	private byte[] makeMessage(int size) throws Exception {
200 		byte[] sendMessage = new byte[size];
201 		
202 		byte[] padding = PAD.getBytes();
203 
204 		// fill the buffer
205 		int pos = 0;
206 		while (pos < size) {
207 			sendMessage[pos] = padding[pos++ % padding.length];
208 		}
209 		return sendMessage;
210 	}
211 
212 }