Skip to content

Commit c6f3d06

Browse files
authored
Merge pull request #2 from php-enqueue/introduce-psr-message-processor-interface
[psr] Introduce MessageProcessor interface (moved from consumption).
2 parents 192637a + 2bb31ae commit c6f3d06

File tree

70 files changed

+671
-659
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+671
-659
lines changed

docs/bundle/job_queue.md

+10-14
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@ Guaranty that there is only single job running with such name.
1414

1515
```php
1616
<?php
17-
use Enqueue\Consumption\MessageProcessorInterface;
18-
use Enqueue\Consumption\Result;
1917
use Enqueue\Psr\Message;
18+
use Enqueue\Psr\Processor;
2019
use Enqueue\Psr\Context;
2120
use Enqueue\Util\JSON;
2221
use Enqueue\JobQueue\JobRunner;
2322
use Enqueue\JobQueue\Job;
2423

25-
class MessageProcessor implements MessageProcessorInterface
24+
class ReindexProcessor implements Processor
2625
{
2726
/**
2827
* @var JobRunner
@@ -43,7 +42,7 @@ class MessageProcessor implements MessageProcessorInterface
4342
}
4443
);
4544

46-
return $result ? Result::ACK : Result::REJECT;
45+
return $result ? self::ACK : self::REJECT;
4746
}
4847
}
4948
```
@@ -54,16 +53,15 @@ Run several sub jobs in parallel.
5453

5554
```php
5655
<?php
57-
use Enqueue\Consumption\MessageProcessorInterface;
58-
use Enqueue\Consumption\Result;
5956
use Enqueue\JobQueue\JobRunner;
6057
use Enqueue\JobQueue\Job;
6158
use Enqueue\Client\MessageProducerInterface;
6259
use Enqueue\Util\JSON;
6360
use Enqueue\Psr\Message;
6461
use Enqueue\Psr\Context;
62+
use Enqueue\Psr\Processor;
6563

66-
class Step1MessageProcessor implements MessageProcessorInterface
64+
class Step1Processor implements Processor
6765
{
6866
/**
6967
* @var JobRunner
@@ -102,11 +100,11 @@ class Step1MessageProcessor implements MessageProcessorInterface
102100
}
103101
);
104102

105-
return $result ? Result::ACK : Result::REJECT;
103+
return $result ? self::ACK : self::REJECT;
106104
}
107105
}
108106

109-
class Step2MessageProcessor implements MessageProcessorInterface
107+
class Step2Processor implements Processor
110108
{
111109
/**
112110
* @var JobRunner
@@ -138,17 +136,15 @@ just after all steps are finished.
138136

139137
```php
140138
<?php
141-
use Enqueue\Consumption\MessageProcessorInterface;
142-
use Enqueue\Consumption\Result;
143139
use Enqueue\JobQueue\JobRunner;
144140
use Enqueue\JobQueue\Job;
145141
use Enqueue\JobQueue\DependentJobService;
146-
use Enqueue\Client\MessageProducerInterface;
147142
use Enqueue\Util\JSON;
148143
use Enqueue\Psr\Message;
149144
use Enqueue\Psr\Context;
145+
use Enqueue\Psr\Processor;
150146

