EnqueueBundle. Quick tour.

The EnqueueBundle integrates the enqueue library into a Symfony application. It adds an easy-to-use configuration layer, registers services, and provides handy CLI commands.

Install

$ composer require enqueue/enqueue-bundle enqueue/amqp-ext

Note: AMQP is not the only transport available; you could also use STOMP, Amazon SQS, Redis, Filesystem, Doctrine DBAL, and others.

Enable the Bundle

Then, enable the bundle by adding new Enqueue\Bundle\EnqueueBundle() to the bundles array of the registerBundles method in your project's app/AppKernel.php file:

<?php

// app/AppKernel.php

// ...
class AppKernel extends Kernel
{
    public function registerBundles()
    {
        $bundles = array(
            // ...

            new Enqueue\Bundle\EnqueueBundle(),
        );

        // ...
    }

    // ...
}

Usage

First, configure a transport layer and set one as the default.

# app/config/config.yml

enqueue:
    transport:
        default: "amqp://"
    client: ~

Once everything is configured, you can start producing messages:

<?php
use Enqueue\Client\Producer;

/** @var Producer $producer **/
$producer = $container->get('enqueue.producer');


// send event to many consumers
$producer->sendEvent('aFooTopic', 'Something has happened');

// send command to ONE consumer
$producer->sendCommand('aProcessorName', 'Something has happened');

To consume messages, first create a message processor:

<?php
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrProcessor;
use Enqueue\Client\TopicSubscriberInterface;

class FooProcessor implements PsrProcessor, TopicSubscriberInterface
{
    public function process(PsrMessage $message, PsrContext $context)
    {
        echo $message->getBody();

        return self::ACK;
        // return self::REJECT; // when the message is broken
        // return self::REQUEUE; // the message is fine but you want to postpone processing
    }

    public static function getSubscribedTopics()
    {
        return ['aFooTopic'];
    }
}
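
The commented-out `REJECT` line above covers broken messages. As a minimal sketch of how a processor might detect one, assume the message body is expected to be JSON (an assumption for illustration; the producer example in this tour sends a plain string). `decodeJsonBody()` is a hypothetical helper and not part of Enqueue:

```php
<?php

/**
 * Hypothetical helper (not part of Enqueue): decodes a JSON message body.
 * Returns the decoded array, or null when the body is not a JSON object or array.
 */
function decodeJsonBody(string $body): ?array
{
    $data = json_decode($body, true);

    // json_decode() returns null on malformed input; bare scalars are not accepted here either.
    return is_array($data) ? $data : null;
}

// Inside FooProcessor::process() this could be used as:
//
//     $data = decodeJsonBody($message->getBody());
//     if (null === $data) {
//         return self::REJECT; // the message is broken
//     }
//
//     return self::ACK;
```

Keeping the validation in a small function makes the accept/reject decision easy to unit-test without a running broker.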

Register it as a container service and tag it so that it is subscribed to the topic:

foo_message_processor:
    class: 'FooProcessor'
    tags:
        - { name: 'enqueue.client.processor' }

Now you can start consuming messages:

$ ./app/console enqueue:consume --setup-broker

Note: Add -vvv to find out what is going on while you are consuming messages. There is a lot of valuable debug info there.
