//#define Trace

// ParallelDeflateOutputStream.cs
// ------------------------------------------------------------------
//
// A DeflateStream that does compression only, it uses a
// divide-and-conquer approach with multiple threads to exploit multiple
// CPUs for the DEFLATE computation.
//
// last saved:
// Time-stamp: <2010-January-20 19:24:58>
// ------------------------------------------------------------------
//
// Copyright (c) 2009-2010 by Dino Chiesa
// All rights reserved!
//
// ------------------------------------------------------------------

using System;
using System.Threading;
using Ionic.Zlib;
using System.IO;


namespace Ionic.Zlib
{
    /// <summary>
    ///   One unit of work: an input buffer, the matching output buffer, and
    ///   the codec that compresses the former into the latter.
    /// </summary>
    /// <remarks>
    ///   The instance itself is used as the monitor (lock/Wait/Pulse) that
    ///   coordinates the filling thread, the compressing thread and the
    ///   writer thread.
    /// </remarks>
    internal class WorkItem
    {
        internal enum Status { None=0, Filling=1, Filled=2, Compressing=3, Compressed=4, Writing=5, Done=6 }

        public byte[] buffer;                  // uncompressed input
        public byte[] compressed;              // compressed output
        public int status;                     // one of the Status values, stored as int
        public int crc;                        // CRC32 of the bytes in buffer
        public int index;                      // position of this item in the pool
        public int inputBytesAvailable;        // number of valid bytes in buffer
        public int compressedBytesAvailable;   // number of valid bytes in compressed
        public ZlibCodec compressor;

        public WorkItem(int size, Ionic.Zlib.CompressionLevel compressLevel, CompressionStrategy strategy)
        {
            buffer= new byte[size];
            // alloc 5 bytes overhead for every block (margin of safety= 2)
            int n = size + ((size / 32768)+1) * 5 * 2;
            compressed= new byte[n];

            status = (int)Status.None;
            compressor = new ZlibCodec();
            // NOTE(review): 'strategy' is accepted but never handed to the
            // codec, so the Strategy chosen on the stream has no effect here.
            // Confirm how ZlibCodec expects to receive a strategy and apply it.
            compressor.InitializeDeflate(compressLevel, false);
            compressor.OutputBuffer = compressed;
            compressor.InputBuffer = buffer;
        }
    }


    /// <summary>
    ///   A class for compressing streams using the Deflate algorithm with
    ///   multiple threads.
    /// </summary>
    ///
    /// <remarks>
    /// <para>
    ///   This class is for compression only, and that can be only
    ///   through writing.
    /// </para>
    ///
    /// <para>
    ///   For more information on the Deflate algorithm, see IETF RFC 1951, "DEFLATE
    ///   Compressed Data Format Specification version 1.3."
    /// </para>
    ///
    /// <para>
    ///   This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>, except
    ///   that this implementation uses an approach that employs multiple worker
    ///   threads to perform the DEFLATE.  On a multi-cpu or multi-core computer,
    ///   the performance of this class can be significantly higher than the
    ///   single-threaded DeflateStream, particularly for larger streams.  How
    ///   large?  Anything over 10mb is a good candidate for parallel compression.
    /// </para>
    ///
    /// <para>
    ///   The tradeoff is that this class uses more memory and more CPU than the
    ///   vanilla DeflateStream, and also is less efficient as a compressor. For
    ///   large files the size of the compressed data stream can be less than 1%
    ///   larger than the size of a compressed data stream from the vanilla
    ///   DeflateStream.  For smaller files the difference can be larger.  The
    ///   difference will also be larger if you set the BufferSize to be lower
    ///   than the default value.  Your mileage may vary. Finally, for small
    ///   files, the ParallelDeflateOutputStream can be much slower than the vanilla
    ///   DeflateStream, because of the overhead of using the thread pool.
    /// </para>
    ///
    /// </remarks>
    /// <seealso cref="Ionic.Zlib.DeflateStream" />
    public class ParallelDeflateOutputStream : System.IO.Stream
    {
        private static readonly int IO_BUFFER_SIZE_DEFAULT = 64 * 1024;  // 64k

        private System.Collections.Generic.List<WorkItem> _pool;
        private bool                        _leaveOpen;
        private System.IO.Stream            _outStream;
        private int                         _nextToFill, _nextToWrite;
        private int                         _bufferSize = IO_BUFFER_SIZE_DEFAULT;
        private int                         _buffersPerCore = 4;
        private ManualResetEvent            _writingDone;
        private ManualResetEvent            _sessionReset;
        private bool                        _noMoreInputForThisSegment;
        private readonly object             _outputLock = new object();
        private bool                        _isClosed;
        private bool                        _isDisposed;
        private bool                        _firstWriteDone;
        private int                         _pc;
        private int                         _Crc32;
        private Int64                       _totalBytesProcessed;
        private Ionic.Zlib.CompressionLevel _compressLevel;
        private volatile Exception          _pendingException;
        private readonly object             _eLock = new Object();  // protects _pendingException

        // This bitfield is used only when Trace is defined.
        //private TraceBits _DesiredTrace = TraceBits.Write | TraceBits.WriteBegin |
        //TraceBits.WriteDone | TraceBits.Lifecycle | TraceBits.Fill | TraceBits.Flush |
        //TraceBits.Session;

