PSR-14: Example - Delayed Events, Queues, and Asynchronicity


One of the long-running debates while designing PSR-14 was how to handle events that were inherently mono-directional. Often, a library will trigger an Event that is, from its point of view, entirely informational. The Event object it fires has no mutator methods so Listeners have no way to interact with each other, which means that the order Listeners run in is irrelevant. It also means Listeners can pass no data back to the emitting library, which means the result of the Event can have no impact on the Emitter's further logic.

This special case opens up more options for how to execute the Listeners. Because there is guaranteed no communication from Listener to Listener or from Listener to Emitter, it's safe to run the Listeners concurrently. In fact, it's safe to run the Listeners concurrently with the Emitter. The Emitter doesn't even need to wait for the Listeners to fire before continuing.
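To make "no mutator methods" concrete, here is a minimal sketch of such an informational Event. The UserLoggedIn class and its getters are hypothetical names made up for this illustration (they are not part of PSR-14), and the typed properties assume PHP 7.4 or later.

```php
<?php

// Hypothetical informational Event: all state is set once in the
// constructor and exposed only through getters, so a Listener has no way
// to change it or to pass anything back to the Emitter.
final class UserLoggedIn
{
    private string $userName;
    private DateTimeImmutable $at;

    public function __construct(string $userName, DateTimeImmutable $at)
    {
        $this->userName = $userName;
        $this->at = $at;
    }

    public function userName(): string
    {
        return $this->userName;
    }

    public function at(): DateTimeImmutable
    {
        return $this->at;
    }
}
```

Because nothing on the object can change after construction, it makes no difference which Listener sees it first, or when.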

We struggled with how to leverage that concurrency potential for a long time. How can an Emitter indicate that the Event it's sending can happen concurrently, or delayed, or asynchronously? In only some cases will the system be able to do so, but the Emitter needs to be able to indicate that it can be done.

For a long time that was handled via two subclasses of Event, one that had to be synchronous and one that could be concurrent. In the end, though, we realized we were approaching the problem from the wrong way around.

The Emitter doesn't need to indicate that the Event is concurrent-safe. That's not its job. The Event itself is concurrent-safe or not. The Emitter doesn't need to add anything to it besides the fact that the Event is immutable.

Moreover, the Emitter by definition doesn't know what Listeners may respond to the Event. Some Listeners are so fast that trying to run them concurrently is pointless. Others are so slow that trying to run them synchronously is stupid. The Emitter can't know the difference. Only the Listener does.

In fact, it goes a step further. Even if an Event is mutable, a given Listener may still be concurrent-safe because it doesn't modify the Event or pass data back to the Emitter. Only the Listener (or rather, the developer writing the Listener) knows that.

That led us to the same conclusion, albeit not from the same direction, as Laravel's event system and the common event systems in Node.js. Node, despite being written in a fundamentally asynchronous language, has an event passing system that is synchronous. (We're talking here of systems similar to PSR-14, not to DOM events which are an entirely different beast.) It's up to listeners to take asynchronous actions if appropriate.

Laravel supports asynchronous events by allowing Listeners (which are classes) to implement a ShouldQueue marker interface. The interface indicates that, if a queue system is available, the Listener should not execute immediately; instead, a queue entry is made that will, in turn, execute the Listener "later".

By shifting the responsibility of determining concurrency compatibility from the Emitter to the Listener we were able to eliminate the second pipeline and alternate Event subclass, getting down to just the bare 3 interfaces of the final specification.

One-off queues

Let's look at the simplest case. An Event is triggered, and some Listener that's been set up wants to send an email in response. Sending an email is a relatively slow operation, especially if multiple people need to be emailed, so it makes sense to kick that off to a queue. In that case, the Listener's job is not "send an email" but "queue that a document was saved". For argument's sake, let's assume we're using the "php-amqplib/php-amqplib" library. Here's the setup code:

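use PhpAmqpLib\Connection\AMQPStreamConnection;

// Connect to a local broker with the default guest credentials.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Arguments after the name: passive, durable, exclusive, auto_delete.
$channel->queue_declare('doc_notifications', false, false, false, false);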

And then our most basic Listener is simply:

$provider->addListener(function (DocumentSaved $event) use ($channel) {
    // Queue only the document's identifier, not the document itself.
    $details = [
        'document_id' => $event->getSubject()->id(),
    ];

    // AMQPMessage here is PhpAmqpLib\Message\AMQPMessage.
    $message = new AMQPMessage(serialize($details));
    $channel->basic_publish($message, '', 'doc_notifications');
});

The Listener is synchronous. The Event Dispatcher process is synchronous. But actually doing something interesting (sending email) is asynchronous. Whatever queue listener is waiting for it on the other end can proceed to do whatever it's going to do, immediately, or later, or whenever.
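For completeness, here is a rough sketch of what the receiving end might do with the message body. The handle_doc_notification() function and the $sendEmail callable are hypothetical names for this illustration. In a real worker, the body would come from the AMQPMessage handed to a basic_consume() callback.

```php
<?php

// Hypothetical handler for one 'doc_notifications' message body.
// $sendEmail stands in for whatever actually delivers the notification.
function handle_doc_notification(string $body, callable $sendEmail): void
{
    // The payload is a plain array, so refuse to instantiate any objects.
    $details = unserialize($body, ['allowed_classes' => false]);

    // Ignore payloads that are not in the expected shape.
    if (!is_array($details) || !isset($details['document_id'])) {
        return;
    }

    $sendEmail($details['document_id']);
}
```

The slow part (the email) now happens entirely in the worker process, on its own schedule.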

This approach works with virtually any Provider implementation.

Auto-queuing

What if we don't want to write the machinery to put an item into the queue each time? Can we subsume that into the Provider? Certainly!

There are some caveats, of course. Since we're going to push execution of the Listener off to another PHP process, the Listener itself has to be statically identifiable: either a function, static method, or service call. For simplicity I'm going to demonstrate only supporting service methods as Listeners, but a more robust implementation would certainly be possible.

Let's start with the Provider. We'll ignore ordering for now, and just allow registering listeners that are a method on a service in a PSR-11 Container. For simplicity we'll make queue-or-not a simple boolean flag, but making Listeners be classes and checking for an interface is also an entirely viable option.

The Provider then looks like this:

class QueueableProvider implements ListenerProviderInterface
{
   /** @var array */
   protected $listeners = [];

   /** @var ContainerInterface */
   protected $container;

   /** @var AMQPChannel|null */
   private $channel;

   public function __construct(ContainerInterface $container, ?AMQPChannel $channel = null)
   {
       $this->container = $container;
       $this->channel = $channel;
       if ($this->channel) {
           $this->channel->queue_declare('events', false, false, false, false);
       }
   }

   public function getListenersForEvent(object $event): iterable
   {
       foreach ($this->listeners as $listener) {
           if ($event instanceof $listener['type']) {
               yield $listener['listener'];
           }
       }
   }

   public function addListenerService(string $serviceName, string $methodName, string $type, bool $queue = false): void
   {
       if ($queue && $this->channel) {
           $listener = $this->makeListenerForQueue($serviceName, $methodName);
       }
       else {
           $listener = $this->makeListenerForService($serviceName, $methodName);
       }

       $this->listeners[] = [
           'listener' => $listener,
           'type' => $type,
       ];
   }

   protected function makeListenerForService(string $serviceName, string $methodName) : callable
   {
       $container = $this->container;
       $listener = function (object $event) use ($serviceName, $methodName, $container) : void {
           $container->get($serviceName)->$methodName($event);
       };
       return $listener;
   }

   protected function makeListenerForQueue(string $serviceName, string $methodName) : callable 
   {
       $channel = $this->channel;

       $listener = function (object $event) use ($serviceName, $methodName, $channel) : void {
           $details = [
               'serviceName' => $serviceName,
               'methodName' => $methodName,
               'event' => $event,
           ];

           $message = new AMQPMessage(serialize($details));
           $channel->basic_publish($message, '', 'events');
       };

       return $listener;
   }
}

The addListenerService() method is the main entry point. It's fairly boring and simply queues up a listener/type pair to be checked later by getListenersForEvent(). The interesting part is that, based on the $queue parameter, it will generate two different Listeners for the service/method specified.

If there is no queue available, or if $queue was false, makeListenerForService() takes a service name and method and wraps that into an anonymous function that will, when invoked, pull the service out of the container and invoke the specified method, giving it the Event. The logic here is copied straight from Tukio.

Alternatively, if the Listener is queueable and there is a queue available, it will be wrapped into an entirely different Listener by makeListenerForQueue(). In this case, the wrapper will enqueue an item that tracks the service, method, and the Event itself. That means whatever the business logic of the service/method, invoking that Listener has roughly constant time.
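The boolean flag could also be swapped for the marker-interface approach mentioned earlier. Here is a minimal sketch of that check. QueueableListener, the two Listener classes, and should_queue() are all hypothetical names for this illustration, not part of PSR-14 or Tukio.

```php
<?php

// Hypothetical marker interface: it declares no methods and only flags
// that a Listener class is safe to defer to a queue.
interface QueueableListener
{
}

final class SendSaveNotification implements QueueableListener
{
    public function __invoke(object $event): void
    {
        // Slow work, such as sending email, would go here.
    }
}

final class UpdateSearchIndex
{
    public function __invoke(object $event): void
    {
        // Fast work that should run immediately would go here.
    }
}

// Decide from the class name alone, so the Listener does not have to be
// instantiated at registration time.
function should_queue(string $listenerClass, bool $queueAvailable): bool
{
    return $queueAvailable
        && is_subclass_of($listenerClass, QueueableListener::class);
}
```

A Provider could call should_queue() at registration time in place of testing the $queue flag, leaving the rest of the wrapping logic unchanged.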

How does the actual Listener then get invoked? For that we need a queue worker that is listening to the channel and handling each item as it comes out. A simple version of that could look like this:

function queue_runner(AMQPChannel $channel, ContainerInterface $container)
{
   $worker = function (AMQPMessage $message) use ($container) {
       $details = unserialize($message->body);

       // Look up the service, then call the named method with the Event.
       $container->get($details['serviceName'])->{$details['methodName']}($details['event']);
   };

   $channel->queue_declare('events', false, false, false, false);
   $channel->basic_consume('events', '', false, true, false, false, $worker);
   while (count($channel->callbacks)) {
       $channel->wait();
   }
}

The second half of it is just AMQP boilerplate. The interesting part is the $worker, which will be called with each message object. All it needs to do is deserialize the payload and then make the exact same container->service->method call as before.

The contents of the service and method are identical in either case. The Listener does not change whether it's called immediately or "later".

You'll also notice that we haven't mentioned the Dispatcher at all. That's because the Dispatcher has no interest in whether the Listener is called now or later; its behavior is absolutely identical. If the Listener has nothing to say to the Dispatcher, so be it.
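To illustrate that point, here is a deliberately tiny sketch. The dispatch() function is a stand-in for a real Dispatcher (which would implement Psr\EventDispatcher\EventDispatcherInterface and honor stoppable Events), the $queue array is a stand-in for AMQP, and ThingHappened is a made-up Event class.

```php
<?php

// Made-up Event class for the demonstration.
final class ThingHappened
{
    public $id;

    public function __construct(int $id)
    {
        $this->id = $id;
    }
}

// Stand-in Dispatcher: it calls each Listener in turn and nothing more.
function dispatch(object $event, iterable $listeners): object
{
    foreach ($listeners as $listener) {
        $listener($event);
    }
    return $event;
}

$queue = [];   // In-memory stand-in for the AMQP queue.
$handled = []; // Records each time the real work runs.

// The real work, identical whether it runs now or later.
$work = function (object $event) use (&$handled): void {
    $handled[] = get_class($event);
};

// Deferred Listener: during dispatch it only stores the serialized Event.
$deferred = function (object $event) use (&$queue): void {
    $queue[] = serialize($event);
};

// The Dispatcher treats both Listeners exactly the same way.
dispatch(new ThingHappened(1), [$work, $deferred]);

// "Later", a worker drains the queue and runs the same work.
foreach ($queue as $payload) {
    $work(unserialize($payload));
}
```

Nothing in dispatch() knows or cares that one of the two Listeners merely scheduled its work for later.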

There is one caveat, though. In order to run the Event "later" in a generic fashion we need to be able to serialize the Event so it can be passed through the queue. Per the PSR-14 spec, there is no strict requirement that an Event object be serializable even though it's recommended. If the Event is carrying an object that is not serializable then it won't be queueable. That may not be immediately obvious, but for instance if it's carrying an ActiveRecord object, that object very likely has a transitive dependency on a database connection, making the Event unserializable. Even if it doesn't, the object graph behind the Event may be considerably larger than expected, making serializing it impractical or slow.
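One way to see the difference is to try serializing an Event and watch what happens. In this sketch, is_queueable() and both Event classes are hypothetical, and the Closure merely stands in for a live resource such as a database connection (PHP refuses to serialize either). The typed properties assume PHP 7.4 or later.

```php
<?php

// Hypothetical helper: reports whether an Event survives serialize().
function is_queueable(object $event): bool
{
    try {
        serialize($event);
        return true;
    } catch (Throwable $e) {
        // PHP throws when the object graph contains something it cannot
        // serialize, such as a Closure.
        return false;
    }
}

// Light-weight Event: carries only a scalar identifier.
final class DocumentSavedById
{
    private string $documentId;

    public function __construct(string $documentId)
    {
        $this->documentId = $documentId;
    }
}

// Heavy Event: drags a live, unserializable dependency along with it.
final class DocumentSavedWithConnection
{
    private Closure $connection;

    public function __construct(Closure $connection)
    {
        $this->connection = $connection;
    }
}
```

A check like this only catches Events that fail outright. It says nothing about Events that serialize successfully but pull in a huge object graph.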

That's a risk of offering implementers as much flexibility as possible. It's the responsibility of the implementer to write Event objects that are sufficiently light-weight and well-designed for the use case in question. There are, no doubt, use cases for Events that are not serializable. In those cases, though, the Listener author would know that the Event isn't serializable because, well, they know what the Event class is. They're writing against that API so really ought to know what its limitations are. As a result, in practice this limitation won't come up too often, or rather when it does it's clear that it's doing so and it's for a good reason. (And if it's not a good reason, well, it's the Event author's fault.)

