From a2eef4a88a633873eb2c5f3fa6e7f4fa3cd67306 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 18:23:23 +0000 Subject: [PATCH 1/4] Add Scrapy-style spiders: Request/callbacks, item pipelines, rules, sitemaps Closes the main "framework" gaps vs Scrapy, built on the existing async engine (httpx, retries, rate limiting, robots.txt, de-duplication). - crawley.spider: `Request` (callback, meta, cb_kwargs, headers, priority, dont_filter, errback, fingerprint/replace), `Item`, and a callback-driven `Spider` (parse/start_requests/on_item, depth tracking, fingerprint de-dup). - response.follow()/response.meta and response.request for list->detail crawls. - crawley.pipelines: `ItemPipeline` + `DropItem`; spiders run items through the pipeline chain (open_spider/close_spider/process_item, sync or async). - crawley.spiders: `LinkExtractor` (allow/deny/restrict_xpaths/restrict_css), `Rule`, `CrawlSpider` (rule-based following) and `SitemapSpider` (sitemap.xml + sitemap index). - RequestManager.make_request accepts per-request headers. - Extractors parse from bytes so XML-with-declaration (sitemaps) is handled. Tests (169 -> 180): test_spider, test_spiders; conftest serves /sitemap.xml. --- crawley/__init__.py | 21 ++++ crawley/extractors.py | 10 +- crawley/http/managers.py | 14 ++- crawley/http/response.py | 23 ++++ crawley/pipelines.py | 28 +++++ crawley/spider.py | 244 +++++++++++++++++++++++++++++++++++++++ crawley/spiders.py | 157 +++++++++++++++++++++++++ tests/conftest.py | 20 ++++ tests/test_spider.py | 136 ++++++++++++++++++++++ tests/test_spiders.py | 84 ++++++++++++++ 10 files changed, 728 insertions(+), 9 deletions(-) create mode 100644 crawley/pipelines.py create mode 100644 crawley/spider.py create mode 100644 crawley/spiders.py create mode 100644 tests/test_spider.py create mode 100644 tests/test_spiders.py diff --git a/crawley/__init__.py b/crawley/__init__.py index b1c7628..6660c0a 100644 --- a/crawley/__init__.py +++ b/crawley/__init__.py @@ -34,6 +34,15 @@ "parse", "Document", "Element", + "Spider", + "Request", + "Item", + "DropItem", + "ItemPipeline", + "LinkExtractor", + "Rule", + "CrawlSpider", + "SitemapSpider", "__version__", ] @@ -66,6 +75,18 @@ def __getattr__(name): from crawley.http.response import Response return Response + if name in ("Spider", "Request", "Item"): + from crawley import spider + + return getattr(spider, name) + if name in ("DropItem", "ItemPipeline"): + from crawley import pipelines + + return getattr(pipelines, name) + if name in ("LinkExtractor", "Rule", "CrawlSpider", "SitemapSpider"): + from crawley import spiders + + return getattr(spiders, name) if name in ("fetch", "afetch", "afetch_all", "scrape", "parse", "Document", "Element"): from crawley import scraping diff --git a/crawley/extractors.py b/crawley/extractors.py index e03b3d7..06343da 100644 --- a/crawley/extractors.py +++ b/crawley/extractors.py @@ -7,7 +7,7 @@ from __future__ import annotations -from io import StringIO +from io import BytesIO from typing import Any from lxml import etree @@ -33,7 +33,9 @@ class XPathExtractor(BaseExtractor): def get_object(self, data: str) -> Any: parser = etree.HTMLParser() - return etree.parse(StringIO(data), parser) + # Parse from bytes so documents carrying an XML encoding declaration + # (e.g. sitemaps) don't raise "Unicode strings with encoding declaration". + return etree.parse(BytesIO(data.encode("utf-8")), parser) class CSSExtractor(BaseExtractor): @@ -45,7 +47,9 @@ class CSSExtractor(BaseExtractor): def get_object(self, data: str) -> Any: parser = etree.HTMLParser() - return etree.parse(StringIO(data), parser) + # Parse from bytes so documents carrying an XML encoding declaration + # (e.g. sitemaps) don't raise "Unicode strings with encoding declaration". + return etree.parse(BytesIO(data.encode("utf-8")), parser) class RawExtractor(BaseExtractor): diff --git a/crawley/http/managers.py b/crawley/http/managers.py index 1055852..5d73ae6 100644 --- a/crawley/http/managers.py +++ b/crawley/http/managers.py @@ -85,19 +85,20 @@ async def aclose(self): # -- requests -------------------------------------------------------- - def _get_request(self, url): + def _get_request(self, url, headers=None): host = urllib.parse.urlparse(url).netloc self.host_counter.increase(host) + merged = {**self.headers, **(headers or {})} return DelayedRequest( url=url, - headers=self.headers, + headers=merged, delay=self.delay, deviation=self.deviation, ) - async def make_request(self, url, data=None, extractor=None): + async def make_request(self, url, data=None, extractor=None, headers=None): """Issue a request and wrap the result in a :class:`Response`.""" - request = self._get_request(url) + request = self._get_request(url, headers) host = urllib.parse.urlparse(url).netloc semaphore = self.rate_limiter.semaphore(host) @@ -152,7 +153,8 @@ async def get_response(self, request, data): class FastRequestManager(RequestManager): """A request manager without per-request delays.""" - def _get_request(self, url): + def _get_request(self, url, headers=None): host = urllib.parse.urlparse(url).netloc self.host_counter.increase(host) - return Request(url=url, headers=self.headers) + merged = {**self.headers, **(headers or {})} + return Request(url=url, headers=merged) diff --git a/crawley/http/response.py b/crawley/http/response.py index 13b98f2..2fc0447 100644 --- a/crawley/http/response.py +++ b/crawley/http/response.py @@ -31,6 +31,7 @@ def __init__( self.html = extracted_html self.url = url self.response = response + self.request: Any = None self._doc: Optional[Document] = None if response is not None: @@ -72,5 +73,27 @@ def extract(self, rules: dict) -> dict: """Shortcut for ``response.doc.extract(rules)``.""" return self.doc.extract(rules) + @property + def meta(self) -> dict: + """The ``meta`` dict carried by the originating request (if any).""" + if self.request is not None: + return self.request.meta + return {} + + def follow(self, url: str, callback: Any = None, **kwargs: Any) -> Any: + """Build a :class:`~crawley.spider.Request` to a (possibly relative) url. + + Relative urls are resolved against this response's url, and ``meta`` is + inherited from the current request unless overridden. + """ + from urllib.parse import urljoin + + from crawley.spider import Request + + absolute = urljoin(self.url or "", url) + if "meta" not in kwargs and self.request is not None: + kwargs["meta"] = dict(self.request.meta) + return Request(absolute, callback=callback, **kwargs) + def __repr__(self) -> str: return "" % (self.status_code, self.url) diff --git a/crawley/pipelines.py b/crawley/pipelines.py new file mode 100644 index 0000000..de5c1de --- /dev/null +++ b/crawley/pipelines.py @@ -0,0 +1,28 @@ +"""Item pipelines. + +A pipeline post-processes every item yielded by a :class:`~crawley.spider.Spider` +callback: validate, clean, de-duplicate or store it. Pipelines are tried in +order; raising :class:`DropItem` discards the item. +""" + +from __future__ import annotations + +from typing import Any + + +class DropItem(Exception): + """Raise from ``process_item`` to discard the current item.""" + + +class ItemPipeline: + """Base class for item pipelines (all methods are optional).""" + + def open_spider(self, spider: Any) -> None: + """Called once when the spider starts.""" + + def close_spider(self, spider: Any) -> None: + """Called once when the spider finishes.""" + + def process_item(self, item: Any, spider: Any) -> Any: + """Return the (possibly transformed) item, or raise :class:`DropItem`.""" + return item diff --git a/crawley/spider.py b/crawley/spider.py new file mode 100644 index 0000000..4c0b520 --- /dev/null +++ b/crawley/spider.py @@ -0,0 +1,244 @@ +"""Scrapy-style spiders: requests with callbacks, items and pipelines. + +This sits on top of :class:`~crawley.crawlers.base.BaseCrawler` (reusing its +async engine: httpx, retries, rate limiting, robots.txt, de-duplication) but +exposes the more powerful, callback-driven model:: + + from crawley.spider import Spider, Request + + class BlogSpider(Spider): + name = "blog" + start_urls = ["https://example.com/"] + + def parse(self, response): + for href in response.css("a.post::attr(href)"): + yield response.follow(href, callback=self.parse_post) + nxt = response.css_first("a.next::attr(href)") + if nxt: + yield response.follow(nxt) # defaults to parse + + def parse_post(self, response): + yield { + "title": response.css_first("h1").text, + "url": response.url, + } +""" + +from __future__ import annotations + +import hashlib +import inspect +import logging +from typing import Any, Optional + +from crawley.crawlers.base import BaseCrawler +from crawley.multiprogramming.pool import AsyncPool +from crawley.pipelines import DropItem + +log = logging.getLogger("crawley.spider") + + +class Item(dict): + """A scraped item. Just a ``dict`` you may subclass for clarity.""" + + +class Request: + """A scheduled HTTP request with a callback to process its response.""" + + def __init__( + self, + url: str, + callback: Any = None, + method: str = "GET", + data: Any = None, + headers: Optional[dict] = None, + meta: Optional[dict] = None, + cb_kwargs: Optional[dict] = None, + priority: int = 0, + dont_filter: bool = False, + errback: Any = None, + ) -> None: + self.url = url + self.callback = callback + self.method = method.upper() + self.data = data + self.headers = headers or {} + self.meta = meta if meta is not None else {} + self.cb_kwargs = cb_kwargs or {} + self.priority = priority + self.dont_filter = dont_filter + self.errback = errback + + def replace(self, **kwargs: Any) -> "Request": + """Return a copy of this request with some attributes replaced.""" + attrs = { + "url": self.url, + "callback": self.callback, + "method": self.method, + "data": self.data, + "headers": self.headers, + "meta": self.meta, + "cb_kwargs": self.cb_kwargs, + "priority": self.priority, + "dont_filter": self.dont_filter, + "errback": self.errback, + } + attrs.update(kwargs) + return Request(**attrs) + + def fingerprint(self) -> str: + """A stable fingerprint (method + url + body) used for de-duplication.""" + raw = "%s|%s|%s" % (self.method, self.url, self.data or "") + return hashlib.sha1(raw.encode("utf-8")).hexdigest() + + def __repr__(self) -> str: + return "" % (self.method, self.url) + + +class Spider(BaseCrawler): + """A callback-driven spider. + + Define :meth:`parse` (the default callback) and yield :class:`Request` + objects (or :func:`response.follow(...)`) to crawl further, and dicts / + :class:`Item` objects to emit data. + """ + + name = "spider" + + pipelines: list = [] + """Item pipeline classes applied, in order, to every emitted item.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self._pipelines = [pipe() for pipe in self.pipelines] + + # -- overridables ---------------------------------------------------- + + def parse(self, response: Any) -> Any: + """Default callback. Override to extract data and follow links.""" + return None + + def start_requests(self) -> Any: + """Yield the initial requests (defaults to ``start_urls``).""" + for url in self.start_urls: + yield Request(url, callback=self.parse) + + def on_item(self, item: Any) -> None: + """Called for every item that survives the pipelines.""" + + # -- engine ---------------------------------------------------------- + + async def start(self) -> None: + self.pool = AsyncPool(self.max_concurrency_level) + self._seen = set() + + self.on_start() + for pipe in self._pipelines: + pipe.open_spider(self) + + await self._login() + + for request in self.start_requests(): + self._schedule(request, depth=0) + + try: + await self.pool.join() + finally: + await self.request_manager.aclose() + + for pipe in self._pipelines: + pipe.close_spider(self) + self.on_finish() + + def _schedule(self, request: Request, depth: int) -> None: + # The engine is authoritative over depth (meta may be inherited via + # response.follow, carrying the parent's depth). + request.meta["depth"] = depth + + if not self._validate_url(request.url): + return + + if not request.dont_filter and self.unique_urls: + fingerprint = request.fingerprint() + if fingerprint in self._seen: + return + self._seen.add(fingerprint) + + self.pool.spawn(self._handle(request)) + + async def _handle(self, request: Request) -> None: + url = request.url + + if self.respect_robots and not await self._robots_allowed(url): + self.on_robots_blocked(url) + return + + if self.debug: + log.info("crawling -> %s", url) + + try: + response = await self.request_manager.make_request( + url, + data=request.data, + extractor=self.extractor, + headers=request.headers, + ) + except Exception as ex: # noqa: BLE001 - routed to the errback/handler + self._handle_error(request, ex) + return + + response.request = request + callback = request.callback or self.parse + await self._drive_callback(callback, response) + + async def _drive_callback(self, callback: Any, response: Any) -> None: + depth = response.meta.get("depth", 0) + cb_kwargs = response.request.cb_kwargs if response.request else {} + result = callback(response, **cb_kwargs) + + if result is None: + return + + if inspect.isasyncgen(result): + async for out in result: + await self._handle_output(out, depth) + elif inspect.iscoroutine(result): + res = await result + for out in self._as_iterable(res): + await self._handle_output(out, depth) + else: + for out in self._as_iterable(result): + await self._handle_output(out, depth) + + @staticmethod + def _as_iterable(value: Any) -> Any: + if value is None: + return [] + if isinstance(value, (dict, Request)): + return [value] + if hasattr(value, "__iter__"): + return value + return [value] + + async def _handle_output(self, out: Any, depth: int) -> None: + if isinstance(out, Request): + if self.max_depth != -1 and depth >= self.max_depth: + return + self._schedule(out, depth=depth + 1) + else: + await self._process_item(out) + + async def _process_item(self, item: Any) -> None: + try: + for pipe in self._pipelines: + res = pipe.process_item(item, self) + item = await res if inspect.iscoroutine(res) else res + except DropItem: + return + self.on_item(item) + + def _handle_error(self, request: Request, ex: Exception) -> None: + if request.errback is not None: + request.errback(request, ex) + else: + self.on_request_error(request.url, ex) diff --git a/crawley/spiders.py b/crawley/spiders.py new file mode 100644 index 0000000..47ea49a --- /dev/null +++ b/crawley/spiders.py @@ -0,0 +1,157 @@ +"""Higher-level spiders: rule-based crawling and sitemap crawling. + +- :class:`LinkExtractor` extracts (and filters) links from a response. +- :class:`Rule` binds a link extractor to a callback / follow behaviour. +- :class:`CrawlSpider` follows links automatically according to its ``rules``. +- :class:`SitemapSpider` seeds the crawl from one or more ``sitemap.xml`` files. +""" + +from __future__ import annotations + +import re +from typing import Any, Optional + +from lxml import etree + +from crawley.spider import Request, Spider + + +def _as_tuple(value: Any) -> tuple: + if value is None: + return () + if isinstance(value, (list, tuple, set)): + return tuple(value) + return (value,) + + +class LinkExtractor: + """Extract links from a response, filtered by allow/deny rules.""" + + def __init__( + self, + allow: Any = (), + deny: Any = (), + restrict_xpaths: Any = (), + restrict_css: Any = (), + tags: Any = ("a",), + attrs: Any = ("href",), + unique: bool = True, + ) -> None: + self.allow = [re.compile(p) for p in _as_tuple(allow)] + self.deny = [re.compile(p) for p in _as_tuple(deny)] + self.restrict_xpaths = _as_tuple(restrict_xpaths) + self.restrict_css = _as_tuple(restrict_css) + self.tags = _as_tuple(tags) + self.attrs = _as_tuple(attrs) + self.unique = unique + + def _allowed(self, href: str) -> bool: + if any(r.search(href) for r in self.deny): + return False + if self.allow and not any(r.search(href) for r in self.allow): + return False + return True + + def extract_links(self, response: Any) -> list: + """Return the (absolute, filtered) links found in *response*.""" + doc = response.doc + + roots: list = [] + for xpath in self.restrict_xpaths: + roots.extend( + el for el in doc.xpath(xpath) if hasattr(el, "css") + ) + for css in self.restrict_css: + roots.extend(doc.css(css)) + if not roots: + roots = [doc] + + links: list = [] + for root in roots: + for tag in self.tags: + for element in root.css(tag): + for attr in self.attrs: + href = element.attr(attr) + if not href or not self._allowed(href): + continue + if self.unique and href in links: + continue + links.append(href) + return links + + +class Rule: + """Bind a :class:`LinkExtractor` to a callback and/or a follow behaviour.""" + + def __init__( + self, + link_extractor: Optional[LinkExtractor] = None, + callback: Any = None, + follow: Optional[bool] = None, + ) -> None: + self.link_extractor = link_extractor or LinkExtractor() + self.callback = callback + # Like Scrapy: follow defaults to True when there is no callback. + self.follow = follow if follow is not None else (callback is None) + + +class CrawlSpider(Spider): + """A spider that follows links according to a list of :class:`Rule`.""" + + rules: list = [] + + def parse(self, response: Any) -> Any: + return self._apply_rules(response) + + def _apply_rules(self, response: Any) -> Any: + seen = set() + for index, rule in enumerate(self.rules): + for href in rule.link_extractor.extract_links(response): + if href in seen: + continue + seen.add(href) + yield response.follow( + href, callback=self._crawl, cb_kwargs={"rule_index": index} + ) + + def _crawl(self, response: Any, rule_index: int) -> Any: + rule = self.rules[rule_index] + + if rule.callback is not None: + callback = ( + getattr(self, rule.callback) + if isinstance(rule.callback, str) + else rule.callback + ) + result = callback(response) + if result is not None: + yield from result + + if rule.follow: + yield from self._apply_rules(response) + + +class SitemapSpider(Spider): + """Seed the crawl from ``sitemap.xml`` files (incl. sitemap indexes).""" + + sitemap_urls: list = [] + + def start_requests(self) -> Any: + for url in self.sitemap_urls: + yield Request(url, callback=self._parse_sitemap) + + def _parse_sitemap(self, response: Any) -> Any: + try: + root = etree.fromstring((response.raw_html or "").encode("utf-8")) + except etree.XMLSyntaxError: + return + + is_index = etree.QName(root).localname == "sitemapindex" + for loc in root.iter("{*}loc"): + url = (loc.text or "").strip() + if not url: + continue + if is_index: + yield Request(url, callback=self._parse_sitemap) + else: + yield Request(url, callback=self.parse) diff --git a/tests/conftest.py b/tests/conftest.py index dafb7fc..4ae3d1d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -178,10 +178,30 @@ def _send(self, body, status=200): self.end_headers() self.wfile.write(payload) + def _send_xml(self, body): + payload = body.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/xml; charset=utf-8") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + def do_GET(self): path = urlparse(self.path).path if path == "/robots.txt": self._send("User-agent: *\nAllow: /\n") + elif path == "/sitemap.xml": + host = self.headers.get("Host", "") + locs = "".join( + "http://%s/page/%d/" % (host, n) + for n in (1, 2, 3) + ) + self._send_xml( + '' + '' + + locs + + "" + ) elif path == "/" or path == "": self._send(quotes_page(1)) elif path.startswith("/page/"): diff --git a/tests/test_spider.py b/tests/test_spider.py new file mode 100644 index 0000000..1b023b9 --- /dev/null +++ b/tests/test_spider.py @@ -0,0 +1,136 @@ +"""Tests for the callback-driven Spider, Request and item pipelines.""" + +from crawley.pipelines import DropItem, ItemPipeline +from crawley.spider import Request, Spider + + +def test_request_fingerprint_and_replace(): + a = Request("http://x/", method="get") + assert a.method == "GET" + b = Request("http://x/") + assert a.fingerprint() == b.fingerprint() + assert Request("http://x/", data="q=1").fingerprint() != a.fingerprint() + + c = a.replace(url="http://y/") + assert c.url == "http://y/" and a.url == "http://x/" + + +async def test_spider_follows_pagination_and_emits_items(quotes_server): + items = [] + + class QSpider(Spider): + start_urls = [quotes_server] + requests_delay = 0 + requests_deviation = 0 + + def parse(self, response): + for quote in response.css("div.quote"): + yield { + "text": quote.css_first("span.text").text, + "depth": response.meta.get("depth"), + } + nxt = response.css_first("li.next a") + if nxt: + yield response.follow(nxt.attr("href")) + + def on_item(self, item): + items.append(item) + + await QSpider().start() + + assert len(items) == 6 + assert {it["depth"] for it in items} == {0, 1, 2} + + +async def test_meta_and_cb_kwargs(quotes_server): + class MSpider(Spider): + start_urls = [quotes_server] + requests_delay = 0 + + def parse(self, response): + yield response.follow( + "/page/2/", callback=self.parse_2, + meta={"tag": "x"}, cb_kwargs={"n": 42}, + ) + + def parse_2(self, response, n=None): + self.captured = (response.meta.get("tag"), n) + + spider = MSpider() + await spider.start() + assert spider.captured == ("x", 42) + + +async def test_pipeline_transforms_and_drops(quotes_server): + kept = [] + + class OnlyFirst(ItemPipeline): + def open_spider(self, spider): + spider.opened = True + + def process_item(self, item, spider): + if item["text"].endswith("1-2"): + raise DropItem() + item["text"] = item["text"].upper() + return item + + class QSpider(Spider): + start_urls = [quotes_server] + pipelines = [OnlyFirst] + requests_delay = 0 + + def parse(self, response): + for quote in response.css("div.quote"): + yield {"text": quote.css_first("span.text").text} + + def on_item(self, item): + kept.append(item["text"]) + + spider = QSpider() + await spider.start() + + assert spider.opened is True + assert "QUOTE 1-1" in kept + assert all(not t.endswith("1-2") for t in kept) # dropped items + + +async def test_dont_filter_allows_duplicates(quotes_server): + hits = [] + + class DSpider(Spider): + start_urls = [quotes_server] + requests_delay = 0 + + def start_requests(self): + yield Request(quotes_server, callback=self.parse) + yield Request(quotes_server, callback=self.parse, dont_filter=True) + + def parse(self, response): + hits.append(response.url) + + await DSpider().start() + assert len(hits) == 2 + + +async def test_errback_is_called(): + errors = [] + + class ESpider(Spider): + start_urls = [] + requests_delay = 0 + + def start_requests(self): + yield Request( + "http://127.0.0.1:1/down", callback=self.parse, errback=self.err + ) + + def parse(self, response): + pass + + def err(self, request, ex): + errors.append(request.url) + + spider = ESpider() + spider.retry_policy.max_retries = 0 + await spider.start() + assert errors == ["http://127.0.0.1:1/down"] diff --git a/tests/test_spiders.py b/tests/test_spiders.py new file mode 100644 index 0000000..abe35d9 --- /dev/null +++ b/tests/test_spiders.py @@ -0,0 +1,84 @@ +"""Tests for LinkExtractor, CrawlSpider and SitemapSpider.""" + +from crawley.http.response import Response +from crawley.spiders import CrawlSpider, LinkExtractor, Rule, SitemapSpider + +HTML = """ + + + + +""" + + +def _response(url="http://site.test/"): + return Response(raw_html=HTML, url=url) + + +def test_link_extractor_allow_deny(): + le = LinkExtractor(allow=[r"/page/"], deny=[r"/page/3"]) + links = le.extract_links(_response()) + assert "http://site.test/page/2/" in links + assert "http://site.test/page/3/" not in links + assert "http://site.test/item/a" not in links + + +def test_link_extractor_restrict_css(): + le = LinkExtractor(restrict_css=["#content"]) + links = le.extract_links(_response()) + assert "http://site.test/item/a" in links + assert "http://site.test/page/2/" not in links + + +def test_link_extractor_unique(): + html = '12' + le = LinkExtractor() + links = le.extract_links(Response(raw_html=html, url="http://s/")) + assert links == ["http://s/x"] + + +async def test_crawlspider_rules(quotes_server): + items = [] + + class QuotesCrawl(CrawlSpider): + start_urls = [quotes_server] + requests_delay = 0 + requests_deviation = 0 + rules = [ + Rule( + LinkExtractor(allow=[r"/page/\d+/"]), + callback="parse_page", + follow=True, + ), + ] + + def parse_page(self, response): + for quote in response.css("div.quote"): + yield {"text": quote.css_first("span.text").text} + + def on_item(self, item): + items.append(item["text"]) + + await QuotesCrawl().start() + + # Pages 2 and 3 reached by following the pager (page 1 is the seed and is + # only parsed for links, not by the rule callback). + assert any("Quote 2-1" == t for t in items) + assert any("Quote 3-1" == t for t in items) + + +async def test_sitemap_spider(quotes_server): + pages = [] + + class QuotesSitemap(SitemapSpider): + sitemap_urls = [quotes_server + "sitemap.xml"] + requests_delay = 0 + requests_deviation = 0 + + def parse(self, response): + pages.append(response.url) + + await QuotesSitemap().start() + + assert len(pages) == 3 + assert any(p.endswith("/page/2/") for p in pages) From 1b0e0005d5e153821f69f2718327b6b5190f198a Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 18:28:10 +0000 Subject: [PATCH 2/4] Add JavaScript rendering (Playwright) + docs/examples for the new spiders - crawley.http.playwright.PlaywrightRequestManager: render pages with a headless browser (lazy import), with per-host throttling and retries; wired into the engine via `render_js = True` and `playwright_options`. Optional extra `crawley[js]`. - Docs: new "Spiders" page (Request/callbacks/follow, item pipelines, CrawlSpider/LinkExtractor, SitemapSpider, JS rendering); API reference and nav updated. - examples/06_spider.py (callback spider + pipeline), indexed and test-covered. - README "Spiders" section; CHANGELOG updated. Tests (180 -> 187): test_playwright (render path mocked, no browser needed) plus the spider example. --- CHANGELOG.md | 10 ++- README.md | 33 +++++++++ crawley/crawlers/base.py | 18 +++++ crawley/http/playwright.py | 129 +++++++++++++++++++++++++++++++++ docs/reference.md | 15 ++++ docs/spiders.md | 144 +++++++++++++++++++++++++++++++++++++ examples/06_spider.py | 58 +++++++++++++++ examples/README.md | 1 + mkdocs.yml | 1 + pyproject.toml | 1 + tests/test_examples.py | 11 +++ tests/test_playwright.py | 88 +++++++++++++++++++++++ 12 files changed, 508 insertions(+), 1 deletion(-) create mode 100644 crawley/http/playwright.py create mode 100644 docs/spiders.md create mode 100644 examples/06_spider.py create mode 100644 tests/test_playwright.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 38842cb..8973d6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,12 +21,20 @@ A full port of the legacy Python 2 framework to a modern Python 3 (3.9+) stack. `retry_statuses`). - **Visited-url de-duplication** (`unique_urls`) preventing redundant fetches and crawl loops. +- **Callback-driven `Spider`** (`crawley.spider`): `Request` with + callback / `meta` / `cb_kwargs` / `errback`, `response.follow()`, `Item`, and + fingerprint-based de-duplication — for list→detail crawls. +- **Item pipelines** (`crawley.pipelines`): `ItemPipeline` + `DropItem`. +- **`CrawlSpider`** with `Rule` / `LinkExtractor` (allow/deny/restrict) and + **`SitemapSpider`** (`crawley.spiders`). +- **JavaScript rendering** via Playwright (`render_js = True`, extra + `crawley[js]`). - Documentation site (MkDocs Material + mkdocstrings) and a set of runnable, test-covered `examples/`. - **Type hints** on the public modules and a PEP 561 `py.typed` marker so downstream code gets type information; `mypy` runs in CI. - A `LICENSE` file (GPL-3.0). -- A hermetic `pytest` suite (~170 tests, ~92% core coverage). +- A hermetic `pytest` suite (~185 tests, ~90% core coverage). ### Changed - Concurrency moved from `eventlet` green pools to **asyncio** (`AsyncPool`); diff --git a/README.md b/README.md index 5ef583b..ad43ed4 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,39 @@ The same shortcuts (`response.css`, `response.css_first`, `response.extract`, --- +## Spiders (callbacks, items, rules, JS) + +For full crawls there's a Scrapy-style `Spider`: yield `Request`s (or +`response.follow(...)`) to navigate and dicts/`Item`s to emit data, with item +pipelines, rule-based crawling and optional JavaScript rendering. See +[`docs/spiders.md`](docs/spiders.md). + +```python +from crawley.spider import Spider + +class BlogSpider(Spider): + start_urls = ["https://example.com/blog/"] + + def parse(self, response): # default callback + for href in response.css("a.post::attr(href)"): + yield response.follow(href, callback=self.parse_post) + nxt = response.css_first("a.next::attr(href)") + if nxt: + yield response.follow(nxt) # follows pagination + + def parse_post(self, response): + yield {"title": response.css_first("h1").text, "url": response.url} + +BlogSpider().run() +``` + +- **Item pipelines**: `crawley.pipelines.ItemPipeline` + `DropItem`. +- **Rule-based**: `CrawlSpider` + `Rule(LinkExtractor(allow=..., deny=...))`. +- **Sitemaps**: `SitemapSpider(sitemap_urls=[...])`. +- **JavaScript**: `render_js = True` (install `crawley[js]` + `playwright install`). + +--- + ## Quick start (as a framework / CLI) ### 1. Start a new project diff --git a/crawley/crawlers/base.py b/crawley/crawlers/base.py index a114313..8037732 100644 --- a/crawley/crawlers/base.py +++ b/crawley/crawlers/base.py @@ -104,6 +104,12 @@ class BaseCrawler(metaclass=CrawlerMeta): unique_urls = True """Skip urls that have already been visited during the crawl.""" + render_js = False + """Render pages with a headless browser (Playwright). Needs ``crawley[js]``.""" + + playwright_options = {} + """Extra options for the Playwright manager (browser_type, headless, ...).""" + def __init__(self, sessions=None, settings=None): self.sessions = sessions if sessions is not None else [] self.debug = getattr(settings, "SHOW_DEBUG_INFO", True) @@ -137,6 +143,18 @@ def __init__(self, sessions=None, settings=None): self._initialize_scrapers() def _make_request_manager(self): + if self.render_js: + from crawley.http.playwright import PlaywrightRequestManager + + return PlaywrightRequestManager( + settings=self.settings, + headers=self.headers, + delay=self.requests_delay, + deviation=self.requests_deviation, + retry_policy=self.retry_policy, + rate_limiter=self.rate_limiter, + **self.playwright_options, + ) return RequestManager( settings=self.settings, headers=self.headers, diff --git a/crawley/http/playwright.py b/crawley/http/playwright.py new file mode 100644 index 0000000..f6560a5 --- /dev/null +++ b/crawley/http/playwright.py @@ -0,0 +1,129 @@ +"""JavaScript rendering via Playwright. + +A drop-in :class:`~crawley.http.managers.RequestManager` that downloads pages +with a real (headless) browser, so client-side rendered / SPA sites can be +scraped. Playwright is imported lazily and is an optional dependency:: + + pip install "crawley[js]" + playwright install chromium + +Enable it on a crawler/spider with ``render_js = True``. +""" + +from __future__ import annotations + +from typing import Any, Optional +from urllib.parse import urlparse + +from crawley.http.managers import RequestManager +from crawley.http.response import Response + + +class _RenderResult: + """Minimal stand-in for an httpx response (status_code + headers).""" + + def __init__(self, status_code: Optional[int]) -> None: + self.status_code = status_code + self.headers: dict = {} + + +class PlaywrightRequestManager(RequestManager): + """Render pages with Playwright instead of plain HTTP.""" + + def __init__( + self, + *args: Any, + browser_type: str = "chromium", + headless: bool = True, + wait_until: str = "load", + wait_for: Optional[str] = None, + nav_timeout: float = 30000, + **kwargs: Any, + ) -> None: + super().__init__(*args, **kwargs) + self.browser_type = browser_type + self.headless = headless + self.wait_until = wait_until + self.wait_for = wait_for + self.nav_timeout = nav_timeout + self._pw = None + self._browser = None + self._context = None + + async def _ensure_browser(self) -> None: + if self._browser is not None: + return + from playwright.async_api import async_playwright + + self._pw = await async_playwright().start() + browser_launcher = getattr(self._pw, self.browser_type) + self._browser = await browser_launcher.launch(headless=self.headless) + self._context = await self._browser.new_context( + extra_http_headers=self.headers or {} + ) + + async def _render(self, url: str, headers: Optional[dict] = None): + """Open *url* in a fresh page and return (html, final_url, status).""" + await self._ensure_browser() + page = await self._context.new_page() + try: + if headers: + await page.set_extra_http_headers(headers) + nav = await page.goto( + url, wait_until=self.wait_until, timeout=self.nav_timeout + ) + if self.wait_for: + await page.wait_for_selector(self.wait_for, timeout=self.nav_timeout) + html = await page.content() + status = nav.status if nav is not None else None + return html, page.url, status + finally: + await page.close() + + async def _render_with_retry(self, url: str, headers: Optional[dict]): + attempt = 0 + while True: + try: + return await self._render(url, headers) + except Exception as ex: # noqa: BLE001 - decided by the retry policy + if self.retry_policy.should_retry(attempt, exception=ex): + await self.retry_policy.sleep( + self.retry_policy.backoff_time(attempt) + ) + attempt += 1 + continue + raise + + async def make_request( + self, url: str, data: Any = None, extractor: Any = None, headers: Any = None + ) -> Response: + host = urlparse(url).netloc + semaphore = self.rate_limiter.semaphore(host) + if semaphore is not None: + await semaphore.acquire() + try: + await self.rate_limiter.throttle(host) + raw_html, final_url, status = await self._render_with_retry(url, headers) + finally: + if semaphore is not None: + semaphore.release() + + extracted = extractor.get_object(raw_html) if extractor is not None else None + return Response( + raw_html=raw_html, + extracted_html=extracted, + url=final_url, + response=_RenderResult(status), + ) + + async def aclose(self) -> None: + if self._context is not None: + await self._context.close() + self._context = None + if self._browser is not None: + await self._browser.close() + self._browser = None + if self._pw is not None: + await self._pw.stop() + self._pw = None + self.cookie_handler.save_cookies() diff --git a/docs/reference.md b/docs/reference.md index 4266f75..13e8a10 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -23,6 +23,21 @@ Auto-generated from the docstrings. ::: crawley.scrapers.base.BaseScraper ::: crawley.scrapers.smart.SmartScraper +## Spiders + +::: crawley.spider.Spider +::: crawley.spider.Request +::: crawley.spider.Item +::: crawley.spiders.CrawlSpider +::: crawley.spiders.SitemapSpider +::: crawley.spiders.LinkExtractor +::: crawley.spiders.Rule + +## Pipelines + +::: crawley.pipelines.ItemPipeline +::: crawley.pipelines.DropItem + ## Extractors ::: crawley.extractors.XPathExtractor diff --git a/docs/spiders.md b/docs/spiders.md new file mode 100644 index 0000000..f8d95a4 --- /dev/null +++ b/docs/spiders.md @@ -0,0 +1,144 @@ +# Spiders (callbacks, items & pipelines) + +Besides the `BaseCrawler` + `BaseScraper` model, crawley ships a more powerful, +Scrapy-style **Spider** built on the same async engine (httpx, retries, rate +limiting, robots.txt, de-duplication). + +## Requests with callbacks + +A `Spider` defines `parse()` (the default callback) and yields `Request` +objects — or `response.follow(...)` — to crawl further, and dicts / `Item` +objects to emit data. + +```python +from crawley.spider import Spider + +class BlogSpider(Spider): + name = "blog" + start_urls = ["https://example.com/blog/"] + + def parse(self, response): + # follow each post to a detail callback + for href in response.css("a.post::attr(href)"): + yield response.follow(href, callback=self.parse_post) + # follow pagination with the default callback (parse) + nxt = response.css_first("a.next::attr(href)") + if nxt: + yield response.follow(nxt) + + def parse_post(self, response): + yield { + "title": response.css_first("h1").text, + "url": response.url, + } + +BlogSpider().run() # or: await BlogSpider().start() +``` + +### Passing state: `meta` and `cb_kwargs` + +```python +def parse(self, response): + yield response.follow( + "/item/1", + callback=self.parse_item, + meta={"category": "books"}, # available as response.meta + cb_kwargs={"rank": 1}, # passed as a keyword argument + ) + +def parse_item(self, response, rank=None): + yield {"category": response.meta["category"], "rank": rank} +``` + +`Request` supports `method`, `data` (POST), `headers`, `priority`, +`dont_filter` (bypass de-duplication) and `errback` (per-request error +handler). De-duplication uses a fingerprint of *method + url + body*. + +## Item pipelines + +Pipelines post-process every emitted item in order. Raise `DropItem` to discard +one. + +```python +from crawley.pipelines import ItemPipeline, DropItem + +class PricePipeline(ItemPipeline): + def open_spider(self, spider): ... + def close_spider(self, spider): ... + + def process_item(self, item, spider): + if not item.get("price"): + raise DropItem("missing price") + item["price"] = float(item["price"]) + return item # may be sync or async + +class ShopSpider(Spider): + pipelines = [PricePipeline] + ... +``` + +Items that survive the pipeline reach `on_item(item)` (override it, or use a +pipeline, to store them — e.g. with the [persistence](persistence.md) layer). + +## Rule-based crawling: `CrawlSpider` + +`CrawlSpider` follows links automatically according to a list of `Rule`s, each +built from a `LinkExtractor`. + +```python +from crawley.spiders import CrawlSpider, Rule, LinkExtractor + +class SiteSpider(CrawlSpider): + start_urls = ["https://books.example/"] + rules = [ + # paginate, just follow + Rule(LinkExtractor(allow=[r"/catalogue/page-\d+"])), + # product pages -> extract with parse_book + Rule(LinkExtractor(allow=[r"/catalogue/.+/index\.html"]), + callback="parse_book"), + ] + + def parse_book(self, response): + yield {"title": response.css_first("h1").text} +``` + +`LinkExtractor(allow, deny, restrict_xpaths, restrict_css, tags, attrs, unique)` +returns absolute, filtered links (`allow`/`deny` are regexes). + +## Sitemaps: `SitemapSpider` + +Seed the crawl from one or more `sitemap.xml` files (sitemap indexes are +followed automatically): + +```python +from crawley.spiders import SitemapSpider + +class NewsSpider(SitemapSpider): + sitemap_urls = ["https://news.example/sitemap.xml"] + + def parse(self, response): + yield {"title": response.css_first("h1").text, "url": response.url} +``` + +## JavaScript rendering + +For client-side rendered / SPA sites, render pages with a headless browser +(Playwright). Install the extra and the browser, then set `render_js = True`: + +```bash +pip install "crawley[js]" +playwright install chromium +``` + +```python +class SpaSpider(Spider): + start_urls = ["https://app.example/"] + render_js = True + playwright_options = {"browser_type": "chromium", "wait_for": "div.loaded"} + + def parse(self, response): + yield {"title": response.css_first("h1").text} +``` + +Everything else (selectors, `follow`, pipelines, rules, rate limiting, retries) +works the same — only the download step changes. diff --git a/examples/06_spider.py b/examples/06_spider.py new file mode 100644 index 0000000..fe1f6bb --- /dev/null +++ b/examples/06_spider.py @@ -0,0 +1,58 @@ +"""Example 6 — the callback-driven Spider with an item pipeline. + +`parse` extracts quotes and follows the pagination link; an item pipeline +normalizes each item before it reaches `on_item`. + +Run it:: + + python examples/06_spider.py +""" + +from crawley.pipelines import DropItem, ItemPipeline +from crawley.spider import Spider + +LIVE_SITE = "https://quotes.toscrape.com/" + + +class CleanPipeline(ItemPipeline): + def process_item(self, item, spider): + item["text"] = item["text"].strip().strip("“”\"") + if not item["text"]: + raise DropItem("empty quote") + return item + + +def crawl_quotes(base_url=LIVE_SITE): + """Crawl all pages following pagination and return the cleaned items.""" + collected = [] + + class QuotesSpider(Spider): + name = "quotes" + start_urls = [base_url] + pipelines = [CleanPipeline] + requests_delay = 0 + requests_deviation = 0 + + def parse(self, response): + for quote in response.css("div.quote"): + yield { + "text": quote.css_first("span.text").text, + "author": quote.css_first("small.author").text, + "page": response.meta.get("depth", 0) + 1, + } + nxt = response.css_first("li.next a") + if nxt: + yield response.follow(nxt.attr("href")) + + def on_item(self, item): + collected.append(item) + + QuotesSpider().run() + return collected + + +if __name__ == "__main__": + items = crawl_quotes() + print(f"Collected {len(items)} quotes across {max(i['page'] for i in items)} pages") + for item in items[:5]: + print(f"- {item['author']}: {item['text']}") diff --git a/examples/README.md b/examples/README.md index 3796d67..b0b206b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -11,6 +11,7 @@ Runnable, documented examples. Each one targets | [`03_polite_crawler.py`](03_polite_crawler.py) | Politeness: `robots.txt`, per-host rate limiting and retries. | | [`04_persistence_json.py`](04_persistence_json.py) | Persisting scraped data to a JSON document. | | [`05_concurrent_fetch.py`](05_concurrent_fetch.py) | Fetching many pages concurrently with `afetch_all`. | +| [`06_spider.py`](06_spider.py) | The callback-driven `Spider` with an item pipeline. | Run any of them with: diff --git a/mkdocs.yml b/mkdocs.yml index f0699f0..de05cc4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -29,6 +29,7 @@ nav: - Installation: installation.md - Scraping API: scraping.md - Crawlers & Scrapers: crawler.md + - Spiders (callbacks, items, rules): spiders.md - Politeness — robots, rate limiting, retries: politeness.md - Persistence: persistence.md - CLI & Framework: cli.md diff --git a/pyproject.toml b/pyproject.toml index 05902cd..901e5bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ mongo = ["pymongo>=4.0"] shell = ["ipython>=8.0"] gui = ["PySide6>=6.5"] http2 = ["h2>=4.0"] +js = ["playwright>=1.40"] dev = [ "pytest>=7.4", "pytest-asyncio>=0.23", diff --git a/tests/test_examples.py b/tests/test_examples.py index ca7af8a..c0d9ee0 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -94,6 +94,16 @@ async def test_concurrent_fetch(quotes_server): assert all(count == 2 for count in counts.values()) +# -- 06 spider -------------------------------------------------------------- + + +def test_spider_example(quotes_server): + mod = load_example("06_spider.py") + items = mod.crawl_quotes(quotes_server) + assert len(items) == TOTAL_QUOTES + assert {it["page"] for it in items} == {1, 2, 3} + + # -- every example at least imports cleanly --------------------------------- @@ -105,6 +115,7 @@ async def test_concurrent_fetch(quotes_server): "03_polite_crawler.py", "04_persistence_json.py", "05_concurrent_fetch.py", + "06_spider.py", ], ) def test_example_imports(filename): diff --git a/tests/test_playwright.py b/tests/test_playwright.py new file mode 100644 index 0000000..75f1ccb --- /dev/null +++ b/tests/test_playwright.py @@ -0,0 +1,88 @@ +"""Tests for the Playwright (JS rendering) integration. + +Playwright itself (and a browser) isn't required: the rendering step is +monkeypatched, so these tests exercise the wiring without launching a browser. +""" + +from crawley.extractors import XPathExtractor +from crawley.http.playwright import PlaywrightRequestManager +from crawley.spider import Spider + +RENDERED = "

