Skip to content

Commit 5f33f18

Browse files
committed
Support "parallel" message processing during idle Webklex#338
1 parent af3ad97 commit 5f33f18

File tree

1 file changed

+39
-34
lines changed

1 file changed

+39
-34
lines changed

src/Folder.php

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -421,48 +421,53 @@ public function idle(callable $callback, int $timeout = 300): void {
421421
if (!in_array("IDLE", $this->client->getConnection()->getCapabilities()->validatedData())) {
422422
throw new Exceptions\NotSupportedCapabilityException("IMAP server does not support IDLE");
423423
}
424-
$this->client->openFolder($this->path, true);
425-
$connection = $this->client->getConnection();
426-
$connection->idle();
424+
425+
$idle_client = $this->client->clone();
426+
$idle_client->connect();
427+
$idle_client->openFolder($this->path, true);
428+
$idle_client->getConnection()->idle();
429+
430+
$last_action = Carbon::now()->addSeconds($timeout);
427431

428432
$sequence = ClientManager::get('options.sequence', IMAP::ST_MSGN);
429433

430434
while (true) {
431-
try {
432-
// This polymorphic call is fine - Protocol::idle() will throw an exception beforehand
433-
$line = $connection->nextLine();
434-
435-
if (($pos = strpos($line, "EXISTS")) !== false) {
436-
$connection->done();
437-
$msgn = (int) substr($line, 2, $pos -2);
438-
439-
$this->client->openFolder($this->path, true);
440-
$message = $this->query()->getMessageByMsgn($msgn);
441-
$message->setSequence($sequence);
442-
$callback($message);
443-
444-
$event = $this->getEvent("message", "new");
445-
$event::dispatch($message);
446-
$connection->idle();
447-
} elseif (strpos($line, "OK") === false) {
448-
$connection->done();
449-
$connection->idle();
450-
}
451-
}catch (Exceptions\RuntimeException $e) {
452-
if(strpos($e->getMessage(), "empty response") >= 0 && $connection->connected()) {
453-
$connection->done();
454-
$connection->idle();
455-
continue;
456-
}
457-
if(strpos($e->getMessage(), "connection closed") === false) {
458-
throw $e;
435+
// This polymorphic call is fine - Protocol::idle() will throw an exception beforehand
436+
$line = $idle_client->getConnection()->nextLine(Response::empty());
437+
438+
if (($pos = strpos($line, "EXISTS")) !== false) {
439+
$msgn = (int)substr($line, 2, $pos - 2);
440+
441+
// Check if the stream is still alive or should be considered stale
442+
if (!$this->client->isConnected() || $last_action->isBefore(Carbon::now())) {
443+
// Reset the connection before interacting with it. Otherwise, the resource might be stale which
444+
// would result in a stuck interaction. If you know of a way of detecting a stale resource, please
445+
// feel free to improve this logic. I tried a lot but nothing seem to work reliably...
446+
// Things that didn't work:
447+
// - Closing the resource with fclose()
448+
// - Verifying the resource with stream_get_meta_data()
449+
// - Bool validating the resource stream (e.g.: (bool)$stream)
450+
// - Sending a NOOP command
451+
// - Sending a null package
452+
// - Reading a null package
453+
// - Catching the fs warning
454+
455+
// This polymorphic call is fine - Protocol::idle() will throw an exception beforehand
456+
$this->client->getConnection()->reset();
457+
// Establish a new connection
458+
$this->client->connect();
459459
}
460+
$last_action = Carbon::now()->addSeconds($timeout);
460461

461-
$this->client->reconnect();
462+
// Always reopen the folder - otherwise the new message number isn't known to the current remote session
462463
$this->client->openFolder($this->path, true);
463464

464-
$connection = $this->client->getConnection();
465-
$connection->idle();
465+
$message = $this->query()->getMessageByMsgn($msgn);
466+
$message->setSequence($sequence);
467+
$callback($message);
468+
469+
$event = $this->getEvent("message", "new");
470+
$event::dispatch($message);
466471
}
467472
}
468473
}

0 commit comments

Comments
 (0)