151-
class MessageProcessor implements MessageProcessorInterface
147+
class ReindexProcessor implements Processor
152148
{
153149
/**
154150
* @var JobRunner
@@ -182,7 +178,7 @@ class MessageProcessor implements MessageProcessorInterface
182178
}
183179
);
184180

185-
return $result ? Result::ACK : Result::REJECT;
181+
return $result ? self::ACK : self::REJECT;
186182
}
187183
}
188184
```

docs/bundle/quick_tour.md

+7-8
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,18 @@ To consume messages you have to first create a message processor:
4646
<?php
4747
use Enqueue\Psr\Message;
4848
use Enqueue\Psr\Context;
49-
use Enqueue\Consumption\MessageProcessorInterface;
50-
use Enqueue\Consumption\Result;
49+
use Enqueue\Psr\Processor;
5150
use Enqueue\Client\TopicSubscriberInterface;
5251

53-
class FooMessageProcessor implements MessageProcessorInterface, TopicSubscriberInterface
52+
class FooProcessor implements Processor, TopicSubscriberInterface
5453
{
5554
public function process(Message $message, Context $session)
5655
{
5756
echo $message->getBody();
5857

59-
return Result::ACK;
60-
// return Result::REJECT; // when the message is broken
61-
// return Result::REQUEUE; // the message is fine but you want to postpone processing
58+
return self::ACK;
59+
// return self::REJECT; // when the message is broken
60+
// return self::REQUEUE; // the message is fine but you want to postpone processing
6261
}
6362

6463
public static function getSubscribedTopics()
@@ -72,9 +71,9 @@ Register it as a container service and subscribe to the topic:
7271

7372
```yaml
7473
foo_message_processor:
75-
class: 'FooMessageProcessor'
74+
class: 'FooProcessor'
7675
tags:
77-
- { name: 'enqueue.client.message_processor' }
76+
- { name: 'enqueue.client.processor' }
7877
```
7978
8079
Now you can start consuming messages:

docs/job_queue/run_sub_job.md

+4-5
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,15 @@ They will be executed in parallel.
66

77
```php
88
<?php
9-
use Enqueue\Consumption\MessageProcessorInterface;
109
use Enqueue\Client\MessageProducerInterface;
11-
use Enqueue\Consumption\Result;
1210
use Enqueue\Psr\Message;
1311
use Enqueue\Psr\Context;
12+
use Enqueue\Psr\Processor;
1413
use Enqueue\JobQueue\JobRunner;
1514
use Enqueue\JobQueue\Job;
1615
use Enqueue\Util\JSON;
1716

18-
class RootJobMessageProcessor implements MessageProcessorInterface
17+
class RootJobProcessor implements Processor
1918
{
2019
/** @var JobRunner */
2120
private $jobRunner;
@@ -36,11 +35,11 @@ class RootJobMessageProcessor implements MessageProcessorInterface
3635
return true;
3736
});
3837

39-
return $result ? Result::ACK : Result::REJECT;
38+
return $result ? self::ACK : self::REJECT;
4039
}
4140
}
4241

