Skip to content

Commit

Permalink
pack-objects: thread the path-based compression
Browse files Browse the repository at this point in the history
Adapting the implementation of ll_find_deltas(), create a threaded
version of the --path-walk compression step in 'git pack-objects'.

This involves adding a 'regions' member to the thread_params struct,
allowing each thread to own a section of paths. We can simplify the way
jobs are split because there is no value in extending the batch based on
name-hash the way sections of the object entry array are attempted to be
grouped. We re-use the 'list_size' and 'remaining' items for the purpose
of borrowing work in progress from other "victim" threads when a thread
has finished its batch of work more quickly.

Using the Git repository as a test repo, the p5313 performance test
shows that the resulting size of the repo is the same, but the threaded
implementation gives gains of varying degrees depending on the number of
objects being packed. (This was tested on a 16-core machine.)

Test                                      HEAD~1    HEAD
-----------------------------------------------------------------
5313.2: thin pack                         0.00        0.00  =
5313.3: thin pack size                     589         589  +0.0%
5313.4: thin pack with --path-walk        0.00        0.00  =
5313.5: thin pack size with --path-walk    589         589  +0.0%
5313.6: big pack                          2.84        2.80  -1.4%
5313.7: big pack size                    14.0M       14.1M  +0.3%
5313.8: big pack with --path-walk         5.46        3.77 -31.0%
5313.9: big pack size with --path-walk   13.2M       13.2M  -0.0%
5313.10: repack                           22.11      21.50  -2.8%
5313.11: repack size                     126.4M     126.2M  -0.2%
5313.12: repack with --path-walk          66.89      26.41 -60.5%
5313.13: repack size with --path-walk    109.6M     109.6M  +0.0%

This 60% reduction in 'git repack --path-walk' time is typical across
all repos I used for testing. What is interesting is to compare when the
overall time improves enough to outperform the standard case. These time
improvements correlate with repositories with data shapes that
significantly improve their data size as well.

For example, the microsoft/fluentui repo has a 439M to 122M size
reduction, and the repack time is now 36.6 seconds with --path-walk
compared to 95+ seconds without it:

Test                                      HEAD~!    HEAD
-----------------------------------------------------------------
5313.2: thin pack                         0.41        0.42  +2.4%
5313.3: thin pack size                    1.2M        1.2M  +0.0%
5313.4: thin pack with --path-walk        0.08        0.05 -37.5%
5313.5: thin pack size with --path-walk  18.4K       18.4K  +0.0%
5313.6: big pack                          4.47        4.53  +1.3%
5313.7: big pack size                    19.6M       19.7M  +0.3%
5313.8: big pack with --path-walk         6.76        3.51 -48.1%
5313.9: big pack size with --path-walk   16.5M       16.4M  -0.2%
5313.10: repack                          96.87       99.05  +2.3%
5313.11: repack size                    439.5M      439.0M  -0.1%
5313.12: repack with --path-walk         95.68       36.55 -61.8%
5313.13: repack size with --path-walk   122.6M      122.6M  +0.0%

In a more extreme example, an internal repository that has a similar
name-hash collision issue to microsoft/fluentui reduces its size from
6.4G to 805M with the --path-walk option. This also reduces the
repacking time from 2,138 seconds to 478 seconds.

Test                                      HEAD~1    HEAD
------------------------------------------------------------------
5313.10: repack                           2138.22   2138.19  -0.0%
5313.11: repack size                         6.4G      6.4G  -0.0%
5313.12: repack with --path-walk          1351.46    477.91 -64.6%
5313.13: repack size with --path-walk      804.1M    804.1M  -0.0%

Finally, the Linux kernel repository is a good test for this repacking
time change, even though the space savings is more reasonable:

Test                                      HEAD~1      HEAD
----------------------------------------------------------------
5313.10: repack                           734.26   735.11  +0.1%
5313.11: repack size                        2.5G     2.5G  -0.0%
5313.12: repack with --path-walk         1457.23   598.17 -59.0%
5313.13: repack size with --path-walk       2.2G     2.2G  +0.0%

Signed-off-by: Derrick Stolee <[email protected]>
  • Loading branch information
