This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Source code for kombu.asynchronous.aws.sqs.connection

"""Amazon SQS Connection."""

from __future__ import annotations

from kombu.asynchronous import get_event_loop
from kombu.asynchronous.aws.connection import AsyncAWSQueryConnection

from .ext import boto3

__all__ = ('AsyncSQSConnection',)


[docs]class AsyncSQSConnection(AsyncAWSQueryConnection): """Async SQS Connection.""" def __init__(self, sqs_connection, debug=0, region=None, **kwargs): if boto3 is None: raise ImportError('boto3 is not installed') super().__init__( sqs_connection, region_name=region, debug=debug, **kwargs ) self.hub = kwargs.get('hub') or get_event_loop() def _async_sqs_request(self, api, callback, *args, **kwargs): """Makes an asynchronous request to an SQS API. Arguments: --------- api -- The name of the API, e.g. 'receive_message'. callback -- The callback to pass the response to when it is available. *args, **kwargs -- The arguments and keyword arguments to pass to the SQS API. Those are API dependent and can be found in the boto3 documentation. """ # Define a method to execute the SQS API synchronously. def sqs_request(api, callback, args, kwargs): method = getattr(self.sqs_connection, api) resp = method(*args, **kwargs) if callback: callback(resp) # Hand off the request to the event loop to execute it asynchronously. self.hub.call_soon(sqs_request, api, callback, args, kwargs)
[docs] def receive_message( self, queue_url, number_messages=1, visibility_timeout=None, attributes=('ApproximateReceiveCount',), wait_time_seconds=None, callback=None ): kwargs = { "QueueUrl": queue_url, "MaxNumberOfMessages": number_messages, "MessageAttributeNames": attributes, "WaitTimeSeconds": wait_time_seconds, } if visibility_timeout: kwargs["VisibilityTimeout"] = visibility_timeout return self._async_sqs_request('receive_message', callback, **kwargs)
[docs] def delete_message(self, queue_url, receipt_handle, callback=None): return self._async_sqs_request('delete_message', callback, QueueUrl=queue_url, ReceiptHandle=receipt_handle)