        //private TraceBits _DesiredTrace = TraceBits.WriteBegin | TraceBits.WriteDone | TraceBits.Synch | TraceBits.Lifecycle  | TraceBits.Session ;

        private TraceBits _DesiredTrace = TraceBits.WriterThread | TraceBits.Synch | TraceBits.Lifecycle  | TraceBits.Session ;

        /// <summary>
        ///   Create a ParallelDeflateOutputStream.
        /// </summary>
        /// <remarks>
        ///
        /// <para>
        ///   This stream compresses data written into it via the DEFLATE
        ///   algorithm (see RFC 1951), and writes out the compressed byte stream.
        /// </para>
        ///
        /// <para>
        ///   The instance will use the default compression level, the default
        ///   buffer sizes and the default number of threads and buffers per
        ///   thread.
        /// </para>
        ///
        /// <para>
        ///   This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>,
        ///   except that this implementation uses an approach that employs
        ///   multiple worker threads to perform the DEFLATE.  On a multi-cpu or
        ///   multi-core computer, the performance of this class can be
        ///   significantly higher than the single-threaded DeflateStream,
        ///   particularly for larger streams.  How large?  Anything over 10mb is
        ///   a good candidate for parallel compression.
        /// </para>
        ///
        /// </remarks>
        ///
        /// <example>
        ///
        /// This example shows how to use a ParallelDeflateOutputStream to compress
        /// data.  It reads a file, compresses it, and writes the compressed data to
        /// a second, output file.
        ///
        /// <code>
        /// byte[] buffer = new byte[WORKING_BUFFER_SIZE];
        /// int n= -1;
        /// String outputFile = fileToCompress + ".compressed";
        /// using (System.IO.Stream input = System.IO.File.OpenRead(fileToCompress))
        /// {
        ///     using (var raw = System.IO.File.Create(outputFile))
        ///     {
        ///         using (Stream compressor = new ParallelDeflateOutputStream(raw))
        ///         {
        ///             while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
        ///             {
        ///                 compressor.Write(buffer, 0, n);
        ///             }
        ///         }
        ///     }
        /// }
        /// </code>
        /// <code lang="VB">
        /// Dim buffer As Byte() = New Byte(4096) {}
        /// Dim n As Integer = -1
        /// Dim outputFile As String = (fileToCompress &amp; ".compressed")
        /// Using input As Stream = File.OpenRead(fileToCompress)
        ///     Using raw As FileStream = File.Create(outputFile)
        ///         Using compressor As Stream = New ParallelDeflateOutputStream(raw)
        ///             Do While (n &lt;&gt; 0)
        ///                 If (n &gt; 0) Then
        ///                     compressor.Write(buffer, 0, n)
        ///                 End If
        ///                 n = input.Read(buffer, 0, buffer.Length)
        ///             Loop
        ///         End Using
        ///     End Using
        /// End Using
        /// </code>
        /// </example>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        public ParallelDeflateOutputStream(System.IO.Stream stream)
            : this(stream, CompressionLevel.Default, CompressionStrategy.Default, false)
        {
        }

        /// <summary>
        ///   Create a ParallelDeflateOutputStream using the specified CompressionLevel.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
        public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level)
            : this(stream, level, CompressionStrategy.Default, false)
        {
        }

