diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9756e22f5..eba7de96b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,6 +29,7 @@ jobs: - run: sed -i 's/525568/16777471/' vendor/kwn/php-rdkafka-stubs/stubs/constants.php + - run: cd docker && docker build --rm --force-rm --no-cache --pull --tag "enqueue/dev:latest" -f Dockerfile . - run: docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon --error-format=github -- ${{ env.GIT_DIFF_FILTERED }} if: env.GIT_DIFF_FILTERED diff --git a/pkg/sns/SnsDestination.php b/pkg/sns/SnsDestination.php index 0db9d914e..5823fd91e 100644 --- a/pkg/sns/SnsDestination.php +++ b/pkg/sns/SnsDestination.php @@ -70,6 +70,34 @@ public function getDeliveryPolicy(): ?int return $this->getAttribute('DeliveryPolicy'); } + /** + * Only FIFO. + * + * Designates a topic as FIFO. You can provide this attribute only during queue creation. + * You can't change it for an existing topic. When you set this attribute, you must provide aMessageGroupId + * explicitly. + * For more information, see https://docs.aws.amazon.com/sns/latest/dg/sns-fifo-topics.html + */ + public function setFifoTopic(bool $enable): void + { + $value = $enable ? 'true' : null; + + $this->setAttribute('FifoTopic', $value); + } + + /** + * Only FIFO. + * + * Enables content-based deduplication. + * For more information, see: https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html + */ + public function setContentBasedDeduplication(bool $enable): void + { + $value = $enable ? 'true' : null; + + $this->setAttribute('ContentBasedDeduplication', $value); + } + public function getAttributes(): array { return $this->attributes; diff --git a/pkg/sns/SnsMessage.php b/pkg/sns/SnsMessage.php index f7311d0b0..43c85e553 100644 --- a/pkg/sns/SnsMessage.php +++ b/pkg/sns/SnsMessage.php @@ -42,21 +42,22 @@ class SnsMessage implements Message */ private $targetArn; + /** + * @var string|null + */ + private $messageGroupId; + + /** + * @var string|null + */ + private $messageDeduplicationId; + /** * SnsMessage constructor. * * See AWS documentation for message attribute structure. * * @see https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sns-2010-03-31.html#shape-messageattributevalue - * - * @param string $body - * @param array $properties - * @param array $headers - * @param array|null $messageAttributes - * @param string|null $messageStructure - * @param string|null $phoneNumber - * @param string|null $subject - * @param string|null $targetArn */ public function __construct( string $body = '', @@ -79,89 +80,58 @@ public function __construct( $this->redelivered = false; } - /** - * @return string|null - */ public function getSnsMessageId(): ?string { return $this->snsMessageId; } - /** - * @param string|null $snsMessageId - */ public function setSnsMessageId(?string $snsMessageId): void { $this->snsMessageId = $snsMessageId; } - /** - * @return string|null - */ public function getMessageStructure(): ?string { return $this->messageStructure; } - /** - * @param string|null $messageStructure - */ public function setMessageStructure(?string $messageStructure): void { $this->messageStructure = $messageStructure; } - /** - * @return string|null - */ public function getPhoneNumber(): ?string { return $this->phoneNumber; } - /** - * @param string|null $phoneNumber - */ public function setPhoneNumber(?string $phoneNumber): void { $this->phoneNumber = $phoneNumber; } - /** - * @return string|null - */ public function getSubject(): ?string { return $this->subject; } - /** - * @param string|null $subject - */ public function setSubject(?string $subject): void { $this->subject = $subject; } - /** - * @return array|null - */ public function getMessageAttributes(): ?array { return $this->messageAttributes; } - /** - * @param array|null $messageAttributes - */ public function setMessageAttributes(?array $messageAttributes): void { $this->messageAttributes = $messageAttributes; } /** - * @param string $name - * @param null $default + * @param null $default * * @return array|null */ @@ -177,9 +147,6 @@ public function getAttribute(string $name, $default = null) * 'DataType' => '', // REQUIRED * 'StringValue' => '', * ]. - * - * @param string $name - * @param array|null $attribute */ public function setAttribute(string $name, ?array $attribute): void { @@ -191,7 +158,6 @@ public function setAttribute(string $name, ?array $attribute): void } /** - * @param string $name * @param string $dataType String, String.Array, Number, or Binary * @param string|resource|StreamInterface $value */ @@ -205,19 +171,52 @@ public function addAttribute(string $name, string $dataType, $value): void ]; } - /** - * @return string|null - */ public function getTargetArn(): ?string { return $this->targetArn; } - /** - * @param string|null $targetArn - */ public function setTargetArn(?string $targetArn): void { $this->targetArn = $targetArn; } + + /** + * Only FIFO. + * + * The tag that specifies that a message belongs to a specific message group. Messages that belong to the same + * message group are processed in a FIFO manner (however, messages in different message groups might be processed + * out of order). + * To interleave multiple ordered streams within a single queue, use MessageGroupId values (for example, session + * data for multiple users). In this scenario, multiple readers can process the queue, but the session data + * of each user is processed in a FIFO fashion. + * For more information, see: https://docs.aws.amazon.com/sns/latest/dg/fifo-message-grouping.html + */ + public function setMessageGroupId(string $id = null): void + { + $this->messageGroupId = $id; + } + + public function getMessageGroupId(): ?string + { + return $this->messageGroupId; + } + + /** + * Only FIFO. + * + * The token used for deduplication of sent messages. If a message with a particular MessageDeduplicationId is + * sent successfully, any messages sent with the same MessageDeduplicationId are accepted successfully but + * aren't delivered during the 5-minute deduplication interval. + * For more information, see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html + */ + public function setMessageDeduplicationId(string $id = null): void + { + $this->messageDeduplicationId = $id; + } + + public function getMessageDeduplicationId(): ?string + { + return $this->messageDeduplicationId; + } } diff --git a/pkg/sns/SnsProducer.php b/pkg/sns/SnsProducer.php index d4eba04d0..bbd46a96f 100644 --- a/pkg/sns/SnsProducer.php +++ b/pkg/sns/SnsProducer.php @@ -77,6 +77,14 @@ public function send(Destination $destination, Message $message): void $arguments['TargetArn'] = $targetArn; } + if ($messageGroupId = $message->getMessageGroupId()) { + $arguments['MessageGroupId'] = $messageGroupId; + } + + if ($messageDeduplicationId = $message->getMessageDeduplicationId()) { + $arguments['MessageDeduplicationId'] = $messageDeduplicationId; + } + $result = $this->context->getSnsClient()->publish($arguments); if (false == $result->hasKey('MessageId')) {