diff --git a/.gitignore b/.gitignore index 987e2a2..7128132 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ composer.lock vendor +/.settings/ +/.project diff --git a/composer.json b/composer.json index 8e0ef0a..ddfb668 100644 --- a/composer.json +++ b/composer.json @@ -11,9 +11,8 @@ ], "require": { "php": ">=5.3.0", - "illuminate/support": "4.x", - "illuminate/console": "4.x", - "symfony/process": "~2.3" + "illuminate/support": "4.x|5.0.x", + "illuminate/console": "4.x|5.0.x" }, "autoload": { "classmap": [ diff --git a/src/AsyncQueue.php b/src/AsyncQueue.php index 75f72cd..b8767d4 100644 --- a/src/AsyncQueue.php +++ b/src/AsyncQueue.php @@ -3,7 +3,7 @@ use Barryvdh\Queue\Models\Job; use Illuminate\Queue\SyncQueue; -use Symfony\Component\Process\Process; +use Barryvdh\Queue\Jobs\AsyncJob; class AsyncQueue extends SyncQueue { @@ -27,12 +27,11 @@ public function __construct(array $config) * * @return int */ - public function push($job, $data = '', $queue = null) + public function push($job, $data='', $queue=null) { - $id = $this->storeJob($job, $data, 0); - $this->startProcess($id, 0); - - return $id; + $id = $this->storeJob($job, $data, $queue); + //$this->startProcess($id, 0); + return 0; } /** @@ -42,17 +41,19 @@ public function push($job, $data = '', $queue = null) * * @param string $job * @param mixed $data + * @param string|null $queue * @param int $delay * * @return int */ - public function storeJob($job, $data, $delay = 0) + private function storeJob($job, $data, $queue=null, $delay=0) { $payload = $this->createPayload($job, $data); $job = new Job(); $job->status = Job::STATUS_OPEN; $job->delay = $delay; + if($queue) $job->queue = $queue; $job->payload = $payload; $job->save(); @@ -63,36 +64,31 @@ public function storeJob($job, $data, $delay = 0) * Make a Process for the Artisan command for the job id. * * @param int $jobId - * @param int $delay * * @return void */ - public function startProcess($jobId, $delay = 0) + public function startProcess($jobId) { - $command = $this->getCommand($jobId, $delay); - $cwd = $this->container['path.base']; - - $process = new Process($command, $cwd); - $process->run(); + chdir($this->container['path.base']); + exec($this->getCommand($jobId)); } /** * Get the Artisan command as a string for the job id. * * @param int $jobId - * @param int $delay * * @return string */ - protected function getCommand($jobId, $delay = 0) + protected function getCommand($jobId) { - $cmd = '%s artisan queue:async %d --env=%s --delay=%d'; + $cmd = '%s artisan queue:async %d --env=%s'; $cmd = $this->getBackgroundCommand($cmd); $binary = $this->getPhpBinary(); $environment = $this->container->environment(); - return sprintf($cmd, $binary, $jobId, $environment, $delay); + return sprintf($cmd, $binary, $jobId, $environment); } /** @@ -102,11 +98,7 @@ protected function getCommand($jobId, $delay = 0) */ protected function getPhpBinary() { - $path = $this->config['binary']; - if (!defined('PHP_WINDOWS_VERSION_BUILD')) { - $path = escapeshellarg($path); - } - + $path = escapeshellarg($this->config['binary']); $args = $this->config['binary_args']; if(is_array($args)){ $args = implode(' ', $args); @@ -133,13 +125,38 @@ protected function getBackgroundCommand($cmd) * * @return int */ - public function later($delay, $job, $data = '', $queue = null) + public function later($delay, $job, $data='', $queue=null) { $delay = $this->getSeconds($delay); - $id = $this->storeJob($job, $data, $delay); - $this->startProcess($id, $delay); - - return $id; + $id = $this->storeJob($job, $data, $queue, $delay); + //$this->startProcess($id); + return 0; } - + + /** + * Pop the next job off of the queue. + * + * @param string|null $queue + * @return \Illuminate\Queue\Jobs\Job|null + */ + public function pop($queue=null) { + //TODO Il faudrait peut-être tenir compte de la colonne delay (en tt cas si on compte utiliser cette notion pour déterminer quel est le job suivant...) + if($queue) + $item=Job::where('queue', '=', $queue)->orderBy('id', 'asc')->first(); + else + $item=Job::orderBy('id', 'asc')->first(); + if($item) + return new AsyncJob($this->container, $item); + return null; + } + + /** + * + * {@inheritDoc} + * @see \Illuminate\Queue\SyncQueue::pushRaw() + */ + public function pushRaw($payload, $queue=null, array $options = array()) { + $payload=json_decode($payload,true); + return $this->storeJob($payload['job'], $payload['data'], $queue); + } } diff --git a/src/Console/AsyncCommand.php b/src/Console/AsyncCommand.php index df0cab6..5074992 100644 --- a/src/Console/AsyncCommand.php +++ b/src/Console/AsyncCommand.php @@ -6,7 +6,6 @@ use Barryvdh\Queue\Models\Job; use Illuminate\Console\Command; use Symfony\Component\Console\Input\InputArgument; -use Symfony\Component\Console\Input\InputOption; class AsyncCommand extends Command { @@ -33,10 +32,6 @@ public function fire() { $item = Job::findOrFail($this->argument('job_id')); - if ($delay = (int) $this->option('delay')) { - sleep($delay); - } - $job = new AsyncJob($this->laravel, $item); $job->fire(); @@ -53,16 +48,4 @@ protected function getArguments() array('job_id', InputArgument::REQUIRED, 'The Job ID'), ); } - - /** - * Get the console command arguments. - * - * @return array - */ - protected function getOptions() - { - return array( - array('delay', 'D', InputOption::VALUE_OPTIONAL, 'The delay in seconds', 0), - ); - } } diff --git a/src/Jobs/AsyncJob.php b/src/Jobs/AsyncJob.php index 5804a99..080eddd 100644 --- a/src/Jobs/AsyncJob.php +++ b/src/Jobs/AsyncJob.php @@ -8,13 +8,6 @@ class AsyncJob extends SyncJob { - /** - * Indicates if the job has been deleted. - * - * @var bool - */ - protected $deleted = false; - /** * The job model. * @@ -27,11 +20,14 @@ class AsyncJob extends SyncJob * * @param \Illuminate\Container\Container $container * @param \Barryvdh\Queue\Models\Job $job + * + * @return void */ public function __construct(Container $container, Job $job) { - $this->container = $container; $this->job = $job; + $this->container = $container; + $this->job->retries = $this->job->retries + 1; } /** @@ -42,13 +38,20 @@ public function __construct(Container $container, Job $job) public function fire() { // Get the payload from the job - $payload = $this->parsePayload($this->getRawBody()); + $payload = $this->parsePayload($this->job->payload); + if(isset($payload['error'])) unset($payload['error']); + + // If we have to wait, sleep until our time has come + if ($this->job->delay) { + $this->job->status = Job::STATUS_WAITING; + $this->job->save(); + sleep($this->job->delay); + } // Mark job as started $this->job->status = Job::STATUS_STARTED; - $this->job->retries++; $this->job->save(); - + try { // Fire the actual job $this->resolveAndFire($payload); @@ -57,47 +60,13 @@ public function fire() $this->job->status = Job::STATUS_FINISHED; $this->job->save(); } - } - - /** - * Get the raw body string for the job. - * - * @return string - */ - public function getRawBody() - { - return $this->job->payload; - } - - /** - * Release the job back into the queue. - * - * @param int $delay - * @return void - */ - public function release($delay = 0) - { - // Update the Job status - $this->job->status = Job::STATUS_OPEN; - $this->job->save(); - - // Wait for the delay - if ($delay) { - sleep($this->getSeconds($delay)); - } - - // Fire again - $this->fire(); - } - - /** - * Get the number of times the job has been attempted. - * - * @return int - */ - public function attempts() - { - return (int) $this->job->retries; + } + catch (\Exception $e){ + \Log::error('Error when fire a job :'.$e->getMessage()."\nJob details : $this->job->payload"); + $payload['error']=utf8_encode($e->__toString()); + $this->job->payload=json_encode($payload); + $this->job->save(); + } } /** @@ -122,14 +91,35 @@ protected function parsePayload($payload) { return json_decode($payload, true); } - + /** - * Get the job identifier. + * Get the number of times the job has been attempted. * - * @return string + * @return int */ - public function getJobId() + public function attempts() + { + return (int) $this->job->retries; + } + + /** + * Retourner le nom de la queue + * + * {@inheritDoc} + * @see \Illuminate\Queue\Jobs\Job::getQueue() + */ + public function getQueue() + { + return $this->job->queue; + } + + /** + * Rendre le payload à l'appel de cette méthode, utile pour le log en cas de failed_job. + * {@inheritDoc} + * @see \Illuminate\Queue\Jobs\SyncJob::getRawBody() + */ + public function getRawBody() { - return $this->job->id; + return $this->job->payload; } } diff --git a/src/Models/FailedJob.php b/src/Models/FailedJob.php new file mode 100644 index 0000000..7bcd8f6 --- /dev/null +++ b/src/Models/FailedJob.php @@ -0,0 +1,19 @@ +status) { + case self::STATUS_OPEN : return 'created'; + case self::STATUS_WAITING : return 'waiting'; + case self::STATUS_STARTED : return 'started'; + case self::STATUS_FINISHED : return 'done'; + } + } }