        /// <summary>
        /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open
        /// when the ParallelDeflateOutputStream is closed.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="leaveOpen">
        ///    true if the application would like the stream to remain open after deflation.
        /// </param>
        public ParallelDeflateOutputStream(System.IO.Stream stream, bool leaveOpen)
            : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen)
        {
        }

        /// <summary>
        /// Create a ParallelDeflateOutputStream using the specified CompressionLevel,
        /// and specify whether to leave the captive stream open when the
        /// ParallelDeflateOutputStream is closed.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
        /// <param name="leaveOpen">
        ///    true if the application would like the stream to remain open after deflation.
        /// </param>
        public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, bool leaveOpen)
            : this(stream, level, CompressionStrategy.Default, leaveOpen)   // honor the caller's level
        {
        }

        /// <summary>
        /// Create a ParallelDeflateOutputStream using the specified
        /// CompressionLevel and CompressionStrategy, and specifying whether to
        /// leave the captive stream open when the ParallelDeflateOutputStream is
        /// closed.
        /// </summary>
        /// <remarks>
        ///   See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
        ///   constructor for example code.
        /// </remarks>
        /// <param name="stream">The stream to which compressed data will be written.</param>
        /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
        /// <param name="strategy">
        ///   By tweaking this parameter, you may be able to optimize the compression for
        ///   data with particular characteristics.
        /// </param>
        /// <param name="leaveOpen">
        ///    true if the application would like the stream to remain open after deflation.
        /// </param>
        public ParallelDeflateOutputStream(System.IO.Stream stream,
                                           CompressionLevel level,
                                           CompressionStrategy strategy,
                                           bool leaveOpen)
        {
            TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "-------------------------------------------------------");
            TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "Create {0:X8}", this.GetHashCode());
            _compressLevel= level;
            _leaveOpen = leaveOpen;
            Strategy = strategy;

            BuffersPerCore = 4; // default

            _writingDone = new ManualResetEvent(false);
            _sessionReset = new ManualResetEvent(false);

            _outStream = stream;
        }


        /// <summary>
        ///   The ZLIB strategy to be used during compression.
        /// </summary>
        public CompressionStrategy Strategy
        {
            get;
            private set;
        }

        /// <summary>
        ///   The number of buffers per CPU or CPU core.
        /// </summary>
        ///
        /// <remarks>
        /// <para>
        ///   This property sets the number of memory buffers to create, for every
        ///   CPU or CPU core in the machine.  The divide-and-conquer approach
        ///   taken by this class assumes a single thread from the application
        ///   will call Write().  There will be multiple background threads that
        ///   then compress (DEFLATE) the data written into the stream, and also a
        ///   single output thread, also operating in the background, aggregating
        ///   those results and finally emitting the output.
        /// </para>
        ///
        /// <para>
        ///   The default value is 4.  Different values may deliver better or
        ///   worse results, depending on the dynamic performance characteristics
        ///   of your storage and compute resources.
        /// </para>
        ///
        /// <para>
        ///   The total amount of storage space allocated for buffering will be
        ///   (n*M*S*2), where n is the number of CPUs, M is the multiple (this
        ///   property), S is the size of each buffer (<see cref="BufferSize"/>),
        ///   and there are 2 buffers used by the compressor, one for input and
        ///   one for output. For example, if your machine has 4 cores, and you
        ///   set BuffersPerCore to 3, and you retain the default buffer size of
        ///   64k, then the ParallelDeflateOutputStream will use about 1.5mb of
        ///   buffer memory in total.
        /// </para>
        ///
        /// <para>
        ///   The application can set this value at any time, but it is effective
        ///   only before the first call to Write(), which is when the buffers are
        ///   allocated.
        /// </para>
        /// </remarks>
        /// <exception cref="System.ArgumentOutOfRangeException">
        ///   The value is less than 1.
        /// </exception>
        public int BuffersPerCore
        {
            get { return _buffersPerCore; }
            set
            {
                // A pool with no buffers would make Write() divide by zero.
                if (value < 1)
                    throw new ArgumentOutOfRangeException("value", "BuffersPerCore must be at least 1.");
                _buffersPerCore = value;
            }
        }

        /// <summary>
        ///   The size of the buffers used by the compressor threads.
        /// </summary>
        /// <remarks>
        ///
        /// <para>
        ///   The default buffer size is 64k. The application can set this value at any
        ///   time, but it is effective only before the first Write().
        /// </para>
        ///
        /// <para>
        ///   Larger buffer sizes implies larger memory consumption but allows
        ///   more efficient compression. Using smaller buffer sizes consumes less
        ///   memory but result in less effective compression.  For example, using
        ///   a buffer size of 128k, the compression delivered is within
        ///   1% of the compression delivered by the single-threaded <see
        ///   cref="Ionic.Zlib.DeflateStream"/>.  On the other hand, using a
        ///   BufferSize of 8k can result in a compressed data stream that is 5%
        ///   larger than that delivered by the single-threaded
        ///   <c>DeflateStream</c>.  Excessively small buffer sizes can also cause
        ///   the speed of the ParallelDeflateOutputStream to drop, because of
        ///   larger thread scheduling overhead dealing with many many small
        ///   buffers.
        /// </para>
        ///
        /// <para>
        ///   The total amount of storage space allocated for buffering will be
        ///   (n*M*S*2), where n is the number of CPUs, M is the multiple (<see
        ///   cref="BuffersPerCore"/>), S is the size of each buffer (this
        ///   property), and there are 2 buffers used by the compressor, one for
        ///   input and one for output. For example, if your machine has a total
        ///   of 4 cores, and if you set <see cref="BuffersPerCore"/> to 3, and
        ///   you keep the default buffer size of 64k, then the
        ///   ParallelDeflateOutputStream will use about 1.5mb of buffer memory in
        ///   total.
        /// </para>
        ///
        /// </remarks>
        /// <exception cref="System.ArgumentOutOfRangeException">
        ///   The value is less than 1024.
        /// </exception>
        public int BufferSize
        {
            get { return _bufferSize;}
            set
            {
                // ArgumentOutOfRangeException derives from ArgumentException, so
                // callers catching the latter are unaffected.
                if (value < 1024)
                    throw new ArgumentOutOfRangeException("value", "BufferSize must be at least 1024 bytes.");
                _bufferSize = value;
            }
        }

        /// <summary>
        /// The CRC32 for the data that was written out, prior to compression.
        /// </summary>
        /// <remarks>
        /// This value is meaningful only after a call to Close().
        /// </remarks>
        public int Crc32 { get { return _Crc32; } }


        /// <summary>
        /// The total number of uncompressed bytes processed by the ParallelDeflateOutputStream.
        /// </summary>
        /// <remarks>
        /// This value is meaningful only after a call to Close().
        /// </remarks>
        public Int64 BytesProcessed { get { return _totalBytesProcessed; } }


        // Allocates BuffersPerCore work items per processor and rewinds the
        // fill/write cursors.
        private void _InitializePoolOfWorkItems()
        {
            int n = BuffersPerCore * Environment.ProcessorCount;
            _pool = new System.Collections.Generic.List<WorkItem>(n);
            for (int i = 0; i < n; i++)
            {
                WorkItem item = new WorkItem(_bufferSize, _compressLevel, Strategy);
                item.index = i;
                _pool.Add(item);
            }
            _pc = _pool.Count;

            // set the pointers
            _nextToFill= _nextToWrite= 0;
        }


        // Starts the single background thread that emits compressed buffers.
        private void _KickoffWriter()
        {
            if (!ThreadPool.QueueUserWorkItem(new WaitCallback(this._PerpetualWriterMethod)))
                throw new Exception("Cannot enqueue writer thread.");
        }


        // True when the filling thread (Write) is allowed to put data into the item.
        private static bool _CanFill(WorkItem workitem)
        {
            return workitem.status == (int)WorkItem.Status.None ||
                   workitem.status == (int)WorkItem.Status.Done ||
                   workitem.status == (int)WorkItem.Status.Filling;
        }


        // Remembers the first exception raised on a background thread so the
        // application thread can rethrow it.
        private void _RecordException(Exception exc)
        {
            lock(_eLock)
            {
                if (_pendingException == null)
                    _pendingException = exc;
            }
        }


        // Wakes every thread blocked in Monitor.Wait on any work item, so that
        // it can notice a pending exception instead of waiting forever.
        // Must not be called while holding a work item lock.
        private void _WakeAllWaiters()
        {
            System.Collections.Generic.List<WorkItem> pool = _pool;
            if (pool == null) return;
            foreach (WorkItem item in pool)
            {
                lock(item)
                {
                    Monitor.PulseAll(item);
                }
            }
        }


        // Releases anyone blocked on _writingDone; tolerates a disposed event.
        private void _SignalWritingDone()
        {
            try
            {
                _writingDone.Set();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() already released the event; nobody can be waiting.
            }
        }


        /// <summary>
        ///   Write data to the stream.
        /// </summary>
        ///
        /// <remarks>
        ///
        /// <para>
        ///   To use the ParallelDeflateOutputStream to compress data, create a
        ///   ParallelDeflateOutputStream, passing a writable output stream.  Then
        ///   call Write() on that ParallelDeflateOutputStream, providing
        ///   uncompressed data as input.  The data sent to the output stream will
        ///   be the compressed form of the data written.
        /// </para>
        ///
        /// <para>
        ///   To decompress data, use the <see cref="Ionic.Zlib.DeflateStream"/> class.
        /// </para>
        ///
        /// <para>
        ///   This method is not safe for concurrent callers: all calls must come
        ///   from a single thread. An exception raised on a background thread is
        ///   rethrown here.
        /// </para>
        ///
        /// </remarks>
        /// <param name="buffer">The buffer holding data to write to the stream.</param>
        /// <param name="offset">the offset within that data array to find the first byte to write.</param>
        /// <param name="count">the number of bytes to write.</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            // Fill a work buffer; when full, flip state to 'Filled'

            if (_isClosed)
                throw new NotSupportedException();

            if (_isDisposed)
                throw new ObjectDisposedException("ParallelDeflateOutputStream");

            // dispense any exceptions that occurred on the BG threads
            if (_pendingException != null)
                throw _pendingException;

            if (count == 0) return;

            if (!_firstWriteDone)
            {
                // Want to do this on first Write, first session, and not in the
                // constructor.  We want to allow the BufferSize and BuffersPerCore to
                // change after construction, but before first Write.
                _InitializePoolOfWorkItems();

                // Only do this once (ever), the first time Write() is called:
                _KickoffWriter();

                // Release the writer thread.
                TraceOutput(TraceBits.Synch, "Synch _sessionReset.Set() Write (first)");
                _sessionReset.Set();

                _firstWriteDone = true;
            }

            do
            {
                int ix = _nextToFill % _pc;
                WorkItem workitem = _pool[ix];
                lock(workitem)
                {
                    TraceOutput(TraceBits.Fill,
                                "Fill lock wi({0}) stat({1}) iba({2}) nf({3})",
                                workitem.index,
                                workitem.status,
                                workitem.inputBytesAvailable,
                                _nextToFill
                                );

                    // If the status is what we want, then use the workitem.
                    if (_CanFill(workitem))
                    {
                        workitem.status = (int)WorkItem.Status.Filling;
                        int room = workitem.buffer.Length - workitem.inputBytesAvailable;
                        int limit = (room > count) ? count : room;

                        // copy from the provided buffer to our workitem, starting at
                        // the tail end of whatever data we might have in there currently.
                        Array.Copy(buffer, offset, workitem.buffer, workitem.inputBytesAvailable, limit);

                        count -= limit;
                        offset += limit;
                        workitem.inputBytesAvailable += limit;
                        if (workitem.inputBytesAvailable==workitem.buffer.Length)
                        {
                            workitem.status = (int)WorkItem.Status.Filled;
                            // No need for interlocked.increment: the Write() method
                            // is documented as not multi-thread safe, so we can assume Write()
                            // calls come in from only one thread.
                            _nextToFill++;

                            TraceOutput(TraceBits.Fill,
                                        "Fill QUWI wi({0}) stat({1}) iba({2}) nf({3})",
                                        workitem.index,
                                        workitem.status,
                                        workitem.inputBytesAvailable,
                                        _nextToFill
                                        );

                            if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem ))
                                throw new Exception("Cannot enqueue workitem");
                        }
                    }
                    else
                    {
                        int wcycles= 0;

                        while (!_CanFill(workitem))
                        {
                            // A failed background thread will never release this
                            // buffer; surface the failure rather than wait forever.
                            if (_pendingException != null)
                                throw _pendingException;

                            TraceOutput(TraceBits.Fill,
                                        "Fill waiting wi({0}) stat({1}) nf({2})",
                                        workitem.index,
                                        workitem.status,
                                        _nextToFill);

                            wcycles++;

                            Monitor.Pulse(workitem);
                            Monitor.Wait(workitem);

                            if (_CanFill(workitem))
                                TraceOutput(TraceBits.Fill,
                                            "Fill A-OK wi({0}) stat({1}) iba({2}) cyc({3})",
                                            workitem.index,
                                            workitem.status,
                                            workitem.inputBytesAvailable,
                                            wcycles);
                        }
                    }
                }
            }
            while (count > 0);  // until no more to write

            return;
        }


        /// <summary>
        ///   Flush the stream: hand any partially filled buffer to the
        ///   compressor threads.
        /// </summary>
        /// <remarks>
        ///   Does nothing if no data has been written yet. An exception raised
        ///   on a background thread is rethrown here.
        /// </remarks>
        public override void Flush()
        {
            if (_isClosed)
                throw new NotSupportedException();

            if (_pendingException != null)
                throw _pendingException;

            _Flush(false);
        }


        // Passes a partially filled buffer to the compressors. When lastInput
        // is true, also tells the writer that the current session has no more
        // input.
        private void _Flush(bool lastInput)
        {
            if (_isClosed)
                throw new NotSupportedException();

            // The pool does not exist before the first Write() and is released
            // by Dispose(); in both cases there is nothing buffered.
            System.Collections.Generic.List<WorkItem> pool = _pool;
            if (!_firstWriteDone || pool == null)
                return;

            // pass any partial buffer out to the compressor workers:
            WorkItem workitem = pool[_nextToFill % _pc];
            lock(workitem)
            {
                if ( workitem.status == (int)WorkItem.Status.Filling)
                {
                    workitem.status = (int)WorkItem.Status.Filled;
                    _nextToFill++;

                    // When flush is called from Close(), we set _noMore.
                    // can't do it before updating nextToFill, though.
                    if (lastInput)
                        _noMoreInputForThisSegment= true;

                    TraceOutput(TraceBits.Flush,
                                "Flush filled wi({0}) iba({1}) nf({2}) nomore({3})",
                                workitem.index, workitem.inputBytesAvailable, _nextToFill, _noMoreInputForThisSegment);

                    if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem ))
                        throw new Exception("Cannot enqueue workitem");

                    //Monitor.Pulse(workitem);
                }
                else
                {
                    // When flush is called from Close(), we set _noMore.
                    // Gotta do this whether or not there is another packet to send along.
                    if (lastInput)
                        _noMoreInputForThisSegment= true;

                    TraceOutput(TraceBits.Flush,
                                "Flush noaction wi({0}) stat({1}) nf({2}) nomore({3})",
                                workitem.index, workitem.status, _nextToFill, _noMoreInputForThisSegment);
                }
            }
        }


        /// <summary>
        /// Close the stream.
        /// </summary>
        /// <remarks>
        /// <para>
        ///   You must call Close on the stream to guarantee that all of the data
        ///   written in has been compressed, and the compressed data has been
        ///   written out.
        /// </para>
        /// <para>
        ///   If nothing was ever written, no compressed data is emitted. If a
        ///   background thread failed, the captive stream is still closed (unless
        ///   leaveOpen was specified) and the recorded exception is then rethrown.
        /// </para>
        /// </remarks>
        public override void Close()
        {
            TraceOutput(TraceBits.Session, "Close {0:X8}", this.GetHashCode());

            if (_isClosed) return;

            // The writer thread exists only after the first Write(); it is gone
            // after Dispose() or after a background failure, so waiting on it in
            // those cases would block forever.
            if (_firstWriteDone && !_isDisposed && _pendingException == null)
            {
                _Flush(true);

                //System.Diagnostics.StackTrace st = new System.Diagnostics.StackTrace(1);
                //System.Console.WriteLine(st.ToString());

                // need to get Writer off the workitem, in case he's waiting forever
                WorkItem workitem = _pool[_nextToFill % _pc];
                lock(workitem)
                {
                    Monitor.PulseAll(workitem);
                }

                // wait for the writer to complete his work
                TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(begin) Close");
                _writingDone.WaitOne();
                TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(done) Close");
            }

            TraceOutput(TraceBits.Session, "-------------------------------------------------------");

            if (!_leaveOpen)
                _outStream.Close();

            _isClosed= true;

            // Do not let a background failure pass silently: the output is incomplete.
            if (_pendingException != null)
                throw _pendingException;
        }



