Source code for concurrency.forms

from importlib import import_module

from django import forms
from django.core.exceptions import ImproperlyConfigured, NON_FIELD_ERRORS, ValidationError
from django.core.signing import BadSignature, Signer
from django.forms import HiddenInput, ModelForm
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from django.utils.translation import gettext as _

from concurrency.config import conf
from concurrency.core import _select_lock
from concurrency.exceptions import RecordModifiedError, VersionError


class ConcurrentForm(ModelForm):
    """
    Simple wrapper around ModelForm that tries to mitigate some concurrency errors.

    Note that it is always possible to get a RecordModifiedError in model.save().
    Statistically form.clean() should catch most of the concurrent editing,
    but it is good to catch RecordModifiedError in the view too.
    """

    def clean(self):
        """Run the version check for saved instances, then the normal ModelForm clean.

        When the instance already has a primary key and the version field
        passed validation, the submitted version is handed to ``_select_lock``.
        If that raises ``RecordModifiedError`` a non-field 'Record Modified'
        error is recorded on the form instead of propagating the exception.

        Returns:
            Whatever ``ModelForm.clean()`` returns.
        """
        # Unsaved instances have no primary key, so there is nothing to check.
        if self.instance.pk:
            version_name = self.instance._concurrencymeta.field.name
            # cleaned_data only holds fields that passed validation; if the
            # version field is absent (its own validation failed, or it is not
            # part of this form) skip the check rather than raise KeyError.
            if version_name in self.cleaned_data:
                try:
                    _select_lock(self.instance, self.cleaned_data[version_name])
                except RecordModifiedError:
                    self._update_errors(
                        ValidationError({NON_FIELD_ERRORS: self.error_class([_('Record Modified')])})
                    )
        return super().clean()
class VersionWidget(HiddenInput):
    """
    Widget that shows the revision number using a <div>.

    Usually VersionField uses `HiddenInput` as its widget to minimize the
    impact on the forms. In the Admin this has the side effect of showing the
    label *Version* without any value; use this widget to display the current
    revision number.
    """

    def format_value(self, value):
        """Return *value* converted to ``str`` when it is truthy, otherwise unchanged."""
        if value:
            value = str(value)
        return value

    # Same callable exposed under the private name as well.
    # NOTE(review): presumably kept for Django versions that call `_format_value` - confirm.
    _format_value = format_value

    def render(self, name, value, attrs=None, renderer=None):
        """Render the hidden input followed by a ``<div>`` showing the revision.

        Args:
            name: HTML name of the input.
            value: Current value; a ``SignedValue``, any other object, or None.
            attrs: Optional extra HTML attributes for the input.
            renderer: Optional form renderer, forwarded to ``HiddenInput.render``.
                Django passes this keyword when rendering bound fields.

        Returns:
            Safe HTML: the hidden input markup plus ``<div>label</div>``.
        """
        ret = super().render(name, value, attrs, renderer=renderer)
        if isinstance(value, SignedValue):
            # Show only the part before the first ':' of the signed text.
            label = str(value).split(':')[0]
        elif value is not None:
            label = str(value)
        else:
            label = ''
        # format_html escapes `label`, which for a bound form can come from
        # submitted data; `ret` is already-safe markup and is left untouched.
        return format_html('{}<div>{}</div>', ret, label)
class VersionFieldSigner(Signer):
    """Signer that does not sign falsy values."""

    def sign(self, value):
        """Return the signed form of *value*, or None when *value* is falsy."""
        if not value:
            return None
        return super().sign(value)


def get_signer():
    """Instantiate the signer class named by ``conf.FIELD_SIGNER``.

    ``conf.FIELD_SIGNER`` is read as a dotted path whose last component is the
    attribute to fetch from the module named by the preceding components.

    Returns:
        A new instance of the configured signer class (called with no arguments).

    Raises:
        ImproperlyConfigured: if the path is not of the form ``module.attr``,
            the module cannot be imported, or it lacks the named attribute.
    """
    path = conf.FIELD_SIGNER
    module, _sep, attr = path.rpartition('.')
    if not module or not attr:
        # Without this guard a dot-less path would be split incorrectly and an
        # empty module name would escape as a ValueError from import_module.
        raise ImproperlyConfigured('Invalid concurrency signer path "%s"' % path)
    try:
        mod = import_module(module)
    except ImportError as e:
        raise ImproperlyConfigured('Error loading concurrency signer %s: "%s"' % (module, e)) from e
    try:
        signer_class = getattr(mod, attr)
    except AttributeError as e:  # pragma: no cover
        raise ImproperlyConfigured(
            'Module "%s" does not define a valid signer named "%s"' % (module, attr)
        ) from e
    return signer_class()


class SignedValue:
    """Wrapper whose text form is the wrapped value, or '' when that value is falsy."""

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        # No __str__ is defined, so str(instance) also ends up here.
        return str(self.value) if self.value else ''


class VersionField(forms.IntegerField):
    """Integer form field that exchanges its value with the client in signed form."""

    widget = HiddenInput  # Default widget to use when rendering this type of Field.
    hidden_widget = HiddenInput  # Default widget to use when rendering this as "hidden".

    def __init__(self, *args, **kwargs):
        """Set up the field.

        Keyword Args:
            signer: Object providing ``sign``/``unsign``; when omitted the
                result of ``get_signer()`` is used.

        ``min_value`` and ``max_value`` are discarded, ``required`` is forced
        to True and ``initial`` to None; ``widget`` defaults to ``HiddenInput``.
        """
        # Resolve the configured signer only when none was supplied, so an
        # explicit signer never triggers the settings lookup.
        if 'signer' in kwargs:
            self._signer = kwargs.pop('signer')
        else:
            self._signer = get_signer()
        kwargs.pop('min_value', None)
        kwargs.pop('max_value', None)
        kwargs['required'] = True
        kwargs['initial'] = None
        kwargs.setdefault('widget', HiddenInput)
        super().__init__(*args, **kwargs)

    def bound_data(self, data, initial):
        """Wrap the submitted *data* in a ``SignedValue``; *initial* is ignored."""
        return SignedValue(data)

    def prepare_value(self, value):
        """Return the value to hand to the widget.

        A ``SignedValue`` is passed through, None becomes '', and anything
        else is signed and wrapped in a ``SignedValue``.
        """
        if isinstance(value, SignedValue):
            return value
        if value is None:
            return ''
        return SignedValue(self._signer.sign(value))

    def to_python(self, value):
        """Unsign *value* and return it as an int.

        Returns:
            0 when *value* is None, '' or the string 'None'; otherwise the
            integer obtained by unsigning ``str(value)``.

        Raises:
            VersionError: if the signature is bad or the payload is not an integer.
        """
        if value in (None, '', 'None'):
            return 0
        try:
            return int(self._signer.unsign(str(value)))
        except (BadSignature, ValueError) as e:
            raise VersionError(value) from e

    def widget_attrs(self, widget):
        """Return no extra HTML attributes for *widget*."""
        return {}