Mercure normalization context per topic

Open norkunas opened this issue 4 years ago • 11 comments

Description
I think it would be useful to support a normalization context (and possibly other options) per Mercure topic. For example, suppose a resource has additional serialization groups for admin users, but updates to it should be dispatched to both admins and regular users. Currently, either the hidden information is dispatched to regular users as well, or admins do not receive the private information.

Example
Maybe something like:

#[ApiResource(
  mercure: [
    'topics' => [
      ['topic' => '/book/1', 'private' => true, 'normalization_context' => ['read', 'read_admin']],
      ['topic' => '/book/1', 'private' => false, 'normalization_context' => ['read']],
    ]
  ]
)]
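
To make the concern concrete, here is a minimal plain-PHP sketch (no API Platform involved) of why a single payload cannot serve both audiences. The field-to-group map, the `payloadForGroups()` helper and the sample book are all hypothetical and only stand in for real serializer group metadata:

```php
<?php

declare(strict_types=1);

// Hypothetical field-to-groups map standing in for serializer group metadata.
const BOOK_FIELD_GROUPS = [
    'title' => ['read'],
    'price' => ['read'],
    'supplierCost' => ['read_admin'],
];

/**
 * Keeps only the fields exposed by at least one of the requested groups.
 *
 * @param array<string, mixed> $book
 * @param list<string>         $groups
 *
 * @return array<string, mixed>
 */
function payloadForGroups(array $book, array $groups): array
{
    return array_filter(
        $book,
        static fn (string $field): bool => [] !== array_intersect(BOOK_FIELD_GROUPS[$field] ?? [], $groups),
        \ARRAY_FILTER_USE_KEY
    );
}

$book = ['title' => 'Dune', 'price' => 12, 'supplierCost' => 7];

// Public topic: must not contain supplierCost.
$forUsers = payloadForGroups($book, ['read']);

// Private admin topic: should contain every field.
$forAdmins = payloadForGroups($book, ['read', 'read_admin']);
```

With only one normalization context per resource, both topics would receive either `$forUsers` or `$forAdmins`, which is exactly the trade-off described above.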

norkunas avatar Jun 16 '21 04:06 norkunas

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Nov 05 '22 00:11 stale[bot]

waiting for feedback at least

norkunas avatar Nov 05 '22 04:11 norkunas

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Jan 04 '23 09:01 stale[bot]

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

:man_facepalming:

norkunas avatar Jan 11 '23 11:01 norkunas

@norkunas Hello. I had the same problem and managed to get something working. Disclaimer: this is not unit tested, since it was written at the end of a project in an "oh shit, this is not natively possible" moment. It is probably a dirty implementation, but it is a good starting point for a proper version. We have been running it in production for a few weeks now, and it works like a charm. We did a complete override of ApiPlatform\Doctrine\EventListener\PublishMercureUpdatesListener with the following code.

final class PublishMercureUpdatesListenerDecorator
{
    use DispatchTrait;
    use ResourceClassInfoTrait;

    private const ALLOWED_KEYS = [
        'topics' => true,
        'data' => true,
        'private' => true,
        'id' => true,
        'type' => true,
        'retry' => true,
        'normalization_context' => true,
        'hub' => true,
        'enable_async_update' => true,
    ];
    private readonly ?ExpressionLanguage $expressionLanguage;
    private \SplObjectStorage $createdObjects;
    private \SplObjectStorage $updatedObjects;
    private \SplObjectStorage $deletedObjects;

    /**
     * @param array<string, string[]|string> $formats
     */
    public function __construct(
        ResourceClassResolverInterface                                    $resourceClassResolver,
        private readonly IriConverterInterface                            $iriConverter,
        ResourceMetadataCollectionFactoryInterface                        $resourceMetadataFactory,
        private readonly SerializerInterface                              $serializer,
        private readonly array                                            $formats,
        ?MessageBusInterface                                              $messageBus = null,
        private readonly ?HubRegistry                                     $hubRegistry = null,
        private readonly ?GraphQlSubscriptionManagerInterface             $graphQlSubscriptionManager = null,
        private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null,
        ?ExpressionLanguage                                               $expressionLanguage = null,
        private bool                                                      $includeType = false
    )
    {
        if (null === $messageBus && null === $hubRegistry) {
            throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
        }

        $this->resourceClassResolver = $resourceClassResolver;

        $this->resourceMetadataFactory = $resourceMetadataFactory;
        $this->messageBus = $messageBus;
        $this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
        $this->reset();

        if ($this->expressionLanguage) {
            $rawurlencode = ExpressionFunction::fromPhp('rawurlencode', 'escape');
            $this->expressionLanguage->addFunction($rawurlencode);

            $this->expressionLanguage->addFunction(
                new ExpressionFunction('iri', static fn(string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => sprintf('iri(%s, %d)', $apiResource, $referenceType), static fn(array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => $iriConverter->getIriFromResource($apiResource, $referenceType))
            );
        }

        if (false === $this->includeType) {
            trigger_deprecation('api-platform/core', '3.1', 'Having mercure.include_type (always include @type in Mercure updates, even delete ones) set to false in the configuration is deprecated. It will be true by default in API Platform 4.0.');
        }
    }

    /**
     * Collects created, updated and deleted objects.
     */
    public function onFlush(EventArgs $eventArgs): void
    {
        if ($eventArgs instanceof OrmOnFlushEventArgs) {
            $uow = method_exists($eventArgs, 'getObjectManager') ? $eventArgs->getObjectManager()->getUnitOfWork() : $eventArgs->getEntityManager()->getUnitOfWork();
        } elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
            $uow = $eventArgs->getDocumentManager()->getUnitOfWork();
        } else {
            return;
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'createdObjects');
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'updatedObjects');
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'deletedObjects');
        }
    }

    /**
     * Publishes updates for changes collected on flush, and resets the store.
     */
    public function postFlush(): void
    {
        try {
            foreach ($this->createdObjects as $object) {
                $this->publishUpdate($object, $this->createdObjects[$object], 'create');
            }

            foreach ($this->updatedObjects as $object) {
                $this->publishUpdate($object, $this->updatedObjects[$object], 'update');
            }

            foreach ($this->deletedObjects as $object) {
                $this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
            }
        } finally {
            $this->reset();
        }
    }

    private function reset(): void
    {
        $this->createdObjects = new \SplObjectStorage();
        $this->updatedObjects = new \SplObjectStorage();
        $this->deletedObjects = new \SplObjectStorage();
    }

    private function storeObjectToPublish(object $object, string $property): void
    {
        if (null === $resourceClass = $this->getResourceClass($object)) {
            return;
        }

        try {
            // getOperation() is the call that can throw, so it has to sit inside the try block.
            $operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
            $options = $operation->getMercure() ?? false;
        } catch (OperationNotFoundException) {
            return;
        }

        if (\is_string($options)) {
            if (null === $this->expressionLanguage) {
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
            }

            $options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
        }

        if (false === $options) {
            return;
        }

        if (true === $options) {
            $options = [];
        }

        if (!\is_array($options)) {
            throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
        }

        foreach ($options as $key => $value) {
            if (!isset(self::ALLOWED_KEYS[$key])) {
                // ALLOWED_KEYS maps option names to true, so list its keys rather than its values.
                throw new InvalidArgumentException(sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS))));
            }
        }

        $options['enable_async_update'] ??= true;

        if ($options['topics'] ?? false) {
            $topics = [];
            foreach ((array)$options['topics'] as $topic) {
                //------------------------ Home made code ---------------------------- //
                if (!\is_string($topic) && !\is_array($topic)) {
                    $topics[] = $topic;
                    continue;
                }

                //------------------------ Home made code ---------------------------- //
                if (\is_string($topic) && !str_starts_with($topic, '@=')) {
                    $topics[] = $topic;
                    continue;
                }

                //------------------------ Home made code ---------------------------- //
                if (\is_array($topic) && !str_starts_with($topic['iri'], '@=')) {
                    $topics[] = $topic;
                    continue;
                }

                if (null === $this->expressionLanguage) {
                    throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
                }

                //------------------------ Home made code ---------------------------- //
                if (is_array($topic)) {
                    $topics[] = [
                        'iri' => $this->expressionLanguage->evaluate(substr($topic['iri'], 2), ['object' => $object]),
                        'context' => $topic['context']
                    ];
                } else {
                    $topics[] = $this->expressionLanguage->evaluate(substr($topic, 2), ['object' => $object]);
                }
            }

            $options['topics'] = $topics;
        }

        if ('deletedObjects' === $property) {
            $types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
            if (null === $types) {
                $types = [$operation->getShortName()];
            }

            $this->deletedObjects[(object)[
                'id' => $this->iriConverter->getIriFromResource($object),
                'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
                'type' => 1 === \count($types) ? $types[0] : $types,
            ]] = $options;

            return;
        }

        $this->{$property}[$object] = $options;
    }

    private function publishUpdate(object $object, array $options, string $type): void
    {
        if ($object instanceof \stdClass) {
            // By convention, if the object has been deleted, we send only its IRI and its type.
            // This may change in the future, because it's not JSON Merge Patch compliant,
            // and I'm not fond of this approach.

            //------------------------ Home made code ---------------------------- //
            $iris = isset($options['topics']) ? $this->groupIri($options['topics']) : [$object->iri];
            /** @var string $data */
            $data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
        } else {
            $resourceClass = $this->getObjectClass($object);
            $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];

            //------------------------ Home made code ---------------------------- //
            $iris = isset($options['topics']) ? $this->groupIriByTopicContext($options['topics'], $context) : [$this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL)];
        }

        //------------------------ Home made code ---------------------------- //
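        $updateByIris = [];
        if ($object instanceof \stdClass) {
            // Deleted object: a single update carrying every topic IRI, since the payload does not depend on a context.
            $updateByIris[] = $this->buildUpdate($iris, $data, $options);
        } else {
            foreach ($iris as $iri) {
                // A grouped entry carries its own context and topic list; a plain IRI uses the default context.
                $topicContext = \is_array($iri) ? $iri['context'] : $context;
                $iri = \is_array($iri) ? $iri['topics'] : $iri;

                if (empty($iri)) {
                    continue;
                }

                $data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $topicContext);
                $updateByIris[] = $this->buildUpdate($iri, $data, $options);
            }
        }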


        $updates = array_merge($updateByIris, $this->getGraphQlSubscriptionUpdates($object, $options, $type));
        //------------------------ Home made code end ---------------------------- //


        foreach ($updates as $update) {
            if ($options['enable_async_update'] && $this->messageBus) {
                $this->dispatch($update);
                continue;
            }

            $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
        }
    }

    //------------------------ Home made code ---------------------------- //
    private function groupIri(array $topics): array
    {
        $iris = [];
        foreach ($topics as $topic) {
            if (!\is_array($topic)) {
                $iris[] = $topic;
            } else {
                $iris[] = $topic['iri'];
            }
        }

        return $iris;
    }

    //------------------------ Home made code ---------------------------- //
    private function groupIriByTopicContext(array $topics, array $defaultContext): array
    {
        $irisGroupedByContext = [
            [
                'context' => $defaultContext,
                'topics' => [],
            ]
        ];

        foreach ($topics as $topic) {
            if (!\is_array($topic)) {
                foreach ($irisGroupedByContext as $key => $iriGroupedByContext) {
                    if ($iriGroupedByContext['context'] === $defaultContext) {
                        $irisGroupedByContext[$key]['topics'][] = $topic;
                    }
                }
                continue;
            }

            $topicContext = $topic['context'];

            $filled = false;
            foreach ($irisGroupedByContext as $key => $iriGroupedByContext) {
                if ($iriGroupedByContext['context'] === $topicContext) {
                    $irisGroupedByContext[$key]['topics'][] = $topic['iri'];
                    $filled = true;
                    break;
                }
            }

            if ($filled) {
                continue;
            }

            $irisGroupedByContext[] = [
                'context' => $topicContext,
                'topics' => [$topic['iri']]
            ];

        }

        return $irisGroupedByContext;
    }

    /**
     * @return Update[]
     */
    private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
    {
        if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
            return [];
        }

        $payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);

        $updates = [];
        foreach ($payloads as [$subscriptionId, $data]) {
            $updates[] = $this->buildUpdate(
                $this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
                (string)(new JsonResponse($data))->getContent(),
                $options
            );
        }

        return $updates;
    }

    /**
     * @param string|string[] $iri
     */
    private function buildUpdate(string|array $iri, string $data, array $options): Update
    {
        return new Update($iri, $data, $options['private'] ?? false, $options['id'] ?? null, $options['type'] ?? null, $options['retry'] ?? null);
    }
}

Basically, only storeObjectToPublish() and publishUpdate() have changed. We also added two functions to avoid duplicated code: groupIri() and groupIriByTopicContext().

Then, inside service.yaml:

ApiPlatform\Doctrine\EventListener\PublishMercureUpdatesListener:
    class: App\Service\PublishMercureUpdatesListenerDecorator
    arguments:
      $formats: '%api_platform.formats%'

Then, inside a mercure option, it's simple:

$topics = [
    [
        'iri' => "@='/admin/?topic=' ~ escape(iri(object))",
        'context' => ['groups' => ['admin:read']],
    ],
];

Note that you can mix both forms: the traditional way still works for topics that do not need specific serialization groups. For example:

$topics = [
    [
        'iri' => "@='/admin/?topic=' ~ escape(iri(object))",
        'context' => ['groups' => ['admin:read']],
    ],
    "@='/users/$uuid/assets' ~ '/?topic=' ~ escape(iri(object))",
];

Basically, the change just looks at each array entry: if it's a string, it is handled the old way; if it's an array, the code looks for an 'iri' key and a 'context' key. We use it to always publish the complete object to the admin IRI and the object's default context to users. Feel free to ask any questions, and also feel free to improve our shit haha. But at least it works great.
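
As a rough illustration of that string-versus-array rule, here is a stand-alone sketch. The `normalizeTopic()` helper is hypothetical (it is not part of the listener above or of API Platform), and the fallback to the default context when 'context' is missing is an assumption of this sketch; the listener above expects the key to be present:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: turns one "topics" entry into an
 * ['iri' => ..., 'context' => ...] pair.
 *
 * @param string|array{iri?: string, context?: array} $topic
 *
 * @return array{iri: string, context: array}
 */
function normalizeTopic(string|array $topic, array $defaultContext): array
{
    if (\is_string($topic)) {
        // Old way: a bare topic is serialized with the default context.
        return ['iri' => $topic, 'context' => $defaultContext];
    }

    if (!isset($topic['iri'])) {
        throw new \InvalidArgumentException('An array topic needs an "iri" key.');
    }

    // New way: the topic carries its own context.
    // Assumption of this sketch: fall back to the default when it is absent.
    return ['iri' => $topic['iri'], 'context' => $topic['context'] ?? $defaultContext];
}

$default = ['groups' => ['user:read']];

$plain = normalizeTopic('/users/42/assets', $default);
$admin = normalizeTopic(
    ['iri' => '/admin/assets', 'context' => ['groups' => ['admin:read']]],
    $default
);
```

Here `$plain` keeps the default context while `$admin` keeps the context declared next to its IRI.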

ThomasCarbon avatar Nov 15 '23 13:11 ThomasCarbon

Yes, I've implemented a custom listener too :) just not as generic as yours but specific to our project needs

norkunas avatar Nov 15 '23 13:11 norkunas

Yes, I've implemented a custom listener too :) just not as generic as yours but specific to our project needs

Yes, note that it works for any entity. And I think (hello, API Platform) this would be a very good native feature, since it was not "too hard" to do for someone who had never seen the code before. The only "drawback" is that it publishes one message to the hub for each distinct context, but I don't think this is a real problem.
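
To show what "one message per distinct context" means in practice, here is a simplified, stand-alone sketch of the grouping step. The `groupTopicsByContext()` function and the sample IRIs are hypothetical; it only mirrors the idea behind groupIriByTopicContext() and assumes every topic already has an 'iri' and a 'context':

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-alone grouping step: topics that share a context land
 * in the same bucket, and each bucket would become one Mercure update.
 *
 * @param list<array{iri: string, context: array}> $topics
 *
 * @return list<array{context: array, topics: list<string>}>
 */
function groupTopicsByContext(array $topics): array
{
    $groups = [];
    foreach ($topics as $topic) {
        foreach ($groups as $i => $group) {
            if ($group['context'] === $topic['context']) {
                // Same context as an existing bucket: reuse it.
                $groups[$i]['topics'][] = $topic['iri'];
                continue 2;
            }
        }

        // First time this context is seen: open a new bucket.
        $groups[] = ['context' => $topic['context'], 'topics' => [$topic['iri']]];
    }

    return $groups;
}

$groups = groupTopicsByContext([
    ['iri' => '/admin/books/1', 'context' => ['groups' => ['admin:read']]],
    ['iri' => '/users/7/books/1', 'context' => ['groups' => ['read']]],
    ['iri' => '/users/8/books/1', 'context' => ['groups' => ['read']]],
]);

// Three topics but two distinct contexts, so two updates would be published.
echo \count($groups), " updates\n";
```

The object is serialized once per bucket, so the extra hub traffic grows with the number of distinct contexts, not with the number of topics.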

ThomasCarbon avatar Nov 15 '23 13:11 ThomasCarbon

And I think (hello, API Platform) this would be a very good native feature

I thought that too, but no feedback or interest was given :disappointed:

norkunas avatar Nov 15 '23 13:11 norkunas

It's sad, because it answers a real business need. It's normal for some entities to have information that is available to admins but not to customers. Hiding it on the front end is not enough for data security. We have multiple cases like that in our app. It would be worth trying to reopen it ahah

ThomasCarbon avatar Nov 15 '23 13:11 ThomasCarbon

I don't see a way to reopen it :(, so if you wish to pursue this you could create a new issue :smiling_face_with_tear:

norkunas avatar Nov 15 '23 13:11 norkunas

sorry we all hate stale bots...

soyuka avatar Nov 16 '23 15:11 soyuka