-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathSqsDriver.php
62 lines (50 loc) · 1.87 KB
/
SqsDriver.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?php
namespace Enqueue\Client\Driver;
use Enqueue\Sqs\SqsContext;
use Enqueue\Sqs\SqsDestination;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
/**
 * Client driver for the SQS transport.
 *
 * Declares the router queue plus one queue per distinct route, and rewrites
 * transport-level topic/queue names so that they contain no "." characters.
 *
 * @method SqsContext getContext()
 * @method SqsDestination createQueue(string $clientQueueName)
 */
class SqsDriver extends GenericDriver
{
    /**
     * Narrows the context accepted by the parent driver to an SQS context.
     *
     * @param SqsContext $context SQS transport context
     * @param mixed      ...$args remaining arguments, forwarded unchanged to the parent constructor
     */
    public function __construct(SqsContext $context, ...$args)
    {
        parent::__construct($context, ...$args);
    }

    /**
     * Declares the router queue and every distinct route queue on the context.
     *
     * @param LoggerInterface|null $logger receives one debug message per declared queue;
     *                                     a NullLogger is used when none is given
     */
    public function setupBroker(?LoggerInterface $logger = null): void
    {
        $logger = $logger ?? new NullLogger();

        // Static: the closure only needs $logger, never $this.
        $log = static function (string $text, ...$args) use ($logger): void {
            $logger->debug(sprintf('[SqsDriver] '.$text, ...$args));
        };

        // setup router
        $routerQueue = $this->createQueue($this->getConfig()->getRouterQueue());
        $log('Declare router queue: %s', $routerQueue->getQueueName());
        $this->getContext()->declareQueue($routerQueue);

        // setup queues
        // Several routes may resolve to the same queue; each name is declared once.
        // Only route queues are tracked here: the router queue declared above is
        // not added to this set.
        $declaredQueues = [];
        foreach ($this->getRouteCollection()->all() as $route) {
            /** @var SqsDestination $queue */
            $queue = $this->createRouteQueue($route);
            $queueName = $queue->getQueueName();

            if (isset($declaredQueues[$queueName])) {
                continue;
            }

            $log('Declare processor queue: %s', $queueName);
            $this->getContext()->declareQueue($queue);

            $declaredQueues[$queueName] = true;
        }
    }

    /**
     * Builds the transport router topic name via the parent, then removes dots from it.
     */
    protected function createTransportRouterTopicName(string $name, bool $prefix): string
    {
        return $this->escapeDots(parent::createTransportRouterTopicName($name, $prefix));
    }

    /**
     * Builds the transport queue name via the parent, then removes dots from it.
     */
    protected function createTransportQueueName(string $name, bool $prefix): string
    {
        return $this->escapeDots(parent::createTransportQueueName($name, $prefix));
    }

    /**
     * Replaces every "." in a transport name with "_dot_".
     *
     * Presumably needed because SQS queue names may not contain dots (see the
     * SQS CreateQueue naming rules) - TODO confirm.
     * NOTE(review): a trailing ".fifo" suffix is rewritten as well; confirm that
     * FIFO queues are not expected to pass through this driver.
     */
    private function escapeDots(string $name): string
    {
        return str_replace('.', '_dot_', $name);
    }
}