Basic Information Exchange – Helps exchange information between two or more computers, applications.
High Scalability – Helps multiple services run concurrently.
Load Balancing – Even if a large number of queries come from a single service, it helps distribute them evenly across multiple infrastructures.
Asynchronous Processing – The required information can be stored in a background processor rather than being retrieved in real-time.
views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from google.oauth2 import service_account
import pika
import json
import requests
from google.auth.transport.requests import Request
from .models import SimpleNotificationModel, SimpleFailedNotificationModel,BatchNotificationModel,BatchFailedNotificationModel,BroadcastNotificationModel,BroadcastFailedNotificationModel
SCOPES = [‘https://www.googleapis.com/auth/firebase.messaging’]
FCM_URL = “https://fcm.googleapis.com/v1/projects/fir-fcm-learing-2025/messages:send”
PRIORITY_QUEUES = {‘high’: ‘high_priority_notifications’, ‘normal’: ‘normal_priority_notifications’}
class BaseNotification(APIView):
def publish_to_queue(self, data, queue_name):
“””Publishes data to RabbitMQ queue.”””
with pika.BlockingConnection(pika.ConnectionParameters(‘localhost’)) as connection:
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_publish(exchange=”, routing_key=queue_name, body=json.dumps(data))
def get_firebase_access_token(self):
“””Fetches a fresh Firebase access token.”””
credentials = service_account.Credentials.from_service_account_file(‘serviceAccountKey.json’, scopes=SCOPES)
credentials.refresh(Request())
return credentials.token
def send_fcm_message(self, token, data, headers):
“””Sends a Firebase Cloud Messaging (FCM) notification.”””
message = {
“message”: {
“token”: token,
“notification”: {“title”: data[‘title’], “body”: data[‘message’]},
“data”: data.get(‘data’, {})
}
}
try:
response = requests.post(FCM_URL, headers=headers, json=message)
response.raise_for_status()
return True, {“token”: token, “success”: True, “response”: response.json()}
except requests.exceptions.RequestException as e:
error_response = e.response.json() if e.response else str(e)
return False, {“token”: token, “success”: False, “error”: error_response}
def retry_notification(self, token, data, access_token, max_retries=3):
“””Retries sending failed notifications up to max_retries times.”””
headers = {‘Authorization’: f’Bearer {access_token}’, ‘Content-Type’: ‘application/json’}
for attempt in range(max_retries):
success, response = self.send_fcm_message(token, data, headers)
response[‘retry_attempt’] = attempt + 1
if success:
return response
return response
def save_notification_to_db(self, data):
“””Saves a notification record to the database.”””
SimpleNotificationModel.objects.create(
tokens=data[‘tokens’],
priority=data[‘priority’],
title=data[‘title’],
message=data[‘message’],
data=data.get(‘data’, {})
)
def save_failed_token_to_db(self, token, data):
“””Stores failed notification attempts.”””
SimpleFailedNotificationModel.objects.create(
tokens=[token],
priority=data[‘priority’],
title=data[‘title’],
message=data[‘message’],
data=data.get(‘data’, {})
)
def retry_failed_notifications(self, failed_notifications, headers, max_retries=3):
“””Retries sending failed notifications up to max_retries times.”””
retry_responses = []
for retry in range(max_retries):
if not failed_notifications:
break # Stop retrying if there are no failed notifications left
temp_failed_notifications = []
for notification in failed_notifications:
remaining_failed_tokens = [] # Keep track of tokens that still fail
for token in notification[‘tokens’]:
success, response = self.send_fcm_message(token, notification, headers)
response[‘retry_attempt’] = retry + 1
retry_responses.append(response)
if not success:
remaining_failed_tokens.append(token) # Only retain failed tokens
if remaining_failed_tokens:
temp_failed_notifications.append({
‘tokens’: remaining_failed_tokens,
‘title’: notification.get(‘title’, ”),
‘message’: notification.get(‘message’, ”),
‘data’: notification.get(‘data’, {})
})
failed_notifications = temp_failed_notifications
return retry_responses
class SimpleNotification(BaseNotification):
def post(self, request):
data = request.data
token = data[‘token’]
data[‘tokens’] = [token]
priority = data.get(‘priority’, ‘normal’).lower()
queue_name = PRIORITY_QUEUES.get(priority, ‘normal_priority_notifications’)
self.save_notification_to_db(data)
self.publish_to_queue(data, queue_name)
access_token = self.get_firebase_access_token()
headers = {‘Authorization’: f’Bearer {access_token}’, ‘Content-Type’: ‘application/json’}
success, response = self.send_fcm_message(token, data, headers)
if not success:
retry_response = self.retry_notification(token, data, access_token)
if not retry_response[‘success’]:
self.save_failed_token_to_db(token, data)
response = retry_response
return Response({“message”: “Notification processing completed”, “result”: response})
class BatchNotification(BaseNotification):
def post(self, request):
data = request.data
priority = data.get(‘priority’, ‘normal’).lower()
queue_name = PRIORITY_QUEUES.get(priority, ‘normal_priority_notifications’)
self.save_batch_notifications_to_db(data)
self.publish_to_queue(data, queue_name)
access_token = self.get_firebase_access_token()
headers = {‘Authorization’: f’Bearer {access_token}’, ‘Content-Type’: ‘application/json’}
responses, failed_notifications = self.process_batch_notifications(data, headers)
if failed_notifications:
retry_responses = self.retry_failed_notifications(failed_notifications, headers)
responses.extend(retry_responses)
self.save_failed_batch_to_db(failed_notifications, priority)
return Response({“message”: “Batch notification processing completed”, “results”: responses})
def save_batch_notifications_to_db(self, data):
“””Saves batch notifications efficiently using bulk_create.”””
notifications = [
BatchNotificationModel(
tokens=notification[‘tokens’],
priority=data[‘priority’],
title=notification[‘title’],
message=notification[‘message’],
data=notification.get(‘data’, {})
) for notification in data[‘notifications’]
]
BatchNotificationModel.objects.bulk_create(notifications)
def process_batch_notifications(self, data, headers):
responses, failed_notifications = [], []
for notification in data[‘notifications’]:
failed_tokens = [
token for token in notification[‘tokens’]
if not self.send_fcm_message(token, notification, headers)[0]
]
if failed_tokens:
failed_notifications.append({**notification, “tokens”: failed_tokens})
return responses, failed_notifications
def retry_failed_notifications(self, failed_notifications, headers, max_retries=3):
“””Override with specific implementation for batch notifications”””
return super().retry_failed_notifications(failed_notifications, headers, max_retries)
def save_failed_batch_to_db(self, failed_notifications, priority):
“””Stores failed batch notifications efficiently.”””
BatchFailedNotificationModel.objects.bulk_create([
BatchFailedNotificationModel(
tokens=notification[‘tokens’],
priority=priority,
title=notification[‘title’],
message=notification[‘message’],
data=notification.get(‘data’, {})
) for notification in failed_notifications
])
class BroadcastNotification(BaseNotification):
def post(self, request):
data = request.data
priority = data.get(‘priority’, ‘normal’).lower()
queue_name = PRIORITY_QUEUES.get(priority, ‘normal_priority_notifications’)
self.save_broadcast_to_db(data)
self.publish_to_queue(data, queue_name)
access_token = self.get_firebase_access_token()
headers = {‘Authorization’: f’Bearer {access_token}’, ‘Content-Type’: ‘application/json’}
responses, failed_tokens = self.process_broadcast(data, headers)
if failed_tokens:
retry_responses = self.retry_failed_notifications([{
“tokens”: failed_tokens,
“title”: data[‘title’],
“message”: data[‘message’],
“data”: data.get(‘data’, {})
}], headers)
responses.extend(retry_responses)
self.save_failed_broadcast_to_db(failed_tokens, data)
return Response({“message”: “Broadcast notification completed”, “results”: responses})
def process_broadcast(self, data, headers):
responses, failed_tokens = [], []
for token in data[‘tokens’]:
success, response = self.send_fcm_message(token, data, headers)
responses.append(response)
if not success:
failed_tokens.append(token)
return responses, failed_tokens
def save_broadcast_to_db(self, data):
BroadcastNotificationModel.objects.create(**data)
def save_failed_broadcast_to_db(self, failed_tokens, data):
“””Stores failed broadcast notifications.”””
notification_data = {
‘tokens’: failed_tokens,
‘priority’: data.get(‘priority’, ‘normal’),
‘title’: data.get(‘title’, ”),
‘message’: data.get(‘message’, ”),
‘data’: data.get(‘data’, {})
}
BroadcastFailedNotificationModel.objects.create(**notification_data)
models.py