Source code for shoop.core.models._orders

# -*- coding: utf-8 -*-
# This file is part of Shoop.
#
# Copyright (c) 2012-2016, Shoop Ltd. All rights reserved.
#
# This source code is licensed under the AGPLv3 license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import unicode_literals, with_statement

import datetime
from collections import defaultdict
from decimal import Decimal

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import models
from django.utils.crypto import get_random_string
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from enumfields import Enum, EnumIntegerField
from jsonfield import JSONField
from parler.managers import TranslatableQuerySet
from parler.models import TranslatableModel, TranslatedFields

from shoop.core import taxing
from shoop.core.excs import (
    NoPaymentToCreateException, NoProductsToShipException
)
from shoop.core.fields import (
    CurrencyField, InternalIdentifierField, LanguageField, MoneyValueField,
    UnsavedForeignKey
)
from shoop.core.pricing import TaxfulPrice, TaxlessPrice
from shoop.utils.analog import define_log_model, LogEntryKind
from shoop.utils.money import Money
from shoop.utils.numbers import bankers_round
from shoop.utils.properties import (
    MoneyPropped, TaxfulPriceProperty, TaxlessPriceProperty
)

from ._order_lines import OrderLineType
from ._order_utils import get_order_identifier, get_reference_number
from ._products import Product
from ._suppliers import Supplier


class PaymentStatus(Enum):
    """
    Payment state of an order.

    Used as the enum of ``Order.payment_status`` (an ``EnumIntegerField``),
    so the integer values below are what gets stored.
    """
    NOT_PAID = 0
    PARTIALLY_PAID = 1
    FULLY_PAID = 2
    CANCELED = 3
    DEFERRED = 4

    class Labels:
        # Lazily translated display labels, one per member above.
        NOT_PAID = _('not paid')
        PARTIALLY_PAID = _('partially paid')
        FULLY_PAID = _('fully paid')
        CANCELED = _('canceled')
        DEFERRED = _('deferred')


class ShippingStatus(Enum):
    """
    Shipping state of an order.

    Used as the enum of ``Order.shipping_status`` (an ``EnumIntegerField``),
    so the integer values below are what gets stored.
    """
    NOT_SHIPPED = 0
    PARTIALLY_SHIPPED = 1
    FULLY_SHIPPED = 2

    class Labels:
        # Lazily translated display labels, one per member above.
        NOT_SHIPPED = _('not shipped')
        PARTIALLY_SHIPPED = _('partially shipped')
        FULLY_SHIPPED = _('fully shipped')


class OrderStatusRole(Enum):
    """
    Role of an ``OrderStatus``.

    Used as the enum of ``OrderStatus.role`` (an ``EnumIntegerField``);
    the order querysets and status checks in this module filter on it.
    """
    NONE = 0
    INITIAL = 1
    COMPLETE = 2
    CANCELED = 3

    class Labels:
        # Lazily translated display labels, one per member above.
        NONE = _('none')
        INITIAL = _('initial')
        COMPLETE = _('complete')
        CANCELED = _('canceled')


class OrderStatusQuerySet(TranslatableQuerySet):
    """Queryset for order statuses with lookups for the per-role default."""

    def _default_for_role(self, role):
        """
        Get the default order status for the given role.

        :param role: The role to look for.
        :type role: OrderStatusRole
        :return: The OrderStatus
        :rtype: OrderStatus
        :raises ObjectDoesNotExist:
          If no status is flagged as default for the role. The exception
          raised is the model's own ``DoesNotExist`` (a subclass of
          ``ObjectDoesNotExist``), so both kinds of handler catch it.
        """
        try:
            return self.get(default=True, role=role)
        except ObjectDoesNotExist:
            # Re-raise with a message naming the role; keep the
            # model-specific exception class rather than the bare base
            # class so `except OrderStatus.DoesNotExist` still works.
            raise self.model.DoesNotExist(
                "No default %s OrderStatus exists." % getattr(role, "label", role))

    def get_default_initial(self):
        """Return the default status having the INITIAL role."""
        return self._default_for_role(OrderStatusRole.INITIAL)

    def get_default_canceled(self):
        """Return the default status having the CANCELED role."""
        return self._default_for_role(OrderStatusRole.CANCELED)

    def get_default_complete(self):
        """Return the default status having the COMPLETE role."""
        return self._default_for_role(OrderStatusRole.COMPLETE)