//         /// <summary>The destructor</summary>
//         ~ParallelDeflateOutputStream()
//         {
//             TraceOutput(TraceBits.Lifecycle, "Destructor {0:X8}", this.GetHashCode());
//             // call Dispose with false.  Since we're in the
//             // destructor call, the managed resources will be
//             // disposed of anyways.
//             Dispose(false);
//         }



        // workitem 10030 - implement a new Dispose method

        /// <summary>Dispose the object</summary>
        /// <remarks>
        ///   <para>
        ///     Because ParallelDeflateOutputStream is IDisposable, the
        ///     application must call this method when finished using the instance.
        ///   </para>
        ///   <para>
        ///     This method is generally called implicitly upon exit from
        ///     a <c>using</c> scope in C# (<c>Using</c> in VB).
        ///   </para>
        ///   <para>
        ///     If the stream has not been closed yet, it is closed first, so
        ///     that buffered data is not lost. Calling this method more than
        ///     once is harmless.
        ///   </para>
        /// </remarks>
        new public void Dispose()
        {
            TraceOutput(TraceBits.Lifecycle, "Dispose {0:X8}", this.GetHashCode());

            if (_isDisposed) return;

            try
            {
                // Drain pending data before the pool and events go away.
                Close();
            }
            finally
            {
                _isDisposed= true;
                _pool = null;
                TraceOutput(TraceBits.Synch, "Synch _sessionReset.Set() Dispose");
                _sessionReset.Set();  // tell writer to die
                Dispose(true);
            }
        }



        /// <summary>The Dispose method</summary>
        /// <param name="disposeManagedResources">
        ///   true to release the synchronization events owned by this instance.
        /// </param>
        protected override void Dispose(bool disposeManagedResources)
        {
            if (disposeManagedResources)
            {
                // dispose managed resources
                _writingDone.Close();
                _sessionReset.Close();
            }
        }


        /// <summary>
        ///   Resets the stream for use with another stream.
        /// </summary>
        /// <remarks>
        ///   Because the ParallelDeflateOutputStream is expensive to create, it
        ///   has been designed so that it can be recycled and re-used.  You have
        ///   to call Close() on the stream first, then you can call Reset() on
        ///   it, to use it again on another stream.
        /// </remarks>
        ///
        /// <param name="stream">The stream to which compressed data will be written next.</param>
        ///
        /// <example>
        /// <code>
        /// ParallelDeflateOutputStream deflater = null;
        /// foreach (var inputFile in listOfFiles)
        /// {
        ///     string outputFile = inputFile + ".compressed";
        ///     using (System.IO.Stream input = System.IO.File.OpenRead(inputFile))
        ///     {
        ///         using (var outStream = System.IO.File.Create(outputFile))
        ///         {
        ///             if (deflater == null)
        ///                 deflater = new ParallelDeflateOutputStream(outStream,
        ///                                                            CompressionLevel.Best,
        ///                                                            CompressionStrategy.Default,
        ///                                                            true);
        ///             deflater.Reset(outStream);
        ///
        ///             while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
        ///             {
        ///                 deflater.Write(buffer, 0, n);
        ///             }
        ///             deflater.Close();
        ///         }
        ///     }
        /// }
        /// </code>
        /// </example>
        public void Reset(Stream stream)
        {
            TraceOutput(TraceBits.Session, "-------------------------------------------------------");
            TraceOutput(TraceBits.Session, "Reset {0:X8} firstDone({1})", this.GetHashCode(), _firstWriteDone);

            if (!_firstWriteDone)
            {
                // No session has started, so there is no state to rewind and no
                // writer thread to release; just adopt the new target stream.
                _outStream = stream;
                _isClosed= false;
                return;
            }

            if (_noMoreInputForThisSegment)
            {
                // wait til done writing:
                TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(begin) Reset");
                _writingDone.WaitOne();
                TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(done) Reset");

                // reset all status
                foreach (var workitem in _pool)
                    workitem.status = (int) WorkItem.Status.None;

                _noMoreInputForThisSegment= false;
                _nextToFill= _nextToWrite= 0;
                _totalBytesProcessed = 0L;
                _Crc32= 0;
                _isClosed= false;
                TraceOutput(TraceBits.Synch, "Synch _writingDone.Reset() Reset");
                _writingDone.Reset();
            }
            else
            {
                TraceOutput(TraceBits.Synch, "Synch Reset noMore=false");
            }

            _outStream = stream;

            // release the writer thread for the next "session"
            TraceOutput(TraceBits.Synch, "Synch _sessionReset.Set() Reset");
            _sessionReset.Set();
        }


        // The body of the single writer thread. For each session it emits the
        // compressed buffers in order, then the final DEFLATE block, then
        // signals _writingDone and waits for the next session.
        private void _PerpetualWriterMethod(object state)
        {
            TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod START");

            try
            {
                do
                {
                    // wait for the next session
                    TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(begin) PWM");
                    _sessionReset.WaitOne();
                    TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(done) PWM");

                    if (_isDisposed) break;

                    TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.Reset() PWM");
                    _sessionReset.Reset();

                    // repeatedly write buffers as they become ready
                    WorkItem workitem = null;
                    Ionic.Zlib.CRC32 c= new Ionic.Zlib.CRC32();
                    do
                    {
                        workitem = _pool[_nextToWrite % _pc];
                        lock(workitem)
                        {
                            if (_noMoreInputForThisSegment)
                                TraceOutput(TraceBits.Write,
                                            "Write drain wi({0}) stat({1}) canuse({2}) cba({3})",
                                            workitem.index,
                                            workitem.status,
                                            (workitem.status == (int)WorkItem.Status.Compressed),
                                            workitem.compressedBytesAvailable);

                            do
                            {
                                if (workitem.status == (int)WorkItem.Status.Compressed)
                                {
                                    TraceOutput(TraceBits.WriteBegin,
                                                "Write begin wi({0}) stat({1}) cba({2})",
                                                workitem.index,
                                                workitem.status,
                                                workitem.compressedBytesAvailable);

                                    workitem.status = (int)WorkItem.Status.Writing;
                                    _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable);
                                    c.Combine(workitem.crc, workitem.inputBytesAvailable);
                                    _totalBytesProcessed += workitem.inputBytesAvailable;
                                    _nextToWrite++;
                                    workitem.inputBytesAvailable= 0;
                                    workitem.status = (int)WorkItem.Status.Done;

                                    TraceOutput(TraceBits.WriteDone,
                                                "Write done wi({0}) stat({1}) cba({2})",
                                                workitem.index,
                                                workitem.status,
                                                workitem.compressedBytesAvailable);

                                    Monitor.Pulse(workitem);
                                    break;
                                }
                                else
                                {
                                    int wcycles = 0;
                                    // I've locked a workitem I cannot use.
                                    // Therefore, wake someone else up, and then release the lock.
                                    while (workitem.status != (int)WorkItem.Status.Compressed)
                                    {
                                        TraceOutput(TraceBits.WriteWait,
                                                    "Write waiting wi({0}) stat({1}) nw({2}) nf({3}) nomore({4})",
                                                    workitem.index,
                                                    workitem.status,
                                                    _nextToWrite, _nextToFill,
                                                    _noMoreInputForThisSegment );

                                        if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                            break;

                                        // A compressor that failed will never mark this
                                        // buffer Compressed. Abort; the original failure
                                        // stays recorded because the first one wins.
                                        if (_pendingException != null)
                                            throw new InvalidOperationException("Aborting the writer: a compression worker failed.");

                                        wcycles++;

                                        // wake up someone else
                                        Monitor.Pulse(workitem);
                                        // release and wait
                                        Monitor.Wait(workitem);

                                        if (workitem.status == (int)WorkItem.Status.Compressed)
                                            TraceOutput(TraceBits.WriteWait,
                                                        "Write A-OK wi({0}) stat({1}) iba({2}) cba({3}) cyc({4})",
                                                        workitem.index,
                                                        workitem.status,
                                                        workitem.inputBytesAvailable,
                                                        workitem.compressedBytesAvailable,
                                                        wcycles);
                                    }

                                    if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                        break;
                                }
                            }
                            while (true);
                        }

                        if (_noMoreInputForThisSegment)
                            TraceOutput(TraceBits.Write,
                                        "Write nomore nw({0}) nf({1}) break({2})",
                                        _nextToWrite, _nextToFill, (_nextToWrite == _nextToFill));

                        if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                            break;
                    }
                    while (true);

                    _WriteFinalBlock();

                    _Crc32 = c.Crc32Result;

                    // signal that writing is complete:
                    TraceOutput(TraceBits.Synch, "Synch _writingDone.Set() PWM");
                    _writingDone.Set();
                }
                while (true);
            }
            catch (System.Exception exc1)
            {
                // After Dispose() the events and the pool are gone and nobody is
                // listening; otherwise expose the failure to the main thread and
                // release anyone blocked in Write(), Close() or Reset().
                if (!_isDisposed)
                {
                    _RecordException(exc1);
                    _WakeAllWaiters();
                    _SignalWritingDone();
                }
            }

            TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod FINIS");
        }


        // Finish:
        // After writing a series of buffers, closing each one with
        // Flush.Sync, we now write the final one as Flush.Finish, and
        // then stop.
        private void _WriteFinalBlock()
        {
            byte[] buffer = new byte[128];
            ZlibCodec compressor = new ZlibCodec();
            int rc = compressor.InitializeDeflate(_compressLevel, false);
            compressor.InputBuffer = null;
            compressor.NextIn = 0;
            compressor.AvailableBytesIn = 0;
            compressor.OutputBuffer = buffer;
            compressor.NextOut = 0;
            compressor.AvailableBytesOut = buffer.Length;
            rc = compressor.Deflate(FlushType.Finish);

            if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK)
                throw new Exception("deflating: " + compressor.Message);

            int produced = buffer.Length - compressor.AvailableBytesOut;
            if (produced > 0)
            {
                TraceOutput(TraceBits.WriteBegin,
                            "Write begin flush bytes({0})",
                            produced);

                _outStream.Write(buffer, 0, produced);

                TraceOutput(TraceBits.WriteBegin,
                            "Write done flush");
            }

            compressor.EndDeflate();
        }


        // Thread pool callback: computes the CRC of one filled buffer,
        // compresses it and marks it Compressed for the writer.
        private void _DeflateOne(Object wi)
        {
            WorkItem workitem = (WorkItem) wi;
            try
            {
                // compress one buffer
                lock(workitem)
                {
                    if (workitem.status != (int)WorkItem.Status.Filled)
                        throw new InvalidOperationException();

                    Ionic.Zlib.CRC32 crc = new CRC32();

                    // use the workitem:
                    // calc CRC on the buffer
                    crc.SlurpBlock(workitem.buffer, 0, workitem.inputBytesAvailable);

                    // deflate it
                    DeflateOneSegment(workitem);

                    // update status
                    workitem.status = (int)WorkItem.Status.Compressed;
                    workitem.crc = crc.Crc32Result;

                    TraceOutput(TraceBits.Compress,
                                "Compress wi({0}) stat({1}) len({2})",
                                workitem.index,
                                workitem.status,
                                workitem.compressedBytesAvailable
                                );

                    // release the item
                    Monitor.Pulse(workitem);
                }
            }
            catch (System.Exception exc1)
            {
                // expose the exception to the main thread, and wake the writer
                // and the filler, which would otherwise wait for this buffer
                // forever. The work item lock is no longer held here.
                _RecordException(exc1);
                _WakeAllWaiters();
            }
        }


        // Compresses workitem.buffer into workitem.compressed and ends the
        // segment with a Sync flush. Always returns true.
        private bool DeflateOneSegment(WorkItem workitem)
        {
            ZlibCodec compressor = workitem.compressor;
            compressor.ResetDeflate();
            compressor.NextIn = 0;

            compressor.AvailableBytesIn = workitem.inputBytesAvailable;

            // step 1: deflate the buffer
            compressor.NextOut = 0;
            compressor.AvailableBytesOut =  workitem.compressed.Length;
            do
            {
                compressor.Deflate(FlushType.None);
            }
            while (compressor.AvailableBytesIn > 0 || compressor.AvailableBytesOut == 0);

            // step 2: flush (sync)
            compressor.Deflate(FlushType.Sync);

            workitem.compressedBytesAvailable= (int) compressor.TotalBytesOut;
            return true;
        }


        // Writes a diagnostic line to the console; compiled in only when the
        // Trace symbol is defined.
        [System.Diagnostics.ConditionalAttribute("Trace")]
        private void TraceOutput(TraceBits bits, string format, params object[] varParams)
        {
            if ((bits & _DesiredTrace) != 0)
            {
                lock(_outputLock)
                {
                    int tid = Thread.CurrentThread.GetHashCode();
                    Console.ForegroundColor = (ConsoleColor) (tid % 8 + 8);
                    Console.Write("{0:000} PDOS ", tid);
                    Console.WriteLine(format, varParams);
                    Console.ResetColor();
                }
            }
        }


        // used only when Trace is defined
        [Flags]
        enum TraceBits
        {
            None         = 0,
            Write        = 1,    // write out
            WriteBegin   = 2,    // begin to write out
            WriteDone    = 4,    // done writing out
            WriteWait    = 8,    // write thread waiting for buffer
            Flush        = 16,
            Compress     = 32,   // async compress
            Fill         = 64,   // filling buffers, when caller invokes Write()
            Lifecycle    = 128,  // constructor/disposer
            Session      = 256,  // Close/Reset
            Synch        = 512,  // thread synchronization
            WriterThread = 1024, // writer thread
        }



        /// <summary>
        /// Indicates whether the stream supports Seek operations.
        /// </summary>
        /// <remarks>
        /// Always returns false.
        /// </remarks>
        public override bool CanSeek
        {
            get { return false; }
        }


        /// <summary>
        /// Indicates whether the stream supports Read operations.
        /// </summary>
        /// <remarks>
        /// Always returns false.
        /// </remarks>
        public override bool CanRead
        {
            get {return false;}
        }

        /// <summary>
        /// Indicates whether the stream supports Write operations.
        /// </summary>
        /// <remarks>
        /// Returns true if the provided stream is writable.
        /// </remarks>
        public override bool CanWrite
        {
            get { return _outStream.CanWrite; }
        }

        /// <summary>
        /// Reading this property always throws a NotImplementedException.
        /// </summary>
        public override long Length
        {
            get { throw new NotImplementedException(); }
        }

        /// <summary>
        /// Reading or Writing this property always throws a NotImplementedException.
        /// </summary>
        public override long Position
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        /// <summary>
        /// This method always throws a NotImplementedException.
        /// </summary>
        /// <param name="buffer">Not used.</param>
        /// <param name="offset">Not used.</param>
        /// <param name="count">Not used.</param>
        /// <returns>Never returns.</returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// This method always throws a NotImplementedException.
        /// </summary>
        /// <param name="offset">Not used.</param>
        /// <param name="origin">Not used.</param>
        /// <returns>Never returns.</returns>
        public override long Seek(long offset, System.IO.SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// This method always throws a NotImplementedException.
        /// </summary>
        /// <param name="value">Not used.</param>
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

    }

}