Skip to content

distributed.comm.tests.test_ucx_config.test_ucx_config_w_env_var flaky #5229

@fjetter

Description

@fjetter

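I noticed that the UCX test distributed.comm.tests.test_ucx_config.test_ucx_config_w_env_var occasionally fails. When it does, the scheduler comm raises `CommClosedError: Connection closed by writer` in `_handle_report`. The client then calls `_reconnect`, and `assert self.scheduler_comm.comm.closed()` fails with an `AssertionError` while `Client.__exit__` is closing the client.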

E.g. https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/234/CUDA_VER=11.2,LINUX_VER=ubuntu18.04,PYTHON_VER=3.8,RAPIDS_VER=21.10/testReport/junit/distributed.comm.tests/test_ucx_config/test_ucx_config_w_env_var/

Traceback
self = <Client: scheduler='ucx://172.17.0.3:13339'>

    async def _handle_report(self):
        """Listen to scheduler"""
        with log_errors():
            try:
                while True:
                    if self.scheduler_comm is None:
                        break
                    try:
>                       msgs = await self.scheduler_comm.comm.read()

distributed/client.py:1222: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <UCX Client->Scheduler local=None remote=ucx://172.17.0.3:13339>
deserializers = ('cuda', 'dask', 'pickle', 'error')

    async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
        with log_errors():
            if self.closed():
                raise CommClosedError("Endpoint is closed -- unable to read message")
    
            if deserializers is None:
                deserializers = ("cuda", "dask", "pickle", "error")
    
            try:
                # Recv meta data
    
                # Recv close flag and number of frames (_Bool, int64)
                msg = host_array(struct.calcsize("?Q"))
                await self.ep.recv(msg)
                (shutdown, nframes) = struct.unpack("?Q", msg)
    
                if shutdown:  # The writer is closing the connection
>                   raise CommClosedError("Connection closed by writer")
E                   distributed.comm.core.CommClosedError: Connection closed by writer

distributed/comm/ucx.py:255: CommClosedError

During handling of the above exception, another exception occurred:

cleanup = None
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f872c175550>
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f87b936b2e0>

    def test_ucx_config_w_env_var(cleanup, loop, monkeypatch):
        size = "1000.00 MB"
        monkeypatch.setenv("DASK_RMM__POOL_SIZE", size)
    
        dask.config.refresh()
    
        port = "13339"
        sched_addr = f"ucx://{HOST}:{port}"
    
        with popen(
            ["dask-scheduler", "--no-dashboard", "--protocol", "ucx", "--port", port]
        ) as sched:
            with popen(
                [
                    "dask-worker",
                    sched_addr,
                    "--no-dashboard",
                    "--protocol",
                    "ucx",
                    "--no-nanny",
                ]
            ) as w:
                with Client(sched_addr, loop=loop, timeout=10) as c:
                    while not c.scheduler_info()["workers"]:
                        sleep(0.1)
    
                    # Check for RMM pool resource type
                    rmm_resource = c.run_on_scheduler(
                        rmm.mr.get_current_device_resource_type
                    )
                    assert rmm_resource == rmm.mr.PoolMemoryResource
    
                    worker_addr = list(c.scheduler_info()["workers"])[0]
                    worker_rmm_usage = c.run(rmm.mr.get_current_device_resource_type)
                    rmm_resource = worker_rmm_usage[worker_addr]
>                   assert rmm_resource == rmm.mr.PoolMemoryResource

distributed/comm/tests/test_ucx_config.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:1187: in __exit__
    self.close()
distributed/client.py:1430: in close
    sync(self.loop, self._close, fast=True, callback_timeout=timeout)
distributed/utils.py:325: in sync
    raise exc.with_traceback(tb)
distributed/utils.py:308: in f
    result[0] = yield future
/opt/conda/envs/dask/lib/python3.8/site-packages/tornado/gen.py:762: in run
    value = future.result()
/opt/conda/envs/dask/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/client.py:1339: in _close
    await asyncio.wait_for(
/opt/conda/envs/dask/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/client.py:1228: in _handle_report
    await self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Client: scheduler='ucx://172.17.0.3:13339'>

    async def _reconnect(self):
        with log_errors():
>           assert self.scheduler_comm.comm.closed()
E           AssertionError

distributed/client.py:1053: AssertionError

cc @dask/gpu

Metadata

Metadata

Assignees

No one assigned

    Labels

    flaky test — Intermittent failures on CI.

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions