From a5bd4a6df40a45abe09b86e55f3114c45be14218 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tudor=20V=C4=83ran?= Date: Thu, 10 Jul 2025 15:41:42 +0300 Subject: [PATCH 01/15] Remove introspection and ops checks on Django >= 5.0 (#264) --- psqlextra/backend/base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/psqlextra/backend/base.py b/psqlextra/backend/base.py index c8ae73c..58222dd 100644 --- a/psqlextra/backend/base.py +++ b/psqlextra/backend/base.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING +from django import VERSION from django.conf import settings from django.contrib.postgres.signals import ( get_hstore_oids, @@ -45,6 +46,9 @@ class DatabaseWrapper(Wrapper): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + if VERSION >= (5, 0): + return + # Some base back-ends such as the PostGIS back-end don't properly # set `ops_class` and `introspection_class` and initialize these # classes themselves. From 552fb271227677d6c91fa94a599a58437090cc9a Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Thu, 10 Jul 2025 21:23:24 +0200 Subject: [PATCH 02/15] Custom CompositePrimaryKey and foreign key support on partitioned models --- psqlextra/models/partitioned.py | 137 +++++++++++++++++++++- tests/test_partitioned_model.py | 195 +++++++++++++++++++++++++++++++- 2 files changed, 325 insertions(+), 7 deletions(-) diff --git a/psqlextra/models/partitioned.py b/psqlextra/models/partitioned.py index f011536..d833155 100644 --- a/psqlextra/models/partitioned.py +++ b/psqlextra/models/partitioned.py @@ -1,6 +1,10 @@ -from typing import Iterable +from typing import Iterable, List, Optional, Tuple +from django.core.exceptions import ImproperlyConfigured +from django.db import models from django.db.models.base import ModelBase +from django.db.models.fields.composite import CompositePrimaryKey +from django.db.models.options import Options from psqlextra.types import PostgresPartitioningMethod @@ -20,19 +24,140 @@ class PostgresPartitionedModelMeta(ModelBase): default_key: Iterable[str] = [] def __new__(cls, name, bases, attrs, **kwargs): - new_class = super().__new__(cls, name, bases, attrs, **kwargs) - meta_class = attrs.pop("PartitioningMeta", None) + partitioning_meta_class = attrs.pop("PartitioningMeta", None) + + partitioning_method = getattr(partitioning_meta_class, "method", None) + partitioning_key = getattr(partitioning_meta_class, "key", None) + special = getattr(partitioning_meta_class, "special", None) - method = getattr(meta_class, "method", None) - key = getattr(meta_class, "key", None) + if special: + cls._create_primary_key(attrs, partitioning_key) patitioning_meta = PostgresPartitionedModelOptions( - method=method or cls.default_method, key=key or cls.default_key + method=partitioning_method or cls.default_method, + key=partitioning_key or cls.default_key, ) + new_class = super().__new__(cls, name, bases, attrs, **kwargs) new_class.add_to_class("_partitioning_meta", patitioning_meta) return new_class + @classmethod + def _create_primary_key(cls, attrs, partitioning_key: Optional[List[str]]): + pk = cls._find_primary_key(attrs) + if pk and isinstance(pk[1], CompositePrimaryKey): + return + + if not pk: + attrs["id"] = attrs.get("id") or cls._create_auto_field(attrs) + pk_fields = ["id"] + else: + pk_fields = [pk[0]] + + unique_pk_fields = set(pk_fields + (partitioning_key or [])) + if len(unique_pk_fields) <= 1: + return + + auto_generated_pk = CompositePrimaryKey(*sorted(unique_pk_fields)) + attrs["pk"] = auto_generated_pk + + @classmethod + def _create_auto_field(cls, attrs): + app_label = attrs.get("app_label") + meta_class = attrs.get("Meta", None) + + pk_class = Options(meta_class, app_label)._get_default_pk_class() + return pk_class(verbose_name="ID", primary_key=True, auto_created=True) + + @classmethod + def _find_primary_key(cls, attrs) -> Optional[Tuple[str, models.Field]]: + """Gets the field that has been marked by the user as the primary key + field for this model. + + This is quite complex because Django allows a variety of options: + + 1. No PK at all. In this case, Django generates one named `id` + as an auto-increment integer (AutoField) + + 2. One field that has `primary_key=True`. Any field can have + this attribute, but Django would error if there were more. + + 3. One field named `pk`. + + 4. One field that has `primary_key=True` and a field that + is of type `CompositePrimaryKey` that includes that + field. + + Since a table can only have one primary key, our goal here + is to find the field (if any) that is going to become + the primary key of the table. + + Our logic is straight forward: + + 1. If there is a `CompositePrimaryKey`, that field becomes the primary key. + + 2. If there is a field with `primary_key=True`, that field becomes the primary key. + + 3. There is no primary key. + """ + + fields = { + name: value + for name, value in attrs.items() + if isinstance(value, models.Field) + } + + fields_marked_as_pk = { + name: value for name, value in fields.items() if value.primary_key + } + + # We cannot let the user define a field named `pk` that is not a CompositePrimaryKey + # already because when we generate a primary key, we want to name it `pk`. + field_named_pk = attrs.get("pk") + if field_named_pk and not field_named_pk.primary_key: + raise ImproperlyConfigured( + "You cannot define a field named `pk` that is not a primary key." + ) + + if field_named_pk: + if not isinstance(field_named_pk, CompositePrimaryKey): + raise ImproperlyConfigured( + "You cannot define a field named `pk` that is not a composite primary key on a partitioned model. Either make `pk` a CompositePrimaryKey or rename it." + ) + + return ("pk", field_named_pk) + + if not fields_marked_as_pk: + return None + + # Make sure the user didn't define N primary keys. Django would also warn + # about this. + # + # One exception is a set up such as: + # + # >>> id = models.AutoField(primary_key=True) + # >>> timestamp = models.DateTimeField() + # >>> pk = models.CompositePrimaryKey("id", "timestamp") + # + # In this case, both `id` and `pk` are marked as primary key. Django + # allows this and just ignores the `primary_key=True` attribute + # on all the other fields except the composite one. + # + # We also handle this as expected and treat the CompositePrimaryKey + # as the primary key. + sorted_fields_marked_as_pk = sorted( + list(fields_marked_as_pk.items()), + key=lambda pair: 0 + if isinstance(pair[1], CompositePrimaryKey) + else 1, + ) + if len(sorted_fields_marked_as_pk[1:]) > 1: + raise ImproperlyConfigured( + "You cannot mark more than one fields as a primary key." + ) + + return sorted_fields_marked_as_pk[0] + class PostgresPartitionedModel( PostgresModel, metaclass=PostgresPartitionedModelMeta diff --git a/tests/test_partitioned_model.py b/tests/test_partitioned_model.py index 8956273..f4fc3c6 100644 --- a/tests/test_partitioned_model.py +++ b/tests/test_partitioned_model.py @@ -1,7 +1,13 @@ +import django +import pytest + +from django.core.exceptions import ImproperlyConfigured +from django.db import models + from psqlextra.models import PostgresPartitionedModel from psqlextra.types import PostgresPartitioningMethod -from .fake_model import define_fake_partitioned_model +from .fake_model import define_fake_model, define_fake_partitioned_model def test_partitioned_model_abstract(): @@ -70,3 +76,190 @@ def test_partitioned_model_key_option_none(): model = define_fake_partitioned_model(partitioning_options=dict(key=None)) assert model._partitioning_meta.key == [] + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_custom_composite_primary_key_with_auto_field(): + model = define_fake_partitioned_model( + fields={ + "auto_id": models.AutoField(), + "my_custom_pk": models.CompositePrimaryKey("auto_id", "timestamp"), + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + assert isinstance(model._meta.pk, models.CompositePrimaryKey) + assert model._meta.pk.name == "my_custom_pk" + assert model._meta.pk.columns == ("auto_id", "timestamp") + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_custom_composite_primary_key_with_id_field(): + model = define_fake_partitioned_model( + fields={ + "id": models.IntegerField(), + "my_custom_pk": models.CompositePrimaryKey("id", "timestamp"), + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + assert isinstance(model._meta.pk, models.CompositePrimaryKey) + assert model._meta.pk.name == "my_custom_pk" + assert model._meta.pk.columns == ("id", "timestamp") + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_custom_composite_primary_key_named_id(): + model = define_fake_partitioned_model( + fields={ + "other_field": models.TextField(), + "id": models.CompositePrimaryKey("other_field", "timestamp"), + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + assert isinstance(model._meta.pk, models.CompositePrimaryKey) + assert model._meta.pk.name == "id" + assert model._meta.pk.columns == ("other_field", "timestamp") + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_field_named_pk_not_composite_not_primary(): + with pytest.raises(ImproperlyConfigured): + define_fake_partitioned_model( + fields={ + "pk": models.TextField(), + "id": models.CompositePrimaryKey("other_field", "timestamp"), + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_field_named_pk_not_composite(): + with pytest.raises(ImproperlyConfigured): + define_fake_partitioned_model( + fields={ + "pk": models.AutoField(primary_key=True), + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_field_multiple_pks(): + with pytest.raises(ImproperlyConfigured): + define_fake_partitioned_model( + fields={ + "id": models.AutoField(primary_key=True), + "another_pk": models.TextField(primary_key=True), + "timestamp": models.DateTimeField(), + "real_pk": models.CompositePrimaryKey("id", "timestamp"), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_no_pk_defined(): + model = define_fake_partitioned_model( + fields={ + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + assert isinstance(model._meta.pk, models.CompositePrimaryKey) + assert model._meta.pk.name == "pk" + assert model._meta.pk.columns == ("id", "timestamp") + + id_field = model._meta.get_field("id") + assert id_field.name == "id" + assert id_field.column == "id" + assert isinstance(id_field, models.AutoField) + assert id_field.primary_key is True + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_composite_primary_key(): + model = define_fake_partitioned_model( + fields={ + "id": models.AutoField(primary_key=True), + "pk": models.CompositePrimaryKey("id", "timestamp"), + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + assert isinstance(model._meta.pk, models.CompositePrimaryKey) + assert model._meta.pk.name == "pk" + assert model._meta.pk.columns == ("id", "timestamp") + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_composite_primary_key_foreign_key(): + model = define_fake_partitioned_model( + fields={ + "timestamp": models.DateTimeField(), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + define_fake_model( + fields={ + "model": models.ForeignKey(model, on_delete=models.CASCADE), + }, + ) + + +@pytest.mark.skipif( + django.VERSION < (5, 2), + reason="Django < 5.2 doesn't implement composite primary keys", +) +def test_partitioned_model_custom_composite_primary_key_foreign_key(): + model = define_fake_partitioned_model( + fields={ + "id": models.TextField(primary_key=True), + "timestamp": models.DateTimeField(), + "custom": models.CompositePrimaryKey("id", "timestamp"), + }, + partitioning_options=dict(key=["timestamp"], special=True), + ) + + define_fake_model( + fields={ + "model": models.ForeignKey(model, on_delete=models.CASCADE), + }, + ) From 72a6070c7d7ec05f3fd54f7c26ea5e2edc8adbf1 Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Fri, 11 Jul 2025 09:11:32 +0200 Subject: [PATCH 03/15] Very clearly specify that FK to partitioned models are not supported --- docs/source/table_partitioning.rst | 58 ++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/docs/source/table_partitioning.rst b/docs/source/table_partitioning.rst index 0e72e3b..9194c41 100644 --- a/docs/source/table_partitioning.rst +++ b/docs/source/table_partitioning.rst @@ -28,9 +28,7 @@ Known limitations Foreign keys ~~~~~~~~~~~~ -Support for foreign keys to partitioned models is limited in Django 5.1 and older. These are only suported under specific conditions. - -For full support for foreign keys to partitioned models, use Django 5.2 or newer. Django 5.2 supports composite primary and foreign keys native through :class:`~django:django.db.models.CompositePrimaryKey` to support. +There is no support for foreign keys **to** partitioned models. Even in Django 5.2 with the introduction of :class:`~django:django.db.models.CompositePrimaryKey`, there is no support for foreign keys. See: https://code.djangoproject.com/ticket/36034 Foreing keys **on** a partitioned models to other, non-partitioned models are always supported. @@ -118,27 +116,65 @@ Primary key PostgreSQL demands that the primary key is the same or is part of the partitioning key. See `PostgreSQL Table Partitioning Limitations`_. -TL;DR Foreign keys don't work in Django <5.2. Use Django 5.2 or newer for proper support. - **In Django <5.2, the behavior is as following:** - - If the primary key is the same as the partitioning key: - - Foreign keys to partitioned tables will work as you expect. + - If the primary key is the same as the partitioning key, standard Django behavior applies. - If the primary key is not the exact same as the partitioning key or the partitioning key consists of more than one field: An implicit composite primary key (not visible from Django) is created. - Foreign keys to partitioned tables will **NOT** work. - **In Django >5.2, the behavior is as following:** - If no explicit primary key is defined, a :class:`~django:django.db.models.CompositePrimaryKey` is created automatically that includes an auto-incrementing `id` primary key field and the partitioning keys. - If an explicit :class:`~django:django.db.models.CompositePrimaryKey` is specified, no modifications are made to it and it is your responsibility to make sure the partitioning keys are part of the primary key. - In Django 5.2 and newer, foreign keys to partitioned models always work. +Django 5.2 examples +******************* + +Custom composite primary key +"""""""""""""""""""""""""""" + +.. code-block:: python + + from django.db import models + + from psqlextra.types import PostgresPartitioningMethod + from psqlextra.models import PostgresPartitionedModel + + class MyModel(PostgresPartitionedModel): + class PartitioningMeta: + method = PostgresPartitioningMethod.RANGE + key = ["timestamp"] + + # WARNING: This overrides default primary key that includes a auto-increment `id` field. + pk = models.CompositePrimaryKey("name", "timestamp") + + name = models.TextField() + timestamp = models.DateTimeField() + + +Custom composite primary key with auto-incrementing ID +"""""""""""""""""""""""""""""""""""""""""""""""""""""" + +.. code-block:: python + + from django.db import models + + from psqlextra.types import PostgresPartitioningMethod + from psqlextra.models import PostgresPartitionedModel + + class MyModel(PostgresPartitionedModel): + class PartitioningMeta: + method = PostgresPartitioningMethod.RANGE + key = ["timestamp"] + + id = models.AutoField(primary_key=True) + pk = models.CompositePrimaryKey("id", "timestamp") + + name = models.TextField() + timestamp = models.DateTimeField() Generating a migration From b450c69cb11ecd023df38270c554ceb5a64f109b Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Fri, 11 Jul 2025 09:13:53 +0200 Subject: [PATCH 04/15] Make sure temp dir for tablespace tests exists --- tests/conftest.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index d0a379a..03d01ab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import os import tempfile import uuid @@ -35,6 +36,9 @@ def django_db_setup(django_db_setup, django_db_blocker): qn = connection.ops.quote_name with tempfile.TemporaryDirectory() as temp_dir: + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + with connection.cursor() as cursor: cursor.execute( f"CREATE TABLESPACE {qn(custom_tablespace_name)} LOCATION %s", From 3a6229ce3dc73075095d0b1cf0ec29a6ced94f2f Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 21:52:25 +0200 Subject: [PATCH 05/15] Finalize `CompositePrimaryKey` support on `PostgresPartitionedModel` --- psqlextra/models/partitioned.py | 76 ++++++++++++++++++++++++++++++--- tests/test_partitioned_model.py | 24 +++++------ 2 files changed, 82 insertions(+), 18 deletions(-) diff --git a/psqlextra/models/partitioned.py b/psqlextra/models/partitioned.py index d833155..3a20677 100644 --- a/psqlextra/models/partitioned.py +++ b/psqlextra/models/partitioned.py @@ -1,9 +1,10 @@ from typing import Iterable, List, Optional, Tuple +import django + from django.core.exceptions import ImproperlyConfigured from django.db import models from django.db.models.base import ModelBase -from django.db.models.fields.composite import CompositePrimaryKey from django.db.models.options import Options from psqlextra.types import PostgresPartitioningMethod @@ -28,9 +29,11 @@ def __new__(cls, name, bases, attrs, **kwargs): partitioning_method = getattr(partitioning_meta_class, "method", None) partitioning_key = getattr(partitioning_meta_class, "key", None) - special = getattr(partitioning_meta_class, "special", None) - if special: + if django.VERSION >= (5, 2): + for base in bases: + cls._delete_auto_created_fields(base) + cls._create_primary_key(attrs, partitioning_key) patitioning_meta = PostgresPartitionedModelOptions( @@ -43,21 +46,57 @@ def __new__(cls, name, bases, attrs, **kwargs): return new_class @classmethod - def _create_primary_key(cls, attrs, partitioning_key: Optional[List[str]]): + def _create_primary_key( + cls, attrs, partitioning_key: Optional[List[str]] + ) -> None: + from django.db.models.fields.composite import CompositePrimaryKey + + # Find any existing primary key the user might have declared. + # + # If it is a composite primary key, we will do nothing and + # keep it as it is. You're own your own. pk = cls._find_primary_key(attrs) if pk and isinstance(pk[1], CompositePrimaryKey): return + # Create an `id` field (auto-incrementing) if there is no + # primary key yet. + # + # This matches standard Django behavior. if not pk: attrs["id"] = attrs.get("id") or cls._create_auto_field(attrs) pk_fields = ["id"] else: pk_fields = [pk[0]] - unique_pk_fields = set(pk_fields + (partitioning_key or [])) + partitioning_keys = ( + partitioning_key + if isinstance(partitioning_key, list) + else list(filter(None, [partitioning_key])) + ) + + unique_pk_fields = set(pk_fields + (partitioning_keys or [])) if len(unique_pk_fields) <= 1: + if "id" in attrs: + attrs["id"].primary_key = True return + # You might have done something like this: + # + # id = models.AutoField(primary_key=True) + # pk = CompositePrimaryKey("id", "timestamp") + # + # The `primary_key` attribute has to be removed + # from the `id` field in the example above to + # avoid having two primary keys. + # + # Without this, the generated schema will + # have two primary keys, which is an error. + for field in attrs.values(): + is_pk = getattr(field, "primary_key", False) + if is_pk: + field.primary_key = False + auto_generated_pk = CompositePrimaryKey(*sorted(unique_pk_fields)) attrs["pk"] = auto_generated_pk @@ -67,7 +106,7 @@ def _create_auto_field(cls, attrs): meta_class = attrs.get("Meta", None) pk_class = Options(meta_class, app_label)._get_default_pk_class() - return pk_class(verbose_name="ID", primary_key=True, auto_created=True) + return pk_class(verbose_name="ID", auto_created=True) @classmethod def _find_primary_key(cls, attrs) -> Optional[Tuple[str, models.Field]]: @@ -101,6 +140,8 @@ def _find_primary_key(cls, attrs) -> Optional[Tuple[str, models.Field]]: 3. There is no primary key. """ + from django.db.models.fields.composite import CompositePrimaryKey + fields = { name: value for name, value in attrs.items() @@ -158,6 +199,29 @@ def _find_primary_key(cls, attrs) -> Optional[Tuple[str, models.Field]]: return sorted_fields_marked_as_pk[0] + @classmethod + def _delete_auto_created_fields(cls, model: models.Model): + """Base classes might be injecting an auto-generated `id` field before + we even have the chance of doing this ourselves. + + Delete any auto generated fields from the base class so that we + can declare our own. If there is no auto-generated field, one + will be added anyways by our own logic + """ + + fields = model._meta.local_fields + model._meta.local_many_to_many + for field in fields: + auto_created = getattr(field, "auto_created", False) + if auto_created: + if field in model._meta.local_fields: + model._meta.local_fields.remove(field) + + if field in model._meta.fields: + model._meta.fields.remove(field) # type: ignore [attr-defined] + + if hasattr(model, field.name): + delattr(model, field.name) + class PostgresPartitionedModel( PostgresModel, metaclass=PostgresPartitionedModelMeta diff --git a/tests/test_partitioned_model.py b/tests/test_partitioned_model.py index f4fc3c6..55c6651 100644 --- a/tests/test_partitioned_model.py +++ b/tests/test_partitioned_model.py @@ -85,11 +85,11 @@ def test_partitioned_model_key_option_none(): def test_partitioned_model_custom_composite_primary_key_with_auto_field(): model = define_fake_partitioned_model( fields={ - "auto_id": models.AutoField(), + "auto_id": models.AutoField(primary_key=True), "my_custom_pk": models.CompositePrimaryKey("auto_id", "timestamp"), "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) assert isinstance(model._meta.pk, models.CompositePrimaryKey) @@ -108,7 +108,7 @@ def test_partitioned_model_custom_composite_primary_key_with_id_field(): "my_custom_pk": models.CompositePrimaryKey("id", "timestamp"), "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) assert isinstance(model._meta.pk, models.CompositePrimaryKey) @@ -127,7 +127,7 @@ def test_partitioned_model_custom_composite_primary_key_named_id(): "id": models.CompositePrimaryKey("other_field", "timestamp"), "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) assert isinstance(model._meta.pk, models.CompositePrimaryKey) @@ -147,7 +147,7 @@ def test_partitioned_model_field_named_pk_not_composite_not_primary(): "id": models.CompositePrimaryKey("other_field", "timestamp"), "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) @@ -162,7 +162,7 @@ def test_partitioned_model_field_named_pk_not_composite(): "pk": models.AutoField(primary_key=True), "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) @@ -179,7 +179,7 @@ def test_partitioned_model_field_multiple_pks(): "timestamp": models.DateTimeField(), "real_pk": models.CompositePrimaryKey("id", "timestamp"), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) @@ -192,7 +192,7 @@ def test_partitioned_model_no_pk_defined(): fields={ "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) assert isinstance(model._meta.pk, models.CompositePrimaryKey) @@ -203,7 +203,7 @@ def test_partitioned_model_no_pk_defined(): assert id_field.name == "id" assert id_field.column == "id" assert isinstance(id_field, models.AutoField) - assert id_field.primary_key is True + assert id_field.primary_key is False @pytest.mark.skipif( @@ -217,7 +217,7 @@ def test_partitioned_model_composite_primary_key(): "pk": models.CompositePrimaryKey("id", "timestamp"), "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) assert isinstance(model._meta.pk, models.CompositePrimaryKey) @@ -234,7 +234,7 @@ def test_partitioned_model_composite_primary_key_foreign_key(): fields={ "timestamp": models.DateTimeField(), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) define_fake_model( @@ -255,7 +255,7 @@ def test_partitioned_model_custom_composite_primary_key_foreign_key(): "timestamp": models.DateTimeField(), "custom": models.CompositePrimaryKey("id", "timestamp"), }, - partitioning_options=dict(key=["timestamp"], special=True), + partitioning_options=dict(key=["timestamp"]), ) define_fake_model( From d454875cbc203fc8ddc58d4507e0ca911a0fc664 Mon Sep 17 00:00:00 2001 From: Walison Filipe Date: Sun, 5 Oct 2025 16:53:30 -0300 Subject: [PATCH 06/15] Make multi-day partitions deterministic and aligned (#263) --- psqlextra/partitioning/time_partition_size.py | 13 ++++- tests/test_partitioning_time.py | 49 +++++++++++++++++-- 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/psqlextra/partitioning/time_partition_size.py b/psqlextra/partitioning/time_partition_size.py index 3d013bc..46ef369 100644 --- a/psqlextra/partitioning/time_partition_size.py +++ b/psqlextra/partitioning/time_partition_size.py @@ -1,6 +1,6 @@ import enum -from datetime import date, datetime +from datetime import date, datetime, timedelta, timezone from typing import Optional, Union from dateutil.relativedelta import relativedelta @@ -15,11 +15,15 @@ class PostgresTimePartitionUnit(enum.Enum): DAYS = "days" +UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) + + class PostgresTimePartitionSize: """Size of a time-based range partition table.""" unit: PostgresTimePartitionUnit value: int + anchor: datetime def __init__( self, @@ -27,6 +31,7 @@ def __init__( months: Optional[int] = None, weeks: Optional[int] = None, days: Optional[int] = None, + anchor: datetime = UNIX_EPOCH ) -> None: sizes = [years, months, weeks, days] @@ -38,6 +43,7 @@ def __init__( "Partition can only have on size unit." ) + self.anchor = anchor if years: self.unit = PostgresTimePartitionUnit.YEARS self.value = years @@ -82,7 +88,10 @@ def start(self, dt: datetime) -> datetime: if self.unit == PostgresTimePartitionUnit.WEEKS: return self._ensure_datetime(dt - relativedelta(days=dt.weekday())) - return self._ensure_datetime(dt) + diff_days = (dt - self.anchor).days + partition_index = diff_days // self.value + start = self.anchor + timedelta(days=partition_index * self.value) + return self._ensure_datetime(start) @staticmethod def _ensure_datetime(dt: Union[date, datetime]) -> datetime: diff --git a/tests/test_partitioning_time.py b/tests/test_partitioning_time.py index 9f6b5bf..6d190b3 100644 --- a/tests/test_partitioning_time.py +++ b/tests/test_partitioning_time.py @@ -254,6 +254,49 @@ def test_partitioning_time_daily_apply(): assert table.partitions[6].name == "2019_jun_04" +@pytest.mark.postgres_version(lt=110000) +def test_partitioning_time_consistent_daily_apply(): + """Ensures that automatic daily partition creation is consistent and aligned + when the partition size spans multiple days (e.g., days > 1)""" + + model = define_fake_partitioned_model( + {"timestamp": models.DateTimeField()}, {"key": ["timestamp"]} + ) + + schema_editor = connection.schema_editor() + schema_editor.create_partitioned_model(model) + + with freezegun.freeze_time("2025-06-20"): + manager = PostgresPartitioningManager( + [partition_by_current_time(model, days=5, count=3)] + ) + manager.plan().apply() + + table = _get_partitioned_table(model) + assert len(table.partitions) == 3 + + # Partitions are aligned based on the fixed anchor (Unix Epoch by default). + # 2025-06-20 falls within the partition starting at 2025-06-16, + # since it's the most recent multiple of 5 days since 1970-01-01. + assert table.partitions[0].name == "2025_jun_16" + assert table.partitions[1].name == "2025_jun_21" + assert table.partitions[2].name == "2025_jun_26" + + # re-running it another day only creates the next one needed. + with freezegun.freeze_time("2025-06-22"): + manager = PostgresPartitioningManager( + [partition_by_current_time(model, days=5, count=3)] + ) + manager.plan().apply() + + table = _get_partitioned_table(model) + assert len(table.partitions) == 4 + assert table.partitions[0].name == "2025_jun_16" + assert table.partitions[1].name == "2025_jun_21" + assert table.partitions[2].name == "2025_jun_26" + assert table.partitions[3].name == "2025_jul_01" + + @pytest.mark.postgres_version(lt=110000) def test_partitioning_time_monthly_apply_insert(): """Tests whether automatically created monthly partitions line up @@ -376,7 +419,7 @@ def test_partitioning_time_daily_apply_insert(): @pytest.mark.parametrize( "kwargs,partition_names", [ - (dict(days=2), ["2019_jan_01", "2019_jan_03"]), + (dict(days=2), ["2018_dec_31", "2019_jan_02"]), (dict(weeks=2), ["2018_week_53", "2019_week_02"]), (dict(months=2), ["2019_jan", "2019_mar"]), (dict(years=2), ["2019", "2021"]), @@ -422,7 +465,7 @@ def test_partitioning_time_multiple(kwargs, partition_names): dict(days=7, max_age=relativedelta(weeks=1)), [ ("2019-1-1", 6), - ("2019-1-4", 6), + ("2019-1-4", 5), ("2019-1-8", 5), ("2019-1-15", 4), ("2019-1-16", 4), @@ -450,7 +493,7 @@ def test_partitioning_time_delete(kwargs, timepoints): with freezegun.freeze_time(timepoints[0][0]): manager.plan().apply() - for index, (dt, partition_count) in enumerate(timepoints): + for (dt, partition_count) in timepoints: with freezegun.freeze_time(dt): manager.plan(skip_create=True).apply() From 182ff726ccd8d390be44f6cff8895f822f15680a Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 21:56:43 +0200 Subject: [PATCH 07/15] Re-format code after merging partition anchor support --- psqlextra/partitioning/time_partition_size.py | 2 +- tests/test_partitioning_time.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/psqlextra/partitioning/time_partition_size.py b/psqlextra/partitioning/time_partition_size.py index 46ef369..bb0c7e7 100644 --- a/psqlextra/partitioning/time_partition_size.py +++ b/psqlextra/partitioning/time_partition_size.py @@ -31,7 +31,7 @@ def __init__( months: Optional[int] = None, weeks: Optional[int] = None, days: Optional[int] = None, - anchor: datetime = UNIX_EPOCH + anchor: datetime = UNIX_EPOCH, ) -> None: sizes = [years, months, weeks, days] diff --git a/tests/test_partitioning_time.py b/tests/test_partitioning_time.py index 6d190b3..1e6e21a 100644 --- a/tests/test_partitioning_time.py +++ b/tests/test_partitioning_time.py @@ -256,8 +256,8 @@ def test_partitioning_time_daily_apply(): @pytest.mark.postgres_version(lt=110000) def test_partitioning_time_consistent_daily_apply(): - """Ensures that automatic daily partition creation is consistent and aligned - when the partition size spans multiple days (e.g., days > 1)""" + """Ensures that automatic daily partition creation is consistent and + aligned when the partition size spans multiple days (e.g., days > 1)""" model = define_fake_partitioned_model( {"timestamp": models.DateTimeField()}, {"key": ["timestamp"]} From 67f20301d0038221d8cc1a921424f5a9537957ee Mon Sep 17 00:00:00 2001 From: Stuart Leitch Date: Thu, 5 Oct 2023 15:02:23 +0100 Subject: [PATCH 08/15] Add in Hourly Partition Support --- .gitignore | 3 + docs/source/table_partitioning.rst | 11 +++ .../partitioning/current_time_strategy.py | 3 +- psqlextra/partitioning/shorthands.py | 6 +- psqlextra/partitioning/time_partition.py | 5 +- psqlextra/partitioning/time_partition_size.py | 27 ++++-- tests/test_partitioning_time.py | 91 +++++++++++++++++++ 7 files changed, 136 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 63d6378..52805a8 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ build/ # Ignore PyCharm / IntelliJ files .idea/ +build/ +.python-version +docker-compose.yml \ No newline at end of file diff --git a/docs/source/table_partitioning.rst b/docs/source/table_partitioning.rst index 9194c41..1c36db0 100644 --- a/docs/source/table_partitioning.rst +++ b/docs/source/table_partitioning.rst @@ -292,6 +292,17 @@ Time-based partitioning count=12, ), ), + + # 24 partitions ahead, each partition is 1 hour, for a total of 24 hours. Starting with hour 0 of current day + # old partitions are never deleted, `max_age` is not set + # partitions will be named `[table_name]_[year]_[month]_[month day number]_[hour (24h)]:00:00`. + PostgresPartitioningConfig( + model=MyPartitionedModel, + strategy=PostgresCurrentTimePartitioningStrategy( + size=PostgresTimePartitionSize(hours=1), + count=24, + ), + ), ]) diff --git a/psqlextra/partitioning/current_time_strategy.py b/psqlextra/partitioning/current_time_strategy.py index 114a1aa..795f60c 100644 --- a/psqlextra/partitioning/current_time_strategy.py +++ b/psqlextra/partitioning/current_time_strategy.py @@ -16,7 +16,8 @@ class PostgresCurrentTimePartitioningStrategy( All buckets will be equal in size and start at the start of the unit. With monthly partitioning, partitions start on the 1st and - with weekly partitioning, partitions start on monday. + with weekly partitioning, partitions start on monday, with hourly + partitioning, partitions start at 00:00. """ def __init__( diff --git a/psqlextra/partitioning/shorthands.py b/psqlextra/partitioning/shorthands.py index 3017527..f263e36 100644 --- a/psqlextra/partitioning/shorthands.py +++ b/psqlextra/partitioning/shorthands.py @@ -16,6 +16,7 @@ def partition_by_current_time( months: Optional[int] = None, weeks: Optional[int] = None, days: Optional[int] = None, + hours: Optional[int] = None, max_age: Optional[relativedelta] = None, name_format: Optional[str] = None, ) -> PostgresPartitioningConfig: @@ -43,6 +44,9 @@ def partition_by_current_time( days: The amount of days each partition should contain. + hours: + The amount of hours each partition should contain. + max_age: The maximum age of a partition (calculated from the start of the partition). @@ -56,7 +60,7 @@ def partition_by_current_time( """ size = PostgresTimePartitionSize( - years=years, months=months, weeks=weeks, days=days + years=years, months=months, weeks=weeks, days=days, hours=hours ) return PostgresPartitioningConfig( diff --git a/psqlextra/partitioning/time_partition.py b/psqlextra/partitioning/time_partition.py index 3c8a4d8..64a8cf8 100644 --- a/psqlextra/partitioning/time_partition.py +++ b/psqlextra/partitioning/time_partition.py @@ -20,6 +20,7 @@ class PostgresTimePartition(PostgresRangePartition): PostgresTimePartitionUnit.MONTHS: "%Y_%b", PostgresTimePartitionUnit.WEEKS: "%Y_week_%W", PostgresTimePartitionUnit.DAYS: "%Y_%b_%d", + PostgresTimePartitionUnit.HOURS: "%Y_%b_%d_%H:00:00", } def __init__( @@ -31,8 +32,8 @@ def __init__( end_datetime = start_datetime + size.as_delta() super().__init__( - from_values=start_datetime.strftime("%Y-%m-%d"), - to_values=end_datetime.strftime("%Y-%m-%d"), + from_values=start_datetime.strftime("%Y-%m-%d %H:00:00"), + to_values=end_datetime.strftime("%Y-%m-%d %H:00:00"), ) self.size = size diff --git a/psqlextra/partitioning/time_partition_size.py b/psqlextra/partitioning/time_partition_size.py index bb0c7e7..b8231dd 100644 --- a/psqlextra/partitioning/time_partition_size.py +++ b/psqlextra/partitioning/time_partition_size.py @@ -13,6 +13,7 @@ class PostgresTimePartitionUnit(enum.Enum): MONTHS = "months" WEEKS = "weeks" DAYS = "days" + HOURS = "hours" UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) @@ -31,9 +32,10 @@ def __init__( months: Optional[int] = None, weeks: Optional[int] = None, days: Optional[int] = None, + hours: Optional[int] = None, anchor: datetime = UNIX_EPOCH, ) -> None: - sizes = [years, months, weeks, days] + sizes = [years, months, weeks, days, hours] if not any(sizes): raise PostgresPartitioningError("Partition cannot be 0 in size.") @@ -56,6 +58,9 @@ def __init__( elif days: self.unit = PostgresTimePartitionUnit.DAYS self.value = days + elif hours: + self.unit = PostgresTimePartitionUnit.HOURS + self.value = hours else: raise PostgresPartitioningError( "Unsupported time partitioning unit" @@ -74,6 +79,9 @@ def as_delta(self) -> relativedelta: if self.unit == PostgresTimePartitionUnit.DAYS: return relativedelta(days=self.value) + if self.unit == PostgresTimePartitionUnit.HOURS: + return relativedelta(hours=self.value) + raise PostgresPartitioningError( "Unsupported time partitioning unit: %s" % self.unit ) @@ -88,14 +96,21 @@ def start(self, dt: datetime) -> datetime: if self.unit == PostgresTimePartitionUnit.WEEKS: return self._ensure_datetime(dt - relativedelta(days=dt.weekday())) - diff_days = (dt - self.anchor).days - partition_index = diff_days // self.value - start = self.anchor + timedelta(days=partition_index * self.value) - return self._ensure_datetime(start) + if self.unit == PostgresTimePartitionUnit.DAYS: + diff_days = (dt - self.anchor).days + partition_index = diff_days // self.value + start = self.anchor + timedelta(days=partition_index * self.value) + return self._ensure_datetime(start) + + if self.unit == PostgresTimePartitionUnit.HOURS: + return self._ensure_datetime(dt.replace(hour=0)) + + raise ValueError("Unknown unit") @staticmethod def _ensure_datetime(dt: Union[date, datetime]) -> datetime: - return datetime(year=dt.year, month=dt.month, day=dt.day) + hour = dt.hour if isinstance(dt, datetime) else 0 + return datetime(year=dt.year, month=dt.month, day=dt.day, hour=hour) def __repr__(self) -> str: return "PostgresTimePartitionSize<%s, %s>" % (self.unit, self.value) diff --git a/tests/test_partitioning_time.py b/tests/test_partitioning_time.py index 1e6e21a..0ab0daf 100644 --- a/tests/test_partitioning_time.py +++ b/tests/test_partitioning_time.py @@ -254,6 +254,56 @@ def test_partitioning_time_daily_apply(): assert table.partitions[6].name == "2019_jun_04" +@pytest.mark.postgres_version(lt=110000) +def test_partitioning_time_hourly_apply(): + """Tests whether automatically creating new partitions ahead hourly works + as expected.""" + + model = define_fake_partitioned_model( + {"timestamp": models.DateTimeField()}, {"key": ["timestamp"]} + ) + + schema_editor = connection.schema_editor() + schema_editor.create_partitioned_model(model) + + # create partitions for the next 4 hours (including the current) + with freezegun.freeze_time("2019-1-23"): + manager = PostgresPartitioningManager( + [partition_by_current_time(model, hours=1, count=4)] + ) + manager.plan().apply() + + table = _get_partitioned_table(model) + assert len(table.partitions) == 4 + assert table.partitions[0].name == "2019_jan_23_00:00:00" + assert table.partitions[1].name == "2019_jan_23_01:00:00" + assert table.partitions[2].name == "2019_jan_23_02:00:00" + assert table.partitions[3].name == "2019_jan_23_03:00:00" + + # re-running it with 5, should just create one additional partition + with freezegun.freeze_time("2019-1-23"): + manager = PostgresPartitioningManager( + [partition_by_current_time(model, hours=1, count=5)] + ) + manager.plan().apply() + + table = _get_partitioned_table(model) + assert len(table.partitions) == 5 + assert table.partitions[4].name == "2019_jan_23_04:00:00" + + # it's june now, we want to partition two hours ahead + with freezegun.freeze_time("2019-06-03"): + manager = PostgresPartitioningManager( + [partition_by_current_time(model, hours=1, count=2)] + ) + manager.plan().apply() + + table = _get_partitioned_table(model) + assert len(table.partitions) == 7 + assert table.partitions[5].name == "2019_jun_03_00:00:00" + assert table.partitions[6].name == "2019_jun_03_01:00:00" + + @pytest.mark.postgres_version(lt=110000) def test_partitioning_time_consistent_daily_apply(): """Ensures that automatic daily partition creation is consistent and @@ -415,11 +465,52 @@ def test_partitioning_time_daily_apply_insert(): model.objects.create(timestamp=datetime.date(2019, 1, 10)) +@pytest.mark.postgres_version(lt=110000) +def test_partitioning_time_hourly_apply_insert(): + """Tests whether automatically created hourly partitions line up + perfectly.""" + + model = define_fake_partitioned_model( + {"timestamp": models.DateTimeField()}, {"key": ["timestamp"]} + ) + + schema_editor = connection.schema_editor() + schema_editor.create_partitioned_model(model) + + # that's a monday + with freezegun.freeze_time("2019-1-07"): + manager = PostgresPartitioningManager( + [partition_by_current_time(model, hours=1, count=2)] + ) + manager.plan().apply() + + table = _get_partitioned_table(model) + assert len(table.partitions) == 2 + + model.objects.create(timestamp=datetime.datetime(2019, 1, 7, 0)) + model.objects.create(timestamp=datetime.datetime(2019, 1, 7, 1)) + + with transaction.atomic(): + with pytest.raises(IntegrityError): + model.objects.create(timestamp=datetime.datetime(2019, 1, 7, 2)) + model.objects.create(timestamp=datetime.datetime(2019, 1, 7, 3)) + + with freezegun.freeze_time("2019-1-07"): + manager = PostgresPartitioningManager( + [partition_by_current_time(model, hours=1, count=4)] + ) + manager.plan().apply() + + model.objects.create(timestamp=datetime.datetime(2019, 1, 7, 2)) + model.objects.create(timestamp=datetime.datetime(2019, 1, 7, 3)) + + @pytest.mark.postgres_version(lt=110000) @pytest.mark.parametrize( "kwargs,partition_names", [ (dict(days=2), ["2018_dec_31", "2019_jan_02"]), + (dict(hours=2), ["2019_jan_01_00:00:00", "2019_jan_01_02:00:00"]), (dict(weeks=2), ["2018_week_53", "2019_week_02"]), (dict(months=2), ["2019_jan", "2019_mar"]), (dict(years=2), ["2019", "2021"]), From 97413d4a028449b6ce06d4304d4c366c51155153 Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 22:58:41 +0200 Subject: [PATCH 09/15] Fix temporary tablespace directory not existing in Postgres container on CI --- tests/conftest.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 03d01ab..2df5e27 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,9 @@ -import os import tempfile import uuid import pytest +from django.conf import settings from django.contrib.postgres.signals import register_type_handlers from django.db import connection @@ -35,11 +35,24 @@ def django_db_setup(django_db_setup, django_db_blocker): with django_db_blocker.unblock(): qn = connection.ops.quote_name - with tempfile.TemporaryDirectory() as temp_dir: - if not os.path.exists(temp_dir): - os.makedirs(temp_dir) + db_hostname = settings.DATABASES[connection.alias]["HOST"] + with tempfile.TemporaryDirectory() as temp_dir: with connection.cursor() as cursor: + # If the database is remote, like in a CI environment, make + # sure the temporary directory exists in the container + # that PostgreSQL is running. + # + # Note that this only typically works in CI environments + # where we have utter control to execute arbitary commands. + if db_hostname and db_hostname not in ( + "127.0.0.1", + "localhost", + ): + cursor.execute( + f"COPY (select 1) TO PROGRAM 'mkdir --mode=777 -p {temp_dir}'" + ) + cursor.execute( f"CREATE TABLESPACE {qn(custom_tablespace_name)} LOCATION %s", (temp_dir,), From 663ecf63075a26d28b69c626f66a686d85653d8f Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 23:16:27 +0200 Subject: [PATCH 10/15] Move Docker images to bullseye --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a00cc61..4549c36 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -10,7 +10,7 @@ executors: default: "16.0" debiandist: type: string - default: "buster" + default: "bullseye" docker: - image: python:<< parameters.pyversion >>-<< parameters.debiandist >> - image: postgres:<< parameters.pgversion >> From 37d67d759986a77058b9cda43046138c5a8e9db1 Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 23:17:13 +0200 Subject: [PATCH 11/15] Add contrib module with utilities that have been helpful over the years --- psqlextra/contrib/README.md | 5 + psqlextra/contrib/__init__.py | 11 + psqlextra/contrib/expressions.py | 47 +++ psqlextra/contrib/model_data_migrator.py | 352 +++++++++++++++++++++++ psqlextra/contrib/static_row.py | 97 +++++++ psqlextra/contrib/transaction.py | 33 +++ 6 files changed, 545 insertions(+) create mode 100644 psqlextra/contrib/README.md create mode 100644 psqlextra/contrib/__init__.py create mode 100644 psqlextra/contrib/expressions.py create mode 100644 psqlextra/contrib/model_data_migrator.py create mode 100644 psqlextra/contrib/static_row.py create mode 100644 psqlextra/contrib/transaction.py diff --git a/psqlextra/contrib/README.md b/psqlextra/contrib/README.md new file mode 100644 index 0000000..296194e --- /dev/null +++ b/psqlextra/contrib/README.md @@ -0,0 +1,5 @@ +# psqlextra.contrib + +This module contains a arbitrary collection of utilities and snippets that build on top of core functionality provided by django-postgres-extra. + +This collection is UNTESTED, UNSUPPORTED and UNDOCUMENTED. They are only provided here as an inspiration. Use at your own risk. diff --git a/psqlextra/contrib/__init__.py b/psqlextra/contrib/__init__.py new file mode 100644 index 0000000..97794eb --- /dev/null +++ b/psqlextra/contrib/__init__.py @@ -0,0 +1,11 @@ +from .model_data_migrator import PostgresModelDataMigrator +from .static_row import StaticRowQueryCompiler, StaticRowQuerySet +from .transaction import no_transaction + +__all__ = [ + "PostgresModelDataMigrator", + "PostgresModelDataMigratorState" "StaticRowQuery", + "StaticRowQueryCompiler", + "StaticRowQuerySet", + "no_transaction", +] diff --git a/psqlextra/contrib/expressions.py b/psqlextra/contrib/expressions.py new file mode 100644 index 0000000..dfc57f7 --- /dev/null +++ b/psqlextra/contrib/expressions.py @@ -0,0 +1,47 @@ +from django.db import models +from django.db.models.expressions import CombinedExpression, Func + + +class Equals(CombinedExpression): + """Expression that constructs `{lhs} = {rhs}`. + + Used as an alternative to Django's `Q` object when the + left-hand side is a aliased field not known to Django. + """ + + connector: str = "=" + + def __init__(self, lhs, rhs) -> None: + super().__init__( + lhs, self.connector, rhs, output_field=models.BooleanField() + ) + + +class Is(Equals): + """Expression that constructs `{lhs} IS {rhs}`.""" + + connector: str = "IS" + + +class GreaterThen(Equals): + """Expression that constructs `{lhs} > {rhs}`.""" + + connector: str = ">" + + +class LowerThenOrEqual(Equals): + """Expression that constructs `{lhs} <= {rhs}`.""" + + connector: str = "<=" + + +class And(Equals): + """Expression that constructs `{lhs} AND {rhs}`.""" + + connector: str = "AND" + + +class Bool(Func): + """Cast to a boolean.""" + + function = "BOOL" diff --git a/psqlextra/contrib/model_data_migrator.py b/psqlextra/contrib/model_data_migrator.py new file mode 100644 index 0000000..35a2dcd --- /dev/null +++ b/psqlextra/contrib/model_data_migrator.py @@ -0,0 +1,352 @@ +# mypy: disable-error-code="attr-defined" + +import json +import os +import time + +from abc import abstractmethod +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import timedelta +from typing import Any, Dict, Type + +from django.db import DEFAULT_DB_ALIAS, connections, models, transaction + +from psqlextra.locking import PostgresTableLockMode, postgres_lock_model +from psqlextra.schema import PostgresSchema +from psqlextra.settings import ( + postgres_prepend_local_search_path, + postgres_set_local, +) + +from .transaction import no_transaction + + +@dataclass +class PostgresModelDataMigratorState: + id: str + work_schema: PostgresSchema + backup_schema: PostgresSchema + default_schema: PostgresSchema + storage_settings: Dict[str, Any] + + +class PostgresModelDataMigrator: + """Helps altering/moving large amounts of data in a table quickly without + interruptions. + + In simple terms: This class temporarily drops all indices + and constraints from a table to speed up writes. + + In complicated terms: + + 1. Create copy of the table without indices or constraints + in a separate schema. + + The clone is made in a separate schema so that there + are no naming conflicts and there is no need to rename + anything. + + 2. Allow the caller to fill the copy. + + This will be an order of magnitude faster because + there are no indices to build or constraints to + statisfy. You are responsible for making sure the + data is ok and will statisfy the constraints when + they come back. + + 3. Add the indices and constraints to the table. + + This takes time, but it's still a lot faster than + the indices being built incrementally. + + 4. Allow the caller to clean up the copied table. + + With the indices back in place, filtering the copied + table should be fast. Perfect time to clean up + some data. + + 5. Vacuum+Analyze the table. + + Vacuuming ensures we don't risk transaction ID + wrap-around and analyzing ensures up-to-date + statistics. + + 6. Start a transaction. + + 7. Lock the real table in EXCLUSIVE mode. + + This blocks writes or modifications to the table, + but does not block readers. + + 8. Allow the caller to move some data from the real table + into the copied one. + + This is the perfect time to copy any data that was + written to the real table since the migration process + began. Since the original table is locked, you can + be sure no more rows are being added or modified. + + 9. Move the original table into a backup schema. + + This allows it to be quickly restored manually + if the migration is broken in any way. + + 10. Move the copied table in place of the real one. + + 11. Commit the transaction, which releases the lock. + + The process is very similiar to how pg_repack rewrites + an entire table without long-running locks on the table. + + Attributes: + model: The model to migrate. + using: Optional name of the database connection to use. + operation_timeout: Maximum amount of time a single statement + can take. + """ + + model: Type[models.Model] + using: str = DEFAULT_DB_ALIAS + operation_timeout: timedelta + + def __init__(self, logger) -> None: + self.logger = logger + self.connection = connections[self.using] + self.schema_editor = self.connection.schema_editor(atomic=False) + + @abstractmethod + def fill_cloned_table_lockless( + self, work_schema: PostgresSchema, default_schema: PostgresSchema + ) -> None: + """Moment to fill the cloned table with data.""" + + @abstractmethod + def clean_cloned_table( + self, work_schema: PostgresSchema, default_schema: PostgresSchema + ) -> None: + """Moment to clean the filled table after it has indices and validated + data.""" + + @abstractmethod + def fill_cloned_table_locked( + self, work_schema: PostgresSchema, default_schema: PostgresSchema + ) -> None: + """Moment to do final cleaning while the original table is locked for + writing.""" + + @no_transaction( + why="The transaction would be too big and some statements cannot be run in a transaction." + ) + def migrate(self) -> PostgresModelDataMigratorState: + start_time = time.time() + + with self.atomic(): + with self.connection.cursor() as cursor: + storage_settings = ( + self.connection.introspection.get_storage_settings( + cursor, self.table_name + ) + ) + + state = PostgresModelDataMigratorState( + id=os.urandom(4).hex(), + work_schema=PostgresSchema.create_random( + f"migrate_{self.table_name}", using=self.using + ), + backup_schema=PostgresSchema.create_random( + f"backup_{self.table_name}", using=self.using + ), + default_schema=PostgresSchema.default, + storage_settings=storage_settings, + ) + + logger = self.logger.bind(id=state.id) + logger.info( + f"Starting migration of {self.table_name}", + data=json.dumps( + { + "work_schema": state.work_schema.name, + "backup_schema": state.backup_schema.name, + "default_schema": state.default_schema.name, + "storage_settings": state.storage_settings, + } + ), + ) + + count = self.model.objects.using(self.using).count() + logger.info(f"Found {count} records in {self.table_name}") + + phases = [ + (self._migrate_phase_1, "cloning and filling table"), + (self._migrate_phase_2, "adding constraints and indexes"), + (self._migrate_phase_3, "cleaning up and vacuuming"), + (self._migrate_phase_4, "swapping"), + ] + + for index, (phase, description) in enumerate(phases): + phase_start_time = time.time() + logger.info( + f"Starting phase #{index + 1} of migrating {self.table_name}: {description}" + ) + phase(state) + logger.info( + f"Finished phase #{index + 1} of migrating {self.table_name}: {description}", + task_time=time.time() - phase_start_time, + ) + + state.work_schema.delete(cascade=True, using=self.using) + + logger.info( + f"Finished migrating {self.table_name}", + task_time=time.time() - start_time, + ) + + return state + + def _migrate_phase_1(self, state: PostgresModelDataMigratorState) -> None: + """Clone the table without constraints or indices.""" + + with self.atomic(): + self.schema_editor.clone_model_structure_to_schema( + self.model, schema_name=state.work_schema.name + ) + + # Disable auto-vacuum on the cloned table to prevent + # it from consuming excessive resources _while_ we're + # writing to it. We're running this manually before + # we turn it back on in the last phase. + with postgres_prepend_local_search_path( + [state.work_schema.name], using=self.using + ): + self.schema_editor.alter_model_storage_setting( + self.model, "autovacuum_enabled", "false" + ) + + # Let the derived class fill our cloned table + self.fill_cloned_table_lockless(state.work_schema, state.default_schema) + + def _migrate_phase_2(self, state: PostgresModelDataMigratorState) -> None: + """Add indices and constraints to the cloned table.""" + + # Add indices and constraints to the temporary table + # This could be speed up by increasing `maintenance_work_mem` + # and `max_parallel_workers_per_gather`, but we won't as + # it'll consume more I/O, potentially disturbing normal traffic. + with self.atomic(): + self.schema_editor.clone_model_constraints_and_indexes_to_schema( + self.model, schema_name=state.work_schema.name + ) + + # Validate foreign keys + # + # The foreign keys have been added in NOT VALID mode so they + # only validate new rows. Validate the existing rows. + # + # This is a two-step process to avoid a AccessExclusiveLock + # on the referenced tables. + with self.atomic(): + self.schema_editor.clone_model_foreign_keys_to_schema( + self.model, schema_name=state.work_schema.name + ) + + def _migrate_phase_3(self, state: PostgresModelDataMigratorState) -> None: + """Clean & finalize the cloned table.""" + + # Let the derived class do some clean up on the temporary + # table now that we have indices and constraints. + with self.atomic(): + self.clean_cloned_table(state.work_schema, state.default_schema) + + # Finalize the copy by vacuuming+analyzing it + # + # VACUUM: There should not be much bloat since the table + # is new, but the clean up phase might have generated some. + # + # We mostly VACUUM to reset the transaction ID and prevent + # transaction ID wraparound. + # + # ANALYZE: The table went from 0 to being filled, by running ANALYZE, + # we update the statistics, allowing the query planner to + # make good decisions. + with postgres_prepend_local_search_path( + [state.work_schema.name], using=self.using + ): + self.schema_editor.vacuum_model(self.model, analyze=True) + + # Re-enable autovacuum on the cloned table + with postgres_prepend_local_search_path( + [state.work_schema.name], using=self.using + ): + autovacuum_enabled = state.storage_settings.get( + "autovacuum_enabled" + ) + if autovacuum_enabled: + self.schema_editor.alter_model_storage_setting( + self.model, "autovacuum_enabled", autovacuum_enabled + ) + else: + self.schema_editor.reset_model_storage_setting( + self.model, "autovacuum_enabled" + ) + + def _migrate_phase_4(self, state: PostgresModelDataMigratorState) -> None: + """Replace the original table with the cloned one.""" + + with self.atomic(): + # Lock the original table for writing so that the caller + # is given a chance to do last-minute moving of data. + postgres_lock_model( + self.model, PostgresTableLockMode.EXCLUSIVE, using=self.using + ) + + # Let derived class finalize the temporary table while the + # original is locked. Not much work should happen here. + self.fill_cloned_table_locked( + state.work_schema, state.default_schema + ) + + # Move the original table into the backup schema. + # Disable autovacuum on it so we don't waste resources + # keeping it clean. + self.schema_editor.alter_model_storage_setting( + self.model, "autovacuum_enabled", "false" + ) + self.schema_editor.alter_model_schema( + self.model, state.backup_schema.name + ) + + # Move the cloned table in place of the original + with postgres_prepend_local_search_path( + [state.work_schema.name], using=self.using + ): + self.schema_editor.alter_model_schema( + self.model, state.default_schema.name + ) + + @property + def model_name(self) -> str: + return self.model.__name__ + + @property + def table_name(self) -> str: + return self.model._meta.db_table + + @contextmanager + def atomic(self): + """Creates a atomic transaction with run-time parameters tuned for a + live migration. + + - Statement/idle timeout set to prevent runaway queries + from continuing long after the migrator was killed. + - No parallel works to keep I/O under control. + """ + + with transaction.atomic(durable=True, using=self.using): + with postgres_set_local( + statement_timeout=f"{self.operation_timeout.total_seconds()}s", + idle_in_transaction_session_timeout=f"{self.operation_timeout.total_seconds()}s", + max_parallel_workers_per_gather=0, + using=self.using, + ): + yield diff --git a/psqlextra/contrib/static_row.py b/psqlextra/contrib/static_row.py new file mode 100644 index 0000000..a89905a --- /dev/null +++ b/psqlextra/contrib/static_row.py @@ -0,0 +1,97 @@ +from typing import Any, List, Optional, Tuple, Type, TypeVar, cast + +from django.db import DEFAULT_DB_ALIAS, connections, models +from django.db.models.expressions import Value +from django.db.models.query import RawQuerySet +from django.db.models.sql import Query +from django.db.models.sql.compiler import SQLCompiler + +TModel = TypeVar("TModel", bound=models.Model) + + +class StaticRowQueryCompiler(SQLCompiler): + has_extra_select = False + + def as_sql(self, *args, **kwargs): + cols = [] + params = [] + + select, _, _ = self.get_select() + + for _, (s_sql, s_params), s_alias in select: + cols.append( + "%s AS %s" + % ( + s_sql, + self.connection.ops.quote_name(s_alias), + ) + ) + + params.extend(s_params) + + return f"SELECT {', '.join(cols)}", tuple(params) + + +class StaticRowQuery(Query): + def __init__( + self, model: Type[models.Model], using: str = DEFAULT_DB_ALIAS + ): + self.using = using + + super().__init__(model) + + def get_columns(self): + return list(self.annotations.keys()) + + def get_compiler( + self, using: Optional[str] = None, connection=None, elide_empty=True + ): + using = using or self.using + + compiler = StaticRowQueryCompiler( + self, connection or connections[using], using + ) + compiler.setup_query() + + return compiler + + def __iter__(self): + compiler = self.get_compiler() + + cursor = compiler.connection.cursor() + cursor.execute(*compiler.as_sql()) + + return iter(cursor) + + +class StaticRowQuerySet(RawQuerySet): + """Query set that compiles queries that don't select from anything and have + their values hard-coded. + + Example: + + >>> SELECT 'mystring' AS something, -1 AS somethingelse; + + This is used when you want to add some rows to a result + set using UNION in SQL. + """ + + def __init__( + self, + model: Type[models.Model], + row: List[Tuple[str, Value]], + using: str = DEFAULT_DB_ALIAS, + ) -> None: + query = StaticRowQuery(model, using) + query.default_cols = False + query.annotations = dict(row) + + sql, params = query.sql_with_params() + + # cast(Tuple[Any], params) because `RawQuerySet.__init_` is mistyped + super().__init__( + raw_query=sql, + model=model, + query=query, + params=cast(Tuple[Any], params), + ) diff --git a/psqlextra/contrib/transaction.py b/psqlextra/contrib/transaction.py new file mode 100644 index 0000000..796246c --- /dev/null +++ b/psqlextra/contrib/transaction.py @@ -0,0 +1,33 @@ +from contextlib import contextmanager +from typing import Optional + +from django.conf import settings +from django.core.exceptions import SuspiciousOperation +from django.db import DEFAULT_DB_ALIAS, connections + + +def _is_in_test(): + return ( + getattr(settings, "TEST_MODE", False) + or getattr(settings, "TESTING", False) + or getattr(settings, "TEST", False) + ) + + +@contextmanager +def no_transaction(*, why: str, using: Optional[str] = None): + """Prevents a method or a block from running in a database transaction.""" + + # During tests, allow one level of transaction.atomic(..) nesting + # because tests themselves run in a transaction. If there's only + # one level of nesting, it's from the test itself and the code + # would actually run without a transaction outside the test. + + connection = connections[using or DEFAULT_DB_ALIAS] + + if connection.in_atomic_block and not ( + _is_in_test() and len(connection.savepoint_ids) <= 1 + ): + raise SuspiciousOperation(f"Unexpected database transaction: {why}") + + yield From 7eccfd3e6fa75ddde68fd57143e013a7ca509e28 Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 23:25:44 +0200 Subject: [PATCH 12/15] Add additional check to try to distingush between running tests on CI vs locally --- tests/conftest.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 2df5e27..25996fa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -35,6 +35,7 @@ def django_db_setup(django_db_setup, django_db_blocker): with django_db_blocker.unblock(): qn = connection.ops.quote_name + db_user = settings.DATABASES[connection.alias]["USER"] db_hostname = settings.DATABASES[connection.alias]["HOST"] with tempfile.TemporaryDirectory() as temp_dir: @@ -45,9 +46,13 @@ def django_db_setup(django_db_setup, django_db_blocker): # # Note that this only typically works in CI environments # where we have utter control to execute arbitary commands. - if db_hostname and db_hostname not in ( - "127.0.0.1", - "localhost", + if db_user or ( + db_hostname + and db_hostname + not in ( + "127.0.0.1", + "localhost", + ) ): cursor.execute( f"COPY (select 1) TO PROGRAM 'mkdir --mode=777 -p {temp_dir}'" From f59417abaf066b65ce673124a718223244fb83b4 Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 23:27:28 +0200 Subject: [PATCH 13/15] Use explicit flag to detect database in container during tests --- .circleci/config.yml | 1 + settings.py | 2 ++ tests/conftest.py | 13 +------------ 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 4549c36..c926b2c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -45,6 +45,7 @@ commands: command: tox --listenvs | grep ^py<< parameters.pyversion >> | circleci tests split | xargs -n 1 tox -e environment: DATABASE_URL: 'postgres://psqlextra:psqlextra@localhost:5432/psqlextra' + DATABASE_IN_CONTAINER: 'true' jobs: test-python36: diff --git a/settings.py b/settings.py index 7ece171..2a5e0fa 100644 --- a/settings.py +++ b/settings.py @@ -43,3 +43,5 @@ def _parse_db_url(/service/url: str): USE_TZ = True TIME_ZONE = 'UTC' + +DATABASE_IN_CONTAINER = os.environ.get('DATABASE_IN_CONTAINER') == 'true' diff --git a/tests/conftest.py b/tests/conftest.py index 25996fa..9620d12 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,10 +34,6 @@ def django_db_setup(django_db_setup, django_db_blocker): with django_db_blocker.unblock(): qn = connection.ops.quote_name - - db_user = settings.DATABASES[connection.alias]["USER"] - db_hostname = settings.DATABASES[connection.alias]["HOST"] - with tempfile.TemporaryDirectory() as temp_dir: with connection.cursor() as cursor: # If the database is remote, like in a CI environment, make @@ -46,14 +42,7 @@ def django_db_setup(django_db_setup, django_db_blocker): # # Note that this only typically works in CI environments # where we have utter control to execute arbitary commands. - if db_user or ( - db_hostname - and db_hostname - not in ( - "127.0.0.1", - "localhost", - ) - ): + if settings.DATABASE_IN_CONTAINER: cursor.execute( f"COPY (select 1) TO PROGRAM 'mkdir --mode=777 -p {temp_dir}'" ) From d7cd98e4f87125941ea120475b86fee763c6f04a Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sun, 5 Oct 2025 23:30:44 +0200 Subject: [PATCH 14/15] Pass through `DATABASE_IN_CONTAINER` in tox --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 697d1c4..a5a33ed 100644 --- a/tox.ini +++ b/tox.ini @@ -27,5 +27,5 @@ deps = .[test] setenv = DJANGO_SETTINGS_MODULE=settings -passenv = DATABASE_URL +passenv = DATABASE_URL, DATABASE_IN_CONTAINER commands = poe test From 5b1f9f759bae6166e62cf57d898e5b578bba9b13 Mon Sep 17 00:00:00 2001 From: Swen Kooij Date: Sat, 18 Oct 2025 22:07:10 +0200 Subject: [PATCH 15/15] Make tox v3.x pass `DATABASE_URL` into env --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index c926b2c..49eac3a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -46,6 +46,7 @@ commands: environment: DATABASE_URL: 'postgres://psqlextra:psqlextra@localhost:5432/psqlextra' DATABASE_IN_CONTAINER: 'true' + TOX_TESTENV_PASSENV: 'DATABASE_URL DATABASE_IN_CONTAINER' jobs: test-python36: