Module apps.email_scheduler.models

Expand source code
from django.db import models
from django.contrib.postgres.fields import ArrayField
from .tasks import send_email
from .constants import *
from django.utils import timezone
from django.db.models import F, Q

# Create your models here.


class EmailScheduler(models.Model):
    """
    This class defines the EmailScheduler model: one email (sent once or
    repeatedly) together with the state of the task that sends it.

    Creating a row queues the ``send_email`` task (see ``save``). Removing
    ``email_repeat_after`` from an existing row marks the task complete
    (see ``pre_update_processing``).
    """
    # Recipients.
    email_to = models.EmailField()
    email_cc = ArrayField(
        models.EmailField(),
        blank=True,
        default=list,
        help_text="list of comma separated email address in Cc",
    )
    email_bcc = ArrayField(
        models.EmailField(),
        blank=True,
        default=list,
        help_text="list of comma separated email address in Bcc",
    )

    # Content.
    email_subject = models.CharField(max_length=256)
    email_body = models.TextField()

    # Scheduling: an empty email_schedule means "send right away" and an
    # empty email_repeat_after means "do not repeat".
    email_schedule = models.DateTimeField(
        blank=True, null=True, help_text="Schedule time to send email")
    email_repeat_after = models.DurationField(
        blank=True,
        null=True,
        help_text=(
            "Duration after which to resend email, You can remove it in future to stop the campaign and mark "
            "the task COMPLETE"),
    )

    email_service = models.CharField(max_length=32,
                                     default=DEFAULT_EMAIL_SERVICE,
                                     choices=EMAIL_SERVICE_CHOICES)

    # Bookkeeping of the sending task.
    email_last_sent_at = models.DateTimeField(blank=True, null=True)
    task_status = models.CharField(
        default=TASK_STATUS_PENDING,
        choices=TASK_STATUS_CHOICES,
        max_length=128,
        help_text="Status of this email sending task",
    )
    email_send_count = models.IntegerField(
        default=0, help_text="Count of emails sent successfully")
    task_failed_count = models.IntegerField(
        default=0, help_text="Count of number of times this task failed")
    task_failure_info = ArrayField(
        models.JSONField(),
        null=True,
        blank=True,
        help_text="If task failed, information regarding why it failed",
    )

    @classmethod
    def pending_periodic_email_finder(cls):
        """
        This method finds all the periodic EmailScheduler objects which are due
        to be sent again.

        A row is due when it has a repeat interval, has been sent at least
        once, is still pending, and ``email_last_sent_at + email_repeat_after``
        is not in the future.
        Returns:
            A lazy ``select_for_update()`` queryset. Django requires such a
            queryset to be evaluated inside a transaction.
        """
        # The NULL checks are spelled out as filters. The previous
        # ``exclude(a=None, b=None)`` only excluded rows where *both* columns
        # were NULL and relied on SQL NULL comparison for the rest.
        return cls.objects.select_for_update().filter(
            email_repeat_after__isnull=False,
            email_last_sent_at__isnull=False,
            task_status=TASK_STATUS_PENDING,
            email_last_sent_at__lte=timezone.now() - F("email_repeat_after"),
        )

    def pre_update_processing(self):
        """
        The purpose of this method is to do preprocessing before updating an
        object. When an update removes ``email_repeat_after`` (sets it to None)
        the task is automatically marked complete, unless the stored task is
        in the Failed state.

        ``self`` holds the state after the change, so the previous values have
        to be read from the database.
        Raises:
            EmailScheduler.DoesNotExist: if the lookup is performed and no row
                with this primary key exists.
        """
        # The interval is still set, so this update cannot be a "stop
        # repeating" change and no database lookup is needed.
        if self.email_repeat_after is not None:
            return
        # Only the two columns that take part in the decision are fetched.
        old_repeat_after, old_status = EmailScheduler.objects.values_list(
            "email_repeat_after", "task_status").get(pk=self.pk)
        if old_repeat_after is not None and old_status != TASK_STATUS_FAILED:
            self.task_status = TASK_STATUS_COMPLETE

    def save(self, *args, **kwargs):
        """
        We have overridden the save method to queue the ``send_email`` task for
        newly created objects, either immediately or at ``email_schedule``.
        For existing objects ``pre_update_processing`` is called first so the
        object is updated properly.
        Arguments:
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
        """
        is_adding_new = self._state.adding
        if not is_adding_new:
            status_before = self.task_status
            self.pre_update_processing()
            update_fields = kwargs.get("update_fields")
            if update_fields is not None and self.task_status != status_before:
                # A partial save that clears email_repeat_after must also
                # write the status set by pre_update_processing, otherwise the
                # row stays PENDING in the database.
                update_fields = list(update_fields)
                if ("email_repeat_after" in update_fields
                        and "task_status" not in update_fields):
                    update_fields.append("task_status")
                kwargs["update_fields"] = update_fields
        super().save(*args, **kwargs)
        if not is_adding_new:
            return
        # NOTE(review): the task is queued before any enclosing transaction
        # commits. If objects are created inside transaction.atomic(),
        # consider transaction.on_commit() so the worker can see the row.
        if self.email_schedule is None:
            send_email.delay(self.pk)
        else:
            send_email.apply_async(
                kwargs={"email_scheduler_obj_id": self.pk},
                eta=self.email_schedule)

    def update_fields(self, updated_fields: dict):
        """
        This method takes a dict mapping field names to new values, sets them
        on the instance and saves only those fields to the db.
        Arguments:
            updated_fields: dict of updated fields in the model
        """
        for field_name, new_value in updated_fields.items():
            setattr(self, field_name, new_value)
        self.save(update_fields=list(updated_fields))


