rabbit_force
rabbit_force copied to clipboard
Salesforce Streaming API to RabbitMQ adapter service
rabbit_force
.. image:: https://readthedocs.org/projects/rabbit-force/badge/?version=latest :target: https://rabbit-force.readthedocs.io/en/latest/?badge=latest :alt: Documentation Status
.. image:: https://travis-ci.org/robertmrk/rabbit_force.svg?branch=develop :target: https://travis-ci.org/robertmrk/rabbit_force :alt: Build status
.. image:: https://coveralls.io/repos/github/robertmrk/rabbit_force/badge.svg :target: https://coveralls.io/github/robertmrk/rabbit_force :alt: Coverage
.. image:: https://img.shields.io/badge/License-MIT-yellow.svg :target: https://opensource.org/licenses/MIT :alt: MIT license
rabbit_force is a Salesforce Streaming API <api_>_ to RabbitMQ <rabbitmq_>_
adapter service. It listens for event messages from
Salesforce's Streaming API <api_>_ and forwards them to a
RabbitMQ broker <rabbitmq_>_ for you, so you don't have to.
Streaming API <api_>_ is useful when you want notifications to be pushed from
the server to the client based on criteria that you define with
PushTopics <PushTopic_>_ or to receive
generic streaming <GenericStreaming_>_ messages. While
RabbitMQ <rabbitmq_>_ is one of the most popular options for implementing
inter-service communication in a microservice architecture <microservice_>_.
While there are lots of great client implementations for RabbitMQ/AMQP for various languages, there are a lot less Streaming API clients, and many of them are badly maintained. Furthermore RabbitMQ offers much more flexible message consumption techniques.
rabbit_force aims to fix these problems, by providing and adapter between the Streaming API and RabbitMQ, so inter-connected services can consume Streaming API event messages with RabbitMQ. It even supports connection with multiple Salesforce orgs, multiple RabbitMQ brokers, and routing messages between them.
Features
- Forward
Streaming API <api_>_ messages from one or more Salesforce orgs to one or moreRabbitMQ brokers <rabbitmq_>_ - Route incoming messages to a specific broker and exchange with the
specified routing key and properties, with the help of routing rules defined
as
JSONPath <jsonpath_>_ expressions - Support for Salesforce's replay extension for
message reliability and durability <replay_>_ by storing replay markers in aRedis <redis_>_ database - Configurable error handling behavior, either fail instantly or try to recover from network and service outages
- Message sources, sinks and routing configurable with JSON or YAML configuration files
- Implemented using
python asyncio <asyncio_>_ for efficient handling of IO intensive operations
Usage
A relatively simple use case is illustrated on the image bellow. rabbit_force
is connected to a single message source and message sink, or in other words to
a single Salesforce org named as my_org and a single RabbitMQ broker named
my_broker. It listens for messages from two PushTopics (lead_changes
and contact_changes) and a StreamingChannel (my_channel), and forwards
messages into the exchange my_exchange with different routing keys. A redis
database is used to store replay markers sent by Salesforce to take advantage
of message durability <replay_>_.
.. image:: docs/source/_static/usage.svg
The configuration file bellow sets up rabbit_force to forward messages from
lead_changes, contact_changes and my_channel with the routing keys
of lead_change_message, contact_change_message and
my_channel_message respectively.
config.yaml:
.. code-block:: yaml
# message source definition
source:
# mapping of Salesforce orgs to use as message sources
orgs:
# a Salesforce org named as "my_org"
my_org:
# authentication credentials
consumer_key: "<consumer_key>"
consumer_secret: "<consumer_secret>"
username: "<username>"
password: "<password>"
# list of resources that the service can listen to for messages
# if they doesn't exist, they'll be created on application startup
resources:
# a PushTopic resource
- type: PushTopic
# the definition of the PushTopic
spec:
Name: lead_changes
ApiVersion: 42.0
NotifyForFields: Referenced
NotifyForOperationCreate: true
NotifyForOperationUpdate: true
NotifyForOperationDelete: true
NotifyForOperationUndelete: true
Query: SELECT Id, Email, Name, Phone, MobilePhone, Status, LeadSource FROM Lead
# optional durable flag, if false then the resource will be removed on application shutdown
durable: false
# a PushTopic resource
- type: PushTopic
# the definition of the PushTopic
spec:
Name: contact_changes
ApiVersion: 42.0
NotifyForFields: Referenced
NotifyForOperationCreate: true
NotifyForOperationUpdate: true
NotifyForOperationDelete: true
NotifyForOperationUndelete: true
Query: SELECT Id, Email, Name, Phone, MobilePhone FROM Contact
# optional durable flag, if false then the resource will be removed on application shutdown
durable: false
# a StreamingChannel resource
- type: StreamingChannel
# the definition of the StreamingChannel
spec:
Name: /u/my_channel
Description: Streaming channel for notifications
# optional replay storage definition. if defined it'll be used to store replay
# markers sent by Salesforce in order to support message durability
replay:
# redis server address
address: "redis://localhost:6389"
# key prefix
key_prefix: replay
# message sink definition
sink:
# mapping of RabbitMQ brokers to use as message sinks
brokers:
# a RabbitMQ broker named as "by_broker"
my_broker:
# host name of the broker
host: localhost
# definition of the exchange where the messages should be forwarded
exchanges:
- exchange_name: my_exchange
type_name: topic
durable: true
# message router definition
router:
# optional default route to use if no routing rule matches a given message
default_route:
broker_name: my_broker
exchange_name: my_exchange
routing_key: my_channel_message
# list of routing rules
rules:
# JSONPath filter expression as the condition
- condition: "$[?(@.message.channel ~ '.*/lead_changes')]"
# the route to use if the condition produces a non-empty match
route:
broker_name: my_broker
exchange_name: my_exchange
routing_key: lead_change_message
# JSONPath filter expression as the condition
- condition: "$[?(@.message.channel ~ '.*/contact_changes')]"
# the route to use if the condition produces a non-empty match
route:
broker_name: my_broker
exchange_name: my_exchange
routing_key: contact_change_message
A sample run of rabbit_force with the above configuration file.
.. code-block:: bash
$ python -m rabbit_force config.yaml
2018-06-19 16:23:07,909:INFO: Starting up ...
2018-06-19 16:23:07,996:INFO: Configuration loaded from 'config.yaml'
2018-06-19 16:23:07,999:INFO: Configuring application ...
2018-06-19 16:23:10,619:INFO: Using message broker AmqpBroker(host='localhost', port=None, login='guest', password='guest', virtualhost='/', ssl=False, login_method='AMQPLAIN', insist=False, verify_ssl=True)
2018-06-19 16:23:12,128:INFO: Listening for messages from Salesforce org 'my_org':
* from PushTopic 'lead_changes' on channel '/topic/lead_changes'
* from PushTopic 'contact_changes' on channel '/topic/contact_changes'
* from StreamingChannel '/u/my_channel' on channel '/u/my_channel'
With replay storage RedisReplayStorage(address='redis://localhost:6389', key_prefix='replay:my_org', additional_params={}, ignore_network_errors=False).
2018-06-19 16:23:48,119:INFO: Forwarded message 1 on channel '/topic/lead_changes' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='lead_change_message', properties=None).
2018-06-19 16:24:03,039:INFO: Forwarded message 1 on channel '/topic/contact_changes' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='contact_change_message', properties=None).
2018-06-19 16:24:20,180:INFO: Forwarded message 1 on channel '/u/my_channel' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='my_channel_message', properties=None).
2018-06-19 16:24:27,097:INFO: Shutting down ...
More advanced examples can be found in the examples section of the documentation <example_docs_>_.
Documentation
https://rabbit-force.readthedocs.io <https://rabbit-force.readthedocs.io>_
Requirements
Python 3.7
Docker
The public docker image <https://hub.docker.com/r/robertmrk/rabbit_force>_ can
be found on docker hub.
.. code-block:: bash
$ docker pull robertmrk/rabbit_force
License
rabbit_force is offered under the MIT license <LICENSE.txt>_.
.. _asyncio: https://docs.python.org/3/library/asyncio.html .. _api: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/intro_stream.htm .. _PushTopic: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/working_with_pushtopics.htm .. _GenericStreaming: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/generic_streaming_intro.htm#generic_streaming_intro .. _replay: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/using_streaming_api_durability.htm .. _rabbitmq: http://www.rabbitmq.com/ .. _microservice: http://microservices.io/patterns/communication-style/messaging.html .. _jsonpath: http://goessner.net/articles/JsonPath/ .. _redis: https://redis.io/ .. _example_docs: https://rabbit-force.readthedocs.io/en/latest/examples.html