@python_2_unicode_compatible
class OrderStatus(TranslatableModel):
    """A translatable order status with a role and a per-role default flag."""

    identifier = InternalIdentifierField(db_index=True, blank=False, unique=True)
    ordering = models.IntegerField(db_index=True, default=0, verbose_name=_('ordering'))
    role = EnumIntegerField(OrderStatusRole, db_index=True, default=OrderStatusRole.NONE, verbose_name=_('role'))
    default = models.BooleanField(default=False, db_index=True, verbose_name=_('default'))

    objects = OrderStatusQuerySet.as_manager()

    translations = TranslatedFields(
        name=models.CharField(verbose_name=_("name"), max_length=64)
    )

    def __str__(self):
        # Fall back to the identifier when no translated name is available.
        fallback = self.identifier
        return self.safe_translation_getter("name", default=fallback)

    def save(self, *args, **kwargs):
        """
        Save the status, then clear the default flag of its role siblings.

        The siblings are only touched when this status is flagged as
        default and has a role other than NONE.
        """
        super(OrderStatus, self).save(*args, **kwargs)
        if not self.default or self.role == OrderStatusRole.NONE:
            return
        # This status is the default: make the others for this role non-default.
        siblings = OrderStatus.objects.filter(role=self.role).exclude(pk=self.pk)
        siblings.update(default=False)
class OrderQuerySet(models.QuerySet):
    """Queryset for orders with filters on payment, status role and date."""

    def paid(self):
        """Orders whose payment status is FULLY_PAID."""
        return self.filter(payment_status=PaymentStatus.FULLY_PAID)

    def incomplete(self):
        """Orders whose status role is NONE or INITIAL."""
        return self.filter(status__role__in=(OrderStatusRole.NONE, OrderStatusRole.INITIAL))

    def complete(self):
        """Orders whose status role is COMPLETE."""
        return self.filter(status__role=OrderStatusRole.COMPLETE)

    def valid(self):
        """Orders whose status role is anything but CANCELED."""
        return self.exclude(status__role=OrderStatusRole.CANCELED)

    def since(self, days):
        """
        Orders dated on or after the start of the day `days` days ago.

        :param days: Number of days to go back from today.
        """
        # NOTE(review): the cutoff is a naive datetime built from
        # `datetime.date.today()` -- confirm this is intended when
        # timezone support is enabled.
        return self.filter(
            order_date__gte=datetime.datetime.combine(
                datetime.date.today() - datetime.timedelta(days=days),
                datetime.time.min
            )
        )


