Skip to content

Commit 08fb26c

Browse files
chore(Pubsub): Fix Avro samples (GoogleCloudPlatform#1813)
1 parent 6232bd3 commit 08fb26c

File tree

3 files changed

+12
-13
lines changed

3 files changed

+12
-13
lines changed

pubsub/api/src/publish_avro_records.php

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
use AvroStringIO;
3030
use AvroSchema;
3131
use AvroIODatumWriter;
32-
use AvroDataIOWriter;
32+
use AvroIOBinaryEncoder;
3333

3434
/**
3535
* Publish a message using an AVRO schema.
@@ -81,14 +81,10 @@ function publish_avro_records($projectId, $topicId, $definitionFile)
8181
$io = new AvroStringIO();
8282
$schema = AvroSchema::parse($definition);
8383
$writer = new AvroIODatumWriter($schema);
84-
$dataWriter = new AvroDataIOWriter($io, $writer, $schema);
84+
$encoder = new AvroIOBinaryEncoder($io);
85+
$writer->write($messageData, $encoder);
8586

86-
$dataWriter->append($messageData);
87-
88-
$dataWriter->close();
89-
90-
// AVRO binary data must be base64-encoded.
91-
$encodedMessageData = base64_encode($io->string());
87+
$encodedMessageData = $io->string();
9288
} else {
9389
// encode as JSON.
9490
$encodedMessageData = json_encode($messageData);

pubsub/api/src/subscribe_avro_records.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,26 @@
3131
* @param string $projectId
3232
* @param string $subscriptionId
3333
*/
34-
function subscribe_avro_records($projectId, $subscriptionId)
34+
function subscribe_avro_records($projectId, $subscriptionId, $definitionFile)
3535
{
3636
$pubsub = new PubSubClient([
3737
'projectId' => $projectId,
3838
]);
3939

4040
$subscription = $pubsub->subscription($subscriptionId);
41+
$definition = file_get_contents($definitionFile);
4142
$messages = $subscription->pull();
4243

4344
foreach ($messages as $message) {
4445
$decodedMessageData = '';
4546
$encoding = $message->attribute('googclient_schemaencoding');
4647
switch ($encoding) {
4748
case 'BINARY':
48-
$ioReader = new \AvroStringIO(base64_decode($message->data()));
49-
$dataReader = new \AvroDataIOReader($ioReader, new \AvroIODatumReader());
50-
51-
$decodedMessageData = json_encode($dataReader->data());
49+
$io = new \AvroStringIO($message->data());
50+
$schema = \AvroSchema::parse($definition);
51+
$reader = new \AvroIODatumReader($schema);
52+
$decoder = new \AvroIOBinaryDecoder($io);
53+
$decodedMessageData = json_encode($reader->read($decoder));
5254
break;
5355
case 'JSON':
5456
$decodedMessageData = $message->data();

pubsub/api/test/SchemaTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ public function testPublishAndSubscribeAvro($encoding)
210210
$subscribeOutput = $this->runFunctionSnippet('subscribe_avro_records', [
211211
self::$projectId,
212212
$subscriptionId,
213+
self::AVRO_DEFINITION,
213214
]);
214215

215216
$this->assertStringContainsString(

0 commit comments

Comments
 (0)