From ec75857b70dad699e3702fc67067b60488773672 Mon Sep 17 00:00:00 2001 From: Adrian Chaves Date: Wed, 29 Apr 2026 12:18:37 +0200 Subject: [PATCH] Add SHUB_JOBQ support --- scrapinghub/hubstorage/client.py | 23 +++++++++++++++-- scrapinghub/hubstorage/job.py | 3 ++- scrapinghub/hubstorage/project.py | 3 ++- tests/test_jobq_client.py | 43 +++++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 tests/test_jobq_client.py diff --git a/scrapinghub/hubstorage/client.py b/scrapinghub/hubstorage/client.py index 508c9445..72d2a556 100644 --- a/scrapinghub/hubstorage/client.py +++ b/scrapinghub/hubstorage/client.py @@ -45,6 +45,15 @@ def _get_package_version(): return __version__ +class _JobQClientProxy: + + def __init__(self, client, endpoint): + self.auth = client.auth + self.endpoint = endpoint + self.use_msgpack = False + self.request = client.request + + class HubstorageClient(object): DEFAULT_ENDPOINT = 'https://storage.scrapinghub.com/' @@ -60,7 +69,7 @@ class HubstorageClient(object): def __init__(self, auth=None, endpoint=None, connection_timeout=None, max_retries=None, max_retry_time=None, user_agent=None, - use_msgpack=True): + use_msgpack=True, *, jobq_endpoint=None): """ Note: max_retries and max_retry_time change how the client attempt to retry failing requests that are @@ -78,14 +87,24 @@ def __init__(self, auth=None, endpoint=None, connection_timeout=None, max_retries (int): The number of time idempotent requests may be retried max_retry_time (int): The time, in seconds, during which the client can retry a request use_msgpack (bool): Flag to enable/disable msgpack use for serialization + jobq_endpoint (str, optional): The JobQ API root address. + Keyword-only argument. If not provided, it will be read from + the ``SHUB_JOBQ`` environment variable, or fall back to the + value of ``endpoint``. """ self.auth = xauth(auth) self.endpoint = endpoint or os.getenv("SHUB_STORAGE", self.DEFAULT_ENDPOINT) + self._jobq_endpoint = ( + jobq_endpoint or + os.getenv("SHUB_JOBQ") or + self.endpoint + ) self.connection_timeout = connection_timeout or self.DEFAULT_CONNECTION_TIMEOUT_S self.user_agent = user_agent or self.DEFAULT_USER_AGENT self.session = self._create_session() self.retrier = self._create_retrier(max_retries, max_retry_time) - self.jobq = JobQ(self, None) + self._jobq_client = _JobQClientProxy(self, self._jobq_endpoint) + self.jobq = JobQ(self._jobq_client, None) self.projects = Projects(self, None) self.root = ResourceType(self, None) self._batchuploader = None diff --git a/scrapinghub/hubstorage/job.py b/scrapinghub/hubstorage/job.py index 0939b249..f1c99c95 100644 --- a/scrapinghub/hubstorage/job.py +++ b/scrapinghub/hubstorage/job.py @@ -18,7 +18,8 @@ def __init__(self, client, key, auth=None, jobauth=None, metadata=None): self.logs = Logs(client, self.key, self.auth) self.samples = Samples(client, self.key, self.auth) self.requests = Requests(client, self.key, self.auth) - self.jobq = JobQ(client, self.key.split('/')[0], auth) + jobq_client = getattr(client, '_jobq_client', client) + self.jobq = JobQ(jobq_client, self.key.split('/')[0], auth) def close_writers(self): wl = [self.items, self.logs, self.samples, self.requests] diff --git a/scrapinghub/hubstorage/project.py b/scrapinghub/hubstorage/project.py index 9d8fa3ac..62801605 100644 --- a/scrapinghub/hubstorage/project.py +++ b/scrapinghub/hubstorage/project.py @@ -21,7 +21,8 @@ def __init__(self, client, projectid, auth=None): self.items = Items(client, self.projectid, auth=auth) self.logs = Logs(client, self.projectid, auth=auth) self.samples = Samples(client, self.projectid, auth=auth) - self.jobq = JobQ(client, self.projectid, auth=auth) + jobq_client = getattr(client, '_jobq_client', client) + self.jobq = JobQ(jobq_client, self.projectid, auth=auth) self.activity = Activity(client, self.projectid, auth=auth) self.collections = Collections(client, self.projectid, auth=auth) self.frontier = Frontier(client, self.projectid, auth=auth) diff --git a/tests/test_jobq_client.py b/tests/test_jobq_client.py new file mode 100644 index 00000000..f5c5290b --- /dev/null +++ b/tests/test_jobq_client.py @@ -0,0 +1,43 @@ +from scrapinghub import HubstorageClient + + +def test_hubstorage_jobq_defaults_to_storage_endpoint(monkeypatch): + monkeypatch.delenv('SHUB_JOBQ', raising=False) + client = HubstorageClient(auth='apikey', endpoint='https://storage.example/') + assert client.jobq.url.startswith('https://storage.example/') + + +def test_hubstorage_jobq_endpoint_uses_env_var(monkeypatch): + monkeypatch.setenv('SHUB_JOBQ', 'https://jobq-internal.zyte.com/') + client = HubstorageClient(auth='apikey', endpoint='https://storage.example/') + assert client.jobq.url.startswith('https://jobq-internal.zyte.com/') + + +def test_hubstorage_jobq_endpoint_argument(monkeypatch): + monkeypatch.delenv('SHUB_JOBQ', raising=False) + client = HubstorageClient( + auth='apikey', + endpoint='https://storage.example/', + jobq_endpoint='https://jobq.example/', + ) + assert client.jobq.url.startswith('https://jobq.example/') + + +def test_hubstorage_connection_timeout_positional_compatibility(monkeypatch): + monkeypatch.delenv('SHUB_JOBQ', raising=False) + client = HubstorageClient('apikey', 'https://storage.example/', 12) + assert client.connection_timeout == 12 + assert client.jobq.url.startswith('https://storage.example/') + + +def test_project_and_job_jobq_use_configured_endpoint(): + client = HubstorageClient( + auth='apikey', + endpoint='https://storage.example/', + jobq_endpoint='https://jobq.example/', + ) + project = client.get_project('123') + job = client.get_job('123/1/1') + + assert project.jobq.url.startswith('https://jobq.example/') + assert job.jobq.url.startswith('https://jobq.example/')