forked from mirrors/NBTExplorer
1158 lines
47 KiB
C#
1158 lines
47 KiB
C#
//#define Trace
|
|
|
|
// ParallelDeflateOutputStream.cs
|
|
// ------------------------------------------------------------------
|
|
//
|
|
// A DeflateStream that does compression only, it uses a
|
|
// divide-and-conquer approach with multiple threads to exploit multiple
|
|
// CPUs for the DEFLATE computation.
|
|
//
|
|
// last saved:
|
|
// Time-stamp: <2010-January-20 19:24:58>
|
|
// ------------------------------------------------------------------
|
|
//
|
|
// Copyright (c) 2009-2010 by Dino Chiesa
|
|
// All rights reserved!
|
|
//
|
|
// ------------------------------------------------------------------
|
|
|
|
using System;
|
|
using System.Threading;
|
|
using Ionic.Zlib;
|
|
using System.IO;
|
|
|
|
|
|
namespace Ionic.Zlib
|
|
{
|
|
/// <summary>
/// One slot in the compression pipeline: an input buffer, the output buffer
/// its compressed form is written to, the codec that connects the two, and
/// the bookkeeping values that travel with the slot.
/// </summary>
internal class WorkItem
{
    /// <summary>Lifecycle states stored (as an int) in <see cref="status"/>.</summary>
    internal enum Status
    {
        None = 0,
        Filling = 1,
        Filled = 2,
        Compressing = 3,
        Compressed = 4,
        Writing = 5,
        Done = 6
    }

    // Uncompressed input and the compressed output produced from it.
    public byte[] buffer;
    public byte[] compressed;

    // Current Status value, held as an int.
    public int status;

    // CRC of the uncompressed bytes in 'buffer'.
    public int crc;

    // Position of this item within the owning pool.
    public int index;

    // Number of valid bytes in 'buffer' and 'compressed' respectively.
    public int inputBytesAvailable;
    public int compressedBytesAvailable;

    // Codec wired to 'buffer' (input) and 'compressed' (output).
    public ZlibCodec compressor;

    /// <summary>
    /// Allocates both buffers and a deflate codec bound to them.
    /// </summary>
    /// <param name="size">Capacity, in bytes, of the input buffer.</param>
    /// <param name="compressLevel">Compression level handed to the codec.</param>
    /// <param name="strategy">Accepted but not referenced by this constructor.</param>
    public WorkItem(int size, Ionic.Zlib.CompressionLevel compressLevel, CompressionStrategy strategy)
    {
        this.buffer = new byte[size];

        // Reserve 5 bytes of overhead per 32k block, doubled as a safety margin.
        int blockCount = (size / 32768) + 1;
        int outputCapacity = size + blockCount * 5 * 2;
        this.compressed = new byte[outputCapacity];

        this.status = (int)Status.None;

        ZlibCodec codec = new ZlibCodec();
        codec.InitializeDeflate(compressLevel, false);
        codec.OutputBuffer = this.compressed;
        codec.InputBuffer = this.buffer;
        this.compressor = codec;
    }
}
|
|
|
|
/// <summary>
|
|
/// A class for compressing streams using the
|
|
/// Deflate algorithm with multiple threads.
|
|
/// </summary>
|
|
///
|
|
/// <remarks>
|
|
/// <para>
|
|
/// This class is for compression only, and that can be only
|
|
/// through writing.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// For more information on the Deflate algorithm, see IETF RFC 1951, "DEFLATE
|
|
/// Compressed Data Format Specification version 1.3."
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>, except
|
|
/// that this implementation uses an approach that employs multiple worker
|
|
/// threads to perform the DEFLATE. On a multi-cpu or multi-core computer,
|
|
/// the performance of this class can be significantly higher than the
|
|
/// single-threaded DeflateStream, particularly for larger streams. How
|
|
/// large? Anything over 10mb is a good candidate for parallel compression.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// The tradeoff is that this class uses more memory and more CPU than the
|
|
/// vanilla DeflateStream, and also is less efficient as a compressor. For
|
|
/// large files the size of the compressed data stream can be less than 1%
|
|
/// larger than the size of a compressed data stream from the vanilla
|
|
/// DeflateStream. For smaller files the difference can be larger. The
|
|
/// difference will also be larger if you set the BufferSize to be lower
|
|
/// than the default value. Your mileage may vary. Finally, for small
|
|
/// files, the ParallelDeflateOutputStream can be much slower than the vanilla
|
|
/// DeflateStream, because of the overhead of using the thread pool.
|
|
/// </para>
|
|
///
|
|
/// </remarks>
|
|
/// <seealso cref="Ionic.Zlib.DeflateStream" />
|
|
internal class ParallelDeflateOutputStream : System.IO.Stream
|
|
{
|
|
|
|
// Default size of each work buffer: 64 * 1024 bytes (64k).
private static readonly int IO_BUFFER_SIZE_DEFAULT = 64 * 1024;

// Pool of work items, created on the first Write().
private System.Collections.Generic.List<WorkItem> _pool;

// Whether the captive stream is left open when this stream is closed.
private bool _leaveOpen;

// Destination for the compressed bytes.
private System.IO.Stream _outStream;

// Running counters; taken modulo _pc to select a pool slot.
private int _nextToFill;
private int _nextToWrite;

private int _bufferSize = IO_BUFFER_SIZE_DEFAULT;

// Signalled by the writer when a session's output is complete.
private ManualResetEvent _writingDone;

// Signalled to release the writer for a session (or to let it exit on dispose).
private ManualResetEvent _sessionReset;

private bool _noMoreInputForThisSegment;
private object _outputLock = new object();
private bool _isClosed;
private bool _isDisposed;
private bool _firstWriteDone;

// Number of items in _pool.
private int _pc;

private int _Crc32;
private long _totalBytesProcessed;
private Ionic.Zlib.CompressionLevel _compressLevel;

// Exception captured on a background thread, rethrown from Write().
private volatile Exception _pendingException;
private object _eLock = new object(); // protects _pendingException

// This bitfield is used only when Trace is defined.
//private TraceBits _DesiredTrace = TraceBits.Write | TraceBits.WriteBegin |
//TraceBits.WriteDone | TraceBits.Lifecycle | TraceBits.Fill | TraceBits.Flush |
//TraceBits.Session;

//private TraceBits _DesiredTrace = TraceBits.WriteBegin | TraceBits.WriteDone | TraceBits.Synch | TraceBits.Lifecycle | TraceBits.Session ;

private TraceBits _DesiredTrace =
    TraceBits.WriterThread | TraceBits.Synch | TraceBits.Lifecycle | TraceBits.Session;
|
|
|
|
/// <summary>
|
|
/// Create a ParallelDeflateOutputStream.
|
|
/// </summary>
|
|
/// <remarks>
|
|
///
|
|
/// <para>
|
|
/// This stream compresses data written into it via the DEFLATE
|
|
/// algorithm (see RFC 1951), and writes out the compressed byte stream.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// The instance will use the default compression level, the default
|
|
/// buffer sizes and the default number of threads and buffers per
|
|
/// thread.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>,
|
|
/// except that this implementation uses an approach that employs
|
|
/// multiple worker threads to perform the DEFLATE. On a multi-cpu or
|
|
/// multi-core computer, the performance of this class can be
|
|
/// significantly higher than the single-threaded DeflateStream,
|
|
/// particularly for larger streams. How large? Anything over 10mb is
|
|
/// a good candidate for parallel compression.
|
|
/// </para>
|
|
///
|
|
/// </remarks>
|
|
///
|
|
/// <example>
|
|
///
|
|
/// This example shows how to use a ParallelDeflateOutputStream to compress
|
|
/// data. It reads a file, compresses it, and writes the compressed data to
|
|
/// a second, output file.
|
|
///
|
|
/// <code>
|
|
/// byte[] buffer = new byte[WORKING_BUFFER_SIZE];
|
|
/// int n= -1;
|
|
/// String outputFile = fileToCompress + ".compressed";
|
|
/// using (System.IO.Stream input = System.IO.File.OpenRead(fileToCompress))
|
|
/// {
|
|
/// using (var raw = System.IO.File.Create(outputFile))
|
|
/// {
|
|
/// using (Stream compressor = new ParallelDeflateOutputStream(raw))
|
|
/// {
|
|
/// while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
|
|
/// {
|
|
/// compressor.Write(buffer, 0, n);
|
|
/// }
|
|
/// }
|
|
/// }
|
|
/// }
|
|
/// </code>
|
|
/// <code lang="VB">
|
|
/// Dim buffer As Byte() = New Byte(4096) {}
|
|
/// Dim n As Integer = -1
|
|
/// Dim outputFile As String = (fileToCompress & ".compressed")
|
|
/// Using input As Stream = File.OpenRead(fileToCompress)
|
|
/// Using raw As FileStream = File.Create(outputFile)
|
|
/// Using compressor As Stream = New ParallelDeflateOutputStream(raw)
|
|
/// Do While (n <> 0)
|
|
/// If (n > 0) Then
|
|
/// compressor.Write(buffer, 0, n)
|
|
/// End If
|
|
/// n = input.Read(buffer, 0, buffer.Length)
|
|
/// Loop
|
|
/// End Using
|
|
/// End Using
|
|
/// End Using
|
|
/// </code>
|
|
/// </example>
|
|
/// <param name="stream">The stream to which compressed data will be written.</param>
|
|
public ParallelDeflateOutputStream(Stream stream)
    : this(stream, CompressionLevel.Default, CompressionStrategy.Default, false)
{
    // All work happens in the four-argument constructor: default level,
    // default strategy, and the captive stream is closed with this one.
}
|
|
|
|
/// <summary>
|
|
/// Create a ParallelDeflateOutputStream using the specified CompressionLevel.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
|
|
/// constructor for example code.
|
|
/// </remarks>
|
|
/// <param name="stream">The stream to which compressed data will be written.</param>
|
|
/// <param name="level">A tuning knob to trade speed for effectiveness.</param>
|
|
public ParallelDeflateOutputStream(Stream stream, CompressionLevel level)
    : this(stream, level, CompressionStrategy.Default, false)
{
    // Caller-chosen level; default strategy; captive stream is closed with this one.
}
|
|
|
|
/// <summary>
|
|
/// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open
|
|
/// when the ParallelDeflateOutputStream is closed.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
|
|
/// constructor for example code.
|
|
/// </remarks>
|
|
/// <param name="stream">The stream to which compressed data will be written.</param>
|
|
/// <param name="leaveOpen">
|
|
/// true if the application would like the stream to remain open after inflation/deflation.
|
|
/// </param>
|
|
public ParallelDeflateOutputStream(Stream stream, bool leaveOpen)
    : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen)
{
    // Default level and strategy; only the leave-open policy is caller-chosen.
}
|
|
|
|
/// <summary>
|
|
/// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open
|
|
/// when the ParallelDeflateOutputStream is closed.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
|
|
/// constructor for example code.
|
|
/// </remarks>
|
|
/// <param name="stream">The stream to which compressed data will be written.</param>
|
|
/// <param name="level">A tuning knob to trade speed for effectiveness.</param>
|
|
/// <param name="leaveOpen">
|
|
/// true if the application would like the stream to remain open after inflation/deflation.
|
|
/// </param>
|
|
public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, bool leaveOpen)
    // Forward the caller's level. Previously CompressionLevel.Default was
    // passed here, so the 'level' argument had no effect.
    : this(stream, level, CompressionStrategy.Default, leaveOpen)
{
}
|
|
|
|
/// <summary>
|
|
/// Create a ParallelDeflateOutputStream using the specified
|
|
/// CompressionLevel and CompressionStrategy, and specifying whether to
|
|
/// leave the captive stream open when the ParallelDeflateOutputStream is
|
|
/// closed.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
|
|
/// constructor for example code.
|
|
/// </remarks>
|
|
/// <param name="stream">The stream to which compressed data will be written.</param>
|
|
/// <param name="level">A tuning knob to trade speed for effectiveness.</param>
|
|
/// <param name="strategy">
|
|
/// By tweaking this parameter, you may be able to optimize the compression for
|
|
/// data with particular characteristics.
|
|
/// </param>
|
|
/// <param name="leaveOpen">
|
|
/// true if the application would like the stream to remain open after inflation/deflation.
|
|
/// </param>
|
|
public ParallelDeflateOutputStream(
    Stream stream,
    CompressionLevel level,
    CompressionStrategy strategy,
    bool leaveOpen)
{
    TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "-------------------------------------------------------");
    TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "Create {0:X8}", this.GetHashCode());

    // Destination stream and what to do with it on Close().
    _outStream = stream;
    _leaveOpen = leaveOpen;

    // Compression settings; the work-item pool itself is built lazily on the
    // first Write() so BufferSize/BuffersPerCore can still be changed.
    _compressLevel = level;
    Strategy = strategy;
    BuffersPerCore = 4; // default

    // Both events start unsignalled.
    _sessionReset = new ManualResetEvent(false);
    _writingDone = new ManualResetEvent(false);
}
|
|
|
|
|
|
/// <summary>
|
|
/// The ZLIB strategy to be used during compression.
|
|
/// </summary>
|
|
///
|
|
// Readable by anyone; assigned only from inside this class.
public CompressionStrategy Strategy { get; private set; }
|
|
|
|
/// <summary>
|
|
/// The number of buffers per CPU or CPU core.
|
|
/// </summary>
|
|
///
|
|
/// <remarks>
|
|
/// <para>
|
|
/// This property sets the number of memory buffers to create, for every
|
|
/// CPU or CPU core in the machine. The divide-and-conquer approach
|
|
/// taken by this class assumes a single thread from the application
|
|
/// will call Write(). There will be multiple background threads that
|
|
/// then compress (DEFLATE) the data written into the stream, and also a
|
|
/// single output thread, also operating in the background, aggregating
|
|
/// those results and finally emitting the output.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// The default value is 4. Different values may deliver better or
|
|
/// worse results, depending on the dynamic performance characteristics
|
|
/// of your storage and compute resources.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// The total amount of storage space allocated for buffering will be
|
|
/// (n*M*S*2), where n is the number of CPUs, M is the multiple (this
|
|
/// property), S is the size of each buffer (<see cref="BufferSize"/>),
|
|
/// and there are 2 buffers used by the compressor, one for input and
|
|
/// one for output. For example, if your machine has 4 cores, and you
|
|
/// set BuffersPerCore to 3, and you retain the default buffer size of
|
|
/// 128k, then the ParallelDeflateOutputStream will use 3mb of buffer
|
|
/// memory in total.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// The application can set this value at any time, but it is effective
|
|
/// only before the first call to Write(), which is when the buffers are
|
|
/// allocated.
|
|
/// </para>
|
|
/// </remarks>
|
|
// Read when the pool is built: the pool holds
// BuffersPerCore * Environment.ProcessorCount work items.
public int BuffersPerCore { get; set; }
|
|
|
|
/// <summary>
|
|
/// The size of the buffers used by the compressor threads.
|
|
/// </summary>
|
|
/// <remarks>
|
|
///
|
|
/// <para>
|
|
/// The default buffer size is 64k. The application can set this value at any
|
|
/// time, but it is effective only before the first Write().
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// Larger buffer sizes implies larger memory consumption but allows
|
|
/// more efficient compression. Using smaller buffer sizes consumes less
|
|
/// memory but result in less effective compression. For example, using
|
|
/// the default buffer size of 128k, the compression delivered is within
|
|
/// 1% of the compression delivered by the single-threaded <see
|
|
/// cref="Ionic.Zlib.DeflateStream"/>. On the other hand, using a
|
|
/// BufferSize of 8k can result in a compressed data stream that is 5%
|
|
/// larger than that delivered by the single-threaded
|
|
/// <c>DeflateStream</c>. Excessively small buffer sizes can also cause
|
|
/// the speed of the ParallelDeflateOutputStream to drop, because of
|
|
/// larger thread scheduling overhead dealing with many many small
|
|
/// buffers.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// The total amount of storage space allocated for buffering will be
|
|
/// (n*M*S*2), where n is the number of CPUs, M is the multiple (<see
|
|
/// cref="BuffersPerCore"/>), S is the size of each buffer (this
|
|
/// property), and there are 2 buffers used by the compressor, one for
|
|
/// input and one for output. For example, if your machine has a total
|
|
/// of 4 cores, and if you set <see cref="BuffersPerCore"/> to 3, and
|
|
/// you keep the default buffer size of 128k, then the
|
|
/// <c>ParallelDeflateOutputStream</c> will use 3mb of buffer memory in
|
|
/// total.
|
|
/// </para>
|
|
///
|
|
/// </remarks>
|
|
public int BufferSize
{
    get { return _bufferSize; }
    set
    {
        // Sizes below 1024 bytes are rejected. ArgumentOutOfRangeException
        // derives from ArgumentException, so existing handlers still catch
        // it, but the caller now learns which argument was wrong and why.
        if (value < 1024)
            throw new ArgumentOutOfRangeException("value", value,
                                                  "BufferSize must be at least 1024 bytes.");
        _bufferSize = value;
    }
}
|
|
|
|
/// <summary>
|
|
/// The CRC32 for the data that was written out, prior to compression.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// This value is meaningful only after a call to Close().
|
|
/// </remarks>
|
|
public int Crc32
{
    // Assigned by the writer thread once a session's output is finished.
    get { return _Crc32; }
}
|
|
|
|
|
|
/// <summary>
|
|
/// The total number of uncompressed bytes processed by the ParallelDeflateOutputStream.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// This value is meaningful only after a call to Close().
|
|
/// </remarks>
|
|
public Int64 BytesProcessed
{
    // Accumulated by the writer thread as each work item is written out.
    get { return _totalBytesProcessed; }
}
|
|
|
|
|
|
/// <summary>
/// Builds the pool of work items (BuffersPerCore per processor), numbers
/// them, and resets the fill/write counters.
/// </summary>
private void _InitializePoolOfWorkItems()
{
    _pool = new System.Collections.Generic.List<WorkItem>();

    int slot = 0;
    while (slot < BuffersPerCore * Environment.ProcessorCount)
    {
        _pool.Add(new WorkItem(_bufferSize, _compressLevel, Strategy));
        slot++;
    }

    _pc = _pool.Count;

    // Each item remembers its own position in the pool.
    int position = 0;
    foreach (WorkItem item in _pool)
    {
        item.index = position;
        position++;
    }

    // set the pointers
    _nextToFill = 0;
    _nextToWrite = 0;
}
|
|
|
|
|
|
/// <summary>
/// Queues the long-running writer loop on the thread pool.
/// </summary>
private void _KickoffWriter()
{
    WaitCallback writerLoop = new WaitCallback(this._PerpetualWriterMethod);
    bool queued = ThreadPool.QueueUserWorkItem(writerLoop);
    if (!queued)
    {
        throw new Exception("Cannot enqueue writer thread.");
    }
}
|
|
|
|
|
|
/// <summary>
|
|
/// Write data to the stream.
|
|
/// </summary>
|
|
///
|
|
/// <remarks>
|
|
///
|
|
/// <para>
|
|
/// To use the ParallelDeflateOutputStream to compress data, create a
|
|
/// ParallelDeflateOutputStream with CompressionMode.Compress, passing a
|
|
/// writable output stream. Then call Write() on that
|
|
/// ParallelDeflateOutputStream, providing uncompressed data as input. The
|
|
/// data sent to the output stream will be the compressed form of the data
|
|
/// written.
|
|
/// </para>
|
|
///
|
|
/// <para>
|
|
/// To decompress data, use the <see cref="Ionic.Zlib.DeflateStream"/> class.
|
|
/// </para>
|
|
///
|
|
/// </remarks>
|
|
/// <param name="buffer">The buffer holding data to write to the stream.</param>
|
|
/// <param name="offset">the offset within that data array to find the first byte to write.</param>
|
|
/// <param name="count">the number of bytes to write.</param>
|
|
public override void Write(byte[] buffer, int offset, int count)
{
    // Copies the caller's bytes into pool buffers; each buffer that becomes
    // full is marked 'Filled' and handed to a thread-pool compressor.

    if (_isClosed)
    {
        throw new NotSupportedException();
    }

    // Surface any exception captured on a background thread.
    if (_pendingException != null)
    {
        throw _pendingException;
    }

    if (count == 0)
    {
        return;
    }

    if (!_firstWriteDone)
    {
        // Deferred until the first Write so that BufferSize and
        // BuffersPerCore may be changed after construction.
        _InitializePoolOfWorkItems();

        // The writer loop is started exactly once, here.
        _KickoffWriter();

        // Release the writer thread.
        TraceOutput(TraceBits.Synch, "Synch _sessionReset.Set() Write (first)");
        _sessionReset.Set();

        _firstWriteDone = true;
    }

    do
    {
        int slot = _nextToFill % _pc;
        WorkItem item = _pool[slot];
        lock (item)
        {
            TraceOutput(TraceBits.Fill,
                        "Fill lock wi({0}) stat({1}) iba({2}) nf({3})",
                        item.index,
                        item.status,
                        item.inputBytesAvailable,
                        _nextToFill
                        );

            bool usableNow = item.status == (int)WorkItem.Status.None
                          || item.status == (int)WorkItem.Status.Done
                          || item.status == (int)WorkItem.Status.Filling;

            if (!usableNow)
            {
                // The slot is still owned by a compressor or the writer.
                // Pulse/Wait on its monitor until it is free again, then go
                // round the outer loop to re-acquire and fill it.
                int waitCount = 0;

                while (item.status != (int)WorkItem.Status.None &&
                       item.status != (int)WorkItem.Status.Done &&
                       item.status != (int)WorkItem.Status.Filling)
                {
                    TraceOutput(TraceBits.Fill,
                                "Fill waiting wi({0}) stat({1}) nf({2})",
                                item.index,
                                item.status,
                                _nextToFill);

                    waitCount++;

                    Monitor.Pulse(item);
                    Monitor.Wait(item);

                    if (item.status == (int)WorkItem.Status.None ||
                        item.status == (int)WorkItem.Status.Done ||
                        item.status == (int)WorkItem.Status.Filling)
                        TraceOutput(TraceBits.Fill,
                                    "Fill A-OK wi({0}) stat({1}) iba({2}) cyc({3})",
                                    item.index,
                                    item.status,
                                    item.inputBytesAvailable,
                                    waitCount);
                }

                continue;
            }

            item.status = (int)WorkItem.Status.Filling;

            // Take as much as fits in the remaining space, but no more than
            // the caller still has to give.
            int room = item.buffer.Length - item.inputBytesAvailable;
            int chunk = Math.Min(room, count);

            // Append after whatever the work item already holds.
            Array.Copy(buffer, offset, item.buffer, item.inputBytesAvailable, chunk);

            count -= chunk;
            offset += chunk;
            item.inputBytesAvailable += chunk;

            if (item.inputBytesAvailable == item.buffer.Length)
            {
                item.status = (int)WorkItem.Status.Filled;

                // Plain increment: Write() is documented as not thread-safe,
                // so only one thread advances this counter.
                _nextToFill++;

                TraceOutput(TraceBits.Fill,
                            "Fill QUWI wi({0}) stat({1}) iba({2}) nf({3})",
                            item.index,
                            item.status,
                            item.inputBytesAvailable,
                            _nextToFill
                            );

                if (!ThreadPool.QueueUserWorkItem(_DeflateOne, item))
                {
                    throw new Exception("Cannot enqueue workitem");
                }
            }
        }
    }
    while (count > 0); // until no more to write
}
|
|
|
|
|
|
|
|
/// <summary>
|
|
/// Flush the stream.
|
|
/// </summary>
|
|
public override void Flush()
{
    // An ordinary flush: more input may follow, so this is not the last one.
    const bool isLastInput = false;
    _Flush(isLastInput);
}
|
|
|
|
|
|
/// <summary>
/// Hands a partially-filled work item to a compressor thread.
/// </summary>
/// <param name="lastInput">
/// true when called from Close(): also records that no more input will
/// arrive for this segment.
/// </param>
/// <exception cref="NotSupportedException">The stream is already closed.</exception>
private void _Flush(bool lastInput)
{
    if (_isClosed)
        throw new NotSupportedException();

    // The pool is created on the first Write(). Before that _pool is null
    // and _pc is 0, so the lookup below would throw; there is nothing
    // buffered to flush in that state.
    if (!_firstWriteDone)
        return;

    // pass any partial buffer out to the compressor workers:
    WorkItem workitem = _pool[_nextToFill % _pc];
    lock(workitem)
    {
        if ( workitem.status == (int)WorkItem.Status.Filling)
        {
            workitem.status = (int)WorkItem.Status.Filled;
            _nextToFill++;

            // When flush is called from Close(), we set _noMore.
            // can't do it before updating nextToFill, though.
            if (lastInput)
                _noMoreInputForThisSegment= true;

            TraceOutput(TraceBits.Flush,
                        "Flush filled wi({0}) iba({1}) nf({2}) nomore({3})",
                        workitem.index, workitem.inputBytesAvailable, _nextToFill, _noMoreInputForThisSegment);

            if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem ))
                throw new Exception("Cannot enqueue workitem");

            //Monitor.Pulse(workitem);
        }
        else
        {
            // When flush is called from Close(), we set _noMore.
            // Gotta do this whether or not there is another packet to send along.
            if (lastInput)
                _noMoreInputForThisSegment= true;

            TraceOutput(TraceBits.Flush,
                        "Flush noaction wi({0}) stat({1}) nf({2}) nomore({3})",
                        workitem.index, workitem.status, _nextToFill, _noMoreInputForThisSegment);
        }
    }
}
|
|
|
|
|
|
/// <summary>
|
|
/// Close the stream.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// You must call Close on the stream to guarantee that all of the data written in has
|
|
/// been compressed, and the compressed data has been written out.
|
|
/// </remarks>
|
|
public override void Close()
{
    TraceOutput(TraceBits.Session, "Close {0:X8}", this.GetHashCode());

    if (_isClosed) return;

    if (!_firstWriteDone)
    {
        // Nothing was ever written, so the pool and the writer thread were
        // never created (_pool is null, _pc is 0) and the normal path below
        // would throw. Emit the terminating DEFLATE block directly, the same
        // way the writer thread does at the end of a session, then close.
        byte[] tail = new byte[128];
        ZlibCodec finisher = new ZlibCodec();
        finisher.InitializeDeflate(_compressLevel, false);
        finisher.InputBuffer = null;
        finisher.NextIn = 0;
        finisher.AvailableBytesIn = 0;
        finisher.OutputBuffer = tail;
        finisher.NextOut = 0;
        finisher.AvailableBytesOut = tail.Length;
        int rc = finisher.Deflate(FlushType.Finish);

        if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK)
            throw new Exception("deflating: " + finisher.Message);

        int produced = tail.Length - finisher.AvailableBytesOut;
        if (produced > 0)
            _outStream.Write(tail, 0, produced);

        finisher.EndDeflate();

        TraceOutput(TraceBits.Session, "-------------------------------------------------------");
        if (!_leaveOpen)
            _outStream.Close();

        _isClosed= true;
        return;
    }

    // Push out any partial buffer and mark the end of input.
    _Flush(true);

    //System.Diagnostics.StackTrace st = new System.Diagnostics.StackTrace(1);
    //System.Console.WriteLine(st.ToString());

    // need to get Writer off the workitem, in case he's waiting forever
    WorkItem workitem = _pool[_nextToFill % _pc];
    lock(workitem)
    {
        Monitor.PulseAll(workitem);
    }

    // wait for the writer to complete his work
    TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(begin) Close");
    _writingDone.WaitOne();
    TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(done) Close");

    TraceOutput(TraceBits.Session, "-------------------------------------------------------");
    if (!_leaveOpen)
        _outStream.Close();

    _isClosed= true;
}
|
|
|
|
|
|
|
|
// /// <summary>The destructor</summary>
|
|
// ~ParallelDeflateOutputStream()
|
|
// {
|
|
// TraceOutput(TraceBits.Lifecycle, "Destructor {0:X8}", this.GetHashCode());
|
|
// // call Dispose with false. Since we're in the
|
|
// // destructor call, the managed resources will be
|
|
// // disposed of anyways.
|
|
// Dispose(false);
|
|
// }
|
|
|
|
|
|
|
|
// workitem 10030 - implement a new Dispose method
|
|
|
|
/// <summary>Dispose the object</summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// Because ParallelDeflateOutputStream is IDisposable, the
|
|
/// application must call this method when finished using the instance.
|
|
/// </para>
|
|
/// <para>
|
|
/// This method is generally called implicitly upon exit from
|
|
/// a <c>using</c> scope in C# (<c>Using</c> in VB).
|
|
/// </para>
|
|
/// </remarks>
|
|
new public void Dispose()
{
    TraceOutput(TraceBits.Lifecycle, "Dispose {0:X8}", this.GetHashCode());

    // Dispose must be safe to call more than once. The first call closes
    // both wait handles, so a second pass would call Set()/Close() on
    // handles that are already closed.
    if (_isDisposed) return;

    _isDisposed= true;
    _pool = null;
    TraceOutput(TraceBits.Synch, "Synch _sessionReset.Set() Dispose");
    _sessionReset.Set(); // tell writer to die
    Dispose(true);
}
|
|
|
|
|
|
|
|
/// <summary>The Dispose method</summary>
|
|
protected override void Dispose(bool disposeManagedResources)
{
    // Only managed resources are held here; nothing to do otherwise.
    if (!disposeManagedResources)
    {
        return;
    }

    // Release the two wait handles.
    _writingDone.Close();
    _sessionReset.Close();
}
|
|
|
|
|
|
/// <summary>
|
|
/// Resets the stream for use with another stream.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// Because the ParallelDeflateOutputStream is expensive to create, it
|
|
/// has been designed so that it can be recycled and re-used. You have
|
|
/// to call Close() on the stream first, then you can call Reset() on
|
|
/// it, to use it again on another stream.
|
|
/// </remarks>
|
|
///
|
|
/// <example>
|
|
/// <code>
|
|
/// ParallelDeflateOutputStream deflater = null;
|
|
/// foreach (var inputFile in listOfFiles)
|
|
/// {
|
|
/// string outputFile = inputFile + ".compressed";
|
|
/// using (System.IO.Stream input = System.IO.File.OpenRead(inputFile))
|
|
/// {
|
|
/// using (var outStream = System.IO.File.Create(outputFile))
|
|
/// {
|
|
/// if (deflater == null)
|
|
/// deflater = new ParallelDeflateOutputStream(outStream,
|
|
/// CompressionLevel.Best,
|
|
/// CompressionStrategy.Default,
|
|
/// true);
|
|
/// deflater.Reset(outStream);
|
|
///
|
|
/// while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
|
|
/// {
|
|
/// deflater.Write(buffer, 0, n);
|
|
/// }
|
|
/// }
|
|
/// }
|
|
/// }
|
|
/// </code>
|
|
/// </example>
|
|
public void Reset(Stream stream)
{
    TraceOutput(TraceBits.Session, "-------------------------------------------------------");
    TraceOutput(TraceBits.Session, "Reset {0:X8} firstDone({1})", this.GetHashCode(), _firstWriteDone);

    if (!_firstWriteDone)
    {
        // No session has started yet: there is no writer thread to release
        // and no per-session state to clear. Still adopt the new target;
        // previously this path returned without doing so, and later output
        // went to the stream supplied at construction.
        _outStream = stream;
        _isClosed = false;
        return;
    }

    if (_noMoreInputForThisSegment)
    {
        // wait til done writing:
        TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(begin) Reset");
        _writingDone.WaitOne();
        TraceOutput(TraceBits.Synch, "Synch _writingDone.WaitOne(done) Reset");

        // reset all status
        foreach (var workitem in _pool)
            workitem.status = (int) WorkItem.Status.None;

        _noMoreInputForThisSegment= false;
        _nextToFill= _nextToWrite= 0;
        _totalBytesProcessed = 0L;
        _Crc32= 0;
        _isClosed= false;

        TraceOutput(TraceBits.Synch, "Synch _writingDone.Reset() Reset");
        _writingDone.Reset();
    }
    else
    {
        TraceOutput(TraceBits.Synch, "Synch Reset noMore=false");
    }

    _outStream = stream;

    // release the writer thread for the next "session"
    TraceOutput(TraceBits.Synch, "Synch _sessionReset.Set() Reset");
    _sessionReset.Set();
}
|
|
|
|
|
|
|
|
/// <summary>
/// Body of the background writer. For each session it writes compressed work
/// items to the output stream in order, then emits a final DEFLATE block,
/// publishes the combined CRC and signals <c>_writingDone</c>.
/// </summary>
/// <param name="state">Unused; required by the WaitCallback signature.</param>
private void _PerpetualWriterMethod(object state)
{
    TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod START");

    try
    {
        do
        {
            // wait for the next session
            TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(begin) PWM");
            _sessionReset.WaitOne();
            TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(done) PWM");

            // Dispose() sets this flag before signalling the event.
            if (_isDisposed) break;

            TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.Reset() PWM");
            _sessionReset.Reset();

            // repeatedly write buffers as they become ready
            WorkItem workitem = null;
            Ionic.Zlib.CRC32 c= new Ionic.Zlib.CRC32();
            do
            {
                workitem = _pool[_nextToWrite % _pc];
                lock(workitem)
                {
                    if (_noMoreInputForThisSegment)
                        TraceOutput(TraceBits.Write,
                                    "Write drain wi({0}) stat({1}) canuse({2}) cba({3})",
                                    workitem.index,
                                    workitem.status,
                                    (workitem.status == (int)WorkItem.Status.Compressed),
                                    workitem.compressedBytesAvailable);

                    do
                    {
                        if (workitem.status == (int)WorkItem.Status.Compressed)
                        {
                            TraceOutput(TraceBits.WriteBegin,
                                        "Write begin wi({0}) stat({1}) cba({2})",
                                        workitem.index,
                                        workitem.status,
                                        workitem.compressedBytesAvailable);

                            // Emit this item's compressed bytes, fold its CRC
                            // into the running total, then free the slot.
                            workitem.status = (int)WorkItem.Status.Writing;
                            _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable);
                            c.Combine(workitem.crc, workitem.inputBytesAvailable);
                            _totalBytesProcessed += workitem.inputBytesAvailable;
                            _nextToWrite++;
                            workitem.inputBytesAvailable= 0;
                            workitem.status = (int)WorkItem.Status.Done;

                            TraceOutput(TraceBits.WriteDone,
                                        "Write done wi({0}) stat({1}) cba({2})",
                                        workitem.index,
                                        workitem.status,
                                        workitem.compressedBytesAvailable);

                            Monitor.Pulse(workitem);
                            break;
                        }
                        else
                        {
                            int wcycles = 0;
                            // I've locked a workitem I cannot use.
                            // Therefore, wake someone else up, and then release the lock.
                            while (workitem.status != (int)WorkItem.Status.Compressed)
                            {
                                TraceOutput(TraceBits.WriteWait,
                                            "Write waiting wi({0}) stat({1}) nw({2}) nf({3}) nomore({4})",
                                            workitem.index,
                                            workitem.status,
                                            _nextToWrite, _nextToFill,
                                            _noMoreInputForThisSegment );

                                // All input has been written: stop waiting.
                                if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                    break;

                                wcycles++;

                                // wake up someone else
                                Monitor.Pulse(workitem);
                                // release and wait
                                Monitor.Wait(workitem);

                                if (workitem.status == (int)WorkItem.Status.Compressed)
                                    TraceOutput(TraceBits.WriteWait,
                                                "Write A-OK wi({0}) stat({1}) iba({2}) cba({3}) cyc({4})",
                                                workitem.index,
                                                workitem.status,
                                                workitem.inputBytesAvailable,
                                                workitem.compressedBytesAvailable,
                                                wcycles);
                            }

                            if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                break;
                        }
                    }
                    while (true);
                }

                if (_noMoreInputForThisSegment)
                    TraceOutput(TraceBits.Write,
                                "Write nomore nw({0}) nf({1}) break({2})",
                                _nextToWrite, _nextToFill, (_nextToWrite == _nextToFill));

                if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                    break;

            } while (true);

            // Finish:
            // After writing a series of buffers, closing each one with
            // Flush.Sync, we now write the final one as Flush.Finish, and
            // then stop.
            byte[] buffer = new byte[128];
            ZlibCodec compressor = new ZlibCodec();
            int rc = compressor.InitializeDeflate(_compressLevel, false);
            compressor.InputBuffer = null;
            compressor.NextIn = 0;
            compressor.AvailableBytesIn = 0;
            compressor.OutputBuffer = buffer;
            compressor.NextOut = 0;
            compressor.AvailableBytesOut = buffer.Length;
            rc = compressor.Deflate(FlushType.Finish);

            if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK)
                throw new Exception("deflating: " + compressor.Message);

            if (buffer.Length - compressor.AvailableBytesOut > 0)
            {
                TraceOutput(TraceBits.WriteBegin,
                            "Write begin flush bytes({0})",
                            buffer.Length - compressor.AvailableBytesOut);

                _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut);

                TraceOutput(TraceBits.WriteBegin,
                            "Write done flush");
            }

            compressor.EndDeflate();

            _Crc32 = c.Crc32Result;

            // signal that writing is complete:
            TraceOutput(TraceBits.Synch, "Synch _writingDone.Set() PWM");
            _writingDone.Set();
        }
        while (true);
    }
    catch (System.Exception exc1)
    {
        lock(_eLock)
        {
            // Expose the exception to the main thread, keeping the first
            // failure if one is already pending. The previous test was
            // inverted (!= null), so nothing was ever recorded and Write()
            // could never rethrow a background failure.
            if (_pendingException == null)
                _pendingException = exc1;
        }
    }

    TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod FINIS");
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
///   Compresses one filled work item: computes the CRC32 of its input
///   bytes, deflates them, and marks the item as compressed.
/// </summary>
/// <remarks>
///   <para>
///   All work on the item happens while holding the item's monitor. When
///   compression is finished the monitor is pulsed so that a thread
///   waiting on the item can proceed.
///   </para>
///   <para>
///   Exceptions are not propagated to the caller of this method. The
///   first exception seen is stored in <c>_pendingException</c> (under
///   <c>_eLock</c>) so that it can be surfaced elsewhere.
///   </para>
/// </remarks>
/// <param name="wi">
///   The <c>WorkItem</c> to compress, passed as <c>Object</c>.
///   Its status must be <c>WorkItem.Status.Filled</c>.
/// </param>
private void _DeflateOne(Object wi)
{
    WorkItem workitem = (WorkItem) wi;
    try
    {
        lock(workitem)
        {
            // only a completely filled buffer may be compressed
            if (workitem.status != (int)WorkItem.Status.Filled)
                throw new InvalidOperationException();

            Ionic.Zlib.CRC32 crc = new CRC32();

            // calc CRC on the uncompressed input bytes
            crc.SlurpBlock(workitem.buffer, 0, workitem.inputBytesAvailable);

            // deflate it
            DeflateOneSegment(workitem);

            // update status
            workitem.status = (int)WorkItem.Status.Compressed;
            workitem.crc = crc.Crc32Result;

            TraceOutput(TraceBits.Compress,
                        "Compress wi({0}) stat({1}) len({2})",
                        workitem.index,
                        workitem.status,
                        workitem.compressedBytesAvailable
                        );

            // release the item
            Monitor.Pulse(workitem);
        }
    }
    catch (System.Exception exc1)
    {
        lock(_eLock)
        {
            // Expose the exception to the main thread. Keep only the
            // first failure: the slot is filled when it is still empty.
            // (The previous test was inverted, "!= null", which meant an
            // exception could never be recorded in an empty slot.)
            if (_pendingException == null)
                _pendingException = exc1;
        }
    }
}
|
|
|
|
|
|
|
|
|
|
/// <summary>
///   Deflates the input bytes of a single work item using the item's own
///   codec, and finishes the segment with a sync flush.
/// </summary>
/// <param name="workitem">
///   The item to compress. <c>inputBytesAvailable</c> gives the number of
///   input bytes to consume; on return <c>compressedBytesAvailable</c>
///   holds the codec's total output byte count.
/// </param>
/// <returns>Always <c>true</c>.</returns>
private bool DeflateOneSegment(WorkItem workitem)
{
    ZlibCodec codec = workitem.compressor;

    // reset the codec and point it at the start of the input
    codec.ResetDeflate();
    codec.NextIn = 0;
    codec.AvailableBytesIn = workitem.inputBytesAvailable;

    // the entire compressed buffer is available for output
    codec.NextOut = 0;
    codec.AvailableBytesOut = workitem.compressed.Length;

    // step 1: deflate the buffer; stop once no input remains and there
    // is still room left in the output
    while (true)
    {
        codec.Deflate(FlushType.None);

        if (codec.AvailableBytesIn <= 0 && codec.AvailableBytesOut != 0)
            break;
    }

    // step 2: flush (sync)
    codec.Deflate(FlushType.Sync);

    workitem.compressedBytesAvailable = (int) codec.TotalBytesOut;
    return true;
}
|
|
|
|
|
|
/// <summary>
///   Writes one formatted trace line to the console, provided at least
///   one of the requested trace bits is enabled in <c>_DesiredTrace</c>.
/// </summary>
/// <remarks>
///   Calls are compiled only when the <c>Trace</c> symbol is defined.
///   Output is serialized through <c>_outputLock</c>; each line is
///   prefixed with the current thread's hash code and colored by it.
/// </remarks>
/// <param name="bits">The trace categories this message belongs to.</param>
/// <param name="format">A composite format string.</param>
/// <param name="varParams">The arguments for the format string.</param>
[System.Diagnostics.ConditionalAttribute("Trace")]
private void TraceOutput(TraceBits bits, string format, params object[] varParams)
{
    // nothing to emit when none of the requested categories is enabled
    if ((bits & _DesiredTrace) == 0)
        return;

    lock(_outputLock)
    {
        int threadId = Thread.CurrentThread.GetHashCode();

        // one of the eight ConsoleColor values 8..15, chosen per thread
        Console.ForegroundColor = (ConsoleColor) (threadId % 8 + 8);
        Console.Write("{0:000} PDOS ", threadId);
        Console.WriteLine(format, varParams);
        Console.ResetColor();
    }
}
|
|
|
|
|
|
// used only when Trace is defined
|
|
[Flags]
enum TraceBits
{
    None         = 0,
    Write        = 1 << 0,    // write out
    WriteBegin   = 1 << 1,    // begin to write out
    WriteDone    = 1 << 2,    // done writing out
    WriteWait    = 1 << 3,    // write thread waiting for buffer
    Flush        = 1 << 4,
    Compress     = 1 << 5,    // async compress
    Fill         = 1 << 6,    // filling buffers, when caller invokes Write()
    Lifecycle    = 1 << 7,    // constructor/disposer
    Session      = 1 << 8,    // Close/Reset
    Synch        = 1 << 9,    // thread synchronization
    WriterThread = 1 << 10,   // writer thread
}
|
|
|
|
|
|
|
|
/// <summary>
///   Gets a value indicating whether this stream supports seeking.
/// </summary>
/// <value>
///   Always <c>false</c>.
/// </value>
public override bool CanSeek
{
    get
    {
        return false;
    }
}
|
|
|
|
|
|
/// <summary>
///   Gets a value indicating whether this stream supports reading.
/// </summary>
/// <value>
///   Always <c>false</c>.
/// </value>
public override bool CanRead
{
    get
    {
        return false;
    }
}
|
|
|
|
/// <summary>
///   Gets a value indicating whether this stream supports writing.
/// </summary>
/// <value>
///   The <c>CanWrite</c> value reported by the underlying output stream.
/// </value>
public override bool CanWrite
{
    get
    {
        bool writable = _outStream.CanWrite;
        return writable;
    }
}
|
|
|
|
/// <summary>
///   Not available on this stream.
/// </summary>
/// <exception cref="System.NotImplementedException">
///   Thrown whenever the property is read.
/// </exception>
public override long Length
{
    get
    {
        throw new NotImplementedException();
    }
}
|
|
|
|
/// <summary>
///   Not available on this stream.
/// </summary>
/// <exception cref="System.NotImplementedException">
///   Thrown whenever the property is read or written.
/// </exception>
public override long Position
{
    get
    {
        throw new NotImplementedException();
    }
    set
    {
        throw new NotImplementedException();
    }
}
|
|
|
|
/// <summary>
///   Not available on this stream.
/// </summary>
/// <param name="buffer">Ignored.</param>
/// <param name="offset">Ignored.</param>
/// <param name="count">Ignored.</param>
/// <returns>Never returns.</returns>
/// <exception cref="System.NotImplementedException">
///   Thrown on every call.
/// </exception>
public override int Read(byte[] buffer, int offset, int count)
{
    throw new NotImplementedException();
}
|
|
|
|
/// <summary>
///   Not available on this stream.
/// </summary>
/// <param name="offset">Ignored.</param>
/// <param name="origin">Ignored.</param>
/// <returns>Never returns.</returns>
/// <exception cref="System.NotImplementedException">
///   Thrown on every call.
/// </exception>
public override long Seek(long offset, System.IO.SeekOrigin origin)
{
    throw new NotImplementedException();
}
|
|
|
|
/// <summary>
///   Not available on this stream.
/// </summary>
/// <param name="value">Ignored.</param>
/// <exception cref="System.NotImplementedException">
///   Thrown on every call.
/// </exception>
public override void SetLength(long value)
{
    throw new NotImplementedException();
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|