@python_2_unicode_compatible
class Order(MoneyPropped, models.Model):
    """
    An order placed in a shop.

    Besides the relations to shop, contacts, addresses and methods, the
    model keeps denormalized copies of several values (contact details,
    method names, total prices); see `_cache_values` and `cache_prices`.
    """
    # Identification
    shop = UnsavedForeignKey("Shop", on_delete=models.PROTECT, verbose_name=_('shop'))
    created_on = models.DateTimeField(auto_now_add=True, editable=False, verbose_name=_('created on'))
    identifier = InternalIdentifierField(unique=True, db_index=True, verbose_name=_('order identifier'))
    # TODO: label is actually a choice field, need to check migrations/choice deconstruction
    label = models.CharField(max_length=32, db_index=True, verbose_name=_('label'))
    # The key shouldn't be possible to deduce (i.e. it should be random), but it is
    # not a secret. (It could, however, be used as key material for an actual secret.)
    key = models.CharField(max_length=32, unique=True, blank=False, verbose_name=_('key'))
    reference_number = models.CharField(
        max_length=64, db_index=True, unique=True, blank=True, null=True,
        verbose_name=_('reference number'))

    # Contact information
    customer = UnsavedForeignKey(
        "Contact", related_name='customer_orders', blank=True, null=True,
        on_delete=models.PROTECT,
        verbose_name=_('customer'))
    orderer = UnsavedForeignKey(
        "PersonContact", related_name='orderer_orders', blank=True, null=True,
        on_delete=models.PROTECT,
        verbose_name=_('orderer'))
    billing_address = models.ForeignKey(
        "ImmutableAddress", related_name="billing_orders",
        blank=True, null=True,
        on_delete=models.PROTECT,
        verbose_name=_('billing address'))
    shipping_address = models.ForeignKey(
        "ImmutableAddress", related_name='shipping_orders',
        blank=True, null=True,
        on_delete=models.PROTECT,
        verbose_name=_('shipping address'))
    # Filled in from the addresses/contacts on save when left empty
    # (see `_cache_contact_values`).
    tax_number = models.CharField(max_length=20, blank=True, verbose_name=_('tax number'))
    phone = models.CharField(max_length=32, blank=True, verbose_name=_('phone'))
    email = models.EmailField(max_length=128, blank=True, verbose_name=_('email address'))

    # Status
    creator = UnsavedForeignKey(
        settings.AUTH_USER_MODEL, related_name='orders_created', blank=True, null=True,
        on_delete=models.PROTECT,
        verbose_name=_('creating user'))
    # Soft-delete flag; `delete()` sets it instead of removing the row.
    deleted = models.BooleanField(db_index=True, default=False, verbose_name=_('deleted'))
    status = UnsavedForeignKey("OrderStatus", verbose_name=_('status'), on_delete=models.PROTECT)
    payment_status = EnumIntegerField(
        PaymentStatus, db_index=True, default=PaymentStatus.NOT_PAID,
        verbose_name=_('payment status'))
    shipping_status = EnumIntegerField(
        ShippingStatus, db_index=True, default=ShippingStatus.NOT_SHIPPED,
        verbose_name=_('shipping status'))

    # Methods
    payment_method = UnsavedForeignKey(
        "PaymentMethod", related_name="payment_orders", blank=True, null=True,
        default=None, on_delete=models.PROTECT,
        verbose_name=_('payment method'))
    payment_method_name = models.CharField(
        max_length=64, blank=True, default="",
        verbose_name=_('payment method name'))
    payment_data = JSONField(blank=True, null=True, verbose_name=_('payment data'))

    shipping_method = UnsavedForeignKey(
        "ShippingMethod", related_name='shipping_orders', blank=True, null=True,
        default=None, on_delete=models.PROTECT,
        verbose_name=_('shipping method'))
    shipping_method_name = models.CharField(
        max_length=64, blank=True, default="",
        verbose_name=_('shipping method name'))
    shipping_data = JSONField(blank=True, null=True, verbose_name=_('shipping data'))

    extra_data = JSONField(blank=True, null=True, verbose_name=_('extra data'))

    # Money stuff
    # The two properties are built from the `*_value` fields below and `currency`.
    taxful_total_price = TaxfulPriceProperty('taxful_total_price_value', 'currency')
    taxless_total_price = TaxlessPriceProperty('taxless_total_price_value', 'currency')

    taxful_total_price_value = MoneyValueField(editable=False, verbose_name=_('grand total'), default=0)
    taxless_total_price_value = MoneyValueField(editable=False, verbose_name=_('taxless total'), default=0)
    currency = CurrencyField(verbose_name=_('currency'))
    prices_include_tax = models.BooleanField(verbose_name=_('prices include tax'))

    display_currency = CurrencyField(blank=True, verbose_name=_('display currency'))
    display_currency_rate = models.DecimalField(
        max_digits=36, decimal_places=9, default=1, verbose_name=_('display currency rate')
    )

    # Other
    ip_address = models.GenericIPAddressField(null=True, blank=True, verbose_name=_('IP address'))
    # order_date is not `auto_now_add` for backdating purposes
    order_date = models.DateTimeField(editable=False, verbose_name=_('order date'))
    payment_date = models.DateTimeField(null=True, editable=False, verbose_name=_('payment date'))

    language = LanguageField(blank=True, verbose_name=_('language'))
    customer_comment = models.TextField(blank=True, verbose_name=_('customer comment'))
    admin_comment = models.TextField(blank=True, verbose_name=_('admin comment/notes'))
    require_verification = models.BooleanField(default=False, verbose_name=_('requires verification'))
    all_verified = models.BooleanField(default=False, verbose_name=_('all lines verified'))
    marketing_permission = models.BooleanField(default=True, verbose_name=_('marketing permission'))

    # NOTE(review): not read anywhere in this module -- presumably consumed
    # by code that builds order querysets; confirm against callers.
    common_select_related = ("billing_address",)

    objects = OrderQuerySet.as_manager()

    class Meta:
        ordering = ("-id",)
        verbose_name = _('order')
        verbose_name_plural = _('orders')

    def __str__(self):  # pragma: no cover
        # Use the billing address name when there is one, "-" otherwise;
        # the shop name is included only in multi-shop installations.
        if self.billing_address_id:
            name = self.billing_address.name
        else:
            name = "-"
        if settings.SHOOP_ENABLE_MULTIPLE_SHOPS:
            return "Order %s (%s, %s)" % (self.identifier, self.shop.name, name)
        else:
            return "Order %s (%s)" % (self.identifier, name)
