GraphQL Subscriptions¶
django-graphex ships an optional GraphQL subscriptions engine built
on Django Channels 4. It is the modern
successor of the standalone graphene-django-subscriptions package, merged here
as an opt-in extra so that the base install never depends on channels.
Optional extra
Subscriptions are not installed by default. The base package never
imports channels. You opt in explicitly:
pip install "django-graphex[subscriptions]"  # or: uv add "django-graphex[subscriptions]"
Importing django_graphex.subscriptions without the extra raises a
friendly error telling you to install it.
How it works¶
The engine preserves the original two-channel wire protocol, reimplemented on Channels 4:
- A client opens a WebSocket. On connect the server replies with a handshake frame {"channel_id": "<id>", "connect": "success"}.
- The client sends a GraphQL subscription { ... } operation over HTTP, echoing the channelId from the handshake. The resolver joins (or leaves) the relevant broadcast groups and returns a single confirmation {ok, error, stream, operation, action}.
- When a model instance changes, an in-house signal binding serializes it once and broadcasts the payload to the group. Every subscribed WebSocket receives the notification, projected to the fields it requested.
A cross-process channel layer (Redis) is required when the HTTP and WebSocket processes are separate; the in-memory layer is fine for development.
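The handshake-then-subscribe flow above can be sketched from the client's side. This is a minimal sketch using only the standard library, not the engine's own client: the helper names (channel_id_from_handshake, build_subscribe_request) are hypothetical, the field name userSubscription is borrowed from the client example later on this page, and it assumes the HTTP endpoint accepts the usual GraphQL JSON body with query and variables.

```python
import json


def channel_id_from_handshake(frame: str) -> str:
    """Extract the channel id from the WebSocket handshake frame."""
    message = json.loads(frame)
    if message.get("connect") != "success":
        raise ValueError(f"unexpected handshake frame: {message!r}")
    return message["channel_id"]


def build_subscribe_request(channel_id: str, field: str = "userSubscription") -> dict:
    """Build the JSON body for the one-shot subscribe operation sent over HTTP."""
    query = (
        "subscription ($channelId: String!) {\n"
        f"  {field}(channelId: $channelId, action: ALL_ACTIONS, operation: SUBSCRIBE) {{\n"
        "    ok error stream operation action\n"
        "  }\n"
        "}"
    )
    return {"query": query, "variables": {"channelId": channel_id}}


# Hypothetical handshake frame, in the shape described above.
handshake = '{"channel_id": "abc123", "connect": "success"}'
body = build_subscribe_request(channel_id_from_handshake(handshake))
print(json.dumps(body["variables"]))
```

The resulting body would be POSTed to the HTTP endpoint, while the WebSocket stays open to receive the notifications.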
Try it interactively
Add the browser client view to your URLConf to connect, subscribe and watch notifications stream in — straight from the browser, served from your own server.
1. Define a subscription¶
A Subscription is declared like a DjangoModelType, through Meta:
from django_graphex.subscriptions import Subscription
from myapp.models import User

class UserSubscription(Subscription):
    class Meta:
        model = User  # required, a Django model class
        stream = "users"  # required, a str
        queryset = None  # optional
        description = "User Subscription"  # optional
        serialize_data = None  # optional, see "Notification payload"
The notification payload is serialized through the native (Pydantic) backend.
Declaring the class generates the following, with the same names and semantics as the legacy package:
- Output fields: ok, error, stream, operation, action.
- Arguments: channelId (required), action (required), operation (required), id (optional), and, only in full-payload mode, data (optional, [<Model>FieldsEnum]). See Notification payload.
- Enums: ActionSubscriptionEnum {CREATE, UPDATE, DELETE, ALL_ACTIONS}, OperationSubscriptionEnum {SUBSCRIBE, UNSUBSCRIBE} and a generated <Model>Fields enum (full-payload mode only).
2. Mount it on the schema¶
import graphene
from myapp.subscriptions import UserSubscription

class Subscription(graphene.ObjectType):
    user_subscription = UserSubscription.Field()

# Query is your existing root query type.
schema = graphene.Schema(query=Query, subscription=Subscription)
3. Wire the consumer and routing¶
# myapp/consumers.py
from django_graphex.subscriptions import GraphqlAPIDemultiplexer
from myapp.subscriptions import UserSubscription

class AppDemultiplexer(GraphqlAPIDemultiplexer):
    subscriptions = {"users": UserSubscription}
# project/asgi.py
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
django_asgi_app = get_asgi_application()
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from myapp.consumers import AppDemultiplexer
application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": URLRouter(
            [path("ws/", AppDemultiplexer.as_asgi())]
        ),
    }
)
# settings.py
ASGI_APPLICATION = "project.asgi.application"
# Development: in-memory layer. Production: use a Redis layer.
CHANNEL_LAYERS = {
    "default": {"BACKEND": "channels.layers.InMemoryChannelLayer"},
}
# Production example:
# CHANNEL_LAYERS = {
#     "default": {
#         "BACKEND": "channels_redis.core.RedisChannelLayer",
#         "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
#     },
# }
From a DjangoModelType (one definition)¶
If you already use a DjangoModelType for
queries and mutations, you can get its subscription from the same class — no
separate Subscription subclass. Add stream (and optionally serialize_data)
to its Meta:
from myapp.models import User

class UserModelType(DjangoModelType):
    class Meta:
        model = User
        stream = "users"  # enables the subscription
        serialize_data = True  # optional; defaults to the global setting
Mount it on the schema and wire the consumer:
# schema.py
import graphene

class Subscription(graphene.ObjectType):
    user_subscription = UserModelType.SubscriptionField()

# consumers.py: the demultiplexer also accepts an iterable (set/list); the
# stream is derived from each entry's Meta.stream, so you never repeat it.
from django_graphex.subscriptions import GraphqlAPIDemultiplexer

class AppDemultiplexer(GraphqlAPIDemultiplexer):
    subscriptions = {UserModelType}  # or mix: {UserModelType, OtherSubscription}
UserModelType.subscription_type() builds (and caches) the Subscription
lazily, so the base install stays Channels-free until you actually wire a
subscription. The generated subscription supports the same arguments — including
filters. Setting Meta.stream is required to use
SubscriptionField() / subscription_type().
Authorization and row-scoping¶
The generated subscription honors the type's authorization and scoping:
- permission_classes / authorize gate the subscribe itself. Authorize runs at registration (the HTTP request, so info.context.user is available) for the read-like "subscribe" action; a denial yields ok: False / error and no group is joined. IsAuthenticatedOrReadOnly therefore lets anyone subscribe to a public stream, while IsAuthenticated requires a login.
- subscription_scope(info, **kwargs) returns a server-forced filter mapping (e.g. {"owner": info.context.user.pk}). It is evaluated at subscribe time and enforced per event at delivery, merged over the client filters with server precedence, so the client can neither widen nor drop it. Equality scopes on a serialized field (like owner) are decided in memory, so there is no per-event query.
from myapp.models import Note

class NoteModelType(DjangoModelType):
    permission_classes = [IsAuthenticated]

    class Meta:
        model = Note
        stream = "notes"
        serialize_data = True

    @classmethod
    def subscription_scope(cls, info, **kwargs):
        return {"owner": info.context.user.pk}  # only my notes
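The merge with server precedence described above can be illustrated with plain dictionaries. This is a minimal sketch of the idea, not the engine's code: effective_filters and matches are hypothetical helper names, and the payload is assumed to be the serialized instance as a flat dict.

```python
def effective_filters(client_filters: dict, server_scope: dict) -> dict:
    """Merge the server scope over the client filters; server keys always win."""
    return {**client_filters, **server_scope}


def matches(payload: dict, filters: dict) -> bool:
    """Equality-only check against a serialized payload, with no database access."""
    return all(payload.get(key) == value for key, value in filters.items())


# A client tries to widen the scope to another owner; the server scope overrides it.
filters = effective_filters({"owner": 99, "archived": False}, {"owner": 7})
print(filters)  # {'owner': 7, 'archived': False}
print(matches({"id": 1, "owner": 7, "archived": False}, filters))   # True
print(matches({"id": 2, "owner": 99, "archived": False}, filters))  # False
```

Because the server mapping is applied last, a client-supplied owner value can narrow nothing and widen nothing: the key is simply overwritten.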
Why not filter_queryset?
filter_queryset is an opaque queryset transform used by the query/list
resolvers; it cannot be applied to a single changed instance without a
per-event query (and needs the request at delivery time). subscription_scope
returns a plain mapping instead, so scoping is enforced in memory and the
WebSocket connection itself does not need to be authenticated.
Indexed groups (optional, for high fan-out)¶
With only subscription_scope, a change is broadcast to one group per
model+action and every subscriber's connection is woken to evaluate the filter
in memory (cheap, but O(subscribers)). For streams with very many concurrent
subscribers partitioned by a value (per owner, per tenant, per room), declare
subscription_index_fields to route each change to a value-scoped group so
only the matching subscribers are woken:
from myapp.models import Message

class MessageModelType(DjangoModelType):
    class Meta:
        model = Message
        stream = "messages"
        serialize_data = True
        subscription_index_fields = ("tenant", "room")  # compound (AND) index

    @classmethod
    def subscription_scope(cls, info, **kwargs):
        return {"tenant": info.context.tenant_id, "room": kwargs.get("room")}
How it works: at subscribe the group name gets a canonical suffix built from the
scope (e.g. messages-create:room=42&tenant=7); at broadcast the same suffix
is built from the changed instance (reading each field's attname, so a foreign
key yields its raw id without a query). The two names match by construction, so
Channels delivers only to that group -- no group enumeration is involved.
- Opt-in and additive. Omit it and everything works exactly as before.
- Must be a subset of the scope. Every index field has to be present in what subscription_scope returns; if any is missing the subscriber transparently falls back to the coarse group (still correct, just not narrowed).
- Good index fields are high-cardinality equality keys (FKs like owner, tenant, room). A low-cardinality boolean would only create two groups and buy nothing.
- Independent of serialize_data. The index reads the live instance, not the payload, so it works in id-only mode too.
- The full filter is still applied on delivery, so indexing is purely a routing optimization -- correctness never depends on it.
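The "match by construction" idea can be sketched as a single naming function called on both sides. This is an illustration under assumptions, not the engine's implementation: group_name is a hypothetical helper, the suffix format simply mirrors the messages-create:room=42&tenant=7 example above (the real encoding may differ), and a None value is treated as a missing field.

```python
from typing import Mapping, Sequence


def group_name(stream: str, action: str, index_fields: Sequence[str],
               values: Mapping[str, object]) -> str:
    """Return the value-scoped group name, or the coarse one if a field is missing."""
    coarse = f"{stream}-{action}"
    if not index_fields or any(values.get(f) is None for f in index_fields):
        return coarse  # fall back: still correct, just not narrowed
    # Sort the fields so subscribe and broadcast build the same suffix.
    suffix = "&".join(f"{f}={values[f]}" for f in sorted(index_fields))
    return f"{coarse}:{suffix}"


index = ("tenant", "room")
# Subscribe side: values come from the scope mapping.
at_subscribe = group_name("messages", "create", index, {"tenant": 7, "room": 42})
# Broadcast side: values come from the changed instance (raw FK ids).
at_broadcast = group_name("messages", "create", index, {"room": 42, "tenant": 7})
print(at_subscribe)                  # messages-create:room=42&tenant=7
print(at_subscribe == at_broadcast)  # True
```

Sorting the field names is what makes the name canonical: the order in which the scope or the instance supplies the values does not matter.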
4. Serve subscriptions over HTTP¶
The standard GraphQLView cannot execute subscription operations. Use
SubscriptionGraphQLView, which resolves the one-shot subscribe/unsubscribe
confirmation:
# urls.py
from django.urls import path
from django_graphex.subscriptions import SubscriptionGraphQLView
urlpatterns = [
    path("graphql", SubscriptionGraphQLView.as_view(graphiql=True)),
]
Browser client view¶
SubscriptionClientView serves a self-contained HTML page (WebSocket + GraphQL)
to try subscriptions live. Add it to your URLConf like the admin — because it is
served from your own origin, there is no CORS to configure:
# urls.py
from django.urls import path
from django_graphex.subscriptions import SubscriptionClientView

urlpatterns = [
    ...,
    path("graphql/client/", SubscriptionClientView.as_view()),
]
Open /graphql/client/: it connects the WebSocket (capturing the
channel_id), lets you send a subscription { … } over HTTP, and streams the
notifications back. The editor supports Tab to indent (Shift+Tab to outdent)
and schema-aware autocomplete — it introspects the configured HTTP endpoint
and suggests types, fields, arguments and enum values as you type (Ctrl+Space to
trigger, Enter/Tab to accept). If introspection is disabled on the server,
autocomplete falls back to GraphQL keywords only. The endpoints default to the
page's own host with the routes /ws/graphql/ and /graphql; override them if
yours differ:
path(
    "graphql/client/",
    # ws_path matches the "ws/" route from step 3 rather than the default /ws/graphql/
    SubscriptionClientView.as_view(ws_path="/ws/", http_path="/graphql"),
),
5. Subscribe from a client¶
subscription {
  userSubscription(
    channelId: "the-channel-id-from-the-handshake"
    action: UPDATE
    operation: SUBSCRIBE
    id: 5
    data: [USERNAME, EMAIL]
  ) {
    ok
    error
    stream
    operation
    action
  }
}
- action: ALL_ACTIONS subscribes to CREATE, UPDATE and DELETE.
- Omit id to subscribe to every instance for that action.
- The data argument only exists in full-payload mode (see Notification payload). When present, omit it to receive the full serialized payload, or pass a field list to project the notification data down to exactly those fields. In the default id-only mode there is no data argument.
- Use operation: UNSUBSCRIBE (same arguments) to stop receiving events.
Filtering notifications¶
id scopes by the changed object's own primary key. To scope by field
values instead — e.g. a post-detail page that should only receive the comments
of that post — pass the optional filters argument: a mapping of Django ORM
lookup to value.
subscription ($channelId: String!) {
  commentSubscription(
    channelId: $channelId
    action: ALL_ACTIONS
    operation: SUBSCRIBE
    filters: { post: 7 }  # only comments whose post == 7
  ) {
    ok error stream operation action
  }
}
Filters are evaluated per connection at delivery time:
- Equality filters ({post: 7}) are decided in memory against the serialized payload, with no extra query.
- Lookups ({text__icontains: "urgent"}, {created__gte: "..."}) fall back to a single-row database check, so any Django lookup works.
- Combine them: filters: { post: 7, text__icontains: "urgent" }.
- Omitting filters keeps the previous behavior (every event in the group).
Delete + lookup filters
On a delete the row no longer exists, so only the in-memory (equality)
path applies. With serialize_data = True the payload still carries the
field values, so equality filters work on deletes; with id-only payloads,
non-pk filters cannot be evaluated on delete and the notification is dropped.
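The delivery-time decision described in this section can be sketched as a small pure function. It is a simplified illustration, not the engine's logic: split_filters and delivery_plan are hypothetical names, a key containing "__" is treated as a lookup (a simplification), and the sketch assumes that anything that cannot be decided in memory on a delete leads to a drop, which the text above states explicitly only for id-only payloads.

```python
def split_filters(filters: dict) -> tuple[dict, dict]:
    """Split filters into in-memory equality checks and ORM-style lookups."""
    equality = {k: v for k, v in filters.items() if "__" not in k}
    lookups = {k: v for k, v in filters.items() if "__" in k}
    return equality, lookups


def delivery_plan(filters: dict, payload: dict, deleted: bool) -> str:
    """Return 'deliver', 'drop' or 'check-database' for one event and connection."""
    equality, lookups = split_filters(filters)
    # Equality keys absent from the payload (e.g. id-only mode) cannot be decided in memory.
    undecidable = [k for k in equality if k not in payload]
    if any(payload[k] != v for k, v in equality.items() if k in payload):
        return "drop"
    if not lookups and not undecidable:
        return "deliver"
    # The row is gone after a delete, so a database check is impossible.
    return "drop" if deleted else "check-database"


full = {"id": 1, "post": 7, "text": "urgent fix"}
print(delivery_plan({"post": 7}, full, deleted=False))                               # deliver
print(delivery_plan({"post": 7, "text__icontains": "urgent"}, full, deleted=False))  # check-database
print(delivery_plan({"post": 7}, {"id": 1}, deleted=True))                           # drop
```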
Scoping vs. security
Client-supplied filters are a convenience scope, not an authorization
boundary. To enforce row-level access (e.g. "only my records"), use a
server-side scope such as subscription_scope (see Authorization and
row-scoping) or gate the subscription with private_subscription / your auth
layer rather than relying on a client-provided filter.
Each subscribed WebSocket receives notifications shaped like this in the default id-only mode:
…or, in full-payload mode, with the serialized instance (optionally
projected to the requested data fields):
{
  "stream": "users",
  "payload": {
    "action": "update",
    "model": "auth.user",
    "data": {"username": "neo", "email": "neo@example.com"}
  }
}
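A client has to cope with both shapes. The sketch below is one way to do that with the standard library. It assumes that an id-only notification uses the same envelope as the full-payload frame above, with data reduced to {"id": <pk>} as described under Notification payload; describe_notification is a hypothetical helper name.

```python
import json


def describe_notification(frame: str) -> str:
    """Summarize a notification frame received on the WebSocket."""
    message = json.loads(frame)
    payload = message["payload"]
    data = payload["data"]
    if set(data) == {"id"}:
        # id-only mode: the client is expected to refetch the object itself.
        return f'{message["stream"]}: {payload["action"]} id={data["id"]} (refetch needed)'
    fields = ", ".join(sorted(data))
    return f'{message["stream"]}: {payload["action"]} with fields {fields}'


full_frame = json.dumps({
    "stream": "users",
    "payload": {
        "action": "update",
        "model": "auth.user",
        "data": {"username": "neo", "email": "neo@example.com"},
    },
})
# Assumed id-only shape: same envelope, data reduced to the primary key.
id_only_frame = json.dumps({
    "stream": "users",
    "payload": {"action": "update", "model": "auth.user", "data": {"id": 5}},
})
print(describe_notification(full_frame))     # users: update with fields email, username
print(describe_notification(id_only_frame))  # users: update id=5 (refetch needed)
```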
Notification payload¶
On every model change the binding builds the notification data. By default it
is id-only — {"id": <pk>} — which skips serialization entirely (the
fastest option for hot models, where clients typically refetch on notification).
Set it to full to serialize the whole instance through the native (Pydantic)
backend.
Two controls, in order of precedence:
| Control | Values | Effect |
|---|---|---|
| Meta.serialize_data | None (default), True, False | Per-subscription. None inherits the global setting; True = full; False = id-only. |
| DJANGO_GRAPHEX["SUBSCRIPTION_SERIALIZE_DATA"] | False (default), True | Global default for subscriptions that don't override it. |
# settings.py: make every subscription serialize the full instance by default
DJANGO_GRAPHEX = {
    "SUBSCRIPTION_SERIALIZE_DATA": True,
}

# ...or decide per subscription, regardless of the global default
class UserSubscription(Subscription):
    class Meta:
        model = User
        stream = "users"
        serialize_data = True  # full payload for this one; True/False/None
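The precedence between the two controls can be written down as a short function. This is a sketch of the rule in the table above, not the library's code: resolve_serialize_data is a hypothetical name, and the settings dict stands in for DJANGO_GRAPHEX.

```python
from typing import Optional


def resolve_serialize_data(meta_value: Optional[bool], settings: dict) -> bool:
    """Return True for full-payload mode, False for id-only mode."""
    if meta_value is not None:
        return meta_value  # an explicit per-subscription value wins
    # None inherits the global setting, which itself defaults to id-only.
    return bool(settings.get("SUBSCRIPTION_SERIALIZE_DATA", False))


print(resolve_serialize_data(None, {}))                                      # False
print(resolve_serialize_data(None, {"SUBSCRIPTION_SERIALIZE_DATA": True}))   # True
print(resolve_serialize_data(False, {"SUBSCRIPTION_SERIALIZE_DATA": True}))  # False
```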
id-only is the default
graphene-django-subscriptions always sent the full serialized instance.
django-graphex defaults to id-only for performance. In id-only mode the
subscription has no data
argument and no <Model>Fields enum (there are no fields to pick), so
clients that sent data: [...] must either drop it or opt back into full
mode via Meta.serialize_data = True / the global setting. The argument's
presence is fixed when the subscription class is defined.
Per-connection field selection¶
In full-payload mode, field projection is tracked per connection, not on
the shared serializer, so two clients subscribed to the same group with
different data arguments each receive their own projection. (This fixes a
global-state race present in the legacy implementation.)
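The difference between per-connection and shared projection can be illustrated with plain data. This is a minimal sketch, not the engine's implementation: project and the connection ids are hypothetical, and the selections use lowercase field names rather than the generated enum values.

```python
from typing import Iterable, Optional


def project(data: dict, fields: Optional[Iterable[str]]) -> dict:
    """Return a copy of data limited to fields; None means the full payload."""
    if fields is None:
        return dict(data)
    return {name: data[name] for name in fields if name in data}


# The instance is serialized once for the whole group...
serialized = {"id": 5, "username": "neo", "email": "neo@example.com"}

# ...while each connection keeps its own field selection.
selections = {"conn-a": ["username"], "conn-b": ["username", "email"], "conn-c": None}
deliveries = {conn: project(serialized, fields) for conn, fields in selections.items()}

print(deliveries["conn-a"])  # {'username': 'neo'}
print(deliveries["conn-b"])  # {'username': 'neo', 'email': 'neo@example.com'}
```

Since project returns a new dict on every call and never mutates the shared serialized payload, one connection's selection cannot leak into another's.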
Migrating from graphene-django-subscriptions¶
The old package is now a thin, deprecated shim that re-exports from here. To migrate:
- Install the extra: uv add "django-graphex[subscriptions]" (or pip install "django-graphex[subscriptions]").
- Update imports to django_graphex.subscriptions (old import paths keep working for one deprecation cycle, emitting a DeprecationWarning).
- Remove channels_api from INSTALLED_APPS.
- Replace routing.py + CHANNEL_LAYERS.ROUTING with asgi.py + ASGI_APPLICATION (ProtocolTypeRouter / URLRouter).
- Replace the GRAPHENE.MIDDLEWARE depromise_subscription entry with the URL served by SubscriptionGraphQLView.
- Rename consumers = {stream: Sub.get_binding().consumer} to subscriptions = {stream: Sub} (the old form still works, with a warning).
- Notifications are now id-only by default. If your clients relied on the full serialized payload (or sent a data field list), set Meta.serialize_data = True on those subscriptions or DJANGO_GRAPHEX["SUBSCRIPTION_SERIALIZE_DATA"] = True globally. See Notification payload.
- Configure a Redis channel layer for multi-process deployments.