Skip to content

Commit c2f8198

Browse files
committed
Sleep
1 parent aca8346 commit c2f8198

File tree

2 files changed

+32
-30
lines changed

2 files changed

+32
-30
lines changed

src/AsyncQueue.php

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function __construct(Connection $database, $table, $default = 'default',
4545
public function push($job, $data = '', $queue = null)
4646
{
4747
$id = parent::push($job, $data, $queue);
48-
$this->startProcess($queue, $id);
48+
$this->startProcess($id);
4949

5050
return $id;
5151
}
@@ -61,7 +61,7 @@ public function push($job, $data = '', $queue = null)
6161
public function pushRaw($payload, $queue = null, array $options = array())
6262
{
6363
$id = parent::push($job, $data, $queue);
64-
$this->startProcess($queue, $id);
64+
$this->startProcess($id);
6565

6666
return $id;
6767
}
@@ -79,7 +79,7 @@ public function pushRaw($payload, $queue = null, array $options = array())
7979
public function later($delay, $job, $data = '', $queue = null)
8080
{
8181
$id = parent::later($delay, $job, $data, $queue);
82-
$this->startProcess($queue, $id);
82+
$this->startProcess($id);
8383

8484
return $id;
8585
}
@@ -95,36 +95,46 @@ public function later($delay, $job, $data = '', $queue = null)
9595
public function release($queue, $job, $delay)
9696
{
9797
$id = parent::release($queue, $job, $delay);
98-
$this->startProcess($queue, $id);
98+
$this->startProcess($id);
9999

100100
return $id;
101101
}
102102

103+
protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
104+
{
105+
$availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
106+
107+
return $this->database->table($this->table)->insertGetId([
108+
'queue' => $this->getQueue($queue),
109+
'payload' => $payload,
110+
'attempts' => $attempts,
111+
'reserved' => 1,
112+
'reserved_at' => $this->getTime(),
113+
'available_at' => $availableAt->getTimestamp(),
114+
'created_at' => $this->getTime(),
115+
]);
116+
}
117+
103118
/**
104119
* Get the next available job for the queue.
105120
*
106121
* @param string|null $queue
107122
* @return \StdClass|null
108123
*/
109-
public function getJobFromId($queue, $id)
124+
public function getJobFromId($id)
110125
{
111-
$this->database->beginTransaction();
112126
$job = $this->database->table($this->table)
113-
->lockForUpdate()
114-
->where('queue', $this->getQueue($queue))
115-
->where('reserved', 0)
116127
->where('id', $id)
117128
->first();
118129

119130
if($job) {
120-
$this->markJobAsReserved($job->id);
121131

122132
return new DatabaseJob(
123133
$this->container, $this, $job, $queue
124134
);
125135
}
126136
}
127-
137+
128138
/**
129139
* Make a Process for the Artisan command for the job id.
130140
*
@@ -133,9 +143,9 @@ public function getJobFromId($queue, $id)
133143
*
134144
* @return void
135145
*/
136-
public function startProcess($queue, $id)
146+
public function startProcess($id)
137147
{
138-
$command = $this->getCommand($queue, $id);
148+
$command = $this->getCommand($id);
139149
$cwd = base_path();
140150

141151
$process = new Process($command, $cwd);
@@ -150,16 +160,15 @@ public function startProcess($queue, $id)
150160
*
151161
* @return string
152162
*/
153-
protected function getCommand($queue, $id)
163+
protected function getCommand($id)
154164
{
155165
$connection = $this->connectionName;
156-
$cmd = '%s artisan queue:async %d %s --env=%s --queue=%s';
166+
$cmd = '%s artisan queue:async %d %s';
157167
$cmd = $this->getBackgroundCommand($cmd);
158168

159169
$binary = $this->getPhpBinary();
160-
$environment = $this->container->environment();
161170

162-
return sprintf($cmd, $binary, $id, $connection, $environment, $this->getQueue($queue));
171+
return sprintf($cmd, $binary, $id, $connection);
163172
}
164173

165174
/**

src/Console/AsyncCommand.php

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,11 @@ public function __construct(Worker $worker)
4949
*/
5050
public function fire()
5151
{
52-
$queue = $this->option('queue');
53-
54-
5552
$id = $this->argument('id');
5653
$connection = $this->argument('connection');
57-
$delay = $this->option('delay');
58-
$tries = $this->option('tries');
5954

6055
$this->processJob(
61-
$connection, $queue, $id, $delay, $tries
56+
$connection, $id
6257
);
6358
}
6459

@@ -67,18 +62,20 @@ public function fire()
6762
* Process the job
6863
*
6964
*/
70-
protected function processJob($connectionName, $queue, $id, $delay, $maxTries)
65+
protected function processJob($connectionName, $id)
7166
{
7267
$manager = $this->worker->getManager();
7368
$connection = $manager->connection($connectionName);
7469

75-
$job = $connection->getJobFromId($queue, $id);
70+
$job = $connection->getJobFromId($id);
7671

7772
// If we're able to pull a job off of the stack, we will process it and
7873
// then immediately return back out. If there is no job on the queue
7974
// we will "sleep" the worker for the specified number of seconds.
8075
if ( ! is_null($job))
8176
{
77+
$sleep = $job->available_at - time();
78+
sleep($sleep);
8279
return $this->worker->process(
8380
$manager->getName($connectionName), $job, $maxTries, $delay
8481
);
@@ -110,11 +107,7 @@ protected function getArguments()
110107
protected function getOptions()
111108
{
112109
return array(
113-
array('queue', null, InputOption::VALUE_OPTIONAL, 'The queue name', null),
114-
115-
array('delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0),
116-
117-
array('tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0),
110+
118111
);
119112
}
120113
}

0 commit comments

Comments
 (0)