diff --git a/.gitignore b/.gitignore index 68bc17f..f54c094 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# vscode files +.vscode diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..2cdff22 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,21 @@ +# .readthedocs.yaml +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Set the version of Python +build: + os: ubuntu-22.04 + tools: + python: "3.11" + +# Build documentation in the docs/ directory with Sphinx +sphinx: + configuration: docs/conf.py + +# Declare the python requirements to build docs +python: + install: + - requirements: docs/requirements.txt diff --git a/README.rst b/README.rst index dd62f98..4ebaffc 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,5 @@ -Rmqtools -======== +Welcome to Rmqtools! +==================== Rmqtools provides enhanced features for RabbitMQ development in Python. Introduction @@ -21,6 +21,33 @@ applications. for unique publish-subscribe relationships to be set up outside of the periodic status message use case contained in the high-level wrappers. +Installation +------------ + +Prerequisites +~~~~~~~~~~~~~ + +* Pika (`version 1.3.0+ `_) +* RabbitMQ (`version 3.12.0+ `_) + +Note: Pika should be automatically installed when installing Rmqtools + +Rmqtools is not currently available for download with PyPI. Once it is +available, it can be installed via pip:: + + pip install rmqtools + +To install the latest version before Rmqtools is available on PyPI, use the +latest_ release on GitHub and download ``rmqtools--py3-none-any.whl``. +Then install the wheel file with pip:: + + pip install rmqtools--py3-none-any.whl + +The prerelease version is currently available on the PyPI test site. It can +be installed via pip:: + + pip install -i https://test.pypi.org/simple/ rmqtools + Documentation ------------- Documentation coming soon. @@ -115,3 +142,6 @@ Exposed Classes handler, defaults to ``[]``; operates like ``*args`` - ``kwargs : dict`` - a dictionary of keyword arguments to pass to the response handler, defaults to ``{}``; operates like ``**kwargs`` + +.. aliases below here +.. _latest: https://github.com/ASTHROS/rmqtools/releases/latest diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d4bb2cb --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..9f25144 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,40 @@ +# Configuration file for the Sphinx documentation builder. +import sys +sys.path.insert(0, '../') +# needs_sphinx = '1.0' + +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.viewcode', + 'sphinx.ext.intersphinx', + 'sphinx.ext.napoleon', +] + +intersphinx_mapping = { + 'python': ('https://docs.python.org/3/', + 'https://docs.python.org/3/objects.inv'), + 'pika': ('https://pika.readthedocs.io/en/stable/', + 'https://pika.readthedocs.io/en/stable/objects.inv'), +} + +templates_path = ['_templates'] + +source_suffix = '.rst' +master_doc = 'index' + +project = 'rmqtools' +copyright = '2026, Christian Thompson, Paul Horton' +author = 'Christian Thompson, Paul Horton' +release = '1.0.0' +version = '.'.join(release.split('.')[0:2]) + +exclude_patterns = ['_build'] +add_function_parenthesis = True +add_module_names = True +show_authors = True +pygments_style = 'sphinx' +modindex_common_prefix = ['rmqtools'] +html_theme = 'default' +html_static_path = ['_static'] + +# autoclass_content = 'both' diff --git a/docs/contributors.rst b/docs/contributors.rst new file mode 100644 index 0000000..e38dc6d --- /dev/null +++ b/docs/contributors.rst @@ -0,0 +1,9 @@ +Contributors +============ +The following people have directly contributed code by way of new features +and/or bug fixes to Rmqtools: + + - Christian Thompson + - Paul Horton + +*Contributors listed by commit count.* diff --git a/docs/examples.rst b/docs/examples.rst new file mode 100644 index 0000000..082e1ab --- /dev/null +++ b/docs/examples.rst @@ -0,0 +1,14 @@ +Usage Examples +============== + +Examples coming soon! +~~~~~~~~~~~~~~~~~~~~~ + +See ``examples/`` in the GitHub repository for now. + +.. Rmqtools has various methods of use, between the high-level RmqConnection class +.. and the various lower-level classes. The following examples illustrate +.. different ways the RmqConnection class can be used to handle common use cases. + +.. These examples, on the other hand, illustrate some ways in which the +.. lower-level API can be used for more specific use cases. diff --git a/docs/faq.rst b/docs/faq.rst new file mode 100644 index 0000000..5c74876 --- /dev/null +++ b/docs/faq.rst @@ -0,0 +1,22 @@ +Frequently Asked Questions +========================== + +- Is Rmqtools threaded? + + The high-level API in the Rmqtools library, ``RmqConnection`` contains + mostly threaded methods, while the underlying classes are not threaded, + but can be threaded if desired. The important thing to remember when + threading is that each thread must have a unique connection, created in + that thread. See :doc:`/modules/rmq` for more details. + +- How do I report a bug with Rmqtools? + + The `main Rmqtools repository `_ + is hosted on `GitHub `_ and we use the + `Issue tracker `_ to + handle bug reports. + +- How can I contribute to Rmqtools? + + You can fork the project on GitHub and issue Pull Requests when you believe + you have something solid to be added to the main repository. diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..e52c554 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,62 @@ +.. rmqtools documentation master file, created by + sphinx-quickstart on Thu Jun 29 13:22:51 2023. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to Rmqtools! +==================== +Rmqtools provides enhanced features for RabbitMQ development in Python. It is +based on the Pika_ library. + +If you have not developed with Rmqtools, Pika, or RabbitMQ before, the +:doc:`intro` documentation is a good place to get started. + +Installing Rmqtools +------------------- + +Prerequisites +~~~~~~~~~~~~~ + +* Pika (`version 1.3.0+ `_) +* RabbitMQ (`version 3.12.0+ `_) + +Note: Pika should be automatically installed when installing Rmqtools + +Rmqtools is not currently available for download with PyPI. Once it is +available, it can be installed via pip:: + + pip install rmqtools + +To install the latest version before Rmqtools is available on PyPI, use the +latest_ release on GitHub and download ``rmqtools--py3-none-any.whl``. +Then install the wheel file with pip:: + + pip install rmqtools--py3-none-any.whl + +To install directly from source, run "python setup.py install" in the root +source directory. + +Using Rmqtools +-------------- +.. toctree:: + :glob: + :maxdepth: 2 + + intro + modules/index + examples + faq + contributors + + +Indices and tables +------------------ + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` + +.. aliases below here +.. _Pika: https://pika.readthedocs.io +.. _latest: https://github.com/ASTHROS/rmqtools/releases/latest +.. |repo_base| replace:: https://github.com/ASTHROS/rmqtools diff --git a/docs/intro.rst b/docs/intro.rst new file mode 100644 index 0000000..fe9362d --- /dev/null +++ b/docs/intro.rst @@ -0,0 +1,65 @@ +Introduction to Rmqtools +======================== +This introduction guide will cover the basics of using RabbitMQ and Rmqtools. +For more detailed information about the underlying Pika library, please read +`Pika's documentation `_. For more +information about the underlying RabbitMQ messaging system, please visit +`RabbitMQ's main site `_. + +High-level API +-------------- +The high-level API consists of the RmqConnection class and the ResponseObject +structure. The high-level API implements wrappers for several common RabbitMQ +use cases. All methods in the high-level API are threaded, so as to make sure +that Pika's :class:`~pika.adapters.blocking_connection.BlockingConnection` +doesn't interfere with other connections. Essentially, the threading allows +for multiple concurrent producers and consumers to run in the same program. +Detailed documentation for the high-level API can be found at +:doc:`/modules/rmq`, and additional examples can be found in :doc:`/examples`. + +Example of a publisher that publishes log messages once per second:: + + import rmqtools + + rmq = rmqtools.RmqConnection(host='rabbit-1') + rmq.set_status_exchange('logs') + + @rmq.publish_status(1, 'device.1.status') + def send_status(): + status = 'running' + msg = {'status': status} + return msg + + rmq.run() + +Example of a subscriber that consumes those log messages:: + + import rmqtools + import json + + rmq = rmqtools.RmqConnection(host='rabbit-1') + rmq.set_status_exchange('logs') + + @rmq.subscribe_status('device_logs', 'device.*.status') + def handle_response(channel, method, props, body): + try: + data = json.loads(body) + except: + data = {'status': 'down'} + print(data.get('status')) + + rmq.run() + +Notice how each of these examples end in ``rmq.run()``. This is not a +coincidence, but rather a requirement. Any program that uses the high-level +API must always use the :py:meth:`RmqConnection.run` method at the end to +initiate the program. The method decorators in :class:`RmqConnection` create +threads, and the :py:meth:`RmqConnection.run` method starts all the threads +and waits for either user input or a system interrupt to stop the threads. + +Low-level API +------------- +Coming soon! + +More examples coming soon! +~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..954237b --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/modules/connection.rst b/docs/modules/connection.rst new file mode 100644 index 0000000..fa01ee6 --- /dev/null +++ b/docs/modules/connection.rst @@ -0,0 +1,13 @@ +Connection +========== +.. automodule:: rmqtools.connection + +Connection +---------- +.. autoclass:: rmqtools.Connection + :members: + :member-order: bysource + +.. autoclass:: rmqtools.ResponseObject + :members: + :member-order: bysource diff --git a/docs/modules/index.rst b/docs/modules/index.rst new file mode 100644 index 0000000..4e86a78 --- /dev/null +++ b/docs/modules/index.rst @@ -0,0 +1,12 @@ +Core Class and Module Documentation +=================================== + +.. toctree:: + :glob: + :maxdepth: 1 + + rmq + rpc + publisher + subscriber + connection diff --git a/docs/modules/publisher.rst b/docs/modules/publisher.rst new file mode 100644 index 0000000..71bf033 --- /dev/null +++ b/docs/modules/publisher.rst @@ -0,0 +1,9 @@ +Publisher +========= +.. automodule:: rmqtools.publisher + +Publisher +--------- +.. autoclass:: rmqtools.Publisher + :members: + :member-order: bysource diff --git a/docs/modules/rmq.rst b/docs/modules/rmq.rst new file mode 100644 index 0000000..9b7a430 --- /dev/null +++ b/docs/modules/rmq.rst @@ -0,0 +1,10 @@ +RMQ Connection +============== +.. automodule:: rmqtools.rmq + +RmqConnection +------------- +.. autoclass:: rmqtools.RmqConnection + :members: + :private-members: + :member-order: bysource diff --git a/docs/modules/rpc.rst b/docs/modules/rpc.rst new file mode 100644 index 0000000..b33ada9 --- /dev/null +++ b/docs/modules/rpc.rst @@ -0,0 +1,15 @@ +RPC +=== +.. automodule:: rmqtools.rpc + +RpcClient +--------- +.. autoclass:: rmqtools.RpcClient + :members: + :member-order: bysource + +RpcServer +--------- +.. autoclass:: rmqtools.RpcServer + :members: + :member-order: bysource diff --git a/docs/modules/subscriber.rst b/docs/modules/subscriber.rst new file mode 100644 index 0000000..b7a6b4a --- /dev/null +++ b/docs/modules/subscriber.rst @@ -0,0 +1,9 @@ +Subscriber +========== +.. automodule:: rmqtools.subscriber + +Subscriber +---------- +.. autoclass:: rmqtools.Subscriber + :members: + :member-order: bysource diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..65cd837 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1 @@ +pika>=1.3.0 diff --git a/examples/multi_status_publisher.py b/examples/multi_status_publisher.py index d977cda..8d221b7 100644 --- a/examples/multi_status_publisher.py +++ b/examples/multi_status_publisher.py @@ -25,7 +25,7 @@ def send_status(device_id): print(f"[{now.isoformat()}] Total status messages sent: {total}", "\n") return msg -rmq = RmqConnection(host='rabbit-1') +rmq = RmqConnection(host='rabbit') rmq.set_status_exchange('logs') for i in range(16): diff --git a/examples/receive_command.py b/examples/receive_command.py index 216e7c8..823b89a 100644 --- a/examples/receive_command.py +++ b/examples/receive_command.py @@ -1,6 +1,6 @@ from rmqtools import RmqConnection, ResponseObject -rmq = RmqConnection(host='rabbit-3') +rmq = RmqConnection(host='rabbit') rmq.set_command_exchange('spec-command') @rmq.handle_command('device_command') diff --git a/examples/send_command.py b/examples/send_command.py index 44b1f8b..261f6fd 100644 --- a/examples/send_command.py +++ b/examples/send_command.py @@ -1,20 +1,21 @@ from rmqtools import RmqConnection, ResponseObject -rmq = RmqConnection(host='rabbit-1') -rmq.set_command_exchange('spec-command') +rmq = RmqConnection(host='rabbit') +rmq.set_command_exchange('test_exg') @rmq.send_command('start_device', 'device_command') def start() -> ResponseObject: - device_num = 1 - command = 'start' - args = [device_num] - kwargs = {'command': command} - obj = ResponseObject(args, kwargs) - print('test') + spec_id = 'spec1' + command = 'stop' + kwargs = {'spec_id': spec_id, 'command': command} + obj = ResponseObject(kwargs=kwargs) return obj @rmq.handle_response('start_device') -def print_status(device_id, status=''): - print(f"Device {device_id} status: {status}") +def print_status(success='', error=''): + if error: + print(f"[ERROR] {error}") + else: + print(f"Device status: {success}") rmq.run() diff --git a/examples/status_publisher.py b/examples/status_publisher.py index 3ce0114..e0c3988 100644 --- a/examples/status_publisher.py +++ b/examples/status_publisher.py @@ -1,7 +1,7 @@ from rmqtools import RmqConnection -rmq = RmqConnection(host='rabbit-1') +rmq = RmqConnection(host='rabbit') rmq.set_status_exchange('status') def get_status(device_id): diff --git a/examples/status_subscriber.py b/examples/status_subscriber.py index 8108641..f601e93 100644 --- a/examples/status_subscriber.py +++ b/examples/status_subscriber.py @@ -2,54 +2,17 @@ import json from rmqtools import RmqConnection -from pika.adapters.blocking_connection import BlockingChannel -rmq = RmqConnection(host='rabbit-3') -rmq.set_status_exchange('logs') +rmq = RmqConnection(host='rabbit') +rmq.set_status_exchange('status') response_count = {'ok': 0, 'down': 0} response_count_2 = {'ok': 0, 'down': 0} msg_times = [datetime.now()] msg_times_2 = [datetime.now()] -@rmq.subscribe_status('device_logs', ['device.*.status']) -def handle_response(channel:BlockingChannel, method, properties, body): - try: - data = json.loads(body) - except: - data = {'status': 'down'} - ok_count = response_count.get('ok') - down_count = response_count.get('down') - if data.get('status') == 'ok': - ok_count = ok_count + 1 - else: - down_count = down_count + 1 - response_count.update(ok=ok_count, down=down_count) - - total = sum(response_count.values()) - now = datetime.now() - if (now - msg_times[-1]).seconds >= 10: - msg_times.append(now) - print(f"(1) [{now.isoformat()}] Total status messages received: {total}", "\n") - -@rmq.subscribe_status('device_logs_2', ['device.*.status']) -def handle_response_2(channel:BlockingChannel, method, properties, body): - try: - data = json.loads(body) - except: - data = {'status': 'down'} - ok_count = response_count_2.get('ok') - down_count = response_count_2.get('down') - if data.get('status') == 'ok': - ok_count = ok_count + 1 - else: - down_count = down_count + 1 - response_count_2.update(ok=ok_count, down=down_count) - - total = sum(response_count_2.values()) - now = datetime.now() - if (now - msg_times_2[-1]).seconds >= 10: - msg_times_2.append(now) - print(f"(2) [{now.isoformat()}] Total status messages received: {total}", "\n") +@rmq.subscribe_status('device_logs', ['spec.*.status']) +def handle_response(channel, method, properties, body): + print(body) rmq.run() diff --git a/pyproject.toml b/pyproject.toml index 50d8580..7d0b9e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,10 +4,10 @@ build-backend = "setuptools.build_meta" [project] name = "rmqtools" -version = "0.1.0" +version = "1.0.0" authors = [ - { name="Christian Thompson", email="cjthom43@asu.edu" }, - { name="Paul Horton", email="pahorton@asu.edu" }, + { name="Christian Thompson", email="christian.thompson@mail.utoronto.ca" }, + { name="Paul Horton", email="paul_horton@uml.edu" }, ] description = "Wrappers for RabbitMQ development in Python. Based on the pika library." readme = "README.md" @@ -22,5 +22,5 @@ dependencies = [ ] [project.urls] -"Homepage" = "https://github.com/217690thompson/rmqtools" -"Bug Tracker" = "https://github.com/217690thompson/rmqtools/issues" +"Homepage" = "https://github.com/ASTHROS/rmqtools" +"Bug Tracker" = "https://github.com/ASTHROS/rmqtools/issues" diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index 2dc4577..7f9db24 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -1,15 +1,29 @@ -__version__ = '0.1.0' +""" +Rmqtools +======== + +Provides wrappers on top of the Pika library for easy RabbitMQ development in +Python. + +How to use the documentation +---------------------------- +Documentation is available in two forms: docstrings provided with the code, +and an API reference, available on Read the Docs at ... +""" + +__version__ = '1.0.0' import logging # suppress logging warnings while importing rabbitmq-tools logging.getLogger(__name__).addHandler(logging.NullHandler()) +from rmqtools.exceptions import RmqError + from rmqtools.connection import ResponseObject from rmqtools.connection import Connection from rmqtools.publisher import Publisher from rmqtools.subscriber import Subscriber -from rmqtools.rpc_client import RpcClient -from rmqtools.rpc_server import RpcServer +from rmqtools.rpc import RpcClient, RpcServer from rmqtools.rmq import RmqConnection diff --git a/rmqtools/connection.py b/rmqtools/connection.py index 664fa58..2683738 100644 --- a/rmqtools/connection.py +++ b/rmqtools/connection.py @@ -1,16 +1,28 @@ """Core connection objects""" from dataclasses import dataclass, field -from typing import Any, List, Literal, Dict, Union +from typing import Any, List, Union import pika -from pika import DeliveryMode from pika.channel import Channel from pika.exchange_type import ExchangeType @dataclass class ResponseObject: + """The dataclass used by RPC calls to pass information to callback + functions and response handlers. All RPC request methods and worker + functions in RPC calls must return a ResponseObject. + + Parameters + ---------- + args : list, optional + A list of positional arguments to pass to the response/request handler, + defaults to []. + kwargs : dict, optional + A dictionary of keyword arguments to pass to the response/request + handler, defaults to {}. + """ args: list = field(default_factory=list) kwargs: dict = field(default_factory=dict) @@ -120,7 +132,6 @@ def delete(cls, channel:Channel, name:str, **kwargs): name : str The name of the exchange kwargs : optional - Optional values to pass with the exchange_delete call: if_unused : bool Only delete if the exchange is unused callback : callable @@ -132,8 +143,8 @@ def delete(cls, channel:Channel, name:str, **kwargs): class Exchanges(): """Encapsulates a list of Exchange objects. Additionally, provides methods for interacting with the list of exchanges, such as implementing - dictionary-style `.get()` and `.remove()` methods, as well as list-style - `+=` and `.append()` methods. Allows for the storage of information + dictionary-style ``.get()`` and ``.remove()`` methods, as well as list-style + ``+=`` and ``.append()`` methods. Allows for the storage of information about all the exchanges involved in a RabbitMQ operation. """ @@ -172,7 +183,7 @@ def __getitem__(self, index:str) -> Exchange: raise KeyError(f"Exchange '{index}' not found in list of exchanges!") def get(self, index:str, default=None) -> Union[Exchange, Any]: - """A dictionary-style `.get(index, [default])` for Exchanges + """A dictionary-style ``.get(index, [default])`` for Exchanges Parameters ---------- @@ -184,7 +195,7 @@ def get(self, index:str, default=None) -> Union[Exchange, Any]: Returns ------- Exchange | Any - Exchange such that `Exchange.name == index`, else returns default + Exchange such that ``Exchange.name == index``, else returns default """ try: return self[index] @@ -210,7 +221,7 @@ def append(self, exchange:Exchange) -> None: self.exchanges.append(exchange) def __iadd__(self, obj:Union["Exchanges", Exchange, List[Exchange]]) -> "Exchanges": - """Implements the `+=` operator for Exchanges + """Implements the ``+=`` operator for Exchanges Parameters ---------- @@ -240,16 +251,18 @@ def __iadd__(self, obj:Union["Exchanges", Exchange, List[Exchange]]) -> "Exchang raise TypeError(f"Cannot merge '{type(obj)}' into Exchanges") return self - def remove(self, name:str) -> None: + def remove(self, name:str, if_unused=False) -> None: """Remove an exchange from the list of exchanges Parameters ---------- name : str The name of the exchange to remove + if_unused : bool + If True, only delete the exchange if it is unused, by default False """ exchange = self[name] - exchange._delete() + exchange._delete(if_unused=if_unused) self.exchanges.remove(name) @@ -301,7 +314,7 @@ def _bind(self, routing_key:str, **kwargs) -> None: Parameters ---------- routing_key : str - A RabbitMQ routing key (e.g. `device.*.status`) + A RabbitMQ routing key (e.g. ``device.*.status``) """ binding_kwargs = self.binding_kwargs binding_kwargs.update(kwargs) @@ -314,7 +327,7 @@ def _delete(self, **kwargs) -> None: @classmethod def declare(cls, channel:Channel, name:str, **kwargs) -> None: - """Declare a queue on `channel` with name `name` + """Declare a queue on ``channel`` with name ``name`` Parameters ---------- @@ -337,7 +350,7 @@ def bind(cls, channel:Channel, name:str, routing_key:str, exchange='', name : str The name of the queue routing_key : str - A RabbitMQ routing key (e.g. `device.*.status`) + A RabbitMQ routing key (e.g. ``device.*.status``) exchange : str, optional The exchange to bind the queue on, leave blank to bind to the default queue, by default '' @@ -362,8 +375,8 @@ def delete(cls, channel:Channel, name:str, **kwargs) -> None: class Queues(): """Encapsulates a list of Queue objects. Additionally, provides methods for interacting with the list of queues, such as implementing - dictionary-style `.get()` and `.remove()` methods, as well as list-style - `+=` and `.append()` methods. Allows for the storage of information + dictionary-style ``.get()`` and ``.remove()`` methods, as well as list-style + ``+=`` and ``.append()`` methods. Allows for the storage of information about all the queues involved in a RabbitMQ operation.""" def __init__(self, queues:List[Queue]=[]) -> None: @@ -401,7 +414,7 @@ def __getitem__(self, index:str) -> Queue: raise KeyError(f"Queue '{index}' not found in list of queues!") def get(self, index:str, default=None) -> Union[Queue, Any]: - """A dictionary-style `.get(index, [default])` for Queues + """A dictionary-style ``.get(index, [default])`` for Queues Parameters ---------- @@ -413,7 +426,7 @@ def get(self, index:str, default=None) -> Union[Queue, Any]: Returns ------- Queue | Any - Queue such that `Queue.name == index`, else returns default + Queue such that ``Queue.name == index``, else returns default """ try: return self[index] @@ -439,7 +452,7 @@ def append(self, queue:Queue) -> None: self.queues.append(queue) def __iadd__(self, obj:Union["Queues", Queue, List[Queue]]) -> "Queues": - """Implements the `+=` operator for Queues + """Implements the ``+=`` operator for Queues Parameters ---------- @@ -483,20 +496,113 @@ def remove(self, name:str) -> None: class Connection(): + """The core class that implements communication with RabbitMQ through Pika. + This class is used by most of the other classes in rmqtools as the base + method of interacting with the RabbitMQ server. + + Parameters + ---------- + username : str, optional + The username to use for logging into the RabbitMQ server, by + default 'guest'. It is not necessary, but recommended to create + a user other than the default 'guest' to use for server + authentication. + password : str, optional + The password to use for logging into the RabbitMQ server, by + default 'guest'. + host : str, optional + The host of the RabbitMQ server, by default 'localhost'. This can + either be an address (e.g. '192.168.0.1') or a host name (e.g. + 'rabbit-1'). Using a host name is prefered to using an address. + Only host with 'localhost' if all connections are coming from the + same machine as the RabbitMQ server. + port : int, optional + The port to connect over, by default 5672. This is almost always + 5672, unless there are multiple RabbitMQ servers running on the + same host, which is discouraged. + autoconnect : bool, optional + Whether to connect to the RabbitMQ server and open a channel + automatically, by default True. + + Attributes + ---------- + username : str + The username used to log into the server. + password : str + The password used to log into the server. + host : str + The name or address of the host server for this connection ('localhost' + can be used single machine setups). Names (e.g. 'rabbit-1') are + prefered to addresses (e.g. '192.168.0.1'). + port : int + The port to connect to the server on (usually 5672). + connection : pika.BlockingConnection | pika.BaseConnection | pika.SelectConnection + The actual Pika connection object. + channel : pika.channel.Channel | pika.BlockingChannel + The Pika channel that RabbitMQ requests are sent on. + exchanges : rmqtools.connection.Exchanges + The object that contains all of the rmqtools.connection.Exchange + objects associated with this connection. Each Exchange object contains + information about exchanges created on the RabbitMQ server using this + connection. + queues : rmqtools.connection.Queues + The object that contains all of the rmqtools.connection.Queue + objects associated with this connection. Each Queue object contains + information about a queue created on the RabbitMQ server using this + connection. + credentials : pika.PlainCredentials + The credentials used to log into the RabbitMQ server. Uses username + and password attributes. + parameters : pika.ConnectionParameters + The parameters used when connecting to the RabbitMQ server. Uses the + credentials, host, and port attributes. + """ def __init__(self, username='guest', password='guest', host='localhost', port=5672, autoconnect=True) -> None: + """Connection initialization requires a username, password, host, and + port, but has default values for all of these. If autoconnect is true, + the Connection object will automatically connect to the RabbitMQ + server. Otherwise, the user must call Connection.connect() and + Connection.set_channel() after initializing the Connection object. + + Parameters + ---------- + username : str, optional + The username to use for logging into the RabbitMQ server, by + default 'guest'. It is not necessary, but recommended to create + a user other than the default 'guest' to use for server + authentication. + password : str, optional + The password to use for logging into the RabbitMQ server, by + default 'guest'. + host : str, optional + The host of the RabbitMQ server, by default 'localhost'. This can + either be an address (e.g. '192.168.0.1') or a host name (e.g. + 'rabbit-1'). Using a host name is prefered to using an address. + Only host with 'localhost' if all connections are coming from the + same machine as the RabbitMQ server. + port : int, optional + The port to connect over, by default 5672. This is almost always + 5672, unless there are multiple RabbitMQ servers running on the + same host, which is discouraged. + autoconnect : bool, optional + Whether to connect to the RabbitMQ server and open a channel + automatically, by default True. + """ self.username = username self.password = password self.host = host self.port = port + self.credentials = None self._set_credentials() + self.parameters = None self._set_parameters() self.connection = None self.channel = None if autoconnect: - self._connect() - self._set_channel() + self.connect() + self.set_channel() self.exchanges = Exchanges() self.queues = Queues() @@ -519,9 +625,31 @@ def _set_parameters(self, host:str=None, port:int=None, heartbeat=60) self.parameters = parameters - def _connect(self, connection_type='blocking', + def connect(self, connection_type='blocking', parameters:pika.ConnectionParameters=None, **kwargs) -> None: + """Creates a connection on the RabbitMQ server. + + Stores the connection in the ``connection`` attribute. + + - If ``connection_type`` is 'blocking', creates a BlockingConnection. + - If ``connection_type`` is 'base', creates a BaseConnection + - If ``connection_type`` is 'select', creates a SelectConnection + + Parameters + ---------- + connection_type : str, optional + The connection type, options are 'blocking', 'base', and 'select', + by default 'blocking' + parameters : pika.ConnectionParameters, optional + Any additional connection parameters (see pika.BlockingConnection + documentation for details), by default None + + Raises + ------ + ValueError + If an invalid connection type is given. + """ parameters = parameters if parameters else self.parameters allowed_connection_types = ['blocking', 'base', 'select'] if connection_type not in allowed_connection_types: @@ -537,8 +665,24 @@ def _connect(self, connection_type='blocking', raise ValueError(f"Wrong connection type: {connection_type}") self.connection = connection - def _set_channel(self, channel_number:int=None, + def set_channel(self, channel_number:int=None, on_open_callback:callable=None) -> None: + """Creates a channel on the RabbitMQ server using the ``connection`` + attribute and sets the created channel to the ``channel`` attribute. + + Parameters + ---------- + channel_number : int, optional + An optional numerical identifier for the channel, by default None + on_open_callback : callable, optional + An optional callback method to call on channel creation, + by default None + + Raises + ------ + ValueError + If the ``connection`` attribute is not set or is an invalid type. + """ if isinstance(self.connection, pika.BlockingConnection): channel = self.connection.channel(channel_number=channel_number) elif isinstance(self.connection, pika.SelectConnection): @@ -556,26 +700,60 @@ def _set_channel(self, channel_number:int=None, def exchange_declare(self, name:str, exchange_type:ExchangeType, **kwargs) -> None: + """Declares an exchange on the RabbitMQ server. + + Creates the exchange using the rmqtools.connection.Exchange class, + allowing for the storage of information about the exchange. Adds the + created exchange to the list of exchanges (rmqtools.connection. + Exchanges) for ease of access later. + + Parameters + ---------- + name : str + The name of the exchange + exchange_type : ExchangeType + A pika.ExchangeType, e.g. pika.ExchangeType.topic + """ e = Exchange(self.channel, name, exchange_type, **kwargs) try: self.exchanges += e except ValueError as err: - # log that exchange already exists so skipping the creation of it - # print(f"Exchange '{name}' already exists on this channel. " - # "Skipping creation, instead verifying the exchange.") + # exchange already exists, but that is fine pass def exchange_delete(self, name:str, if_unused=False) -> None: + """Deletes the given exchange on the RabbitMQ server and removes it + from the ``exchanges`` attribute. + + Parameters + ---------- + name : str + The name of the exchange to delete. + if_unused : bool, optional + If True, only delete the exchange if it is unused, by default False + """ self.exchanges.remove(name, if_unused=if_unused) def queue_declare(self, name='', binding_keys=[], binding_kwargs={}) -> None: + """Declares a queue on the RabbitMQ server using the ``channel`` + attribute as the channel to declare the queue on. + + Parameters + ---------- + name : str, optional + The name of the queue, by default '' + binding_keys : list, optional + An optional list of routing keys to bind the queue to, + by default [] + binding_kwargs : dict, optional + An optional dictionary of keyword arguments to pass to the queue + binding function (see pika.queue_bind for details), by default {} + """ q = Queue(self.channel, name=name, binding_keys=binding_keys, binding_kwargs=binding_kwargs) try: self.queues += q except ValueError as err: - # log that queue already exists so skipping the creation of it - # print(f"Queue '{name}' already exists on this channel. " - # "Skipping creation, instead verifying the queue.") + # this queue already exists, but we can reuse it pass diff --git a/rmqtools/exceptions.py b/rmqtools/exceptions.py new file mode 100644 index 0000000..ef39cfd --- /dev/null +++ b/rmqtools/exceptions.py @@ -0,0 +1,6 @@ +"""Implements custom error classes for rmqtools.""" + + +class RmqError(Exception): + """Common base class for all rmqtools errors""" + pass diff --git a/rmqtools/publisher.py b/rmqtools/publisher.py index 5f6a21b..5523d3b 100644 --- a/rmqtools/publisher.py +++ b/rmqtools/publisher.py @@ -1,20 +1,67 @@ """Tools for a publisher connection""" import json -import logging -from typing import Literal +from typing import Any, Dict, List, Literal, Union import pika from pika.exchange_type import ExchangeType -from pika.delivery_mode import DeliveryMode - from rmqtools import Connection class Publisher(): + """Creates a publisher object that has methods to easily publish messages + to the RabbitMQ server. This class is used by the higher-level + RmqConnection class. + + Parameters + ---------- + ptype : 'topic' | 'fanout' | 'direct' + The type of publisher. Topic publishers send messages to specific + routing keys. Fanout publishers send messages to all queues on an + exchange. Direct publishers are used for RPC calls, and they publish + to a single queue. The exchange type corresponds to the publisher + type (i.e. direct exchange for direct publisher). + exchange : str, optional + The name of the exchange to use for publishing. Leaving this value + blank uses the default exchange and is not supported for some types + of interactions. + + Attributes + ---------- + ptype : 'fanout' | 'topic' | 'direct' + The type of publisher this is. + etype : pika.ExchangeType + The type of exchange used by this publisher. Corresponds to ptype. + exchange_name : str + The name of the exchange this publisher operates on. + Connection : rmqtools.Connection + The Connection object associated with this publisher. + channel : pika.BlockingChannel + The channel used by this publisher to connect to RabbitMQ. + exchange : rmqtools.connection.Exchange + The Exchange object associated with etype and exchange_name. Allows + for syncing the publisher with the Connection object. + """ def __init__(self, ptype:Literal['topic', 'fanout', 'direct'], exchange='') -> None: + """Creates a publisher object that has methods to easily publish messages + to the RabbitMQ server. This class is used by the higher-level + RmqConnection class. + + Parameters + ---------- + ptype : 'topic' | 'fanout' | 'direct' + The type of publisher. Topic publishers send messages to specific + routing keys. Fanout publishers send messages to all queues on an + exchange. Direct publishers are used for RPC calls, and they publish + to a single queue. The exchange type corresponds to the publisher + type (i.e. direct exchange for direct publisher). + exchange : str, optional + The name of the exchange to use for publishing. Leaving this value + blank uses the default exchange and is not supported for some types + of interactions. + """ if ptype == 'fanout': etype = ExchangeType.fanout elif ptype == 'topic': @@ -28,6 +75,16 @@ def __init__(self, ptype:Literal['topic', 'fanout', 'direct'], self.exchange_name = exchange def connect(self, conn:Connection): + """Connect the publisher to a Connection object, giving it access to + the RabbitMQ server. Also declares the exchange defined in instance + initialization. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this publisher. Use a unique Connection per thread. + """ self.Connection = conn self.channel = self.Connection.channel self.Connection.exchange_declare(self.exchange_name, self.etype) @@ -42,13 +99,55 @@ def _get_publish_args(self, routing_key='') -> dict: def publish(self, message:str, routing_key='', properties:pika.BasicProperties=None, mandatory=False) -> None: + """Publish a message on the RabbitMQ server. + + Parameters + ---------- + message : str + The content of the message to send. + routing_key : str, optional + A routing key to publish the message to, by default ''. A blank + routing key is only used for fanout publishers. Direct publishers + will use the name of the queue they are publishing to for the + routing key. For topic publishers, see the RabbitMQ documentation + on routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html) + properties : pika.BasicProperties, optional + Any additional publish properties to pass to + pika.BlockingChannel.basic_publish, by default None + mandatory : bool, optional + Set the mandatory parameter in pika.BlockingChannel.basic_publish, + by default False + """ publish_args = self._get_publish_args(routing_key=routing_key) self.Connection.channel.basic_publish( **publish_args, body=message, properties=properties, mandatory=mandatory) - def publish_json(self, message, routing_key='', - properties:pika.BasicProperties=None, mandatory=False) -> None: + def publish_json(self, message: Union[Dict[str, Any], List[Any], int, str, + float, bool, None], + routing_key='', properties:pika.BasicProperties=None, + mandatory=False) -> None: + """Publish a JSON object on the RabbitMQ server. + + Parameters + ---------- + message : Dict[str, Any] | List[Any] | int | str | float | bool | None + A JSON-serializable object to send via RabbitMQ. + routing_key : str, optional + A routing key to publish the message to, by default ''. A blank + routing key is only used for fanout publishers. Direct publishers + will use the name of the queue they are publishing to for the + routing key. For topic publishers, see the RabbitMQ documentation + on routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html) + properties : pika.BasicProperties, optional + Any additional publish properties to pass to + pika.BlockingChannel.basic_publish, by default None + mandatory : bool, optional + Set the mandatory parameter in pika.BlockingChannel.basic_publish, + by default False + """ try: message = json.dumps(message) except TypeError: diff --git a/rmqtools/rmq.py b/rmqtools/rmq.py index f07797f..cf73990 100644 --- a/rmqtools/rmq.py +++ b/rmqtools/rmq.py @@ -1,25 +1,150 @@ -"""Highest level RabbitMQ wrapper""" - +"""Highest level RabbitMQ wrapper. + +Contains high-level wrappers for RabbitMQ, implementing many common +use cases with built-in threading. The RmqConnection object is meant to +be used on a clustered RabbitMQ server with quorum queues. For details +on clustering and quorum queues, please see the RabbitMQ documentation +of both (https://www.rabbitmq.com/clustering.html and +https://www.rabbitmq.com/quorum-queues.html). For a good visualization of +the underlying Raft algorithm that quorum queues use, see +http://thesecretlivesofdata.com/raft/. Most of the functional methods +contained in RmqConnection are meant to be used as method decorators, though +the private methods they employ can be used in a standalone manner. See +the examples section below. +""" + +from datetime import datetime import functools -import logging -import inspect import signal import sys import threading import time -import uuid -from typing import Dict, List, Literal, Tuple, Callable, Any +from typing import Any, Callable, Dict, List, Literal, Tuple import pika -from pika.adapters.blocking_connection import BlockingChannel from pika.exchange_type import ExchangeType -from rmqtools import Connection, Publisher, Subscriber, RpcClient, RpcServer, ResponseObject +from rmqtools import (Connection, Publisher, ResponseObject, RpcClient, + RpcServer, Subscriber, RmqError) class RmqConnection(): + """Contains high-level wrappers for RabbitMQ, implementing many common + use cases with built-in threading. The RmqConnection object is meant to + be used on a clustered RabbitMQ server with quorum queues. Most of the + functional methods contained in this class are meant to be used as method + decorators, though the private methods they employ can be used in a + standalone manner. + + Parameters + ---------- + username : str, optional + The username to use when logging into the RabbitMQ server, defaults to + 'guest'. Sometimes 'guest' login is not allowed, so it is recommended + to set up a different user for connecting. The username here must + correspond to a user with admin privileges. + password : str, optional + The password to use when logging into RabbitMQ, defaults to 'guest'. + host : str, optional + The hostname of the RabbitMQ server, defaults to 'localhost'. Using + localhost to host RabbitMQ is fine for some applications, but it is + not recommended. If using a clustered RabbitMQ server, any of the + cluster hostnames works (e.g. 'rabbit-1' or 'rabbit-2'). This can also + use an IP address intead of a hostname, but hostnames are recommended. + port : int, optional + The port of the RabbitMQ server, defaults to 5672. Most RabbitMQ + servers use port 5672, but some applications may have a different port. + For example, a virtual cluster hosted on one machine may use ports + 5672, 5673, and 5674. + autoconnect : bool, optional + Automatically run the ``connect`` method to sync to the Connection + object, defaults to True. It is recommended to leave autoconnect + enabled, but some use cases may require connecting at a specific time. + If this is required, run the ``connect`` method to perform the connection + tasks. + + Attributes + ---------- + username : str + The username used to authenticate with the RabbitMQ server. + password : str + The password used for RabbitMQ authentication. + host : str + The host (ip or hostname) of the RabbitMQ server (or one of the + clustered servers). + port : int + The port the RabbitMQ server is using (usually 5672). + autoconnect : bool + Automatically run the ``connect`` method on initialization if True. + threads : List[threading.Thread] + A list of the threads contained within the RmqConnection instance. + These threads are started with the ``run`` method and stoped with the + ``stop`` method. + stop_event : threading.Event + The threading Event used to shutdown all the threads running RabbitMQ + processes. + exchanges : Dict[str, Tuple[str, pika.ExchangeType]] + A dictionary containing information about all the exchanges involved + in this RMQ process. The key is the purpose of the exchange (e.g. + 'logs'), and the value is a tuple of the exchange name and the + exchange type. + publishers : Dict[str, Publisher] + A dictionary containing information about all the publishers involved + in this RMQ process. The key is the name of the publisher (e.g. + 'status') and the value is a Publisher object. These are usually auto- + generated within the method decorators and are named with the routing + key they correspond to (e.g. 'device.1.status'). + publish_props : Dict[str, pika.BasicProperties] + A dictionary containing information about all the publish properties + involved in this RMQ process. The key is the name of the Publisher (so + as to map it directly with the ``publishers`` attribute) and the value + is a pika.BasicProperties object. This is used to pass properties into + the Publisher's connection to the server and is usually auto-generated. + subscribers : Dict[str, Subscriber] + A dictionary containing information about all the subscribers involved + in this RMQ process. The key is the name of the subscriber (e.g. + 'device_logs') and the value is a Subscriber object. These are usually + auto-generated within the method decorators used for subscribing. + response_handlers : Dict[str, (Any) -> Any] + A dictionary containing information about all the response handlers + involved in this RMQ process. The key is an identifier and the value + is a callback function to serve as the response handler for a specific + process. These response handlers are used in the RPC processes to + determine which function needs to be called to handle the RPC response. + The identifier is usually related to a command system (e.g. + 'device_command'). + """ def __init__(self, username='guest', password='guest', host='localhost', port=5672, autoconnect=True) -> None: + """Create an instance of the high-level RmqConnection class. + + Parameters + ---------- + username : str, optional + The username to use when logging into the RabbitMQ server, defaults to + 'guest'. Sometimes 'guest' login is not allowed, so it is recommended + to set up a different user for connecting. The username here must + correspond to a user with admin privileges. + password : str, optional + The password to use when logging into RabbitMQ, defaults to 'guest'. + host : str, optional + The hostname of the RabbitMQ server, defaults to 'localhost'. Using + localhost to host RabbitMQ is fine for some applications, but it is + not recommended. If using a clustered RabbitMQ server, any of the + cluster hostnames works (e.g. 'rabbit-1' or 'rabbit-2'). This can also + use an IP address intead of a hostname, but hostnames are recommended. + port : int, optional + The port of the RabbitMQ server, defaults to 5672. Most RabbitMQ + servers use port 5672, but some applications may have a different port. + For example, a virtual cluster hosted on one machine may use ports + 5672, 5673, and 5674. + autoconnect : bool, optional + Automatically run the ``connect`` method to sync to the Connection + object, defaults to True. It is recommended to leave autoconnect + enabled, but some use cases may require connecting at a specific time. + If this is required, run the ``connect`` method to perform the connection + tasks. + """ self.username = username self.password = password self.host = host @@ -45,13 +170,19 @@ def __init__(self, username='guest', password='guest', host='localhost', self.response_handlers: Dict[str, Callable[[Any], Any]] self.response_handlers = {} + self.timeout_handlers: Dict[str, Callable[[int], None]] + self.timeout_handlers = {} + + self.error_handlers: Dict[str, Callable[[Exception], None]] + self.error_handlers = {} + def handle_exit(sig, frame): print('Main thread interrupted by user. ' 'Shutting down all child threads.') self.stop() sys.exit(0) - signal.signal(signal.SIGINT, handle_exit) + #signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGTERM, handle_exit) def _get_connection(self) -> Connection: @@ -60,26 +191,95 @@ def _get_connection(self) -> Connection: return conn def set_status_exchange(self, name:str) -> None: + """Set the exchange used for status messages. + + Parameters + ---------- + name : str + The name of the status exchange. + """ self.exchanges.update({'status': (name, ExchangeType.topic)}) def set_command_exchange(self, name:str) -> None: + """Set the exchange used for commands. + + Parameters + ---------- + name : str + The name of the command exchange. + """ self.exchanges.update({'command': (name, ExchangeType.direct)}) def add_exchange(self, name:str, purpose:str, etype:ExchangeType) -> None: + """Add an exchange. + + Parameters + ---------- + name : str + The name of the exchange. + purpose : str + The purpose of the exchange (e.g. 'logs'). + etype : ExchangeType + They type of the exchange (e.g. pika.ExchangeType.topic for a + topic exchange). + """ self.exchanges.update({purpose: (name, etype)}) - def get_publisher(self, name:str, err=True): + def get_publisher(self, name:str, err=True) -> Publisher: + """Get a Publisher object by name. + + Parameters + ---------- + name : str + The name of the publisher. + err : bool, optional + Raise an error if no publishers with that name are found, by + default True + + Returns + ------- + Publisher + Returns the publisher corresponding to the given name. + + Raises + ------ + ValueError + If err is True and there isn't a publisher with the given name. + """ publisher = self.publishers.get(name, False) if err and not publisher: raise ValueError(f"A publisher with name '{name}' could not be " f"found.") return publisher - def add_publisher(self, name:str, ptype:Literal['topic', 'fanout']='topic', + def add_publisher(self, name:str, + ptype:Literal['topic', 'fanout', 'direct']='topic', exchange=''): - if ptype not in ['topic', 'fanout']: - raise ValueError(f"Publisher type must be either 'topic' or " - f"'fanout', not '{ptype}'") + """Add a publisher. + + See documentation for rmqtools.Publisher for + additional information about publisher creation. + + Parameters + ---------- + name : str + The name of the publisher. + ptype : Literal['topic'] | Literal['fanout'] | Literal['direct'], optional + The type of publisher, either 'topic', 'fanout', or 'direct', by + default 'topic'. + exchange : str, optional + The name of the exchange used by the new publisher, by default ''. + + Raises + ------ + ValueError + If an invalid publisher type is given. + ValueError + If a publisher with the same name already exists. + """ + if ptype not in ['topic', 'fanout', 'direct']: + raise ValueError(f"Publisher type must be either 'topic', " + f"'fanout', or 'direct', not '{ptype}'") publisher = Publisher(ptype, exchange=exchange) pub = self.get_publisher(name, err=False) if pub: @@ -89,11 +289,41 @@ def add_publisher(self, name:str, ptype:Literal['topic', 'fanout']='topic', def set_publish_props(self, publisher_name:str, publish_props:pika.BasicProperties=None) -> None: + """Updates ``publish_props`` with the properties for a publisher. + + Parameters + ---------- + publisher_name : str + The name of the Publisher to set the properties of. + publish_props : pika.BasicProperties, optional + The pika BasicProperties object to set as the publisher properties, + by default None. Most use cases will use None here. + """ # throws error if publisher doesn't exist self.get_publisher(publisher_name) self.publish_props.update({publisher_name: publish_props}) - def get_subscriber(self, name:str, err=True): + def get_subscriber(self, name:str, err=True) -> Subscriber: + """Get a subscriber by name. + + Parameters + ---------- + name : str + The name of the subscriber. + err : bool, optional + Throw an error if no subscribers with that name are found, by + default True. + + Returns + ------- + Subscriber + The subscriber corresponding with the given name. + + Raises + ------ + ValueError + If err is True and no subscriber exists with that name. + """ subscriber = self.subscribers.get(name, False) if err and not subscriber: raise ValueError(f"A subscriber with name '{name}' could not be " @@ -103,6 +333,30 @@ def get_subscriber(self, name:str, err=True): def add_subscriber(self, name:str, queue:str, exchange='', etype:ExchangeType=ExchangeType.topic, routing_keys:List[str]=[]) -> None: + """Add a subscriber. + + See documentation for rmqtools.Subscriber for more information about + subscriber creation. + + Parameters + ---------- + name : str + The name of the subscriber. + queue : str + The name of the queue associated with the subscriber. + exchange : str, optional + The name of the exchange associated wit the subscriber, by + default ''. + etype : ExchangeType, optional + The type of the exchange, by default ExchangeType.topic. + routing_keys : List[str], optional + A list of routing keys to bind to the subscriber, by default []. + + Raises + ------ + ValueError + If a subscriber with the same name already exists. + """ subscriber = Subscriber(queue=queue, exchange=exchange, etype=etype, routing_keys=routing_keys, queue_arguments={'durable': True}) @@ -113,23 +367,57 @@ def add_subscriber(self, name:str, queue:str, exchange='', self.subscribers.update({name: subscriber}) def start(self): + """Start all the threads in the ``threads`` attribute. + """ for thread in self.threads: thread.start() def run(self): + """Run the main IO loop for RmqConnection. + + Starts the threads then waits for user input. Any user input will + trigger the stop command to shutdown all threads. + """ print("Starting all RabbitMQ threads. Press enter at any time to " "shutdown all child threads.") self.start() - input() - print("Quit command received. Shutting down all child threads...") + try: + input() + print("Quit command received. Shutting down all child threads...") + except KeyboardInterrupt as e: + print("Main thread interrupted by user. Shutting down all child " + "threads...") self.stop() def stop(self): + """Safely shutdown all the running threads. + """ self.stop_event.set() for thread in self.threads: thread.join() - def _publish_status(self, func, interval, routing_key, *args, **kwargs): + def _publish_status(self, func:Callable[[Any], Any], interval:float, + routing_key:str, *args, delay:float=0.0, **kwargs): + """Publish a status message periodically. + + This method is not automatically threaded, but it is thread safe. It + checks RmqConnection.stop_event.is_set() to determine if it should + shut down. + + Parameters + ---------- + func : (Any) -> Any + The function that generates the message to be sent via RabbitMQ. + It must return a JSON-serializable object. + interval : float + The time waited between sending each subsequent message. + routing_key : str + The routing key to use when publishing. + args : List[Any], optional + A list of positional arguments to pass to ``func``. + kwargs : Dict[str, Any], optional + A dictionary of keyword arguments to pass to ``func``. + """ conn = self._get_connection() exchange = self.exchanges.get('status', ('', ExchangeType.topic)) exchange_name = exchange[0] @@ -139,13 +427,35 @@ def _publish_status(self, func, interval, routing_key, *args, **kwargs): publisher.connect(conn) props = self.publish_props.get(routing_key) while not self.stop_event.is_set(): + start = datetime.now() + time.sleep(min(delay, interval)) + data = func(*args, **kwargs) # get status data publisher.publish_json(data, routing_key=routing_key, properties=props) - # print(data) - time.sleep(interval) - def publish_status(self, interval, routing_key): + # make sure we don't add extra delay if the data request or publish + # command take a few seconds + stop = datetime.now() + true_delay = (stop - start).total_seconds() + time.sleep(max(interval - true_delay, 0)) + + def publish_status(self, interval:float, routing_key:str, delay:float=0.0): + """A method decorator for publishing status messages periodically. + + This method is automatically threaded. The function it wraps must + return a JSON-serializable object. That return object will then be + sent with RabbitMQ to a subscriber listening to the given routing + key. This method calls the private method ``_publish_status``, which + is also available for more niche use cases. + + Parameters + ---------- + interval : float + The number of seconds to wait between each message. + routing_key : str + The routing key to use when publishing the message. + """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -153,12 +463,38 @@ def wrapper(*args, **kwargs): thread = threading.Thread( target=self._publish_status, args=(wrapper, interval, routing_key), + kwargs={'delay': delay}, ) self.threads.append(thread) return wrapper return decorator - def _subscribe_status(self, callback:Callable, queue, routing_keys) -> None: + def _subscribe_status(self, callback:Callable[[pika.spec.Basic.Deliver, + pika.spec.BasicProperties, + bytes], Any], + queue: str, routing_keys: List[str]) -> None: + """Subscribe to status messages on a set of routing keys. + + This method is not automatically threaded, but is thread safe. It + takes a callback method that must take the arguments ``method``, + ``properties``, and ``body``. See Pika/RabbitMQ documentation for + specifics about callback functions and how these arguments work. + See Rmqtools documentation for quick examples of how this works. + Calls the private method ``_subscribe_status``, which is also + available to use for more niche cases. + + Parameters + ---------- + callback : (Deliver, BasicProperties, bytes) -> Any + The callback method that handles the received message. Must take + a ``method`` argument of type pika.spec.Basic.Deliver, a + ``properties`` argument of type pika.spec.BasicProperties, and a + ``body`` argument of type bytes for the body of the message. + queue : str + The name of the queue to use for subscribing. + routing_keys : List[str] + A list of routing keys to bind the queue to. + """ exchange = self.exchanges.get('status', ('', ExchangeType.topic)) exchange_name = exchange[0] etype = exchange[1] @@ -188,7 +524,24 @@ def _subscribe_status(self, callback:Callable, queue, routing_keys) -> None: for thread in threads: thread.join() - def subscribe_status(self, queue, routing_keys): + def subscribe_status(self, queue: str, routing_keys: List[str]): + """A method decorator for subscribing to a set of routing keys. + + This method is automatically threaded. The function it wraps must + be a callback method that can take the arguments ``method``, + ``properties``, and ``body``. See Pika/RabbitMQ documentation for + specifics about callback functions and how these arguments work. + See Rmqtools documentation for quick examples of how this works. + Calls the private method ``_subscribe_status``, which is also + available to use for more niche cases. + + Parameters + ---------- + queue : str + The name of the queue to use for subscribing. + routing_keys : List[str] + A list of routing keys to bind the queue to. + """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -201,14 +554,51 @@ def wrapper(*args, **kwargs): return wrapper return decorator - def _handle_command(self, worker:Callable[[Any], Any], queue:str) -> None: + def _handle_command(self, worker:Callable[[Any], ResponseObject], + queue:str) -> None: + """Handle a command sent by an RPC client. + + Creates an RPC server with the worker function as the request handler. + This method is not automatically threaded, but it is threadsafe using + the ``stop_event`` defined on the current RmqConnection object. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The request handler that takes the positional and keyword arguments + given by the client's command and processes them. It then needs + to return a ResponseObject with positional and keyword arguments + corresponding to the client's response handler. + queue : str + The name of the queue to use for handling these commands. This + name will be the routing key used by the command sender to route + the command to the correct place. + """ exchange, _ = self.exchanges.get('command') server = RpcServer(exchange, queue) conn = self._get_connection() server.connect(conn) server.serve_threadsafe(worker, self.stop_event) - def handle_command(self, queue:str) -> None: + def handle_command(self, queue:str): + """A method decorator to set the response handler for a command sent + from an RPC client. + + Creates an RPC server to respond to commands from an RPC client. This + method is automatically threaded. The function it wraps can take any + positional and keyword arguments it wants, but those arguments must + be passed by the client that sends the command. The wrapped function + must return a ResponseObject with positional and keyword arguments + matching the response handler of the client. Calls the private method + ``_handle_command``, which is also available for niche cases. + + Parameters + ---------- + queue : str + The name of the queue to use for handling these commands. This + name will be the routing key used by the command sender to route + the command to the correct place. + """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -222,6 +612,21 @@ def wrapper(*args, **kwargs): return decorator def handle_response(self, command_id:str): + """A method decorator to set the response handler of an RPC client. + + This method is not threaded, but it will not cause any IO blocking. + All it does is update the ``response_handlers`` attribute with the + wrapped function. The wrapped function can have any positional or + keyword arguments, but those arguments must be matched by the return + of the associated worker function in the associated RPC server. The + wrapped function does not need to return any values. + + Parameters + ---------- + command_id : str + The identifier of the command this response handler is associated + with. + """ def decorator(func:Callable[[Any], Any]): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -230,23 +635,133 @@ def wrapper(*args, **kwargs): return wrapper return decorator - def _send_command(self, func:Callable[[], ResponseObject], - command_id:str, queue:str) -> None: - def default_handler(): + def handle_timeout(self, command_id:str): + """A method decorator to set the timeout handler of an RPC client. + + This method is not threaded, but it will not cause any IO blocking. + All it does is update the ``timeout_handlers`` attribute with the + wrapped function. The wrapped function should have only one argument + for the timeout length and should not return anything. + + Parameters + ---------- + command_id : str + The identifier of the command this timeout handler is associated + with. + """ + def decorator(func:Callable[[Any], Any]): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + self.timeout_handlers.update({command_id: func}) + return wrapper + return decorator + + def handle_error(self, command_id:str): + """A method decorator to set the error handler of an RPC client. + + This method is not threaded, but it will not cause any IO blocking. + All it does is update the ``error_handlers`` attribute with the + wrapped function. The wrapped function should have only one argument + for the returned error and should not return anything. + + Parameters + ---------- + command_id : str + The identifier of the command this error handler is associated + with. + """ + def decorator(func:Callable[[Any], Any]): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + self.error_handlers.update({command_id: func}) + return wrapper + return decorator + + def _send_command(self, func:Callable[[Any], ResponseObject], + command_id:str, queue:str, *args, timeout:int=None, + **kwargs) -> None: + """Sends a command to an associated RPC server. + + This method is not automatically threaded, but it is threadsafe. It + creates an RPC client to send commands to the associated RPC server + and handle the response. This function should be paired with an + associated ``handle_response`` decorator to define the response + handler. If there is no associated handler, the response from the + server will hit the default handler, which does nothing. + + Parameters + ---------- + func : (Any) -> ResponseObject + The function that generates the command data. It can take + positional and keyword arguments that are given by the ``args`` + and ``kwargs`` parameters. It must return a ResponseObject with + positional and keyword arguments corresponding to the arguments + needed by the server's request handler. + command_id : str + The identifier of the command, used to map the ``_send_command`` + call with the associated ``handle_response`` decorator. + queue : str + The queue name of the RPC server where the command is being sent. + timeout : int, optional + How long to wait for a response before timing out. Default behavior + is no timeout. + """ + def default_handler(*a, **kw): pass - command = func() + + def default_error_handler(e:Exception): + raise RmqError("An error occurred when sending the command") + + command = func(*args, **kwargs) response_handler = self.response_handlers.get( command_id, default_handler) + timeout_handler = self.timeout_handlers.get( + command_id, default_handler) + error_handler = self.error_handlers.get( + command_id, default_error_handler) exchange, _ = self.exchanges.get('command') client = RpcClient(exchange) - conn = self._get_connection() - client.connect(conn) - response = client.call_threadsafe(queue, self.stop_event, command) + + try: + conn = self._get_connection() + client.connect(conn) + response = client.call_threadsafe(queue, self.stop_event, command, + timeout=timeout) + except Exception as e: + return error_handler(e) + + if not response: + return timeout_handler(timeout) args = response.args kwargs = response.kwargs - response_handler(*args, **kwargs) - - def send_command(self, command_id:str, queue:str) -> None: + return response_handler(*args, **kwargs) + + def send_command(self, command_id:str, queue:str, timeout:int=None): + """A method decorator to send a command to an RPC server. + + This method is automatically threaded. It creates an RPC client to + send commands to the associated RPC server and handle the response. + This decorator should be paired with a ``handle_response`` decorator + to define the response handler for the given command. If there is no + associated response handler, the response from the server will hit + the default handler, which does nothing. The wrapped function must + return a ResponseObject with the positional and keyword arguments + needed by the RPC server's request handler. The wrapped function + cannot take any arguments. + + Parameters + ---------- + command_id : str + The identifier of the command, used to map the ``send_command`` + decorator with the associated ``handle_response`` decorator. + queue : str + The queue name of the RPC server where the command is being sent. + timeout : int, optional + How long to wait for a response before timing out. Default behavior + is no timeout. + """ def decorator(func:Callable[[], ResponseObject]): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -254,6 +769,7 @@ def wrapper(*args, **kwargs): thread = threading.Thread( target=self._send_command, args=(wrapper, command_id, queue), + kwargs={'timeout': timeout}, ) self.threads.append(thread) return wrapper diff --git a/rmqtools/rpc.py b/rmqtools/rpc.py new file mode 100644 index 0000000..5df2cb1 --- /dev/null +++ b/rmqtools/rpc.py @@ -0,0 +1,387 @@ +import functools +import json +import threading +import uuid +from datetime import datetime, timedelta +from typing import Any, Callable, Union + +import pika +from pika.channel import Channel +from pika.exchange_type import ExchangeType +from pika.spec import Basic +from rmqtools import Connection, Publisher, ResponseObject + + +class RpcServer(): + """Creates a RPC server on the RabbitMQ server for command-response and + distributed worker systems. + + Parameters + ---------- + exchange : str + The name of the exchange this RPC server uses. + queue : str + The name of the queue this RPC server uses. + + Attributes + ---------- + etype : pika.ExchangeType.direct + The exchange type. This is always ExchangeType.direct because RPC + systems use direct exchanges. + exchange_name : str + The name of the exchange this RPC server uses. + queue_name : str + The name of the queue this RPC server uses. + Connection : rmqtools.Connection + The Connection object linked to this RPC server, set by the ``connect`` + method. Use a unique connection for each thread. + channel : pika.BlockingChannel + The channel this RPC server uses for communication with RabbitMQ. + exchange : rmqtools.connection.Exchange + The Exchange object to keep the RPC server's exchange data consistent + with the Connection object. + """ + + def __init__(self, exchange:str, queue:str): + """Creates a RPC server on the RabbitMQ server for command-response and + distributed worker systems. + + Parameters + ---------- + exchange : str + The name of the exchange this RPC server uses. + queue : str + The name of the queue this RPC server uses. + """ + self.etype = ExchangeType.direct + self.exchange_name = exchange + self.queue_name = queue + + def connect(self, conn:Connection): + """Connect the RPC server to a Connection object, giving it access to + the RabbitMQ server. Also declares the exchange and queue defined in + instance initialization, as well as binding the queue to the exchange + with the routing key set to the queue's name. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this RPC server. Use a unique Connection per thread. + """ + self.Connection = conn + self.channel = self.Connection.channel + self.Connection.exchange_declare(self.exchange_name, self.etype) + self.exchange = self.Connection.exchanges.get(self.exchange_name) + self.channel.queue_declare(queue=self.queue_name, durable=True) + self.channel.queue_bind(self.queue_name, self.exchange_name, + self.queue_name) + + def on_request(self, worker:Callable[[Any], ResponseObject], ch:Channel, + method:Basic.Deliver, props:pika.BasicProperties, + body:bytes) -> None: + """Called when the client requests data from the server. + + It parses the ResponseObject sent by the client into positional and + keyword arguments to be passed to the worker function. The worker + function responds with a ResponseObject of its own that is then + serialized and published to the client's reply queue. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The worker function that the request will be passed to. Should + be of the form ``def worker(...)`` where the arguments and keyword + arguments passed by the RPC client will fill out the parameters + of the worker function. The worker function should return a + rmqtools.ResponseObject with the arguments and keyword arguments + that will be passed to the client's response handler. + ch : Channel + Filled in automatically by ``basic_consume`` + method : pika.spec.Basic.Deliver + Filled in automatically by ``basic_consume`` + props : pika.BasicProperties + Filled in automatically by ``basic_consume`` + body : bytes + Filled in automatically by ``basic_consume`` + """ + try: + data = json.loads(body) + except: + data = {} + if not isinstance(data, dict): + data = {} + args = data.get('args', []) + kwargs = data.get('kwargs', {}) + + try: + response = worker(*args, **kwargs) + except TypeError: + raise ValueError("Incorrect arguments supplied to worker " + "function!") + + self.channel.basic_publish( + exchange=self.exchange_name, + routing_key=props.reply_to, + properties=pika.BasicProperties( + correlation_id=props.correlation_id), + body=json.dumps(response.__dict__), + ) + self.channel.basic_ack(delivery_tag=method.delivery_tag) + + def serve(self, worker:Callable[[Any], ResponseObject]) -> None: + """Initialize the RPC server to handle requests. + + Starts by defining with ``basic_qos`` that it handles one message at a + time. Then starts consuming using the passed worker function as the + callback for ``basic_consume``. This method will consume indefinitely, + so it is NOT thread safe. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The worker function that the request will be passed to. Should + be of the form ``def worker(...)`` where the arguments and keyword + arguments passed by the RPC client will fill out the parameters + of the worker function. The worker function should return a + rmqtools.ResponseObject with the arguments and keyword arguments + that will be passed to the client's response handler. + """ + self.channel.basic_qos(prefetch_count=1) + callback = functools.partial(self.on_request, worker) + self.channel.basic_consume(queue=self.queue_name, + on_message_callback=callback) + self.channel.start_consuming() + + def serve_threadsafe(self, worker:Callable[[Any], ResponseObject], + stop_event:threading.Event) -> None: + """Similar to the serve method but is safe for use with threading. + + Rather than consuming indefinitely with the ``start_consuming`` method, + uses the ``consume`` method to handle one message at a time, blocking + only for 5 seconds at a time. The 5 second timeout allows for the + stop_event to propagate so it can stop the RPC server. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The worker function that the request will be passed to. Should + be of the form ``def worker(...)`` where the arguments and keyword + arguments passed by the RPC client will fill out the parameters + of the worker function. The worker function should return a + rmqtools.ResponseObject with the arguments and keyword arguments + that will be passed to the client's response handler. + stop_event : threading.Event + The Event used to trigger the shutdown of the RPC server's thread. + """ + self.channel.basic_qos(prefetch_count=1) + + for method, props, body in self.channel.consume( + self.queue_name, inactivity_timeout=5): + if all([method, props, body]): + self.on_request(worker, self.channel, method, props, body) + if stop_event.is_set(): + break + + +class RpcClient(): + """Creates a RPC client on the RabbitMQ server for command-response and + distributed workload applications. + + Parameters + ---------- + exchange : str + The name of the exchange the RPC client and server will operate on. + + Attributes + ---------- + etype : pika.ExchangeType.direct + This is always ExchangeType.direct because RPC systems use direct + exchanges. + exchange_name : str + The name of the exchange used by the RPC client and server. + corr_id : str | None + The correlation id used to match a request to a reply from the server. + This is auto-generted with ``uuid.uuid4()`` when the ``call`` method is + run. + response : ResponseObject | None + The ResponseObject returned by the RPC server that is routed to the + client's request handler. The response attribute is set by the + ``on_response`` method, which is set as the callback function in + ``basic_consume``. + Connection : rmqtools.Connection + The Connection object linked to this RPC client, set by the ``connect`` + method. Use a unique connection for each thread. + channel : pika.BlockingChannel + The channel this RPC client uses for communication with RabbitMQ. + publisher : rmqtools.Publisher + The Publisher object that the client uses to send messages to the + RabbitMQ server. + callback_queue : str + The auto-generated identifier of the response queue given to the RPC + server to reply to. + """ + + def __init__(self, exchange:str): + """Creates a RPC client on the RabbitMQ server for command-response and + distributed workload applications. + + Parameters + ---------- + exchange : str + The name of the exchange the RPC client and server will operate on. + """ + self.etype = ExchangeType.direct + self.exchange_name = exchange + self.corr_id = None + + self.response: Union[ResponseObject, None] + self.response = None + + def connect(self, conn:Connection): + """Connect the RPC client to a Connection object, giving it access to + the RabbitMQ server. It also creates the Publisher object from the + exchange_name given in initialization. It then creates the callback + queue and binds it to the exchange with a routing key that is the + queue's name. Sets up the ``basic_consume`` method on the callback + queue with the callback method of ``on_response`` to set the response + object. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this RPC server. Use a unique Connection per thread. + """ + self.Connection = conn + self.channel = self.Connection.channel + self.publisher = Publisher('direct', self.exchange_name) + self.publisher.connect(conn) + + res = self.channel.queue_declare(queue='', exclusive=True) + self.callback_queue = res.method.queue + self.channel.queue_bind(self.callback_queue, self.exchange_name, + self.callback_queue) + + self.channel.basic_consume( + queue=self.callback_queue, + on_message_callback=self.on_response, + auto_ack=True, + ) + + def on_response(self, ch:Channel, method:Basic.Deliver, + props:pika.BasicProperties, body:bytes): + """The callback method that handles the response from the RPC server. + All it does is set the ``response`` attribute to be used by ``call`` and + ``call_threadsafe`` methods. + + Parameters + ---------- + ch : Channel + Filled in automatically by ``basic_consume``, unused + method : pika.spec.Basic.Deliver + Filled in automatically by ``basic_consume``, unused + props : pika.BasicProperties + Filled in automatically by ``basic_consume`` + body : bytes + Filled in automatically by ``basic_consume`` + """ + if self.corr_id == props.correlation_id: + self.response = ResponseObject(**json.loads(body)) + + def _get_publish_props(self): + props = pika.BasicProperties( + reply_to=self.callback_queue, + correlation_id=self.corr_id, + ) + return props + + def call(self, queue:str, command:ResponseObject) -> ResponseObject: + """Send a command to a RPC server and get the response. + + This method uses Pika's ``process_data_events`` method with no time limit + to get the server's response, so it is NOT thread safe. + + Parameters + ---------- + queue : str + The name of the queue to call (the name of the RPC server's queue). + command : ResponseObject + The command/request to send to the RPC server, in the + ResponseObject format. The args and kwargs in the ResponseObject + will be passed into the RPC server's worker function. + + Returns + ------- + ResponseObject + Returns the response of the RPC server's worker function, formatted + with the ResponseObject format, so the args and kwargs can be + passed into a custom response handler. + """ + self.response = None + self.corr_id = str(uuid.uuid4()) + + body = command.__dict__ + props = self._get_publish_props() + self.publisher.publish_json(body, routing_key=queue, + properties=props) + self.Connection.connection.process_data_events(time_limit=None) + return self.response + + def call_threadsafe(self, queue:str, stop_event:threading.Event, + command:ResponseObject, timeout=None) -> ResponseObject: + """Similar to the ``call`` method but requests the server in a thread- + safe manner. + + This method uses Pika's ``process_data_events`` with a timeout of one + second to allow for checking the stop event to make sure the thread + closes when it is supposed to. + + Parameters + ---------- + queue : str + The name of the queue to call (the name of the RPC server's queue). + stop_event : threading.Event + The Event object used to stop the thread this is running on. + command : ResponseObject + The command/request to send to the RPC server, in the + ResponseObject format. The args and kwargs in the ResponseObject + will be passed into the RPC server's worker function. + + Returns + ------- + ResponseObject + Returns the response of the RPC server's worker function, formatted + with the ResponseObject format, so the args and kwargs can be + passed into a custom response handler. + """ + # declare the recipient queue in case the receiver is dead + self.channel.queue_declare(queue=queue, durable=True) + + self.response = None + self.corr_id = str(uuid.uuid4()) + + body = command.__dict__ + props = self._get_publish_props() + self.publisher.publish_json(body, routing_key=queue, + properties=props) + timeout_time = datetime.max + clearqueue = False + if timeout: + timeout_time = datetime.now() + timedelta(seconds=timeout) + try: + while not stop_event.is_set(): + self.Connection.connection.process_data_events(time_limit=1) + if self.response is not None: + break + if datetime.now() > timeout_time: + # command timed out + clearqueue = True + break + except KeyboardInterrupt as e: + clearqueue = True + if clearqueue: + self.Connection.channel.queue_purge(queue) + self.Connection.connection.close() + self.response = None + return self.response diff --git a/rmqtools/rpc_client.py b/rmqtools/rpc_client.py deleted file mode 100644 index a86881f..0000000 --- a/rmqtools/rpc_client.py +++ /dev/null @@ -1,76 +0,0 @@ -import json -import logging -import threading -import uuid -from typing import Any, Dict, List, Literal, Callable, Union - -import pika -from pika.exchange_type import ExchangeType -from rmqtools import Connection, Publisher, ResponseObject - - -class RpcClient(): - - def __init__(self, exchange:str): - self.etype = ExchangeType.direct - self.exchange_name = exchange - self.corr_id = None - - self.response: Union[ResponseObject, None] - self.response = None - - def connect(self, conn:Connection): - self.Connection = conn - self.channel = self.Connection.channel - self.publisher = Publisher('direct', self.exchange_name) - self.publisher.connect(conn) - - res = self.channel.queue_declare(queue='', exclusive=True) - self.callback_queue = res.method.queue - self.channel.queue_bind(self.callback_queue, self.exchange_name, - self.callback_queue) - - self.channel.basic_consume( - queue=self.callback_queue, - on_message_callback=self.on_response, - auto_ack=True, - ) - - def on_response(self, ch, method, props, body): - if self.corr_id == props.correlation_id: - self.response = ResponseObject(**json.loads(body)) - - def _get_publish_props(self): - props = pika.BasicProperties( - reply_to=self.callback_queue, - correlation_id=self.corr_id, - ) - return props - - def call(self, queue:str, command:ResponseObject) -> ResponseObject: - self.response = None - self.corr_id = str(uuid.uuid4()) - - body = ResponseObject.__dict__ - props = self._get_publish_props() - self.publisher.publish_json(body, routing_key=queue, - properties=props) - self.Connection.connection.process_data_events(time_limit=None) - return self.response - - def call_threadsafe(self, queue:str, stop_event:threading.Event, - command:ResponseObject) -> ResponseObject: - self.response = None - self.corr_id = str(uuid.uuid4()) - - body = command.__dict__ - props = self._get_publish_props() - print(queue) - self.publisher.publish_json(body, routing_key=queue, - properties=props) - print('foo') - while not stop_event.is_set(): - self.Connection.connection.process_data_events(time_limit=1) - if self.response is not None: - break - return self.response diff --git a/rmqtools/rpc_server.py b/rmqtools/rpc_server.py deleted file mode 100644 index 856bcb5..0000000 --- a/rmqtools/rpc_server.py +++ /dev/null @@ -1,103 +0,0 @@ -import functools -import json -import logging -import threading -import uuid -from typing import Any, Dict, List, Literal, Callable, Optional - -import pika -from pika.channel import Channel -from pika.spec import Basic -from pika.exchange_type import ExchangeType -from rmqtools import Connection, ResponseObject - - -class RpcServer(): - - def __init__(self, exchange:str, queue:str): - self.etype = ExchangeType.direct - self.exchange_name = exchange - self.queue_name = queue - - def connect(self, conn:Connection): - self.Connection = conn - self.channel = self.Connection.channel - self.Connection.exchange_declare(self.exchange_name, self.etype) - self.exchange = self.Connection.exchanges.get(self.exchange_name) - self.channel.queue_declare(queue=self.queue_name) - self.channel.queue_bind(self.queue_name, self.exchange_name, - self.queue_name) - - def on_request(self, worker:Callable[[Any], ResponseObject], ch:Channel, - method:Basic.Deliver, props:pika.BasicProperties, - body:bytes): - """Called when the client requests data from the server. - Usage: - >>> callback = functools.partial(on_request, worker) - >>> channel.basic_consume(queue='rpc_queue', - on_message_callback=callback) - - Parameters - ---------- - worker : (Any) -> ResponseObject - The worker function that the request will be passed to. Should - be of the form `def worker(...)` where the arguments and keyword - arguments passed by the RPC client will fill out the parameters - of the worker function. The worker function should return a - rmqtools.ResponseObject with the arguments and keyword arguments - that will be passed to the client's response handler. - ch : Channel - Filled in automatically by `basic_consume` - method : Deliver - Filled in automatically by `basic_consume` - props : BasicProperties - Filled in automatically by `basic_consume` - body : bytes - Filled in automatically by `basic_consume` - - Returns - ------- - Any - Returns the response of the worker function - """ - try: - data = json.loads(body) - except: - data = {} - if not isinstance(data, dict): - data = {} - args = data.get('args', []) - kwargs = data.get('kwargs', {}) - - try: - response = worker(*args, **kwargs) - except TypeError: - raise ValueError("Incorrect arguments supplied to worker " - "function!") - - self.channel.basic_publish( - exchange=self.exchange_name, - routing_key=props.reply_to, - properties=pika.BasicProperties( - correlation_id=props.correlation_id), - body=json.dumps(response.__dict__), - ) - self.channel.basic_ack(delivery_tag=method.delivery_tag) - - def serve(self, worker:Callable[[Any], ResponseObject]) -> None: - self.channel.basic_qos(prefetch_count=1) - callback = functools.partial(self.on_request, worker) - self.channel.basic_consume(queue=self.queue_name, - on_message_callback=callback) - self.channel.start_consuming() - - def serve_threadsafe(self, worker:Callable[[Any], ResponseObject], - stop_event:threading.Event) -> None: - self.channel.basic_qos(prefetch_count=1) - - for method, props, body in self.channel.consume( - self.queue_name, inactivity_timeout=5): - if all([method, props, body]): - self.on_request(worker, self.channel, method, props, body) - if stop_event.is_set(): - break diff --git a/rmqtools/subscriber.py b/rmqtools/subscriber.py index e3a4097..6dfba19 100644 --- a/rmqtools/subscriber.py +++ b/rmqtools/subscriber.py @@ -1,19 +1,108 @@ -"""Tools for a publisher connection""" +"""Tools for a subscriber connection""" -import json -import logging -from typing import Any, Dict, List, Literal +from typing import Any, Dict, List -import pika from pika.exchange_type import ExchangeType from rmqtools import Connection class Subscriber(): + """Creates a subscriber object that has methods to easily subscribe to + messages on a specific RabbitMQ routing key. This class is used by the + higher-level RmqConnection class. + + Parameters + ---------- + queue : str, optional + The name of the queue the subscriber will use, defaults to ''. If left + blank, the queue will be given an automatically generated name. If + using a quorum queue, a queue name must be given (it cannot be left + blank). + exchange : str, optional + The name of the exchange the subscriber will use, defaults to ''. If + left blank, the subscriber will use the default exchange. If an + exchange name is given, the etype parameter must also be given. + etype : pika.ExchangeType, optional + The type of the exchange, given as a pika.ExchangeType object, defaults + to None. If exchange is not left blank, this field is required. + routing_keys : List[str], optional + A list of routing keys for the subscriber to listen on, defaults to []. + Routing keys can contain wildcards for subscribers - see RabbitMQ + documentation about routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html). + quorum : bool, optional + Makes the subscriber use quorum queues, defaults to True. Quorum queues + are useful for clustered RabbitMQ networks. For more information about + clustering and quorum queues, see the RabbitMQ documentation on these + topics (https://www.rabbitmq.com/clustering.html and + https://www.rabbitmq.com/quorum-queues.html). For a visualization of + the underlying Raft algorithm that makes quorum queues work, see + http://thesecretlivesofdata.com/raft/. + queue_arguments : Dict[str, Any], optional + Additional keyword arguments to pass to the pika queue_bind function, + defaults to {}. + + Attributes + ---------- + queue_name : str + The name of the queue the subscriber is using. + exchange_name : str + The name of the exchange the subscriber is using. + routing_keys : List[str] + A list of routing keys the subscriber is listening to. + quorum : bool + Whether the subscriber is using quorum queues. + queue_arguments : Dict[str, Any] + Additional keyword arguments that were passed to the pika queue_bind + function. + Connection : rmqtools.Connection + The Connection object linked to this subscriber, set by the ``connect`` + method. Use a unique connection for each thread. + channel : pika.BlockingChannel + The channel the subscriber uses for communication with RabbitMQ. + exchange : rmqtools.connection.Exchange + The Exchange object to keep the subscriber's exchange data consistent + with the Connection object. + """ def __init__(self, queue='', exchange='', etype:ExchangeType=None, routing_keys:List[str]=[], quorum=True, queue_arguments:Dict[str, Any]={}) -> None: + """Creates a subscriber object that has methods to easily subscribe to + messages on a specific RabbitMQ routing key. This class is used by the + higher-level RmqConnection class. + + Parameters + ---------- + queue : str, optional + The name of the queue the subscriber will use, defaults to ''. If left + blank, the queue will be given an automatically generated name. If + using a quorum queue, a queue name must be given (it cannot be left + blank). + exchange : str, optional + The name of the exchange the subscriber will use, defaults to ''. If + left blank, the subscriber will use the default exchange. If an + exchange name is given, the etype parameter must also be given. + etype : pika.ExchangeType, optional + The type of the exchange, given as a pika.ExchangeType object, defaults + to None. If exchange is not left blank, this field is required. + routing_keys : List[str], optional + A list of routing keys for the subscriber to listen on, defaults to []. + Routing keys can contain wildcards for subscribers - see RabbitMQ + documentation about routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html). + quorum : bool, optional + Makes the subscriber use quorum queues, defaults to True. Quorum queues + are useful for clustered RabbitMQ networks. For more information about + clustering and quorum queues, see the RabbitMQ documentation on these + topics (https://www.rabbitmq.com/clustering.html and + https://www.rabbitmq.com/quorum-queues.html). For a visualization of + the underlying Raft algorithm that makes quorum queues work, see + http://thesecretlivesofdata.com/raft/. + queue_arguments : Dict[str, Any], optional + Additional keyword arguments to pass to the pika queue_bind function, + defaults to {}. + """ # validate data if quorum and not queue: raise ValueError("Quorum queues must be named explicitly!") @@ -32,6 +121,16 @@ def __init__(self, queue='', exchange='', etype:ExchangeType=None, self.queue_arguments = queue_arguments def connect(self, conn:Connection) -> None: + """Connect the subscriber to a Connection object, giving it access to + the RabbitMQ server. Also declares the exchange defined in instance + initialization. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this subscriber. Use a unique Connection per thread. + """ self.Connection = conn self.channel = self.Connection.channel if self.exchange_name and self.etype: @@ -56,6 +155,11 @@ def _get_queue_bind_args(self, routing_key, **kwargs) -> dict: return args def subscribe(self) -> None: + """Links the subscriber to the queue defined in the initialization. + Also binds that queue to the routing keys given in the initialization. + This must be run after the ``connect`` method because it requires the + channel attribute to be set. + """ self.channel.queue_declare(**self._get_queue_declare_args()) for key in self.routing_keys: self.channel.queue_bind(**self._get_queue_bind_args(key))