Skip to content

Commit 7c893a2

Browse files
committed
Merge pull request barryvdh#33 from gurkov/0.4
Delayed task implementation
2 parents 4b4c963 + 7d0c783 commit 7c893a2

File tree

2 files changed

+36
-32
lines changed

2 files changed

+36
-32
lines changed

src/AsyncQueue.php

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
<?php
22
namespace Barryvdh\Queue;
33

4+
use Carbon\Carbon;
5+
use DateTime;
46
use Illuminate\Database\Connection;
57
use Illuminate\Queue\DatabaseQueue;
68
use Illuminate\Queue\Jobs\DatabaseJob;
@@ -45,7 +47,7 @@ public function __construct(Connection $database, $table, $default = 'default',
4547
public function push($job, $data = '', $queue = null)
4648
{
4749
$id = parent::push($job, $data, $queue);
48-
$this->startProcess($queue, $id);
50+
$this->startProcess($id);
4951

5052
return $id;
5153
}
@@ -61,7 +63,7 @@ public function push($job, $data = '', $queue = null)
6163
public function pushRaw($payload, $queue = null, array $options = array())
6264
{
6365
$id = parent::push($job, $data, $queue);
64-
$this->startProcess($queue, $id);
66+
$this->startProcess($id);
6567

6668
return $id;
6769
}
@@ -79,7 +81,7 @@ public function pushRaw($payload, $queue = null, array $options = array())
7981
public function later($delay, $job, $data = '', $queue = null)
8082
{
8183
$id = parent::later($delay, $job, $data, $queue);
82-
$this->startProcess($queue, $id);
84+
$this->startProcess($id);
8385

8486
return $id;
8587
}
@@ -95,36 +97,46 @@ public function later($delay, $job, $data = '', $queue = null)
9597
public function release($queue, $job, $delay)
9698
{
9799
$id = parent::release($queue, $job, $delay);
98-
$this->startProcess($queue, $id);
100+
$this->startProcess($id);
99101

100102
return $id;
101103
}
102104

105+
protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
106+
{
107+
$availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
108+
109+
return $this->database->table($this->table)->insertGetId([
110+
'queue' => $this->getQueue($queue),
111+
'payload' => $payload,
112+
'attempts' => $attempts,
113+
'reserved' => 1,
114+
'reserved_at' => $this->getTime(),
115+
'available_at' => $availableAt->getTimestamp(),
116+
'created_at' => $this->getTime(),
117+
]);
118+
}
119+
103120
/**
104121
* Get the next available job for the queue.
105122
*
106123
* @param string|null $queue
107124
* @return \StdClass|null
108125
*/
109-
public function getJobFromId($queue, $id)
126+
public function getJobFromId($id)
110127
{
111-
$this->database->beginTransaction();
112128
$job = $this->database->table($this->table)
113-
->lockForUpdate()
114-
->where('queue', $this->getQueue($queue))
115-
->where('reserved', 0)
116129
->where('id', $id)
117130
->first();
118131

119132
if($job) {
120-
$this->markJobAsReserved($job->id);
121133

122134
return new DatabaseJob(
123-
$this->container, $this, $job, $queue
135+
$this->container, $this, $job, $job->queue
124136
);
125137
}
126138
}
127-
139+
128140
/**
129141
* Make a Process for the Artisan command for the job id.
130142
*
@@ -133,9 +145,9 @@ public function getJobFromId($queue, $id)
133145
*
134146
* @return void
135147
*/
136-
public function startProcess($queue, $id)
148+
public function startProcess($id)
137149
{
138-
$command = $this->getCommand($queue, $id);
150+
$command = $this->getCommand($id);
139151
$cwd = base_path();
140152

141153
$process = new Process($command, $cwd);
@@ -150,16 +162,15 @@ public function startProcess($queue, $id)
150162
*
151163
* @return string
152164
*/
153-
protected function getCommand($queue, $id)
165+
protected function getCommand($id)
154166
{
155167
$connection = $this->connectionName;
156-
$cmd = '%s artisan queue:async %d %s --env=%s --queue=%s';
168+
$cmd = '%s artisan queue:async %d %s';
157169
$cmd = $this->getBackgroundCommand($cmd);
158170

159171
$binary = $this->getPhpBinary();
160-
$environment = $this->container->environment();
161172

162-
return sprintf($cmd, $binary, $id, $connection, $environment, $this->getQueue($queue));
173+
return sprintf($cmd, $binary, $id, $connection);
163174
}
164175

165176
/**

src/Console/AsyncCommand.php

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,11 @@ public function __construct(Worker $worker)
4949
*/
5050
public function fire()
5151
{
52-
$queue = $this->option('queue');
53-
54-
5552
$id = $this->argument('id');
5653
$connection = $this->argument('connection');
57-
$delay = $this->option('delay');
58-
$tries = $this->option('tries');
5954

6055
$this->processJob(
61-
$connection, $queue, $id, $delay, $tries
56+
$connection, $id
6257
);
6358
}
6459

@@ -67,20 +62,22 @@ public function fire()
6762
* Process the job
6863
*
6964
*/
70-
protected function processJob($connectionName, $queue, $id, $delay, $maxTries)
65+
protected function processJob($connectionName, $id)
7166
{
7267
$manager = $this->worker->getManager();
7368
$connection = $manager->connection($connectionName);
7469

75-
$job = $connection->getJobFromId($queue, $id);
70+
$job = $connection->getJobFromId($id);
7671

7772
// If we're able to pull a job off of the stack, we will process it and
7873
// then immediately return back out. If there is no job on the queue
7974
// we will "sleep" the worker for the specified number of seconds.
8075
if ( ! is_null($job))
8176
{
77+
$sleep = max($job->getDatabaseJob()->available_at - time(), 0);
78+
sleep($sleep);
8279
return $this->worker->process(
83-
$manager->getName($connectionName), $job, $maxTries, $delay
80+
$manager->getName($connectionName), $job
8481
);
8582
}
8683

@@ -110,11 +107,7 @@ protected function getArguments()
110107
protected function getOptions()
111108
{
112109
return array(
113-
array('queue', null, InputOption::VALUE_OPTIONAL, 'The queue name', null),
114-
115-
array('delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0),
116-
117-
array('tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0),
110+
118111
);
119112
}
120113
}

0 commit comments

Comments
 (0)