Skip to content

Commit 59a2a74

Browse files
gaogaotiantianHyukjinKwon
authored andcommitted
[SPARK-54441][INFRA] Enable coverage data for workers
### What changes were proposed in this pull request? By patching the worker in coverage runs, we save coverage data for code executed in workers. This code will only be triggered in coverage runs, not normal runs. Also we uses the zip lib for many tests, this PR will also combine the coverage data with the `[paths]` config in `.coveragerc`. ### Why are the changes needed? To have a better idea of what test coverage we are missing in our tests suite. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually test that coverage data shows up for the worker file. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #53145 from gaogaotiantian/enable-worker-coverage. Authored-by: Tian Gao <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent d3346eb commit 59a2a74

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

python/.coveragerc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,8 @@ omit =
3535
*/pyspark/streaming/tests/*
3636
*/pyspark/tests/*
3737
*/pyspark/testing/tests/*
38+
39+
[paths]
40+
source =
41+
pyspark/
42+
lib/pyspark.zip/pyspark

python/test_coverage/sitecustomize.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,32 @@
2121
# variable is set or not. If set, it starts to run the coverage.
2222
try:
2323
import coverage
24-
coverage.process_startup()
24+
cov = coverage.process_startup()
25+
if cov:
26+
import os
27+
28+
def patch_worker():
29+
# If it's a worker forked from the daemon, we need to patch it to save
30+
# the coverage data. Otherwise the worker will be killed by a signal and
31+
# the coverage data will not be saved.
32+
import sys
33+
frame = sys._getframe(1)
34+
if (
35+
frame.f_code.co_name == "manager" and
36+
"daemon.py" in frame.f_code.co_filename and
37+
"worker" in frame.f_globals
38+
):
39+
40+
def save_when_exit(func):
41+
def wrapper(*args, **kwargs):
42+
result = func(*args, **kwargs)
43+
cov.save()
44+
return result
45+
return wrapper
46+
47+
frame.f_globals["worker"] = save_when_exit(frame.f_globals["worker"])
48+
49+
os.register_at_fork(after_in_child=patch_worker)
50+
2551
except ImportError:
2652
pass

0 commit comments

Comments
 (0)