RabbitMQ

What is RabbitMQ?

RabbitMQ is a message broker that uses message queuing to exchange information between services. It is based on AMQP (Advanced Message Queuing Protocol).

What is RabbitMQ used for?

Basic Information Exchange – Exchanges information between two or more computers or applications.
High Scalability – Lets multiple services run concurrently.
Load Balancing – Even when a large number of requests come from a single service, RabbitMQ helps distribute them evenly across multiple consumers.
Asynchronous Processing – Work can be placed on a queue and handled by a background worker instead of being processed in real time while the caller waits.
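
The load-balancing point can be illustrated without a broker. By default RabbitMQ hands the messages on one queue to its consumers in round-robin order; the sketch below only imitates that behaviour in plain Python. The function name and the worker names are hypothetical.

```python
from collections import defaultdict
from itertools import cycle


def distribute_round_robin(messages, consumers):
    """Assign each message to the next consumer in turn (round-robin)."""
    assignments = defaultdict(list)
    # cycle() repeats the consumer list; zip() stops when messages run out.
    for message, consumer in zip(messages, cycle(consumers)):
        assignments[consumer].append(message)
    return dict(assignments)


jobs = [f"job-{n}" for n in range(1, 7)]
result = distribute_round_robin(jobs, ["worker-a", "worker-b", "worker-c"])
print(result)
```

Each of the three workers ends up with two of the six jobs, which is the even spread the list above describes.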

How does RabbitMQ work?

  • Producer – The entity that sends data to RabbitMQ.
  • Exchange – The entity that receives data sent by the Producer and decides where to send it.
  • Queue – The place where data is stored.
  • Consumer – The process that receives data from RabbitMQ.

Producer → Exchange → Queue → Consumer
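
The flow above can be modelled in a few lines of plain Python. This is a toy, in-memory sketch of a direct exchange (exact routing-key match), not RabbitMQ itself; the class and function names are made up for illustration.

```python
from collections import defaultdict, deque


class DirectExchange:
    """Toy model of a direct exchange: routes on an exact routing-key match."""

    def __init__(self):
        self.bindings = defaultdict(list)  # routing key -> bound queues

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        # Copy the message into every queue bound with this exact key.
        # Returns how many queues received it (0 means nothing was bound).
        queues = self.bindings.get(routing_key, [])
        for queue in queues:
            queue.append(body)
        return len(queues)


def consume(queue):
    """Consumer side: take messages from the queue in FIFO order."""
    while queue:
        yield queue.popleft()


exchange = DirectExchange()
orders = deque()                      # the queue
exchange.bind(orders, routing_key="orders")

exchange.publish("orders", "order #1 created")   # the producer
exchange.publish("orders", "order #2 created")
unrouted = exchange.publish("invoices", "no queue bound for this key")

received = list(consume(orders))
print(received, unrouted)
```

The producer never touches the queue directly: it only names a routing key, and the exchange decides which queues get a copy.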

Where RabbitMQ is used

  • Server-server communication
  • Microservices architecture
  • Real-time data processing (Chat apps, Live tracking, Notifications)
  • Task queue management (Background jobs, delayed processing)
  • Distributed systems coordination

RabbitMQ and other message brokers

  • RabbitMQ
  • Kafka
  • ActiveMQ
  • Redis Pub/Sub

RabbitMQ Benefits

  • Very easy to install and use
  • Can exchange large amounts of data
  • Suitable for Microservices development
  • Helps prevent message loss by keeping messages in queues until they are consumed

Where can RabbitMQ be used?

Message Queuing

  • RabbitMQ queues messages between applications or services so that they are processed in an orderly manner.
  • For example, in a web application, tasks such as user registration or order processing can be queued and then executed.
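
As a rough sketch of queuing such a task, the function below publishes one JSON message per order through a channel object. It uses the pika call names `queue_declare` and `basic_publish`; the queue name `order_processing`, the payload shape, and the `RecordingChannel` stand-in (which lets the example run without a broker) are assumptions for illustration.

```python
import json


def publish_task(channel, queue_name, payload, properties=None):
    """Declare a durable queue and publish one JSON task to it."""
    # durable=True asks the broker to keep the queue across restarts.
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_publish(
        exchange="",             # default exchange: routing key is the queue name
        routing_key=queue_name,
        body=json.dumps(payload),
        properties=properties,
    )


class RecordingChannel:
    """Stand-in for a pika channel so the sketch runs without a broker."""

    def __init__(self):
        self.declared, self.published = [], []

    def queue_declare(self, queue, durable=False):
        self.declared.append((queue, durable))

    def basic_publish(self, exchange, routing_key, body, properties=None):
        self.published.append((exchange, routing_key, body))


channel = RecordingChannel()
for order_id in (1, 2, 3):
    publish_task(channel, "order_processing", {"order_id": order_id})
print(channel.published)
```

Against a real broker you would pass `connection.channel()` from a `pika.BlockingConnection` instead of the stand-in, and `properties=pika.BasicProperties(delivery_mode=2)` to mark each message persistent.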

Microservices Communication

  • In a microservices architecture, RabbitMQ is used to exchange messages between different services.
  • This helps create loose coupling between services.

Real-time Messaging

  • RabbitMQ is used in applications that require real-time messaging.
  • For example, chat applications, notification systems, etc.

Data Streaming

  • In data streaming applications, RabbitMQ is used to manage data streams.
  • For example, log data, sensor data, etc. can be streamed.

Fault Tolerance and Recovery

  • RabbitMQ provides reliable delivery of messages and helps in recovery from error conditions.
  • This is important in critical applications.
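
Reliable delivery usually depends on consumer acknowledgements: a message is removed from the queue only after the consumer confirms it. The sketch below shows that pattern with the pika-style callback signature and the `basic_ack` / `basic_nack` calls. The handler, the payload shape, and the `RecordingChannel` stand-in are hypothetical and exist only so the example runs without a broker.

```python
import json
from types import SimpleNamespace


def make_callback(handler):
    """Wrap handler so each delivery is acked on success and rejected on failure."""

    def on_message(channel, method, properties, body):
        try:
            handler(json.loads(body))
        except Exception:
            # requeue=False discards the message (or dead-letters it if the
            # queue is configured for that); requeue=True would redeliver it.
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            channel.basic_ack(delivery_tag=method.delivery_tag)

    return on_message


class RecordingChannel:
    """Stand-in for a pika channel so the sketch runs without a broker."""

    def __init__(self):
        self.acked, self.nacked = [], []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def basic_nack(self, delivery_tag, requeue):
        self.nacked.append((delivery_tag, requeue))


def handle(task):
    if "order_id" not in task:
        raise ValueError("missing order_id")


channel = RecordingChannel()
callback = make_callback(handle)
callback(channel, SimpleNamespace(delivery_tag=1), None, b'{"order_id": 42}')
callback(channel, SimpleNamespace(delivery_tag=2), None, b'{"oops": true}')
print(channel.acked, channel.nacked)
```

With pika, a callback like this would be registered through `channel.basic_consume(queue=..., on_message_callback=callback, auto_ack=False)` followed by `channel.start_consuming()`.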

Cloud Computing

  • In cloud environments, RabbitMQ is used to exchange messages between various services and applications.

Integration

  • RabbitMQ is used to integrate various software systems and applications.
  • For example, data can be exchanged between systems such as ERP and CRM.

Which programming languages can connect to RabbitMQ?

  • Java
  • Python
  • C# (.NET)
  • JavaScript (Node.js)
  • Ruby
  • PHP
  • Go
  • C++
  • Erlang
  • Swift

How to connect from Python

views.py

from rest_framework.views import APIView
from rest_framework.response import Response
from google.oauth2 import service_account
import pika
import json
import requests
from google.auth.transport.requests import Request
from .models import (
    SimpleNotificationModel, SimpleFailedNotificationModel,
    BatchNotificationModel, BatchFailedNotificationModel,
    BroadcastNotificationModel, BroadcastFailedNotificationModel,
)

SCOPES = ['https://www.googleapis.com/auth/firebase.messaging']
FCM_URL = "https://fcm.googleapis.com/v1/projects/fir-fcm-learing-2025/messages:send"
PRIORITY_QUEUES = {'high': 'high_priority_notifications', 'normal': 'normal_priority_notifications'}


class BaseNotification(APIView):
    def publish_to_queue(self, data, queue_name):
        """Publishes data to RabbitMQ queue."""
        with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
            channel = connection.channel()
            # Declaring is idempotent: the queue is created only if it does not exist yet
            channel.queue_declare(queue=queue_name)
            # The default exchange ('') routes a message to the queue named by routing_key
            channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(data))

    def get_firebase_access_token(self):
        """Fetches a fresh Firebase access token."""
        credentials = service_account.Credentials.from_service_account_file('serviceAccountKey.json', scopes=SCOPES)
        credentials.refresh(Request())
        return credentials.token

    def send_fcm_message(self, token, data, headers):
        """Sends a Firebase Cloud Messaging (FCM) notification."""
        message = {
            "message": {
                "token": token,
                "notification": {"title": data['title'], "body": data['message']},
                "data": data.get('data', {})
            }
        }
        try:
            response = requests.post(FCM_URL, headers=headers, json=message)
            response.raise_for_status()
            return True, {"token": token, "success": True, "response": response.json()}
        except requests.exceptions.RequestException as e:
            # A 4xx/5xx Response object is falsy, so compare against None explicitly
            if e.response is not None:
                try:
                    error_response = e.response.json()
                except ValueError:  # error body was not JSON
                    error_response = e.response.text
            else:
                error_response = str(e)
            return False, {"token": token, "success": False, "error": error_response}

    def retry_notification(self, token, data, access_token, max_retries=3):
        """Retries sending failed notifications up to max_retries times."""
        headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}
        for attempt in range(max_retries):
            success, response = self.send_fcm_message(token, data, headers)
            response['retry_attempt'] = attempt + 1
            if success:
                return response
        return response

    def save_notification_to_db(self, data):
        """Saves a notification record to the database."""
        SimpleNotificationModel.objects.create(
            tokens=data['tokens'],
            priority=data.get('priority', 'normal'),  # priority is optional in the request
            title=data['title'],
            message=data['message'],
            data=data.get('data', {})
        )

    def save_failed_token_to_db(self, token, data):
        """Stores failed notification attempts."""
        SimpleFailedNotificationModel.objects.create(
            tokens=[token],
            priority=data.get('priority', 'normal'),
            title=data['title'],
            message=data['message'],
            data=data.get('data', {})
        )

    def retry_failed_notifications(self, failed_notifications, headers, max_retries=3):
        """Retries sending failed notifications up to max_retries times."""
        retry_responses = []
        for retry in range(max_retries):
            if not failed_notifications:
                break  # Stop retrying if there are no failed notifications left
            temp_failed_notifications = []
            for notification in failed_notifications:
                remaining_failed_tokens = []  # Keep track of tokens that still fail
                for token in notification['tokens']:
                    success, response = self.send_fcm_message(token, notification, headers)
                    response['retry_attempt'] = retry + 1
                    retry_responses.append(response)
                    if not success:
                        remaining_failed_tokens.append(token)  # Only retain failed tokens
                if remaining_failed_tokens:
                    temp_failed_notifications.append({
                        'tokens': remaining_failed_tokens,
                        'title': notification.get('title', ''),
                        'message': notification.get('message', ''),
                        'data': notification.get('data', {})
                    })
            failed_notifications = temp_failed_notifications
        return retry_responses


class SimpleNotification(BaseNotification):
    def post(self, request):
        data = request.data
        token = data['token']
        data['tokens'] = [token]
        priority = data.get('priority', 'normal').lower()
        queue_name = PRIORITY_QUEUES.get(priority, 'normal_priority_notifications')
        self.save_notification_to_db(data)
        self.publish_to_queue(data, queue_name)
        access_token = self.get_firebase_access_token()
        headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}
        success, response = self.send_fcm_message(token, data, headers)
        if not success:
            retry_response = self.retry_notification(token, data, access_token)
            if not retry_response['success']:
                self.save_failed_token_to_db(token, data)
            response = retry_response
        return Response({"message": "Notification processing completed", "result": response})


class BatchNotification(BaseNotification):
    def post(self, request):
        data = request.data
        priority = data.get('priority', 'normal').lower()
        queue_name = PRIORITY_QUEUES.get(priority, 'normal_priority_notifications')
        self.save_batch_notifications_to_db(data)
        self.publish_to_queue(data, queue_name)
        access_token = self.get_firebase_access_token()
        headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}
        responses, failed_notifications = self.process_batch_notifications(data, headers)
        if failed_notifications:
            retry_responses = self.retry_failed_notifications(failed_notifications, headers)
            responses.extend(retry_responses)
            # Note: this records the first-pass failures, even if a retry later succeeded
            self.save_failed_batch_to_db(failed_notifications, priority)
        return Response({"message": "Batch notification processing completed", "results": responses})

    def save_batch_notifications_to_db(self, data):
        """Saves batch notifications efficiently using bulk_create."""
        notifications = [
            BatchNotificationModel(
                tokens=notification['tokens'],
                priority=data.get('priority', 'normal'),  # priority is optional in the request
                title=notification['title'],
                message=notification['message'],
                data=notification.get('data', {})
            ) for notification in data['notifications']
        ]
        BatchNotificationModel.objects.bulk_create(notifications)

    def process_batch_notifications(self, data, headers):
        """Sends every notification in the batch and collects the per-token results."""
        responses, failed_notifications = [], []
        for notification in data['notifications']:
            failed_tokens = []
            for token in notification['tokens']:
                success, response = self.send_fcm_message(token, notification, headers)
                responses.append(response)  # previously dropped, so "results" was always empty
                if not success:
                    failed_tokens.append(token)
            if failed_tokens:
                failed_notifications.append({**notification, "tokens": failed_tokens})
        return responses, failed_notifications

    def retry_failed_notifications(self, failed_notifications, headers, max_retries=3):
        """Delegates to the base implementation; a hook for batch-specific retry logic."""
        return super().retry_failed_notifications(failed_notifications, headers, max_retries)

    def save_failed_batch_to_db(self, failed_notifications, priority):
        """Stores failed batch notifications efficiently."""
        BatchFailedNotificationModel.objects.bulk_create([
            BatchFailedNotificationModel(
                tokens=notification['tokens'],
                priority=priority,
                title=notification['title'],
                message=notification['message'],
                data=notification.get('data', {})
            ) for notification in failed_notifications
        ])


class BroadcastNotification(BaseNotification):
    def post(self, request):
        data = request.data
        priority = data.get('priority', 'normal').lower()
        queue_name = PRIORITY_QUEUES.get(priority, 'normal_priority_notifications')
        self.save_broadcast_to_db(data)
        self.publish_to_queue(data, queue_name)
        access_token = self.get_firebase_access_token()
        headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}
        responses, failed_tokens = self.process_broadcast(data, headers)
        if failed_tokens:
            retry_responses = self.retry_failed_notifications([{
                "tokens": failed_tokens,
                "title": data['title'],
                "message": data['message'],
                "data": data.get('data', {})
            }], headers)
            responses.extend(retry_responses)
            self.save_failed_broadcast_to_db(failed_tokens, data)
        return Response({"message": "Broadcast notification completed", "results": responses})

    def process_broadcast(self, data, headers):
        """Sends the same notification to every token and collects failures."""
        responses, failed_tokens = [], []
        for token in data['tokens']:
            success, response = self.send_fcm_message(token, data, headers)
            responses.append(response)
            if not success:
                failed_tokens.append(token)
        return responses, failed_tokens

    def save_broadcast_to_db(self, data):
        """Saves the broadcast using only the fields the model defines."""
        # Passing **data directly would fail on any extra key in the request body
        BroadcastNotificationModel.objects.create(
            tokens=data['tokens'],
            priority=data.get('priority', 'normal'),
            title=data['title'],
            message=data['message'],
            data=data.get('data', {})
        )

    def save_failed_broadcast_to_db(self, failed_tokens, data):
        """Stores failed broadcast notifications."""
        notification_data = {
            'tokens': failed_tokens,
            'priority': data.get('priority', 'normal'),
            'title': data.get('title', ''),
            'message': data.get('message', ''),
            'data': data.get('data', {})
        }
        BroadcastFailedNotificationModel.objects.create(**notification_data)

models.py

from django.db import models


class BaseNotificationModel(models.Model):
    tokens = models.JSONField(default=list)
    priority = models.CharField(max_length=200)
    title = models.CharField(max_length=200)
    message = models.TextField()
    data = models.JSONField(default=dict)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        abstract = True


class SimpleNotificationModel(BaseNotificationModel):
    pass


class SimpleFailedNotificationModel(BaseNotificationModel):
    pass


class BatchNotificationModel(BaseNotificationModel):
    pass


class BatchFailedNotificationModel(BaseNotificationModel):
    pass


class BroadcastNotificationModel(BaseNotificationModel):
    pass


class BroadcastFailedNotificationModel(BaseNotificationModel):
    pass
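
The views publish to the two queues named in PRIORITY_QUEUES, but no consumer is shown for them. One possible way to read them, sketched under assumptions, is to poll the high-priority queue first and fall back to the normal one. The example uses pika's `basic_get` and `basic_ack` call names; `process_next`, the `InMemoryChannel` stand-in, and the sample payloads are hypothetical and only make the sketch runnable without a broker.

```python
import json
from types import SimpleNamespace

# Same queue names as PRIORITY_QUEUES in views.py, highest priority first.
QUEUES_BY_PRIORITY = ["high_priority_notifications", "normal_priority_notifications"]


def process_next(channel, handle):
    """Handle one message, preferring the high-priority queue.

    Returns the queue the message came from, or None if both are empty.
    """
    for queue_name in QUEUES_BY_PRIORITY:
        method, _properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:  # queue is empty, try the next one
            continue
        handle(json.loads(body))
        # Acknowledge only after the handler returned without raising.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return queue_name
    return None


class InMemoryChannel:
    """Stand-in for a pika channel so the sketch runs without a broker."""

    def __init__(self, queues):
        self.queues = queues
        self.acked = []
        self._tag = 0

    def basic_get(self, queue, auto_ack=False):
        if not self.queues.get(queue):
            return None, None, None
        self._tag += 1
        return SimpleNamespace(delivery_tag=self._tag), None, self.queues[queue].pop(0)

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


channel = InMemoryChannel({
    "high_priority_notifications": [b'{"title": "urgent"}'],
    "normal_priority_notifications": [b'{"title": "digest"}'],
})
seen = []
order = [process_next(channel, seen.append) for _ in range(3)]
print(order, [message["title"] for message in seen])
```

In a real worker the `handle` argument would be where the FCM send happens, so that the HTTP view only has to publish and return.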

Do you have to pay for RabbitMQ?

RabbitMQ is open-source software written in Erlang. It is dual-licensed under the Mozilla Public License 2.0 and the Apache License 2.0, which means it is free to download and use. You therefore don't have to pay to use RabbitMQ.

RabbitMQ Features

  • Free: The open-source edition of RabbitMQ is available at no cost.
  • Open Source: Its source code is publicly available, and you can modify it or adapt it to your needs under the terms of its license.
  • Commercial Support: If you need commercial support or additional features, commercial licensing and support services are available. This is a paid option.

Situations where you may need to pay

  • If you want to purchase a commercial license.
  • If you need additional features that are offered only in a commercial edition.
  • If you need dedicated support (enterprise support).

 
