
feat(graphql): added support for graphql subscriptions to work for actions #6904

Open
psihius wants to merge 11 commits into api-platform:main from psihius:feature/graphql-subscription-for-all-operations

psihius (Contributor) commented Jan 9, 2025

Q         A
Branch?   4.0 or main
Tickets   None that I found
License   MIT
Doc PR

This PR makes create and delete operations issue Mercure updates for GraphQL subscriptions.
It also adds the ability to configure "restrictive updates" (https://api-platform.com/docs/core/mercure/#dispatching-restrictive-updates-security-mode); the implementation follows the same idea, adapted to GraphQL subscriptions.

Some things that need feedback/work/ideas:

  • Private fields config option probably should also use expression language the same way topics do
  • Caching item of the subscriptionCollection is going to become quite sizeable when you have a lot of segmentation based on owners of the data, and I don't have a good idea how to segment it. The operation context is very restricted in the Doctrine listener, and I don't know the internals well enough to construct a proper one, but I did my best. The best idea I have is to read the object's data and use it as part of the cache key (the private fields option). One problem is deletion: by the time publishUpdate runs, the object data is already gone, because the listener constructs and passes a stdClass object with very limited data, so I have no context at that point.
  • Mercure payload format probably needs updating: now that it carries all 3 types of operations, I think it needs a field stating which operation it was (create, update or delete) so Mercure consumers can decide what to do
  • The deleted stdClass object contains too little information. I'd like the ability to retain some of the data in there, for example the parent relation field that points to the parent ID the record was deleted from. I have no ideas yet on how to implement that besides adding more Mercure configuration and modifying the deleteObject branch of the publish listener to retain extra fields from the resource, similar to what I did with private_fields in the SubscriptionManager and the GraphQL object context.

This is a WIP because I need feedback on the direction taken and suggestions for improving some aspects of it.

This code is being used in production and works for our use case, but it's obviously far from ideal, and I want to improve it and make it a standard part of API Platform.

The goal is to make managing items on the frontend easy, purely through Mercure updates delivered via GraphQL subscriptions. In our case it's a chat-like frontend that needs to receive not only updates, but also new data (new messages, new chats) and removal notifications. This puts the GraphQL API on par with the REST API, where you can have restrictive data updates AND subscribe to collections that include all 3 types of data events.

psihius (Contributor, Author) commented Jan 15, 2025

@soyuka @dunglas could you provide some feedback please, so I can decide how I want to continue working on this? :)

soyuka (Member) left a comment

  • Private fields config option probably should also use expression language same way topics do

I'm not sure I understand this functionality; did I miss something?

  • Caching item of the subscriptionCollection is going to become quite sizeable when you have a lot of segmentation based on owners of the data and I don't have a good idea how to segment that

I don't think we should send collection updates, only items inside a collection should be sufficient.

When data is deleted, can't you still send an update on its IRI (as the Mercure topic) with a null value? I'm not sure why data should be part of a cache key; shouldn't we use IRIs just as in REST? Or is it that you want to send a Mercure update with only the fields selected in the GraphQL query?

  • Mercure payload format probably needs updating - with it now containing all 3 types of operations, I think it needs a field that contains what type of operation it was - create, update or deleted so mercure consumers can decide what to do

IMO, we should always treat a Mercure update as a modified element: it's a new one if you don't have it in your local data, otherwise it's an update. You can detect that it was deleted when the value is null.

  • Deleted stdObject contains too little information, kind'a want ability to retain some of the data in there, like for example the parent relation field that points to the parent ID the record was deleted for.

I'm not sure why we would need this. It makes things really complicated once we start following data associations: when do you stop looking for relations?

psihius (Contributor, Author) commented Jan 17, 2025

  • Private fields config option probably should also use expression language same way topics do

Not sure to understand this functionality, did I miss something?

https://api-platform.com/docs/core/mercure/#dispatching-restrictive-updates-security-mode: this, but for GraphQL subscriptions. It's a way to restrict which data is pushed to a specific user, resource, or whatever scope you want, based on resource field values. I use it to restrict access between accounts in a SaaS service.

  • Caching item of the subscriptionCollection is going to become quite sizeable when you have a lot of segmentation based on owners of the data and I don't have a good idea how to segment that

I don't think we should send collection updates, only items inside a collection should be sufficient.
That already doesn't happen. When a resource already exists, the lookup hits the individual resource IRI cache item, so the collection cache item is never accessed.

$iri = $this->iriConverter->getIriFromResource($object);
// Look up subscriptions registered for this exact item IRI first. For anything other
// than a create, this returns the item's subscriptions, so the collection IRI is never accessed.
$subscriptions = $this->getSubscriptionsFromIri($iri);
if ([] === $subscriptions) {
    // Nothing registered for the item yet (it was just created):
    // fall back to the subscriptions registered for its collection IRI.
    $subscriptions = $this->getSubscriptionsFromIri($this->getCollectionIri($iri));
}

As you can see, it first tries to get subscriptions from the full item IRI. For an update operation the item is already in the cache; if it isn't, the lookup falls back to the collection cache item, which only happens for a create operation.
As I understand it, the underlying cache is the system cache, so entries persist in storage. The collection cache item is therefore only read for create, never for update or delete.
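The lookup order described above can be reduced to a small standalone sketch. It assumes a plain in-memory array in place of the cache pool and takes the collection IRI as an argument. SubscriptionRegistry and findSubscriptions are hypothetical names, not API Platform classes.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory registry: IRI => list of subscription IDs.
// It stands in for the cache pool used by the real SubscriptionManager.
final class SubscriptionRegistry
{
    /** @param array<string, list<string>> $byIri */
    public function __construct(private array $byIri = [])
    {
    }

    /** @return list<string> */
    public function getSubscriptionsFromIri(string $iri): array
    {
        return $this->byIri[$iri] ?? [];
    }

    /**
     * Check the item IRI first and fall back to the collection IRI only when
     * nothing is registered for the item (i.e. the item was just created).
     *
     * @return list<string>
     */
    public function findSubscriptions(string $itemIri, string $collectionIri): array
    {
        $subscriptions = $this->getSubscriptionsFromIri($itemIri);
        if ([] === $subscriptions) {
            $subscriptions = $this->getSubscriptionsFromIri($collectionIri);
        }

        return $subscriptions;
    }
}

$registry = new SubscriptionRegistry([
    '/books/1' => ['sub-item-1'],
    '/books' => ['sub-collection-1'],
]);

echo implode(',', $registry->findSubscriptions('/books/1', '/books')), PHP_EOL; // existing item
echo implode(',', $registry->findSubscriptions('/books/2', '/books')), PHP_EOL; // newly created item
```

Because the item IRI is checked first, the collection entry is only consulted when the item has no registrations of its own.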

When a data is deleted you can still send an update on its IRI (as mercure topic) with a null value? I'm not sure why data should be part of a cache key, instead we should use IRIs just as in REST? Or is it that you want to send a mercure update with only the fields selected in the GraphQl Query?
Because that data is used to find the subscription ID whose field set matches exactly, including the values of the private fields.

if ($private && $privateFields && $previousObject) {
    // Add the resource's value for each private field, so only subscriptions
    // registered with the same values can match.
    foreach ($options['private_fields'] as $privateField) {
        $fields['__private_field_'.$privateField] = $this->getResourceId($privateField, $previousObject);
    }
}
$subscriptionsCacheItem = $this->subscriptionsCache->getItem($this->encodeIriToCacheKey($iri));
$subscriptions = [];
if ($subscriptionsCacheItem->isHit()) {
    $subscriptions = $subscriptionsCacheItem->get();
    foreach ($subscriptions as [$subscriptionId, $subscriptionFields, $subscriptionResult]) {
        // Reuse an existing subscription only if its field set (private field values included) is identical.
        if ($subscriptionFields === $fields) {
            return $subscriptionId;
        }
    }
}

See the second foreach? It skips subscriptions whose field combination differs from the one being looked up. The value of the __private_field_<field> entry ensures that updates are only sent to a subscription ID restricted to the same field value. If you use owner_id, i.e. the user who owns the record, you effectively restrict pushing the update to subscriptions created by that user only.
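A minimal sketch of that matching rule follows. It assumes each cached entry pairs a subscription ID with its field array, and that the field array carries one __private_field_<name> entry per private field. withPrivateFields, findMatchingSubscription and the owner_id values are illustrative only.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: add "__private_field_<name>" entries holding the
 * resource's value for each configured private field.
 *
 * @param array<string, mixed> $fields        fields selected in the subscription query
 * @param list<string>         $privateFields configured private field names
 * @param array<string, mixed> $resourceData  field values of the resource
 * @return array<string, mixed>
 */
function withPrivateFields(array $fields, array $privateFields, array $resourceData): array
{
    foreach ($privateFields as $privateField) {
        $fields['__private_field_'.$privateField] = $resourceData[$privateField] ?? null;
    }

    return $fields;
}

/**
 * Return the ID of the cached subscription whose field array is identical
 * (same keys, order, values and types) to $fields, or null if none matches.
 *
 * @param list<array{0: string, 1: array<string, mixed>}> $cached
 * @param array<string, mixed>                           $fields
 */
function findMatchingSubscription(array $cached, array $fields): ?string
{
    foreach ($cached as [$subscriptionId, $subscriptionFields]) {
        if ($subscriptionFields === $fields) {
            return $subscriptionId;
        }
    }

    return null;
}

$selected = ['id' => true, 'title' => true];
$cached = [
    ['sub-owner-1', withPrivateFields($selected, ['owner_id'], ['owner_id' => 1])],
    ['sub-owner-2', withPrivateFields($selected, ['owner_id'], ['owner_id' => 2])],
];

// An update to a record owned by user 2 only reaches user 2's subscription.
$fields = withPrivateFields($selected, ['owner_id'], ['owner_id' => 2]);
echo findMatchingSubscription($cached, $fields) ?? 'none', PHP_EOL;
```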

There's a corresponding change in SubscriptionManager::retrieveSubscriptionId, which is called from SubscriptionProcessor. That's where the subscription ID is created for a given IRI, from the fields selected as return data for the subscription. There I also inject the private fields and their values from the resource in question, which affects the generated subscription ID. The subscription ID is always generated resource-wide: for the same field selection, it's the same ID regardless of which IRI you pass, as long as it's the same resource type. The cache, however, was stored per resource IRI (i.e. per item). So I also added a collection-level cache item, which lets you push updates for the "create" operation: a create adds a new item, and you need to know which subscriptions to push that update to.
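The following sketch shows why the private field values change the generated ID. It derives the ID by hashing the field array. The hashing scheme (sha256 over JSON) and the subscriptionIdFor name are assumptions for illustration, and the generator API Platform actually uses may differ. The item IRI is not an input, which is what makes the ID resource-wide.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical subscription ID derivation: the ID depends only on the
 * selected fields plus any injected private field values, never on the IRI.
 *
 * @param array<string, mixed> $fields
 */
function subscriptionIdFor(array $fields): string
{
    return hash('sha256', json_encode($fields, JSON_THROW_ON_ERROR));
}

$selected = ['id' => true, 'title' => true];

// Same selection, different owner => different subscription IDs.
$idForOwner1 = subscriptionIdFor($selected + ['__private_field_owner_id' => 1]);
$idForOwner2 = subscriptionIdFor($selected + ['__private_field_owner_id' => 2]);
// No private fields => one ID shared by every subscriber with this selection.
$idWithoutPrivate = subscriptionIdFor($selected);

echo $idForOwner1 === $idForOwner2 ? 'same' : 'different', PHP_EOL;
```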

  • Mercure payload format probably needs updating - with it now containing all 3 types of operations, I think it needs a field that contains what type of operation it was - create, update or deleted so mercure consumers can decide what to do

IMO, we should always consider a mercure update as a modified element, it's a new one if you don't have it in your referential, or else it's updated. You can detect it's deleted when the value is null.

I did change the output of the "delete" operation to include operationType: deleted, so whoever receives the Mercure update can check for that. It just makes things easier; you can also tell it's a delete purely from the fact that the update contains only the IRI and no other data.
The current data format was designed only for the update operation, which is why I think it might need updating.

  • Deleted stdObject contains too little information, kind'a want ability to retain some of the data in there, like for example the parent relation field that points to the parent ID the record was deleted for.

Not sure why we would need this. It makes things really complicated when we look for data associations, when do you stop looking for relations?
Yeah, I don't think it's worth it. In my case, when a chat message is deleted, I wanted to know which chat it belongs to. I also receive an update for the chat itself saying that something changed, so I can re-query its messages. But my idea was that if I know the chat ID, I don't need to maintain a whole message => chat mapping array, which might get very big.

stale bot commented Mar 18, 2025

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the stale label Mar 18, 2025
soyuka added the enhancement label and removed the stale label Mar 21, 2025
soyuka (Member) commented Mar 21, 2025

I definitely need time to review this, but I think it's still something we should integrate.

psihius (Contributor, Author) commented Mar 21, 2025

I definitely need time to review this I think it's still something that we should integrate.

Feel free to reach out to me via DM on the Symfony Slack, or tag me in the #api-platform channel about this. I'll be able to make changes/fixes next week, because I think we found a small bug to fix, so I'll be touching the code anyway :)

psihius force-pushed the feature/graphql-subscription-for-all-operations branch from ead72e1 to c9cefd8 April 24, 2025 06:39

psihius (Contributor, Author) commented Apr 24, 2025

I found some flaws in my previous implementation and completely split out the handling of collection updates, so that collection subscriptions generate their own subscription ID instead of piggybacking on individual item subscriptions.
The Subscription operation got a new parameter, collection: bool, which defaults to false to maintain BC. It controls whether a given subscription is an individual item subscription or a whole-collection subscription, so you can use one or the other, or both at the same time if you wish, by defining two subscriptions :)

I've also added relationship handling for Mercure updates, because it was never implemented: you could not get any to-many relationship data in an update, even if you asked for it in the GraphQL query. Nothing complicated there, but it works and lets people include simple related payloads in the Mercure update.

There's more I would do, but it would take a major amount of time and effort that I simply don't have. I would split the Mercure update pipeline out completely from the GraphQL processing pipeline, because part of the GraphQL library's work happens at response time, once execution has left API Platform's GraphQL internals. As you can see from my latest commit, some things were missing because Mercure updates happen in a side channel, as a Doctrine event listener.
Delete updates also lack certain data I would like them to have. For example, it would be nice to know an item's parent related resource (when deleting a chat message, knowing the chat ID would be very beneficial).
I also really need someone familiar with the internals to help me sort out the normalizers: why do Mercure updates use the item normalizer and not the object normalizer, for example? :) I get that it has to do with some serializer priority, but that requires a lot of digging and stumbling around (and debugging this giant and its configuration is... well... hard :D)

psihius force-pushed the feature/graphql-subscription-for-all-operations branch 6 times, most recently from decfc5b to 9e61e92 April 25, 2025 12:38

soyuka (Member) commented Apr 30, 2025

Don't hesitate to add functional tests to help me understand the needs behind this; there are tests inside tests/Functional that are easier to set up than Behat :)

psihius (Contributor, Author) commented Apr 30, 2025

don't hesitate to add functional tests to help me understand the needs behind that, there are tests inside tests/Functional that are easier to setup then behat :)

Yeah, I've been slowly working on it.

In other news, my last commit is more of a temporary fix for our production system: the cache pool used there right now needs to be replaced. The system cache pool has no locking or protection against parallel writes, and with 20+ users, writes started overwriting each other's changes.
It worked for individual IRI items because there was very little writing and mostly reading (you usually had 1-2 records in a cache item, depending on how varied the fields in your subscriptions were). But the collection cache item received a lot of initial writes, which ended in a write race condition. So the pool needs to be replaced with one that supports locking and/or protection against parallel writes. As far as I can tell, the system pool has neither.
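The race described above is an unprotected read-modify-write. As a sketch of the fix, the function below holds an exclusive lock for the whole read, modify and write cycle. To stay self-contained, it substitutes a plain file and PHP's flock() for a cache pool. registerSubscription is a hypothetical name, and flock() is advisory: it only protects writers that also take the lock. In a Symfony application, the Lock component (LockFactory::createLock()) would likely be the more natural fit.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical file-backed registry update: read, modify and write the
 * subscription list while holding an exclusive lock, so concurrent
 * writers cannot overwrite each other's additions.
 */
function registerSubscription(string $path, string $subscriptionId): void
{
    $handle = fopen($path, 'c+'); // create if missing, do not truncate, pointer at start
    if (false === $handle) {
        throw new RuntimeException("Cannot open $path");
    }

    try {
        if (!flock($handle, LOCK_EX)) { // blocks until no other process holds the lock
            throw new RuntimeException("Cannot lock $path");
        }

        $contents = stream_get_contents($handle);
        $subscriptions = (false === $contents || '' === $contents)
            ? []
            : json_decode($contents, true, 512, JSON_THROW_ON_ERROR);

        if (!in_array($subscriptionId, $subscriptions, true)) {
            $subscriptions[] = $subscriptionId;
        }

        // Rewrite the whole file while the lock is still held.
        ftruncate($handle, 0);
        rewind($handle);
        fwrite($handle, json_encode($subscriptions, JSON_THROW_ON_ERROR));
        fflush($handle);
    } finally {
        flock($handle, LOCK_UN);
        fclose($handle);
    }
}

$path = sys_get_temp_dir().'/collection-subscriptions-'.getmypid().'.json';
@unlink($path); // start from an empty registry
registerSubscription($path, 'sub-a');
registerSubscription($path, 'sub-b');
registerSubscription($path, 'sub-a'); // already registered, not duplicated
echo file_get_contents($path), PHP_EOL;
```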

I've also added private field data to the deleted object data in the Doctrine listener; the same probably needs to happen in the ODM listener. I needed to read the deleted item's fields and pass them on to the subscription manager.
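A sketch of that idea follows: capture the private field values into the deleted-object snapshot while the entity can still be read, so the subscription manager can match partitioned subscriptions later. The id/iri/type keys mirror the delete payload format described in this PR. snapshotDeletedObject, the Message class and the __private_field_ key prefix on the snapshot are hypothetical. Reading public properties stands in for a real property accessor.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical snapshot of an entity about to be deleted: besides id/iri/type,
 * copy the configured private field values while the entity is still readable.
 *
 * @param list<string> $privateFields
 */
function snapshotDeletedObject(object $entity, string $iri, string $absoluteIri, string $type, array $privateFields): stdClass
{
    $snapshot = (object) ['id' => $iri, 'iri' => $absoluteIri, 'type' => $type];

    foreach ($privateFields as $field) {
        // Simplified reader: public properties only, null if the property is missing.
        $snapshot->{'__private_field_'.$field} = $entity->{$field} ?? null;
    }

    return $snapshot;
}

final class Message
{
    public function __construct(public int $id, public int $ownerId)
    {
    }
}

$snapshot = snapshotDeletedObject(
    new Message(1, 42),
    '/messages/1',
    'https://example.com/messages/1',
    'Message',
    ['ownerId'],
);
echo json_encode($snapshot), PHP_EOL;
```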

psihius force-pushed the feature/graphql-subscription-for-all-operations branch from ef602bf to cea37ca June 26, 2025 14:22

psihius (Contributor, Author) commented Jul 3, 2025

Thanks for the review, I will adjust to feedback.

I do have a question about collections. If I implement CollectionOperationInterface for the collection subscription, I assume the system will allow operating on collections via the reference to the resource, i.e. /api/resource_items, on the backend? Right now the code only works on individual items. That has a security implication: you need an existing item, and subscribing is a bit of a hack (you can't subscribe unless at least one record of the resource is available to the current user).

I'd like some pointers on what to read and which files to dig into to explore a proper collection subscription implementation, and which helpers and components to look at, since I'm not very familiar with the broader internals. For example, I didn't know about IdentifiersExtractor, and without the suggestion I wouldn't have found it for quite a while if I had kept digging on my own :)

psihius force-pushed the feature/graphql-subscription-for-all-operations branch from 1aaf997 to 14994de March 21, 2026 18:43
psihius force-pushed the feature/graphql-subscription-for-all-operations branch 5 times, most recently from 0e1eb28 to 6d97b96 March 22, 2026 02:49

psihius (Contributor, Author) commented Mar 22, 2026

I reworked the feature based on the feedback and made some internal cache-related changes that change how this works quite a bit.

Documentation PR: api-platform/docs#2267

The biggest change is that collection subscriptions are now modeled as their own GraphQL operation type via SubscriptionCollection, instead of trying to stretch the old single-item subscription model with extra flags. In practice, item and collection subscriptions are now exposed separately on the GraphQL Subscription root type, so the client explicitly chooses which subscription operation it is using.

Current behavior

  • Item subscriptions receive update and delete
  • Collection subscriptions receive create, update, and delete
  • create / update pushes contain the affected item normalized with the fields requested in the subscription query
  • Initial collection subscription registration returns subscription metadata only, with the wrapped item field being null
  • Delete uses one explicit envelope for both item and collection subscriptions:
{
  "type": "delete",
  "payload": {
    "id": "/books/1",
    "iri": "https://example.com/books/1",
    "type": "Book"
  }
}
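The delivery rules above fit in a small lookup table. This sketch uses a hypothetical SubscriptionKind enum (PHP 8.1+) and plain event names. It illustrates the listed behavior and is not code from the PR. Item subscriptions presumably never see create because they are bound to an IRI that already exists.

```php
<?php

declare(strict_types=1);

// Hypothetical subscription kinds matching the two GraphQL operations described above.
enum SubscriptionKind
{
    case Item;
    case Collection;
}

// Returns whether a subscription of the given kind receives the given event.
function receives(SubscriptionKind $kind, string $event): bool
{
    return match ($kind) {
        SubscriptionKind::Item => in_array($event, ['update', 'delete'], true),
        SubscriptionKind::Collection => in_array($event, ['create', 'update', 'delete'], true),
    };
}

foreach (['create', 'update', 'delete'] as $event) {
    echo $event, ': item=', var_export(receives(SubscriptionKind::Item, $event), true),
        ', collection=', var_export(receives(SubscriptionKind::Collection, $event), true), PHP_EOL;
}
```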

I kept the delete payload explicit on purpose. For item subscriptions you could argue null would be enough because the subscription is already tied to one IRI, but for collection subscriptions the client still needs to know which item to remove. I did not want two different delete contracts depending on subscription type.
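On the consuming side, a single delete envelope means one reducer can handle both subscription types. This sketch assumes that create/update messages decode into the normalized item with its IRI under "id", and that delete messages use the envelope above. applyMessage is a hypothetical client helper. Create and update are deliberately treated alike (insert if unknown, replace otherwise), in line with the "modified element" view from the review.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical client-side reducer for a collection subscription.
 *
 * @param array<string, array<string, mixed>> $items   local store keyed by IRI
 * @param array<string, mixed>                $message decoded Mercure message
 * @return array<string, array<string, mixed>>
 */
function applyMessage(array $items, array $message): array
{
    // Delete envelope: {"type": "delete", "payload": {"id": ..., "iri": ..., "type": ...}}
    if ('delete' === ($message['type'] ?? null) && isset($message['payload']['id'])) {
        unset($items[$message['payload']['id']]);

        return $items;
    }

    // Create and update: insert if unknown, otherwise replace.
    $items[$message['id']] = $message;

    return $items;
}

$items = [];
$items = applyMessage($items, ['id' => '/books/1', 'title' => 'Dune']);
$items = applyMessage($items, ['id' => '/books/1', 'title' => 'Dune Messiah']);
$afterUpdate = $items;
$items = applyMessage($items, json_decode(
    '{"type":"delete","payload":{"id":"/books/1","iri":"https://example.com/books/1","type":"Book"}}',
    true,
));
echo count($items), PHP_EOL;
```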

Internal changes

The old collection IRI derivation hack is gone. Collection subscription lookup is now based on resource metadata + getOperation(forceCollection: true) + IriConverter, which is much closer to how this should work in the framework.

private_fields is also now treated as a real GraphQL feature and not just an implementation side effect. The intended split is:

  • mercure.private controls private delivery on the Mercure side
  • private_fields controls API Platform-side partitioning of private subscription registrations using resource field values

This is meant for cases where a client is authenticated and allowed to see multiple scopes, but subscriptions still need to be isolated by something like organization / workspace / inbox / whatever the app defines. This is now validated explicitly, the extraction logic was centralized, and it now uses Symfony property access instead of raw getter assumptions.
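One way to picture the private_fields partitioning is a registry bucket key that combines the IRI with the resource's private field values. Two organizations subscribing to the same IRI then never share a bucket. partitionedCacheKey and the organization field are hypothetical. The key is hashed because PSR-6 cache keys may not contain reserved characters such as "/" or ":".

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical partitioned cache key: one registry bucket per IRI and
 * combination of private field values (e.g. per organization).
 *
 * @param array<string, scalar|null> $privateValues field name => value read from the resource
 */
function partitionedCacheKey(string $iri, array $privateValues): string
{
    ksort($privateValues); // same scope => same key, whatever the configuration order

    // Hash IRI + scope so the key contains no PSR-6 reserved characters.
    return 'subscriptions_'.hash('sha256', $iri.'|'.json_encode($privateValues, JSON_THROW_ON_ERROR));
}

$orgA = partitionedCacheKey('/tickets/1', ['organization' => 'a']);
$orgB = partitionedCacheKey('/tickets/1', ['organization' => 'b']);
$unscoped = partitionedCacheKey('/tickets/1', []);

echo $orgA === $orgB ? 'shared bucket' : 'separate buckets', PHP_EOL;
```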

Cache behavior changed quite a bit too.

  • The cache is treated as a subscription registry, not as a place where collection payloads are stored
  • Collection subscriptions now keep registration state only and are looked up through proper collection operation metadata
  • Item and collection subscriptions are partitioned consistently when private_fields are used, so private deliveries are separated by the configured scope and not mixed in one shared item cache bucket
  • Fan-out from item + collection subscriptions is deduplicated by subscription id
  • Delete handling now cleans up the correct partitioned item cache entries too

This makes the internal cache model much more predictable and fixes a few edge cases from the earlier implementation, especially around private subscriptions and collection notifications.
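The deduplicated fan-out from the list above can be sketched as a merge keyed by subscription ID, which also avoids calling array_merge in the loop. dedupeSubscriptions and the id/topic entry shape are illustrative assumptions.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical fan-out merge: an update can match both item and collection
 * subscriptions; keying by subscription ID publishes to each one at most once.
 *
 * @param list<array{id: string, topic: string}> ...$groups
 * @return list<array{id: string, topic: string}>
 */
function dedupeSubscriptions(array ...$groups): array
{
    $unique = [];
    foreach ($groups as $group) {
        foreach ($group as $subscription) {
            // First occurrence wins; later entries with the same ID are ignored.
            $unique[$subscription['id']] ??= $subscription;
        }
    }

    return array_values($unique);
}

$itemSubscriptions = [['id' => 'sub-1', 'topic' => '/books/1']];
$collectionSubscriptions = [
    ['id' => 'sub-1', 'topic' => '/books/1'],
    ['id' => 'sub-2', 'topic' => '/books'],
];

$targets = dedupeSubscriptions($itemSubscriptions, $collectionSubscriptions);
echo count($targets), PHP_EOL;
```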

I also addressed the main review points around:

  • Proper collection subscription type
  • Removing the legacy internal collection flag
  • Removing the manual Mercure config merge in SubscriptionManager
  • Centralizing duplicated private field extraction logic
  • Removing array_merge usage in the hot path
  • Keeping BC on SubscriptionManagerInterface

Testing

Testing is much broader now too. The feature is covered in:

  • Manager tests
  • Listener tests
  • Schema / type / field tests
  • Processor tests
  • Serializer tests for nested Mercure relation normalization
  • A small functional GraphQL subscription contract test

psihius requested a review from soyuka March 22, 2026 03:04
psihius force-pushed the feature/graphql-subscription-for-all-operations branch 2 times, most recently from 2d830af to aec5007 March 22, 2026 03:38
Signed-off-by: psihius <arvids.godjuks@gmail.com>
psihius force-pushed the feature/graphql-subscription-for-all-operations branch from aec5007 to f42ceee March 22, 2026 10:45
psihius (Contributor, Author) commented Mar 22, 2026

Small follow-up after the last force-pushes:

I fixed two remaining issues in the subscription/cache path:

  • item subscription snapshots are now refreshed after a changed push, so repeated identical updates are not re-published
  • delete fan-out now uses the same metadata-based collection subscription lookup as create/update, instead of relying on item IRI trimming
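The first fix amounts to change detection against a stored snapshot. In this sketch, SnapshotStore is a hypothetical name and an in-memory array stands in for the cache. A push happens only when the normalized payload differs from the last one published, and the snapshot is refreshed whenever a push goes out.

```php
<?php

declare(strict_types=1);

// Hypothetical per-subscription snapshot store used to suppress repeated identical pushes.
final class SnapshotStore
{
    /** @var array<string, array<string, mixed>> subscription ID => last published payload */
    private array $snapshots = [];

    /** @param array<string, mixed> $payload */
    public function shouldPublish(string $subscriptionId, array $payload): bool
    {
        if (($this->snapshots[$subscriptionId] ?? null) === $payload) {
            return false; // unchanged since the last push
        }

        // Record what is about to be published so an identical follow-up is skipped.
        $this->snapshots[$subscriptionId] = $payload;

        return true;
    }
}

$store = new SnapshotStore();
$first = $store->shouldPublish('sub-1', ['title' => 'Dune']);
$repeat = $store->shouldPublish('sub-1', ['title' => 'Dune']);
$changed = $store->shouldPublish('sub-1', ['title' => 'Dune Messiah']);
```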

I re-ran the focused subscription surface locally after that:

  • SubscriptionManagerTest
  • PublishMercureUpdatesListenerTest (GraphQL subscription cases)
  • SchemaBuilderTest
  • SubscriptionSchemaTest

All green.
