import pandas as pd
import pytest

-from gitpandas.cache import DiskCache, CacheMissError, multicache
+from gitpandas.cache import DiskCache, CacheMissError, multicache, EphemeralCache


class MockRepoMethod:
@@ -515,4 +515,280 @@ def verifier(worker_id):

        # Verify that at least some data was persisted and loaded correctly
        total_verified = sum(len(results) for results in verification_results.values())
-        assert total_verified > 0, "No data was successfully persisted and loaded"
+        assert total_verified > 0, "No data was successfully persisted and loaded"
+
+    def test_concurrent_additions_near_max_key_limit(self, temp_cache_file):
+        """
+        Test the race condition that occurs when multiple threads add items
+        to the cache simultaneously when near the max key limit.
+
+        This test demonstrates the potential for IndexError, KeyError, and
+        cache inconsistency when multiple threads trigger eviction simultaneously.
+        """
+        # Use a small max_keys to easily trigger the race condition
+        cache = DiskCache(filepath=temp_cache_file, max_keys=10)
+
+        # Pre-populate cache to near the limit (8 out of 10 keys)
+        for i in range(8):
+            cache.set(f"initial_key_{i}", pd.DataFrame({"initial": [i]}))
+
+        errors_caught = []
+        errors_lock = threading.Lock()
+        successful_operations = []
+        operations_lock = threading.Lock()
+
+        def concurrent_adder(worker_id):
+            """
+            Worker that adds multiple keys, potentially triggering eviction.
+            Each worker adds 5 keys, so the total will exceed max_keys significantly.
+            """
+            for i in range(5):
+                try:
+                    key = f"worker_{worker_id}_key_{i}"
+                    value = pd.DataFrame({"worker": [worker_id], "iteration": [i]})
+                    cache.set(key, value)
+
+                    with operations_lock:
+                        successful_operations.append(f"{worker_id}_{i}")
+
+                except (IndexError, KeyError, ValueError) as e:
+                    # These are the expected race condition errors
+                    with errors_lock:
+                        errors_caught.append(f"Worker {worker_id}, iteration {i}: {type(e).__name__}: {e}")
+                except Exception as e:
+                    # Any other unexpected error
+                    with errors_lock:
+                        errors_caught.append(f"Worker {worker_id}, iteration {i}: Unexpected {type(e).__name__}: {e}")
+
+        # Start multiple threads that will all try to add keys simultaneously
+        # This should trigger the race condition in eviction
+        num_threads = 8
+        threads = []
+
+        for worker_id in range(num_threads):
+            thread = threading.Thread(target=concurrent_adder, args=(worker_id,))
+            threads.append(thread)
+
+        # Start all threads at roughly the same time to maximize the chance of hitting the race
+        for thread in threads:
+            thread.start()
+
+        # Wait for all threads to complete
+        for thread in threads:
+            thread.join()
+
+        # Check whether any race condition errors occurred
+        if errors_caught:
+            print("Race condition errors detected (this demonstrates the bug):")
+            for error in errors_caught[:10]:  # Show first 10 errors
+                print(f"  - {error}")
+
+        # Verify cache consistency after the operations
+        cache_size = len(cache._cache)
+        key_list_size = len(cache._key_list)
+
+        print(f"Final cache state: {cache_size} items in cache, {key_list_size} items in key_list")
+        print(f"Successful operations: {len(successful_operations)}")
+        print(f"Errors caught: {len(errors_caught)}")
+
+        # The cache should be consistent even if there were errors
+        assert cache_size == key_list_size, f"Cache inconsistency: {cache_size} != {key_list_size}"
+
+        # Cache size should not exceed max_keys (allowing for some tolerance due to threading)
+        assert cache_size <= cache._max_keys + num_threads, f"Cache size {cache_size} significantly exceeds max_keys {cache._max_keys}"
+
+        # If this test fails with race condition errors, it demonstrates the threading issue.
+        # Note: this test might pass on some runs due to timing, but should fail consistently
+        # on systems with high concurrency or when run multiple times.
+
+    def test_ephemeral_cache_race_condition_near_max_keys(self):
+        """
+        Test to demonstrate the race condition in EphemeralCache when multiple threads
+        add items near the max key limit. EphemeralCache has NO thread safety,
+        so this should expose the race condition more reliably.
+        """
+        # Use a very small max_keys to easily trigger the race condition
+        cache = EphemeralCache(max_keys=5)
+
+        # Pre-populate cache to near the limit (3 out of 5 keys)
+        for i in range(3):
+            cache.set(f"initial_key_{i}", pd.DataFrame({"initial": [i]}))
+
+        errors_caught = []
+        errors_lock = threading.Lock()
+        successful_operations = []
+        operations_lock = threading.Lock()
+
+        def concurrent_adder(worker_id):
+            """
+            Worker that adds multiple keys, triggering eviction.
+            Each worker adds 4 keys, so the total will significantly exceed max_keys.
+            """
+            for i in range(4):
+                try:
+                    key = f"worker_{worker_id}_key_{i}"
+                    value = pd.DataFrame({"worker": [worker_id], "iteration": [i]})
+                    cache.set(key, value)
+
+                    with operations_lock:
+                        successful_operations.append(f"{worker_id}_{i}")
+
+                except (IndexError, KeyError, ValueError, RuntimeError) as e:
+                    # These are the expected race condition errors
+                    with errors_lock:
+                        errors_caught.append(f"Worker {worker_id}, iteration {i}: {type(e).__name__}: {e}")
+                except Exception as e:
+                    # Any other unexpected error
+                    with errors_lock:
+                        errors_caught.append(f"Worker {worker_id}, iteration {i}: Unexpected {type(e).__name__}: {e}")
+
+        # Start multiple threads that will all try to add keys simultaneously
+        # This should trigger the race condition in eviction for EphemeralCache
+        num_threads = 10
+        threads = []
+
+        for worker_id in range(num_threads):
+            thread = threading.Thread(target=concurrent_adder, args=(worker_id,))
+            threads.append(thread)
+
+        # Start all threads at roughly the same time to maximize the chance of hitting the race
+        for thread in threads:
+            thread.start()
+
+        # Wait for all threads to complete
+        for thread in threads:
+            thread.join()
+
+        # Report results
+        cache_size = len(cache._cache)
+        key_list_size = len(cache._key_list)
+
+        print("\nEphemeralCache Race Condition Test Results:")
+        print(f"Final cache state: {cache_size} items in cache, {key_list_size} items in key_list")
+        print(f"Successful operations: {len(successful_operations)}")
+        print(f"Errors caught: {len(errors_caught)}")
+
+        if errors_caught:
+            print("Race condition errors detected:")
+            for error in errors_caught[:5]:  # Show first 5 errors
+                print(f"  - {error}")
+
+        # Check for cache consistency issues
+        cache_inconsistent = cache_size != key_list_size
+        if cache_inconsistent:
+            print(f"CACHE INCONSISTENCY DETECTED: cache size ({cache_size}) != key_list size ({key_list_size})")
+
+        # This test is meant to demonstrate the issue, so we'll make it informational.
+        # In a real fix, we'd want these assertions to pass.
+        print(f"Cache consistency: {'FAILED' if cache_inconsistent else 'OK'}")
+        print(f"Race condition errors: {'DETECTED' if errors_caught else 'NONE'}")
+
+        # For now, just ensure the test doesn't crash completely.
+        # In a production environment, these race conditions could cause:
+        # 1. IndexError when evict() tries to pop from an empty list
+        # 2. KeyError when evict() tries to delete already-deleted keys
+        # 3. Cache inconsistency where _cache and _key_list get out of sync
+        assert True  # Test always passes, but demonstrates the issues above
+
+    def test_aggressive_ephemeral_cache_race_condition(self):
+        """
+        More aggressive test to expose race conditions in EphemeralCache.
+        Uses many threads, a smaller cache, and rapid operations to maximize
+        the chance of exposing threading issues.
+        """
+        # Very small cache to force frequent evictions
+        cache = EphemeralCache(max_keys=3)
+
+        errors_caught = []
+        errors_lock = threading.Lock()
+        operation_count = 0
+        count_lock = threading.Lock()
+
+        def aggressive_worker(worker_id):
+            """Worker that rapidly adds many keys to force evictions."""
+            nonlocal operation_count
+
+            for i in range(20):  # Each worker does 20 operations
+                try:
+                    key = f"w{worker_id}_k{i}"
+                    value = pd.DataFrame({"w": [worker_id], "i": [i]})
+
+                    # Add some variability in timing
+                    if i % 3 == 0:
+                        time.sleep(0.0001)  # Tiny delay to vary timing
+
+                    cache.set(key, value)
+
+                    with count_lock:
+                        operation_count += 1
+
+                except Exception as e:
+                    with errors_lock:
+                        errors_caught.append(f"W{worker_id}I{i}: {type(e).__name__}: {str(e)}")
+
+        # Use many threads to increase contention
+        num_threads = 20
+        threads = []
+
+        # Create and start all threads quickly
+        for worker_id in range(num_threads):
+            thread = threading.Thread(target=aggressive_worker, args=(worker_id,))
+            threads.append(thread)
+
+        # Start all threads as close together as possible
+        for thread in threads:
+            thread.start()
+
+        # Wait for completion
+        for thread in threads:
+            thread.join()
+
+        # Analyze results
+        cache_size = len(cache._cache)
+        key_list_size = len(cache._key_list)
+
+        print("\nAggressive EphemeralCache Test Results:")
+        print(f"Total operations attempted: {num_threads * 20}")
+        print(f"Successful operations: {operation_count}")
+        print(f"Final cache state: {cache_size} items in cache, {key_list_size} items in key_list")
+        print(f"Errors caught: {len(errors_caught)}")
+
+        if errors_caught:
+            print("Race condition errors detected:")
+            error_types = {}
+            for error in errors_caught:
+                error_type = error.split(":")[1].strip()
+                error_types[error_type] = error_types.get(error_type, 0) + 1
+
+            for error_type, count in error_types.items():
+                print(f"  - {error_type}: {count} occurrences")
+
+            # Show a few example errors
+            print("Example errors:")
+            for error in errors_caught[:3]:
+                print(f"  - {error}")
+
+        # Check for inconsistencies
+        cache_inconsistent = cache_size != key_list_size
+        if cache_inconsistent:
+            print(f"CACHE INCONSISTENCY: cache has {cache_size} items but key_list has {key_list_size}")
+
+            # Try to identify the discrepancy
+            cache_keys = set(cache._cache.keys())
+            list_keys = set(cache._key_list)
+
+            only_in_cache = cache_keys - list_keys
+            only_in_list = list_keys - cache_keys
+
+            if only_in_cache:
+                print(f"Keys only in cache dict: {only_in_cache}")
+            if only_in_list:
+                print(f"Keys only in key list: {only_in_list}")
+
+        print(f"Cache consistency: {'FAILED' if cache_inconsistent else 'OK'}")
+        print(f"Race conditions: {'DETECTED' if errors_caught else 'NONE'}")
+
+        # This test demonstrates potential issues but doesn't fail.
+        # The key insight is that even if we don't see errors every time,
+        # the potential for race conditions exists in the current implementation.
+        assert True
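
As the comments in these tests describe, every failure mode comes from eviction mutating `_cache` and `_key_list` without synchronization. For illustration only (this sketch is not part of the commit, and it assumes nothing beyond the `_cache`, `_key_list`, and `_max_keys` internals the tests inspect), one way to close that race is to do the insert and the eviction inside a single critical section:

    import threading


    class LockedCacheSketch:
        """Hypothetical sketch: serialize set/evict so _cache and _key_list stay in sync."""

        def __init__(self, max_keys=1000):
            self._cache = {}
            self._key_list = []
            self._max_keys = max_keys
            self._lock = threading.Lock()  # one lock guards both structures

        def set(self, key, value):
            with self._lock:
                if key not in self._cache:
                    self._key_list.append(key)
                self._cache[key] = value
                # Evict inside the same critical section so no other thread can
                # observe _key_list and _cache in a half-updated state.
                while len(self._key_list) > self._max_keys:
                    oldest = self._key_list.pop(0)
                    self._cache.pop(oldest, None)  # tolerate an already-removed key

Under a scheme like this, the concurrent_adder and aggressive_worker threads above could never see `_cache` and `_key_list` at different lengths, and eviction could not pop from an empty key list or delete an already-deleted key.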