class EmailSchedulerLogs(models.Model):
    """
    This class defines the EmailSchedulerLogs model: a log row, linked to an
    EmailScheduler object, holding a recipient address and send status.
    """
    # Logs are deleted together with the EmailScheduler row they belong to.
    email_scheduler = models.ForeignKey(EmailScheduler,
                                        on_delete=models.CASCADE)
    email_recipient_id = models.EmailField()
    email_message_id = models.TextField(null=True, blank=True)
    email_send_status = models.CharField(max_length=128, blank=True, null=True)
    email_recipient_type = models.CharField(
        max_length=8, choices=EMAIL_RECIPIENT_TYPE_CHOICES)
    retry_count = models.IntegerField(
        default=0, help_text="Number of times this email is being retried")
    email_event_info = models.JSONField(null=True, blank=True)

    @classmethod
    def create_logs(cls, logs_to_be_created: list):
        """
        This method takes the list of logs to be created and creates them in
        bulk with a single ``bulk_create`` call.

        ``bulk_create`` does not call ``save()`` and does not send the
        ``pre_save`` / ``post_save`` signals.
        Arguments:
            logs_to_be_created: list of dicts, each holding the keyword
                arguments for one log row
        Returns:
            The list of created log objects.
        """
        new_logs = [cls(**log) for log in logs_to_be_created]
        return cls.objects.bulk_create(new_logs)

    @classmethod
    def get_logs_to_be_updated(cls):
        """
        This method returns the logs which have a message id and whose send
        status is either empty or "sent".
        Returns:
            A lazy ``select_for_update()`` queryset. Django requires such a
            queryset to be evaluated inside a transaction.
        """
        return cls.objects.select_for_update().filter(
            Q(email_send_status__isnull=True) | Q(email_send_status="sent"),
            email_message_id__isnull=False,
        )

    def update_fields(self, updated_fields: dict):
        """
        This method takes a dict mapping field names to new values, sets them
        on the instance and saves only those fields to the db.
        Arguments:
            updated_fields: dict of updated fields in the model
        """
        for field_name, new_value in updated_fields.items():
            setattr(self, field_name, new_value)
        self.save(update_fields=list(updated_fields))

Classes

class EmailScheduler (*args, **kwargs)

This class will define the EmailScheduler model

