1
2
3
4
5 package org.neo.swarm.util.network.tcp;
6
7 import java.net.InetSocketAddress;
8 import java.net.ServerSocket;
9 import java.nio.ByteBuffer;
10 import java.nio.channels.SelectableChannel;
11 import java.nio.channels.SelectionKey;
12 import java.nio.channels.Selector;
13 import java.nio.channels.ServerSocketChannel;
14 import java.nio.channels.SocketChannel;
15 import java.util.Iterator;
16 import java.util.Set;
17 import java.util.logging.Logger;
18
19 import org.neo.swarm.util.threads.ThreadPool;
20
21 /***
22 * NBlocking ServerSocketFactory - pumps out incoming events onto the associated channels SocketStream
23 * that gets processed by the next available thread in the pool.
24 *
25 * @author neil.avery
26 */
27 public class NIOTcpServer extends Thread implements TcpServiceAPI {
28
29 private static Logger logger = Logger.getLogger(NIOTcpServer.class.toString());
30
31 private ThreadPool pool;
32 private boolean doListen = false;
33 private ListenCallback callback;
34 private java.net.InetAddress bind;
35 private int port;
36 private long timeout = 0;
37 private boolean isStarted = false;
38 private int bufferSize;
39
40 public NIOTcpServer(ListenCallback callback, ThreadPool pool, int bufferSize, java.net.InetAddress bind, int port, long timeout) {
41 this.pool = pool;
42 this.bufferSize = bufferSize;
43 this.callback = callback;
44 this.bind = bind;
45 this.port = port;
46 this.timeout = timeout;
47 }
48
49 public void run() {
50 try {
51 listen();
52 } catch (Exception x) {
53 logger.warning("Unable to start tcp server listener thread:"+ x + " bind:" + bind + " port:" + port);
54 }
55 }
56
57 public void listen() throws Exception {
58 doListen = true;
59
60 ServerSocketChannel serverChannel = ServerSocketChannel.open();
61
62 ServerSocket serverSocket = serverChannel.socket();
63
64 Selector selector = Selector.open();
65
66 serverSocket.bind(new InetSocketAddress(bind, port));
67
68 serverChannel.configureBlocking(false);
69
70 serverChannel.register(selector, SelectionKey.OP_ACCEPT);
71 logger.info("Starting tcp server on host:" + bind + " port:" + port);
72 setStarted(true);
73 while (doListen) {
74
75
76
77
78 int n = selector.select(500);
79 if (n == 0) {
80 n = selector.select(timeout);
81 }
82 if (n == 0) {
83 continue;
84 }
85
86
87 Set keySet = selector.selectedKeys();
88 Iterator it = keySet.iterator();
89
90 while (it.hasNext()) {
91 SelectionKey key = (SelectionKey) it.next();
92
93 if (key.isAcceptable()) {
94 ServerSocketChannel server = (ServerSocketChannel) key.channel();
95 SocketChannel channel = server.accept();
96 registerChannel(
97 selector,
98 channel,
99 SelectionKey.OP_READ,
100 new ObjectReader(channel, selector, callback, this.bufferSize));
101 }
102
103 if (key.isReadable()) {
104 readDataFromSocket(key);
105 }
106 }
107 keySet.clear();
108 }
109
110 serverChannel.close();
111 selector.close();
112 setStarted(false);
113 logger.info("Shutdown complete, tcp server on host:" + bind + " port:" + port);
114 }
115
116 public void stopListening() throws Exception {
117 doListen = false;
118 try {
119 Thread.sleep(timeout * 2);
120 } catch (InterruptedException e) {
121 e.printStackTrace();
122 logger.warning("stopListening() interrupted");
123 throw(e);
124 }
125 }
126
127
128
129 /***
130 * Register the given channel with the given selector for the given operations of interest
131 */
132 protected void registerChannel(Selector selector, SelectableChannel channel, int ops, Object attach)
133 throws Exception {
134 if (channel == null)
135 return;
136
137 channel.configureBlocking(false);
138
139 channel.register(selector, ops, attach);
140 }
141
142
143
144
145
146 private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
147
148 /***
149 * Sample data handler method for a channel with data ready to read.
150 *
151 * @param key
152 * A SelectionKey object associated with a channel determined by the selector to be ready for reading.
153 * If the channel returns an EOF condition, it is closed here, which automatically invalidates the
154 * associated key. The selector will then de-register the channel on the next select call.
155 */
156 protected void readDataFromSocket(SelectionKey key) throws Exception {
157 ObjectReader reader = (ObjectReader) key.attachment();
158
159 pool.execute(reader);
160 }
161 /***
162 * @return Returns the isStarted.
163 */
164 public boolean isStarted() {
165 return isStarted;
166 }
167
168 /***
169 * @param isStarted
170 * The isStarted to set.
171 */
172 private void setStarted(boolean isStarted) {
173 this.isStarted = isStarted;
174 }
175
176 /***
177 * Start the server pool, processing threads and bind to the inet address
178 */
179 public void startServer() {
180 this.start();
181 try {
182 while (this.isStarted() == false) {
183 Thread.sleep(10);
184 }
185 } catch (InterruptedException e) {
186 e.printStackTrace();
187 logger.warning("server startup interrupted:" + e);
188 }
189 }
190 /***
191 * Stop the server from processing and more TCP incoming requests
192 */
193 public void stopServer() throws Exception {
194 logger.info("about to stop tcp server");
195 stopListening();
196 try {
197 this.pool.shutdown(1000);
198
199 if (timeout == 0) {
200 logger.info("server 'stopServer' impossible due to TIMEOUT setting of '0'");
201 } else {
202 while (this.isStarted() == true) {
203 Thread.sleep(10);
204 }
205 }
206 } catch (InterruptedException e) {
207 e.printStackTrace();
208 logger.warning("server startup interrupted:" + e);
209 }
210 }
211
212 }