|
1 | 1 | <?php |
2 | 2 | namespace Barryvdh\Queue; |
3 | 3 |
|
4 | | -use Illuminate\Database\Connection; |
5 | | -use Illuminate\Queue\DatabaseQueue; |
6 | | -use Illuminate\Queue\Jobs\DatabaseJob; |
7 | | -use Symfony\Component\Process\Process; |
8 | | -use Illuminate\Queue\Jobs\DatabaseJobRecord; |
| 4 | +use Illuminate\Queue\SyncQueue; |
| 5 | +use Illuminate\Support\Facades\Concurrency; |
| 6 | +use Illuminate\Support\Facades\Queue; |
9 | 7 |
|
10 | | -class AsyncQueue extends DatabaseQueue |
| 8 | +class AsyncQueue extends SyncQueue |
11 | 9 | { |
12 | | - /** @var string */ |
13 | | - protected $binary; |
14 | | - |
15 | | - /** @var string */ |
16 | | - protected $binaryArgs; |
17 | | - |
18 | | - /** @var string */ |
19 | | - protected $connectionName; |
20 | | - |
21 | 10 | /** |
22 | | - * @param \Illuminate\Database\Connection $database |
23 | | - * @param string $table |
24 | | - * @param string $default |
25 | | - * @param int $expire |
26 | | - * @param string $binary |
27 | | - * @param string|array $binaryArgs |
28 | | - */ |
29 | | - public function __construct(Connection $database, $table, $default = 'default', $expire = 60, $binary = 'php', $binaryArgs = '', $connectionName = '') |
30 | | - { |
31 | | - parent::__construct($database, $table, $default, $expire); |
32 | | - $this->binary = $binary; |
33 | | - $this->binaryArgs = $binaryArgs; |
34 | | - $this->connectionName = $connectionName; |
35 | | - } |
36 | | - |
37 | | - /** |
38 | | - * Push a new job onto the queue. |
39 | | - * |
40 | | - * @param string $job |
41 | | - * @param mixed $data |
42 | | - * @param string|null $queue |
43 | | - * |
44 | | - * @return int |
| 11 | + * {@inheritdoc} |
45 | 12 | */ |
46 | 13 | public function push($job, $data = '', $queue = null) |
47 | 14 | { |
48 | | - $id = parent::push($job, $data, $queue); |
49 | | - $this->startProcess($id); |
50 | | - |
51 | | - return $id; |
52 | | - } |
53 | | - |
54 | | - /** |
55 | | - * Push a raw payload onto the queue. |
56 | | - * |
57 | | - * @param string $payload |
58 | | - * @param string $queue |
59 | | - * @param array $options |
60 | | - * @return mixed |
61 | | - */ |
62 | | - public function pushRaw($payload, $queue = null, array $options = []) |
63 | | - { |
64 | | - $id = parent::pushRaw($payload, $queue, $options); |
65 | | - $this->startProcess($id); |
66 | | - |
67 | | - return $id; |
68 | | - } |
69 | | - |
70 | | - /** |
71 | | - * Push a new job onto the queue after a delay. |
72 | | - * |
73 | | - * @param \DateTime|int $delay |
74 | | - * @param string $job |
75 | | - * @param mixed $data |
76 | | - * @param string|null $queue |
77 | | - * |
78 | | - * @return int |
79 | | - */ |
80 | | - public function later($delay, $job, $data = '', $queue = null) |
81 | | - { |
82 | | - $id = parent::later($delay, $job, $data, $queue); |
83 | | - $this->startProcess($id); |
84 | | - |
85 | | - return $id; |
86 | | - } |
87 | | - |
88 | | - /** |
89 | | - * Create an array to insert for the given job. |
90 | | - * |
91 | | - * @param string|null $queue |
92 | | - * @param string $payload |
93 | | - * @param int $availableAt |
94 | | - * @param int $attempts |
95 | | - * @return array |
96 | | - */ |
97 | | - protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0) |
98 | | - { |
99 | | - $record = parent::buildDatabaseRecord($queue, $payload, $availableAt, $attempts); |
100 | | - $record['reserved_at'] = $this->currentTime(); |
101 | | - |
102 | | - return $record; |
103 | | - } |
104 | | - |
105 | | - /** |
106 | | - * Get the next available job for the queue. |
107 | | - * |
108 | | - * @param int $id |
109 | | - * @return DatabaseJob |
110 | | - */ |
111 | | - public function getJobFromId($id) |
112 | | - { |
113 | | - $job = $this->database->table($this->table) |
114 | | - ->where('id', $id) |
115 | | - ->first(); |
116 | | - |
117 | | - if ($job) { |
118 | | - $job = $this->markJobAsReserved(new DatabaseJobRecord((object) $job)); |
119 | | - return new DatabaseJob( |
120 | | - $this->container, $this, $job, $this->connectionName, $job->queue |
121 | | - ); |
122 | | - } |
123 | | - } |
124 | | - |
125 | | - /** |
126 | | - * Make a Process for the Artisan command for the job id. |
127 | | - * |
128 | | - * @param int $id |
129 | | - * |
130 | | - * @return void |
131 | | - */ |
132 | | - public function startProcess($id) |
133 | | - { |
134 | | - $command = $this->getCommand($id); |
135 | | - $cwd = base_path(); |
136 | | - |
137 | | - $process = new Process([$command], $cwd); |
138 | | - $process->run(); |
139 | | - } |
140 | | - |
141 | | - /** |
142 | | - * Get the Artisan command as a string for the job id. |
143 | | - * |
144 | | - * @param int $id |
145 | | - * |
146 | | - * @return string |
147 | | - */ |
148 | | - protected function getCommand($id) |
149 | | - { |
150 | | - $connection = $this->connectionName; |
151 | | - $cmd = '%s artisan queue:async %d %s'; |
152 | | - $cmd = $this->getBackgroundCommand($cmd); |
153 | | - |
154 | | - $binary = $this->getPhpBinary(); |
155 | | - |
156 | | - return sprintf($cmd, $binary, $id, $connection); |
157 | | - } |
158 | | - |
159 | | - /** |
160 | | - * Get the escaped PHP Binary from the configuration |
161 | | - * |
162 | | - * @return string |
163 | | - */ |
164 | | - protected function getPhpBinary() |
165 | | - { |
166 | | - $path = $this->binary; |
167 | | - if ( ! defined('PHP_WINDOWS_VERSION_BUILD')) { |
168 | | - $path = escapeshellarg($path); |
169 | | - } |
170 | | - |
171 | | - $args = $this->binaryArgs; |
172 | | - if (is_array($args)) { |
173 | | - $args = implode(' ', $args); |
174 | | - } |
175 | | - |
176 | | - return trim($path . ' ' . $args); |
177 | | - } |
178 | | - |
179 | | - protected function getBackgroundCommand($cmd) |
180 | | - { |
181 | | - if (defined('PHP_WINDOWS_VERSION_BUILD')) { |
182 | | - return 'start /B ' . $cmd . ' > NUL'; |
183 | | - } |
184 | | - |
185 | | - return $cmd . ' > /dev/null 2>&1 &'; |
| 15 | + Concurrency::driver('process') |
| 16 | + ->defer(fn () => Queue::connection('sync')->push($job, $data, $queue)); |
186 | 17 | } |
187 | 18 | } |
0 commit comments