Expand source code
class EmailScheduler(models.Model):
    """
    This class defines the EmailScheduler model: one email (sent once or
    repeatedly) together with the state of the task that sends it.

    Creating a row queues the ``send_email`` task (see ``save``). Removing
    ``email_repeat_after`` from an existing row marks the task complete
    (see ``pre_update_processing``).
    """
    # Recipients.
    email_to = models.EmailField()
    email_cc = ArrayField(
        models.EmailField(),
        blank=True,
        default=list,
        help_text="list of comma separated email address in Cc",
    )
    email_bcc = ArrayField(
        models.EmailField(),
        blank=True,
        default=list,
        help_text="list of comma separated email address in Bcc",
    )

    # Content.
    email_subject = models.CharField(max_length=256)
    email_body = models.TextField()

    # Scheduling: an empty email_schedule means "send right away" and an
    # empty email_repeat_after means "do not repeat".
    email_schedule = models.DateTimeField(
        blank=True, null=True, help_text="Schedule time to send email")
    email_repeat_after = models.DurationField(
        blank=True,
        null=True,
        help_text=(
            "Duration after which to resend email, You can remove it in future to stop the campaign and mark "
            "the task COMPLETE"),
    )

    email_service = models.CharField(max_length=32,
                                     default=DEFAULT_EMAIL_SERVICE,
                                     choices=EMAIL_SERVICE_CHOICES)

    # Bookkeeping of the sending task.
    email_last_sent_at = models.DateTimeField(blank=True, null=True)
    task_status = models.CharField(
        default=TASK_STATUS_PENDING,
        choices=TASK_STATUS_CHOICES,
        max_length=128,
        help_text="Status of this email sending task",
    )
    email_send_count = models.IntegerField(
        default=0, help_text="Count of emails sent successfully")
    task_failed_count = models.IntegerField(
        default=0, help_text="Count of number of times this task failed")
    task_failure_info = ArrayField(
        models.JSONField(),
        null=True,
        blank=True,
        help_text="If task failed, information regarding why it failed",
    )

    @classmethod
    def pending_periodic_email_finder(cls):
        """
        This method finds all the periodic EmailScheduler objects which are due
        to be sent again.

        A row is due when it has a repeat interval, has been sent at least
        once, is still pending, and ``email_last_sent_at + email_repeat_after``
        is not in the future.
        Returns:
            A lazy ``select_for_update()`` queryset. Django requires such a
            queryset to be evaluated inside a transaction.
        """
        # The NULL checks are spelled out as filters. The previous
        # ``exclude(a=None, b=None)`` only excluded rows where *both* columns
        # were NULL and relied on SQL NULL comparison for the rest.
        return cls.objects.select_for_update().filter(
            email_repeat_after__isnull=False,
            email_last_sent_at__isnull=False,
            task_status=TASK_STATUS_PENDING,
            email_last_sent_at__lte=timezone.now() - F("email_repeat_after"),
        )

    def pre_update_processing(self):
        """
        The purpose of this method is to do preprocessing before updating an
        object. When an update removes ``email_repeat_after`` (sets it to None)
        the task is automatically marked complete, unless the stored task is
        in the Failed state.

        ``self`` holds the state after the change, so the previous values have
        to be read from the database.
        Raises:
            EmailScheduler.DoesNotExist: if the lookup is performed and no row
                with this primary key exists.
        """
        # The interval is still set, so this update cannot be a "stop
        # repeating" change and no database lookup is needed.
        if self.email_repeat_after is not None:
            return
        # Only the two columns that take part in the decision are fetched.
        old_repeat_after, old_status = EmailScheduler.objects.values_list(
            "email_repeat_after", "task_status").get(pk=self.pk)
        if old_repeat_after is not None and old_status != TASK_STATUS_FAILED:
            self.task_status = TASK_STATUS_COMPLETE

    def save(self, *args, **kwargs):
        """
        We have overridden the save method to queue the ``send_email`` task for
        newly created objects, either immediately or at ``email_schedule``.
        For existing objects ``pre_update_processing`` is called first so the
        object is updated properly.
        Arguments:
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
        """
        is_adding_new = self._state.adding
        if not is_adding_new:
            status_before = self.task_status
            self.pre_update_processing()
            update_fields = kwargs.get("update_fields")
            if update_fields is not None and self.task_status != status_before:
                # A partial save that clears email_repeat_after must also
                # write the status set by pre_update_processing, otherwise the
                # row stays PENDING in the database.
                update_fields = list(update_fields)
                if ("email_repeat_after" in update_fields
                        and "task_status" not in update_fields):
                    update_fields.append("task_status")
                kwargs["update_fields"] = update_fields
        super().save(*args, **kwargs)
        if not is_adding_new:
            return
        # NOTE(review): the task is queued before any enclosing transaction
        # commits. If objects are created inside transaction.atomic(),
        # consider transaction.on_commit() so the worker can see the row.
        if self.email_schedule is None:
            send_email.delay(self.pk)
        else:
            send_email.apply_async(
                kwargs={"email_scheduler_obj_id": self.pk},
                eta=self.email_schedule)

    def update_fields(self, updated_fields: dict):
        """
        This method takes a dict mapping field names to new values, sets them
        on the instance and saves only those fields to the db.
        Arguments:
            updated_fields: dict of updated fields in the model
        """
        for field_name, new_value in updated_fields.items():
            setattr(self, field_name, new_value)
        self.save(update_fields=list(updated_fields))