def cache_prices(self):
    """
    Recompute the cached total prices from the order lines.

    Sums the taxful and taxless prices of every line and stores the
    rounded sums in `taxful_total_price` and `taxless_total_price`.
    The order itself is not saved.
    """
    currency = self.currency
    gross_sum = TaxfulPrice(0, currency)
    net_sum = TaxlessPrice(0, currency)
    for order_line in self.lines.all():
        gross_sum += order_line.taxful_price
        net_sum += order_line.taxless_price
    self.taxful_total_price = _round_price(gross_sum)
    self.taxless_total_price = _round_price(net_sum)
def _cache_contact_values(self):
    """
    Fill empty contact fields from the order's addresses and contacts.

    For each of `tax_number`, `email` and `phone` that is empty on the
    order, the first truthy value found on the shipping address, billing
    address, customer or orderer (in that order) is copied over.
    """
    sources = [
        self.shipping_address,
        self.billing_address,
        self.customer,
        self.orderer,
    ]

    fields = ("tax_number", "email", "phone")

    for field in fields:
        if getattr(self, field, None):
            # Never overwrite a value that is already set on the order.
            continue
        for source in sources:
            # `source` may be None or lack the attribute; both yield None.
            val = getattr(source, field, None)
            if val:
                setattr(self, field, val)
                break


def _cache_values(self):
    """
    Fill in denormalized and defaulted values before validation or saving.

    Sets contact fields, label, currency, tax mode, display currency,
    method names and the random key, each only when not already set.
    """
    self._cache_contact_values()

    if not self.label:
        self.label = settings.SHOOP_DEFAULT_ORDER_LABEL

    if not self.currency:
        self.currency = self.shop.currency

    # Only fall back to the shop's setting when the value is unset; a
    # plain truthiness test would overwrite an explicit `False` with the
    # shop's `True`.
    if self.prices_include_tax is None:
        self.prices_include_tax = self.shop.prices_include_tax

    if not self.display_currency:
        self.display_currency = self.currency
        self.display_currency_rate = 1

    if self.shipping_method_id and not self.shipping_method_name:
        self.shipping_method_name = self.shipping_method.safe_translation_getter(
            "name", default=self.shipping_method.identifier, any_language=True)

    if self.payment_method_id and not self.payment_method_name:
        self.payment_method_name = self.payment_method.safe_translation_getter(
            "name", default=self.payment_method.identifier, any_language=True)

    if not self.key:
        self.key = get_random_string(32)


def _save_identifiers(self):
    """
    Generate and persist the order identifier and reference number.

    Only these two columns are written, and the write goes through the
    parent class' `save()`, not this model's own.
    """
    self.identifier = "%s" % (get_order_identifier(self))
    self.reference_number = get_reference_number(self)
    super(Order, self).save(update_fields=("identifier", "reference_number",))
def full_clean(self, exclude=None, validate_unique=True):
    """
    Validate the order after filling in its cached/default values.

    The values are populated first so that fields which get defaults in
    `_cache_values` do not fail validation for being empty.
    """
    self._cache_values()
    parent = super(Order, self)
    return parent.full_clean(exclude, validate_unique)
def save(self, *args, **kwargs):
    """
    Save the order, generating its identifiers on the first save.

    :raises ValidationError:
      If the order has no creator and anonymous orders are not allowed
      by the SHOOP_ALLOW_ANONYMOUS_ORDERS setting.
    """
    if not self.creator_id and not settings.SHOOP_ALLOW_ANONYMOUS_ORDERS:
        raise ValidationError(
            "Anonymous (userless) orders are not allowed "
            "when SHOOP_ALLOW_ANONYMOUS_ORDERS is not enabled.")

    self._cache_values()
    is_new = not self.pk
    super(Order, self).save(*args, **kwargs)
    if not is_new:
        return
    # Have to do a double save the first time around to be able to save identifiers
    self._save_identifiers()
