Module apps.email_scheduler.models
Expand source code
from django.db import models
from django.contrib.postgres.fields import ArrayField
from .tasks import send_email
from .constants import *
from django.utils import timezone
from django.db.models import F, Q
# Create your models here.
class EmailScheduler(models.Model):
"""
This class will define the EmailScheduler model
"""
email_to = models.EmailField()
email_cc = ArrayField(
models.EmailField(),
blank=True,
default=list,
help_text="list of comma separated email address in Cc",
)
email_bcc = ArrayField(
models.EmailField(),
blank=True,
default=list,
help_text="list of comma separated email address in Bcc",
)
email_subject = models.CharField(max_length=256)
email_body = models.TextField()
email_schedule = models.DateTimeField(
blank=True, null=True, help_text="Schedule time to send email")
email_repeat_after = models.DurationField(
blank=True,
null=True,
help_text=
"Duration after which to resend email, You can remove it in future to stop the campaign and mark "
"the task COMPLETE",
)
email_service = models.CharField(max_length=32,
default=DEFAULT_EMAIL_SERVICE,
choices=EMAIL_SERVICE_CHOICES)
email_last_sent_at = models.DateTimeField(blank=True, null=True)
task_status = models.CharField(
default=TASK_STATUS_PENDING,
choices=TASK_STATUS_CHOICES,
max_length=128,
help_text="Status of this email sending task",
)
email_send_count = models.IntegerField(
default=0, help_text="Count of emails sent successfully")
task_failed_count = models.IntegerField(
default=0, help_text="Count of number of times this task failed")
task_failure_info = ArrayField(
models.JSONField(),
null=True,
blank=True,
help_text="If task failed, information regarding why it failed",
)
@classmethod
def pending_periodic_email_finder(cls):
"""
This method finds all the EmailScheduler objects which need to be sent in case of periodic task
"""
pending_emails = (cls.objects.select_for_update().exclude(
email_repeat_after__exact=None,
email_last_sent_at__exact=None).filter(
task_status__exact=TASK_STATUS_PENDING,
email_last_sent_at__lte=timezone.now() -
F("email_repeat_after"),
))
return pending_emails
def pre_update_processing(self):
"""
The purpose of this method is, to do preprocessing before updating an object.
Currently, when we are updating an EmailScheduler object, and we are changing
the email_repeat_after to None then we should automatically mark the task complete
if it's not already in Failed state. Here we first fetch the object from db which contains the old state
before updating and self contains the state after updating, hence db call is necessary.
"""
old_obj = EmailScheduler.objects.get(pk=self.pk)
if (old_obj.email_repeat_after is not None
and self.email_repeat_after is None
and old_obj.task_status != TASK_STATUS_FAILED):
self.task_status = TASK_STATUS_COMPLETE
def save(self, *args, **kwargs):
"""
We have overrided the save method to call respective send email method i.e send immidiately or
after specified duration. We also call pre_update_processing to update the obejct properly in case of
update.
Arguments:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
is_adding_new = self._state.adding
if not is_adding_new:
self.pre_update_processing()
super(EmailScheduler, self).save(*args, **kwargs)
if is_adding_new:
if self.email_schedule is None:
send_email.delay(self.pk)
elif self.email_schedule is not None:
td = self.email_schedule
send_email.apply_async(
kwargs={"email_scheduler_obj_id": self.pk}, eta=td)
def update_fields(self, updated_fields: dict):
"""
This method takes the dict of the updated keys in the model. It updates the
corresponding fields in the model instance and saves it in db.
Arguments:
updated_fields: dict of updated fields in the model
"""
update_fields = []
for key, value in updated_fields.items():
setattr(self, key, value)
update_fields.append(key)
self.save(update_fields=update_fields)
class EmailSchedulerLogs(models.Model):
"""
This class will define the EmailSchedulerLogs model
"""
email_scheduler = models.ForeignKey(EmailScheduler,
on_delete=models.CASCADE)
email_recipient_id = models.EmailField()
email_message_id = models.TextField(null=True, blank=True)
email_send_status = models.CharField(max_length=128, blank=True, null=True)
email_recipient_type = models.CharField(
max_length=8, choices=EMAIL_RECIPIENT_TYPE_CHOICES)
retry_count = models.IntegerField(
default=0, help_text="Number of times this email is being retried")
email_event_info = models.JSONField(null=True, blank=True)
@classmethod
def create_logs(cls, logs_to_be_created: list):
"""
This method takes the list of logs to be created and creates them in bulk.
Arguments:
logs_to_be_created: list of logs to be created
"""
for log in logs_to_be_created:
cls(**log).save()
@classmethod
def get_logs_to_be_updated(cls):
"""
This method returns the list of logs which are in pending state and are to be updated
"""
return (cls.objects.select_for_update().exclude(
email_message_id__exact=None).filter(
Q(email_send_status=None) | Q(email_send_status="sent")))
def update_fields(self, updated_fields: dict):
"""
This method takes the dict of the updated keys in the model. It updates the
corresponding fields in the model instance and saves it in db.
Arguments:
updated_fields: dict of updated fields in the model
"""
update_fields = []
for key, value in updated_fields.items():
setattr(self, key, value)
update_fields.append(key)
self.save(update_fields=update_fields)
Classes
class EmailScheduler (*args, **kwargs)-
This class will define the EmailScheduler model
Expand source code
class EmailScheduler(models.Model): """ This class will define the EmailScheduler model """ email_to = models.EmailField() email_cc = ArrayField( models.EmailField(), blank=True, default=list, help_text="list of comma separated email address in Cc", ) email_bcc = ArrayField( models.EmailField(), blank=True, default=list, help_text="list of comma separated email address in Bcc", ) email_subject = models.CharField(max_length=256) email_body = models.TextField() email_schedule = models.DateTimeField( blank=True, null=True, help_text="Schedule time to send email") email_repeat_after = models.DurationField( blank=True, null=True, help_text= "Duration after which to resend email, You can remove it in future to stop the campaign and mark " "the task COMPLETE", ) email_service = models.CharField(max_length=32, default=DEFAULT_EMAIL_SERVICE, choices=EMAIL_SERVICE_CHOICES) email_last_sent_at = models.DateTimeField(blank=True, null=True) task_status = models.CharField( default=TASK_STATUS_PENDING, choices=TASK_STATUS_CHOICES, max_length=128, help_text="Status of this email sending task", ) email_send_count = models.IntegerField( default=0, help_text="Count of emails sent successfully") task_failed_count = models.IntegerField( default=0, help_text="Count of number of times this task failed") task_failure_info = ArrayField( models.JSONField(), null=True, blank=True, help_text="If task failed, information regarding why it failed", ) @classmethod def pending_periodic_email_finder(cls): """ This method finds all the EmailScheduler objects which need to be sent in case of periodic task """ pending_emails = (cls.objects.select_for_update().exclude( email_repeat_after__exact=None, email_last_sent_at__exact=None).filter( task_status__exact=TASK_STATUS_PENDING, email_last_sent_at__lte=timezone.now() - F("email_repeat_after"), )) return pending_emails def pre_update_processing(self): """ The purpose of this method is, to do preprocessing before updating an object. Currently, when we are updating an EmailScheduler object, and we are changing the email_repeat_after to None then we should automatically mark the task complete if it's not already in Failed state. Here we first fetch the object from db which contains the old state before updating and self contains the state after updating, hence db call is necessary. """ old_obj = EmailScheduler.objects.get(pk=self.pk) if (old_obj.email_repeat_after is not None and self.email_repeat_after is None and old_obj.task_status != TASK_STATUS_FAILED): self.task_status = TASK_STATUS_COMPLETE def save(self, *args, **kwargs): """ We have overrided the save method to call respective send email method i.e send immidiately or after specified duration. We also call pre_update_processing to update the obejct properly in case of update. Arguments: *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. """ is_adding_new = self._state.adding if not is_adding_new: self.pre_update_processing() super(EmailScheduler, self).save(*args, **kwargs) if is_adding_new: if self.email_schedule is None: send_email.delay(self.pk) elif self.email_schedule is not None: td = self.email_schedule send_email.apply_async( kwargs={"email_scheduler_obj_id": self.pk}, eta=td) def update_fields(self, updated_fields: dict): """ This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db. Arguments: updated_fields: dict of updated fields in the model """ update_fields = [] for key, value in updated_fields.items(): setattr(self, key, value) update_fields.append(key) self.save(update_fields=update_fields)Ancestors
- django.db.models.base.Model
Class variables
var DoesNotExist-
The requested object does not exist
var MultipleObjectsReturned-
The query returned multiple objects when only one was expected.
var objects
Static methods
def pending_periodic_email_finder()-
This method finds all the EmailScheduler objects which need to be sent in case of periodic task
Expand source code
@classmethod def pending_periodic_email_finder(cls): """ This method finds all the EmailScheduler objects which need to be sent in case of periodic task """ pending_emails = (cls.objects.select_for_update().exclude( email_repeat_after__exact=None, email_last_sent_at__exact=None).filter( task_status__exact=TASK_STATUS_PENDING, email_last_sent_at__lte=timezone.now() - F("email_repeat_after"), )) return pending_emails
Instance variables
var email_bcc-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_body-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_cc-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_last_sent_at-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_repeat_after-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_schedule-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_send_count-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_service-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_subject-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_to-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var emailschedulerlogs_set-
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model): parent = ForeignKey(Parent, related_name='children')Parent.childrenis aReverseManyToOneDescriptorinstance.Most of the implementation is delegated to a dynamically defined manager class built by
create_forward_many_to_many_manager()defined below.Expand source code
def __get__(self, instance, cls=None): """ Get the related objects through the reverse relation. With the example above, when getting ``parent.children``: - ``self`` is the descriptor managing the ``children`` attribute - ``instance`` is the ``parent`` instance - ``cls`` is the ``Parent`` class (unused) """ if instance is None: return self return self.related_manager_cls(instance) var id-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var task_failed_count-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var task_failure_info-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var task_status-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name]
Methods
def get_email_service_display(self, *, field=<django.db.models.fields.CharField: email_service>)-
Expand source code
def _method(cls_or_self, /, *args, **keywords): keywords = {**self.keywords, **keywords} return self.func(cls_or_self, *self.args, *args, **keywords) def get_task_status_display(self, *, field=<django.db.models.fields.CharField: task_status>)-
Expand source code
def _method(cls_or_self, /, *args, **keywords): keywords = {**self.keywords, **keywords} return self.func(cls_or_self, *self.args, *args, **keywords) def pre_update_processing(self)-
The purpose of this method is, to do preprocessing before updating an object. Currently, when we are updating an EmailScheduler object, and we are changing the email_repeat_after to None then we should automatically mark the task complete if it's not already in Failed state. Here we first fetch the object from db which contains the old state before updating and self contains the state after updating, hence db call is necessary.
Expand source code
def pre_update_processing(self): """ The purpose of this method is, to do preprocessing before updating an object. Currently, when we are updating an EmailScheduler object, and we are changing the email_repeat_after to None then we should automatically mark the task complete if it's not already in Failed state. Here we first fetch the object from db which contains the old state before updating and self contains the state after updating, hence db call is necessary. """ old_obj = EmailScheduler.objects.get(pk=self.pk) if (old_obj.email_repeat_after is not None and self.email_repeat_after is None and old_obj.task_status != TASK_STATUS_FAILED): self.task_status = TASK_STATUS_COMPLETE def save(self, *args, **kwargs)-
We have overrided the save method to call respective send email method i.e send immidiately or after specified duration. We also call pre_update_processing to update the obejct properly in case of update.
Arguments
args: Variable length argument list. *kwargs: Arbitrary keyword arguments.
Expand source code
def save(self, *args, **kwargs): """ We have overrided the save method to call respective send email method i.e send immidiately or after specified duration. We also call pre_update_processing to update the obejct properly in case of update. Arguments: *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. """ is_adding_new = self._state.adding if not is_adding_new: self.pre_update_processing() super(EmailScheduler, self).save(*args, **kwargs) if is_adding_new: if self.email_schedule is None: send_email.delay(self.pk) elif self.email_schedule is not None: td = self.email_schedule send_email.apply_async( kwargs={"email_scheduler_obj_id": self.pk}, eta=td) def update_fields(self, updated_fields: dict)-
This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db.
Arguments
updated_fields: dict of updated fields in the model
Expand source code
def update_fields(self, updated_fields: dict): """ This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db. Arguments: updated_fields: dict of updated fields in the model """ update_fields = [] for key, value in updated_fields.items(): setattr(self, key, value) update_fields.append(key) self.save(update_fields=update_fields)
class EmailSchedulerLogs (*args, **kwargs)-
This class will define the EmailSchedulerLogs model
Expand source code
class EmailSchedulerLogs(models.Model): """ This class will define the EmailSchedulerLogs model """ email_scheduler = models.ForeignKey(EmailScheduler, on_delete=models.CASCADE) email_recipient_id = models.EmailField() email_message_id = models.TextField(null=True, blank=True) email_send_status = models.CharField(max_length=128, blank=True, null=True) email_recipient_type = models.CharField( max_length=8, choices=EMAIL_RECIPIENT_TYPE_CHOICES) retry_count = models.IntegerField( default=0, help_text="Number of times this email is being retried") email_event_info = models.JSONField(null=True, blank=True) @classmethod def create_logs(cls, logs_to_be_created: list): """ This method takes the list of logs to be created and creates them in bulk. Arguments: logs_to_be_created: list of logs to be created """ for log in logs_to_be_created: cls(**log).save() @classmethod def get_logs_to_be_updated(cls): """ This method returns the list of logs which are in pending state and are to be updated """ return (cls.objects.select_for_update().exclude( email_message_id__exact=None).filter( Q(email_send_status=None) | Q(email_send_status="sent"))) def update_fields(self, updated_fields: dict): """ This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db. Arguments: updated_fields: dict of updated fields in the model """ update_fields = [] for key, value in updated_fields.items(): setattr(self, key, value) update_fields.append(key) self.save(update_fields=update_fields)Ancestors
- django.db.models.base.Model
Class variables
var DoesNotExist-
The requested object does not exist
var MultipleObjectsReturned-
The query returned multiple objects when only one was expected.
var objects
Static methods
def create_logs(logs_to_be_created: list)-
This method takes the list of logs to be created and creates them in bulk.
Arguments
logs_to_be_created: list of logs to be created
Expand source code
@classmethod def create_logs(cls, logs_to_be_created: list): """ This method takes the list of logs to be created and creates them in bulk. Arguments: logs_to_be_created: list of logs to be created """ for log in logs_to_be_created: cls(**log).save() def get_logs_to_be_updated()-
This method returns the list of logs which are in pending state and are to be updated
Expand source code
@classmethod def get_logs_to_be_updated(cls): """ This method returns the list of logs which are in pending state and are to be updated """ return (cls.objects.select_for_update().exclude( email_message_id__exact=None).filter( Q(email_send_status=None) | Q(email_send_status="sent")))
Instance variables
var email_event_info-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_message_id-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_recipient_id-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_recipient_type-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_scheduler-
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model): parent = ForeignKey(Parent, related_name='children')Child.parentis aForwardManyToOneDescriptorinstance.Expand source code
def __get__(self, instance, cls=None): """ Get the related instance through the forward relation. With the example above, when getting ``child.parent``: - ``self`` is the descriptor managing the ``parent`` attribute - ``instance`` is the ``child`` instance - ``cls`` is the ``Child`` class (we don't need it) """ if instance is None: return self # The related instance is loaded from the database and then cached # by the field on the model instance state. It can also be pre-cached # by the reverse accessor (ReverseOneToOneDescriptor). try: rel_obj = self.field.get_cached_value(instance) except KeyError: has_value = None not in self.field.get_local_related_value(instance) ancestor_link = instance._meta.get_ancestor_link(self.field.model) if has_value else None if ancestor_link and ancestor_link.is_cached(instance): # An ancestor link will exist if this field is defined on a # multi-table inheritance parent of the instance's class. ancestor = ancestor_link.get_cached_value(instance) # The value might be cached on an ancestor if the instance # originated from walking down the inheritance chain. rel_obj = self.field.get_cached_value(ancestor, default=None) else: rel_obj = None if rel_obj is None and has_value: rel_obj = self.get_object(instance) remote_field = self.field.remote_field # If this is a one-to-one relation, set the reverse accessor # cache on the related object to the current instance to avoid # an extra SQL query if it's accessed later on. if not remote_field.multiple: remote_field.set_cached_value(rel_obj, instance) self.field.set_cached_value(instance, rel_obj) if rel_obj is None and not self.field.null: raise self.RelatedObjectDoesNotExist( "%s has no %s." % (self.field.model.__name__, self.field.name) ) else: return rel_obj var email_scheduler_id-
Retrieve and caches the value from the datastore on the first lookup. Return the cached value.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var email_send_status-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var id-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name] var retry_count-
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Expand source code
def __get__(self, instance, cls=None): """ Retrieve and caches the value from the datastore on the first lookup. Return the cached value. """ if instance is None: return self data = instance.__dict__ field_name = self.field.attname if field_name not in data: # Let's see if the field is part of the parent chain. If so we # might be able to reuse the already loaded value. Refs #18343. val = self._check_parent_chain(instance) if val is None: instance.refresh_from_db(fields=[field_name]) val = getattr(instance, field_name) data[field_name] = val return data[field_name]
Methods
def get_email_recipient_type_display(self, *, field=<django.db.models.fields.CharField: email_recipient_type>)-
Expand source code
def _method(cls_or_self, /, *args, **keywords): keywords = {**self.keywords, **keywords} return self.func(cls_or_self, *self.args, *args, **keywords) def update_fields(self, updated_fields: dict)-
This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db.
Arguments
updated_fields: dict of updated fields in the model
Expand source code
def update_fields(self, updated_fields: dict): """ This method takes the dict of the updated keys in the model. It updates the corresponding fields in the model instance and saves it in db. Arguments: updated_fields: dict of updated fields in the model """ update_fields = [] for key, value in updated_fields.items(): setattr(self, key, value) update_fields.append(key) self.save(update_fields=update_fields)