Ancestors

  • django.db.models.base.Model

Class variables

var DoesNotExist

The requested object does not exist

var MultipleObjectsReturned

The query returned multiple objects when only one was expected.

var objects

Static methods

def pending_periodic_email_finder()

This method finds all the EmailScheduler objects which need to be sent in case of periodic task

Expand source code
@classmethod
def pending_periodic_email_finder(cls):
    """
    This method finds all the periodic EmailScheduler objects which are due
    to be sent again.

    A row is due when it has a repeat interval, has been sent at least
    once, is still pending, and ``email_last_sent_at + email_repeat_after``
    is not in the future.
    Returns:
        A lazy ``select_for_update()`` queryset. Django requires such a
        queryset to be evaluated inside a transaction.
    """
    # The NULL checks are spelled out as filters. The previous
    # ``exclude(a=None, b=None)`` only excluded rows where *both* columns
    # were NULL and relied on SQL NULL comparison for the rest.
    return cls.objects.select_for_update().filter(
        email_repeat_after__isnull=False,
        email_last_sent_at__isnull=False,
        task_status=TASK_STATUS_PENDING,
        email_last_sent_at__lte=timezone.now() - F("email_repeat_after"),
    )

Instance variables

var email_bcc

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_body

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_cc

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_last_sent_at

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_repeat_after

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_schedule

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_send_count

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_service

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_subject

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var email_to

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var emailschedulerlogs_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

Expand source code
def __get__(self, instance, cls=None):
    """
    Get the related objects through the reverse relation.

    With the example above, when getting ``parent.children``:

    - ``self`` is the descriptor managing the ``children`` attribute
    - ``instance`` is the ``parent`` instance
    - ``cls`` is the ``Parent`` class (unused)
    """
    # Accessed without an instance: hand back the descriptor itself.
    if instance is None:
        return self

    # Build the related manager for this particular instance.
    return self.related_manager_cls(instance)