43-
class SubJobMessageProcessor implements MessageProcessorInterface
42+
class SubJobProcessor implements Processor
4443
{
4544
/** @var JobRunner */
4645
private $jobRunner;

docs/job_queue/run_unique_job.md

+3-4
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@ It shows how you can run unique job using job queue (The configuration is descri
1212

1313
```php
1414
<?php
15-
use Enqueue\Consumption\MessageProcessorInterface;
16-
use Enqueue\Consumption\Result;
1715
use Enqueue\Psr\Message;
1816
use Enqueue\Psr\Context;
17+
use Enqueue\Psr\Processor;
1918
use Enqueue\JobQueue\JobRunner;
2019

21-
class MessageProcessor implements MessageProcessorInterface
20+
class UniqueJobProcessor implements Processor
2221
{
2322
/** @var JobRunner */
2423
private $jobRunner;
@@ -31,7 +30,7 @@ class MessageProcessor implements MessageProcessorInterface
3130
return true; // if you want to ACK message or false to REJECT
3231
});
3332

34-
return $result ? Result::ACK : Result::REJECT;
33+
return $result ? self::ACK : self::REJECT;
3534
}
3635
}
3736
```

docs/quick_tour.md

+7-7
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ The `consume` method starts the consumption process which last as long as it is
6565
```php
6666
<?php
6767
use Enqueue\Psr\Message;
68+
use Enqueue\Psr\Processor;
6869
use Enqueue\Consumption\QueueConsumer;
69-
use Enqueue\Consumption\Result;
7070

7171
/** @var \Enqueue\Psr\Context $psrContext */
7272

@@ -75,12 +75,12 @@ $queueConsumer = new QueueConsumer($psrContext);
7575
$queueConsumer->bind('foo_queue', function(Message $message) {
7676
// process messsage
7777

78-
return Result::ACK;
78+
return Processor::ACK;
7979
});
8080
$queueConsumer->bind('bar_queue', function(Message $message) {
8181
// process messsage
8282

83-
return Result::ACK;
83+
return Processor::ACK;
8484
});
8585

8686
$queueConsumer->consume();
@@ -167,16 +167,16 @@ Here's an example of how you can send and consume messages.
167167
```php
168168
<?php
169169
use Enqueue\Client\SimpleClient;
170-
use Enqueue\Consumption\Result;
171170
use Enqueue\Psr\Message;
171+
use Enqueue\Psr\Processor;
172172

173-
/** @var \Enqueue\Psr\Context $psrClient */
173+
/** @var \Enqueue\Psr\Context $psrContext */
174174

175-
$client = new SimpleClient($psrClient);
175+
$client = new SimpleClient($psrContext);
176176
$client->bind('foo_topic', function (Message $message) {
177177
// process message
178178

179-
return Result::ACK;
179+
return Processor::ACK;
180180
});
181181

182182
$client->send('foo_topic', 'Hello there!');

pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php

+7-7
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
namespace Enqueue\AmqpExt\Tests\Functional;
33

44
use Enqueue\AmqpExt\AmqpContext;
5-
use Enqueue\Psr\Context;
6-
use Enqueue\Psr\Message;
75
use Enqueue\Consumption\ChainExtension;
86
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
97
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
108
use Enqueue\Consumption\Extension\ReplyExtension;
11-
use Enqueue\Consumption\MessageProcessorInterface;
129
use Enqueue\Consumption\QueueConsumer;
1310
use Enqueue\Consumption\Result;
11+
use Enqueue\Psr\Context;
12+
use Enqueue\Psr\Message;
13+
use Enqueue\Psr\Processor;
1414
use Enqueue\Test\RabbitmqAmqpExtension;
1515
use Enqueue\Test\RabbitmqManagmentExtensionTrait;
1616

@@ -52,7 +52,7 @@ public function testConsumeOneMessageAndExit()
5252
new LimitConsumptionTimeExtension(new \DateTime('+3sec')),
5353
]));
5454

55-
$processor = new StubMessageProcessor();
55+
$processor = new StubProcessor();
5656
$queueConsumer->bind($queue, $processor);
5757

5858
$queueConsumer->consume();
@@ -81,10 +81,10 @@ public function testConsumeOneMessageAndSendReplyExit()
8181

8282
$replyMessage = $this->amqpContext->createMessage(__METHOD__.'.reply');
8383

84-
$processor = new StubMessageProcessor();
84+
$processor = new StubProcessor();
8585
$processor->result = Result::reply($replyMessage);
8686

87-
$replyProcessor = new StubMessageProcessor();
87+
$replyProcessor = new StubProcessor();
8888

8989
$queueConsumer->bind($queue, $processor);
9090
$queueConsumer->bind($replyQueue, $replyProcessor);
@@ -98,7 +98,7 @@ public function testConsumeOneMessageAndSendReplyExit()
9898
}
9999
}
100100

