Skip to content

Commit c71d0fa

Browse files
committed
Migrate pheanstalk transport
1 parent 971083e commit c71d0fa

7 files changed

+102
-244
lines changed

pkg/pheanstalk/PheanstalkConnectionFactory.php

+5-17
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Pheanstalk;
44

55
use Interop\Queue\PsrConnectionFactory;
6+
use Interop\Queue\PsrContext;
67
use Pheanstalk\Pheanstalk;
78

89
class PheanstalkConnectionFactory implements PsrConnectionFactory
@@ -49,19 +50,14 @@ public function __construct($config = 'beanstalk:')
4950
}
5051

5152
/**
52-
* {@inheritdoc}
53-
*
5453
* @return PheanstalkContext
5554
*/
56-
public function createContext()
55+
public function createContext(): PsrContext
5756
{
5857
return new PheanstalkContext($this->establishConnection());
5958
}
6059

61-
/**
62-
* @return Pheanstalk
63-
*/
64-
private function establishConnection()
60+
private function establishConnection(): Pheanstalk
6561
{
6662
if (false == $this->connection) {
6763
$this->connection = new Pheanstalk(
@@ -75,12 +71,7 @@ private function establishConnection()
7571
return $this->connection;
7672
}
7773

78-
/**
79-
* @param string $dsn
80-
*
81-
* @return array
82-
*/
83-
private function parseDsn($dsn)
74+
private function parseDsn(string $dsn): array
8475
{
8576
$dsnConfig = parse_url($dsn);
8677
if (false === $dsnConfig) {
@@ -112,10 +103,7 @@ private function parseDsn($dsn)
112103
]);
113104
}
114105

115-
/**
116-
* @return array
117-
*/
118-
private function defaultConfig()
106+
private function defaultConfig(): array
119107
{
120108
return [
121109
'host' => 'localhost',

pkg/pheanstalk/PheanstalkConsumer.php

+13-27
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Interop\Queue\InvalidMessageException;
66
use Interop\Queue\PsrConsumer;
77
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
89
use Pheanstalk\Job;
910
use Pheanstalk\Pheanstalk;
1011

@@ -20,32 +21,24 @@ class PheanstalkConsumer implements PsrConsumer
2021
*/
2122
private $pheanstalk;
2223

23-
/**
24-
* @param PheanstalkDestination $destination
25-
* @param Pheanstalk $pheanstalk
26-
*/
2724
public function __construct(PheanstalkDestination $destination, Pheanstalk $pheanstalk)
2825
{
2926
$this->destination = $destination;
3027
$this->pheanstalk = $pheanstalk;
3128
}
3229

3330
/**
34-
* {@inheritdoc}
35-
*
3631
* @return PheanstalkDestination
3732
*/
38-
public function getQueue()
33+
public function getQueue(): PsrQueue
3934
{
4035
return $this->destination;
4136
}
4237

4338
/**
44-
* {@inheritdoc}
45-
*
46-
* @return PheanstalkMessage|null
39+
* @return PheanstalkMessage
4740
*/
48-
public function receive($timeout = 0)
41+
public function receive(int $timeout = 0): ?PsrMessage
4942
{
5043
if (0 === $timeout) {
5144
while (true) {
@@ -58,26 +51,26 @@ public function receive($timeout = 0)
5851
return $this->convertJobToMessage($job);
5952
}
6053
}
54+
55+
return null;
6156
}
6257

6358
/**
64-
* {@inheritdoc}
65-
*
66-
* @return PheanstalkMessage|null
59+
* @return PheanstalkMessage
6760
*/
68-
public function receiveNoWait()
61+
public function receiveNoWait(): ?PsrMessage
6962
{
7063
if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), 0)) {
7164
return $this->convertJobToMessage($job);
7265
}
66+
67+
return null;
7368
}
7469

7570
/**
76-
* {@inheritdoc}
77-
*
7871
* @param PheanstalkMessage $message
7972
*/
80-
public function acknowledge(PsrMessage $message)
73+
public function acknowledge(PsrMessage $message): void
8174
{
8275
InvalidMessageException::assertMessageInstanceOf($message, PheanstalkMessage::class);
8376

@@ -89,11 +82,9 @@ public function acknowledge(PsrMessage $message)
8982
}
9083

9184
/**
92-
* {@inheritdoc}
93-
*
9485
* @param PheanstalkMessage $message
9586
*/
96-
public function reject(PsrMessage $message, $requeue = false)
87+
public function reject(PsrMessage $message, bool $requeue = false): void
9788
{
9889
$this->acknowledge($message);
9990

@@ -102,12 +93,7 @@ public function reject(PsrMessage $message, $requeue = false)
10293
}
10394
}
10495

