Source code for concurrency.forms

from importlib import import_module

from django import forms
from django.core.exceptions import (
    NON_FIELD_ERRORS,
    ImproperlyConfigured,
    ValidationError,
)
from django.core.signing import BadSignature, Signer
from django.forms import HiddenInput, ModelForm
from django.utils.html import format_html
from django.utils.translation import gettext as _

from concurrency.config import conf
from concurrency.core import _select_lock
from concurrency.exceptions import RecordModifiedError, VersionError


class ConcurrentForm(ModelForm):
    """ModelForm wrapper that tries to catch concurrent edits during validation.

    A ``RecordModifiedError`` can still be raised by ``model.save()``.
    ``form.clean()`` should catch most concurrent edits, but it is good
    practice to handle ``RecordModifiedError`` in the view as well.
    """

    def clean(self):
        """Check the submitted version of an already saved instance.

        When the instance has a primary key, ``_select_lock`` is called with
        the instance and the cleaned value of its version field. A
        ``RecordModifiedError`` raised by that call is recorded as a
        non-field "Record Modified" form error instead of propagating.

        Returns:
            Whatever ``ModelForm.clean()`` returns.
        """
        try:
            if self.instance.pk:
                version_name = self.instance._concurrencymeta.field.name
                # Django drops a field from ``cleaned_data`` once it has
                # failed its own validation. The form is already invalid in
                # that case, so skip the lock check instead of raising
                # KeyError. A version field that is simply missing from the
                # form still raises, exactly as before.
                if version_name not in self.errors:
                    _select_lock(self.instance, self.cleaned_data[version_name])
        except RecordModifiedError:
            self._update_errors(
                ValidationError(
                    {NON_FIELD_ERRORS: self.error_class([_("Record Modified")])}
                )
            )
        return super().clean()
class VersionWidget(HiddenInput):
    """Widget that shows the revision number inside a ``<div>``.

    ``VersionField`` normally uses ``HiddenInput`` to minimize its impact on
    forms. In the Admin this has the side effect of showing the *Version*
    label without any value; use this widget to display the current
    revision number instead.
    """

    def format_value(self, value):
        """Return ``str(value)`` for truthy values; return falsy values as-is."""
        if value:
            value = str(value)
        return value

    # Same function under a second name; presumably kept for Django versions
    # that looked up ``_format_value`` -- confirm before removing.
    _format_value = format_value

    def render(self, name, value, attrs=None, renderer=None):
        """Render the hidden input followed by a ``<div>`` holding the version.

        Args:
            name: HTML name of the input.
            value: Value to render. For a ``SignedValue`` only the part of
                its string form before the first ``":"`` is shown.
            attrs: Optional extra HTML attributes for the input.
            renderer: Optional form renderer. Django passes this as a keyword
                argument, so the method must accept it; it is forwarded to
                the parent implementation.

        Returns:
            The parent widget's HTML followed by ``<div>label</div>``, built
            with ``format_html`` so the label is escaped.
        """
        ret = super().render(name, value, attrs, renderer=renderer)
        label = ""
        if isinstance(value, SignedValue):
            label = str(value).split(":")[0]
        elif value is not None:
            label = str(value)
        return format_html("{}<div>{}</div>", ret, label)
class VersionFieldSigner(Signer):
    """``Signer`` that leaves falsy values unsigned."""

    def sign(self, value):
        """Return the signed ``value``, or ``None`` when ``value`` is falsy."""
        if not value:
            return None
        return super().sign(value)


def get_signer():
    """Instantiate the signer class named by ``conf.FIELD_SIGNER``.

    ``conf.FIELD_SIGNER`` is a dotted path of the form ``"package.module.Class"``.

    Returns:
        A new instance of the configured class, created with no arguments.

    Raises:
        ImproperlyConfigured: If the path has no module part, the module
            cannot be imported, or the module lacks the named attribute.
    """
    path = conf.FIELD_SIGNER
    module, _dot, attr = path.rpartition(".")
    if not module:
        # Without a module part there is nothing to import; report it as a
        # configuration error instead of trying to import a mangled name.
        msg = f'Error loading concurrency signer {path}: "not a dotted path"'
        raise ImproperlyConfigured(msg)
    try:
        mod = import_module(module)
    except ImportError as e:
        msg = f'Error loading concurrency signer {module}: "{e}"'
        raise ImproperlyConfigured(msg) from e
    try:
        signer_class = getattr(mod, attr)
    except AttributeError as e:  # pragma: no cover
        msg = f'Module "{module}" does not define a valid signer named "{attr}"'
        raise ImproperlyConfigured(msg) from e
    return signer_class()


class SignedValue:
    """Marker wrapper around a version value that has already been signed.

    ``VersionField.prepare_value`` and ``VersionWidget.render`` use
    ``isinstance(..., SignedValue)`` to tell wrapped values from raw ones.
    """

    def __init__(self, value) -> None:
        self.value = value

    def __repr__(self) -> str:
        # No ``__str__`` is defined, so ``str(instance)`` also ends up here.
        if self.value:
            return str(self.value)
        return ""


class VersionField(forms.IntegerField):
    """Hidden, required form field that carries a signed version number."""

    widget = HiddenInput  # Default widget to use when rendering this type of Field.
    hidden_widget = HiddenInput  # Default widget to use when rendering this as "hidden".

    def __init__(self, *args, **kwargs) -> None:
        """Set up the field.

        Keyword Args:
            signer: Object used to sign and unsign the version. Defaults to
                the result of ``get_signer()``.

        ``min_value`` and ``max_value`` are discarded, ``required`` is forced
        to ``True`` and ``initial`` to ``None``. ``widget`` defaults to
        ``HiddenInput`` unless supplied.
        """
        # Resolve the configured signer only when none was passed, so an
        # explicit ``signer`` works even if ``conf.FIELD_SIGNER`` is unusable.
        if "signer" in kwargs:
            self._signer = kwargs.pop("signer")
        else:
            self._signer = get_signer()
        kwargs.pop("min_value", None)
        kwargs.pop("max_value", None)
        kwargs["required"] = True
        kwargs["initial"] = None
        kwargs.setdefault("widget", HiddenInput)
        super().__init__(*args, **kwargs)

    def bound_data(self, data, initial):
        """Wrap submitted ``data`` in ``SignedValue``; ``initial`` is ignored."""
        return SignedValue(data)

    def prepare_value(self, value):
        """Return the value to hand to the widget.

        A ``SignedValue`` is passed through, ``None`` becomes ``""``, and
        anything else is signed and wrapped in ``SignedValue``.
        """
        if isinstance(value, SignedValue):
            return value
        if value is None:
            return ""
        return SignedValue(self._signer.sign(value))

    def to_python(self, value):
        """Unsign ``value`` and convert it to ``int``.

        Returns:
            ``0`` when ``value`` is ``None``, ``""`` or ``"None"``; otherwise
            the unsigned value as an ``int``.

        Raises:
            VersionError: If the signature is bad or the unsigned value is
                not an integer.
        """
        # A tuple keeps the membership test from raising TypeError when
        # ``value`` is unhashable.
        if value in (None, "", "None"):
            return 0
        try:
            return int(self._signer.unsign(str(value)))
        except (BadSignature, ValueError) as exc:
            raise VersionError(value) from exc

    def widget_attrs(self, widget):
        """Add no extra HTML attributes to the widget."""
        return {}