Rendered by JS

x" + + +def test_render_js_selects_playwright_manager(): + class S(Spider): + render_js = True + start_urls = [] + + spider = S() + assert isinstance(spider.request_manager, PlaywrightRequestManager) + + +def test_plain_crawler_does_not_use_playwright(): + from crawley.http.managers import RequestManager + + class S(Spider): + start_urls = [] + + spider = S() + assert type(spider.request_manager) is RequestManager + + +async def test_make_request_builds_response_from_render(): + manager = PlaywrightRequestManager() + + async def fake_render(url, headers=None): + return RENDERED, url, 200 + + manager._render = fake_render + + response = await manager.make_request( + "http://spa.test/", extractor=XPathExtractor() + ) + assert response.status_code == 200 + assert response.css_first("h1").text == "Rendered by JS" + await manager.aclose() # no browser was launched -> safe + + +async def test_render_with_retry_recovers(monkeypatch): + import httpx + + from crawley.http.retry import RetryPolicy + + manager = PlaywrightRequestManager( + retry_policy=RetryPolicy(max_retries=2, backoff_factor=0) + ) + calls = {"n": 0} + + async def flaky_render(url, headers=None): + calls["n"] += 1 + if calls["n"] == 1: + raise httpx.ConnectError("boom") + return RENDERED, url, 200 + + manager._render = flaky_render + response = await manager.make_request("http://spa.test/") + assert response.status_code == 200 + assert calls["n"] == 2 + await manager.aclose() + + +async def test_spider_render_end_to_end(): + class S(Spider): + render_js = True + start_urls = ["http://spa.test/"] + requests_delay = 0 + + def parse(self, response): + self.title = response.css_first("h1").text + + spider = S() + + async def fake_render(url, headers=None): + return RENDERED, url, 200 + + spider.request_manager._render = fake_render + await spider.start() + assert spider.title == "Rendered by JS" From 8218f22b610255a974b63cfc07e76724d03a4466 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 18:42:19 +0000 Subject: [PATCH 3/4] Add stats collector, on-disk HTTP cache and FormRequest - crawley.stats.StatsCollector: per-crawl counters (requests, responses, status/, request_errors, robots_blocked, items/items_dropped, elapsed), exposed as crawler/spider `stats` and logged on finish. - crawley.http.cache.HttpCache: on-disk response cache keyed by method+url+body. Enable with `http_cache = True` / `http_cache_dir`; wired into RequestManager, FastRequestManager and the Playwright manager. - crawley.spider.FormRequest + FormRequest.from_response(): read a
, pre-fill inputs/selects/textareas, honour its method (GET -> query string), override fields via formdata. Docs (crawler stats/cache, spiders forms, API reference) and CHANGELOG updated. Tests (187 -> 201): test_stats, test_cache, test_forms; conftest serves /login-form. --- CHANGELOG.md | 4 ++ crawley/__init__.py | 8 +++- crawley/crawlers/base.py | 30 ++++++++++++ crawley/crawlers/fast.py | 1 + crawley/http/cache.py | 77 ++++++++++++++++++++++++++++++ crawley/http/managers.py | 31 ++++++++++++- crawley/http/playwright.py | 8 ++++ crawley/spider.py | 95 ++++++++++++++++++++++++++++++++++++++ crawley/stats.py | 43 +++++++++++++++++ docs/crawler.md | 30 ++++++++++++ docs/reference.md | 6 +++ docs/spiders.md | 25 ++++++++++ tests/conftest.py | 9 ++++ tests/test_cache.py | 75 ++++++++++++++++++++++++++++++ tests/test_forms.py | 75 ++++++++++++++++++++++++++++++ tests/test_stats.py | 86 ++++++++++++++++++++++++++++++++++ 16 files changed, 601 insertions(+), 2 deletions(-) create mode 100644 crawley/http/cache.py create mode 100644 crawley/stats.py create mode 100644 tests/test_cache.py create mode 100644 tests/test_forms.py create mode 100644 tests/test_stats.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8973d6e..d6bcc4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,10 @@ A full port of the legacy Python 2 framework to a modern Python 3 (3.9+) stack. **`SitemapSpider`** (`crawley.spiders`). - **JavaScript rendering** via Playwright (`render_js = True`, extra `crawley[js]`). +- **Stats collector** (`crawley.stats.StatsCollector`): per-crawl counters + (requests/responses/status/errors/items/elapsed), logged on finish. +- **On-disk HTTP cache** (`http_cache = True`) for development. +- **`FormRequest.from_response`** to pre-fill and submit forms. - Documentation site (MkDocs Material + mkdocstrings) and a set of runnable, test-covered `examples/`. - **Type hints** on the public modules and a PEP 561 `py.typed` marker so diff --git a/crawley/__init__.py b/crawley/__init__.py index 6660c0a..ca8539b 100644 --- a/crawley/__init__.py +++ b/crawley/__init__.py @@ -36,6 +36,7 @@ "Element", "Spider", "Request", + "FormRequest", "Item", "DropItem", "ItemPipeline", @@ -43,6 +44,7 @@ "Rule", "CrawlSpider", "SitemapSpider", + "StatsCollector", "__version__", ] @@ -75,7 +77,7 @@ def __getattr__(name): from crawley.http.response import Response return Response - if name in ("Spider", "Request", "Item"): + if name in ("Spider", "Request", "FormRequest", "Item"): from crawley import spider return getattr(spider, name) @@ -87,6 +89,10 @@ def __getattr__(name): from crawley import spiders return getattr(spiders, name) + if name == "StatsCollector": + from crawley.stats import StatsCollector + + return StatsCollector if name in ("fetch", "afetch", "afetch_all", "scrape", "parse", "Document", "Element"): from crawley import scraping diff --git a/crawley/crawlers/base.py b/crawley/crawlers/base.py index 8037732..b8db2ba 100644 --- a/crawley/crawlers/base.py +++ b/crawley/crawlers/base.py @@ -110,12 +110,22 @@ class BaseCrawler(metaclass=CrawlerMeta): playwright_options = {} """Extra options for the Playwright manager (browser_type, headless, ...).""" + http_cache = False + """Cache responses on disk (development helper).""" + + http_cache_dir = ".crawley_cache" + """Directory used by the on-disk HTTP cache.""" + def __init__(self, sessions=None, settings=None): self.sessions = sessions if sessions is not None else [] self.debug = getattr(settings, "SHOW_DEBUG_INFO", True) self.settings = settings self._seen = set() + from crawley.stats import StatsCollector + + self.stats = StatsCollector() + extractor_class = self.extractor or XPathExtractor self.extractor = extractor_class() @@ -142,7 +152,15 @@ def __init__(self, sessions=None, settings=None): self._initialize_scrapers() + def _make_cache(self): + if not self.http_cache: + return None + from crawley.http.cache import HttpCache + + return HttpCache(self.http_cache_dir, enabled=True) + def _make_request_manager(self): + cache = self._make_cache() if self.render_js: from crawley.http.playwright import PlaywrightRequestManager @@ -153,6 +171,7 @@ def _make_request_manager(self): deviation=self.requests_deviation, retry_policy=self.retry_policy, rate_limiter=self.rate_limiter, + cache=cache, **self.playwright_options, ) return RequestManager( @@ -162,6 +181,7 @@ def _make_request_manager(self): deviation=self.requests_deviation, retry_policy=self.retry_policy, rate_limiter=self.rate_limiter, + cache=cache, ) def _initialize_scrapers(self): @@ -228,18 +248,24 @@ async def _fetch(self, url, depth_level=0): self._seen.add(url) if self.respect_robots and not await self._robots_allowed(url): + self.stats.inc("robots_blocked") self.on_robots_blocked(url) return if self.debug: log.info("crawling -> %s", url) + self.stats.inc("requests") try: response = await self._get_response(url) except Exception as ex: # noqa: BLE001 - delegated to error handler + self.stats.inc("request_errors") self.on_request_error(url, ex) return + self.stats.inc("responses") + self.stats.inc("status/%s" % response.status_code) + urls = self._manage_scrapers(response) if not urls: @@ -276,6 +302,7 @@ async def start(self): """Run the crawler (coroutine).""" self.pool = AsyncPool(self.max_concurrency_level) self._seen = set() + self.stats.open() self.on_start() await self._login() @@ -288,6 +315,9 @@ async def start(self): finally: await self.request_manager.aclose() + self.stats.close() + if self.debug: + log.info("stats: %s", self.stats.get_stats()) self.on_finish() def run(self): diff --git a/crawley/crawlers/fast.py b/crawley/crawlers/fast.py index 5e77a89..aef5b3a 100644 --- a/crawley/crawlers/fast.py +++ b/crawley/crawlers/fast.py @@ -13,4 +13,5 @@ def _make_request_manager(self): headers=self.headers, retry_policy=self.retry_policy, rate_limiter=self.rate_limiter, + cache=self._make_cache(), ) diff --git a/crawley/http/cache.py b/crawley/http/cache.py new file mode 100644 index 0000000..0fa19c8 --- /dev/null +++ b/crawley/http/cache.py @@ -0,0 +1,77 @@ +"""On-disk HTTP cache (a development helper). + +Caches responses on disk keyed by *method + url + body* so repeated runs don't +hit the site again. Enable it on a crawler/spider with ``http_cache = True``. +""" + +from __future__ import annotations + +import hashlib +import json +import os +from typing import Any, Optional + + +class _CachedResponse: + """Minimal stand-in exposing ``status_code`` and ``headers``.""" + + def __init__(self, status_code: Optional[int], headers: Optional[dict]) -> None: + self.status_code = status_code + self.headers = headers or {} + + +class HttpCache: + """A tiny JSON-on-disk response cache.""" + + def __init__(self, cache_dir: str = ".crawley_cache", enabled: bool = True) -> None: + self.cache_dir = cache_dir + self.enabled = enabled + + @staticmethod + def _normalize(data: Any) -> str: + if data is None: + return "" + if isinstance(data, dict): + return json.dumps(data, sort_keys=True) + return str(data) + + def _key(self, method: str, url: str, data: Any) -> str: + raw = "%s|%s|%s" % (method.upper(), url, self._normalize(data)) + return hashlib.sha1(raw.encode("utf-8")).hexdigest() + + def _path(self, key: str) -> str: + return os.path.join(self.cache_dir, key + ".json") + + def get(self, method: str, url: str, data: Any = None) -> Optional[dict]: + if not self.enabled: + return None + path = self._path(self._key(method, url, data)) + if not os.path.isfile(path): + return None + try: + with open(path, encoding="utf-8") as f: + return json.load(f) + except (OSError, ValueError): + return None + + def store( + self, + method: str, + url: str, + data: Any, + status_code: Optional[int], + final_url: str, + headers: Any, + body: str, + ) -> None: + if not self.enabled: + return + os.makedirs(self.cache_dir, exist_ok=True) + payload = { + "status": status_code, + "url": final_url, + "headers": dict(headers or {}), + "body": body, + } + with open(self._path(self._key(method, url, data)), "w", encoding="utf-8") as f: + json.dump(payload, f) diff --git a/crawley/http/managers.py b/crawley/http/managers.py index 5d73ae6..16b8dea 100644 --- a/crawley/http/managers.py +++ b/crawley/http/managers.py @@ -34,6 +34,7 @@ def __init__( deviation=None, retry_policy=None, rate_limiter=None, + cache=None, ): self.host_counter = HostCounterDict() self.cookie_handler = CookieHandler() @@ -48,6 +49,7 @@ def __init__( self.rate_limiter = ( rate_limiter if rate_limiter is not None else HostRateLimiter() ) + self.cache = cache self._client = None # -- client lifecycle ------------------------------------------------ @@ -98,6 +100,13 @@ def _get_request(self, url, headers=None): async def make_request(self, url, data=None, extractor=None, headers=None): """Issue a request and wrap the result in a :class:`Response`.""" + method = "POST" if data is not None else "GET" + + if self.cache is not None: + cached = self.cache.get(method, url, data) + if cached is not None: + return self._response_from_cache(cached, extractor) + request = self._get_request(url, headers) host = urllib.parse.urlparse(url).netloc @@ -112,6 +121,13 @@ async def make_request(self, url, data=None, extractor=None, headers=None): semaphore.release() raw_html = response.text + final_url = str(response.url) + + if self.cache is not None: + self.cache.store( + method, url, data, response.status_code, final_url, + dict(response.headers), raw_html, + ) extracted_html = None if extractor is not None: @@ -120,10 +136,23 @@ async def make_request(self, url, data=None, extractor=None, headers=None): return Response( raw_html=raw_html, extracted_html=extracted_html, - url=str(response.url), + url=final_url, response=response, ) + @staticmethod + def _response_from_cache(cached, extractor): + from crawley.http.cache import _CachedResponse + + raw_html = cached["body"] + extracted = extractor.get_object(raw_html) if extractor is not None else None + return Response( + raw_html=raw_html, + extracted_html=extracted, + url=cached["url"], + response=_CachedResponse(cached["status"], cached["headers"]), + ) + async def get_response(self, request, data): """Perform the request, retrying with backoff per the retry policy.""" attempt = 0 diff --git a/crawley/http/playwright.py b/crawley/http/playwright.py index f6560a5..c1af3fe 100644 --- a/crawley/http/playwright.py +++ b/crawley/http/playwright.py @@ -97,6 +97,11 @@ async def _render_with_retry(self, url: str, headers: Optional[dict]): async def make_request( self, url: str, data: Any = None, extractor: Any = None, headers: Any = None ) -> Response: + if self.cache is not None: + cached = self.cache.get("GET", url, data) + if cached is not None: + return self._response_from_cache(cached, extractor) + host = urlparse(url).netloc semaphore = self.rate_limiter.semaphore(host) if semaphore is not None: @@ -108,6 +113,9 @@ async def make_request( if semaphore is not None: semaphore.release() + if self.cache is not None: + self.cache.store("GET", url, data, status, final_url, {}, raw_html) + extracted = extractor.get_object(raw_html) if extractor is not None else None return Response( raw_html=raw_html, diff --git a/crawley/spider.py b/crawley/spider.py index 4c0b520..62cfbc9 100644 --- a/crawley/spider.py +++ b/crawley/spider.py @@ -95,6 +95,90 @@ def __repr__(self) -> str: return "" % (self.method, self.url) +class FormRequest(Request): + """A :class:`Request` that submits form data (POST by default).""" + + def __init__( + self, url: str, formdata: Optional[dict] = None, method: str = "POST", + **kwargs: Any, + ) -> None: + super().__init__(url, method=method, data=dict(formdata or {}), **kwargs) + + @classmethod + def from_response( + cls, + response: Any, + formdata: Optional[dict] = None, + formid: Optional[str] = None, + formname: Optional[str] = None, + formxpath: Optional[str] = None, + callback: Any = None, + **kwargs: Any, + ) -> "Request": + """Build a request from a ```` in *response*, pre-filling inputs.""" + from urllib.parse import urlencode, urljoin + + form = cls._find_form(response, formid, formname, formxpath) + if form is None: + raise ValueError("No found in the response") + + values = cls._form_values(form) + if formdata: + values.update(formdata) + + action = form.attr("action") or response.url + url = urljoin(response.url, action) + method = (form.attr("method") or "GET").upper() + + if method == "GET": + if values: + sep = "&" if "?" in url else "?" + url = url + sep + urlencode(values) + return Request(url, method="GET", callback=callback, **kwargs) + return cls(url, formdata=values, method="POST", callback=callback, **kwargs) + + @staticmethod + def _find_form(response, formid, formname, formxpath): + doc = response.doc + if formid: + return doc.css_first("form#%s" % formid) + if formname: + return doc.css_first('form[name="%s"]' % formname) + if formxpath: + forms = [el for el in doc.xpath(formxpath) if hasattr(el, "css")] + return forms[0] if forms else None + return doc.css_first("form") + + @staticmethod + def _form_values(form): + values: dict = {} + for inp in form.css("input"): + name = inp.attr("name") + if not name: + continue + itype = (inp.attr("type") or "text").lower() + if itype in ("checkbox", "radio") and inp.attr("checked") is None: + continue + if itype in ("submit", "button", "image", "reset"): + continue + values[name] = inp.attr("value") or "" + for textarea in form.css("textarea"): + name = textarea.attr("name") + if name: + values[name] = textarea.text or "" + for select in form.css("select"): + name = select.attr("name") + if not name: + continue + options = select.css("option") + chosen = [o for o in options if o.attr("selected") is not None] + option = chosen[0] if chosen else (options[0] if options else None) + if option is not None: + value = option.attr("value") + values[name] = value if value is not None else option.text + return values + + class Spider(BaseCrawler): """A callback-driven spider. @@ -131,6 +215,7 @@ def on_item(self, item: Any) -> None: async def start(self) -> None: self.pool = AsyncPool(self.max_concurrency_level) self._seen = set() + self.stats.open() self.on_start() for pipe in self._pipelines: @@ -148,6 +233,9 @@ async def start(self) -> None: for pipe in self._pipelines: pipe.close_spider(self) + self.stats.close() + if self.debug: + log.info("stats: %s", self.stats.get_stats()) self.on_finish() def _schedule(self, request: Request, depth: int) -> None: @@ -170,12 +258,14 @@ async def _handle(self, request: Request) -> None: url = request.url if self.respect_robots and not await self._robots_allowed(url): + self.stats.inc("robots_blocked") self.on_robots_blocked(url) return if self.debug: log.info("crawling -> %s", url) + self.stats.inc("requests") try: response = await self.request_manager.make_request( url, @@ -184,9 +274,12 @@ async def _handle(self, request: Request) -> None: headers=request.headers, ) except Exception as ex: # noqa: BLE001 - routed to the errback/handler + self.stats.inc("request_errors") self._handle_error(request, ex) return + self.stats.inc("responses") + self.stats.inc("status/%s" % response.status_code) response.request = request callback = request.callback or self.parse await self._drive_callback(callback, response) @@ -234,7 +327,9 @@ async def _process_item(self, item: Any) -> None: res = pipe.process_item(item, self) item = await res if inspect.iscoroutine(res) else res except DropItem: + self.stats.inc("items_dropped") return + self.stats.inc("items") self.on_item(item) def _handle_error(self, request: Request, ex: Exception) -> None: diff --git a/crawley/stats.py b/crawley/stats.py new file mode 100644 index 0000000..5cfa3a8 --- /dev/null +++ b/crawley/stats.py @@ -0,0 +1,43 @@ +"""A simple stats collector for a crawl. + +Counts requests, responses, items, errors, etc. and the elapsed time. Every +crawler / spider owns one as ``self.stats`` and logs a summary when it finishes. +""" + +from __future__ import annotations + +import time +from typing import Any, Optional + + +class StatsCollector: + """Collect counters and values during a crawl.""" + + def __init__(self) -> None: + self._stats: dict[str, Any] = {} + self._start: Optional[float] = None + + def open(self) -> None: + """Reset the stats and start the clock.""" + self._stats = {} + self._start = time.monotonic() + + def close(self) -> None: + """Record the total elapsed time.""" + if self._start is not None: + self.set("elapsed_seconds", round(time.monotonic() - self._start, 3)) + + def inc(self, key: str, count: int = 1) -> None: + self._stats[key] = self._stats.get(key, 0) + count + + def set(self, key: str, value: Any) -> None: + self._stats[key] = value + + def get(self, key: str, default: Any = None) -> Any: + return self._stats.get(key, default) + + def get_stats(self) -> dict[str, Any]: + return dict(self._stats) + + def __repr__(self) -> str: + return "StatsCollector(%r)" % self._stats diff --git a/docs/crawler.md b/docs/crawler.md index 283df5f..b874f13 100644 --- a/docs/crawler.md +++ b/docs/crawler.md @@ -51,10 +51,40 @@ QuotesCrawler().run() # synchronous entry point | `unique_urls` | `True` | Skip already-visited urls (prevents loops). | | `post_urls` | `[]` | `(pattern, data)` tuples to issue POSTs. | | `login` | `None` | `(url, data)` to authenticate before crawling. | +| `unique_urls` | `True` | Skip already-visited urls. | +| `http_cache` | `False` | Cache responses on disk (development helper). | +| `http_cache_dir` | `.crawley_cache` | Where the HTTP cache is stored. | +| `render_js` | `False` | Render pages with Playwright (`crawley[js]`). | See [Politeness](politeness.md) for `respect_robots`, `crawl_delay`, `max_concurrency_per_host` and the retry options. +## Stats + +Every crawler / spider owns a `stats` collector. It counts requests, responses, +per-status codes, errors, robots blocks, items (spiders) and the elapsed time, +and logs a summary when the crawl finishes: + +```python +crawler = MyCrawler() +crawler.run() +print(crawler.stats.get_stats()) +# {'requests': 12, 'responses': 12, 'status/200': 12, 'elapsed_seconds': 1.3, ...} +``` + +## HTTP cache (development) + +Set `http_cache = True` to cache every response on disk (keyed by +method + url + body). Re-running the crawl then serves from the cache instead of +hitting the site again — handy while developing scrapers: + +```python +class MyCrawler(BaseCrawler): + start_urls = ["https://example.com/"] + http_cache = True + http_cache_dir = ".crawley_cache" +``` + ## URL matching Patterns use `%` as a wildcard: diff --git a/docs/reference.md b/docs/reference.md index 13e8a10..29245dc 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -27,6 +27,7 @@ Auto-generated from the docstrings. ::: crawley.spider.Spider ::: crawley.spider.Request +::: crawley.spider.FormRequest ::: crawley.spider.Item ::: crawley.spiders.CrawlSpider ::: crawley.spiders.SitemapSpider @@ -38,6 +39,11 @@ Auto-generated from the docstrings. ::: crawley.pipelines.ItemPipeline ::: crawley.pipelines.DropItem +## Stats & cache + +::: crawley.stats.StatsCollector +::: crawley.http.cache.HttpCache + ## Extractors ::: crawley.extractors.XPathExtractor diff --git a/docs/spiders.md b/docs/spiders.md index f8d95a4..c9731a6 100644 --- a/docs/spiders.md +++ b/docs/spiders.md @@ -54,6 +54,31 @@ def parse_item(self, response, rank=None): `dont_filter` (bypass de-duplication) and `errback` (per-request error handler). De-duplication uses a fingerprint of *method + url + body*. +### Submitting forms + +`FormRequest.from_response` reads a `` from the page, pre-fills its inputs +and lets you override fields — handy for logins and search forms: + +```python +from crawley.spider import FormRequest + +class LoginSpider(Spider): + start_urls = ["https://site.example/login"] + + def parse(self, response): + yield FormRequest.from_response( + response, + formdata={"username": "me", "password": "secret"}, + callback=self.after_login, + ) + + def after_login(self, response): + ... +``` + +`from_response` accepts `formid` / `formname` / `formxpath` to pick a specific +form, and honours the form's `method` (GET forms become a query string). + ## Item pipelines Pipelines post-process every emitted item in order. Raise `DropItem` to discard diff --git a/tests/conftest.py b/tests/conftest.py index 4ae3d1d..3d1cb70 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -66,6 +66,15 @@ def do_GET(self): self._handle_flaky() elif path == "/always-503": self._send_status(503, "down", retry_after=0) + elif path == "/login-form": + self._send( + '' + '' + '' + '' + '' + "" + ) elif path == "/loop-a": self._send('b') elif path == "/loop-b": diff --git a/tests/test_cache.py b/tests/test_cache.py new file mode 100644 index 0000000..4d8cf59 --- /dev/null +++ b/tests/test_cache.py @@ -0,0 +1,75 @@ +"""Tests for the on-disk HTTP cache.""" + +import os + +from crawley.crawlers import BaseCrawler +from crawley.http.cache import HttpCache +from crawley.http.managers import FastRequestManager +from crawley.scrapers import BaseScraper + + +def test_cache_store_and_get(tmp_path): + cache = HttpCache(str(tmp_path), enabled=True) + assert cache.get("GET", "http://x/") is None + + cache.store("GET", "http://x/", None, 200, "http://x/", {"a": "b"}, "hi") + hit = cache.get("GET", "http://x/") + assert hit["status"] == 200 + assert hit["body"] == "hi" + assert hit["url"] == "http://x/" + + +def test_cache_key_includes_method_and_body(tmp_path): + cache = HttpCache(str(tmp_path)) + cache.store("GET", "http://x/", None, 200, "http://x/", {}, "get") + assert cache.get("POST", "http://x/", {"q": 1}) is None + cache.store("POST", "http://x/", {"q": 1}, 200, "http://x/", {}, "post") + assert cache.get("POST", "http://x/", {"q": 1})["body"] == "post" + + +def test_disabled_cache_is_noop(tmp_path): + cache = HttpCache(str(tmp_path), enabled=False) + cache.store("GET", "http://x/", None, 200, "http://x/", {}, "body") + assert cache.get("GET", "http://x/") is None + + +async def test_manager_uses_cache(server, tmp_path): + cache = HttpCache(str(tmp_path), enabled=True) + manager = FastRequestManager(cache=cache) + try: + first = await manager.make_request(server + "/page1") + assert first.status_code == 200 + # The response is now cached on disk. + files = os.listdir(tmp_path) + assert len(files) == 1 + finally: + await manager.aclose() + + # A fresh manager pointed at an unreachable host still serves from cache. + manager2 = FastRequestManager(cache=cache) + try: + cached = await manager2.make_request(server + "/page1") + assert "Title 1" in cached.raw_html + finally: + await manager2.aclose() + + +async def test_crawler_http_cache_attribute(server, tmp_path): + visited = [] + + class S(BaseScraper): + matching_urls = ["%/page%"] + + def scrape(self, response): + visited.append(response.url) + + class C(BaseCrawler): + start_urls = [server + "/page1"] + scrapers = [S] + max_depth = 0 + requests_delay = 0 + http_cache = True + http_cache_dir = str(tmp_path) + + await C().start() + assert len(os.listdir(tmp_path)) >= 1 diff --git a/tests/test_forms.py b/tests/test_forms.py new file mode 100644 index 0000000..3838167 --- /dev/null +++ b/tests/test_forms.py @@ -0,0 +1,75 @@ +"""Tests for FormRequest.from_response.""" + +import pytest + +from crawley.http.response import Response +from crawley.spider import FormRequest, Spider + + +def test_from_response_post_prefills_inputs(): + html = ( + '
' + '' + '' + '' + '' + "
" + ) + resp = Response(raw_html=html, url="http://site.test/login") + req = FormRequest.from_response(resp, formdata={"pass": "secret"}) + + assert isinstance(req, FormRequest) + assert req.method == "POST" + assert req.url == "http://site.test/submit" + assert req.data == {"user": "bob", "token": "xyz", "pass": "secret"} + + +def test_from_response_get_builds_querystring(): + html = '
' + resp = Response(raw_html=html, url="http://site.test/") + req = FormRequest.from_response(resp, formdata={"q": "crawley"}) + + assert req.method == "GET" + assert "q=crawley" in req.url + assert req.data is None + + +def test_from_response_select_and_textarea(): + html = ( + '
' + '' + '' + "
" + ) + resp = Response(raw_html=html, url="http://site.test/") + req = FormRequest.from_response(resp) + assert req.data == {"cat": "b", "msg": "hi"} + + +def test_from_response_no_form_raises(): + resp = Response(raw_html="no form", url="http://x/") + with pytest.raises(ValueError): + FormRequest.from_response(resp) + + +async def test_formrequest_end_to_end(server): + captured = [] + + class LoginSpider(Spider): + start_urls = [server + "/login-form"] + requests_delay = 0 + + def parse(self, response): + yield FormRequest.from_response( + response, formdata={"pass": "secret"}, callback=self.after + ) + + def after(self, response): + captured.append(response.raw_html) + + await LoginSpider().start() + + body = captured[0] + assert "user=bob" in body + assert "pass=secret" in body diff --git a/tests/test_stats.py b/tests/test_stats.py new file mode 100644 index 0000000..55e52e7 --- /dev/null +++ b/tests/test_stats.py @@ -0,0 +1,86 @@ +"""Tests for the stats collector and its integration.""" + +from crawley.crawlers import BaseCrawler +from crawley.scrapers import BaseScraper +from crawley.spider import Spider +from crawley.stats import StatsCollector + + +def test_stats_collector_basics(): + stats = StatsCollector() + stats.open() + stats.inc("requests") + stats.inc("requests", 2) + stats.set("foo", "bar") + stats.close() + + assert stats.get("requests") == 3 + assert stats.get("foo") == "bar" + assert "elapsed_seconds" in stats.get_stats() + assert stats.get("missing", 0) == 0 + + +async def test_crawler_collects_stats(server): + class S(BaseScraper): + matching_urls = ["%/page%"] + + def scrape(self, response): + pass + + class C(BaseCrawler): + start_urls = [server + "/"] + scrapers = [S] + max_depth = 1 + requests_delay = 0 + requests_deviation = 0 + + crawler = C() + await crawler.start() + + stats = crawler.stats.get_stats() + assert stats["requests"] >= 3 + assert stats["responses"] >= 3 + assert stats.get("status/200", 0) >= 3 + assert "elapsed_seconds" in stats + + +async def test_spider_collects_item_stats(quotes_server): + from crawley.pipelines import DropItem, ItemPipeline + + class DropFirst(ItemPipeline): + def process_item(self, item, spider): + if item["text"].endswith("1-1"): + raise DropItem() + return item + + class QSpider(Spider): + start_urls = [quotes_server] + pipelines = [DropFirst] + requests_delay = 0 + + def parse(self, response): + for quote in response.css("div.quote"): + yield {"text": quote.css_first("span.text").text} + nxt = response.css_first("li.next a") + if nxt: + yield response.follow(nxt.attr("href")) + + spider = QSpider() + await spider.start() + + stats = spider.stats.get_stats() + assert stats["items"] == 5 # 6 quotes minus the dropped one + assert stats["items_dropped"] == 1 + assert stats["responses"] == 3 + + +async def test_request_error_stat(): + class C(BaseCrawler): + start_urls = ["http://127.0.0.1:1/down"] + max_depth = 0 + requests_delay = 0 + + crawler = C() + crawler.request_manager.retry_policy.max_retries = 0 + await crawler.start() + assert crawler.stats.get("request_errors") == 1 From b33c6d90c8206f1b737e7563ddc04128aa6ba8db Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 18:57:54 +0000 Subject: [PATCH 4/4] Add downloader middlewares and AutoThrottle - crawley.middlewares.DownloaderMiddleware: process_request / process_response / process_exception chains (sync or async) wrapping every Spider download. process_request may short-circuit with a Response or reschedule a Request; process_exception can recover from errors. - crawley.http.autothrottle.AutoThrottle: adapt the per-host delay to the observed response latency (target_concurrency, start/max delay). Enable with `autothrottle = True`; Response now carries `.latency` (httpx elapsed / measured render time), fed to the per-host rate limiter. Docs (spiders middlewares, politeness AutoThrottle, API reference) and CHANGELOG updated. Tests (201 -> 213): test_middlewares, test_autothrottle. --- CHANGELOG.md | 5 ++ crawley/__init__.py | 10 +++ crawley/crawlers/base.py | 34 ++++++++++ crawley/http/autothrottle.py | 40 ++++++++++++ crawley/http/managers.py | 6 +- crawley/http/playwright.py | 8 ++- crawley/http/response.py | 1 + crawley/middlewares.py | 35 +++++++++++ crawley/spider.py | 82 ++++++++++++++++++++---- docs/politeness.md | 20 ++++++ docs/reference.md | 6 +- docs/spiders.md | 37 +++++++++++ tests/test_autothrottle.py | 59 ++++++++++++++++++ tests/test_middlewares.py | 118 +++++++++++++++++++++++++++++++++++ 14 files changed, 444 insertions(+), 17 deletions(-) create mode 100644 crawley/http/autothrottle.py create mode 100644 crawley/middlewares.py create mode 100644 tests/test_autothrottle.py create mode 100644 tests/test_middlewares.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d6bcc4a..a399d0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,11 @@ A full port of the legacy Python 2 framework to a modern Python 3 (3.9+) stack. (requests/responses/status/errors/items/elapsed), logged on finish. - **On-disk HTTP cache** (`http_cache = True`) for development. - **`FormRequest.from_response`** to pre-fill and submit forms. +- **Downloader middlewares** (`crawley.middlewares.DownloaderMiddleware`): + `process_request` / `process_response` / `process_exception` chains on the + `Spider` (sync or async). +- **AutoThrottle** (`autothrottle = True`): adapt the per-host delay to the + observed response latency. - Documentation site (MkDocs Material + mkdocstrings) and a set of runnable, test-covered `examples/`. - **Type hints** on the public modules and a PEP 561 `py.typed` marker so diff --git a/crawley/__init__.py b/crawley/__init__.py index ca8539b..a801b2f 100644 --- a/crawley/__init__.py +++ b/crawley/__init__.py @@ -45,6 +45,8 @@ "CrawlSpider", "SitemapSpider", "StatsCollector", + "DownloaderMiddleware", + "AutoThrottle", "__version__", ] @@ -93,6 +95,14 @@ def __getattr__(name): from crawley.stats import StatsCollector return StatsCollector + if name == "DownloaderMiddleware": + from crawley.middlewares import DownloaderMiddleware + + return DownloaderMiddleware + if name == "AutoThrottle": + from crawley.http.autothrottle import AutoThrottle + + return AutoThrottle if name in ("fetch", "afetch", "afetch_all", "scrape", "parse", "Document", "Element"): from crawley import scraping diff --git a/crawley/crawlers/base.py b/crawley/crawlers/base.py index b8db2ba..9781100 100644 --- a/crawley/crawlers/base.py +++ b/crawley/crawlers/base.py @@ -116,6 +116,18 @@ class BaseCrawler(metaclass=CrawlerMeta): http_cache_dir = ".crawley_cache" """Directory used by the on-disk HTTP cache.""" + autothrottle = False + """Adapt the per-host delay to the observed response latency.""" + + autothrottle_target_concurrency = 1.0 + """Target number of concurrent requests per host for AutoThrottle.""" + + autothrottle_start_delay = 1.0 + """Initial per-host delay used by AutoThrottle (seconds).""" + + autothrottle_max_delay = 60.0 + """Maximum per-host delay AutoThrottle may set (seconds).""" + def __init__(self, sessions=None, settings=None): self.sessions = sessions if sessions is not None else [] self.debug = getattr(settings, "SHOW_DEBUG_INFO", True) @@ -148,10 +160,31 @@ def __init__(self, sessions=None, settings=None): enabled=self.respect_robots, ) + self._autothrottle = None + if self.autothrottle: + from crawley.http.autothrottle import AutoThrottle + + self._autothrottle = AutoThrottle( + target_concurrency=self.autothrottle_target_concurrency, + start_delay=self.autothrottle_start_delay, + max_delay=self.autothrottle_max_delay, + ) + self.rate_limiter.delay = self.autothrottle_start_delay + self.request_manager = self._make_request_manager() self._initialize_scrapers() + def _record_latency(self, url, response): + """Feed the response latency to AutoThrottle (if enabled).""" + if self._autothrottle is None: + return + latency = getattr(response, "latency", None) + if latency is None: + return + host = urllib.parse.urlparse(url).netloc + self.rate_limiter.set_delay(host, self._autothrottle.adjust(host, latency)) + def _make_cache(self): if not self.http_cache: return None @@ -265,6 +298,7 @@ async def _fetch(self, url, depth_level=0): self.stats.inc("responses") self.stats.inc("status/%s" % response.status_code) + self._record_latency(url, response) urls = self._manage_scrapers(response) diff --git a/crawley/http/autothrottle.py b/crawley/http/autothrottle.py new file mode 100644 index 0000000..e99c217 --- /dev/null +++ b/crawley/http/autothrottle.py @@ -0,0 +1,40 @@ +"""Adaptive per-host throttling. + +Adjusts the delay applied to each host based on the observed response latency, +aiming to keep roughly ``target_concurrency`` requests in flight per host (the +same idea as Scrapy's AutoThrottle). +""" + +from __future__ import annotations + +from typing import Optional + + +class AutoThrottle: + """Compute a per-host delay from observed latencies.""" + + def __init__( + self, + target_concurrency: float = 1.0, + start_delay: float = 1.0, + max_delay: float = 60.0, + enabled: bool = True, + ) -> None: + self.target_concurrency = max(target_concurrency, 0.01) + self.start_delay = start_delay + self.max_delay = max_delay + self.enabled = enabled + self._delays: dict[str, float] = {} + + def adjust(self, host: str, latency: Optional[float]) -> float: + """Update and return the new delay for *host* given a *latency*.""" + previous = self._delays.get(host, self.start_delay) + if latency is None: + return previous + + target = latency / self.target_concurrency + # Smooth towards the target and clamp to [0, max_delay]. + new_delay = (previous + target) / 2 + new_delay = min(max(new_delay, 0.0), self.max_delay) + self._delays[host] = new_delay + return new_delay diff --git a/crawley/http/managers.py b/crawley/http/managers.py index 16b8dea..720c440 100644 --- a/crawley/http/managers.py +++ b/crawley/http/managers.py @@ -133,12 +133,16 @@ async def make_request(self, url, data=None, extractor=None, headers=None): if extractor is not None: extracted_html = extractor.get_object(raw_html) - return Response( + result = Response( raw_html=raw_html, extracted_html=extracted_html, url=final_url, response=response, ) + elapsed = getattr(response, "elapsed", None) + if elapsed is not None: + result.latency = elapsed.total_seconds() + return result @staticmethod def _response_from_cache(cached, extractor): diff --git a/crawley/http/playwright.py b/crawley/http/playwright.py index c1af3fe..c5533ee 100644 --- a/crawley/http/playwright.py +++ b/crawley/http/playwright.py @@ -102,13 +102,17 @@ async def make_request( if cached is not None: return self._response_from_cache(cached, extractor) + import time + host = urlparse(url).netloc semaphore = self.rate_limiter.semaphore(host) if semaphore is not None: await semaphore.acquire() try: await self.rate_limiter.throttle(host) + started = time.monotonic() raw_html, final_url, status = await self._render_with_retry(url, headers) + latency = time.monotonic() - started finally: if semaphore is not None: semaphore.release() @@ -117,12 +121,14 @@ async def make_request( self.cache.store("GET", url, data, status, final_url, {}, raw_html) extracted = extractor.get_object(raw_html) if extractor is not None else None - return Response( + result = Response( raw_html=raw_html, extracted_html=extracted, url=final_url, response=_RenderResult(status), ) + result.latency = latency + return result async def aclose(self) -> None: if self._context is not None: diff --git a/crawley/http/response.py b/crawley/http/response.py index 2fc0447..2e85094 100644 --- a/crawley/http/response.py +++ b/crawley/http/response.py @@ -32,6 +32,7 @@ def __init__( self.url = url self.response = response self.request: Any = None + self.latency: Optional[float] = None self._doc: Optional[Document] = None if response is not None: diff --git a/crawley/middlewares.py b/crawley/middlewares.py new file mode 100644 index 0000000..fe712db --- /dev/null +++ b/crawley/middlewares.py @@ -0,0 +1,35 @@ +"""Downloader middlewares. + +A downloader middleware is a hook around every download performed by a +:class:`~crawley.spider.Spider`. Middlewares are applied in order on the way out +(``process_request``) and in reverse order on the way back +(``process_response`` / ``process_exception``), exactly like Scrapy. + +All methods may be sync or async and are optional: + +- ``process_request(request, spider)`` -> ``None`` (continue), a ``Response`` + (short-circuit the download) or a ``Request`` (reschedule it instead). +- ``process_response(request, response, spider)`` -> a ``Response`` (possibly + replaced) or a ``Request`` (reschedule). +- ``process_exception(request, exception, spider)`` -> ``None`` (propagate), a + ``Response`` or a ``Request``. +""" + +from __future__ import annotations + +from typing import Any, Optional + + +class DownloaderMiddleware: + """Base class for downloader middlewares (all methods optional).""" + + def process_request(self, request: Any, spider: Any) -> Any: + return None + + def process_response(self, request: Any, response: Any, spider: Any) -> Any: + return response + + def process_exception( + self, request: Any, exception: Exception, spider: Any + ) -> Optional[Any]: + return None diff --git a/crawley/spider.py b/crawley/spider.py index 62cfbc9..5f3aa99 100644 --- a/crawley/spider.py +++ b/crawley/spider.py @@ -38,6 +38,13 @@ def parse_post(self, response): log = logging.getLogger("crawley.spider") +async def _maybe_await(value: Any) -> Any: + """Await *value* if it is awaitable, otherwise return it as-is.""" + if inspect.isawaitable(value): + return await value + return value + + class Item(dict): """A scraped item. Just a ``dict`` you may subclass for clarity.""" @@ -192,9 +199,13 @@ class Spider(BaseCrawler): pipelines: list = [] """Item pipeline classes applied, in order, to every emitted item.""" + middlewares: list = [] + """Downloader middleware classes wrapping every download.""" + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._pipelines = [pipe() for pipe in self.pipelines] + self._middlewares = [mw() for mw in self.middlewares] # -- overridables ---------------------------------------------------- @@ -265,25 +276,70 @@ async def _handle(self, request: Request) -> None: if self.debug: log.info("crawling -> %s", url) - self.stats.inc("requests") - try: - response = await self.request_manager.make_request( - url, - data=request.data, - extractor=self.extractor, - headers=request.headers, + # process_request chain (in order). + response = None + for mw in self._middlewares: + outcome = await _maybe_await(mw.process_request(request, self)) + if isinstance(outcome, Request): + self._reschedule(outcome, request) + return + if outcome is not None: # a Response: short-circuit the download + response = outcome + break + + if response is None: + self.stats.inc("requests") + try: + response = await self.request_manager.make_request( + url, + data=request.data, + extractor=self.extractor, + headers=request.headers, + ) + except Exception as ex: # noqa: BLE001 - middleware/errback decide + self.stats.inc("request_errors") + handled = await self._process_exception(request, ex) + if isinstance(handled, Request): + self._reschedule(handled, request) + return + if handled is None: + self._handle_error(request, ex) + return + response = handled + else: + self.stats.inc("responses") + self.stats.inc("status/%s" % response.status_code) + self._record_latency(url, response) + + # process_response chain (reverse order). + for mw in reversed(self._middlewares): + outcome = await _maybe_await( + mw.process_response(request, response, self) ) - except Exception as ex: # noqa: BLE001 - routed to the errback/handler - self.stats.inc("request_errors") - self._handle_error(request, ex) - return + if isinstance(outcome, Request): + self._reschedule(outcome, request) + return + response = outcome - self.stats.inc("responses") - self.stats.inc("status/%s" % response.status_code) response.request = request callback = request.callback or self.parse await self._drive_callback(callback, response) + async def _process_exception(self, request: Request, ex: Exception) -> Any: + for mw in reversed(self._middlewares): + outcome = await _maybe_await( + mw.process_exception(request, ex, self) + ) + if outcome is not None: + return outcome + return None + + def _reschedule(self, request: Request, base_request: Request) -> None: + depth = request.meta.get("depth") + if depth is None: + depth = base_request.meta.get("depth", 0) + self._schedule(request, depth) + async def _drive_callback(self, callback: Any, response: Any) -> None: depth = response.meta.get("depth", 0) cb_kwargs = response.request.cb_kwargs if response.request else {} diff --git a/docs/politeness.md b/docs/politeness.md index 64e0458..b5de80e 100644 --- a/docs/politeness.md +++ b/docs/politeness.md @@ -84,6 +84,26 @@ policy.should_retry(attempt=0, response=resp) # -> bool policy.backoff_time(attempt=2) # -> seconds ``` +## AutoThrottle + +Instead of a fixed `crawl_delay`, let crawley adapt the per-host delay to the +observed response latency, aiming to keep roughly `target_concurrency` requests +in flight per host: + +```python +class MyCrawler(BaseCrawler): + start_urls = ["https://example.com/"] + autothrottle = True + autothrottle_target_concurrency = 2.0 # ~2 concurrent req/host + autothrottle_start_delay = 1.0 + autothrottle_max_delay = 30.0 +``` + +After each response the per-host delay is nudged towards +`latency / target_concurrency` (smoothed and clamped to `max_delay`). Slower +servers are hit more gently, faster ones a bit harder. The primitive is +`crawley.http.autothrottle.AutoThrottle`. + ## Putting it together ```python diff --git a/docs/reference.md b/docs/reference.md index 29245dc..5fb2328 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -34,15 +34,17 @@ Auto-generated from the docstrings. ::: crawley.spiders.LinkExtractor ::: crawley.spiders.Rule -## Pipelines +## Pipelines & middlewares ::: crawley.pipelines.ItemPipeline ::: crawley.pipelines.DropItem +::: crawley.middlewares.DownloaderMiddleware -## Stats & cache +## Stats, cache & throttling ::: crawley.stats.StatsCollector ::: crawley.http.cache.HttpCache +::: crawley.http.autothrottle.AutoThrottle ## Extractors diff --git a/docs/spiders.md b/docs/spiders.md index c9731a6..2ed3e87 100644 --- a/docs/spiders.md +++ b/docs/spiders.md @@ -105,6 +105,43 @@ class ShopSpider(Spider): Items that survive the pipeline reach `on_item(item)` (override it, or use a pipeline, to store them — e.g. with the [persistence](persistence.md) layer). +## Downloader middlewares + +Middlewares wrap every download: they run in order on the way out +(`process_request`) and in reverse on the way back (`process_response` / +`process_exception`) — like Scrapy. Use them to inject headers, rotate +proxies/user-agents, short-circuit with a cached response, or recover from +errors. + +```python +from crawley.middlewares import DownloaderMiddleware +from crawley.spider import Request + +class AuthHeaderMiddleware(DownloaderMiddleware): + def process_request(self, request, spider): + request.headers["Authorization"] = "Bearer ..." + return None # continue the download + + def process_response(self, request, response, spider): + if response.status_code == 302: + return Request(response.headers["location"]) # reschedule + return response + + def process_exception(self, request, exception, spider): + return None # propagate (or return a Response/Request) + +class MySpider(Spider): + middlewares = [AuthHeaderMiddleware] + ... +``` + +- `process_request` -> `None` (continue), a `Response` (skip the download) or a + `Request` (reschedule). +- `process_response` -> a `Response` or a `Request`. +- `process_exception` -> `None` (propagate), a `Response` or a `Request`. + +All methods may be sync or async. + ## Rule-based crawling: `CrawlSpider` `CrawlSpider` follows links automatically according to a list of `Rule`s, each diff --git a/tests/test_autothrottle.py b/tests/test_autothrottle.py new file mode 100644 index 0000000..50039af --- /dev/null +++ b/tests/test_autothrottle.py @@ -0,0 +1,59 @@ +"""Tests for AutoThrottle.""" + +from crawley.http.autothrottle import AutoThrottle +from crawley.http.throttle import HostRateLimiter +from crawley.spider import Spider + + +def test_adjust_moves_towards_target(): + at = AutoThrottle(target_concurrency=1.0, start_delay=1.0, max_delay=60.0) + # latency 3s, target concurrency 1 -> target delay 3; smoothed (1+3)/2 = 2 + assert at.adjust("h", 3.0) == 2.0 + # next: previous 2, target 3 -> 2.5 + assert at.adjust("h", 3.0) == 2.5 + + +def test_adjust_respects_concurrency(): + at = AutoThrottle(target_concurrency=2.0, start_delay=2.0) + # latency 2, target = 2/2 = 1; smoothed (2+1)/2 = 1.5 + assert at.adjust("h", 2.0) == 1.5 + + +def test_adjust_clamps_to_max(): + at = AutoThrottle(start_delay=50.0, max_delay=10.0) + assert at.adjust("h", 100.0) == 10.0 + + +def test_adjust_none_latency_keeps_previous(): + at = AutoThrottle(start_delay=1.0) + assert at.adjust("h", None) == 1.0 + + +def test_per_host_independent(): + at = AutoThrottle(start_delay=1.0) + at.adjust("a", 5.0) + assert at.adjust("b", 0.0) == 0.5 # b starts from start_delay, not a's + + +async def test_spider_autothrottle_sets_per_host_delay(server): + class S(Spider): + start_urls = [server + "/page1"] + autothrottle = True + autothrottle_start_delay = 0.0 + requests_delay = 0 + + def parse(self, response): + pass + + spider = S() + assert spider._autothrottle is not None + await spider.start() + # After one response a per-host delay has been recorded. + host = server.split("://", 1)[1] + assert host in spider._autothrottle._delays + + +def test_autothrottle_uses_rate_limiter_delay_override(): + limiter = HostRateLimiter(delay=0) + limiter.set_delay("h", 1.23) + assert limiter._delay_for("h") == 1.23 diff --git a/tests/test_middlewares.py b/tests/test_middlewares.py new file mode 100644 index 0000000..c446c08 --- /dev/null +++ b/tests/test_middlewares.py @@ -0,0 +1,118 @@ +"""Tests for downloader middlewares.""" + + +from crawley.http.response import Response +from crawley.middlewares import DownloaderMiddleware +from crawley.spider import Request, Spider + + +async def test_process_request_can_short_circuit(server): + seen = [] + + class FakeResponseMiddleware(DownloaderMiddleware): + def process_request(self, request, spider): + if request.url.endswith("/cached"): + return Response( + raw_html="