var id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var task_failed_count

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var task_failure_info

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]
var task_status

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Retrieve and cache the value from the datastore on the first lookup.
    Return the cached value.

    When ``instance`` is None the descriptor itself is returned.
    """
    if instance is None:
        return self
    data = instance.__dict__
    field_name = self.field.attname
    if field_name not in data:
        # Let's see if the field is part of the parent chain. If so we
        # might be able to reuse the already loaded value. Refs #18343.
        val = self._check_parent_chain(instance)
        if val is None:
            # Nothing reusable was found: reload only this field and read
            # the value that the reload stored on the instance.
            instance.refresh_from_db(fields=[field_name])
            val = getattr(instance, field_name)
        # Cache the value so later lookups skip the branch above.
        data[field_name] = val
    return data[field_name]

Methods

def get_email_service_display(self, *, field=<django.db.models.fields.CharField: email_service>)
Expand source code
def _method(cls_or_self, /, *args, **keywords):
    # Merge the stored keywords with the call-time ones; call-time values
    # win because they are unpacked last.
    keywords = {**self.keywords, **keywords}
    # Call the wrapped callable with the receiving object first, then the
    # stored positional arguments, then the caller's.
    return self.func(cls_or_self, *self.args, *args, **keywords)
def get_task_status_display(self, *, field=<django.db.models.fields.CharField: task_status>)
Expand source code
def _method(cls_or_self, /, *args, **keywords):
    # Merge the stored keywords with the call-time ones; call-time values
    # win because they are unpacked last.
    keywords = {**self.keywords, **keywords}
    # Call the wrapped callable with the receiving object first, then the
    # stored positional arguments, then the caller's.
    return self.func(cls_or_self, *self.args, *args, **keywords)
def pre_update_processing(self)

Pre-processing hook that runs before an existing object is updated. When an update changes email_repeat_after to None, the task is automatically marked complete unless it is already in the Failed state. The previous state is fetched from the database because self already holds the updated values, which is why the extra db call is necessary.

Expand source code
def pre_update_processing(self):
    """
    The purpose of this method is to do preprocessing before updating an
    object. When an update removes ``email_repeat_after`` (sets it to None)
    the task is automatically marked complete, unless the stored task is
    in the Failed state.

    ``self`` holds the state after the change, so the previous values have
    to be read from the database.
    Raises:
        EmailScheduler.DoesNotExist: if the lookup is performed and no row
            with this primary key exists.
    """
    # The interval is still set, so this update cannot be a "stop
    # repeating" change and no database lookup is needed.
    if self.email_repeat_after is not None:
        return
    # Only the two columns that take part in the decision are fetched.
    old_repeat_after, old_status = EmailScheduler.objects.values_list(
        "email_repeat_after", "task_status").get(pk=self.pk)
    if old_repeat_after is not None and old_status != TASK_STATUS_FAILED:
        self.task_status = TASK_STATUS_COMPLETE
def save(self, *args, **kwargs)

We have overridden the save method to call the respective send email method, i.e. send immediately or after the specified duration. We also call pre_update_processing to update the object properly in case of an update.

Arguments

*args: Variable length argument list. **kwargs: Arbitrary keyword arguments.

Expand source code
def save(self, *args, **kwargs):
    """
    We have overridden the save method to queue the ``send_email`` task for
    newly created objects, either immediately or at ``email_schedule``.
    For existing objects ``pre_update_processing`` is called first so the
    object is updated properly.
    Arguments:
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.
    """
    is_adding_new = self._state.adding
    if not is_adding_new:
        status_before = self.task_status
        self.pre_update_processing()
        update_fields = kwargs.get("update_fields")
        if update_fields is not None and self.task_status != status_before:
            # A partial save that clears email_repeat_after must also
            # write the status set by pre_update_processing, otherwise the
            # row stays PENDING in the database.
            update_fields = list(update_fields)
            if ("email_repeat_after" in update_fields
                    and "task_status" not in update_fields):
                update_fields.append("task_status")
            kwargs["update_fields"] = update_fields
    super().save(*args, **kwargs)
    if not is_adding_new:
        return
    # NOTE(review): the task is queued before any enclosing transaction
    # commits. If objects are created inside transaction.atomic(),
    # consider transaction.on_commit() so the worker can see the row.
    if self.email_schedule is None:
        send_email.delay(self.pk)
    else:
        send_email.apply_async(
            kwargs={"email_scheduler_obj_id": self.pk},
            eta=self.email_schedule)
def update_fields(self, updated_fields: dict)

This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db.

Arguments

updated_fields: dict of updated fields in the model

Expand source code
def update_fields(self, updated_fields: dict):
    """
    This method takes a dict mapping field names to new values, sets them
    on the instance and saves only those fields to the db.
    Arguments:
        updated_fields: dict of updated fields in the model
    """
    for field_name, new_value in updated_fields.items():
        setattr(self, field_name, new_value)
    # The dict keys, in insertion order, are the fields to write.
    self.save(update_fields=list(updated_fields))
class EmailSchedulerLogs (*args, **kwargs)

This class will define the EmailSchedulerLogs model

Expand source code
class EmailSchedulerLogs(models.Model):
    """
    This class defines the EmailSchedulerLogs model: a log row, linked to an
    EmailScheduler object, holding a recipient address and send status.
    """
    # Logs are deleted together with the EmailScheduler row they belong to.
    email_scheduler = models.ForeignKey(EmailScheduler,
                                        on_delete=models.CASCADE)
    email_recipient_id = models.EmailField()
    email_message_id = models.TextField(null=True, blank=True)
    email_send_status = models.CharField(max_length=128, blank=True, null=True)
    email_recipient_type = models.CharField(
        max_length=8, choices=EMAIL_RECIPIENT_TYPE_CHOICES)
    retry_count = models.IntegerField(
        default=0, help_text="Number of times this email is being retried")
    email_event_info = models.JSONField(null=True, blank=True)

    @classmethod
    def create_logs(cls, logs_to_be_created: list):
        """
        This method takes the list of logs to be created and creates them in
        bulk with a single ``bulk_create`` call.

        ``bulk_create`` does not call ``save()`` and does not send the
        ``pre_save`` / ``post_save`` signals.
        Arguments:
            logs_to_be_created: list of dicts, each holding the keyword
                arguments for one log row
        Returns:
            The list of created log objects.
        """
        new_logs = [cls(**log) for log in logs_to_be_created]
        return cls.objects.bulk_create(new_logs)

    @classmethod
    def get_logs_to_be_updated(cls):
        """
        This method returns the logs which have a message id and whose send
        status is either empty or "sent".
        Returns:
            A lazy ``select_for_update()`` queryset. Django requires such a
            queryset to be evaluated inside a transaction.
        """
        return cls.objects.select_for_update().filter(
            Q(email_send_status__isnull=True) | Q(email_send_status="sent"),
            email_message_id__isnull=False,
        )

    def update_fields(self, updated_fields: dict):
        """
        This method takes a dict mapping field names to new values, sets them
        on the instance and saves only those fields to the db.
        Arguments:
            updated_fields: dict of updated fields in the model
        """
        for field_name, new_value in updated_fields.items():
            setattr(self, field_name, new_value)
        self.save(update_fields=list(updated_fields))

Ancestors

  • django.db.models.base.Model

Class variables

var DoesNotExist

The requested object does not exist

var MultipleObjectsReturned

The query returned multiple objects when only one was expected.

var objects

Static methods

def create_logs(logs_to_be_created: list)

This method takes the list of logs to be created and creates them in bulk.

Arguments

logs_to_be_created: list of logs to be created

Expand source code
@classmethod
def create_logs(cls, logs_to_be_created: list):
    """
    This method takes the list of logs to be created and creates them in
    bulk with a single ``bulk_create`` call.

    ``bulk_create`` does not call ``save()`` and does not send the
    ``pre_save`` / ``post_save`` signals.
    Arguments:
        logs_to_be_created: list of dicts, each holding the keyword
            arguments for one log row
    Returns:
        The list of created log objects.
    """
    new_logs = [cls(**log) for log in logs_to_be_created]
    return cls.objects.bulk_create(new_logs)
def get_logs_to_be_updated()

This method returns the list of logs which are in pending state and are to be updated

Expand source code
@classmethod
def get_logs_to_be_updated(cls):
    """
    This method returns the logs which have a message id and whose send
    status is either empty or "sent", as a ``select_for_update()`` queryset.
    """
    null_or_sent = (Q(email_send_status__isnull=True)
                    | Q(email_send_status="sent"))
    locked_logs = cls.objects.select_for_update()
    return locked_logs.filter(null_or_sent, email_message_id__isnull=False)

Instance variables

var email_event_info

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]
var email_message_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]
var email_recipient_id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]
var email_recipient_type

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]
var email_scheduler

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return the related object reached through the forward relation.

    With the example above, when getting ``child.parent``:

    - ``self`` is the descriptor managing the ``parent`` attribute
    - ``instance`` is the ``child`` instance
    - ``cls`` is the ``Child`` class (unused here)

    Accessed without an instance, the descriptor itself is returned. Raises
    ``self.RelatedObjectDoesNotExist`` when no related object is found and the
    field is not nullable.
    """
    if instance is None:
        return self

    field = self.field
    # The related instance is loaded from the database and then cached
    # by the field on the model instance state. It can also be pre-cached
    # by the reverse accessor (ReverseOneToOneDescriptor).
    try:
        related = field.get_cached_value(instance)
    except KeyError:
        fk_is_set = None not in field.get_local_related_value(instance)
        if fk_is_set:
            parent_link = instance._meta.get_ancestor_link(field.model)
        else:
            parent_link = None
        related = None
        if parent_link and parent_link.is_cached(instance):
            # An ancestor link will exist if this field is defined on a
            # multi-table inheritance parent of the instance's class.
            parent = parent_link.get_cached_value(instance)
            # The value might be cached on an ancestor if the instance
            # originated from walking down the inheritance chain.
            related = field.get_cached_value(parent, default=None)
        if fk_is_set and related is None:
            related = self.get_object(instance)
            reverse_side = field.remote_field
            # If this is a one-to-one relation, set the reverse accessor
            # cache on the related object to the current instance to avoid
            # an extra SQL query if it's accessed later on.
            if not reverse_side.multiple:
                reverse_side.set_cached_value(related, instance)
        field.set_cached_value(instance, related)

    if related is None and not field.null:
        raise self.RelatedObjectDoesNotExist(
            "%s has no %s." % (field.model.__name__, field.name)
        )
    return related
