Skip to content

Commit 4a128f3

Browse files
committed
[beanstalk] Add transport for beanstalkd
1 parent 591f889 commit 4a128f3

36 files changed

+1826
-0
lines changed

bin/test

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ function waitForService()
2323
waitForService rabbitmq 5672 50
2424
waitForService mysql 3306 50
2525
waitForService redis 6379 50
26+
waitForService beanstalkd 11300
2627

2728
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
2829
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force

composer.json

+5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"enqueue/null": "*@dev",
1414
"enqueue/dbal": "*@dev",
1515
"enqueue/sqs": "*@dev",
16+
"enqueue/pheanstalk": "*@dev",
1617
"enqueue/enqueue-bundle": "*@dev",
1718
"enqueue/job-queue": "*@dev",
1819
"enqueue/simple-client": "*@dev",
@@ -85,6 +86,10 @@
8586
"type": "path",
8687
"url": "pkg/sqs"
8788
},
89+
{
90+
"type": "path",
91+
"url": "pkg/pheanstalk"
92+
},
8893
{
8994
"type": "path",
9095
"url": "pkg/simple-client"

docker-compose.yml

+11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
version: '2'
2+
23
services:
34
dev:
45
image: enqueue/dev:latest
@@ -7,6 +8,7 @@ services:
78
- rabbitmq
89
- mysql
910
- redis
11+
- beanstalkd
1012
volumes:
1113
- './:/mqdev'
1214
environment:
@@ -29,6 +31,9 @@ services:
2931
- AWS__SQS__KEY=$ENQUEUE_AWS__SQS__KEY
3032
- AWS__SQS__SECRET=$ENQUEUE_AWS__SQS__SECRET
3133
- AWS__SQS__REGION=$ENQUEUE_AWS__SQS__REGION
34+
- BEANSTALKD_HOST=beanstalkd
35+
- BEANSTALKD_PORT=11300
36+
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
3237

3338
rabbitmq:
3439
image: enqueue/rabbitmq:latest
@@ -37,6 +42,12 @@ services:
3742
- RABBITMQ_DEFAULT_USER=guest
3843
- RABBITMQ_DEFAULT_PASS=guest
3944
- RABBITMQ_DEFAULT_VHOST=mqdev
45+
ports:
46+
- "15677:15672"
47+
48+
beanstalkd:
49+
image: 'schickling/beanstalkd'
50+
4051
redis:
4152
image: 'redis:3'
4253
ports:

phpunit.xml.dist

+4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@
4949
<directory>pkg/sqs/Tests</directory>
5050
</testsuite>
5151

52+
<testsuite name="pheanstalk transport">
53+
<directory>pkg/pheanstalk/Tests</directory>
54+
</testsuite>
55+
5256
<testsuite name="enqueue-bundle">
5357
<directory>pkg/enqueue-bundle/Tests</directory>
5458
</testsuite>

pkg/pheanstalk/LICENSE

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
The MIT License (MIT)
2+
Copyright (c) 2017 Kotliar Maksym
3+
4+
Permission is hereby granted, free of charge, to any person obtaining a copy
5+
of this software and associated documentation files (the "Software"), to deal
6+
in the Software without restriction, including without limitation the rights
7+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
copies of the Software, and to permit persons to whom the Software is furnished
9+
to do so, subject to the following conditions:
10+
11+
The above copyright notice and this permission notice shall be included in all
12+
copies or substantial portions of the Software.
13+
14+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
THE SOFTWARE.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
<?php
2+
3+
namespace Enqueue\Pheanstalk;
4+
5+
use Enqueue\Psr\PsrConnectionFactory;
6+
use Pheanstalk\Pheanstalk;
7+
8+
class PheanstalkConnectionFactory implements PsrConnectionFactory
9+
{
10+
/**
11+
* @var array
12+
*/
13+
private $config;
14+
15+
/**
16+
* @var Pheanstalk
17+
*/
18+
private $connection;
19+
20+
/**
21+
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default settings.
22+
*
23+
* [
24+
* 'host' => 'localhost',
25+
* 'port' => 11300,
26+
* 'timeout' => null,
27+
* 'persisted' => true,
28+
* ]
29+
*
30+
* or
31+
*
32+
* beanstalk://host:port
33+
*
34+
* @param array|string $config
35+
*/
36+
public function __construct($config = 'beanstalk://')
37+
{
38+
if (empty($config) || 'beanstalk://' === $config) {
39+
$config = [];
40+
} elseif (is_string($config)) {
41+
$config = $this->parseDsn($config);
42+
} elseif (is_array($config)) {
43+
} else {
44+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
45+
}
46+
47+
$this->config = array_replace($this->defaultConfig(), $config);
48+
}
49+
50+
/**
51+
* {@inheritdoc}
52+
*
53+
* @return PheanstalkContext
54+
*/
55+
public function createContext()
56+
{
57+
return new PheanstalkContext($this->establishConnection());
58+
}
59+
60+
/**
61+
* @return Pheanstalk
62+
*/
63+
private function establishConnection()
64+
{
65+
if (false == $this->connection) {
66+
$this->connection = new Pheanstalk(
67+
$this->config['host'],
68+
$this->config['port'],
69+
$this->config['timeout'],
70+
$this->config['persisted']
71+
);
72+
}
73+
74+
return $this->connection;
75+
}
76+
77+
/**
78+
* @param string $dsn
79+
*
80+
* @return array
81+
*/
82+
private function parseDsn($dsn)
83+
{
84+
$dsnConfig = parse_url($dsn);
85+
if (false === $dsnConfig) {
86+
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
87+
}
88+
89+
$dsnConfig = array_replace([
90+
'scheme' => null,
91+
'host' => null,
92+
'port' => null,
93+
'user' => null,
94+
'pass' => null,
95+
'path' => null,
96+
'query' => null,
97+
], $dsnConfig);
98+
99+
if ('beanstalk' !== $dsnConfig['scheme']) {
100+
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "beanstalk" only.', $dsnConfig['scheme']));
101+
}
102+
103+
$query = [];
104+
if ($dsnConfig['query']) {
105+
parse_str($dsnConfig['query'], $query);
106+
}
107+
108+
return array_replace($query, [
109+
'port' => $dsnConfig['port'],
110+
'host' => $dsnConfig['host'],
111+
]);
112+
}
113+
114+
/**
115+
* @return array
116+
*/
117+
private function defaultConfig()
118+
{
119+
return [
120+
'host' => 'localhost',
121+
'port' => Pheanstalk::DEFAULT_PORT,
122+
'timeout' => null,
123+
'persisted' => true,
124+
];
125+
}
126+
}

pkg/pheanstalk/PheanstalkConsumer.php

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
<?php
2+
3+
namespace Enqueue\Pheanstalk;
4+
5+
use Enqueue\Psr\InvalidMessageException;
6+
use Enqueue\Psr\PsrConsumer;
7+
use Enqueue\Psr\PsrMessage;
8+
use Pheanstalk\Job;
9+
use Pheanstalk\Pheanstalk;
10+
11+
class PheanstalkConsumer implements PsrConsumer
12+
{
13+
/**
14+
* @var PheanstalkDestination
15+
*/
16+
private $destination;
17+
18+
/**
19+
* @var Pheanstalk
20+
*/
21+
private $pheanstalk;
22+
23+
/**
24+
* @param PheanstalkDestination $destination
25+
* @param Pheanstalk $pheanstalk
26+
*/
27+
public function __construct(PheanstalkDestination $destination, Pheanstalk $pheanstalk)
28+
{
29+
$this->destination = $destination;
30+
$this->pheanstalk = $pheanstalk;
31+
}
32+
33+
/**
34+
* {@inheritdoc}
35+
*
36+
* @return PheanstalkDestination
37+
*/
38+
public function getQueue()
39+
{
40+
return $this->destination;
41+
}
42+
43+
/**
44+
* {@inheritdoc}
45+
*
46+
* @return PheanstalkMessage|null
47+
*/
48+
public function receive($timeout = 0)
49+
{
50+
if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), $timeout / 1000)) {
51+
return $this->convertJobToMessage($job);
52+
}
53+
}
54+
55+
/**
56+
* {@inheritdoc}
57+
*
58+
* @return PheanstalkMessage|null
59+
*/
60+
public function receiveNoWait()
61+
{
62+
if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), 0)) {
63+
return $this->convertJobToMessage($job);
64+
}
65+
}
66+
67+
/**
68+
* {@inheritdoc}
69+
*
70+
* @param PheanstalkMessage $message
71+
*/
72+
public function acknowledge(PsrMessage $message)
73+
{
74+
InvalidMessageException::assertMessageInstanceOf($message, PheanstalkMessage::class);
75+
76+
if (false == $message->getJob()) {
77+
throw new \LogicException('The message could not be acknowledged because it does not have job set.');
78+
}
79+
80+
$this->pheanstalk->delete($message->getJob());
81+
}
82+
83+
/**
84+
* {@inheritdoc}
85+
*
86+
* @param PheanstalkMessage $message
87+
*/
88+
public function reject(PsrMessage $message, $requeue = false)
89+
{
90+
$this->acknowledge($message);
91+
92+
if ($requeue) {
93+
$this->pheanstalk->release($message->getJob(), $message->getPriority(), $message->getDelay());
94+
}
95+
}
96+
97+
/**
98+
* @param Job $job
99+
*
100+
* @return PheanstalkMessage
101+
*/
102+
private function convertJobToMessage(Job $job)
103+
{
104+
$message = PheanstalkMessage::jsonUnserialize($job->getData());
105+
$message->setJob($job);
106+
107+
return $message;
108+
}
109+
}

0 commit comments

Comments
 (0)