Module apps.email_scheduler.tasks
Expand source code
from mail_sender.celery import app
from django.db.models import F, Q
from django.utils import timezone
from django.db import transaction, DatabaseError
from .email_sender.abstract_email_sender import AbstractEmailSender
from .email_sender.mailjet_email_sender import MailjetEmailWrapper
from django.conf import settings
from .services import EmailService
from .constants import *
from .exceptions import EmailSendingFailedWith429or500
@app.task
def periodic_email_sender():
    """
    This method is used to send emails periodically

    Every object yielded by EmailScheduler.pending_periodic_email_finder()
    is passed, by primary key, to send_email inside one atomic transaction.
    A DatabaseError raised while doing so is logged and then suppressed, so
    it never propagates out of this task.
    """
    import logging

    from .models import EmailScheduler

    queryset = EmailScheduler.pending_periodic_email_finder()
    # We are using atomic transaction here as the queryset returned has been
    # fetched using select_for_update which requires an atomic transaction in
    # order to evaluate that queryset.
    try:
        with transaction.atomic():
            for email_scheduler in queryset:
                # send_email is invoked as a plain call (no .delay()), so the
                # emails are sent while this transaction is still open.
                send_email(email_scheduler.pk)
    except DatabaseError:
        # Still best-effort: the error is not re-raised. It is logged instead
        # of being dropped silently so an aborted run leaves a trace.
        # NOTE(review): presumably this is mostly lock contention from
        # select_for_update -- confirm, and lower the log level if noisy.
        logging.getLogger(__name__).warning(
            "periodic_email_sender aborted by a database error",
            exc_info=True,
        )
@app.task
def periodic_email_log_updater():
    """
    Task that updates the logs of periodic emails.

    All of the work is delegated to EmailService.email_scheduler_log_updater;
    whatever that call returns is discarded.
    """
    update_logs = EmailService.email_scheduler_log_updater
    update_logs()
@app.task(bind=True, max_retries=3)
def send_email(self, email_scheduler_obj_id, retry_count=0):
    """
    This method is used to send email

    The actual sending is delegated to EmailService.send_email. When that
    call raises EmailSendingFailedWith429or500, the task is re-scheduled
    through self.retry with a 30 second countdown and retry_count + 1.

    Arguments:
        email_scheduler_obj_id {int} -- Email scheduler object id
        retry_count {int} -- Retry count
    """
    try:
        EmailService.send_email(
            email_scheduler_obj_id=email_scheduler_obj_id,
            max_retries=self.max_retries,
            retry_count=retry_count,
        )
    except EmailSendingFailedWith429or500:
        # Rebuild the complete call for the retry. When only kwargs is given,
        # Celery reuses the request's positional args but replaces its kwargs
        # wholesale: a task enqueued with email_scheduler_obj_id as a keyword
        # would lose it, and one enqueued with retry_count positionally would
        # receive it twice. Passing args and kwargs together avoids both.
        # self.retry raises, so nothing after this call would run.
        self.retry(
            args=(email_scheduler_obj_id,),
            kwargs={"retry_count": retry_count + 1},
            countdown=30,
        )
Functions
def periodic_email_log_updater()-
This method is used to update the logs of periodic emails
Expand source code
@app.task
def periodic_email_log_updater():
    """
    This method is used to update the logs of periodic emails
    """
    EmailService.email_scheduler_log_updater()

def periodic_email_sender()-
This method is used to send emails periodically
Expand source code
@app.task
def periodic_email_sender():
    """
    This method is used to send emails periodically
    """
    from .models import EmailScheduler
    queryset = EmailScheduler.pending_periodic_email_finder()
    """
    We are using atomic transaction here as the queryset returned has been fetched using select_for_update
    which requires an atomic transaction in order to evaluate that queryset.
    """
    try:
        with transaction.atomic():
            for email_scheduler in queryset:
                send_email(email_scheduler.pk)
    except DatabaseError:
        pass

def send_email(email_scheduler_obj_id, retry_count=0)-
This method is used to send email
Arguments
email_scheduler_obj_id {int} – Email scheduler object id
retry_count {int} – Retry count
Expand source code
@app.task(bind=True, max_retries=3)
def send_email(self, email_scheduler_obj_id, retry_count=0):
    """
    This method is used to send email
    Arguments:
    email_scheduler_obj_id {int} -- Email scheduler object id
    retry_count {int} -- Retry count
    """
    try:
        EmailService.send_email(
            email_scheduler_obj_id=email_scheduler_obj_id,
            max_retries=self.max_retries,
            retry_count=retry_count,
        )
    except EmailSendingFailedWith429or500:
        self.retry(kwargs={"retry_count": retry_count + 1}, countdown=30)