Java Source Code: org.limewire.nio.channel.DeflaterWriter


   1: package org.limewire.nio.channel;
   2: 
   3: import java.io.IOException;
   4: import java.nio.ByteBuffer;
   5: import java.nio.channels.Channel;
   6: import java.util.zip.Deflater;
   7: 
   8: import org.limewire.nio.observer.Shutdownable;
   9: import org.limewire.nio.observer.WriteObserver;
  10: 
  11: 
  12: 
  13: /**
  14:  * Deflates data written to this channel and writes the deflated
  15:  * data to another channel sink.
  16:  */
  17:	  public class DeflaterWriter implements ChannelWriter, InterestWritableByteChannel {
  18:    
  19:    //private static final Log LOG = LogFactory.getLog(DeflaterWriter.class);
  20:    
  21:    /** The channel to write to & interest on. */    
  22:    private volatile InterestWritableByteChannel channel;
  23:    /** The next observer. */
  24:    private volatile WriteObserver observer;
  25:    /** The buffer used for deflating into. */
  26:    private ByteBuffer outgoing;
  27:    /** The buffer used for writing data into. */
  28:    private ByteBuffer incoming;
  29:    /** The deflater to use */
  30:    private Deflater deflater;
  31:    /** The sync level we're on.  0: not sync, 1: NO_COMPRESSION, 2: DEFAULT */
  32:    private int sync = 0;
  33:    /** An empty byte array to reuse. */
  34:    private static final byte[] EMPTY = new byte[0];
  35:        
  36:    /**
  37:     * Constructs a new <code>DeflaterWriter</code> with the given deflater.
  38:     * <p>
  39:     * <b>NOTE:</b> You must call <code>setWriteChannel</code> prior to 
  40:     * <code>handleWrite</code>.
  41:     */
  42:	      public DeflaterWriter(Deflater deflater) {
  43:        this(deflater, null);
  44:    }
  45:    
  46:    /**
  47:     * Constructs a new <code>DeflaterWriter</code> with the given deflater 
  48:     * and channel.
  49:     */
  50:	      public DeflaterWriter(Deflater deflater, InterestWritableByteChannel channel) {
  51:        this.deflater = deflater;
  52:        this.incoming = ByteBuffer.allocate(4 * 1024);
  53:        this.outgoing = ByteBuffer.allocate(512);
  54:        outgoing.flip();
  55:        this.channel = channel;
  56:    }
  57:    
  58:    /** {@inheritDoc} */
  59:	      public InterestWritableByteChannel getWriteChannel() {
  60:        return channel;
  61:    }
  62:    
  63:    /** {@inheritDoc} */
  64:	      public void setWriteChannel(InterestWritableByteChannel channel) {
  65:        this.channel = channel;
  66:        channel.interestWrite(this, true);
  67:    }
  68:    
  69:    /**
  70:     * {@inheritDoc}
  71:     */
  72:	      public synchronized void interestWrite(WriteObserver observer, boolean status) {
  73:        this.observer = status ? observer : null;
  74:        
  75:        // just always set interest on.  it's easiest & it'll be turned off
  76:        // immediately once we're notified if we don't wanna do anything.
  77:        // note that if we did want to do it correctly, we'd have to check
  78:        // incoming.hasRemaining() || outgoing.hasRemaining(), but since
  79:        // interest can be called in any thread, we'd have to introduce
  80:        // locking around incoming & outgoing, which just isn't worth it.
  81:        InterestWritableByteChannel source = channel;
  82:        if(source != null)
  83:            source.interestWrite(this, true); 
  84:    }
  85:    
  86:    /**
  87:     * Writes data to our internal buffer, if there's room.
  88:     */
  89:	      public int write(ByteBuffer buffer) throws IOException {
  90:        int wrote = 0;
  91:        
  92:	          if(incoming.hasRemaining()) {
  93:            int remaining = incoming.remaining();
  94:            int adding = buffer.remaining();
  95:	              if(remaining >= adding) {
  96:                incoming.put(buffer);
  97:                wrote = adding;
  98:            } else {
  99:                int oldLimit = buffer.limit();
 100:                int position = buffer.position();
 101:                buffer.limit(position + remaining);
 102:                incoming.put(buffer);
 103:                buffer.limit(oldLimit);
 104:                wrote = remaining;
 105:            }
 106:        }
 107:        
 108:        return wrote;
 109:    }
 110:    
 111:    /** Closes the underlying channel. */
 112:	      public void close() throws IOException {
 113:        Channel source = channel;
 114:        if(source != null)
 115:            source.close();
 116:    }
 117:    
 118:    /** Determines if the underlying channel is open. */
 119:	      public boolean isOpen() {
 120:        Channel source = channel;
 121:        return source != null ? source.isOpen() : false;
 122:    }
 123:    
 124:    /**
 125:     * Writes as much data as possible to the underlying source.
 126:     * This tries to write any previously unwritten data, then tries
 127:     * to deflate any new data, then tries to get more data by telling
 128:     * its interested-observer to write to it.  This continues until
 129:     * there is no more data to be written or the sink is full.
 130:     */
 131:	      public boolean handleWrite() throws IOException {
 132:        InterestWritableByteChannel source = channel;
 133:        if(source == null)
 134:            throw new IllegalStateException("writing with no source.");
 135:            
 136:	          while(true) {
 137:            // Step 1: See if there is any pending deflated data to be written.
 138:            channel.write(outgoing);
 139:            if(outgoing.hasRemaining())
 140:                return true; // there is still deflated data that is pending a write.
 141:
 142:	              while(true) {
 143:                // Step 2: Try and deflate the existing data.
 144:                int deflated;
 145:	                  try {
 146:                    deflated = deflater.deflate(outgoing.array());
 147:                } catch(NullPointerException npe) {
 148:                    // stupid deflater not supporting asynchronous ends..
 149:                    throw (IOException) new IOException().initCause(npe);
 150:                }
 151:	                  if(deflated > 0) {
 152:                    outgoing.position(0).limit(deflated);
 153:                    break; // we managed to deflate some data, try to write it...
 154:                }
 155:                    
 156:                // Step 3: Normal deflate didn't work, try to simulate a Z_SYNC_FLUSH
 157:                // Note that this requires we tried deflating until deflate returned 0
 158:                // above.  Otherwise, this setInput call would erase prior input.
 159:                // We must use different levels of syncing because we have to make sure
 160:                // that we write everything out of deflate after each level is set.
 161:                // Otherwise compression doesn't work.
 162:	                  try {
 163:	                      if(sync == 0) {
 164:                        deflater.setInput(EMPTY);
 165:                        deflater.setLevel(Deflater.NO_COMPRESSION);
 166:                        sync = 1;
 167:                        continue;
 168:                    } else if(sync == 1) {
 169:                        deflater.setLevel(Deflater.DEFAULT_COMPRESSION);
 170:                        sync = 2;
 171:                        continue;
 172:                    }
 173:                } catch(NullPointerException npe) {
 174:                    // stupid deflater not supporting asynchronous ends..
 175:                    throw (IOException) new IOException().initCause(npe);
 176:                }
 177:                
 178:                // Step 4: If we have no data, tell any interested parties to add some.
 179:	                  if(incoming.position() == 0) {
 180:                    WriteObserver interested = observer;
 181:                    if(interested != null)
 182:                        interested.handleWrite();
 183:                    
 184:                    // If still no data after that, we've written everything we want -- exit.
 185:	                      if (incoming.position() == 0) {
 186:                        // We have nothing left to write, however, it is possible
 187:                        // that between the above check for interested.handleWrite & here,
 188:                        // we got pre-empted and another thread turned on interest.
 189:	                          synchronized (this) {
 190:                            if (observer == null) // no observer? good, we can turn interest off
 191:                                source.interestWrite(this, false);
 192:                            // else, we've got nothing to write, but our observer might.
 193:                        }
 194:                        return false;
 195:                    }
 196:                }
 197:
 198:                // Step 5: We've got new data to deflate.
 199:	                  try {
 200:                    deflater.setInput(incoming.array(), 0, incoming.position());
 201:                } catch(NullPointerException npe) {
 202:                    // stupid deflater not supporting asynchronous ends..
 203:                    throw (IOException) new IOException().initCause(npe);
 204:                }
 205:                incoming.clear();
 206:                sync = 0;
 207:            }
 208:        }
 209:    }
 210:    
 211:    /** Shuts down the last observer. */
 212:	      public void shutdown() {
 213:        Shutdownable listener = observer;
 214:        if(listener != null)
 215:            listener.shutdown();
 216:    }
 217:    
 218:    /** Unused, Unsupported */
 219:	      public void handleIOException(IOException x) {
 220:        throw new RuntimeException("Unsupported", x);
 221:    }
 222:
 223:	      public boolean hasBufferedOutput() {
 224:        InterestWritableByteChannel channel = this.channel;
 225:        return incoming.position() > 0 || outgoing.hasRemaining() || (channel != null && channel.hasBufferedOutput());
 226:    }
 227:    
 228:}