Skip to content

Commit 5d26312

Browse files
committed
client extension
1 parent 0c76d8d commit 5d26312

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

pkg/enqueue/Client/ChainExtension.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
class ChainExtension implements ExtensionInterface
6+
{
7+
/**
8+
* @var ExtensionInterface[]
9+
*/
10+
private $extensions;
11+
12+
/**
13+
* @param ExtensionInterface[] $extensions
14+
*/
15+
public function __construct(array $extensions)
16+
{
17+
$this->extensions = $extensions;
18+
}
19+
20+
/**
21+
* {@inheritdoc}
22+
*/
23+
public function onPreSend($topic, Message $message)
24+
{
25+
foreach ($this->extensions as $extension) {
26+
$extension->onPreSend($topic, $message);
27+
}
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*/
33+
public function onPostSend($topic, Message $message)
34+
{
35+
foreach ($this->extensions as $extension) {
36+
$extension->onPostSend($topic, $message);
37+
}
38+
}
39+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
namespace Enqueue\Client;
3+
4+
interface ExtensionInterface
5+
{
6+
/**
7+
* @param string $topic
8+
* @param Message $message
9+
* @return
10+
*/
11+
public function onPreSend($topic, Message $message);
12+
13+
/**
14+
* @param string $topic
15+
* @param Message $message
16+
* @return
17+
*/
18+
public function onPostSend($topic, Message $message);
19+
20+
public function onInterrupted($topic, Message $message);
21+
}

pkg/enqueue/Client/Producer.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ class Producer implements ProducerInterface
1212
*/
1313
protected $driver;
1414

15+
/**
16+
* @var ExtensionInterface
17+
*/
18+
private $extension;
19+
1520
/**
1621
* @param DriverInterface $driver
1722
*/
18-
public function __construct(DriverInterface $driver)
23+
public function __construct(DriverInterface $driver, ExtensionInterface $extension = null)
1924
{
2025
$this->driver = $driver;
26+
$this->extension = $extension ?: new ChainExtension([]);
2127
}
2228

2329
/**
@@ -47,6 +53,8 @@ public function send($topic, $message)
4753
$message->setPriority(MessagePriority::NORMAL);
4854
}
4955

56+
$this->extension->onPreSend($topic, $message);
57+
5058
if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) {
5159
if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
5260
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME));
@@ -68,6 +76,8 @@ public function send($topic, $message)
6876
} else {
6977
throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope()));
7078
}
79+
80+
$this->extension->onPostSend($topic, $message);
7181
}
7282

7383
/**

0 commit comments

Comments
 (0)