derrickstolee committed Oct 30, 2024
1 parent 37572d7 commit 7ae9a40
Showing 1 changed file with 160 additions and 2 deletions.
162 changes: 160 additions & 2 deletions builtin/pack-objects.c
Original file line number Diff line number Diff line change
Expand Up @@ -2935,6 +2935,7 @@ static void find_deltas(struct object_entry **list, unsigned *list_size,
struct thread_params {
pthread_t thread;
struct object_entry **list;
struct packing_region *regions;
unsigned list_size;
unsigned remaining;
int window;
Expand Down Expand Up @@ -3248,6 +3249,163 @@ static void find_deltas_by_region(struct object_entry *list,
stop_progress(&progress_state);
}

static void *threaded_find_deltas_by_path(void *arg)
{
struct thread_params *me = arg;

progress_lock();
while (me->remaining) {
while (me->remaining) {
progress_unlock();
find_deltas_for_region(to_pack.objects,
me->regions,
me->processed);
progress_lock();
me->remaining--;
me->regions++;
}

me->working = 0;
pthread_cond_signal(&progress_cond);
progress_unlock();

/*
* We must not set ->data_ready before we wait on the
* condition because the main thread may have set it to 1
* before we get here. In order to be sure that new
* work is available if we see 1 in ->data_ready, it
* was initialized to 0 before this thread was spawned
* and we reset it to 0 right away.
*/
pthread_mutex_lock(&me->mutex);
while (!me->data_ready)
pthread_cond_wait(&me->cond, &me->mutex);
me->data_ready = 0;
pthread_mutex_unlock(&me->mutex);

progress_lock();
}
progress_unlock();
/* leave ->working 1 so that this doesn't get more work assigned */
return NULL;
}

static void ll_find_deltas_by_region(struct object_entry *list,
struct packing_region *regions,
uint32_t start, uint32_t nr)
{
struct thread_params *p;
int i, ret, active_threads = 0;
unsigned int processed = 0;
uint32_t progress_nr;
init_threaded_search();

if (!nr)
return;

progress_nr = regions[nr - 1].start + regions[nr - 1].nr;
if (delta_search_threads <= 1) {
find_deltas_by_region(list, regions, start, nr);
cleanup_threaded_search();
return;
}

if (progress > pack_to_stdout)
fprintf_ln(stderr, _("Path-based delta compression using up to %d threads"),
delta_search_threads);
CALLOC_ARRAY(p, delta_search_threads);

if (progress)
progress_state = start_progress(_("Compressing objects by path"),
progress_nr);
/* Partition the work amongst work threads. */
for (i = 0; i < delta_search_threads; i++) {
unsigned sub_size = nr / (delta_search_threads - i);

p[i].window = window;
p[i].depth = depth;
p[i].processed = &processed;
p[i].working = 1;
p[i].data_ready = 0;

p[i].regions = regions;
p[i].list_size = sub_size;
p[i].remaining = sub_size;

regions += sub_size;
nr -= sub_size;
}

/* Start work threads. */
for (i = 0; i < delta_search_threads; i++) {
if (!p[i].list_size)
continue;
pthread_mutex_init(&p[i].mutex, NULL);
pthread_cond_init(&p[i].cond, NULL);
ret = pthread_create(&p[i].thread, NULL,
threaded_find_deltas_by_path, &p[i]);
if (ret)
die(_("unable to create thread: %s"), strerror(ret));
active_threads++;
}

/*
* Now let's wait for work completion. Each time a thread is done
* with its work, we steal half of the remaining work from the
* thread with the largest number of unprocessed objects and give
* it to that newly idle thread. This ensure good load balancing
* until the remaining object list segments are simply too short
* to be worth splitting anymore.
*/
while (active_threads) {
struct thread_params *target = NULL;
struct thread_params *victim = NULL;
unsigned sub_size = 0;

progress_lock();
for (;;) {
for (i = 0; !target && i < delta_search_threads; i++)
if (!p[i].working)
target = &p[i];
if (target)
break;
pthread_cond_wait(&progress_cond, &progress_mutex);
}

for (i = 0; i < delta_search_threads; i++)
if (p[i].remaining > 2*window &&
(!victim || victim->remaining < p[i].remaining))
victim = &p[i];
if (victim) {
sub_size = victim->remaining / 2;
target->regions = victim->regions + victim->remaining - sub_size;
victim->list_size -= sub_size;
victim->remaining -= sub_size;
}
target->list_size = sub_size;
target->remaining = sub_size;
target->working = 1;
progress_unlock();

pthread_mutex_lock(&target->mutex);
target->data_ready = 1;
pthread_cond_signal(&target->cond);
pthread_mutex_unlock(&target->mutex);

if (!sub_size) {
pthread_join(target->thread, NULL);
pthread_cond_destroy(&target->cond);
pthread_mutex_destroy(&target->mutex);
active_threads--;
}
}
cleanup_threaded_search();
free(p);

display_progress(progress_state, progress_nr);
stop_progress(&progress_state);
}

static void prepare_pack(int window, int depth)
{
struct object_entry **delta_list;
Expand All @@ -3273,8 +3431,8 @@ static void prepare_pack(int window, int depth)
return;

if (path_walk)
find_deltas_by_region(to_pack.objects, to_pack.regions,
0, to_pack.nr_regions);
ll_find_deltas_by_region(to_pack.objects, to_pack.regions,
0, to_pack.nr_regions);

ALLOC_ARRAY(delta_list, to_pack.nr_objects);
nr_deltas = n = 0;
Expand Down

0 comments on commit 7ae9a40

Please sign in to comment.