import kombu
import uuid
import socket
from threading import Thread, Event
from six.moves.queue import Queue, Empty
from time import sleep
from weakref import proxy

from . import codecs, log
from .eventhook import EventHook

class IPCException(Exception):

class RemoteException(IPCException):

class LocalException(IPCException):

class TimeoutException(LocalException):

[docs]class FutureResult(object): """ Encapsulates future result. Provides interface to block until future data is ready. Thread-safe. """ def __init__(self, cannonical_name, **extra): self.event = Event() self.exception = None self.value = None self.cannonical_name = cannonical_name self.extra = extra
[docs] def wait(self, timeout=5): """ Blocks until data is ready. """ if not self.event.wait(timeout=timeout): self.exception = TimeoutException() return False return True
[docs] def resolve(self, value): """ Resolves this promise with result and sets "ready" event. """ if not self.is_ready(): self.exception = None self.value = value self.event.set()
[docs] def reject(self, exception): """ Rejects this promise with exception and sets "ready" event. """ if not self.is_ready(): self.exception = exception self.value = None self.event.set()
[docs] def is_ready(self): """ Checks if this result has been resolved or rejected. """ return self.event.is_set()
[docs]class QueuedRequest(object): """ Internal class. Represents pending outgoing message. """ def __init__(self, codec, **kwargs): self.codec = codec self.correlation_id = str(uuid.uuid4()) self.__dict__.update(**kwargs)
[docs]class QueuedInvocation(QueuedRequest): """ Internal class. Represents pending outgoing method call. """ def __init__(self, codec, service, method, args, kwargs): super(QueuedInvocation, self).__init__( codec, service=service, method=method, args=args, kwargs=kwargs )
[docs]class QueuedNotification(QueuedRequest): """ Internal class. Represents pending outgoing notification. """ def __init__(self, codec, event, data): super(QueuedNotification, self).__init__( codec, event=event, data=data )
[docs]class ConsumerThread(Thread): """ Internal class. Represents connection & message consuming thread. """ def __init__(self, hostname, exchange_name, connect_timeout, reconnect_timeout, codec): super(ConsumerThread, self).__init__() self.daemon = True self._is_running = False self._is_connected = False self._hostname = hostname self._exchange_name = exchange_name self.on_connect = EventHook() self.on_error = EventHook() self.on_disconnect = EventHook() self.on_message = EventHook() self.on_return = EventHook() self._connect_timeout = connect_timeout self._reconnect_timeout = reconnect_timeout if not isinstance(codec, codecs.AbstractCodec): codec = codecs.PickleCodec() self._codec = codec def run(self): self._is_running = True while self._is_running: try:'Connecting to AMQP...') self._connect() self._is_connected = True'Connected to AMQP') self._start_consuming() self._is_connected = False return except Exception as e: self._is_connected = False if self._reconnect_timeout: log.error( 'Disconnected from AMQP.\n' 'Retrying in {} seconds.\n' 'Error was: {}'.format( self._reconnect_timeout, # format_exc() str(e) ) ) sleep(self._reconnect_timeout) continue else: log.error( 'Disconnected from AMQP.\n' 'Not reconnecting because reconnect_timeout = 0.\n' 'Error was: {}'.format( # format_exc() str(e) ) ) return def _connect(self): self._conn = kombu.Connection( self._hostname, connect_timeout=self._connect_timeout ) self._channel = self._exchange = kombu.Exchange( name=self._exchange_name, channel=self._channel, durable=False ) self._callback_queue = self._create_callback_queue( self._channel, self._exchange ) def _create_callback_queue(self, channel, exchange): name = 'response-{}'.format(uuid.uuid4()) callback_queue = kombu.Queue( name=name, exchange=exchange, routing_key=name, exclusive=True, channel=self._channel ) callback_queue.declare() return callback_queue def _start_consuming(self): """ Start consuming messages. This function is blocking. """ consumer = kombu.Consumer( self._conn, queues=[self._callback_queue], on_message=self._on_message, # accept=[self._codec.content_type], no_ack=True ) consumer.consume() while self._is_running: try: self._conn.drain_events(timeout=0.5) except socket.timeout: continue def _on_message(self, message): log.debug('Got response for invocation ..{}'.format( str(['correlation_id'])[-4:] )) def _handle_return(self, exception, exchange, routing_key, message): if exception.reply_code == 312: exception = RemoteException('Could not find route to the destination queue. Is RPC server running?') else: exception = RemoteException(str(exception)), message) def is_connected(self): return self._is_connected def shutdown_worker(self): self._is_running = False def get_connection(self): return self._conn def get_codec(self): return self._codec def get_exchange(self): return self._exchange def get_callback_queue(self): return self._callback_queue
[docs]class PublisherThread(Thread): """ Internal class. Represents message publishing thread. """ def __init__(self, consumer): self.consumer = proxy(consumer) super(PublisherThread, self).__init__() self.daemon = True self._out_queue = Queue() self._is_running = False def run(self): self._is_running = True while self._is_running: if self.consumer.is_connected(): producer = kombu.Producer(self.consumer._channel, on_return=self.consumer._handle_return) try: queued_request = self._out_queue.get(timeout=0.5) if True: # with kombu.producers[self.consumer.get_connection()].acquire(block=True) as producer: # producer.on_return = print try: self._dispatch_request(queued_request, producer) except Exception as e: # except ConnectionResetError: log.debug('Failed to dispatch request, re-enqueueing again, error was: {}'.format( str(e) )) self.enqueue(queued_request) except Empty: continue else: sleep(0.5) log.debug('Waiting for consumer to be ready...') def enqueue(self, queued_request): self._out_queue.put(queued_request) def _dispatch_request(self, queued_request, producer): if isinstance(queued_request, QueuedInvocation): log.debug('Publishing invocation ..{}'.format( queued_request.correlation_id[-4:] )) producer.publish( exchange=self.consumer.get_exchange(), routing_key='{}_service_{}'.format( self.consumer.get_exchange().name, queued_request.service ), body=queued_request.codec.encode(( queued_request.method, queued_request.args, queued_request.kwargs )), mandatory=True, reply_to=self.consumer.get_callback_queue().name, correlation_id=queued_request.correlation_id, content_type=self.consumer.get_codec().content_type, ) else: log.debug('Publishing notification ..{}'.format( queued_request.correlation_id[-4:] )) producer.publish( exchange='{}_fanout'.format( self.consumer.get_exchange().name ), routing_key='', body=queued_request.codec.encode(( queued_request.event, )), correlation_id=queued_request.correlation_id, content_type=self.consumer.get_codec().content_type ) def shutdown_worker(self): self._is_running = False
[docs]class Client(object): """ Represents a single low-level connection to the ISC messaging broker. Thread-safe. """ def __init__( self, host='amqp://guest:guest@', exchange='isc', codec=None, connect_timeout=2, reconnect_timeout=3, invoke_timeout=20 ): self.future_results = {} self._invoke_timeout = invoke_timeout self._consumer = ConsumerThread(host, exchange, connect_timeout, reconnect_timeout, codec) self._publisher = PublisherThread(self._consumer) self._consumer.on_message += self._on_response self._consumer.on_return += self._on_return self.on_connect = self._consumer.on_connect self.on_error = self._consumer.on_error self.on_disconnect = self._consumer.on_disconnect
[docs] def start(self): """ Start connection & publisher threads. """ self._consumer.start() self._publisher.start()
[docs] def stop(self): """ Stops the client and waits for its termination. """ self._publisher.shutdown_worker() self._consumer.shutdown_worker() self._publisher.join() self._consumer.join()
def _on_response(self, message): """ Called when a message is consumed. """ future_result = self.future_results.get(['correlation_id'], None) if not future_result: # pragma: no cover # TODO: Should not happen! log.error('FIXME: This should not happen.') return try: exception, result = self._consumer.get_codec().decode(message.body) if exception: exception = RemoteException(exception) except Exception as e: # pragma: no cover exception, result = LocalException(str(e)), None if exception: future_result.reject(exception) else: future_result.resolve(result) def _on_return(self, exception, message): future_result = self.future_results.get(['correlation_id'], None) if not future_result: # pragma: no cover # TODO: Should not happen! log.error('FIXME: This should not happen.') return future_result.reject(RemoteException(exception))
[docs] def invoke_async(self, service, method, *args, **kwargs): """ Serialize & publish method call request. """ queued_request = QueuedInvocation(self._consumer.get_codec(), service, method, args, kwargs) future_result = FutureResult('{}.{}'.format(service, method)) self.future_results[queued_request.correlation_id] = future_result self._publisher.enqueue(queued_request) return future_result
[docs] def notify(self, event, data): """ Serialize & publish notification. """ queued_request = QueuedNotification(self._consumer.get_codec(), event, data) self._publisher.enqueue(queued_request)
[docs] def invoke(self, service, method, *args, **kwargs): """ Call a remote method and wait for a result. Blocks until a result is ready. """ future_result = self.invoke_async(service, method, *args, **kwargs) future_result.wait(self._invoke_timeout) if future_result.exception: raise future_result.exception else: return future_result.value
[docs] def set_invoke_timeout(self, timeout): """ Sets timeout for waiting for results on this client. """ self._invoke_timeout = timeout
[docs] def set_logging_level(self, level): """ Set logging level. """ log.set_level(level)
[docs] def set_codec(self, codec): """ Set codec. """ self._consumer._codec = codec
[docs] def force_json(self): """ Force to use JSON codec only. """ self._consumer._codec = codecs.JSONCodec()
def __getattr__(self, attr): """ Convenience method. Returns :class:`.ServiceProxy` to make it look like we're actually calling local methods from local objects. """ return ServiceProxy(self, attr)
[docs]class ServiceProxy(object): """ Convenience wrapper for service. It allows you to perform attribute chaining (e. g. :code:`client.example.add(2, 3)`) """ def __init__(self, client, service_name): self.client = client self.service_name = service_name def __getattr__(self, attr): """ Returns :class:`.MethodProxy` """ return MethodProxy(self.client, self.service_name, attr)
[docs]class MethodProxy(object): """ Convenience wrapper for method. It allows you to perform attribute chaining (e. g. :code:`client.example.add(2, 3)`) """ def __init__(self, client, service_name, method_name): self.client = client self.service_name = service_name self.method_name = method_name def __call__(self, *args, **kwargs): """ Finalizes the chain & performs actual RPC invocation. Blocks while waiting for result. Returns the result. This is same as calling :func:`~isc.client.Client.invoke` """ return self.client.invoke(self.service_name, self.method_name, *args, **kwargs)
[docs] def call_async(self, *args, **kwargs): """ Finalizes the chain & performs actual RPC invocation. Does not block. Returns :class:`.FutureResult`. This is same as calling :func:`~isc.client.Client.invoke_async` """ return self.client.invoke_async(self.service_name, self.method_name, *args, **kwargs)