Added file locks to allow parallel access to chunks.

This allows a multi-threaded reading of chunks.
This commit is contained in:
Fredrik Blom 2012-11-15 10:11:23 +01:00
parent a002336603
commit 6326bdac82

View file

@ -21,6 +21,14 @@ namespace Substrate.Core
private string fileName; private string fileName;
private FileStream file; private FileStream file;
/// <summary>
/// The file lock used so that we do not seek in different areas
/// of the same file at the same time. All file access should lock this
/// object before moving the file pointer.
/// </summary>
private object fileLock = new object();
private int[] offsets; private int[] offsets;
private int[] chunkTimestamps; private int[] chunkTimestamps;
private List<Boolean> sectorFree; private List<Boolean> sectorFree;
@ -62,8 +70,10 @@ namespace Substrate.Core
// Cleanup unmanaged resources // Cleanup unmanaged resources
if (file != null) { if (file != null) {
file.Close(); lock (this.fileLock) {
file = null; file.Close();
file = null;
}
} }
} }
_disposed = true; _disposed = true;
@ -93,75 +103,76 @@ namespace Substrate.Core
} }
try { try {
file = new FileStream(fileName, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite); lock (this.fileLock) {
file = new FileStream(fileName, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);
//using (file) { //using (file) {
if (file.Length < SectorBytes) { if (file.Length < SectorBytes) {
byte[] int0 = BitConverter.GetBytes((int)0); byte[] int0 = BitConverter.GetBytes((int)0);
/* we need to write the chunk offset table */ /* we need to write the chunk offset table */
for (int i = 0; i < SectorInts; ++i) {
file.Write(int0, 0, 4);
}
// write another sector for the timestamp info
for (int i = 0; i < SectorInts; ++i) {
file.Write(int0, 0, 4);
}
file.Flush();
sizeDelta += SectorBytes * 2;
}
if ((file.Length & 0xfff) != 0) {
/* the file size is not a multiple of 4KB, grow it */
for (int i = 0; i < (file.Length & 0xfff); ++i) {
file.WriteByte(0);
}
file.Flush();
}
/* set up the available sector map */
int nSectors = (int)file.Length / SectorBytes;
sectorFree = new List<Boolean>(nSectors);
for (int i = 0; i < nSectors; ++i) {
sectorFree.Add(true);
}
sectorFree[0] = false; // chunk offset table
sectorFree[1] = false; // for the last modified info
file.Seek(0, SeekOrigin.Begin);
for (int i = 0; i < SectorInts; ++i) { for (int i = 0; i < SectorInts; ++i) {
file.Write(int0, 0, 4); byte[] offsetBytes = new byte[4];
} file.Read(offsetBytes, 0, 4);
// write another sector for the timestamp info
for (int i = 0; i < SectorInts; ++i) {
file.Write(int0, 0, 4);
}
file.Flush(); if (BitConverter.IsLittleEndian) {
Array.Reverse(offsetBytes);
}
int offset = BitConverter.ToInt32(offsetBytes, 0);
sizeDelta += SectorBytes * 2; offsets[i] = offset;
} if (offset != 0 && (offset >> 8) + (offset & 0xFF) <= sectorFree.Count) {
for (int sectorNum = 0; sectorNum < (offset & 0xFF); ++sectorNum) {
if ((file.Length & 0xfff) != 0) { sectorFree[(offset >> 8) + sectorNum] = false;
/* the file size is not a multiple of 4KB, grow it */ }
for (int i = 0; i < (file.Length & 0xfff); ++i) {
file.WriteByte(0);
}
file.Flush();
}
/* set up the available sector map */
int nSectors = (int)file.Length / SectorBytes;
sectorFree = new List<Boolean>(nSectors);
for (int i = 0; i < nSectors; ++i) {
sectorFree.Add(true);
}
sectorFree[0] = false; // chunk offset table
sectorFree[1] = false; // for the last modified info
file.Seek(0, SeekOrigin.Begin);
for (int i = 0; i < SectorInts; ++i) {
byte[] offsetBytes = new byte[4];
file.Read(offsetBytes, 0, 4);
if (BitConverter.IsLittleEndian) {
Array.Reverse(offsetBytes);
}
int offset = BitConverter.ToInt32(offsetBytes, 0);
offsets[i] = offset;
if (offset != 0 && (offset >> 8) + (offset & 0xFF) <= sectorFree.Count) {
for (int sectorNum = 0; sectorNum < (offset & 0xFF); ++sectorNum) {
sectorFree[(offset >> 8) + sectorNum] = false;
} }
} }
} for (int i = 0; i < SectorInts; ++i) {
for (int i = 0; i < SectorInts; ++i) { byte[] modBytes = new byte[4];
byte[] modBytes = new byte[4]; file.Read(modBytes, 0, 4);
file.Read(modBytes, 0, 4);
if (BitConverter.IsLittleEndian) { if (BitConverter.IsLittleEndian) {
Array.Reverse(modBytes); Array.Reverse(modBytes);
}
int lastModValue = BitConverter.ToInt32(modBytes, 0);
chunkTimestamps[i] = lastModValue;
} }
int lastModValue = BitConverter.ToInt32(modBytes, 0);
chunkTimestamps[i] = lastModValue;
} }
//}
} }
catch (IOException e) { catch (IOException e) {
System.Console.WriteLine(e.Message); System.Console.WriteLine(e.Message);
@ -239,47 +250,49 @@ namespace Substrate.Core
return null; return null;
} }
file.Seek(sectorNumber * SectorBytes, SeekOrigin.Begin); lock (this.fileLock) {
byte[] lengthBytes = new byte[4]; file.Seek(sectorNumber * SectorBytes, SeekOrigin.Begin);
file.Read(lengthBytes, 0, 4); byte[] lengthBytes = new byte[4];
file.Read(lengthBytes, 0, 4);
if (BitConverter.IsLittleEndian) { if (BitConverter.IsLittleEndian) {
Array.Reverse(lengthBytes); Array.Reverse(lengthBytes);
} }
int length = BitConverter.ToInt32(lengthBytes, 0); int length = BitConverter.ToInt32(lengthBytes, 0);
if (length > SectorBytes * numSectors) { if (length > SectorBytes * numSectors) {
Debugln("READ", x, z, "invalid length: " + length + " > 4096 * " + numSectors); Debugln("READ", x, z, "invalid length: " + length + " > 4096 * " + numSectors);
return null;
}
byte version = (byte)file.ReadByte();
if (version == VERSION_GZIP) {
byte[] data = new byte[length - 1];
file.Read(data, 0, data.Length);
Stream ret = new GZipStream(new MemoryStream(data), CompressionMode.Decompress);
return ret;
}
else if (version == VERSION_DEFLATE) {
byte[] data = new byte[length - 1];
file.Read(data, 0, data.Length);
Stream ret = new ZlibStream(new MemoryStream(data), CompressionMode.Decompress, true);
return ret;
/*MemoryStream sinkZ = new MemoryStream();
ZlibStream zOut = new ZlibStream(sinkZ, CompressionMode.Decompress, true);
zOut.Write(data, 0, data.Length);
zOut.Flush();
zOut.Close();
sinkZ.Seek(0, SeekOrigin.Begin);
return sinkZ;*/
}
Debugln("READ", x, z, "unknown version " + version);
return null; return null;
} }
byte version = (byte)file.ReadByte();
if (version == VERSION_GZIP) {
byte[] data = new byte[length - 1];
file.Read(data, 0, data.Length);
Stream ret = new GZipStream(new MemoryStream(data), CompressionMode.Decompress);
return ret;
}
else if (version == VERSION_DEFLATE) {
byte[] data = new byte[length - 1];
file.Read(data, 0, data.Length);
Stream ret = new ZlibStream(new MemoryStream(data), CompressionMode.Decompress, true);
return ret;
/*MemoryStream sinkZ = new MemoryStream();
ZlibStream zOut = new ZlibStream(sinkZ, CompressionMode.Decompress, true);
zOut.Write(data, 0, data.Length);
zOut.Flush();
zOut.Close();
sinkZ.Seek(0, SeekOrigin.Begin);
return sinkZ;*/
}
Debugln("READ", x, z, "unknown version " + version);
return null;
} }
catch (IOException) { catch (IOException) {
Debugln("READ", x, z, "exception"); Debugln("READ", x, z, "exception");
@ -407,17 +420,19 @@ namespace Substrate.Core
* no free space large enough found -- we need to grow the * no free space large enough found -- we need to grow the
* file * file
*/ */
Debug("SAVE", x, z, length, "grow"); lock (this.fileLock) {
file.Seek(0, SeekOrigin.End); Debug("SAVE", x, z, length, "grow");
sectorNumber = sectorFree.Count; file.Seek(0, SeekOrigin.End);
for (int i = 0; i < sectorsNeeded; ++i) { sectorNumber = sectorFree.Count;
file.Write(emptySector, 0, emptySector.Length); for (int i = 0; i < sectorsNeeded; ++i) {
sectorFree.Add(false); file.Write(emptySector, 0, emptySector.Length);
} sectorFree.Add(false);
sizeDelta += SectorBytes * sectorsNeeded; }
sizeDelta += SectorBytes * sectorsNeeded;
Write(sectorNumber, data, length); Write(sectorNumber, data, length);
SetOffset(x, z, (sectorNumber << 8) | sectorsNeeded); SetOffset(x, z, (sectorNumber << 8) | sectorsNeeded);
}
} }
} }
SetTimestamp(x, z, timestamp); SetTimestamp(x, z, timestamp);
@ -430,32 +445,36 @@ namespace Substrate.Core
/* write a chunk data to the region file at specified sector number */ /* write a chunk data to the region file at specified sector number */
private void Write (int sectorNumber, byte[] data, int length) private void Write (int sectorNumber, byte[] data, int length)
{ {
Debugln(" " + sectorNumber); lock (this.fileLock) {
file.Seek(sectorNumber * SectorBytes, SeekOrigin.Begin); Debugln(" " + sectorNumber);
file.Seek(sectorNumber * SectorBytes, SeekOrigin.Begin);
byte[] bytes = BitConverter.GetBytes(length + 1); byte[] bytes = BitConverter.GetBytes(length + 1);
if (BitConverter.IsLittleEndian) { if (BitConverter.IsLittleEndian) {
; ;
Array.Reverse(bytes); Array.Reverse(bytes);
}
file.Write(bytes, 0, 4); // chunk length
file.WriteByte(VERSION_DEFLATE); // chunk version number
file.Write(data, 0, length); // chunk data
} }
file.Write(bytes, 0, 4); // chunk length
file.WriteByte(VERSION_DEFLATE); // chunk version number
file.Write(data, 0, length); // chunk data
} }
public void DeleteChunk (int x, int z) public void DeleteChunk (int x, int z)
{ {
int offset = GetOffset(x, z); lock (this.fileLock) {
int sectorNumber = offset >> 8; int offset = GetOffset(x, z);
int sectorsAllocated = offset & 0xFF; int sectorNumber = offset >> 8;
int sectorsAllocated = offset & 0xFF;
file.Seek(sectorNumber * SectorBytes, SeekOrigin.Begin); file.Seek(sectorNumber * SectorBytes, SeekOrigin.Begin);
for (int i = 0; i < sectorsAllocated; i++) { for (int i = 0; i < sectorsAllocated; i++) {
file.Write(emptySector, 0, SectorBytes); file.Write(emptySector, 0, SectorBytes);
}
SetOffset(x, z, 0);
SetTimestamp(x, z, 0);
} }
SetOffset(x, z, 0);
SetTimestamp(x, z, 0);
} }
/* is this an invalid chunk coordinate? */ /* is this an invalid chunk coordinate? */
@ -476,16 +495,18 @@ namespace Substrate.Core
private void SetOffset (int x, int z, int offset) private void SetOffset (int x, int z, int offset)
{ {
offsets[x + z * 32] = offset; lock (this.fileLock) {
file.Seek((x + z * 32) * 4, SeekOrigin.Begin); offsets[x + z * 32] = offset;
file.Seek((x + z * 32) * 4, SeekOrigin.Begin);
byte[] bytes = BitConverter.GetBytes(offset); byte[] bytes = BitConverter.GetBytes(offset);
if (BitConverter.IsLittleEndian) { if (BitConverter.IsLittleEndian) {
; ;
Array.Reverse(bytes); Array.Reverse(bytes);
}
file.Write(bytes, 0, 4);
} }
file.Write(bytes, 0, 4);
} }
private int Timestamp () private int Timestamp ()
@ -507,21 +528,25 @@ namespace Substrate.Core
public void SetTimestamp (int x, int z, int value) public void SetTimestamp (int x, int z, int value)
{ {
chunkTimestamps[x + z * 32] = value; lock (this.fileLock) {
file.Seek(SectorBytes + (x + z * 32) * 4, SeekOrigin.Begin); chunkTimestamps[x + z * 32] = value;
file.Seek(SectorBytes + (x + z * 32) * 4, SeekOrigin.Begin);
byte[] bytes = BitConverter.GetBytes(value); byte[] bytes = BitConverter.GetBytes(value);
if (BitConverter.IsLittleEndian) { if (BitConverter.IsLittleEndian) {
; ;
Array.Reverse(bytes); Array.Reverse(bytes);
}
file.Write(bytes, 0, 4);
} }
file.Write(bytes, 0, 4);
} }
public void Close () public void Close ()
{ {
file.Close(); lock (this.fileLock) {
file.Close();
}
} }
protected virtual int SectorBytes protected virtual int SectorBytes