reduce disk usage during write #239

Merged: 12 commits, Mar 3, 2024
30 changes: 27 additions & 3 deletions CHANGELOG.md
@@ -2,8 +2,13 @@
- **CHANGED API** `Longtail_JobAPI_JobFunc` renamed `is_cancelled` to `detected_error`; it now contains the first error returned from a job task in the same job group (if any), or `ECANCELLED` if the job group was cancelled.
If `detected_error` is non-zero, exit (and clean up) your task directly and return `0`.
- **CHANGED API** JobAPI `WaitForAllJobs` now returns the first error encountered in a job group's tasks as well as any error in the job API itself, removing the need to bookkeep task errors separately
- **ADDED** memtracer now tracks allocations in stb_ds
- **ADDED** memtracer now tracks allocations in zstd
- **CHANGED API** `Longtail_StorageAPI.OpenAppend` added to `Longtail_StorageAPI` to open files without truncating existing data
- **CHANGED API** `Longtail_CreateConcurrentChunkWriteAPI` changed to take `source_version_index` and `version_diff`
- **CHANGED API** `Longtail_ConcurrentChunkWriteAPI` refactored to use asset index and open/close files instead of keeping all open during entire lifetime
- `Longtail_ConcurrentChunkWriteAPI.CreateDir` now takes asset index instead of version local path
- `Longtail_ConcurrentChunkWriteAPI.Open` now takes an asset index instead of a version local path and drops the `chunk_write_count` parameter
- `Longtail_ConcurrentChunkWriteAPI.Write` now takes an asset index instead of a version local path and drops the `chunk_write_count` parameter
- **CHANGED API** `Longtail_SetMonitor` callback functions refactored to accommodate changes in `Longtail_ConcurrentChunkWriteAPI`
- **NEW API** `Longtail_SetReAllocAndFree`
- **NEW API** `Longtail_ReAlloc`
- **NEW API** `Longtail_MemTracer_ReAlloc`
@@ -12,12 +17,31 @@
- **NEW API** `Longtail_GetFilesRecursively2` that executes using parallel jobs improving execution speed for large file trees significantly
- **REMOVED API** `Longtail_SetAllocAndFree` is replaced by `Longtail_SetReAllocAndFree`
- **REMOVED API** `Longtail_MemTracer_Alloc` is replaced by `Longtail_MemTracer_ReAlloc`
- **CHANGED API** `Longtail_ConcurrentChunkWriteAPI::Write` has a new parameter `out_chunks_remaining`, which is set to the number of chunks remaining to be written to the asset after the call completes
- **REMOVED API** Removed the platform API for the read/write mutex:
- `Longtail_GetRWLockSize`
- `Longtail_CreateRWLock`
- `Longtail_DeleteRWLock`
- `Longtail_LockRWLockRead`
- `Longtail_LockRWLockWrite`
- `Longtail_UnlockRWLockRead`
- `Longtail_UnlockRWLockWrite`
- **FIXED** Fixed memory leaks in command tool
- **FIXED** `Longtail_ChangeVersion2()` can now handle workloads with a block count larger than 65535
- **FIXED** Bikeshed JobAPI implementation now waits efficiently when the task queue is full
- **FIXED** Bikeshed JobAPI::CreateJobs implementation now properly drains both task channels when task queue is full
- **FIXED** Make sure we retain order of assets with equal length when sorting them
- **FIXED** Fixed excessive "Disk Used" growth during `Longtail_ChangeVersion2` execution that could cause out-of-disk-space errors.
The changes also improve performance for the more common case of smaller archives (60 GB raw data, many files), but cause a small regression compared to 0.4.1 for archives containing many very large files; 0.4.2 still performs far more reasonably than 0.4.0 in those cases.
| Version | Files | Raw Size | Compressed Size | Unpack Time | Peak Memory |
|-|-|-|-|-|-|
|0.4.0|1019|735 GB|214 GB|2h44m26s|7.9 GB|
|0.4.1|1019|735 GB|214 GB|0h12m14s|1.9 GB|
|0.4.2|1019|735 GB|214 GB|0h13m25s|2.2 GB|
|0.4.0|239 340|60 GB|17 GB|0h01m24s|4.2 GB|
|0.4.1|239 340|60 GB|17 GB|0h02m48s|0.9 GB|
|0.4.2|239 340|60 GB|17 GB|0h01m12s|0.9 GB|
- **CHANGED** Refactored all internal usage of JobAPI `ReadyJobs` with new error handling
- **UPDATED** Update of ZStd: 1.5.5 https://github.com/facebook/zstd/releases/tag/v1.5.5
- **UPDATED** Update of Blake3: 1.5.0 https://github.com/BLAKE3-team/BLAKE3/releases/tag/1.5.0
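The fix above for retaining the order of assets with equal length comes down to sort stability. Since C's `qsort` makes no stability guarantee, one common approach — a sketch under that assumption, not the actual Longtail implementation — is to record each element's original position and break ties on it:

```c
#include <stdlib.h>
#include <stdint.h>

struct AssetRef
{
    uint64_t length;   /* primary sort key */
    uint32_t original; /* position before sorting, used as tie-break */
};

/* Sort by descending length; equal lengths keep their original order. */
static int compare_asset_refs(const void* a, const void* b)
{
    const struct AssetRef* lhs = (const struct AssetRef*)a;
    const struct AssetRef* rhs = (const struct AssetRef*)b;
    if (lhs->length != rhs->length)
    {
        return (lhs->length < rhs->length) ? 1 : -1;
    }
    /* tie-break on original index makes the order total, so even an
       unstable qsort cannot reorder equal-length entries */
    if (lhs->original == rhs->original)
    {
        return 0;
    }
    return (lhs->original < rhs->original) ? -1 : 1;
}

static void sort_assets_stable(struct AssetRef* refs, size_t count)
{
    for (size_t i = 0; i < count; ++i)
    {
        refs[i].original = (uint32_t)i;
    }
    qsort(refs, count, sizeof(struct AssetRef), compare_asset_refs);
}
```

With the tie-break the comparator defines a total order, so any conforming `qsort` produces the same, effectively stable result.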
51 changes: 18 additions & 33 deletions cmd/main.c
@@ -144,15 +144,15 @@ struct AssetInfo
{
TLongtail_Atomic64 m_AccessCount;
uint32_t m_TotalChunkCount;
TLongtail_Atomic64 m_WriteCount;
TLongtail_Atomic64 m_ReadWriteCount;
TLongtail_Atomic64 m_PendingChunkCount;
TLongtail_Atomic32 m_ActivityIndicator;
};

uint32_t MonitorAssetInfosCount = 0;
struct AssetInfo* MonitorAssetInfos = 0;

void MonitorAssetRemove(const struct Longtail_VersionIndex* version_index, uint32_t asset_index)
void MonitorAssetRemove(const struct Longtail_VersionIndex* version_index, uint32_t asset_index, int err)
{
if (MonitorAssetInfos)
{
@@ -161,34 +161,26 @@ void MonitorAssetRemove(const struct Longtail_VersionIndex* version_index, uint3
}
}

void MonitorAssetOpen(const struct Longtail_VersionIndex* version_index, uint32_t asset_index)
void MonitorAssetOpen(const struct Longtail_VersionIndex* version_index, uint32_t asset_index, int err)
{
if (MonitorAssetInfos)
{
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_AccessCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_WriteCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_ReadWriteCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_PendingChunkCount, 1);
}
}

void MonitorAssetWrite(const struct Longtail_StoreIndex* store_index, const struct Longtail_VersionIndex* version_index, uint32_t asset_index, uint64_t write_offset, uint32_t size, uint32_t chunk_index, uint32_t chunk_index_in_block, uint32_t chunk_count_in_block, uint32_t block_index, uint32_t block_data_offset)
void MonitorAssetWrite(const struct Longtail_StoreIndex* store_index, const struct Longtail_VersionIndex* version_index, uint32_t asset_index, uint64_t write_offset, uint32_t size, uint32_t chunk_index, uint32_t chunk_index_in_block, uint32_t chunk_count_in_block, uint32_t block_index, uint32_t block_data_offset, int err)
{
if (MonitorAssetInfos)
{
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_AccessCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_WriteCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_ReadWriteCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_PendingChunkCount, -((int64_t)chunk_count_in_block));
}
}

void MonitorAssetComplete(const struct Longtail_VersionIndex* version_index, uint32_t asset_index, int err)
{
if (MonitorAssetInfos)
{
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_AccessCount, 1);
// Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_WriteCount, 1);
}
}

struct MonitorChunkInfo
{
TLongtail_Atomic64 m_AccessCount;
@@ -197,7 +189,7 @@ struct MonitorChunkInfo
uint32_t MonitorChunkInfosCount = 0;
struct MonitorChunkInfo* MonitorChunkInfos = 0;

void MonitorChunkRead(const struct Longtail_StoreIndex* store_index, const struct Longtail_VersionIndex* version_index, uint32_t block_index, uint32_t chunk_index, uint32_t chunk_index_in_block)
void MonitorChunkRead(const struct Longtail_StoreIndex* store_index, const struct Longtail_VersionIndex* version_index, uint32_t block_index, uint32_t chunk_index, uint32_t chunk_index_in_block, int err)
{
if (MonitorBlockInfos)
{
@@ -237,22 +229,22 @@ void MonitorBlockSaved(const struct Longtail_StoreIndex* store_index, uint32_t b
}
}

void MonitorAssetRead(const struct Longtail_StoreIndex* store_index, const struct Longtail_VersionIndex* version_index, uint32_t asset_index, uint64_t read_offset, uint32_t size, TLongtail_Hash chunk_hash, uint32_t block_index, uint32_t block_data_offset)
void MonitorAssetRead(const struct Longtail_StoreIndex* store_index, const struct Longtail_VersionIndex* version_index, uint32_t asset_index, uint64_t read_offset, uint32_t size, TLongtail_Hash chunk_hash, uint32_t block_index, uint32_t block_data_offset, int err)
{
if (MonitorAssetInfos)
{
Longtail_AtomicAdd64(&MonitorBlockInfos[block_index].m_AccessCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_AccessCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_WriteCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_ReadWriteCount, 1);
}
}

void MonitorAssetClose(const struct Longtail_VersionIndex* version_index, uint32_t asset_index, int err)
void MonitorAssetClose(const struct Longtail_VersionIndex* version_index, uint32_t asset_index)
{
if (MonitorAssetInfos)
{
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_AccessCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_PendingChunkCount, -MonitorAssetInfos[asset_index].m_PendingChunkCount);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_ReadWriteCount, 1);
Longtail_AtomicAdd64(&MonitorAssetInfos[asset_index].m_PendingChunkCount, -1);
}
}

@@ -415,7 +407,7 @@ static int UpdateProgressWindow()
Longtail_AtomicAdd32(&MonitorAssetInfos[a].m_ActivityIndicator, -1);
continue;
}
if (MonitorAssetInfos[a].m_WriteCount == 0)
if (MonitorAssetInfos[a].m_ReadWriteCount == 0)
{
SetAsset(a, Grey);
}
@@ -550,7 +542,7 @@ static int UpdateProgressWindow()
}
else
{
if (MonitorAssetInfos[a].m_WriteCount == 0)
if (MonitorAssetInfos[a].m_ReadWriteCount == 0)
{
tmp_buffer[u] = ' ';
incomplete_asset_count++;
@@ -622,13 +614,6 @@ void InitMonitor(struct Longtail_StoreIndex* store_index, struct Longtail_Versio
MonitorAssetInfos[a].m_PendingChunkCount = version_index->m_AssetChunkCounts[a];
}
}
else
{
for (uint32_t a = 0; a < MonitorAssetInfosCount; a++)
{
MonitorAssetInfos[a].m_PendingChunkCount = 1;
}
}

MonitorChunkInfosCount = (*version_index->m_ChunkCount);
size_t MonitorChunkInfosSize = sizeof(struct MonitorChunkInfo) * MonitorChunkInfosCount;
@@ -645,7 +630,7 @@
MonitorAssetOpen,
MonitorAssetWrite,
MonitorChunkRead,
MonitorAssetComplete,
// MonitorAssetComplete,
MonitorBlockCompose,
MonitorBlockSave,
MonitorBlockSaved,
@@ -1541,7 +1526,7 @@ int DownSync(
struct Longtail_ProgressAPI* progress = MakeProgressAPI("Updating version", enable_detailed_progress ? 0 : 5);
if (progress)
{
struct Longtail_ConcurrentChunkWriteAPI* concurrent_chunk_write = Longtail_CreateConcurrentChunkWriteAPI(storage_api, target_path);
struct Longtail_ConcurrentChunkWriteAPI* concurrent_chunk_write = Longtail_CreateConcurrentChunkWriteAPI(storage_api, source_version_index, version_diff, target_path);
if (concurrent_chunk_write)
{
err = Longtail_ChangeVersion2(
@@ -2663,7 +2648,7 @@ int Unpack(
struct Longtail_ProgressAPI* progress = MakeProgressAPI("Updating version", enable_detailed_progress ? 0 : 5);
if (progress)
{
struct Longtail_ConcurrentChunkWriteAPI* concurrent_chunk_write = Longtail_CreateConcurrentChunkWriteAPI(storage_api, target_path);
struct Longtail_ConcurrentChunkWriteAPI* concurrent_chunk_write = Longtail_CreateConcurrentChunkWriteAPI(storage_api, &archive_index->m_VersionIndex, version_diff, target_path);
if (concurrent_chunk_write)
{
err = Longtail_ChangeVersion2(
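The `Longtail_ConcurrentChunkWriteAPI` refactor exercised above opens and closes target files around each write instead of holding every handle for the API's whole lifetime. A self-contained sketch of that write pattern follows; the `write_chunk_at` helper and file layout are illustrative assumptions, not the Longtail implementation:

```c
#include <stdio.h>

/* Write one chunk at `offset`, opening and closing the file around the
   write so at most one handle is held at a time. "r+b" preserves data
   written by earlier chunk writes; "wb" is only used to create the file
   on the first write (plain "wb" on every call would truncate it). */
static int write_chunk_at(const char* path, long offset,
                          const void* data, size_t size)
{
    FILE* f = fopen(path, "r+b");
    if (f == NULL)
    {
        f = fopen(path, "wb");
        if (f == NULL)
        {
            return 1; /* propagate as a simple error code */
        }
    }
    if (fseek(f, offset, SEEK_SET) != 0 ||
        fwrite(data, 1, size, f) != size)
    {
        fclose(f);
        return 1;
    }
    fclose(f);
    return 0;
}
```

Opening per write trades some open/close overhead for a bounded handle count and lets the OS reclaim space bookkeeping between writes, which matches the PR's goal of reducing disk usage during write.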
22 changes: 21 additions & 1 deletion lib/blockstorestorage/longtail_blockstorestorage.c
@@ -776,6 +776,25 @@ static int BlockStoreStorageAPI_OpenWriteFile(
return ENOTSUP;
}

static int BlockStoreStorageAPI_OpenAppendFile(
struct Longtail_StorageAPI* storage_api,
const char* path,
Longtail_StorageAPI_HOpenFile* out_open_file)
{
MAKE_LOG_CONTEXT_FIELDS(ctx)
LONGTAIL_LOGFIELD(storage_api, "%p"),
LONGTAIL_LOGFIELD(path, "%s"),
LONGTAIL_LOGFIELD(out_open_file, "%p")
MAKE_LOG_CONTEXT_WITH_FIELDS(ctx, 0, LONGTAIL_LOG_LEVEL_OFF)

LONGTAIL_VALIDATE_INPUT(ctx, storage_api != 0, return 0)
LONGTAIL_VALIDATE_INPUT(ctx, path != 0, return 0)
LONGTAIL_VALIDATE_INPUT(ctx, out_open_file != 0, return 0)

LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "Unsupported, failed with %d", ENOTSUP)
return ENOTSUP;
}
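The block store backend above deliberately rejects `OpenAppend` with `ENOTSUP`, but the intent of the new `Longtail_StorageAPI.OpenAppend` entry point — open a file for writing without truncating existing data — can be sketched for a plain filesystem backend. The `open_append_file` helper below is illustrative, not part of Longtail:

```c
#include <stdio.h>

/* Open `path` for writing without destroying existing contents:
   "r+b" keeps the data when the file already exists, "wb" creates it
   otherwise. Contrast with opening directly in "wb" mode, which would
   truncate an existing file to zero length. */
static FILE* open_append_file(const char* path)
{
    FILE* f = fopen(path, "r+b");
    if (f == NULL)
    {
        f = fopen(path, "wb");
    }
    return f;
}
```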

static int BlockStoreStorageAPI_Write(
struct Longtail_StorageAPI* storage_api,
Longtail_StorageAPI_HOpenFile f,
@@ -1407,7 +1426,8 @@ static int BlockStoreStorageAPI_Init(
BlockStoreStorageAPI_UnlockFile,
BlockStoreStorageAPI_GetParentPath,
BlockStoreStorageAPI_MapFile,
BlockStoreStorageAPI_UnmapFile);
BlockStoreStorageAPI_UnmapFile,
BlockStoreStorageAPI_OpenAppendFile);

struct BlockStoreStorageAPI* block_store_fs = (struct BlockStoreStorageAPI*)api;
