View Javadoc

1   /*
2    * Created on Nov 5, 2003
3    * 
4    */
5   package org.neo.swarm.util.network.tcp;
6   
7   import java.net.InetSocketAddress;
8   import java.net.ServerSocket;
9   import java.nio.ByteBuffer;
10  import java.nio.channels.SelectableChannel;
11  import java.nio.channels.SelectionKey;
12  import java.nio.channels.Selector;
13  import java.nio.channels.ServerSocketChannel;
14  import java.nio.channels.SocketChannel;
15  import java.util.Iterator;
16  import java.util.Set;
17  import java.util.logging.Logger;
18  
19  import org.neo.swarm.util.threads.ThreadPool;
20  
21  /***
22   * NBlocking ServerSocketFactory - pumps out incoming events onto the associated channels SocketStream 
23   * that gets processed by the next available thread in the pool.
24   * 
25   * @author neil.avery
26   */
27  public class NIOTcpServer extends Thread implements TcpServiceAPI {
28  
29  	private static Logger logger = Logger.getLogger(NIOTcpServer.class.toString());	
30  
31  	private ThreadPool pool;
32  	private boolean doListen = false;
33  	private ListenCallback callback;
34  	private java.net.InetAddress bind;
35  	private int port;
36  	private long timeout = 0;
37  	private boolean isStarted = false;
38  	private int bufferSize;
39  
40  	public NIOTcpServer(ListenCallback callback, ThreadPool pool, int bufferSize, java.net.InetAddress bind, int port, long timeout) {
41  		this.pool = pool;
42  		this.bufferSize = bufferSize;
43  		this.callback = callback;
44  		this.bind = bind;
45  		this.port = port;
46  		this.timeout = timeout;
47  	}
48  
49  	public void run() {
50  		try {
51  			listen();
52  		} catch (Exception x) {
53  			logger.warning("Unable to start tcp server listener thread:"+ x + " bind:" + bind + " port:" + port);
54  		}
55  	}
56  
57  	public void listen() throws Exception {
58  		doListen = true;
59  		// allocate an unbound server socket channel
60  		ServerSocketChannel serverChannel = ServerSocketChannel.open();
61  		// Get the associated ServerSocket to bind it with
62  		ServerSocket serverSocket = serverChannel.socket();
63  		// create a new Selector for use below
64  		Selector selector = Selector.open();
65  		// set the port the server channel will listen to
66  		serverSocket.bind(new InetSocketAddress(bind, port));
67  		// set non-blocking mode for the listening socket
68  		serverChannel.configureBlocking(false);
69  		// register the ServerSocketChannel with the Selector
70  		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
71  		logger.info("Starting tcp server on host:" + bind + " port:" + port);
72  		setStarted(true);
73  		while (doListen) {
74  			// this may block for a long time, upon return the
75  			// selected set contains keys of the ready channels
76  			// workaround for bug in JDK1.4.2 nio libs...without this staggered sending cause staggereing
77  			// reception of messages where the stagger == timeout period (recommended max 500ms).
78  			int n = selector.select(500);
79  			if (n == 0) {
80  				n = selector.select(timeout);
81  			}
82  			if (n == 0) {
83  				continue; // nothing to do
84  			}
85  
86  			// get an iterator over the set of selected keys
87  			Set keySet = selector.selectedKeys();
88  			Iterator it = keySet.iterator();
89  			// look at each key in the selected set
90  			while (it.hasNext()) {
91  				SelectionKey key = (SelectionKey) it.next();
92  				// Is a new connection coming in?
93  				if (key.isAcceptable()) {
94  					ServerSocketChannel server = (ServerSocketChannel) key.channel();
95  					SocketChannel channel = server.accept();
96  					registerChannel(
97  						selector,
98  						channel,
99  						SelectionKey.OP_READ,
100 						new ObjectReader(channel, selector, callback, this.bufferSize));
101 				}
102 				// is there data to read on this channel?
103 				if (key.isReadable()) {
104 					readDataFromSocket(key);
105 				}
106 			} //while
107 			keySet.clear();
108 		} 
109 		
110 		serverChannel.close();
111 		selector.close();
112 		setStarted(false);
113 		logger.info("Shutdown complete, tcp server on host:" + bind + " port:" + port);
114 	}
115 
116 	public void stopListening() throws Exception {
117 		doListen = false;
118 		try {
119 			Thread.sleep(timeout * 2);
120 		} catch (InterruptedException e) {
121 			e.printStackTrace();
122 			logger.warning("stopListening() interrupted");
123 			throw(e);
124 		}
125 	}
126 
127 	// ----------------------------------------------------------
128 
129 	/***
130 	 * Register the given channel with the given selector for the given operations of interest
131 	 */
132 	protected void registerChannel(Selector selector, SelectableChannel channel, int ops, Object attach)
133 		throws Exception {
134 		if (channel == null)
135 			return; // could happen
136 		// set the new channel non-blocking
137 		channel.configureBlocking(false);
138 		// register it with the selector
139 		channel.register(selector, ops, attach);
140 	}
141 
142 	// ----------------------------------------------------------
143 
144 	// Use the same byte buffer for all channels. A single thread is
145 	// servicing all the channels, so no danger of concurrent acccess.
146 	private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
147 
148 	/***
149 	 * Sample data handler method for a channel with data ready to read.
150 	 * 
151 	 * @param key
152 	 *            A SelectionKey object associated with a channel determined by the selector to be ready for reading.
153 	 *            If the channel returns an EOF condition, it is closed here, which automatically invalidates the
154 	 *            associated key. The selector will then de-register the channel on the next select call.
155 	 */
156 	protected void readDataFromSocket(SelectionKey key) throws Exception {
157 		ObjectReader reader = (ObjectReader) key.attachment();
158 		// ask the ThreadPool to service this runnable
159 		pool.execute(reader);
160 	}
161 	/***
162 	 * @return Returns the isStarted.
163 	 */
164 	public boolean isStarted() {
165 		return isStarted;
166 	}
167 
168 	/***
169 	 * @param isStarted
170 	 *            The isStarted to set.
171 	 */
172 	private void setStarted(boolean isStarted) {
173 		this.isStarted = isStarted;
174 	}
175 
176 	/***
177 	 * Start the server pool, processing threads and bind to the inet address
178 	 */
179 	public void startServer() {
180 		this.start();
181 		try {
182 			while (this.isStarted() == false) {
183 				Thread.sleep(10);
184 			}
185 		} catch (InterruptedException e) {
186 			e.printStackTrace();
187 			logger.warning("server startup interrupted:" + e);
188 		}
189 	}
190 	/***
191 	 * Stop the server from processing and more TCP incoming requests
192 	 */
193 	public void stopServer() throws Exception {
194 		logger.info("about to stop tcp server");		
195 		stopListening();
196 		try {
197 			this.pool.shutdown(1000);
198 			// we can only wait for server exit when timeout is not set to 0
199 			if (timeout == 0) {
200 				logger.info("server 'stopServer' impossible due to TIMEOUT setting of '0'");
201 			} else {
202 				while (this.isStarted() == true) {
203 					Thread.sleep(10);
204 				}
205 			}
206 		} catch (InterruptedException e) {
207 			e.printStackTrace();
208 			logger.warning("server startup interrupted:" + e);
209 		}
210 	}
211 
212 }