def delete(self, using=None):
    """
    Soft-delete the order by setting its `deleted` flag.

    Does nothing when the order is already flagged. Otherwise a deletion
    log entry is added and only the `deleted` column is written.

    :param using: Database alias passed on to the save call.
    """
    if self.deleted:
        return
    self.deleted = True
    self.add_log_entry("Deleted.", kind=LogEntryKind.DELETION)
    # Bypassing local `save()` on purpose.
    super(Order, self).save(update_fields=("deleted", ), using=using)
def set_canceled(self):
    """
    Switch the order to the default canceled status and save it.

    Does nothing when the current status already has the CANCELED role.
    """
    if self.status.role == OrderStatusRole.CANCELED:
        return
    canceled_status = OrderStatus.objects.get_default_canceled()
    self.status = canceled_status
    self.save()
def _set_paid(self):
    """
    Mark the order as fully paid, stamp the payment date and save.

    Does nothing when the order is already fully paid.
    """
    if self.payment_status == PaymentStatus.FULLY_PAID:  # pragma: no branch
        return
    self.add_log_entry(_('Order marked as paid.'))
    self.payment_status = PaymentStatus.FULLY_PAID
    self.payment_date = now()
    self.save()
def is_paid(self):
    """Return whether the payment status is FULLY_PAID."""
    fully_paid = (self.payment_status == PaymentStatus.FULLY_PAID)
    return fully_paid
def get_total_paid_amount(self):
    """
    Return the sum of this order's payment amounts.

    :return: The sum, in the order's currency; zero when there are no payments.
    :rtype: Money
    """
    paid_total = Decimal(0)
    for payment_value in self.payments.values_list('amount_value', flat=True):
        paid_total = paid_total + payment_value
    return Money(paid_total, self.currency)
def create_payment(self, amount, payment_identifier=None, description=''):
    """
    Create a payment with given amount for this order.

    If the order already has payments and sum of their amounts is
    equal or greater than self.taxful_total_price, an exception is raised.

    If the end sum of all payments is equal or greater than
    self.taxful_total_price, then the order is marked as paid.

    :param amount:
      Amount of the payment to be created. Must be in the order's currency.
    :type amount: Money
    :param payment_identifier:
      Identifier of the created payment. If not set, default value
      of "order_id:number" will be used (where number is one more
      than the number of payments already in the order).
    :type payment_identifier: str|None
    :param description:
      Description of the payment. Will be stored in the `description`
      field of the created payment.
    :type description: str
    :raises NoPaymentToCreateException:
      If the payments already made cover the taxful total price.
    :returns: The created Payment object
    :rtype: shoop.core.models.Payment
    """
    assert isinstance(amount, Money)
    assert amount.currency == self.currency

    total_paid_amount = self.get_total_paid_amount()
    if total_paid_amount >= self.taxful_total_price.amount:
        raise NoPaymentToCreateException(
            "Order %s has already been fully paid (%s >= %s)." %
            (
                self.pk, total_paid_amount, self.taxful_total_price
            )
        )

    if not payment_identifier:
        # Only the count is needed here, so no ordering is applied.
        number = self.payments.count() + 1
        payment_identifier = '%d:%d' % (self.id, number)

    payment = self.payments.create(
        payment_identifier=payment_identifier,
        amount_value=amount.value,
        description=description,
    )

    # Re-query the paid total so that it includes the payment just created.
    if self.get_total_paid_amount() >= self.taxful_total_price.amount:
        self._set_paid()  # also calls save

    return payment
def create_shipment(self, supplier, product_quantities):
    """
    Create a shipment for this order from `product_quantities`.

    Entries whose quantity is not over 0 are skipped. After the shipment
    is stored, a log entry is added and the order's fully-shipped state
    is re-checked.

    :param supplier:
      The Supplier for the shipment. It is not validated that the
      supplier actually supplies the given products.
    :param product_quantities: Mapping of Product instances to quantities to ship.
    :type product_quantities: dict[shoop.core.models.Product, decimal.Decimal]
    :raises NoProductsToShipException:
      If the mapping is empty or contains no quantity over 0.
    :return: Saved, complete Shipment object
    :rtype: shoop.core.models.Shipment
    """
    has_shippable = bool(product_quantities) and any(
        amount > 0 for amount in product_quantities.values())
    if not has_shippable:
        raise NoProductsToShipException("No products to ship (`quantities` is empty or has no quantity over 0).")

    from ._shipments import Shipment, ShipmentProduct

    # The shipment has to exist before its product rows can refer to it.
    new_shipment = Shipment(order=self, supplier=supplier)
    new_shipment.save()

    for shipped_product, amount in product_quantities.items():
        if not amount > 0:
            continue
        row = ShipmentProduct(shipment=new_shipment, product=shipped_product, quantity=amount)
        row.cache_values()
        row.save()

    new_shipment.cache_values()
    new_shipment.save()

    self.add_log_entry(_(u"Shipment #%d created.") % new_shipment.id)
    self.check_and_set_fully_shipped()
    return new_shipment
