From 83b9ec4dd4380fac6813e76045cbdf9cf002030f Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Thu, 6 Jul 2023 14:12:18 -0700 Subject: [PATCH 01/11] Version 1.0.0-alpha The alpha release of version 1.0.0 now contains documentation of the core API. The documentation is built with sphinx and formatted to work with readthedocs. It also contains modifications to the API to make some things more concise and fix a few bugs. Version 1.0.0 should be released once there are more examples in place. --- .gitignore | 3 + docs/Makefile | 20 ++ docs/conf.py | 40 +++ docs/contributors.rst | 9 + docs/examples.rst | 11 + docs/faq.rst | 22 ++ docs/index.rst | 62 +++++ docs/intro.rst | 65 +++++ docs/make.bat | 35 +++ docs/modules/connection.rst | 13 + docs/modules/index.rst | 12 + docs/modules/publisher.rst | 9 + docs/modules/rmq.rst | 10 + docs/modules/rpc.rst | 15 ++ docs/modules/subscriber.rst | 9 + rmqtools/__init__.py | 3 +- rmqtools/connection.py | 226 +++++++++++++++-- rmqtools/publisher.py | 111 ++++++++- rmqtools/rmq.py | 477 ++++++++++++++++++++++++++++++++++-- rmqtools/rpc.py | 370 ++++++++++++++++++++++++++++ rmqtools/rpc_client.py | 76 ------ rmqtools/rpc_server.py | 103 -------- rmqtools/subscriber.py | 114 ++++++++- 23 files changed, 1577 insertions(+), 238 deletions(-) create mode 100644 docs/Makefile create mode 100644 docs/conf.py create mode 100644 docs/contributors.rst create mode 100644 docs/examples.rst create mode 100644 docs/faq.rst create mode 100644 docs/index.rst create mode 100644 docs/intro.rst create mode 100644 docs/make.bat create mode 100644 docs/modules/connection.rst create mode 100644 docs/modules/index.rst create mode 100644 docs/modules/publisher.rst create mode 100644 docs/modules/rmq.rst create mode 100644 docs/modules/rpc.rst create mode 100644 docs/modules/subscriber.rst create mode 100644 rmqtools/rpc.py delete mode 100644 rmqtools/rpc_client.py delete mode 100644 rmqtools/rpc_server.py diff --git a/.gitignore b/.gitignore index 68bc17f..76d4a42 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# vscode files +.vscode/** diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d4bb2cb --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..f12b9b2 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,40 @@ +# Configuration file for the Sphinx documentation builder. +import sys +sys.path.insert(0, '../') +# needs_sphinx = '1.0' + +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.viewcode', + 'sphinx.ext.intersphinx', + 'sphinx.ext.napoleon', +] + +intersphinx_mapping = { + 'python': ('https://docs.python.org/3/', + 'https://docs.python.org/3/objects.inv'), + 'pika': ('https://pika.readthedocs.io/en/stable/', + 'https://pika.readthedocs.io/en/stable/objects.inv'), +} + +templates_path = ['_templates'] + +source_suffix = '.rst' +master_doc = 'index' + +project = 'rmqtools' +copyright = '2023, Christian Thompson, Paul Horton' +author = 'Christian Thompson, Paul Horton' +release = '1.0.0' +version = '.'.join(release.split('.')[0:2]) + +exclude_patterns = ['_build'] +add_function_parenthesis = True +add_module_names = True +show_authors = True +pygments_style = 'sphinx' +modindex_common_prefix = ['rmqtools'] +html_theme = 'default' +html_static_path = ['_static'] + +# autoclass_content = 'both' diff --git a/docs/contributors.rst b/docs/contributors.rst new file mode 100644 index 0000000..e38dc6d --- /dev/null +++ b/docs/contributors.rst @@ -0,0 +1,9 @@ +Contributors +============ +The following people have directly contributed code by way of new features +and/or bug fixes to Rmqtools: + + - Christian Thompson + - Paul Horton + +*Contributors listed by commit count.* diff --git a/docs/examples.rst b/docs/examples.rst new file mode 100644 index 0000000..0338038 --- /dev/null +++ b/docs/examples.rst @@ -0,0 +1,11 @@ +Usage Examples +============== + +Examples coming soon! +~~~~~~~~~~~~~~~~~~~~~ +.. Rmqtools has various methods of use, between the high-level RmqConnection class +.. and the various lower-level classes. The following examples illustrate +.. different ways the RmqConnection class can be used to handle common use cases. + +.. These examples, on the other hand, illustrate some ways in which the +.. lower-level API can be used for more specific use cases. diff --git a/docs/faq.rst b/docs/faq.rst new file mode 100644 index 0000000..10d5915 --- /dev/null +++ b/docs/faq.rst @@ -0,0 +1,22 @@ +Frequently Asked Questions +========================== + +- Is Rmqtools threaded? + + The high-level API in the Rmqtools library, ``RmqConnection`` contains + mostly threaded methods, while the underlying classes are not threaded, + but can be threaded if desired. The important thing to remember when + threading is that each thread must have a unique connection, created in + that thread. See :doc:`/modules/rmq` for more details. + +- How do I report a bug with Rmqtools? + + The `main Rmqtools repository `_ + is hosted on `GitHub `_ and we use the + `Issue tracker `_ to + handle bug reports. + +- How can I contribute to Rmqtools? + + You can fork the project on GitHub and issue Pull Requests when you believe + you have something solid to be added to the main repository. diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..6250ee0 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,62 @@ +.. rmqtools documentation master file, created by + sphinx-quickstart on Thu Jun 29 13:22:51 2023. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to Rmqtools! +==================== +Rmqtools provides enhanced features for RabbitMQ development in Python. It is +based on the Pika_ library. + +If you have not developed with Rmqtools, Pika, or RabbitMQ before, the +:doc:`intro` documentation is a good place to get started. + +Installing Rmqtools +------------------- + +Prerequisites +~~~~~~~~~~~~~ + +* Pika (`version 1.3.0+ `_) +* RabbitMQ (`version 3.12.0+ `_) + +Note: Pika should be automatically installed when installing Rmqtools + +Rmqtools is not currently available for download with PyPI. Once it is +available, it can be installed via pip:: + + pip install rmqtools + +To install the latest version before Rmqtools is available on PyPI, use the +latest_ release on GitHub and download ``rmqtools--py3-none-any.whl``. +Then install the wheel file with pip:: + + pip install rmqtools--py3-none-any.whl + +To install directly from source, run "python setup.py install" in the root +source directory. + +Using Rmqtools +-------------- +.. toctree:: + :glob: + :maxdepth: 2 + + intro + modules/index + examples + faq + contributors + + +Indices and tables +------------------ + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` + +.. aliases below here +.. _Pika: https://pika.readthedocs.io +.. _latest: https://github.com/217690thompson/rmqtools/releases/latest +.. |repo_base| replace:: https://github.com/217690thompson/rmqtools diff --git a/docs/intro.rst b/docs/intro.rst new file mode 100644 index 0000000..fe9362d --- /dev/null +++ b/docs/intro.rst @@ -0,0 +1,65 @@ +Introduction to Rmqtools +======================== +This introduction guide will cover the basics of using RabbitMQ and Rmqtools. +For more detailed information about the underlying Pika library, please read +`Pika's documentation `_. For more +information about the underlying RabbitMQ messaging system, please visit +`RabbitMQ's main site `_. + +High-level API +-------------- +The high-level API consists of the RmqConnection class and the ResponseObject +structure. The high-level API implements wrappers for several common RabbitMQ +use cases. All methods in the high-level API are threaded, so as to make sure +that Pika's :class:`~pika.adapters.blocking_connection.BlockingConnection` +doesn't interfere with other connections. Essentially, the threading allows +for multiple concurrent producers and consumers to run in the same program. +Detailed documentation for the high-level API can be found at +:doc:`/modules/rmq`, and additional examples can be found in :doc:`/examples`. + +Example of a publisher that publishes log messages once per second:: + + import rmqtools + + rmq = rmqtools.RmqConnection(host='rabbit-1') + rmq.set_status_exchange('logs') + + @rmq.publish_status(1, 'device.1.status') + def send_status(): + status = 'running' + msg = {'status': status} + return msg + + rmq.run() + +Example of a subscriber that consumes those log messages:: + + import rmqtools + import json + + rmq = rmqtools.RmqConnection(host='rabbit-1') + rmq.set_status_exchange('logs') + + @rmq.subscribe_status('device_logs', 'device.*.status') + def handle_response(channel, method, props, body): + try: + data = json.loads(body) + except: + data = {'status': 'down'} + print(data.get('status')) + + rmq.run() + +Notice how each of these examples end in ``rmq.run()``. This is not a +coincidence, but rather a requirement. Any program that uses the high-level +API must always use the :py:meth:`RmqConnection.run` method at the end to +initiate the program. The method decorators in :class:`RmqConnection` create +threads, and the :py:meth:`RmqConnection.run` method starts all the threads +and waits for either user input or a system interrupt to stop the threads. + +Low-level API +------------- +Coming soon! + +More examples coming soon! +~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..954237b --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/modules/connection.rst b/docs/modules/connection.rst new file mode 100644 index 0000000..fa01ee6 --- /dev/null +++ b/docs/modules/connection.rst @@ -0,0 +1,13 @@ +Connection +========== +.. automodule:: rmqtools.connection + +Connection +---------- +.. autoclass:: rmqtools.Connection + :members: + :member-order: bysource + +.. autoclass:: rmqtools.ResponseObject + :members: + :member-order: bysource diff --git a/docs/modules/index.rst b/docs/modules/index.rst new file mode 100644 index 0000000..4e86a78 --- /dev/null +++ b/docs/modules/index.rst @@ -0,0 +1,12 @@ +Core Class and Module Documentation +=================================== + +.. toctree:: + :glob: + :maxdepth: 1 + + rmq + rpc + publisher + subscriber + connection diff --git a/docs/modules/publisher.rst b/docs/modules/publisher.rst new file mode 100644 index 0000000..71bf033 --- /dev/null +++ b/docs/modules/publisher.rst @@ -0,0 +1,9 @@ +Publisher +========= +.. automodule:: rmqtools.publisher + +Publisher +--------- +.. autoclass:: rmqtools.Publisher + :members: + :member-order: bysource diff --git a/docs/modules/rmq.rst b/docs/modules/rmq.rst new file mode 100644 index 0000000..9b7a430 --- /dev/null +++ b/docs/modules/rmq.rst @@ -0,0 +1,10 @@ +RMQ Connection +============== +.. automodule:: rmqtools.rmq + +RmqConnection +------------- +.. autoclass:: rmqtools.RmqConnection + :members: + :private-members: + :member-order: bysource diff --git a/docs/modules/rpc.rst b/docs/modules/rpc.rst new file mode 100644 index 0000000..b33ada9 --- /dev/null +++ b/docs/modules/rpc.rst @@ -0,0 +1,15 @@ +RPC +=== +.. automodule:: rmqtools.rpc + +RpcClient +--------- +.. autoclass:: rmqtools.RpcClient + :members: + :member-order: bysource + +RpcServer +--------- +.. autoclass:: rmqtools.RpcServer + :members: + :member-order: bysource diff --git a/docs/modules/subscriber.rst b/docs/modules/subscriber.rst new file mode 100644 index 0000000..b7a6b4a --- /dev/null +++ b/docs/modules/subscriber.rst @@ -0,0 +1,9 @@ +Subscriber +========== +.. automodule:: rmqtools.subscriber + +Subscriber +---------- +.. autoclass:: rmqtools.Subscriber + :members: + :member-order: bysource diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index 2dc4577..8b07a28 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -9,7 +9,6 @@ from rmqtools.connection import Connection from rmqtools.publisher import Publisher from rmqtools.subscriber import Subscriber -from rmqtools.rpc_client import RpcClient -from rmqtools.rpc_server import RpcServer +from rmqtools.rpc import RpcClient, RpcServer from rmqtools.rmq import RmqConnection diff --git a/rmqtools/connection.py b/rmqtools/connection.py index 664fa58..653e965 100644 --- a/rmqtools/connection.py +++ b/rmqtools/connection.py @@ -1,16 +1,28 @@ """Core connection objects""" from dataclasses import dataclass, field -from typing import Any, List, Literal, Dict, Union +from typing import Any, List, Union import pika -from pika import DeliveryMode from pika.channel import Channel from pika.exchange_type import ExchangeType @dataclass class ResponseObject: + """The dataclass used by RPC calls to pass information to callback + functions and response handlers. All RPC request methods and worker + functions in RPC calls must return a ResponseObject. + + Parameters + ---------- + args : list, optional + A list of positional arguments to pass to the response/request handler, + defaults to []. + kwargs : dict, optional + A dictionary of keyword arguments to pass to the response/request + handler, defaults to {}. + """ args: list = field(default_factory=list) kwargs: dict = field(default_factory=dict) @@ -120,7 +132,6 @@ def delete(cls, channel:Channel, name:str, **kwargs): name : str The name of the exchange kwargs : optional - Optional values to pass with the exchange_delete call: if_unused : bool Only delete if the exchange is unused callback : callable @@ -132,8 +143,8 @@ def delete(cls, channel:Channel, name:str, **kwargs): class Exchanges(): """Encapsulates a list of Exchange objects. Additionally, provides methods for interacting with the list of exchanges, such as implementing - dictionary-style `.get()` and `.remove()` methods, as well as list-style - `+=` and `.append()` methods. Allows for the storage of information + dictionary-style ``.get()`` and ``.remove()`` methods, as well as list-style + ``+=`` and ``.append()`` methods. Allows for the storage of information about all the exchanges involved in a RabbitMQ operation. """ @@ -172,7 +183,7 @@ def __getitem__(self, index:str) -> Exchange: raise KeyError(f"Exchange '{index}' not found in list of exchanges!") def get(self, index:str, default=None) -> Union[Exchange, Any]: - """A dictionary-style `.get(index, [default])` for Exchanges + """A dictionary-style ``.get(index, [default])`` for Exchanges Parameters ---------- @@ -184,7 +195,7 @@ def get(self, index:str, default=None) -> Union[Exchange, Any]: Returns ------- Exchange | Any - Exchange such that `Exchange.name == index`, else returns default + Exchange such that ``Exchange.name == index``, else returns default """ try: return self[index] @@ -210,7 +221,7 @@ def append(self, exchange:Exchange) -> None: self.exchanges.append(exchange) def __iadd__(self, obj:Union["Exchanges", Exchange, List[Exchange]]) -> "Exchanges": - """Implements the `+=` operator for Exchanges + """Implements the ``+=`` operator for Exchanges Parameters ---------- @@ -240,16 +251,18 @@ def __iadd__(self, obj:Union["Exchanges", Exchange, List[Exchange]]) -> "Exchang raise TypeError(f"Cannot merge '{type(obj)}' into Exchanges") return self - def remove(self, name:str) -> None: + def remove(self, name:str, if_unused=False) -> None: """Remove an exchange from the list of exchanges Parameters ---------- name : str The name of the exchange to remove + if_unused : bool + If True, only delete the exchange if it is unused, by default False """ exchange = self[name] - exchange._delete() + exchange._delete(if_unused=if_unused) self.exchanges.remove(name) @@ -301,7 +314,7 @@ def _bind(self, routing_key:str, **kwargs) -> None: Parameters ---------- routing_key : str - A RabbitMQ routing key (e.g. `device.*.status`) + A RabbitMQ routing key (e.g. ``device.*.status``) """ binding_kwargs = self.binding_kwargs binding_kwargs.update(kwargs) @@ -314,7 +327,7 @@ def _delete(self, **kwargs) -> None: @classmethod def declare(cls, channel:Channel, name:str, **kwargs) -> None: - """Declare a queue on `channel` with name `name` + """Declare a queue on ``channel`` with name ``name`` Parameters ---------- @@ -337,7 +350,7 @@ def bind(cls, channel:Channel, name:str, routing_key:str, exchange='', name : str The name of the queue routing_key : str - A RabbitMQ routing key (e.g. `device.*.status`) + A RabbitMQ routing key (e.g. ``device.*.status``) exchange : str, optional The exchange to bind the queue on, leave blank to bind to the default queue, by default '' @@ -362,8 +375,8 @@ def delete(cls, channel:Channel, name:str, **kwargs) -> None: class Queues(): """Encapsulates a list of Queue objects. Additionally, provides methods for interacting with the list of queues, such as implementing - dictionary-style `.get()` and `.remove()` methods, as well as list-style - `+=` and `.append()` methods. Allows for the storage of information + dictionary-style ``.get()`` and ``.remove()`` methods, as well as list-style + ``+=`` and ``.append()`` methods. Allows for the storage of information about all the queues involved in a RabbitMQ operation.""" def __init__(self, queues:List[Queue]=[]) -> None: @@ -401,7 +414,7 @@ def __getitem__(self, index:str) -> Queue: raise KeyError(f"Queue '{index}' not found in list of queues!") def get(self, index:str, default=None) -> Union[Queue, Any]: - """A dictionary-style `.get(index, [default])` for Queues + """A dictionary-style ``.get(index, [default])`` for Queues Parameters ---------- @@ -413,7 +426,7 @@ def get(self, index:str, default=None) -> Union[Queue, Any]: Returns ------- Queue | Any - Queue such that `Queue.name == index`, else returns default + Queue such that ``Queue.name == index``, else returns default """ try: return self[index] @@ -439,7 +452,7 @@ def append(self, queue:Queue) -> None: self.queues.append(queue) def __iadd__(self, obj:Union["Queues", Queue, List[Queue]]) -> "Queues": - """Implements the `+=` operator for Queues + """Implements the ``+=`` operator for Queues Parameters ---------- @@ -483,20 +496,113 @@ def remove(self, name:str) -> None: class Connection(): + """The core class that implements communication with RabbitMQ through Pika. + This class is used by most of the other classes in rmqtools as the base + method of interacting with the RabbitMQ server. + + Parameters + ---------- + username : str, optional + The username to use for logging into the RabbitMQ server, by + default 'guest'. It is not necessary, but recommended to create + a user other than the default 'guest' to use for server + authentication. + password : str, optional + The password to use for logging into the RabbitMQ server, by + default 'guest'. + host : str, optional + The host of the RabbitMQ server, by default 'localhost'. This can + either be an address (e.g. '192.168.0.1') or a host name (e.g. + 'rabbit-1'). Using a host name is prefered to using an address. + Only host with 'localhost' if all connections are coming from the + same machine as the RabbitMQ server. + port : int, optional + The port to connect over, by default 5672. This is almost always + 5672, unless there are multiple RabbitMQ servers running on the + same host, which is discouraged. + autoconnect : bool, optional + Whether to connect to the RabbitMQ server and open a channel + automatically, by default True. + + Attributes + ---------- + username : str + The username used to log into the server. + password : str + The password used to log into the server. + host : str + The name or address of the host server for this connection ('localhost' + can be used single machine setups). Names (e.g. 'rabbit-1') are + prefered to addresses (e.g. '192.168.0.1'). + port : int + The port to connect to the server on (usually 5672). + connection : pika.BlockingConnection | pika.BaseConnection | pika.SelectConnection + The actual Pika connection object. + channel : pika.channel.Channel | pika.BlockingChannel + The Pika channel that RabbitMQ requests are sent on. + exchanges : rmqtools.connection.Exchanges + The object that contains all of the rmqtools.connection.Exchange + objects associated with this connection. Each Exchange object contains + information about exchanges created on the RabbitMQ server using this + connection. + queues : rmqtools.connection.Queues + The object that contains all of the rmqtools.connection.Queue + objects associated with this connection. Each Queue object contains + information about a queue created on the RabbitMQ server using this + connection. + credentials : pika.PlainCredentials + The credentials used to log into the RabbitMQ server. Uses username + and password attributes. + parameters : pika.ConnectionParameters + The parameters used when connecting to the RabbitMQ server. Uses the + credentials, host, and port attributes. + """ def __init__(self, username='guest', password='guest', host='localhost', port=5672, autoconnect=True) -> None: + """Connection initialization requires a username, password, host, and + port, but has default values for all of these. If autoconnect is true, + the Connection object will automatically connect to the RabbitMQ + server. Otherwise, the user must call Connection.connect() and + Connection.set_channel() after initializing the Connection object. + + Parameters + ---------- + username : str, optional + The username to use for logging into the RabbitMQ server, by + default 'guest'. It is not necessary, but recommended to create + a user other than the default 'guest' to use for server + authentication. + password : str, optional + The password to use for logging into the RabbitMQ server, by + default 'guest'. + host : str, optional + The host of the RabbitMQ server, by default 'localhost'. This can + either be an address (e.g. '192.168.0.1') or a host name (e.g. + 'rabbit-1'). Using a host name is prefered to using an address. + Only host with 'localhost' if all connections are coming from the + same machine as the RabbitMQ server. + port : int, optional + The port to connect over, by default 5672. This is almost always + 5672, unless there are multiple RabbitMQ servers running on the + same host, which is discouraged. + autoconnect : bool, optional + Whether to connect to the RabbitMQ server and open a channel + automatically, by default True. + """ self.username = username self.password = password self.host = host self.port = port + self.credentials = None self._set_credentials() + self.parameters = None self._set_parameters() self.connection = None self.channel = None if autoconnect: - self._connect() - self._set_channel() + self.connect() + self.set_channel() self.exchanges = Exchanges() self.queues = Queues() @@ -519,9 +625,31 @@ def _set_parameters(self, host:str=None, port:int=None, heartbeat=60) self.parameters = parameters - def _connect(self, connection_type='blocking', + def connect(self, connection_type='blocking', parameters:pika.ConnectionParameters=None, **kwargs) -> None: + """Creates a connection on the RabbitMQ server. + + Stores the connection in the ``connection`` attribute. + + - If ``connection_type`` is 'blocking', creates a BlockingConnection. + - If ``connection_type`` is 'base', creates a BaseConnection + - If ``connection_type`` is 'select', creates a SelectConnection + + Parameters + ---------- + connection_type : str, optional + The connection type, options are 'blocking', 'base', and 'select', + by default 'blocking' + parameters : pika.ConnectionParameters, optional + Any additional connection parameters (see pika.BlockingConnection + documentation for details), by default None + + Raises + ------ + ValueError + If an invalid connection type is given. + """ parameters = parameters if parameters else self.parameters allowed_connection_types = ['blocking', 'base', 'select'] if connection_type not in allowed_connection_types: @@ -537,8 +665,24 @@ def _connect(self, connection_type='blocking', raise ValueError(f"Wrong connection type: {connection_type}") self.connection = connection - def _set_channel(self, channel_number:int=None, + def set_channel(self, channel_number:int=None, on_open_callback:callable=None) -> None: + """Creates a channel on the RabbitMQ server using the ``connection`` + attribute and sets the created channel to the ``channel`` attribute. + + Parameters + ---------- + channel_number : int, optional + An optional numerical identifier for the channel, by default None + on_open_callback : callable, optional + An optional callback method to call on channel creation, + by default None + + Raises + ------ + ValueError + If the ``connection`` attribute is not set or is an invalid type. + """ if isinstance(self.connection, pika.BlockingConnection): channel = self.connection.channel(channel_number=channel_number) elif isinstance(self.connection, pika.SelectConnection): @@ -556,6 +700,20 @@ def _set_channel(self, channel_number:int=None, def exchange_declare(self, name:str, exchange_type:ExchangeType, **kwargs) -> None: + """Declares an exchange on the RabbitMQ server. + + Creates the exchange using the rmqtools.connection.Exchange class, + allowing for the storage of information about the exchange. Adds the + created exchange to the list of exchanges (rmqtools.connection. + Exchanges) for ease of access later. + + Parameters + ---------- + name : str + The name of the exchange + exchange_type : ExchangeType + A pika.ExchangeType, e.g. pika.ExchangeType.topic + """ e = Exchange(self.channel, name, exchange_type, **kwargs) try: self.exchanges += e @@ -566,10 +724,34 @@ def exchange_declare(self, name:str, exchange_type:ExchangeType, pass def exchange_delete(self, name:str, if_unused=False) -> None: + """Deletes the given exchange on the RabbitMQ server and removes it + from the ``exchanges`` attribute. + + Parameters + ---------- + name : str + The name of the exchange to delete. + if_unused : bool, optional + If True, only delete the exchange if it is unused, by default False + """ self.exchanges.remove(name, if_unused=if_unused) def queue_declare(self, name='', binding_keys=[], binding_kwargs={}) -> None: + """Declares a queue on the RabbitMQ server using the ``channel`` + attribute as the channel to declare the queue on. + + Parameters + ---------- + name : str, optional + The name of the queue, by default '' + binding_keys : list, optional + An optional list of routing keys to bind the queue to, + by default [] + binding_kwargs : dict, optional + An optional dictionary of keyword arguments to pass to the queue + binding function (see pika.queue_bind for details), by default {} + """ q = Queue(self.channel, name=name, binding_keys=binding_keys, binding_kwargs=binding_kwargs) try: diff --git a/rmqtools/publisher.py b/rmqtools/publisher.py index 5f6a21b..5523d3b 100644 --- a/rmqtools/publisher.py +++ b/rmqtools/publisher.py @@ -1,20 +1,67 @@ """Tools for a publisher connection""" import json -import logging -from typing import Literal +from typing import Any, Dict, List, Literal, Union import pika from pika.exchange_type import ExchangeType -from pika.delivery_mode import DeliveryMode - from rmqtools import Connection class Publisher(): + """Creates a publisher object that has methods to easily publish messages + to the RabbitMQ server. This class is used by the higher-level + RmqConnection class. + + Parameters + ---------- + ptype : 'topic' | 'fanout' | 'direct' + The type of publisher. Topic publishers send messages to specific + routing keys. Fanout publishers send messages to all queues on an + exchange. Direct publishers are used for RPC calls, and they publish + to a single queue. The exchange type corresponds to the publisher + type (i.e. direct exchange for direct publisher). + exchange : str, optional + The name of the exchange to use for publishing. Leaving this value + blank uses the default exchange and is not supported for some types + of interactions. + + Attributes + ---------- + ptype : 'fanout' | 'topic' | 'direct' + The type of publisher this is. + etype : pika.ExchangeType + The type of exchange used by this publisher. Corresponds to ptype. + exchange_name : str + The name of the exchange this publisher operates on. + Connection : rmqtools.Connection + The Connection object associated with this publisher. + channel : pika.BlockingChannel + The channel used by this publisher to connect to RabbitMQ. + exchange : rmqtools.connection.Exchange + The Exchange object associated with etype and exchange_name. Allows + for syncing the publisher with the Connection object. + """ def __init__(self, ptype:Literal['topic', 'fanout', 'direct'], exchange='') -> None: + """Creates a publisher object that has methods to easily publish messages + to the RabbitMQ server. This class is used by the higher-level + RmqConnection class. + + Parameters + ---------- + ptype : 'topic' | 'fanout' | 'direct' + The type of publisher. Topic publishers send messages to specific + routing keys. Fanout publishers send messages to all queues on an + exchange. Direct publishers are used for RPC calls, and they publish + to a single queue. The exchange type corresponds to the publisher + type (i.e. direct exchange for direct publisher). + exchange : str, optional + The name of the exchange to use for publishing. Leaving this value + blank uses the default exchange and is not supported for some types + of interactions. + """ if ptype == 'fanout': etype = ExchangeType.fanout elif ptype == 'topic': @@ -28,6 +75,16 @@ def __init__(self, ptype:Literal['topic', 'fanout', 'direct'], self.exchange_name = exchange def connect(self, conn:Connection): + """Connect the publisher to a Connection object, giving it access to + the RabbitMQ server. Also declares the exchange defined in instance + initialization. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this publisher. Use a unique Connection per thread. + """ self.Connection = conn self.channel = self.Connection.channel self.Connection.exchange_declare(self.exchange_name, self.etype) @@ -42,13 +99,55 @@ def _get_publish_args(self, routing_key='') -> dict: def publish(self, message:str, routing_key='', properties:pika.BasicProperties=None, mandatory=False) -> None: + """Publish a message on the RabbitMQ server. + + Parameters + ---------- + message : str + The content of the message to send. + routing_key : str, optional + A routing key to publish the message to, by default ''. A blank + routing key is only used for fanout publishers. Direct publishers + will use the name of the queue they are publishing to for the + routing key. For topic publishers, see the RabbitMQ documentation + on routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html) + properties : pika.BasicProperties, optional + Any additional publish properties to pass to + pika.BlockingChannel.basic_publish, by default None + mandatory : bool, optional + Set the mandatory parameter in pika.BlockingChannel.basic_publish, + by default False + """ publish_args = self._get_publish_args(routing_key=routing_key) self.Connection.channel.basic_publish( **publish_args, body=message, properties=properties, mandatory=mandatory) - def publish_json(self, message, routing_key='', - properties:pika.BasicProperties=None, mandatory=False) -> None: + def publish_json(self, message: Union[Dict[str, Any], List[Any], int, str, + float, bool, None], + routing_key='', properties:pika.BasicProperties=None, + mandatory=False) -> None: + """Publish a JSON object on the RabbitMQ server. + + Parameters + ---------- + message : Dict[str, Any] | List[Any] | int | str | float | bool | None + A JSON-serializable object to send via RabbitMQ. + routing_key : str, optional + A routing key to publish the message to, by default ''. A blank + routing key is only used for fanout publishers. Direct publishers + will use the name of the queue they are publishing to for the + routing key. For topic publishers, see the RabbitMQ documentation + on routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html) + properties : pika.BasicProperties, optional + Any additional publish properties to pass to + pika.BlockingChannel.basic_publish, by default None + mandatory : bool, optional + Set the mandatory parameter in pika.BlockingChannel.basic_publish, + by default False + """ try: message = json.dumps(message) except TypeError: diff --git a/rmqtools/rmq.py b/rmqtools/rmq.py index f07797f..e18410e 100644 --- a/rmqtools/rmq.py +++ b/rmqtools/rmq.py @@ -1,25 +1,149 @@ -"""Highest level RabbitMQ wrapper""" +"""Highest level RabbitMQ wrapper. + +Contains high-level wrappers for RabbitMQ, implementing many common +use cases with built-in threading. The RmqConnection object is meant to +be used on a clustered RabbitMQ server with quorum queues. For details +on clustering and quorum queues, please see the RabbitMQ documentation +of both (https://www.rabbitmq.com/clustering.html and +https://www.rabbitmq.com/quorum-queues.html). For a good visualization of +the underlying Raft algorithm that quorum queues use, see +http://thesecretlivesofdata.com/raft/. Most of the functional methods +contained in RmqConnection are meant to be used as method decorators, though +the private methods they employ can be used in a standalone manner. See +the examples section below. +""" import functools -import logging -import inspect import signal import sys import threading import time -import uuid -from typing import Dict, List, Literal, Tuple, Callable, Any +from typing import Any, Callable, Dict, List, Literal, Tuple import pika -from pika.adapters.blocking_connection import BlockingChannel from pika.exchange_type import ExchangeType -from rmqtools import Connection, Publisher, Subscriber, RpcClient, RpcServer, ResponseObject +from rmqtools import (Connection, Publisher, ResponseObject, RpcClient, + RpcServer, Subscriber) class RmqConnection(): + """Contains high-level wrappers for RabbitMQ, implementing many common + use cases with built-in threading. The RmqConnection object is meant to + be used on a clustered RabbitMQ server with quorum queues. Most of the + functional methods contained in this class are meant to be used as method + decorators, though the private methods they employ can be used in a + standalone manner. + + Parameters + ---------- + username : str, optional + The username to use when logging into the RabbitMQ server, defaults to + 'guest'. Sometimes 'guest' login is not allowed, so it is recommended + to set up a different user for connecting. The username here must + correspond to a user with admin privileges. + password : str, optional + The password to use when logging into RabbitMQ, defaults to 'guest'. + host : str, optional + The hostname of the RabbitMQ server, defaults to 'localhost'. Using + localhost to host RabbitMQ is fine for some applications, but it is + not recommended. If using a clustered RabbitMQ server, any of the + cluster hostnames works (e.g. 'rabbit-1' or 'rabbit-2'). This can also + use an IP address intead of a hostname, but hostnames are recommended. + port : int, optional + The port of the RabbitMQ server, defaults to 5672. Most RabbitMQ + servers use port 5672, but some applications may have a different port. + For example, a virtual cluster hosted on one machine may use ports + 5672, 5673, and 5674. + autoconnect : bool, optional + Automatically run the ``connect`` method to sync to the Connection + object, defaults to True. It is recommended to leave autoconnect + enabled, but some use cases may require connecting at a specific time. + If this is required, run the ``connect`` method to perform the connection + tasks. + + Attributes + ---------- + username : str + The username used to authenticate with the RabbitMQ server. + password : str + The password used for RabbitMQ authentication. + host : str + The host (ip or hostname) of the RabbitMQ server (or one of the + clustered servers). + port : int + The port the RabbitMQ server is using (usually 5672). + autoconnect : bool + Automatically run the ``connect`` method on initialization if True. + threads : List[threading.Thread] + A list of the threads contained within the RmqConnection instance. + These threads are started with the ``run`` method and stoped with the + ``stop`` method. + stop_event : threading.Event + The threading Event used to shutdown all the threads running RabbitMQ + processes. + exchanges : Dict[str, Tuple[str, pika.ExchangeType]] + A dictionary containing information about all the exchanges involved + in this RMQ process. The key is the purpose of the exchange (e.g. + 'logs'), and the value is a tuple of the exchange name and the + exchange type. + publishers : Dict[str, Publisher] + A dictionary containing information about all the publishers involved + in this RMQ process. The key is the name of the publisher (e.g. + 'status') and the value is a Publisher object. These are usually auto- + generated within the method decorators and are named with the routing + key they correspond to (e.g. 'device.1.status'). + publish_props : Dict[str, pika.BasicProperties] + A dictionary containing information about all the publish properties + involved in this RMQ process. The key is the name of the Publisher (so + as to map it directly with the ``publishers`` attribute) and the value + is a pika.BasicProperties object. This is used to pass properties into + the Publisher's connection to the server and is usually auto-generated. + subscribers : Dict[str, Subscriber] + A dictionary containing information about all the subscribers involved + in this RMQ process. The key is the name of the subscriber (e.g. + 'device_logs') and the value is a Subscriber object. These are usually + auto-generated within the method decorators used for subscribing. + response_handlers : Dict[str, (Any) -> Any] + A dictionary containing information about all the response handlers + involved in this RMQ process. The key is an identifier and the value + is a callback function to serve as the response handler for a specific + process. These response handlers are used in the RPC processes to + determine which function needs to be called to handle the RPC response. + The identifier is usually related to a command system (e.g. + 'device_command'). + """ def __init__(self, username='guest', password='guest', host='localhost', port=5672, autoconnect=True) -> None: + """Create an instance of the high-level RmqConnection class. + + Parameters + ---------- + username : str, optional + The username to use when logging into the RabbitMQ server, defaults to + 'guest'. Sometimes 'guest' login is not allowed, so it is recommended + to set up a different user for connecting. The username here must + correspond to a user with admin privileges. + password : str, optional + The password to use when logging into RabbitMQ, defaults to 'guest'. + host : str, optional + The hostname of the RabbitMQ server, defaults to 'localhost'. Using + localhost to host RabbitMQ is fine for some applications, but it is + not recommended. If using a clustered RabbitMQ server, any of the + cluster hostnames works (e.g. 'rabbit-1' or 'rabbit-2'). This can also + use an IP address intead of a hostname, but hostnames are recommended. + port : int, optional + The port of the RabbitMQ server, defaults to 5672. Most RabbitMQ + servers use port 5672, but some applications may have a different port. + For example, a virtual cluster hosted on one machine may use ports + 5672, 5673, and 5674. + autoconnect : bool, optional + Automatically run the ``connect`` method to sync to the Connection + object, defaults to True. It is recommended to leave autoconnect + enabled, but some use cases may require connecting at a specific time. + If this is required, run the ``connect`` method to perform the connection + tasks. + """ self.username = username self.password = password self.host = host @@ -60,26 +184,95 @@ def _get_connection(self) -> Connection: return conn def set_status_exchange(self, name:str) -> None: + """Set the exchange used for status messages. + + Parameters + ---------- + name : str + The name of the status exchange. + """ self.exchanges.update({'status': (name, ExchangeType.topic)}) def set_command_exchange(self, name:str) -> None: + """Set the exchange used for commands. + + Parameters + ---------- + name : str + The name of the command exchange. + """ self.exchanges.update({'command': (name, ExchangeType.direct)}) def add_exchange(self, name:str, purpose:str, etype:ExchangeType) -> None: + """Add an exchange. + + Parameters + ---------- + name : str + The name of the exchange. + purpose : str + The purpose of the exchange (e.g. 'logs'). + etype : ExchangeType + They type of the exchange (e.g. pika.ExchangeType.topic for a + topic exchange). + """ self.exchanges.update({purpose: (name, etype)}) - def get_publisher(self, name:str, err=True): + def get_publisher(self, name:str, err=True) -> Publisher: + """Get a Publisher object by name. + + Parameters + ---------- + name : str + The name of the publisher. + err : bool, optional + Raise an error if no publishers with that name are found, by + default True + + Returns + ------- + Publisher + Returns the publisher corresponding to the given name. + + Raises + ------ + ValueError + If err is True and there isn't a publisher with the given name. + """ publisher = self.publishers.get(name, False) if err and not publisher: raise ValueError(f"A publisher with name '{name}' could not be " f"found.") return publisher - def add_publisher(self, name:str, ptype:Literal['topic', 'fanout']='topic', + def add_publisher(self, name:str, + ptype:Literal['topic', 'fanout', 'direct']='topic', exchange=''): - if ptype not in ['topic', 'fanout']: - raise ValueError(f"Publisher type must be either 'topic' or " - f"'fanout', not '{ptype}'") + """Add a publisher. + + See documentation for rmqtools.Publisher for + additional information about publisher creation. + + Parameters + ---------- + name : str + The name of the publisher. + ptype : Literal['topic'] | Literal['fanout'] | Literal['direct'], optional + The type of publisher, either 'topic', 'fanout', or 'direct', by + default 'topic'. + exchange : str, optional + The name of the exchange used by the new publisher, by default ''. + + Raises + ------ + ValueError + If an invalid publisher type is given. + ValueError + If a publisher with the same name already exists. + """ + if ptype not in ['topic', 'fanout', 'direct']: + raise ValueError(f"Publisher type must be either 'topic', " + f"'fanout', or 'direct', not '{ptype}'") publisher = Publisher(ptype, exchange=exchange) pub = self.get_publisher(name, err=False) if pub: @@ -89,11 +282,41 @@ def add_publisher(self, name:str, ptype:Literal['topic', 'fanout']='topic', def set_publish_props(self, publisher_name:str, publish_props:pika.BasicProperties=None) -> None: + """Updates ``publish_props`` with the properties for a publisher. + + Parameters + ---------- + publisher_name : str + The name of the Publisher to set the properties of. + publish_props : pika.BasicProperties, optional + The pika BasicProperties object to set as the publisher properties, + by default None. Most use cases will use None here. + """ # throws error if publisher doesn't exist self.get_publisher(publisher_name) self.publish_props.update({publisher_name: publish_props}) - def get_subscriber(self, name:str, err=True): + def get_subscriber(self, name:str, err=True) -> Subscriber: + """Get a subscriber by name. + + Parameters + ---------- + name : str + The name of the subscriber. + err : bool, optional + Throw an error if no subscribers with that name are found, by + default True. + + Returns + ------- + Subscriber + The subscriber corresponding with the given name. + + Raises + ------ + ValueError + If err is True and no subscriber exists with that name. + """ subscriber = self.subscribers.get(name, False) if err and not subscriber: raise ValueError(f"A subscriber with name '{name}' could not be " @@ -103,6 +326,30 @@ def get_subscriber(self, name:str, err=True): def add_subscriber(self, name:str, queue:str, exchange='', etype:ExchangeType=ExchangeType.topic, routing_keys:List[str]=[]) -> None: + """Add a subscriber. + + See documentation for rmqtools.Subscriber for more information about + subscriber creation. + + Parameters + ---------- + name : str + The name of the subscriber. + queue : str + The name of the queue associated with the subscriber. + exchange : str, optional + The name of the exchange associated wit the subscriber, by + default ''. + etype : ExchangeType, optional + The type of the exchange, by default ExchangeType.topic. + routing_keys : List[str], optional + A list of routing keys to bind to the subscriber, by default []. + + Raises + ------ + ValueError + If a subscriber with the same name already exists. + """ subscriber = Subscriber(queue=queue, exchange=exchange, etype=etype, routing_keys=routing_keys, queue_arguments={'durable': True}) @@ -113,10 +360,17 @@ def add_subscriber(self, name:str, queue:str, exchange='', self.subscribers.update({name: subscriber}) def start(self): + """Start all the threads in the ``threads`` attribute. + """ for thread in self.threads: thread.start() def run(self): + """Run the main IO loop for RmqConnection. + + Starts the threads then waits for user input. Any user input will + trigger the stop command to shutdown all threads. + """ print("Starting all RabbitMQ threads. Press enter at any time to " "shutdown all child threads.") self.start() @@ -125,11 +379,34 @@ def run(self): self.stop() def stop(self): + """Safely shutdown all the running threads. + """ self.stop_event.set() for thread in self.threads: thread.join() - def _publish_status(self, func, interval, routing_key, *args, **kwargs): + def _publish_status(self, func:Callable[[Any], Any], interval: float, + routing_key: str, *args, **kwargs): + """Publish a status message periodically. + + This method is not automatically threaded, but it is thread safe. It + checks RmqConnection.stop_event.is_set() to determine if it should + shut down. + + Parameters + ---------- + func : (Any) -> Any + The function that generates the message to be sent via RabbitMQ. + It must return a JSON-serializable object. + interval : float + The time waited between sending each subsequent message. + routing_key : str + The routing key to use when publishing. + args : List[Any], optional + A list of positional arguments to pass to ``func``. + kwargs : Dict[str, Any], optional + A dictionary of keyword arguments to pass to ``func``. + """ conn = self._get_connection() exchange = self.exchanges.get('status', ('', ExchangeType.topic)) exchange_name = exchange[0] @@ -145,7 +422,22 @@ def _publish_status(self, func, interval, routing_key, *args, **kwargs): # print(data) time.sleep(interval) - def publish_status(self, interval, routing_key): + def publish_status(self, interval: float, routing_key: str): + """A method decorator for publishing status messages periodically. + + This method is automatically threaded. The function it wraps must + return a JSON-serializable object. That return object will then be + sent with RabbitMQ to a subscriber listening to the given routing + key. This method calls the private method ``_publish_status``, which + is also available for more niche use cases. + + Parameters + ---------- + interval : float + The number of seconds to wait between each message. + routing_key : str + The routing key to use when publishing the message. + """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -158,7 +450,32 @@ def wrapper(*args, **kwargs): return wrapper return decorator - def _subscribe_status(self, callback:Callable, queue, routing_keys) -> None: + def _subscribe_status(self, callback:Callable[[pika.spec.Basic.Deliver, + pika.spec.BasicProperties, + bytes], Any], + queue: str, routing_keys: List[str]) -> None: + """Subscribe to status messages on a set of routing keys. + + This method is not automatically threaded, but is thread safe. It + takes a callback method that must take the arguments ``method``, + ``properties``, and ``body``. See Pika/RabbitMQ documentation for + specifics about callback functions and how these arguments work. + See Rmqtools documentation for quick examples of how this works. + Calls the private method ``_subscribe_status``, which is also + available to use for more niche cases. + + Parameters + ---------- + callback : (Deliver, BasicProperties, bytes) -> Any + The callback method that handles the received message. Must take + a ``method`` argument of type pika.spec.Basic.Deliver, a + ``properties`` argument of type pika.spec.BasicProperties, and a + ``body`` argument of type bytes for the body of the message. + queue : str + The name of the queue to use for subscribing. + routing_keys : List[str] + A list of routing keys to bind the queue to. + """ exchange = self.exchanges.get('status', ('', ExchangeType.topic)) exchange_name = exchange[0] etype = exchange[1] @@ -188,7 +505,24 @@ def _subscribe_status(self, callback:Callable, queue, routing_keys) -> None: for thread in threads: thread.join() - def subscribe_status(self, queue, routing_keys): + def subscribe_status(self, queue: str, routing_keys: List[str]): + """A method decorator for subscribing to a set of routing keys. + + This method is automatically threaded. The function it wraps must + be a callback method that can take the arguments ``method``, + ``properties``, and ``body``. See Pika/RabbitMQ documentation for + specifics about callback functions and how these arguments work. + See Rmqtools documentation for quick examples of how this works. + Calls the private method ``_subscribe_status``, which is also + available to use for more niche cases. + + Parameters + ---------- + queue : str + The name of the queue to use for subscribing. + routing_keys : List[str] + A list of routing keys to bind the queue to. + """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -201,14 +535,51 @@ def wrapper(*args, **kwargs): return wrapper return decorator - def _handle_command(self, worker:Callable[[Any], Any], queue:str) -> None: + def _handle_command(self, worker:Callable[[Any], ResponseObject], + queue:str) -> None: + """Handle a command sent by an RPC client. + + Creates an RPC server with the worker function as the request handler. + This method is not automatically threaded, but it is threadsafe using + the ``stop_event`` defined on the current RmqConnection object. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The request handler that takes the positional and keyword arguments + given by the client's command and processes them. It then needs + to return a ResponseObject with positional and keyword arguments + corresponding to the client's response handler. + queue : str + The name of the queue to use for handling these commands. This + name will be the routing key used by the command sender to route + the command to the correct place. + """ exchange, _ = self.exchanges.get('command') server = RpcServer(exchange, queue) conn = self._get_connection() server.connect(conn) server.serve_threadsafe(worker, self.stop_event) - def handle_command(self, queue:str) -> None: + def handle_command(self, queue:str): + """A method decorator to set the response handler for a command sent + from an RPC client. + + Creates an RPC server to respond to commands from an RPC client. This + method is automatically threaded. The function it wraps can take any + positional and keyword arguments it wants, but those arguments must + be passed by the client that sends the command. The wrapped function + must return a ResponseObject with positional and keyword arguments + matching the response handler of the client. Calls the private method + ``_handle_command``, which is also available for niche cases. + + Parameters + ---------- + queue : str + The name of the queue to use for handling these commands. This + name will be the routing key used by the command sender to route + the command to the correct place. + """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -222,6 +593,21 @@ def wrapper(*args, **kwargs): return decorator def handle_response(self, command_id:str): + """A method decorator to set the response handler of an RPC client. + + This method is not threaded, but it will not cause any IO blocking. + All it does is update the ``response_handlers`` attribute with the + wrapped function. The wrapped function can have any positional or + keyword arguments, but those arguments must be matched by the return + of the associated worker function in the associated RPC server. The + wrapped function does not need to return any values. + + Parameters + ---------- + command_id : str + The identifier of the command this response handler is associated + with. + """ def decorator(func:Callable[[Any], Any]): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -230,11 +616,34 @@ def wrapper(*args, **kwargs): return wrapper return decorator - def _send_command(self, func:Callable[[], ResponseObject], - command_id:str, queue:str) -> None: - def default_handler(): + def _send_command(self, func:Callable[[Any], ResponseObject], + command_id:str, queue:str, *args, **kwargs) -> None: + """Sends a command to an associated RPC server. + + This method is not automatically threaded, but it is threadsafe. It + creates an RPC client to send commands to the associated RPC server + and handle the response. This function should be paired with an + associated ``handle_response`` decorator to define the response + handler. If there is no associated handler, the response from the + server will hit the default handler, which does nothing. + + Parameters + ---------- + func : (Any) -> ResponseObject + The function that generates the command data. It can take + positional and keyword arguments that are given by the ``args`` + and ``kwargs`` parameters. It must return a ResponseObject with + positional and keyword arguments corresponding to the arguments + needed by the server's request handler. + command_id : str + The identifier of the command, used to map the ``_send_command`` + call with the associated ``handle_response`` decorator. + queue : str + The queue name of the RPC server where the command is being sent. + """ + def default_handler(*a, **kw): pass - command = func() + command = func(*args, **kwargs) response_handler = self.response_handlers.get( command_id, default_handler) exchange, _ = self.exchanges.get('command') @@ -246,7 +655,27 @@ def default_handler(): kwargs = response.kwargs response_handler(*args, **kwargs) - def send_command(self, command_id:str, queue:str) -> None: + def send_command(self, command_id:str, queue:str): + """A method decorator to send a command to an RPC server. + + This method is automatically threaded. It creates an RPC client to + send commands to the associated RPC server and handle the response. + This decorator should be paired with a ``handle_response`` decorator + to define the response handler for the given command. If there is no + associated response handler, the response from the server will hit + the default handler, which does nothing. The wrapped function must + return a ResponseObject with the positional and keyword arguments + needed by the RPC server's request handler. The wrapped function + cannot take any arguments. + + Parameters + ---------- + command_id : str + The identifier of the command, used to map the ``send_command`` + decorator with the associated ``handle_response`` decorator. + queue : str + The queue name of the RPC server where the command is being sent. + """ def decorator(func:Callable[[], ResponseObject]): @functools.wraps(func) def wrapper(*args, **kwargs): diff --git a/rmqtools/rpc.py b/rmqtools/rpc.py new file mode 100644 index 0000000..f3c52bc --- /dev/null +++ b/rmqtools/rpc.py @@ -0,0 +1,370 @@ +import functools +import json +import threading +import uuid +from typing import Any, Callable, Union + +import pika +from pika.channel import Channel +from pika.exchange_type import ExchangeType +from pika.spec import Basic +from rmqtools import Connection, Publisher, ResponseObject + + +class RpcServer(): + """Creates a RPC server on the RabbitMQ server for command-response and + distributed worker systems. + + Parameters + ---------- + exchange : str + The name of the exchange this RPC server uses. + queue : str + The name of the queue this RPC server uses. + + Attributes + ---------- + etype : pika.ExchangeType.direct + The exchange type. This is always ExchangeType.direct because RPC + systems use direct exchanges. + exchange_name : str + The name of the exchange this RPC server uses. + queue_name : str + The name of the queue this RPC server uses. + Connection : rmqtools.Connection + The Connection object linked to this RPC server, set by the ``connect`` + method. Use a unique connection for each thread. + channel : pika.BlockingChannel + The channel this RPC server uses for communication with RabbitMQ. + exchange : rmqtools.connection.Exchange + The Exchange object to keep the RPC server's exchange data consistent + with the Connection object. + """ + + def __init__(self, exchange:str, queue:str): + """Creates a RPC server on the RabbitMQ server for command-response and + distributed worker systems. + + Parameters + ---------- + exchange : str + The name of the exchange this RPC server uses. + queue : str + The name of the queue this RPC server uses. + """ + self.etype = ExchangeType.direct + self.exchange_name = exchange + self.queue_name = queue + + def connect(self, conn:Connection): + """Connect the RPC server to a Connection object, giving it access to + the RabbitMQ server. Also declares the exchange and queue defined in + instance initialization, as well as binding the queue to the exchange + with the routing key set to the queue's name. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this RPC server. Use a unique Connection per thread. + """ + self.Connection = conn + self.channel = self.Connection.channel + self.Connection.exchange_declare(self.exchange_name, self.etype) + self.exchange = self.Connection.exchanges.get(self.exchange_name) + self.channel.queue_declare(queue=self.queue_name) + self.channel.queue_bind(self.queue_name, self.exchange_name, + self.queue_name) + + def on_request(self, worker:Callable[[Any], ResponseObject], ch:Channel, + method:Basic.Deliver, props:pika.BasicProperties, + body:bytes) -> None: + """Called when the client requests data from the server. + + It parses the ResponseObject sent by the client into positional and + keyword arguments to be passed to the worker function. The worker + function responds with a ResponseObject of its own that is then + serialized and published to the client's reply queue. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The worker function that the request will be passed to. Should + be of the form ``def worker(...)`` where the arguments and keyword + arguments passed by the RPC client will fill out the parameters + of the worker function. The worker function should return a + rmqtools.ResponseObject with the arguments and keyword arguments + that will be passed to the client's response handler. + ch : Channel + Filled in automatically by ``basic_consume`` + method : pika.spec.Basic.Deliver + Filled in automatically by ``basic_consume`` + props : pika.BasicProperties + Filled in automatically by ``basic_consume`` + body : bytes + Filled in automatically by ``basic_consume`` + """ + try: + data = json.loads(body) + except: + data = {} + if not isinstance(data, dict): + data = {} + args = data.get('args', []) + kwargs = data.get('kwargs', {}) + + try: + response = worker(*args, **kwargs) + except TypeError: + raise ValueError("Incorrect arguments supplied to worker " + "function!") + + self.channel.basic_publish( + exchange=self.exchange_name, + routing_key=props.reply_to, + properties=pika.BasicProperties( + correlation_id=props.correlation_id), + body=json.dumps(response.__dict__), + ) + self.channel.basic_ack(delivery_tag=method.delivery_tag) + + def serve(self, worker:Callable[[Any], ResponseObject]) -> None: + """Initialize the RPC server to handle requests. + + Starts by defining with ``basic_qos`` that it handles one message at a + time. Then starts consuming using the passed worker function as the + callback for ``basic_consume``. This method will consume indefinitely, + so it is NOT thread safe. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The worker function that the request will be passed to. Should + be of the form ``def worker(...)`` where the arguments and keyword + arguments passed by the RPC client will fill out the parameters + of the worker function. The worker function should return a + rmqtools.ResponseObject with the arguments and keyword arguments + that will be passed to the client's response handler. + """ + self.channel.basic_qos(prefetch_count=1) + callback = functools.partial(self.on_request, worker) + self.channel.basic_consume(queue=self.queue_name, + on_message_callback=callback) + self.channel.start_consuming() + + def serve_threadsafe(self, worker:Callable[[Any], ResponseObject], + stop_event:threading.Event) -> None: + """Similar to the serve method but is safe for use with threading. + + Rather than consuming indefinitely with the ``start_consuming`` method, + uses the ``consume`` method to handle one message at a time, blocking + only for 5 seconds at a time. The 5 second timeout allows for the + stop_event to propagate so it can stop the RPC server. + + Parameters + ---------- + worker : (Any) -> ResponseObject + The worker function that the request will be passed to. Should + be of the form ``def worker(...)`` where the arguments and keyword + arguments passed by the RPC client will fill out the parameters + of the worker function. The worker function should return a + rmqtools.ResponseObject with the arguments and keyword arguments + that will be passed to the client's response handler. + stop_event : threading.Event + The Event used to trigger the shutdown of the RPC server's thread. + """ + self.channel.basic_qos(prefetch_count=1) + + for method, props, body in self.channel.consume( + self.queue_name, inactivity_timeout=5): + if all([method, props, body]): + self.on_request(worker, self.channel, method, props, body) + if stop_event.is_set(): + break + + +class RpcClient(): + """Creates a RPC client on the RabbitMQ server for command-response and + distributed workload applications. + + Parameters + ---------- + exchange : str + The name of the exchange the RPC client and server will operate on. + + Attributes + ---------- + etype : pika.ExchangeType.direct + This is always ExchangeType.direct because RPC systems use direct + exchanges. + exchange_name : str + The name of the exchange used by the RPC client and server. + corr_id : str | None + The correlation id used to match a request to a reply from the server. + This is auto-generted with ``uuid.uuid4()`` when the ``call`` method is + run. + response : ResponseObject | None + The ResponseObject returned by the RPC server that is routed to the + client's request handler. The response attribute is set by the + ``on_response`` method, which is set as the callback function in + ``basic_consume``. + Connection : rmqtools.Connection + The Connection object linked to this RPC client, set by the ``connect`` + method. Use a unique connection for each thread. + channel : pika.BlockingChannel + The channel this RPC client uses for communication with RabbitMQ. + publisher : rmqtools.Publisher + The Publisher object that the client uses to send messages to the + RabbitMQ server. + callback_queue : str + The auto-generated identifier of the response queue given to the RPC + server to reply to. + """ + + def __init__(self, exchange:str): + """Creates a RPC client on the RabbitMQ server for command-response and + distributed workload applications. + + Parameters + ---------- + exchange : str + The name of the exchange the RPC client and server will operate on. + """ + self.etype = ExchangeType.direct + self.exchange_name = exchange + self.corr_id = None + + self.response: Union[ResponseObject, None] + self.response = None + + def connect(self, conn:Connection): + """Connect the RPC client to a Connection object, giving it access to + the RabbitMQ server. It also creates the Publisher object from the + exchange_name given in initialization. It then creates the callback + queue and binds it to the exchange with a routing key that is the + queue's name. Sets up the ``basic_consume`` method on the callback + queue with the callback method of ``on_response`` to set the response + object. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this RPC server. Use a unique Connection per thread. + """ + self.Connection = conn + self.channel = self.Connection.channel + self.publisher = Publisher('direct', self.exchange_name) + self.publisher.connect(conn) + + res = self.channel.queue_declare(queue='', exclusive=True) + self.callback_queue = res.method.queue + self.channel.queue_bind(self.callback_queue, self.exchange_name, + self.callback_queue) + + self.channel.basic_consume( + queue=self.callback_queue, + on_message_callback=self.on_response, + auto_ack=True, + ) + + def on_response(self, ch:Channel, method:Basic.Deliver, + props:pika.BasicProperties, body:bytes): + """The callback method that handles the response from the RPC server. + All it does is set the ``response`` attribute to be used by ``call`` and + ``call_threadsafe`` methods. + + Parameters + ---------- + ch : Channel + Filled in automatically by ``basic_consume`` + method : pika.spec.Basic.Deliver + Filled in automatically by ``basic_consume`` + props : pika.BasicProperties + Filled in automatically by ``basic_consume`` + body : bytes + Filled in automatically by ``basic_consume`` + """ + if self.corr_id == props.correlation_id: + self.response = ResponseObject(**json.loads(body)) + + def _get_publish_props(self): + props = pika.BasicProperties( + reply_to=self.callback_queue, + correlation_id=self.corr_id, + ) + return props + + def call(self, queue:str, command:ResponseObject) -> ResponseObject: + """Send a command to a RPC server and get the response. + + This method uses Pika's ``process_data_events`` method with no time limit + to get the server's response, so it is NOT thread safe. + + Parameters + ---------- + queue : str + The name of the queue to call (the name of the RPC server's queue). + command : ResponseObject + The command/request to send to the RPC server, in the + ResponseObject format. The args and kwargs in the ResponseObject + will be passed into the RPC server's worker function. + + Returns + ------- + ResponseObject + Returns the response of the RPC server's worker function, formatted + with the ResponseObject format, so the args and kwargs can be + passed into a custom response handler. + """ + self.response = None + self.corr_id = str(uuid.uuid4()) + + body = command.__dict__ + props = self._get_publish_props() + self.publisher.publish_json(body, routing_key=queue, + properties=props) + self.Connection.connection.process_data_events(time_limit=None) + return self.response + + def call_threadsafe(self, queue:str, stop_event:threading.Event, + command:ResponseObject) -> ResponseObject: + """Similar to the ``call`` method but requests the server in a thread- + safe manner. + + This method uses Pika's ``process_data_events`` with a timeout of one + second to allow for checking the stop event to make sure the thread + closes when it is supposed to. + + Parameters + ---------- + queue : str + The name of the queue to call (the name of the RPC server's queue). + stop_event : threading.Event + The Event object used to stop the thread this is running on. + command : ResponseObject + The command/request to send to the RPC server, in the + ResponseObject format. The args and kwargs in the ResponseObject + will be passed into the RPC server's worker function. + + Returns + ------- + ResponseObject + Returns the response of the RPC server's worker function, formatted + with the ResponseObject format, so the args and kwargs can be + passed into a custom response handler. + """ + self.response = None + self.corr_id = str(uuid.uuid4()) + + body = command.__dict__ + props = self._get_publish_props() + print(queue) + self.publisher.publish_json(body, routing_key=queue, + properties=props) + print('foo') + while not stop_event.is_set(): + self.Connection.connection.process_data_events(time_limit=1) + if self.response is not None: + break + return self.response diff --git a/rmqtools/rpc_client.py b/rmqtools/rpc_client.py deleted file mode 100644 index a86881f..0000000 --- a/rmqtools/rpc_client.py +++ /dev/null @@ -1,76 +0,0 @@ -import json -import logging -import threading -import uuid -from typing import Any, Dict, List, Literal, Callable, Union - -import pika -from pika.exchange_type import ExchangeType -from rmqtools import Connection, Publisher, ResponseObject - - -class RpcClient(): - - def __init__(self, exchange:str): - self.etype = ExchangeType.direct - self.exchange_name = exchange - self.corr_id = None - - self.response: Union[ResponseObject, None] - self.response = None - - def connect(self, conn:Connection): - self.Connection = conn - self.channel = self.Connection.channel - self.publisher = Publisher('direct', self.exchange_name) - self.publisher.connect(conn) - - res = self.channel.queue_declare(queue='', exclusive=True) - self.callback_queue = res.method.queue - self.channel.queue_bind(self.callback_queue, self.exchange_name, - self.callback_queue) - - self.channel.basic_consume( - queue=self.callback_queue, - on_message_callback=self.on_response, - auto_ack=True, - ) - - def on_response(self, ch, method, props, body): - if self.corr_id == props.correlation_id: - self.response = ResponseObject(**json.loads(body)) - - def _get_publish_props(self): - props = pika.BasicProperties( - reply_to=self.callback_queue, - correlation_id=self.corr_id, - ) - return props - - def call(self, queue:str, command:ResponseObject) -> ResponseObject: - self.response = None - self.corr_id = str(uuid.uuid4()) - - body = ResponseObject.__dict__ - props = self._get_publish_props() - self.publisher.publish_json(body, routing_key=queue, - properties=props) - self.Connection.connection.process_data_events(time_limit=None) - return self.response - - def call_threadsafe(self, queue:str, stop_event:threading.Event, - command:ResponseObject) -> ResponseObject: - self.response = None - self.corr_id = str(uuid.uuid4()) - - body = command.__dict__ - props = self._get_publish_props() - print(queue) - self.publisher.publish_json(body, routing_key=queue, - properties=props) - print('foo') - while not stop_event.is_set(): - self.Connection.connection.process_data_events(time_limit=1) - if self.response is not None: - break - return self.response diff --git a/rmqtools/rpc_server.py b/rmqtools/rpc_server.py deleted file mode 100644 index 856bcb5..0000000 --- a/rmqtools/rpc_server.py +++ /dev/null @@ -1,103 +0,0 @@ -import functools -import json -import logging -import threading -import uuid -from typing import Any, Dict, List, Literal, Callable, Optional - -import pika -from pika.channel import Channel -from pika.spec import Basic -from pika.exchange_type import ExchangeType -from rmqtools import Connection, ResponseObject - - -class RpcServer(): - - def __init__(self, exchange:str, queue:str): - self.etype = ExchangeType.direct - self.exchange_name = exchange - self.queue_name = queue - - def connect(self, conn:Connection): - self.Connection = conn - self.channel = self.Connection.channel - self.Connection.exchange_declare(self.exchange_name, self.etype) - self.exchange = self.Connection.exchanges.get(self.exchange_name) - self.channel.queue_declare(queue=self.queue_name) - self.channel.queue_bind(self.queue_name, self.exchange_name, - self.queue_name) - - def on_request(self, worker:Callable[[Any], ResponseObject], ch:Channel, - method:Basic.Deliver, props:pika.BasicProperties, - body:bytes): - """Called when the client requests data from the server. - Usage: - >>> callback = functools.partial(on_request, worker) - >>> channel.basic_consume(queue='rpc_queue', - on_message_callback=callback) - - Parameters - ---------- - worker : (Any) -> ResponseObject - The worker function that the request will be passed to. Should - be of the form `def worker(...)` where the arguments and keyword - arguments passed by the RPC client will fill out the parameters - of the worker function. The worker function should return a - rmqtools.ResponseObject with the arguments and keyword arguments - that will be passed to the client's response handler. - ch : Channel - Filled in automatically by `basic_consume` - method : Deliver - Filled in automatically by `basic_consume` - props : BasicProperties - Filled in automatically by `basic_consume` - body : bytes - Filled in automatically by `basic_consume` - - Returns - ------- - Any - Returns the response of the worker function - """ - try: - data = json.loads(body) - except: - data = {} - if not isinstance(data, dict): - data = {} - args = data.get('args', []) - kwargs = data.get('kwargs', {}) - - try: - response = worker(*args, **kwargs) - except TypeError: - raise ValueError("Incorrect arguments supplied to worker " - "function!") - - self.channel.basic_publish( - exchange=self.exchange_name, - routing_key=props.reply_to, - properties=pika.BasicProperties( - correlation_id=props.correlation_id), - body=json.dumps(response.__dict__), - ) - self.channel.basic_ack(delivery_tag=method.delivery_tag) - - def serve(self, worker:Callable[[Any], ResponseObject]) -> None: - self.channel.basic_qos(prefetch_count=1) - callback = functools.partial(self.on_request, worker) - self.channel.basic_consume(queue=self.queue_name, - on_message_callback=callback) - self.channel.start_consuming() - - def serve_threadsafe(self, worker:Callable[[Any], ResponseObject], - stop_event:threading.Event) -> None: - self.channel.basic_qos(prefetch_count=1) - - for method, props, body in self.channel.consume( - self.queue_name, inactivity_timeout=5): - if all([method, props, body]): - self.on_request(worker, self.channel, method, props, body) - if stop_event.is_set(): - break diff --git a/rmqtools/subscriber.py b/rmqtools/subscriber.py index e3a4097..6dfba19 100644 --- a/rmqtools/subscriber.py +++ b/rmqtools/subscriber.py @@ -1,19 +1,108 @@ -"""Tools for a publisher connection""" +"""Tools for a subscriber connection""" -import json -import logging -from typing import Any, Dict, List, Literal +from typing import Any, Dict, List -import pika from pika.exchange_type import ExchangeType from rmqtools import Connection class Subscriber(): + """Creates a subscriber object that has methods to easily subscribe to + messages on a specific RabbitMQ routing key. This class is used by the + higher-level RmqConnection class. + + Parameters + ---------- + queue : str, optional + The name of the queue the subscriber will use, defaults to ''. If left + blank, the queue will be given an automatically generated name. If + using a quorum queue, a queue name must be given (it cannot be left + blank). + exchange : str, optional + The name of the exchange the subscriber will use, defaults to ''. If + left blank, the subscriber will use the default exchange. If an + exchange name is given, the etype parameter must also be given. + etype : pika.ExchangeType, optional + The type of the exchange, given as a pika.ExchangeType object, defaults + to None. If exchange is not left blank, this field is required. + routing_keys : List[str], optional + A list of routing keys for the subscriber to listen on, defaults to []. + Routing keys can contain wildcards for subscribers - see RabbitMQ + documentation about routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html). + quorum : bool, optional + Makes the subscriber use quorum queues, defaults to True. Quorum queues + are useful for clustered RabbitMQ networks. For more information about + clustering and quorum queues, see the RabbitMQ documentation on these + topics (https://www.rabbitmq.com/clustering.html and + https://www.rabbitmq.com/quorum-queues.html). For a visualization of + the underlying Raft algorithm that makes quorum queues work, see + http://thesecretlivesofdata.com/raft/. + queue_arguments : Dict[str, Any], optional + Additional keyword arguments to pass to the pika queue_bind function, + defaults to {}. + + Attributes + ---------- + queue_name : str + The name of the queue the subscriber is using. + exchange_name : str + The name of the exchange the subscriber is using. + routing_keys : List[str] + A list of routing keys the subscriber is listening to. + quorum : bool + Whether the subscriber is using quorum queues. + queue_arguments : Dict[str, Any] + Additional keyword arguments that were passed to the pika queue_bind + function. + Connection : rmqtools.Connection + The Connection object linked to this subscriber, set by the ``connect`` + method. Use a unique connection for each thread. + channel : pika.BlockingChannel + The channel the subscriber uses for communication with RabbitMQ. + exchange : rmqtools.connection.Exchange + The Exchange object to keep the subscriber's exchange data consistent + with the Connection object. + """ def __init__(self, queue='', exchange='', etype:ExchangeType=None, routing_keys:List[str]=[], quorum=True, queue_arguments:Dict[str, Any]={}) -> None: + """Creates a subscriber object that has methods to easily subscribe to + messages on a specific RabbitMQ routing key. This class is used by the + higher-level RmqConnection class. + + Parameters + ---------- + queue : str, optional + The name of the queue the subscriber will use, defaults to ''. If left + blank, the queue will be given an automatically generated name. If + using a quorum queue, a queue name must be given (it cannot be left + blank). + exchange : str, optional + The name of the exchange the subscriber will use, defaults to ''. If + left blank, the subscriber will use the default exchange. If an + exchange name is given, the etype parameter must also be given. + etype : pika.ExchangeType, optional + The type of the exchange, given as a pika.ExchangeType object, defaults + to None. If exchange is not left blank, this field is required. + routing_keys : List[str], optional + A list of routing keys for the subscriber to listen on, defaults to []. + Routing keys can contain wildcards for subscribers - see RabbitMQ + documentation about routing keys + (https://www.rabbitmq.com/tutorials/tutorial-four-python.html). + quorum : bool, optional + Makes the subscriber use quorum queues, defaults to True. Quorum queues + are useful for clustered RabbitMQ networks. For more information about + clustering and quorum queues, see the RabbitMQ documentation on these + topics (https://www.rabbitmq.com/clustering.html and + https://www.rabbitmq.com/quorum-queues.html). For a visualization of + the underlying Raft algorithm that makes quorum queues work, see + http://thesecretlivesofdata.com/raft/. + queue_arguments : Dict[str, Any], optional + Additional keyword arguments to pass to the pika queue_bind function, + defaults to {}. + """ # validate data if quorum and not queue: raise ValueError("Quorum queues must be named explicitly!") @@ -32,6 +121,16 @@ def __init__(self, queue='', exchange='', etype:ExchangeType=None, self.queue_arguments = queue_arguments def connect(self, conn:Connection) -> None: + """Connect the subscriber to a Connection object, giving it access to + the RabbitMQ server. Also declares the exchange defined in instance + initialization. + + Parameters + ---------- + conn : Connection + The ``rmqtools.Connection`` object to use to connect to RabbitMQ + and link to this subscriber. Use a unique Connection per thread. + """ self.Connection = conn self.channel = self.Connection.channel if self.exchange_name and self.etype: @@ -56,6 +155,11 @@ def _get_queue_bind_args(self, routing_key, **kwargs) -> dict: return args def subscribe(self) -> None: + """Links the subscriber to the queue defined in the initialization. + Also binds that queue to the routing keys given in the initialization. + This must be run after the ``connect`` method because it requires the + channel attribute to be set. + """ self.channel.queue_declare(**self._get_queue_declare_args()) for key in self.routing_keys: self.channel.queue_bind(**self._get_queue_bind_args(key)) From f2066a071c157e4b08e4a25917aa5b351fc3b651 Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Thu, 6 Jul 2023 14:24:15 -0700 Subject: [PATCH 02/11] Update version info to reflect 1.0.0-alpha --- docs/conf.py | 2 +- pyproject.toml | 2 +- rmqtools/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index f12b9b2..905e125 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -25,7 +25,7 @@ project = 'rmqtools' copyright = '2023, Christian Thompson, Paul Horton' author = 'Christian Thompson, Paul Horton' -release = '1.0.0' +release = '1.0.0-alpha' version = '.'.join(release.split('.')[0:2]) exclude_patterns = ['_build'] diff --git a/pyproject.toml b/pyproject.toml index 50d8580..df087ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "rmqtools" -version = "0.1.0" +version = "1.0.0-alpha" authors = [ { name="Christian Thompson", email="cjthom43@asu.edu" }, { name="Paul Horton", email="pahorton@asu.edu" }, diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index 8b07a28..fb97261 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.1.0' +__version__ = '1.0.0-alpha' import logging From 4f2866d194b28e6f1e6542c0beb27051f05f1765 Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Thu, 6 Jul 2023 15:23:28 -0700 Subject: [PATCH 03/11] Added readthedocs config and added installation steps to readme --- .readthedocs.yaml | 21 +++++++++++++++++++++ README.rst | 34 ++++++++++++++++++++++++++++++++-- docs/requirements.txt | 1 + rmqtools/__init__.py | 13 +++++++++++++ 4 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 .readthedocs.yaml create mode 100644 docs/requirements.txt diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..2cdff22 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,21 @@ +# .readthedocs.yaml +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Set the version of Python +build: + os: ubuntu-22.04 + tools: + python: "3.11" + +# Build documentation in the docs/ directory with Sphinx +sphinx: + configuration: docs/conf.py + +# Declare the python requirements to build docs +python: + install: + - requirements: docs/requirements.txt diff --git a/README.rst b/README.rst index dd62f98..f180eb1 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,5 @@ -Rmqtools -======== +Welcome to Rmqtools! +==================== Rmqtools provides enhanced features for RabbitMQ development in Python. Introduction @@ -21,6 +21,33 @@ applications. for unique publish-subscribe relationships to be set up outside of the periodic status message use case contained in the high-level wrappers. +Installation +------------ + +Prerequisites +~~~~~~~~~~~~~ + +* Pika (`version 1.3.0+ `_) +* RabbitMQ (`version 3.12.0+ `_) + +Note: Pika should be automatically installed when installing Rmqtools + +Rmqtools is not currently available for download with PyPI. Once it is +available, it can be installed via pip:: + + pip install rmqtools + +To install the latest version before Rmqtools is available on PyPI, use the +latest_ release on GitHub and download ``rmqtools--py3-none-any.whl``. +Then install the wheel file with pip:: + + pip install rmqtools--py3-none-any.whl + +The prerelease version is currently available on the PyPI test site. It can +be installed via pip:: + + pip install -i https://test.pypi.org/simple/ rmqtools + Documentation ------------- Documentation coming soon. @@ -115,3 +142,6 @@ Exposed Classes handler, defaults to ``[]``; operates like ``*args`` - ``kwargs : dict`` - a dictionary of keyword arguments to pass to the response handler, defaults to ``{}``; operates like ``**kwargs`` + +.. aliases below here +.. _latest: https://github.com/217690thompson/rmqtools/releases/latest diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..65cd837 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1 @@ +pika>=1.3.0 diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index fb97261..c538b95 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -1,3 +1,16 @@ +""" +Rmqtools +======== + +Provides wrappers on top of the Pika library for easy RabbitMQ development in +Python. + +How to use the documentation +---------------------------- +Documentation is available in two forms: docstrings provided with the code, +and an API reference, available on Read the Docs at ... +""" + __version__ = '1.0.0-alpha' import logging From 9d5d0df7579e3a4119f54aa86537d60097065ce2 Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Wed, 29 Nov 2023 11:48:24 -0700 Subject: [PATCH 04/11] Remove debugging lines and add examples. --- examples/send_command.py | 21 ++++++------ examples/status_publisher.py | 2 +- examples/status_subscriber.py | 64 ++++++++++++----------------------- pyproject.toml | 2 +- rmqtools/rpc.py | 2 -- 5 files changed, 35 insertions(+), 56 deletions(-) diff --git a/examples/send_command.py b/examples/send_command.py index 44b1f8b..078e4f9 100644 --- a/examples/send_command.py +++ b/examples/send_command.py @@ -1,20 +1,21 @@ from rmqtools import RmqConnection, ResponseObject -rmq = RmqConnection(host='rabbit-1') -rmq.set_command_exchange('spec-command') +rmq = RmqConnection(host='localhost') +rmq.set_command_exchange('test_exg') @rmq.send_command('start_device', 'device_command') def start() -> ResponseObject: - device_num = 1 - command = 'start' - args = [device_num] - kwargs = {'command': command} - obj = ResponseObject(args, kwargs) - print('test') + spec_id = 'spec1' + command = 'stop' + kwargs = {'spec_id': spec_id, 'command': command} + obj = ResponseObject(kwargs=kwargs) return obj @rmq.handle_response('start_device') -def print_status(device_id, status=''): - print(f"Device {device_id} status: {status}") +def print_status(success='', error=''): + if error: + print(f"[ERROR] {error}") + else: + print(f"Device status: {success}") rmq.run() diff --git a/examples/status_publisher.py b/examples/status_publisher.py index 3ce0114..c217f68 100644 --- a/examples/status_publisher.py +++ b/examples/status_publisher.py @@ -1,7 +1,7 @@ from rmqtools import RmqConnection -rmq = RmqConnection(host='rabbit-1') +rmq = RmqConnection(host='192.168.137.1', port=5672) rmq.set_status_exchange('status') def get_status(device_id): diff --git a/examples/status_subscriber.py b/examples/status_subscriber.py index 8108641..9bf73ce 100644 --- a/examples/status_subscriber.py +++ b/examples/status_subscriber.py @@ -2,54 +2,34 @@ import json from rmqtools import RmqConnection -from pika.adapters.blocking_connection import BlockingChannel -rmq = RmqConnection(host='rabbit-3') -rmq.set_status_exchange('logs') +rmq = RmqConnection(host='') +rmq.set_status_exchange('status') response_count = {'ok': 0, 'down': 0} response_count_2 = {'ok': 0, 'down': 0} msg_times = [datetime.now()] msg_times_2 = [datetime.now()] -@rmq.subscribe_status('device_logs', ['device.*.status']) -def handle_response(channel:BlockingChannel, method, properties, body): - try: - data = json.loads(body) - except: - data = {'status': 'down'} - ok_count = response_count.get('ok') - down_count = response_count.get('down') - if data.get('status') == 'ok': - ok_count = ok_count + 1 - else: - down_count = down_count + 1 - response_count.update(ok=ok_count, down=down_count) - - total = sum(response_count.values()) - now = datetime.now() - if (now - msg_times[-1]).seconds >= 10: - msg_times.append(now) - print(f"(1) [{now.isoformat()}] Total status messages received: {total}", "\n") - -@rmq.subscribe_status('device_logs_2', ['device.*.status']) -def handle_response_2(channel:BlockingChannel, method, properties, body): - try: - data = json.loads(body) - except: - data = {'status': 'down'} - ok_count = response_count_2.get('ok') - down_count = response_count_2.get('down') - if data.get('status') == 'ok': - ok_count = ok_count + 1 - else: - down_count = down_count + 1 - response_count_2.update(ok=ok_count, down=down_count) - - total = sum(response_count_2.values()) - now = datetime.now() - if (now - msg_times_2[-1]).seconds >= 10: - msg_times_2.append(now) - print(f"(2) [{now.isoformat()}] Total status messages received: {total}", "\n") +@rmq.subscribe_status('device_logs', ['spec.*.status']) +def handle_response(channel, method, properties, body): + print(body) + # try: + # data = json.loads(body) + # except: + # data = {'status': 'down'} + # ok_count = response_count.get('ok') + # down_count = response_count.get('down') + # if data.get('status') == 'ok': + # ok_count = ok_count + 1 + # else: + # down_count = down_count + 1 + # response_count.update(ok=ok_count, down=down_count) + + # total = sum(response_count.values()) + # now = datetime.now() + # if (now - msg_times[-1]).seconds >= 10: + # msg_times.append(now) + # print(f"(1) [{now.isoformat()}] Total status messages received: {total}", "\n") rmq.run() diff --git a/pyproject.toml b/pyproject.toml index df087ab..b2d7bfc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "rmqtools" -version = "1.0.0-alpha" +version = "1.0.0-alpha.1" authors = [ { name="Christian Thompson", email="cjthom43@asu.edu" }, { name="Paul Horton", email="pahorton@asu.edu" }, diff --git a/rmqtools/rpc.py b/rmqtools/rpc.py index f3c52bc..1aff304 100644 --- a/rmqtools/rpc.py +++ b/rmqtools/rpc.py @@ -359,10 +359,8 @@ def call_threadsafe(self, queue:str, stop_event:threading.Event, body = command.__dict__ props = self._get_publish_props() - print(queue) self.publisher.publish_json(body, routing_key=queue, properties=props) - print('foo') while not stop_event.is_set(): self.Connection.connection.process_data_events(time_limit=1) if self.response is not None: From a3f9e3fd0f8758589e5d7a722ac34dbf15eb6bfe Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Wed, 28 Feb 2024 14:02:08 -0700 Subject: [PATCH 05/11] Added timeout feature to RPC connections. --- rmqtools/rmq.py | 46 ++++++++++++++++++++++++++++++++++++++++++---- rmqtools/rpc.py | 12 +++++++++++- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/rmqtools/rmq.py b/rmqtools/rmq.py index e18410e..ddfc68a 100644 --- a/rmqtools/rmq.py +++ b/rmqtools/rmq.py @@ -169,6 +169,9 @@ def __init__(self, username='guest', password='guest', host='localhost', self.response_handlers: Dict[str, Callable[[Any], Any]] self.response_handlers = {} + self.timeout_handlers: Dict[str, Callable[[int], None]] + self.timeout_handlers = {} + def handle_exit(sig, frame): print('Main thread interrupted by user. ' 'Shutting down all child threads.') @@ -616,8 +619,31 @@ def wrapper(*args, **kwargs): return wrapper return decorator + def handle_timeout(self, command_id:str): + """A method decorator to set the timeout handler of an RPC client. + + This method is not threaded, but it will not cause any IO blocking. + All it does is update the ``timeout_handlers`` attribute with the + wrapped function. The wrapped function should have only one argument + for the timeout length and should not return anything. + + Parameters + ---------- + command_id : str + The identifier of the command this response handler is associated + with. + """ + def decorator(func:Callable[[Any], Any]): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + self.timeout_handlers.update({command_id: func}) + return wrapper + return decorator + def _send_command(self, func:Callable[[Any], ResponseObject], - command_id:str, queue:str, *args, **kwargs) -> None: + command_id:str, queue:str, *args, timeout:int=None, + **kwargs) -> None: """Sends a command to an associated RPC server. This method is not automatically threaded, but it is threadsafe. It @@ -640,22 +666,30 @@ def _send_command(self, func:Callable[[Any], ResponseObject], call with the associated ``handle_response`` decorator. queue : str The queue name of the RPC server where the command is being sent. + timeout : int, optional + How long to wait for a response before timing out. Default behavior + is no timeout. """ def default_handler(*a, **kw): pass command = func(*args, **kwargs) response_handler = self.response_handlers.get( command_id, default_handler) + timeout_handler = self.timeout_handlers.get( + command_id, default_handler) exchange, _ = self.exchanges.get('command') client = RpcClient(exchange) conn = self._get_connection() client.connect(conn) - response = client.call_threadsafe(queue, self.stop_event, command) + response = client.call_threadsafe(queue, self.stop_event, command, + timeout=timeout) + if not response: + return timeout_handler(timeout) args = response.args kwargs = response.kwargs - response_handler(*args, **kwargs) + return response_handler(*args, **kwargs) - def send_command(self, command_id:str, queue:str): + def send_command(self, command_id:str, queue:str, timeout:int=None): """A method decorator to send a command to an RPC server. This method is automatically threaded. It creates an RPC client to @@ -675,6 +709,9 @@ def send_command(self, command_id:str, queue:str): decorator with the associated ``handle_response`` decorator. queue : str The queue name of the RPC server where the command is being sent. + timeout : int, optional + How long to wait for a response before timing out. Default behavior + is no timeout. """ def decorator(func:Callable[[], ResponseObject]): @functools.wraps(func) @@ -683,6 +720,7 @@ def wrapper(*args, **kwargs): thread = threading.Thread( target=self._send_command, args=(wrapper, command_id, queue), + kwargs={'timeout': timeout}, ) self.threads.append(thread) return wrapper diff --git a/rmqtools/rpc.py b/rmqtools/rpc.py index 1aff304..4355177 100644 --- a/rmqtools/rpc.py +++ b/rmqtools/rpc.py @@ -2,6 +2,7 @@ import json import threading import uuid +from datetime import datetime, timedelta from typing import Any, Callable, Union import pika @@ -328,7 +329,7 @@ def call(self, queue:str, command:ResponseObject) -> ResponseObject: return self.response def call_threadsafe(self, queue:str, stop_event:threading.Event, - command:ResponseObject) -> ResponseObject: + command:ResponseObject, timeout=None) -> ResponseObject: """Similar to the ``call`` method but requests the server in a thread- safe manner. @@ -361,8 +362,17 @@ def call_threadsafe(self, queue:str, stop_event:threading.Event, props = self._get_publish_props() self.publisher.publish_json(body, routing_key=queue, properties=props) + timeout_time = datetime.max + if timeout: + timeout_time = datetime.now() + timedelta(seconds=timeout) while not stop_event.is_set(): self.Connection.connection.process_data_events(time_limit=1) if self.response is not None: break + if datetime.now() > timeout_time: + # command timed out + self.Connection.channel.queue_purge(queue) + self.Connection.connection.close() + self.response = None + break return self.response From e2e45d149db35fb7537a74a3287efa7234aa0194 Mon Sep 17 00:00:00 2001 From: Paul Horton Date: Mon, 18 Mar 2024 13:25:26 -0700 Subject: [PATCH 06/11] Changes to handle keyboard interrupts --- rmqtools/rmq.py | 12 ++++++++---- rmqtools/rpc.py | 26 ++++++++++++++++---------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/rmqtools/rmq.py b/rmqtools/rmq.py index ddfc68a..0abbd70 100644 --- a/rmqtools/rmq.py +++ b/rmqtools/rmq.py @@ -178,7 +178,7 @@ def handle_exit(sig, frame): self.stop() sys.exit(0) - signal.signal(signal.SIGINT, handle_exit) + #signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGTERM, handle_exit) def _get_connection(self) -> Connection: @@ -375,10 +375,14 @@ def run(self): trigger the stop command to shutdown all threads. """ print("Starting all RabbitMQ threads. Press enter at any time to " - "shutdown all child threads.") + "shutdown all child threads.") self.start() - input() - print("Quit command received. Shutting down all child threads...") + try: + input() + print("Quit command received. Shutting down all child threads...") + except KeyboardInterrupt as e: + print("Main thread interrupted by user. Shutting down all child " + "threads...") self.stop() def stop(self): diff --git a/rmqtools/rpc.py b/rmqtools/rpc.py index 4355177..f538bf1 100644 --- a/rmqtools/rpc.py +++ b/rmqtools/rpc.py @@ -363,16 +363,22 @@ def call_threadsafe(self, queue:str, stop_event:threading.Event, self.publisher.publish_json(body, routing_key=queue, properties=props) timeout_time = datetime.max + clearqueue = False if timeout: timeout_time = datetime.now() + timedelta(seconds=timeout) - while not stop_event.is_set(): - self.Connection.connection.process_data_events(time_limit=1) - if self.response is not None: - break - if datetime.now() > timeout_time: - # command timed out - self.Connection.channel.queue_purge(queue) - self.Connection.connection.close() - self.response = None - break + try: + while not stop_event.is_set(): + self.Connection.connection.process_data_events(time_limit=1) + if self.response is not None: + break + if datetime.now() > timeout_time: + # command timed out + clearqueue = True + break + except KeyboardInterrupt as e: + clearqueue = True + if clearqueue: + self.Connection.channel.queue_purge(queue) + self.Connection.connection.close() + self.response = None return self.response From e92e09a1c9de13bf25b922806f8f4674f8072c1e Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Mon, 1 Jul 2024 12:08:54 -0500 Subject: [PATCH 07/11] Version 1.0.0-alpha.3 - Add ability to define custom error handlers --- pyproject.toml | 2 +- rmqtools/__init__.py | 4 +++- rmqtools/exceptions.py | 6 ++++++ rmqtools/rmq.py | 48 ++++++++++++++++++++++++++++++++++++------ rmqtools/rpc.py | 3 +++ 5 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 rmqtools/exceptions.py diff --git a/pyproject.toml b/pyproject.toml index b2d7bfc..fd4e95f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "rmqtools" -version = "1.0.0-alpha.1" +version = "1.0.0-alpha.3" authors = [ { name="Christian Thompson", email="cjthom43@asu.edu" }, { name="Paul Horton", email="pahorton@asu.edu" }, diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index c538b95..6505b3a 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -11,13 +11,15 @@ and an API reference, available on Read the Docs at ... """ -__version__ = '1.0.0-alpha' +__version__ = '1.0.0-alpha.3' import logging # suppress logging warnings while importing rabbitmq-tools logging.getLogger(__name__).addHandler(logging.NullHandler()) +from rmqtools.exceptions import RmqError + from rmqtools.connection import ResponseObject from rmqtools.connection import Connection from rmqtools.publisher import Publisher diff --git a/rmqtools/exceptions.py b/rmqtools/exceptions.py new file mode 100644 index 0000000..ef39cfd --- /dev/null +++ b/rmqtools/exceptions.py @@ -0,0 +1,6 @@ +"""Implements custom error classes for rmqtools.""" + + +class RmqError(Exception): + """Common base class for all rmqtools errors""" + pass diff --git a/rmqtools/rmq.py b/rmqtools/rmq.py index 0abbd70..54b9a4b 100644 --- a/rmqtools/rmq.py +++ b/rmqtools/rmq.py @@ -23,7 +23,7 @@ import pika from pika.exchange_type import ExchangeType from rmqtools import (Connection, Publisher, ResponseObject, RpcClient, - RpcServer, Subscriber) + RpcServer, Subscriber, RmqError) class RmqConnection(): @@ -172,6 +172,9 @@ def __init__(self, username='guest', password='guest', host='localhost', self.timeout_handlers: Dict[str, Callable[[int], None]] self.timeout_handlers = {} + self.error_handlers: Dict[str, Callable[[Exception], None]] + self.error_handlers = {} + def handle_exit(sig, frame): print('Main thread interrupted by user. ' 'Shutting down all child threads.') @@ -634,7 +637,7 @@ def handle_timeout(self, command_id:str): Parameters ---------- command_id : str - The identifier of the command this response handler is associated + The identifier of the command this timeout handler is associated with. """ def decorator(func:Callable[[Any], Any]): @@ -645,6 +648,28 @@ def wrapper(*args, **kwargs): return wrapper return decorator + def handle_error(self, command_id:str): + """A method decorator to set the error handler of an RPC client. + + This method is not threaded, but it will not cause any IO blocking. + All it does is update the ``error_handlers`` attribute with the + wrapped function. The wrapped function should have only one argument + for the returned error and should not return anything. + + Parameters + ---------- + command_id : str + The identifier of the command this error handler is associated + with. + """ + def decorator(func:Callable[[Any], Any]): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + self.error_handlers.update({command_id: func}) + return wrapper + return decorator + def _send_command(self, func:Callable[[Any], ResponseObject], command_id:str, queue:str, *args, timeout:int=None, **kwargs) -> None: @@ -676,17 +701,28 @@ def _send_command(self, func:Callable[[Any], ResponseObject], """ def default_handler(*a, **kw): pass + + def default_error_handler(e:Exception): + raise RmqError("An error occurred when sending the command") + command = func(*args, **kwargs) response_handler = self.response_handlers.get( command_id, default_handler) timeout_handler = self.timeout_handlers.get( command_id, default_handler) + error_handler = self.error_handlers.get( + command_id, default_error_handler) exchange, _ = self.exchanges.get('command') client = RpcClient(exchange) - conn = self._get_connection() - client.connect(conn) - response = client.call_threadsafe(queue, self.stop_event, command, - timeout=timeout) + + try: + conn = self._get_connection() + client.connect(conn) + response = client.call_threadsafe(queue, self.stop_event, command, + timeout=timeout) + except Exception as e: + return error_handler(e) + if not response: return timeout_handler(timeout) args = response.args diff --git a/rmqtools/rpc.py b/rmqtools/rpc.py index f538bf1..bb714df 100644 --- a/rmqtools/rpc.py +++ b/rmqtools/rpc.py @@ -355,6 +355,9 @@ def call_threadsafe(self, queue:str, stop_event:threading.Event, with the ResponseObject format, so the args and kwargs can be passed into a custom response handler. """ + # declare the recipient queue in case the receiver is dead + self.channel.queue_declare(queue=queue) + self.response = None self.corr_id = str(uuid.uuid4()) From 4fd0ffa6d24992b2f9c0e3ef72b0632b9e72f6ec Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Thu, 4 Jul 2024 09:21:29 -0500 Subject: [PATCH 08/11] Add more accurate interval settings for publishers --- pyproject.toml | 2 +- rmqtools/__init__.py | 2 +- rmqtools/rmq.py | 19 ++++++++++++++----- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fd4e95f..c41d5fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "rmqtools" -version = "1.0.0-alpha.3" +version = "1.0.0-alpha.3.1" authors = [ { name="Christian Thompson", email="cjthom43@asu.edu" }, { name="Paul Horton", email="pahorton@asu.edu" }, diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index 6505b3a..1f78f79 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -11,7 +11,7 @@ and an API reference, available on Read the Docs at ... """ -__version__ = '1.0.0-alpha.3' +__version__ = '1.0.0-alpha.3.1' import logging diff --git a/rmqtools/rmq.py b/rmqtools/rmq.py index 54b9a4b..bb68e54 100644 --- a/rmqtools/rmq.py +++ b/rmqtools/rmq.py @@ -13,6 +13,7 @@ the examples section below. """ +from datetime import datetime import functools import signal import sys @@ -395,8 +396,8 @@ def stop(self): for thread in self.threads: thread.join() - def _publish_status(self, func:Callable[[Any], Any], interval: float, - routing_key: str, *args, **kwargs): + def _publish_status(self, func:Callable[[Any], Any], interval:float, + routing_key:str, *args, delay:float=0.0, **kwargs): """Publish a status message periodically. This method is not automatically threaded, but it is thread safe. It @@ -426,13 +427,20 @@ def _publish_status(self, func:Callable[[Any], Any], interval: float, publisher.connect(conn) props = self.publish_props.get(routing_key) while not self.stop_event.is_set(): + start = datetime.now() + time.sleep(min(delay, interval)) + data = func(*args, **kwargs) # get status data publisher.publish_json(data, routing_key=routing_key, properties=props) - # print(data) - time.sleep(interval) - def publish_status(self, interval: float, routing_key: str): + # make sure we don't add extra delay if the data request or publish + # command take a few seconds + stop = datetime.now() + true_delay = (stop - start).total_seconds() + time.sleep(max(interval - true_delay, 0)) + + def publish_status(self, interval:float, routing_key:str, delay:float=0.0): """A method decorator for publishing status messages periodically. This method is automatically threaded. The function it wraps must @@ -455,6 +463,7 @@ def wrapper(*args, **kwargs): thread = threading.Thread( target=self._publish_status, args=(wrapper, interval, routing_key), + kwargs={'delay': delay}, ) self.threads.append(thread) return wrapper From ab07ac9a3ef91a2bf91a4a2c2186fbce4febd731 Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Thu, 4 Jul 2024 09:25:23 -0500 Subject: [PATCH 09/11] Fix versioning --- pyproject.toml | 2 +- rmqtools/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c41d5fd..5cf2376 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "rmqtools" -version = "1.0.0-alpha.3.1" +version = "1.0.0-alpha.4" authors = [ { name="Christian Thompson", email="cjthom43@asu.edu" }, { name="Paul Horton", email="pahorton@asu.edu" }, diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index 1f78f79..9a6ce88 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -11,7 +11,7 @@ and an API reference, available on Read the Docs at ... """ -__version__ = '1.0.0-alpha.3.1' +__version__ = '1.0.0-alpha.4' import logging From 8977ded5ef234786d283a4cb8d7320b615db6edc Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Wed, 17 Jun 2026 13:37:31 -0400 Subject: [PATCH 10/11] Version 1.0.0 - Bug fixes for command queue declaration - Changed documentation links to correct repository --- .gitignore | 2 +- README.rst | 2 +- docs/conf.py | 4 ++-- docs/examples.rst | 3 +++ docs/faq.rst | 4 ++-- docs/index.rst | 4 ++-- pyproject.toml | 10 +++++----- rmqtools/__init__.py | 2 +- rmqtools/rpc.py | 8 ++++---- 9 files changed, 21 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 76d4a42..f54c094 100644 --- a/.gitignore +++ b/.gitignore @@ -160,4 +160,4 @@ cython_debug/ #.idea/ # vscode files -.vscode/** +.vscode diff --git a/README.rst b/README.rst index f180eb1..4ebaffc 100644 --- a/README.rst +++ b/README.rst @@ -144,4 +144,4 @@ Exposed Classes response handler, defaults to ``{}``; operates like ``**kwargs`` .. aliases below here -.. _latest: https://github.com/217690thompson/rmqtools/releases/latest +.. _latest: https://github.com/ASTHROS/rmqtools/releases/latest diff --git a/docs/conf.py b/docs/conf.py index 905e125..9f25144 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -23,9 +23,9 @@ master_doc = 'index' project = 'rmqtools' -copyright = '2023, Christian Thompson, Paul Horton' +copyright = '2026, Christian Thompson, Paul Horton' author = 'Christian Thompson, Paul Horton' -release = '1.0.0-alpha' +release = '1.0.0' version = '.'.join(release.split('.')[0:2]) exclude_patterns = ['_build'] diff --git a/docs/examples.rst b/docs/examples.rst index 0338038..082e1ab 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -3,6 +3,9 @@ Usage Examples Examples coming soon! ~~~~~~~~~~~~~~~~~~~~~ + +See ``examples/`` in the GitHub repository for now. + .. Rmqtools has various methods of use, between the high-level RmqConnection class .. and the various lower-level classes. The following examples illustrate .. different ways the RmqConnection class can be used to handle common use cases. diff --git a/docs/faq.rst b/docs/faq.rst index 10d5915..5c74876 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -11,9 +11,9 @@ Frequently Asked Questions - How do I report a bug with Rmqtools? - The `main Rmqtools repository `_ + The `main Rmqtools repository `_ is hosted on `GitHub `_ and we use the - `Issue tracker `_ to + `Issue tracker `_ to handle bug reports. - How can I contribute to Rmqtools? diff --git a/docs/index.rst b/docs/index.rst index 6250ee0..e52c554 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -58,5 +58,5 @@ Indices and tables .. aliases below here .. _Pika: https://pika.readthedocs.io -.. _latest: https://github.com/217690thompson/rmqtools/releases/latest -.. |repo_base| replace:: https://github.com/217690thompson/rmqtools +.. _latest: https://github.com/ASTHROS/rmqtools/releases/latest +.. |repo_base| replace:: https://github.com/ASTHROS/rmqtools diff --git a/pyproject.toml b/pyproject.toml index 5cf2376..7d0b9e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,10 +4,10 @@ build-backend = "setuptools.build_meta" [project] name = "rmqtools" -version = "1.0.0-alpha.4" +version = "1.0.0" authors = [ - { name="Christian Thompson", email="cjthom43@asu.edu" }, - { name="Paul Horton", email="pahorton@asu.edu" }, + { name="Christian Thompson", email="christian.thompson@mail.utoronto.ca" }, + { name="Paul Horton", email="paul_horton@uml.edu" }, ] description = "Wrappers for RabbitMQ development in Python. Based on the pika library." readme = "README.md" @@ -22,5 +22,5 @@ dependencies = [ ] [project.urls] -"Homepage" = "https://github.com/217690thompson/rmqtools" -"Bug Tracker" = "https://github.com/217690thompson/rmqtools/issues" +"Homepage" = "https://github.com/ASTHROS/rmqtools" +"Bug Tracker" = "https://github.com/ASTHROS/rmqtools/issues" diff --git a/rmqtools/__init__.py b/rmqtools/__init__.py index 9a6ce88..7f9db24 100644 --- a/rmqtools/__init__.py +++ b/rmqtools/__init__.py @@ -11,7 +11,7 @@ and an API reference, available on Read the Docs at ... """ -__version__ = '1.0.0-alpha.4' +__version__ = '1.0.0' import logging diff --git a/rmqtools/rpc.py b/rmqtools/rpc.py index bb714df..5df2cb1 100644 --- a/rmqtools/rpc.py +++ b/rmqtools/rpc.py @@ -73,7 +73,7 @@ def connect(self, conn:Connection): self.channel = self.Connection.channel self.Connection.exchange_declare(self.exchange_name, self.etype) self.exchange = self.Connection.exchanges.get(self.exchange_name) - self.channel.queue_declare(queue=self.queue_name) + self.channel.queue_declare(queue=self.queue_name, durable=True) self.channel.queue_bind(self.queue_name, self.exchange_name, self.queue_name) @@ -278,9 +278,9 @@ def on_response(self, ch:Channel, method:Basic.Deliver, Parameters ---------- ch : Channel - Filled in automatically by ``basic_consume`` + Filled in automatically by ``basic_consume``, unused method : pika.spec.Basic.Deliver - Filled in automatically by ``basic_consume`` + Filled in automatically by ``basic_consume``, unused props : pika.BasicProperties Filled in automatically by ``basic_consume`` body : bytes @@ -356,7 +356,7 @@ def call_threadsafe(self, queue:str, stop_event:threading.Event, passed into a custom response handler. """ # declare the recipient queue in case the receiver is dead - self.channel.queue_declare(queue=queue) + self.channel.queue_declare(queue=queue, durable=True) self.response = None self.corr_id = str(uuid.uuid4()) From 26224093e99032283ac947a4ff794b97ec569e12 Mon Sep 17 00:00:00 2001 From: Christian Thompson Date: Wed, 17 Jun 2026 14:07:33 -0400 Subject: [PATCH 11/11] Harmonize hosts used in examples Also remove commented debugging lines. --- examples/multi_status_publisher.py | 2 +- examples/receive_command.py | 2 +- examples/send_command.py | 2 +- examples/status_publisher.py | 2 +- examples/status_subscriber.py | 19 +------------------ rmqtools/connection.py | 8 ++------ rmqtools/rmq.py | 4 ++-- 7 files changed, 9 insertions(+), 30 deletions(-) diff --git a/examples/multi_status_publisher.py b/examples/multi_status_publisher.py index d977cda..8d221b7 100644 --- a/examples/multi_status_publisher.py +++ b/examples/multi_status_publisher.py @@ -25,7 +25,7 @@ def send_status(device_id): print(f"[{now.isoformat()}] Total status messages sent: {total}", "\n") return msg -rmq = RmqConnection(host='rabbit-1') +rmq = RmqConnection(host='rabbit') rmq.set_status_exchange('logs') for i in range(16): diff --git a/examples/receive_command.py b/examples/receive_command.py index 216e7c8..823b89a 100644 --- a/examples/receive_command.py +++ b/examples/receive_command.py @@ -1,6 +1,6 @@ from rmqtools import RmqConnection, ResponseObject -rmq = RmqConnection(host='rabbit-3') +rmq = RmqConnection(host='rabbit') rmq.set_command_exchange('spec-command') @rmq.handle_command('device_command') diff --git a/examples/send_command.py b/examples/send_command.py index 078e4f9..261f6fd 100644 --- a/examples/send_command.py +++ b/examples/send_command.py @@ -1,6 +1,6 @@ from rmqtools import RmqConnection, ResponseObject -rmq = RmqConnection(host='localhost') +rmq = RmqConnection(host='rabbit') rmq.set_command_exchange('test_exg') @rmq.send_command('start_device', 'device_command') diff --git a/examples/status_publisher.py b/examples/status_publisher.py index c217f68..e0c3988 100644 --- a/examples/status_publisher.py +++ b/examples/status_publisher.py @@ -1,7 +1,7 @@ from rmqtools import RmqConnection -rmq = RmqConnection(host='192.168.137.1', port=5672) +rmq = RmqConnection(host='rabbit') rmq.set_status_exchange('status') def get_status(device_id): diff --git a/examples/status_subscriber.py b/examples/status_subscriber.py index 9bf73ce..f601e93 100644 --- a/examples/status_subscriber.py +++ b/examples/status_subscriber.py @@ -3,7 +3,7 @@ from rmqtools import RmqConnection -rmq = RmqConnection(host='') +rmq = RmqConnection(host='rabbit') rmq.set_status_exchange('status') response_count = {'ok': 0, 'down': 0} @@ -14,22 +14,5 @@ @rmq.subscribe_status('device_logs', ['spec.*.status']) def handle_response(channel, method, properties, body): print(body) - # try: - # data = json.loads(body) - # except: - # data = {'status': 'down'} - # ok_count = response_count.get('ok') - # down_count = response_count.get('down') - # if data.get('status') == 'ok': - # ok_count = ok_count + 1 - # else: - # down_count = down_count + 1 - # response_count.update(ok=ok_count, down=down_count) - - # total = sum(response_count.values()) - # now = datetime.now() - # if (now - msg_times[-1]).seconds >= 10: - # msg_times.append(now) - # print(f"(1) [{now.isoformat()}] Total status messages received: {total}", "\n") rmq.run() diff --git a/rmqtools/connection.py b/rmqtools/connection.py index 653e965..2683738 100644 --- a/rmqtools/connection.py +++ b/rmqtools/connection.py @@ -718,9 +718,7 @@ def exchange_declare(self, name:str, exchange_type:ExchangeType, try: self.exchanges += e except ValueError as err: - # log that exchange already exists so skipping the creation of it - # print(f"Exchange '{name}' already exists on this channel. " - # "Skipping creation, instead verifying the exchange.") + # exchange already exists, but that is fine pass def exchange_delete(self, name:str, if_unused=False) -> None: @@ -757,7 +755,5 @@ def queue_declare(self, name='', binding_keys=[], try: self.queues += q except ValueError as err: - # log that queue already exists so skipping the creation of it - # print(f"Queue '{name}' already exists on this channel. " - # "Skipping creation, instead verifying the queue.") + # this queue already exists, but we can reuse it pass diff --git a/rmqtools/rmq.py b/rmqtools/rmq.py index bb68e54..cf73990 100644 --- a/rmqtools/rmq.py +++ b/rmqtools/rmq.py @@ -379,14 +379,14 @@ def run(self): trigger the stop command to shutdown all threads. """ print("Starting all RabbitMQ threads. Press enter at any time to " - "shutdown all child threads.") + "shutdown all child threads.") self.start() try: input() print("Quit command received. Shutting down all child threads...") except KeyboardInterrupt as e: print("Main thread interrupted by user. Shutting down all child " - "threads...") + "threads...") self.stop() def stop(self):