101-
class StubMessageProcessor implements MessageProcessorInterface
101+
class StubProcessor implements Processor
102102
{
103103
public $result = Result::ACK;
104104

pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientRoutingPass.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66

77
class BuildClientRoutingPass implements CompilerPassInterface
88
{
9-
use ExtractMessageProcessorTagSubscriptionsTrait;
9+
use ExtractProcessorTagSubscriptionsTrait;
1010

1111
/**
1212
* {@inheritdoc}
1313
*/
1414
public function process(ContainerBuilder $container)
1515
{
16-
$processorTagName = 'enqueue.client.message_processor';
16+
$processorTagName = 'enqueue.client.processor';
1717
$routerId = 'enqueue.client.router_processor';
1818

1919
if (false == $container->hasDefinition($routerId)) {

pkg/enqueue-bundle/DependencyInjection/Compiler/BuildMessageProcessorRegistryPass.php renamed to pkg/enqueue-bundle/DependencyInjection/Compiler/BuildProcessorRegistryPass.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@
44
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
55
use Symfony\Component\DependencyInjection\ContainerBuilder;
66

7-
class BuildMessageProcessorRegistryPass implements CompilerPassInterface
7+
class BuildProcessorRegistryPass implements CompilerPassInterface
88
{
9-
use ExtractMessageProcessorTagSubscriptionsTrait;
9+
use ExtractProcessorTagSubscriptionsTrait;
1010

1111
/**
1212
* {@inheritdoc}
1313
*/
1414
public function process(ContainerBuilder $container)
1515
{
16-
$processorTagName = 'enqueue.client.message_processor';
17-
$processorRegistryId = 'enqueue.client.message_processor_registry';
16+
$processorTagName = 'enqueue.client.processor';
17+
$processorRegistryId = 'enqueue.client.processor_registry';
1818

1919
if (false == $container->hasDefinition($processorRegistryId)) {
2020
return;

pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66

77
class BuildQueueMetaRegistryPass implements CompilerPassInterface
88
{
9-
use ExtractMessageProcessorTagSubscriptionsTrait;
9+
use ExtractProcessorTagSubscriptionsTrait;
1010

1111
/**
1212
* {@inheritdoc}
1313
*/
1414
public function process(ContainerBuilder $container)
1515
{
16-
$processorTagName = 'enqueue.client.message_processor';
16+
$processorTagName = 'enqueue.client.processor';
1717
$queueMetaRegistryId = 'enqueue.client.meta.queue_meta_registry';
1818
if (false == $container->hasDefinition($queueMetaRegistryId)) {
1919
return;

pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66

77
class BuildTopicMetaSubscribersPass implements CompilerPassInterface
88
{
9-
use ExtractMessageProcessorTagSubscriptionsTrait;
9+
use ExtractProcessorTagSubscriptionsTrait;
1010

1111
/**
1212
* {@inheritdoc}
1313
*/
1414
public function process(ContainerBuilder $container)
1515
{
16-
$processorTagName = 'enqueue.client.message_processor';
16+
$processorTagName = 'enqueue.client.processor';
1717

1818
$topicsSubscribers = [];
1919
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use Symfony\Component\DependencyInjection\ContainerBuilder;
66
use Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException;
77

8-
trait ExtractMessageProcessorTagSubscriptionsTrait
8+
trait ExtractProcessorTagSubscriptionsTrait
99
{
1010
/**
1111
* @param ContainerBuilder $container

pkg/enqueue-bundle/EnqueueBundle.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@
44
use Enqueue\AmqpExt\AmqpContext;
55
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
66
use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory;
7-
use Enqueue\Symfony\DefaultTransportFactory;
8-
use Enqueue\Symfony\NullTransportFactory;
97
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
108
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass;
11-
use Enqueue\Bundle\DependencyInjection\Compiler\BuildMessageProcessorRegistryPass;
9+
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
1210
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1311
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
1412
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
1513
use Enqueue\Stomp\StompContext;
1614
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
1715
use Enqueue\Stomp\Symfony\StompTransportFactory;
16+
use Enqueue\Symfony\DefaultTransportFactory;
17+
use Enqueue\Symfony\NullTransportFactory;
1818
use Symfony\Component\DependencyInjection\ContainerBuilder;
1919
use Symfony\Component\HttpKernel\Bundle\Bundle;
2020

@@ -27,7 +27,7 @@ public function build(ContainerBuilder $container)
2727
{
2828
$container->addCompilerPass(new BuildExtensionsPass());
2929
$container->addCompilerPass(new BuildClientRoutingPass());
30-
$container->addCompilerPass(new BuildMessageProcessorRegistryPass());
30+
$container->addCompilerPass(new BuildProcessorRegistryPass());
3131
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
3232
$container->addCompilerPass(new BuildQueueMetaRegistryPass());
3333

0 commit comments

Comments
 (0)