var email_scheduler_id

Retrieves and caches the value from the datastore on the first lookup, then returns the cached value.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]
var email_send_status

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]
var id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]
var retry_count

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

Expand source code
def __get__(self, instance, cls=None):
    """
    Return this field's value for ``instance``, loading and caching it on the
    first access. When accessed without an instance, return the descriptor.
    """
    if instance is None:
        return self
    state = instance.__dict__
    attname = self.field.attname
    if attname in state:
        # Already loaded (or cached by an earlier access): nothing to fetch.
        return state[attname]
    # Let's see if the field is part of the parent chain. If so we
    # might be able to reuse the already loaded value. Refs #18343.
    value = self._check_parent_chain(instance)
    if value is None:
        instance.refresh_from_db(fields=[attname])
        value = getattr(instance, attname)
    state[attname] = value
    return state[attname]

Methods

def get_email_recipient_type_display(self, *, field=<django.db.models.fields.CharField: email_recipient_type>)
Expand source code
def _method(cls_or_self, /, *args, **keywords):
    # Call-time keywords take precedence over the ones stored on ``self``
    # (a name taken from the enclosing scope, not a parameter of this function).
    merged_keywords = dict(self.keywords)
    merged_keywords.update(keywords)
    # Stored positional arguments come before the ones supplied at call time.
    return self.func(cls_or_self, *self.args, *args, **merged_keywords)
def update_fields(self, updated_fields: dict)

This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db.

Arguments

updated_fields: dict of updated fields in the model

Expand source code
def update_fields(self, updated_fields: dict):
    """
    Apply the given values to this instance and persist only those fields.

    Every key of ``updated_fields`` is set as an attribute on the instance,
    then ``save`` is called with ``update_fields`` restricted to those keys.
    Arguments:
        updated_fields: dict mapping field names to their new values
    """
    for field_name, new_value in updated_fields.items():
        setattr(self, field_name, new_value)
    # The dict's keys, in iteration order, are exactly the fields that changed.
    self.save(update_fields=list(updated_fields))