105-
/**
106-
* @param Job $job
107-
*
108-
* @return PheanstalkMessage
109-
*/
110-
private function convertJobToMessage(Job $job)
96+
private function convertJobToMessage(Job $job): PheanstalkMessage
11197
{
11298
$stats = $this->pheanstalk->statsJob($job);
11399

pkg/pheanstalk/PheanstalkContext.php

+31-25
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,17 @@
33
namespace Enqueue\Pheanstalk;
44

55
use Interop\Queue\InvalidDestinationException;
6+
use Interop\Queue\PsrConsumer;
67
use Interop\Queue\PsrContext;
78
use Interop\Queue\PsrDestination;
9+
use Interop\Queue\PsrMessage;
10+
use Interop\Queue\PsrProducer;
11+
use Interop\Queue\PsrQueue;
12+
use Interop\Queue\PsrSubscriptionConsumer;
13+
use Interop\Queue\PsrTopic;
14+
use Interop\Queue\PurgeQueueNotSupportedException;
15+
use Interop\Queue\SubscriptionConsumerNotSupportedException;
16+
use Interop\Queue\TemporaryQueueNotSupportedException;
817
use Pheanstalk\Pheanstalk;
918

1019
class PheanstalkContext implements PsrContext
@@ -14,79 +23,76 @@ class PheanstalkContext implements PsrContext
1423
*/
1524
private $pheanstalk;
1625

17-
/**
18-
* @param Pheanstalk $pheanstalk
19-
*/
2026
public function __construct(Pheanstalk $pheanstalk)
2127
{
2228
$this->pheanstalk = $pheanstalk;
2329
}
2430

2531
/**
26-
* {@inheritdoc}
32+
* @return PheanstalkMessage
2733
*/
28-
public function createMessage($body = '', array $properties = [], array $headers = [])
34+
public function createMessage(string $body = '', array $properties = [], array $headers = []): PsrMessage
2935
{
3036
return new PheanstalkMessage($body, $properties, $headers);
3137
}
3238

3339
/**
34-
* {@inheritdoc}
40+
* @return PheanstalkDestination
3541
*/
36-
public function createTopic($topicName)
42+
public function createTopic(string $topicName): PsrTopic
3743
{
3844
return new PheanstalkDestination($topicName);
3945
}
4046

4147
/**
42-
* {@inheritdoc}
48+
* @return PheanstalkDestination
4349
*/
44-
public function createQueue($queueName)
50+
public function createQueue(string $queueName): PsrQueue
4551
{
4652
return new PheanstalkDestination($queueName);
4753
}
4854

49-
/**
50-
* {@inheritdoc}
51-
*/
52-
public function createTemporaryQueue()
55+
public function createTemporaryQueue(): PsrQueue
5356
{
54-
throw new \LogicException('Not implemented');
57+
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
5558
}
5659

5760
/**
58-
* {@inheritdoc}
59-
*
6061
* @return PheanstalkProducer
6162
*/
62-
public function createProducer()
63+
public function createProducer(): PsrProducer
6364
{
6465
return new PheanstalkProducer($this->pheanstalk);
6566
}
6667

6768
/**
68-
* {@inheritdoc}
69-
*
7069
* @param PheanstalkDestination $destination
7170
*
7271
* @return PheanstalkConsumer
7372
*/
74-
public function createConsumer(PsrDestination $destination)
73+
public function createConsumer(PsrDestination $destination): PsrConsumer
7574
{
7675
InvalidDestinationException::assertDestinationInstanceOf($destination, PheanstalkDestination::class);
7776

7877
return new PheanstalkConsumer($destination, $this->pheanstalk);
7978
}
8079

81-
public function close()
80+
public function close(): void
8281
{
8382
$this->pheanstalk->getConnection()->disconnect();
8483
}
8584

86-
/**
87-
* @return Pheanstalk
88-
*/
89-
public function getPheanstalk()
85+
public function createSubscriptionConsumer(): PsrSubscriptionConsumer
86+
{
87+
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
88+
}
89+
90+
public function purgeQueue(PsrQueue $queue): void
91+
{
92+
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
93+
}
94+
95+
public function getPheanstalk(): Pheanstalk
9096
{
9197
return $this->pheanstalk;
9298
}

pkg/pheanstalk/PheanstalkDestination.php

+6-18
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,23 @@ class PheanstalkDestination implements PsrQueue, PsrTopic
1212
*/
1313
private $destinationName;
1414

15-
/**
16-
* @param string $destinationName
17-
*/
18-
public function __construct($destinationName)
15+
public function __construct(string $destinationName)
1916
{
2017
$this->destinationName = $destinationName;
2118
}
2219

23-
/**
24-
* @return string
25-
*/
26-
public function getName()
20+
public function getName(): string
2721
{
2822
return $this->destinationName;
2923
}
3024

31-
/**
32-
* {@inheritdoc}
33-
*/
34-
public function getQueueName()
25+
public function getQueueName(): string
3526
{
36-
return $this->getName();
27+
return $this->destinationName;
3728
}
3829

39-
/**
40-
* {@inheritdoc}
41-
*/
42-
public function getTopicName()
30+
public function getTopicName(): string
4331
{
44-
return $this->getName();
32+
return $this->destinationName;
4533
}
4634
}

0 commit comments

Comments
 (0)