
Exception: Java gateway process exited before sending its port number #2

Open

ckingbailey opened this issue Jul 6, 2020 · 0 comments

I found your Docker image via issue #25 on the aws-glue-libs repo. It looks like it could be very helpful in a couple of different contexts, but I'm having trouble getting it running.

I ran the curl command from the readme to create the aliases in my .zshrc. When I run `glue pytest`, I get the error "Exception: Java gateway process exited before sending its port number".

The issue seems to be with a pytest fixture I have for SparkContext. I have two fixtures for Spark objects, which I got from this blog post. They both work with a local PySpark install and a local OpenJDK install on my machine.

I have a suite of tests that run using the aforementioned local installs of PySpark and OpenJDK. I'd like to use your Docker image to run those tests on CircleCI, and also to let other developers run them without the headache of setting up Spark and OpenJDK locally.

The complete error output is:

_________________________________________________________________________________ ERROR at setup of test_filter_report_by_org_list _________________________________________________________________________________

request = <SubRequest 'sc' for <Function test_compose_fully_qualified_table_name>>

    @pytest.fixture(scope="session", name="sc")
    def spark_context(request):
        """ fixture for creating a Spark context
        Args:
            request: pytest.FixtureRequest object
        """
        spark_conf = (SparkConf().setMaster("local[*]").setAppName("sparta-monthly-reports"))
>       sc = SparkContext(conf=spark_conf)

tests/conftest.py:22:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/pyspark/context.py:133: in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
/opt/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/pyspark/context.py:316: in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
/opt/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/pyspark/java_gateway.py:46: in launch_gateway
    return _launch_gateway(conf)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

conf = <pyspark.conf.SparkConf object at 0x7f54a3e83b70>, insecure = False

    def _launch_gateway(conf=None, insecure=False):
        """
        launch jvm gateway
        :param conf: spark configuration passed to spark-submit
        :param insecure: True to create an insecure gateway; only for testing
        :return: a JVM gateway
        """
        if insecure and os.environ.get("SPARK_TESTING", "0") != "1":
            raise ValueError("creating insecure gateways is only for testing")
        if "PYSPARK_GATEWAY_PORT" in os.environ:
            gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
            gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        else:
            SPARK_HOME = _find_spark_home()
            # Launch the Py4j gateway using Spark's run command so that we pick up the
            # proper classpath and settings from spark-env.sh
            on_windows = platform.system() == "Windows"
            script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
            command = [os.path.join(SPARK_HOME, script)]
            if conf:
                for k, v in conf.getAll():
                    command += ['--conf', '%s=%s' % (k, v)]
            submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
            if os.environ.get("SPARK_TESTING"):
                submit_args = ' '.join([
                    "--conf spark.ui.enabled=false",
                    submit_args
                ])
            command = command + shlex.split(submit_args)

            # Create a temporary directory where the gateway server should write the connection
            # information.
            conn_info_dir = tempfile.mkdtemp()
            try:
                fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
                os.close(fd)
                os.unlink(conn_info_file)

                env = dict(os.environ)
                env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file
                if insecure:
                    env["_PYSPARK_CREATE_INSECURE_GATEWAY"] = "1"

                # Launch the Java gateway.
                # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
                if not on_windows:
                    # Don't send ctrl-c / SIGINT to the Java gateway:
                    def preexec_func():
                        signal.signal(signal.SIGINT, signal.SIG_IGN)
                    proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
                else:
                    # preexec_fn not supported on Windows
                    proc = Popen(command, stdin=PIPE, env=env)

                # Wait for the file to appear, or for the process to exit, whichever happens first.
                while not proc.poll() and not os.path.isfile(conn_info_file):
                    time.sleep(0.1)

                if not os.path.isfile(conn_info_file):
>                   raise Exception("Java gateway process exited before sending its port number")
E                   Exception: Java gateway process exited before sending its port number

/opt/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/pyspark/java_gateway.py:108: Exception
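Reading the traceback, `launch_gateway` just spawns `spark-submit` and waits for it to write a connection-info file, so this exception really means spark-submit itself exited before getting that far (commonly a missing or misconfigured Java install). As a sanity check, here is a minimal sketch of what I'd run inside the container to surface spark-submit's own output; the SPARK_HOME fallback is the path from the traceback:

```python
# Minimal sketch: run spark-submit directly, the way launch_gateway does, so
# its stderr is visible instead of being swallowed by the pytest failure.
# The SPARK_HOME default is the path shown in the traceback above.
import os
import subprocess

spark_home = os.environ.get(
    "SPARK_HOME", "/opt/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8"
)
proc = subprocess.run(
    [os.path.join(spark_home, "bin", "spark-submit"), "--version"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
# If JAVA_HOME is unset or points at a missing JDK, the underlying error
# should show up here rather than as the opaque gateway exception.
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))
print(proc.stdout.decode())
```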

conftest.py looks like this:

import logging

import pytest
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

@pytest.fixture(scope='session', name='test_data_filepath')
def data_path():
    return './tests/fixtures/test_data/'

@pytest.fixture(scope="session", name="sc")
def spark_context(request):
    """ fixture for creating a Spark context
    Args:
        request: pytest.FixtureRequest object
    """
    spark_conf = (SparkConf().setMaster("local[*]").setAppName("sparta-monthly-reports"))
    sc = SparkContext(conf=spark_conf)
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc

@pytest.fixture(scope="session", name="spark_session")
def spark_session(sc):
    """ fixture for creating SparkSession
    Args:
        sc: spark_context fixture
    """
    quiet_py4j()
    return SparkSession(sc)
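
For reference, the tests consume these fixtures in the usual pytest way; here are a couple of illustrative examples (hypothetical names and data, not the actual failing tests):

```python
# Illustrative only: hypothetical tests showing how the sc and spark_session
# fixtures above are consumed. Not the tests from the real suite.
def test_spark_context_is_local(sc):
    # The fixture configures a local[*] master, so the context should report it.
    assert sc.master == "local[*]"

def test_can_create_dataframe(spark_session):
    # Build a tiny DataFrame through the session-scoped SparkSession fixture.
    df = spark_session.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
    assert df.count() == 2
```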