from middleware

", + url=request.url, + ) + return None + + class S(Spider): + start_urls = [server + "/cached"] + middlewares = [FakeResponseMiddleware] + requests_delay = 0 + + def parse(self, response): + seen.append(response.css_first("h1").text) + + spider = S() + await spider.start() + assert seen == ["from middleware"] + # No real request was made (short-circuited). + assert spider.stats.get("requests", 0) == 0 + + +async def test_process_request_headers_injection(server): + captured = [] + + class HeaderMiddleware(DownloaderMiddleware): + def process_request(self, request, spider): + request.headers["X-Test"] = "1" + return None + + class S(Spider): + start_urls = [server + "/page1"] + middlewares = [HeaderMiddleware] + requests_delay = 0 + + def parse(self, response): + captured.append(response.request.headers.get("X-Test")) + + await S().start() + assert captured == ["1"] + + +async def test_process_response_chain(server): + results = [] + + class TagMiddleware(DownloaderMiddleware): + def process_response(self, request, response, spider): + response.tagged = True + return response + + class S(Spider): + start_urls = [server + "/page1"] + middlewares = [TagMiddleware] + requests_delay = 0 + + def parse(self, response): + results.append(getattr(response, "tagged", False)) + + await S().start() + assert results == [True] + + +async def test_process_request_reschedules(): + visited = [] + + class RedirectMiddleware(DownloaderMiddleware): + def process_request(self, request, spider): + if request.url == "http://start/": + return Request("http://target/", callback=spider.parse, + dont_filter=True) + return Response(raw_html="", url=request.url) + + class S(Spider): + start_urls = ["http://start/"] + middlewares = [RedirectMiddleware] + requests_delay = 0 + + def parse(self, response): + visited.append(response.url) + + await S().start() + assert visited == ["http://target/"] + + +async def test_process_exception_recovers(): + handled = [] + + class RecoverMiddleware(DownloaderMiddleware): + def process_exception(self, request, exception, spider): + handled.append(type(exception).__name__) + return Response(raw_html="ok", + url=request.url) + + class S(Spider): + start_urls = ["http://127.0.0.1:1/down"] + middlewares = [RecoverMiddleware] + requests_delay = 0 + + def parse(self, response): + self.recovered = "ok" in response.raw_html + + spider = S() + spider.request_manager.retry_policy.max_retries = 0 + await spider.start() + assert handled and spider.recovered is True