View Javadoc

1   /*
2    * Created on May 7, 2004
3    *
4    */
5   package org.neo.swarm.util.io;
6   
7   import java.io.IOException;
8   import java.io.InputStream;
9   import java.nio.ByteBuffer;
10  import java.util.Arrays;
11  
12  /***
13   * Provides an interface between ByteBuffers and StreamCallback to use nbio and
14   * fill an internal user buffer.
15   * 
16   * @author navery
17   */
18  public class ByteBufferInputStream extends InputStream {
19  
20      // state is uninitialised, header, body, footer
21      static int HEADER = 1;
22      static int BODY = 2;
23      static int FOOTER = 3;
24      static int DONE = 4;
25  
26      int state;
27      StreamCallback callback;
28      ByteBuffer buffer;
29      int curBodyRead;
30      byte[] userBuffer;
31      byte[] headerBuffer;
32      byte[] footerBuffer;
33  
34      /***
35       * We expect the header and footer value to match packet formation, i.e.
36       * header size = 7 bytes + 4 bytes datasize buffer is of arbitrary size and
37       * used to multiread into a known user buffer
38       * 
39       * @param header
40       * @param footer
41       * @param buffer
42       * @param callback
43       */
44      public ByteBufferInputStream(int headerSize, int footerSize, ByteBuffer buffer, StreamCallback callback)
45              throws IOException {
46          if (headerSize + footerSize > buffer.capacity())
47                  throw new IOException("buffer allocation is too small = given :" + buffer.capacity());
48          this.buffer = buffer;
49          this.headerBuffer = new byte[headerSize];
50          this.footerBuffer = new byte[footerSize];
51          this.callback = callback;
52          setState(HEADER);
53      }
54  
55      /*
56       * (non-Javadoc)
57       * 
58       * @see java.io.InputStream#read()
59       */
60      public int read() throws IOException {
61          return 0;
62      }
63      /***
64       * Reuses buffer to fill the users databuffer
65       * @param footerBuffer
66       * @return byte[]
67       */
68      public byte[] getContent() {
69          byte[] retBuffer = this.userBuffer;
70          this.userBuffer = null;
71          setState(HEADER);
72          return retBuffer;
73      }
74  
75      /*
76       * @see InputStream#read(byte[], int, int)
77       */
78      public int read(byte[] bytes, int off, int len) throws IOException {
79          return 0;
80      }
81  
82      /***
83       * @return Returns the headerBuffer.
84       */
85      public byte[] getHeaderBuffer() {
86          return headerBuffer;
87      }
88      
89      /***
90       * @return Returns the footerBuffer.
91       */
92      public byte[] getFooterBuffer() {
93          return footerBuffer;
94      }
95    
96      /***
97       * @return number of bytes available
98       */
99      public synchronized int available() throws IOException {
100         if (this.state == HEADER) readHeader();
101         if (this.state == BODY) readBody();
102         if (this.state == FOOTER) readFooter();
103         if (this.state == DONE) return this.userBuffer.length;
104         return 0;
105     }
106 
107     /***
108      * Reads a header of known size
109      * @param headerBuffer
110      * @return
111      */
112     private int readHeader() {//byte[] headerBuffer) {
113         try {
114             if (this.state != HEADER) throw new IOException("Invalid state on calling read header");
115             this.callback.execute(buffer, 0);
116             if (buffer.position() == headerBuffer.length) {
117                 this.setState(BODY);
118             }
119             return buffer.position();
120         } catch (IOException e) {
121             e.printStackTrace();
122             return 0;
123         }
124     }
125 
126     /***
127      * Allows for repeated calls to read further data until the userData buffer
128      * is full.
129      * @return @throws IOException
130      */
131     private boolean readBody() throws IOException {
132         if (this.state != BODY) throw new IOException("Invalid state on calling read header");
133 
134         int read = 0;
135         int requiredRead = 0;
136         try {
137 
138             // read further
139             buffer.clear();
140 
141             requiredRead = Math.min(buffer.capacity(), this.userBuffer.length - this.curBodyRead);
142             buffer.limit(requiredRead);
143             read = this.callback.execute(buffer, 0);
144             buffer.flip();
145 
146             buffer.get(this.userBuffer, this.curBodyRead, read);
147             this.curBodyRead += read;
148             if (this.curBodyRead >= this.userBuffer.length) {
149                 this.setState(FOOTER);
150                 return true;
151             }
152             return false;
153         } catch (IndexOutOfBoundsException ex) {
154             System.err.println("read: " + read + " buffer:" + buffer + " curBodyRead:" + this.curBodyRead
155                     + " reqd read:" + requiredRead + " userBufSize:" + this.userBuffer.length);
156             ex.printStackTrace();
157         } catch (NullPointerException ex) {
158             ex.printStackTrace();
159         }
160         return false;
161     }
162 
163     /***
164      * Reads footer of a known size
165      * @param footerBuffer
166      * @return
167      */
168     private int readFooter() {
169         try {
170             if (this.state != FOOTER) throw new IOException("Invalid state on calling read header");
171             this.callback.execute(buffer, 0);
172             if (buffer.position() == footerBuffer.length) {
173                 this.setState(DONE);
174             }
175             return buffer.position();
176         } catch (IOException e) {
177             e.printStackTrace();
178             return 0;
179         }
180     }
181 
182     /***
183      * The state to set. 
184      * @param state
185      */
186     private void setState(int state) {
187         if (state == HEADER) {
188             buffer.clear();
189             buffer.limit(headerBuffer.length);
190             Arrays.fill(headerBuffer, (byte) 0);
191         } else if (state == BODY) {
192             buffer.flip();
193             buffer.get(headerBuffer);
194             this.setSize(toInt(this.headerBuffer, PacketInputStream.START_DATA.length));
195             buffer.clear();
196         } else if (state == FOOTER) {
197             buffer.clear();
198             Arrays.fill(footerBuffer, (byte) 0);
199             buffer.limit(footerBuffer.length);
200         } else if (state == DONE) {
201             buffer.flip();
202             buffer.get(footerBuffer);
203             buffer.clear();
204         }
205         this.state = state;
206     }
207 
208     /***
209      * Now that the size is known we can set the body contents
210      * @param size
211      */
212     private void setSize(int size) {
213         this.curBodyRead = 0;
214         this.userBuffer = new byte[size];
215     }
216 
217     private int toInt(byte[] b, int off) {
218         return (( b[off + 3]) & 0xFF) + (((b[off + 2]) & 0xFF) << 8) + ((( b[off + 1]) & 0xFF) << 16)
219                 + ((( b[off + 0]) & 0xFF) << 24);
220     }//toInt
221 }