11<?php
22namespace Barryvdh \Queue ;
33
4- use Barryvdh \Queue \Models \Job ;
5- use Illuminate \Queue \SyncQueue ;
4+ use Illuminate \Queue \DatabaseQueue ;
65
7- class AsyncQueue extends SyncQueue
6+ class AsyncQueue extends DatabaseQueue
87{
9- /** @var array */
10- protected $ config ;
8+ /** @var string */
9+ protected $ binary ;
10+
11+ /** @var string */
12+ protected $ binaryArgs ;
1113
1214 /**
13- * @param array $config
15+ * @param \Illuminate\Database\Connection $database
16+ * @param string $table
17+ * @param string $default
18+ * @param int $expire
19+ * @param string $binary
20+ * @param string|array $binaryArgs
1421 */
15- public function __construct (array $ config )
22+ public function __construct (Connection $ database , $ table , $ default = ' default ' , $ expire = 60 , $ binary = ' php ' , $ binaryArgs = '' )
1623 {
17- $ this ->config = $ config ;
24+ parent ::__construct ($ database , $ table , $ default , $ expire );
25+ $ this ->binary = $ binary ;
26+ $ this ->binaryArgs = $ binaryArgs ;
1827 }
1928
2029 /**
@@ -28,35 +37,86 @@ public function __construct(array $config)
2837 */
2938 public function push ($ job , $ data = '' , $ queue = null )
3039 {
31- $ id = $ this -> storeJob ($ job , $ data , 0 );
32- $ this ->startProcess ($ id , 0 );
40+ $ id = parent :: push ($ job , $ data , $ queue );
41+ $ this ->startProcess ($ queue , $ id );
3342
3443 return $ id ;
3544 }
45+
46+ /**
47+ * Push a raw payload onto the queue.
48+ *
49+ * @param string $payload
50+ * @param string $queue
51+ * @param array $options
52+ * @return mixed
53+ */
54+ public function pushRaw ($ payload , $ queue = null , array $ options = array ())
55+ {
56+ $ id = parent ::push ($ job , $ data , $ queue );
57+ $ this ->startProcess ($ queue , $ id );
3658
59+ return $ id ;
60+ }
61+
3762 /**
38- * Store the job in the database.
39- *
40- * Returns the id of the job.
63+ * Push a new job onto the queue after a delay.
4164 *
42- * @param string $job
43- * @param mixed $data
44- * @param int $delay
65+ * @param \DateTime|int $delay
66+ * @param string $job
67+ * @param mixed $data
68+ * @param string|null $queue
4569 *
4670 * @return int
4771 */
48- public function storeJob ( $ job , $ data , $ delay = 0 )
72+ public function later ( $ delay , $ job , $ data = '' , $ queue = null )
4973 {
50- $ payload = $ this ->createPayload ($ job , $ data );
51-
52- $ job = new Job ();
53- $ job ->status = Job::STATUS_OPEN ;
54- $ job ->delay = $ delay ;
55- $ job ->payload = $ payload ;
56- $ job ->save ();
74+ $ id = parent ::later ($ delay , $ job , $ data , $ queue );
75+ $ this ->startProcess ($ queue , $ id );
5776
58- return $ job -> id ;
77+ return $ id ;
5978 }
79+
80+ /**
81+ * Release a reserved job back onto the queue.
82+ *
83+ * @param string $queue
84+ * @param \StdClass $job
85+ * @param int $delay
86+ * @return void
87+ */
88+ public function release ($ queue , $ job , $ delay )
89+ {
90+ $ id = parent ::release ($ queue , $ job , $ delay );
91+ $ this ->startProcess ($ queue , $ id );
92+
93+ return $ id ;
94+ }
95+
96+ /**
97+ * Get the next available job for the queue.
98+ *
99+ * @param string|null $queue
100+ * @return \StdClass|null
101+ */
102+ protected function getJobFromId ($ queue , $ id )
103+ {
104+ $ this ->database ->beginTransaction ();
105+ $ job = $ this ->database ->table ($ this ->table )
106+ ->lockForUpdate ()
107+ ->where ('queue ' , $ this ->getQueue ($ queue ))
108+ ->where ('reserved ' , 0 )
109+ ->where ('id ' , $ id )
110+ ->first ();
111+
112+ if ($ job ) {
113+ $ this ->markJobAsReserved ($ job ->id );
114+
115+ return new DatabaseJob (
116+ $ this ->container , $ this , $ job , $ queue
117+ );
118+ }
119+ }
60120
61121 /**
62122 * Make a Process for the Artisan command for the job id.
@@ -66,10 +126,10 @@ public function storeJob($job, $data, $delay = 0)
66126 *
67127 * @return void
68128 */
69- public function startProcess ($ jobId , $ delay = 0 )
129+ public function startProcess ($ queue , $ id )
70130 {
71131 chdir ($ this ->container ['path.base ' ]);
72- exec ($ this ->getCommand ($ jobId , $ delay ));
132+ exec ($ this ->getCommand ($ queue , $ id ));
73133 }
74134
75135 /**
@@ -80,15 +140,15 @@ public function startProcess($jobId, $delay = 0)
80140 *
81141 * @return string
82142 */
83- protected function getCommand ($ jobId , $ delay = 0 )
143+ protected function getCommand ($ queue , $ id )
84144 {
85- $ cmd = '%s artisan queue:async %d --env=%s --delay=%d ' ;
145+ $ cmd = '%s artisan queue:async %d %d --env=%s --queue=%s ' ;
86146 $ cmd = $ this ->getBackgroundCommand ($ cmd );
87147
88148 $ binary = $ this ->getPhpBinary ();
89149 $ environment = $ this ->container ->environment ();
90150
91- return sprintf ($ cmd , $ binary , $ jobId , $ environment , $ delay );
151+ return sprintf ($ cmd , $ binary , $ jobId , $ environment , $ queue );
92152 }
93153
94154 /**
@@ -98,12 +158,12 @@ protected function getCommand($jobId, $delay = 0)
98158 */
99159 protected function getPhpBinary ()
100160 {
101- $ path = escapeshellarg ($ this -> config [ ' binary ' ] );
161+ $ path = escapeshellarg ($ thisbinary );
102162 if (!defined ('PHP_WINDOWS_VERSION_BUILD ' )) {
103163 $ path = escapeshellarg ($ path );
104164 }
105165
106- $ args = $ this ->config [ ' binary_args ' ] ;
166+ $ args = $ this ->binaryArgs ;
107167 if (is_array ($ args )){
108168 $ args = implode (' ' , $ args );
109169 }
@@ -119,23 +179,6 @@ protected function getBackgroundCommand($cmd)
119179 }
120180 }
121181
122- /**
123- * Push a new job onto the queue after a delay.
124- *
125- * @param \DateTime|int $delay
126- * @param string $job
127- * @param mixed $data
128- * @param string|null $queue
129- *
130- * @return int
131- */
132- public function later ($ delay , $ job , $ data = '' , $ queue = null )
133- {
134- $ delay = $ this ->getSeconds ($ delay );
135- $ id = $ this ->storeJob ($ job , $ data , $ delay );
136- $ this ->startProcess ($ id , $ delay );
137-
138- return $ id ;
139- }
182+
140183
141184}
0 commit comments