1
2
3
4
5 package org.neo.swarm.util.io;
6
7 import java.io.IOException;
8 import java.io.InputStream;
9 import java.nio.ByteBuffer;
10 import java.util.Arrays;
11
12 /***
13 * Provides an interface between ByteBuffers and StreamCallback to use nbio and
14 * fill an internal user buffer.
15 *
16 * @author navery
17 */
18 public class ByteBufferInputStream extends InputStream {
19
20
21 static int HEADER = 1;
22 static int BODY = 2;
23 static int FOOTER = 3;
24 static int DONE = 4;
25
26 int state;
27 StreamCallback callback;
28 ByteBuffer buffer;
29 int curBodyRead;
30 byte[] userBuffer;
31 byte[] headerBuffer;
32 byte[] footerBuffer;
33
34 /***
35 * We expect the header and footer value to match packet formation, i.e.
36 * header size = 7 bytes + 4 bytes datasize buffer is of arbitrary size and
37 * used to multiread into a known user buffer
38 *
39 * @param header
40 * @param footer
41 * @param buffer
42 * @param callback
43 */
44 public ByteBufferInputStream(int headerSize, int footerSize, ByteBuffer buffer, StreamCallback callback)
45 throws IOException {
46 if (headerSize + footerSize > buffer.capacity())
47 throw new IOException("buffer allocation is too small = given :" + buffer.capacity());
48 this.buffer = buffer;
49 this.headerBuffer = new byte[headerSize];
50 this.footerBuffer = new byte[footerSize];
51 this.callback = callback;
52 setState(HEADER);
53 }
54
55
56
57
58
59
60 public int read() throws IOException {
61 return 0;
62 }
63 /***
64 * Reuses buffer to fill the users databuffer
65 * @param footerBuffer
66 * @return byte[]
67 */
68 public byte[] getContent() {
69 byte[] retBuffer = this.userBuffer;
70 this.userBuffer = null;
71 setState(HEADER);
72 return retBuffer;
73 }
74
75
76
77
78 public int read(byte[] bytes, int off, int len) throws IOException {
79 return 0;
80 }
81
82 /***
83 * @return Returns the headerBuffer.
84 */
85 public byte[] getHeaderBuffer() {
86 return headerBuffer;
87 }
88
89 /***
90 * @return Returns the footerBuffer.
91 */
92 public byte[] getFooterBuffer() {
93 return footerBuffer;
94 }
95
96 /***
97 * @return number of bytes available
98 */
99 public synchronized int available() throws IOException {
100 if (this.state == HEADER) readHeader();
101 if (this.state == BODY) readBody();
102 if (this.state == FOOTER) readFooter();
103 if (this.state == DONE) return this.userBuffer.length;
104 return 0;
105 }
106
107 /***
108 * Reads a header of known size
109 * @param headerBuffer
110 * @return
111 */
112 private int readHeader() {
113 try {
114 if (this.state != HEADER) throw new IOException("Invalid state on calling read header");
115 this.callback.execute(buffer, 0);
116 if (buffer.position() == headerBuffer.length) {
117 this.setState(BODY);
118 }
119 return buffer.position();
120 } catch (IOException e) {
121 e.printStackTrace();
122 return 0;
123 }
124 }
125
126 /***
127 * Allows for repeated calls to read further data until the userData buffer
128 * is full.
129 * @return @throws IOException
130 */
131 private boolean readBody() throws IOException {
132 if (this.state != BODY) throw new IOException("Invalid state on calling read header");
133
134 int read = 0;
135 int requiredRead = 0;
136 try {
137
138
139 buffer.clear();
140
141 requiredRead = Math.min(buffer.capacity(), this.userBuffer.length - this.curBodyRead);
142 buffer.limit(requiredRead);
143 read = this.callback.execute(buffer, 0);
144 buffer.flip();
145
146 buffer.get(this.userBuffer, this.curBodyRead, read);
147 this.curBodyRead += read;
148 if (this.curBodyRead >= this.userBuffer.length) {
149 this.setState(FOOTER);
150 return true;
151 }
152 return false;
153 } catch (IndexOutOfBoundsException ex) {
154 System.err.println("read: " + read + " buffer:" + buffer + " curBodyRead:" + this.curBodyRead
155 + " reqd read:" + requiredRead + " userBufSize:" + this.userBuffer.length);
156 ex.printStackTrace();
157 } catch (NullPointerException ex) {
158 ex.printStackTrace();
159 }
160 return false;
161 }
162
163 /***
164 * Reads footer of a known size
165 * @param footerBuffer
166 * @return
167 */
168 private int readFooter() {
169 try {
170 if (this.state != FOOTER) throw new IOException("Invalid state on calling read header");
171 this.callback.execute(buffer, 0);
172 if (buffer.position() == footerBuffer.length) {
173 this.setState(DONE);
174 }
175 return buffer.position();
176 } catch (IOException e) {
177 e.printStackTrace();
178 return 0;
179 }
180 }
181
182 /***
183 * The state to set.
184 * @param state
185 */
186 private void setState(int state) {
187 if (state == HEADER) {
188 buffer.clear();
189 buffer.limit(headerBuffer.length);
190 Arrays.fill(headerBuffer, (byte) 0);
191 } else if (state == BODY) {
192 buffer.flip();
193 buffer.get(headerBuffer);
194 this.setSize(toInt(this.headerBuffer, PacketInputStream.START_DATA.length));
195 buffer.clear();
196 } else if (state == FOOTER) {
197 buffer.clear();
198 Arrays.fill(footerBuffer, (byte) 0);
199 buffer.limit(footerBuffer.length);
200 } else if (state == DONE) {
201 buffer.flip();
202 buffer.get(footerBuffer);
203 buffer.clear();
204 }
205 this.state = state;
206 }
207
208 /***
209 * Now that the size is known we can set the body contents
210 * @param size
211 */
212 private void setSize(int size) {
213 this.curBodyRead = 0;
214 this.userBuffer = new byte[size];
215 }
216
217 private int toInt(byte[] b, int off) {
218 return (( b[off + 3]) & 0xFF) + (((b[off + 2]) & 0xFF) << 8) + ((( b[off + 1]) & 0xFF) << 16)
219 + ((( b[off + 0]) & 0xFF) << 24);
220 }
221 }