def create_shipment_of_all_products(self, supplier=None):
    """
    Create a shipment of all the products in this Order.

    Quantities come from the order's product lines regardless of what
    has been shipped before. The shipment itself is made by
    `create_shipment`.

    :param supplier:
      The Supplier to use. If `None`, the single supplier found on the
      product lines is used; having several is an error.
    :raises NoProductsToShipException: If no product lines with a product exist.
    :raises ValueError: If no supplier is given and the lines have several.
    :return: Saved, complete Shipment object
    :rtype: shoop.core.models.Shipment
    """
    per_supplier = defaultdict(lambda: defaultdict(int))

    line_rows = self.lines.filter(type=OrderLineType.PRODUCT).values_list(
        "supplier_id", "product_id", "quantity")
    for line_supplier_id, line_product_id, line_quantity in line_rows:
        if not line_product_id:
            continue
        per_supplier[line_supplier_id][line_product_id] += line_quantity

    if not per_supplier:
        raise NoProductsToShipException("Could not find any products to ship.")

    if supplier is not None:
        quantities = per_supplier[supplier.id]
    else:
        if len(per_supplier) > 1:  # pragma: no cover
            raise ValueError("Can only use create_shipment_of_all_products when there is only one supplier")
        only_supplier_id, quantities = per_supplier.popitem()
        supplier = Supplier.objects.get(pk=only_supplier_id)

    # Swap the product ids for Product instances, as `create_shipment` expects.
    products_by_pk = {}
    for found in Product.objects.filter(pk__in=quantities.keys()):
        products_by_pk[found.pk] = found
    quantities = {
        products_by_pk[product_pk]: amount
        for (product_pk, amount) in quantities.items()
    }
    return self.create_shipment(supplier, quantities)
def check_all_verified(self):
    """
    Update and return the `all_verified` flag.

    When the flag is not yet set and no line with `verified=False`
    exists, the flag is set, a pending verification requirement is
    cleared (with a log entry) and the order is saved.

    :return: The value of `all_verified` after the check.
    """
    if self.all_verified:
        return self.all_verified

    has_unverified = self.lines.filter(verified=False).exists()
    if has_unverified:
        return self.all_verified

    self.all_verified = True
    if self.require_verification:
        self.add_log_entry(_('All rows requiring verification have been verified.'))
        self.require_verification = False
    self.save()
    return self.all_verified
def get_purchased_attachments(self):
    """
    Return the purchasable media of the products in this order.

    :return:
      Queryset of enabled ProductMedia flagged as `purchased` for the
      products on the order lines; an empty queryset unless the order
      is fully paid.
    """
    from ._product_media import ProductMedia

    if self.payment_status == PaymentStatus.FULLY_PAID:
        product_ids = self.lines.exclude(product=None).values_list("product_id", flat=True)
        return ProductMedia.objects.filter(product__in=product_ids, enabled=True, purchased=True)
    return ProductMedia.objects.none()
def get_tax_summary(self):
    """
    Build a tax summary from the taxes of all order lines.

    Lines without any tax rows contribute their taxless price to the
    untaxed sum passed along with the collected line taxes.

    :rtype: taxing.TaxSummary
    """
    collected_taxes = []
    untaxed_sum = TaxlessPrice(0, self.currency)
    for order_line in self.lines.all():
        taxes_of_line = list(order_line.taxes.all())
        if taxes_of_line:
            collected_taxes.extend(taxes_of_line)
        else:
            untaxed_sum += order_line.taxless_price
    return taxing.TaxSummary.from_line_taxes(collected_taxes, untaxed_sum)
