Skip to content

Commit ec1e022

Browse files
committed
[kafka] migrate kafka transport
1 parent 3371538 commit ec1e022

10 files changed

+109
-302
lines changed

JsonSerializer.php

+2-8
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44

55
class JsonSerializer implements Serializer
66
{
7-
/**
8-
* {@inheritdoc}
9-
*/
10-
public function toString(RdKafkaMessage $message)
7+
public function toString(RdKafkaMessage $message): string
118
{
129
$json = json_encode([
1310
'body' => $message->getBody(),
@@ -26,10 +23,7 @@ public function toString(RdKafkaMessage $message)
2623
return $json;
2724
}
2825

29-
/**
30-
* {@inheritdoc}
31-
*/
32-
public function toMessage($string)
26+
public function toMessage(string $string): RdKafkaMessage
3327
{
3428
$data = json_decode($string, true);
3529
if (JSON_ERROR_NONE !== json_last_error()) {

RdKafkaConnectionFactory.php

+4-13
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\RdKafka;
44

55
use Interop\Queue\PsrConnectionFactory;
6+
use Interop\Queue\PsrContext;
67

78
class RdKafkaConnectionFactory implements PsrConnectionFactory
89
{
@@ -48,21 +49,14 @@ public function __construct($config = 'kafka:')
4849
}
4950

5051
/**
51-
* {@inheritdoc}
52-
*
5352
* @return RdKafkaContext
5453
*/
55-
public function createContext()
54+
public function createContext(): PsrContext
5655
{
5756
return new RdKafkaContext($this->config);
5857
}
5958

60-
/**
61-
* @param string $dsn
62-
*
63-
* @return array
64-
*/
65-
private function parseDsn($dsn)
59+
private function parseDsn(string $dsn): array
6660
{
6761
$dsnConfig = parse_url($dsn);
6862
if (false === $dsnConfig) {
@@ -98,10 +92,7 @@ private function parseDsn($dsn)
9892
return array_replace_recursive($this->defaultConfig(), $config);
9993
}
10094

101-
/**
102-
* @return array
103-
*/
104-
private function defaultConfig()
95+
private function defaultConfig(): array
10596
{
10697
return [
10798
'global' => [

RdKafkaConsumer.php

+16-36
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Interop\Queue\InvalidMessageException;
66
use Interop\Queue\PsrConsumer;
77
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
89
use RdKafka\KafkaConsumer;
910
use RdKafka\TopicPartition;
1011

@@ -42,12 +43,6 @@ class RdKafkaConsumer implements PsrConsumer
4243
*/
4344
private $offset;
4445

45-
/**
46-
* @param KafkaConsumer $consumer
47-
* @param RdKafkaContext $context
48-
* @param RdKafkaTopic $topic
49-
* @param Serializer $serializer
50-
*/
5146
public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic, Serializer $serializer)
5247
{
5348
$this->consumer = $consumer;
@@ -59,23 +54,17 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
5954
$this->setSerializer($serializer);
6055
}
6156

62-
/**
63-
* @return bool
64-
*/
65-
public function isCommitAsync()
57+
public function isCommitAsync(): bool
6658
{
6759
return $this->commitAsync;
6860
}
6961

70-
/**
71-
* @param bool $async
72-
*/
73-
public function setCommitAsync($async)
62+
public function setCommitAsync(bool $async): void
7463
{
75-
$this->commitAsync = (bool) $async;
64+
$this->commitAsync = $async;
7665
}
7766

78-
public function setOffset($offset)
67+
public function setOffset(int $offset = null): void
7968
{
8069
if ($this->subscribed) {
8170
throw new \LogicException('The consumer has already subscribed.');
@@ -84,18 +73,15 @@ public function setOffset($offset)
8473
$this->offset = $offset;
8574
}
8675

87-
/**
88-
* {@inheritdoc}
89-
*/
90-
public function getQueue()
76+
public function getQueue(): PsrQueue
9177
{
9278
return $this->topic;
9379
}
9480

9581
/**
96-
* {@inheritdoc}
82+
* @return RdKafkaMessage
9783
*/
98-
public function receive($timeout = 0)
84+
public function receive(int $timeout = 0): ?PsrMessage
9985
{
10086
if (false === $this->subscribed) {
10187
if (null === $this->offset) {
@@ -107,6 +93,7 @@ public function receive($timeout = 0)
10793
$this->offset
10894
)]);
10995
}
96+
11097
$this->subscribed = true;
11198
}
11299

@@ -125,19 +112,17 @@ public function receive($timeout = 0)
125112
}
126113

127114
/**
128-
* {@inheritdoc}
115+
* @return RdKafkaMessage
129116
*/
130-
public function receiveNoWait()
117+
public function receiveNoWait(): ?PsrMessage
131118
{
132119
throw new \LogicException('Not implemented');
133120
}
134121

135122
/**
136-
* {@inheritdoc}
137-
*
138123
* @param RdKafkaMessage $message
139124
*/
140-
public function acknowledge(PsrMessage $message)
125+
public function acknowledge(PsrMessage $message): void
141126
{
142127
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);
143128

@@ -153,11 +138,9 @@ public function acknowledge(PsrMessage $message)
153138
}
154139

155140
/**
156-
* {@inheritdoc}
157-
*
158141
* @param RdKafkaMessage $message
159142
*/
160-
public function reject(PsrMessage $message, $requeue = false)
143+
public function reject(PsrMessage $message, bool $requeue = false): void
161144
{
162145
$this->acknowledge($message);
163146

@@ -166,12 +149,7 @@ public function reject(PsrMessage $message, $requeue = false)
166149
}
167150
}
168151

169-
/**
170-
* @param int $timeout
171-
*
172-
* @return RdKafkaMessage|null
173-
*/
174-
private function doReceive($timeout)
152+
private function doReceive(int $timeout): ?RdKafkaMessage
175153
{
176154
$kafkaMessage = $this->consumer->consume($timeout);
177155

@@ -190,5 +168,7 @@ private function doReceive($timeout)
190168
throw new \LogicException($kafkaMessage->errstr(), $kafkaMessage->err);
191169
break;
192170
}
171+
172+
return null;
193173
}
194174
}

RdKafkaContext.php

+32-31
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,17 @@
33
namespace Enqueue\RdKafka;
44

55
use Interop\Queue\InvalidDestinationException;
6+
use Interop\Queue\PsrConsumer;
67
use Interop\Queue\PsrContext;
78
use Interop\Queue\PsrDestination;
9+
use Interop\Queue\PsrMessage;
10+
use Interop\Queue\PsrProducer;
11+
use Interop\Queue\PsrQueue;
12+
use Interop\Queue\PsrSubscriptionConsumer;
13+
use Interop\Queue\PsrTopic;
14+
use Interop\Queue\PurgeQueueNotSupportedException;
15+
use Interop\Queue\SubscriptionConsumerNotSupportedException;
16+
use Interop\Queue\TemporaryQueueNotSupportedException;
817
use RdKafka\Conf;
918
use RdKafka\KafkaConsumer;
1019
use RdKafka\Producer;
@@ -46,57 +55,48 @@ public function __construct(array $config)
4655
}
4756

4857
/**
49-
* {@inheritdoc}
58+
* @return RdKafkaMessage
5059
*/
51-
public function createMessage($body = '', array $properties = [], array $headers = [])
60+
public function createMessage(string $body = '', array $properties = [], array $headers = []): PsrMessage
5261
{
5362
return new RdKafkaMessage($body, $properties, $headers);
5463
}
5564

5665
/**
57-
* {@inheritdoc}
58-
*
5966
* @return RdKafkaTopic
6067
*/
61-
public function createTopic($topicName)
68+
public function createTopic(string $topicName): PsrTopic
6269
{
6370
return new RdKafkaTopic($topicName);
6471
}
6572

6673
/**
67-
* {@inheritdoc}
68-
*
6974
* @return RdKafkaTopic
7075
*/
71-
public function createQueue($queueName)
76+
public function createQueue(string $queueName): PsrQueue
7277
{
7378
return new RdKafkaTopic($queueName);
7479
}
7580

76-
/**
77-
* {@inheritdoc}
78-
*/
79-
public function createTemporaryQueue()
81+
public function createTemporaryQueue(): PsrQueue
8082
{
81-
throw new \LogicException('Not implemented');
83+
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
8284
}
8385

8486
/**
85-
* {@inheritdoc}
86-
*
8787
* @return RdKafkaProducer
8888
*/
89-
public function createProducer()
89+
public function createProducer(): PsrProducer
9090
{
9191
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
9292
}
9393

9494
/**
95-
* {@inheritdoc}
96-
*
9795
* @param RdKafkaTopic $destination
96+
*
97+
* @return RdKafkaConsumer
9898
*/
99-
public function createConsumer(PsrDestination $destination)
99+
public function createConsumer(PsrDestination $destination): PsrConsumer
100100
{
101101
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
102102

@@ -116,10 +116,7 @@ public function createConsumer(PsrDestination $destination)
116116
return $consumer;
117117
}
118118

119-
/**
120-
* {@inheritdoc}
121-
*/
122-
public function close()
119+
public function close(): void
123120
{
124121
$kafkaConsumers = $this->kafkaConsumers;
125122
$this->kafkaConsumers = [];
@@ -129,10 +126,17 @@ public function close()
129126
}
130127
}
131128

132-
/**
133-
* @return Producer
134-
*/
135-
private function getProducer()
129+
public function createSubscriptionConsumer(): PsrSubscriptionConsumer
130+
{
131+
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
132+
}
133+
134+
public function purgeQueue(PsrQueue $queue): void
135+
{
136+
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
137+
}
138+
139+
private function getProducer(): Producer
136140
{
137141
if (null === $this->producer) {
138142
$this->producer = new Producer($this->getConf());
@@ -145,10 +149,7 @@ private function getProducer()
145149
return $this->producer;
146150
}
147151

148-
/**
149-
* @return Conf
150-
*/
151-
private function getConf()
152+
private function getConf(): Conf
152153
{
153154
if (null === $this->conf) {
154155
$topicConf = new TopicConf();

0 commit comments

Comments
 (0)