
[BUG] pyiceberg hanging on multiprocessing #1488

Open

frankliee opened this issue Jan 6, 2025 · 8 comments

@frankliee
Contributor

frankliee commented Jan 6, 2025

Apache Iceberg version

0.7.1

Please describe the bug 🐞

The bad code: the table is loaded in the main process and scanned in the subprocess.
The arr is never printed because the subprocess hangs.

from multiprocessing import Process

from pyiceberg.catalog import load_catalog

worker_num = 2

def worker(tbl):
    # The scan hangs here; arr is never printed.
    arr = tbl.scan().to_arrow()
    print(arr)

# The table is loaded in the main (parent) process and passed to the workers.
catalog = load_catalog("mycatalog")
tbl = catalog.load_table("db.table")
workers = [Process(target=worker, args=(tbl,)) for _ in range(worker_num)]
[p.start() for p in workers]
[p.join() for p in workers]

The good code: the table is loaded inside the subprocess.

from multiprocessing import Process

worker_num = 2

def worker():
    # Loading the catalog and table inside the child process avoids the hang.
    from pyiceberg.catalog import load_catalog
    catalog = load_catalog("mycatalog")
    tbl = catalog.load_table("db.table")
    arr = tbl.scan().to_arrow()
    print(arr)

workers = [Process(target=worker) for _ in range(worker_num)]
[p.start() for p in workers]
[p.join() for p in workers]

The only difference is where to load the table.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@frankliee frankliee changed the title pyiceberg hanging on multiprocessing [BUG] pyiceberg hanging on multiprocessing Jan 6, 2025
@kevinjqliu
Contributor

Hi @frankliee, thanks for reporting this issue. I noticed you're using version 0.7.1; the latest version is 0.8.1. Could you retry with the latest version?

The issue might be related to the ability to pickle the Table object (#513).

The arr will never be printed due to process hanging

It would be helpful if you could share a stack trace showing where the hang happens.

@frankliee
Contributor Author

@kevinjqliu
I used pystack to get the stack of the child process; it shows that pyarrow's FileSystem causes the hang.
By the way, it is not easy for us to upgrade pyiceberg to 0.8.1 in our environment right away.

       for manifest_file in snapshot.manifests(self.io)
    (Python) File "/home/env/lib/python3.9/site-packages/pyiceberg/table/snapshots.py", line 255, in manifests
        return list(read_manifest_list(file))
    (Python) File "/home/env/lib/python3.9/site-packages/pyiceberg/manifest.py", line 659, in read_manifest_list
        with AvroFile[ManifestFile](
    (Python) File "/home/env/lib/python3.9/site-packages/pyiceberg/avro/file.py", line 170, in __enter__
        with self.input_file.open() as f:
    (Python) File "/home/env/lib/python3.9/site-packages/pyiceberg/io/pyarrow.py", line 272, in open
        input_file = self._filesystem.open_input_file(self._path)

@kevinjqliu
Contributor

I suspect this is due to how we cache filesystems in PyArrowFileIO:

class PyArrowFileIO(FileIO):
    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]

    def __init__(self, properties: Properties = EMPTY_DICT):
        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
        super().__init__(properties=properties)

I don't think this is safe in a multi-process environment.
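
As a rough standalone illustration (this is not PyIceberg's code; get_fs and LocalFileSystem are just stand-ins for the lru_cache'd _initialize_fs), a forked child inherits the parent's populated cache and reuses the exact filesystem instance the parent created:

from functools import lru_cache
from multiprocessing import Process
import os

from pyarrow.fs import LocalFileSystem

@lru_cache
def get_fs(scheme):
    # Stand-in for PyArrowFileIO._initialize_fs; the real code may create a
    # filesystem that owns background threads and locks.
    return LocalFileSystem()

def child():
    # Under the default "fork" start method on Linux, the child inherits the
    # parent's populated lru_cache and gets the very same FileSystem object,
    # without the parent's threads that were servicing it.
    print("child pid", os.getpid(), "fs id", id(get_fs("file")))

if __name__ == "__main__":
    print("parent pid", os.getpid(), "fs id", id(get_fs("file")))  # populate the cache before forking
    p = Process(target=child)
    p.start()
    p.join()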

In the second example, load_catalog in the worker creates a new FileIO:

def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
    return load_file_io({**self.properties, **properties}, location)

@kevinjqliu
Contributor

One thing we can test is forcing the creation of a new FileIO in the worker, something like this:

from multiprocessing import Process

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import PyArrowFileIO

worker_num = 2

def worker(tbl):
    # Replace the FileIO inherited from the parent with a fresh one.
    tbl.io = PyArrowFileIO(tbl.properties)
    arr = tbl.scan().to_arrow()
    print(arr)

catalog = load_catalog("mycatalog")
tbl = catalog.load_table("db.table")
workers = [Process(target=worker, args=(tbl,)) for _ in range(worker_num)]
[p.start() for p in workers]
[p.join() for p in workers]

Can you try this out?

@frankliee
Contributor Author

frankliee commented Jan 9, 2025

One thing we can test is forcing the creation of a new FileIO in the worker, something like this: […] Can you try this out?

I have tried creating a new PyArrowFileIO in the worker subprocess, but it still blocks on input_file = self._filesystem.open_input_file(self._path).

By the way, I added some logging to confirm that _initialize_fs is called in the worker process.

main pid is 10087
pid 10087 init fs
worker pid is 10377
pid 10377 init fs
......

@frankliee
Contributor Author

I used strace on the worker process and saw FUTEX_WAIT_BITSET_PRIVATE calls; I am not sure whether it is a deadlock caused by process forking in pyarrow.

Then I found that using the "spawn" start method avoids the hang.

import multiprocessing as mp

from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import PyArrowFileIO

worker_num = 2

def worker(tbl):
    tbl.io = PyArrowFileIO(tbl.properties)
    arr = tbl.scan().to_arrow()
    print(arr)

if __name__ == "__main__":
    ctx = mp.get_context("spawn")  # use "spawn" instead of the default "fork"
    catalog = load_catalog("mycatalog")
    tbl = catalog.load_table("db.table")
    workers = [ctx.Process(target=worker, args=(tbl,)) for _ in range(worker_num)]
    [p.start() for p in workers]
    [p.join() for p in workers]

@kevinjqliu

@kevinjqliu
Contributor

Glad you were able to find a working solution. In general, the FileIO is loaded/retrieved from the catalog object, so it would be best to recreate the catalog in the workers and not share anything between processes.

I used strace on the worker process and saw FUTEX_WAIT_BITSET_PRIVATE calls; I am not sure whether it is a deadlock caused by process forking in pyarrow.

Arrow has a related issue involving FUTEX_WAKE_PRIVATE: apache/arrow#15233

Also, from the multiprocessing docs:

spawn
The parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object’s [run()](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.run) method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.

It sounds like, in the case of spawn, the child process recreates all objects, including the catalog.
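
As a minimal, untested sketch of that idea (combining "spawn" with rebuilding the catalog inside the worker; not verified against this reproduction):

import multiprocessing as mp

from pyiceberg.catalog import load_catalog

def worker():
    # With "spawn" the child starts a fresh interpreter, and recreating the
    # catalog (and therefore its FileIO) here means nothing is shared with
    # the parent process.
    catalog = load_catalog("mycatalog")
    tbl = catalog.load_table("db.table")
    print(tbl.scan().to_arrow())

if __name__ == "__main__":
    mp.set_start_method("spawn")  # make spawn the default for every Process
    workers = [mp.Process(target=worker) for _ in range(2)]
    [p.start() for p in workers]
    [p.join() for p in workers]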

@frankliee
Contributor Author

apache/arrow#15233

When we use multiple processes to speed up Iceberg reads, we cannot recreate the catalog in the workers, because planning would then be invoked in every process. So we have to plan in the main process and then read the Parquet files in each worker from a subset of the file list (a rough sketch of this split follows).
By the way, we found that multithreading cannot fully exploit the CPUs on many-core machines, which is why we use multiple processes.
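
A minimal sketch of that split, assuming the data files are plain Parquet that pyarrow can open directly from their paths (credentials and filesystem setup for S3, HDFS, etc. are left out); plan_files() and file.file_path come from PyIceberg, everything else here is illustrative:

import multiprocessing as mp

import pyarrow.parquet as pq

from pyiceberg.catalog import load_catalog

worker_num = 2

def worker(file_paths):
    # Each worker reads only its own subset of the planned Parquet files.
    for path in file_paths:
        table = pq.read_table(path)
        print(path, table.num_rows)

if __name__ == "__main__":
    catalog = load_catalog("mycatalog")
    tbl = catalog.load_table("db.table")

    # Plan once, in the parent process only.
    tasks = list(tbl.scan().plan_files())
    paths = [task.file.file_path for task in tasks]

    # Round-robin the file list across the workers.
    chunks = [paths[i::worker_num] for i in range(worker_num)]

    ctx = mp.get_context("spawn")
    workers = [ctx.Process(target=worker, args=(chunk,)) for chunk in chunks]
    [p.start() for p in workers]
    [p.join() for p in workers]

Note that this reads raw Parquet and skips what tbl.scan().to_arrow() normally handles, such as applying delete files and projecting the Iceberg schema.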