def get_product_ids_and_quantities(self):
    """
    Return the total ordered quantity per product id.

    Only lines of the PRODUCT type are counted.

    :rtype: dict
    """
    totals = defaultdict(int)
    product_rows = self.lines.filter(type=OrderLineType.PRODUCT).values_list("product_id", "quantity")
    for pid, amount in product_rows:
        totals[pid] += amount
    return dict(totals)
def is_complete(self):
    """Return whether the order's status has the COMPLETE role."""
    current_role = self.status.role
    return (current_role == OrderStatusRole.COMPLETE)
def can_set_complete(self):
    """
    Return whether the order may be switched to a complete status.

    That is the case when it is fully shipped and its status is neither
    complete already nor canceled.
    """
    shipped = (self.shipping_status == ShippingStatus.FULLY_SHIPPED)
    was_canceled = (self.status.role == OrderStatusRole.CANCELED)
    if self.is_complete():
        return False
    return shipped and (not was_canceled)
def check_and_set_fully_shipped(self):
    """
    Mark the order as fully shipped if no unshipped products remain.

    When the status changes, a log entry is added and only the
    `shipping_status` field is saved.

    :return:
      True if this call switched the order to FULLY_SHIPPED; False if it
      already was fully shipped or still has unshipped products.
    :rtype: bool
    """
    if self.shipping_status == ShippingStatus.FULLY_SHIPPED:
        return False

    if self.get_unshipped_products():
        return False

    self.shipping_status = ShippingStatus.FULLY_SHIPPED
    self.add_log_entry(_(u"All products have been shipped. Fully Shipped status set."))
    self.save(update_fields=("shipping_status",))
    return True
def get_known_additional_data(self):
    """
    Get a list of "known additional data" in this order's
    payment_data, shipping_data and extra_data.

    Entries come out grouped by data field (payment, shipping, extra)
    and, within each, in the order the keys are listed in the matching
    settings entry. Use `dict(that_list)` to flatten the result.

    :return: list of 2-tuples (display name as text, stored value).
    """
    data_and_known_keys = (
        (self.payment_data, settings.SHOOP_ORDER_KNOWN_PAYMENT_DATA_KEYS),
        (self.shipping_data, settings.SHOOP_ORDER_KNOWN_SHIPPING_DATA_KEYS),
        (self.extra_data, settings.SHOOP_ORDER_KNOWN_EXTRA_DATA_KEYS),
    )
    pairs = []
    for stored, known_keys in data_and_known_keys:
        # Skip data fields that are empty or not mapping-like.
        if not hasattr(stored, "get"):
            continue
        pairs.extend(
            (force_text(title), stored[known_key])
            for known_key, title in known_keys
            if known_key in stored
        )
    return pairs
def get_product_summary(self):
    """
    Return a dict of product IDs -> {ordered, unshipped, shipped}

    Ordered quantities come from the order's PRODUCT lines and shipped
    quantities from the products of its shipments. Missing counters
    default to ``Decimal(0)`` when read.
    """
    summary = defaultdict(lambda: defaultdict(lambda: Decimal(0)))

    ordered_rows = self.lines.filter(type=OrderLineType.PRODUCT).values_list("product_id", "quantity")
    for pid, amount in ordered_rows:
        counters = summary[pid]
        counters['ordered'] += amount
        counters['unshipped'] += amount

    from ._shipments import ShipmentProduct

    shipped_rows = ShipmentProduct.objects.filter(shipment__order=self).values_list("product_id", "quantity")
    for pid, amount in shipped_rows:
        counters = summary[pid]
        counters['shipped'] += amount
        counters['unshipped'] -= amount

    return summary
def get_unshipped_products(self):
    """
    Return the product summary entries that still have unshipped quantity.

    :return: dict of product id -> summary data with a non-zero 'unshipped'.
    """
    return {
        product_id: counters
        for product_id, counters in self.get_product_summary().items()
        if counters['unshipped']
    }
def get_status_display(self):
    """Return the order's status converted to text."""
    current_status = self.status
    return force_text(current_status)
# Log entry model for orders, generated by the analog utility.
OrderLogEntry = define_log_model(Order)


def _round_price(value):
    """Round a price to two decimal places using banker's rounding."""
    decimals = 2  # TODO: To be fixed in SHOOP-1912
    return bankers_round(value, decimals)