diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..a793d958 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +vendor/ +*.swp +phpunit.xml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..9bb4841d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,16 @@ +language: php +php: + - 5.6 + - 7.0 + - 7.1 + - hhvm +matrix: + exclude: + - php: hhvm + env: ENABLE_REDIS_EXT=1 +env: + - ENABLE_REDIS_EXT=0 + - ENABLE_REDIS_EXT=1 +before_script: + - sh -c "if [ $ENABLE_REDIS_EXT -eq 1 ]; then echo \"extension=redis.so\" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini; fi" + - composer install diff --git a/CHANGELOG.markdown b/CHANGELOG.markdown deleted file mode 100644 index 38e57daa..00000000 --- a/CHANGELOG.markdown +++ /dev/null @@ -1,19 +0,0 @@ -## 1.1 (2011-02-26) ## - -* Update Redisent library for Redis 2.2 compatibility. Redis 2.2 is now required. (thedotedge) -* Trim output of `ps` to remove any prepended whitespace (KevBurnsJr) -* Use `getenv` instead of `$_ENV` for better portability across PHP configurations (hobodave) -* Add support for sub-second queue check intervals (KevBurnsJr) -* Ability to specify a cluster/multiple redis servers and consistent hash between them (dceballos) -* Change arguments for jobs to be an array as they're easier to work with in PHP. -* Implement ability to have setUp and tearDown methods for jobs, called before and after every single run. -* Fix `APP_INCLUDE` environment variable not loading correctly. -* Jobs are no longer defined as static methods, and classes are instantiated first. This change is NOT backwards compatible and requires job classes are updated. -* Job arguments are passed to the job class when it is instantiated, and are accessible by $this->args. This change will break existing job classes that rely on arguments that have not been updated. -* Bundle sample script for managing php-resque instances using monit -* Fix undefined variable `$child` when exiting on non-forking operating systems -* Add `PIDFILE` environment variable to write out a PID for single running workers - -## 1.0 (2010-04-18) ## - -* Initial release \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..d3ceb190 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,82 @@ +## 1.3 (2013-??-??) - Current Master ## + +**Note:** This release introduces backwards incompatible changes with all previous versions of php-resque. Please see below for details. + +### Redisent (Redis Library) Replaced with Credis + +Redisent has always been the Redis backend for php-resque because of its lightweight nature. Unfortunately, Redisent is largely unmaintained. + +[Credis](https://github.com/colinmollenhour/credis) is a fork of Redisent, which among other improvements will automatically use the [phpredis](https://github.com/nicolasff/phpredis) native PHP extension if it is available. (you want this for speed, trust me) + +php-resque now utilizes Credis for all Redis based operations. Credis automatically required and installed as a Composer dependency. + +### Composer Support + +Composer support has been improved and is now the recommended method for including php-resque in your project. Details on Composer support can be found in the Getting Started section of the readme. + +### Improved DSN Support + +Changes by iskandar introduce improved support for using DSNs to connect to Redis. You can now utilize the following formatted strings for the REDIS_BACKEND environment variable to connect: + +* `host` +* `host:port` +* `redis://host:port` +* `redis://host:port/db` +* `redis://user:pass@host:port/` (username is required but will be ignored) +* `tcp://user:pass@host:port/` (username is required but will be ignored) + +### Other Improvements/Changes + +* **COMPATIBILITY BREAKING**: The bundled worker manager `resque.php` has been moved to `bin/resque`, and is available as `vendor/bin/resque` when php-resque is installed as a Composer package. +* Restructure tests and test bootstrapping. Autoload tests via Composer (install test dependencies with `composer install --dev`) +* Add `SETEX` to list of commands which supply a key as the first argument in Redisent (danhunsaker) +* Fix an issue where a lost connection to Redis could cause an infinite loop (atorres757) +* Add a helper method to `Resque_Redis` to remove the namespace applied to Redis keys (tonypiper) +* Call beforePerform hook before retrieivng an instance of the job class (allows beforePerform to cancel a job with DontPerform before initialising your application) +* Add `beforeEnqueue` hook, called before a job is placed on a queue + +## 1.2 (2012-10-13) ## + +**Note:** This release is largely backwards compatible with php-resque 1.1. The next release will introduce backwards incompatible changes (moving from Redisent to Credis), and will drop compatibility with PHP 5.2. + +* Allow alternate redis database to be selected when calling setBackend by supplying a second argument (patrickbajao) +* Use `require_once` when including php-resque after the app has been included in the sample resque.php to prevent include conflicts (andrewjshults) +* Wrap job arguments in an array to improve compatibility with ruby resque (warezthebeef) +* Fix a bug where the worker would spin out of control taking the server with it, if the redis connection was interrupted even briefly. Use SIGPIPE to trap this scenario cleanly. (d11wtq) +* Added support of Redis prefix (namespaces) (hlegius) +* When reserving jobs, check if the payload received from popping a queue is a valid object (fix bug whereby jobs are reserved based on an erroneous payload) (salimane) +* Re-enable autoload for class_exists in Job.php (humancopy) +* Fix lost jobs when there is more than one worker process started by the same parent process (salimane) +* Move include for resque before APP_INCLUDE is loaded in, so that way resque is available for the app +* Avoid working with dirty worker IDs (salimane) +* Allow UNIX socket to be passed to Resque when connecting to Redis (pedroarnal) +* Fix typographical errors in PHP docblocks (chaitanyakuber) +* Set the queue name on job instances when jobs are executed (chaitanyakuber) +* Fix and add tests for Resque_Event::stopListening (ebernhardson) +* Documentation cleanup (maetl) +* Pass queue name to afterEvent callback +* Only declare RedisException if it doesn't already exist (Matt Heath) +* Add support for Composer +* Fix missing and incorrect paths for Resque and Resque_Job_Status classes in demo (jjfrey) +* Disable autoload for the RedisException class_exists call (scragg0x) +* General tidy up of comments and files/folders + +## 1.1 (2011-03-27) ## + +* Update Redisent library for Redis 2.2 compatibility. Redis 2.2 is now required. (thedotedge) +* Trim output of `ps` to remove any prepended whitespace (KevBurnsJr) +* Use `getenv` instead of `$_ENV` for better portability across PHP configurations (hobodave) +* Add support for sub-second queue check intervals (KevBurnsJr) +* Ability to specify a cluster/multiple redis servers and consistent hash between them (dceballos) +* Change arguments for jobs to be an array as they're easier to work with in PHP. +* Implement ability to have setUp and tearDown methods for jobs, called before and after every single run. +* Fix `APP_INCLUDE` environment variable not loading correctly. +* Jobs are no longer defined as static methods, and classes are instantiated first. This change is NOT backwards compatible and requires job classes are updated. +* Job arguments are passed to the job class when it is instantiated, and are accessible by $this->args. This change will break existing job classes that rely on arguments that have not been updated. +* Bundle sample script for managing php-resque instances using monit +* Fix undefined variable `$child` when exiting on non-forking operating systems +* Add `PIDFILE` environment variable to write out a PID for single running workers + +## 1.0 (2010-04-18) ## + +* Initial release diff --git a/HOWITWORKS.md b/HOWITWORKS.md new file mode 100644 index 00000000..ec85fa37 --- /dev/null +++ b/HOWITWORKS.md @@ -0,0 +1,157 @@ +*For an overview of how to __use__ php-resque, see `README.md`.* + +The following is a step-by-step breakdown of how php-resque operates. + +## Enqueue Job ## + +What happens when you call `Resque::enqueue()`? + +1. `Resque::enqueue()` calls `Resque_Job::create()` with the same arguments it + received. +2. `Resque_Job::create()` checks that your `$args` (the third argument) are + either `null` or in an array +3. `Resque_Job::create()` generates a job ID (a "token" in most of the docs) +4. `Resque_Job::create()` pushes the job to the requested queue (first + argument) +5. `Resque_Job::create()`, if status monitoring is enabled for the job (fourth + argument), calls `Resque_Job_Status::create()` with the job ID as its only + argument +6. `Resque_Job_Status::create()` creates a key in Redis with the job ID in its + name, and the current status (as well as a couple of timestamps) as its + value, then returns control to `Resque_Job::create()` +7. `Resque_Job::create()` returns control to `Resque::enqueue()`, with the job + ID as a return value +8. `Resque::enqueue()` triggers the `afterEnqueue` event, then returns control + to your application, again with the job ID as its return value + +## Workers At Work ## + +How do the workers process the queues? + +1. `Resque_Worker::work()`, the main loop of the worker process, calls + `Resque_Worker->reserve()` to check for a job +2. `Resque_Worker->reserve()` checks whether to use blocking pops or not (from + `BLOCKING`), then acts accordingly: + * Blocking Pop + 1. `Resque_Worker->reserve()` calls `Resque_Job::reserveBlocking()` with + the entire queue list and the timeout (from `INTERVAL`) as arguments + 2. `Resque_Job::reserveBlocking()` calls `Resque::blpop()` (which in turn + calls Redis' `blpop`, after prepping the queue list for the call, then + processes the response for consistency with other aspects of the + library, before finally returning control [and the queue/content of the + retrieved job, if any] to `Resque_Job::reserveBlocking()`) + 3. `Resque_Job::reserveBlocking()` checks whether the job content is an + array (it should contain the job's type [class], payload [args], and + ID), and aborts processing if not + 4. `Resque_Job::reserveBlocking()` creates a new `Resque_Job` object with + the queue and content as constructor arguments to initialize the job + itself, and returns it, along with control of the process, to + `Resque_Worker->reserve()` + * Queue Polling + 1. `Resque_Worker->reserve()` iterates through the queue list, calling + `Resque_Job::reserve()` with the current queue's name as the sole + argument on each pass + 2. `Resque_Job::reserve()` passes the queue name on to `Resque::pop()`, + which in turn calls Redis' `lpop` with the same argument, then returns + control (and the job content, if any) to `Resque_Job::reserve()` + 3. `Resque_Job::reserve()` checks whether the job content is an array (as + before, it should contain the job's type [class], payload [args], and + ID), and aborts processing if not + 4. `Resque_Job::reserve()` creates a new `Resque_Job` object in the same + manner as above, and also returns this object (along with control of + the process) to `Resque_Worker->reserve()` +3. In either case, `Resque_Worker->reserve()` returns the new `Resque_Job` + object, along with control, up to `Resque_Worker::work()`; if no job is + found, it simply returns `FALSE` + * No Jobs + 1. If blocking mode is not enabled, `Resque_Worker::work()` sleeps for + `INTERVAL` seconds; it calls `usleep()` for this, so fractional seconds + *are* supported + * Job Reserved + 1. `Resque_Worker::work()` triggers a `beforeFork` event + 2. `Resque_Worker::work()` calls `Resque_Worker->workingOn()` with the new + `Resque_Job` object as its argument + 3. `Resque_Worker->workingOn()` does some reference assignments to help keep + track of the worker/job relationship, then updates the job status from + `WAITING` to `RUNNING` + 4. `Resque_Worker->workingOn()` stores the new `Resque_Job` object's payload + in a Redis key associated to the worker itself (this is to prevent the job + from being lost indefinitely, but does rely on that PID never being + allocated on that host to a different worker process), then returns control + to `Resque_Worker::work()` + 5. `Resque_Worker::work()` forks a child process to run the actual `perform()` + 6. The next steps differ between the worker and the child, now running in + separate processes: + * Worker + 1. The worker waits for the job process to complete + 2. If the exit status is not 0, the worker calls `Resque_Job->fail()` with + a `Resque_Job_DirtyExitException` as its only argument. + 3. `Resque_Job->fail()` triggers an `onFailure` event + 4. `Resque_Job->fail()` updates the job status from `RUNNING` to `FAILED` + 5. `Resque_Job->fail()` calls `Resque_Failure::create()` with the job + payload, the `Resque_Job_DirtyExitException`, the internal ID of the + worker, and the queue name as arguments + 6. `Resque_Failure::create()` creates a new object of whatever type has + been set as the `Resque_Failure` "backend" handler; by default, this is + a `Resque_Failure_Redis` object, whose constructor simply collects the + data passed into `Resque_Failure::create()` and pushes it into Redis + in the `failed` queue + 7. `Resque_Job->fail()` increments two failure counters in Redis: one for + a total count, and one for the worker + 8. `Resque_Job->fail()` returns control to the worker (still in + `Resque_Worker::work()`) without a value + * Job + 1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its + only argument. + 2. `Resque_Worker->perform()` sets up a `try...catch` block so it can + properly handle exceptions by marking jobs as failed (by calling + `Resque_Job->fail()`, as above) + 3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an + `afterFork` event + 4. Still inside the `try...catch`, `Resque_Worker->perform()` calls + `Resque_Job->perform()` with no arguments + 5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no + arguments + 6. If `Resque_Job->getInstance()` has already been called, it returns the + existing instance; otherwise: + 7. `Resque_Job->getInstance()` checks that the job's class (type) exists + and has a `perform()` method; if not, in either case, it throws an + exception which will be caught by `Resque_Worker->perform()` + 8. `Resque_Job->getInstance()` creates an instance of the job's class, and + initializes it with a reference to the `Resque_Job` itself, the job's + arguments (which it gets by calling `Resque_Job->getArguments()`, which + in turn simply returns the value of `args[0]`, or an empty array if no + arguments were passed), and the queue name + 9. `Resque_Job->getInstance()` returns control, along with the job class + instance, to `Resque_Job->perform()` + 10. `Resque_Job->perform()` sets up its own `try...catch` block to handle + `Resque_Job_DontPerform` exceptions; any other exceptions are passed + up to `Resque_Worker->perform()` + 11. `Resque_Job->perform()` triggers a `beforePerform` event + 12. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists + 13. `Resque_Job->perform()` calls `perform()` on the instance + 14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it + exists + 15. `Resque_Job->perform()` triggers an `afterPerform` event + 16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform` + exceptions by returning control, and the value `FALSE`, to + `Resque_Worker->perform()`; any other situation returns the value + `TRUE` along with control, instead + 17. The `try...catch` block in `Resque_Worker->perform()` ends + 18. `Resque_Worker->perform()` updates the job status from `RUNNING` to + `COMPLETE`, then returns control, with no value, to the worker (again + still in `Resque_Worker::work()`) + 19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process + cleanly + * SPECIAL CASE: Non-forking OS (Windows) + 1. Same as the job above, except it doesn't call `exit(0)` when done + 7. `Resque_Worker::work()` calls `Resque_Worker->doneWorking()` with no + arguments + 8. `Resque_Worker->doneWorking()` increments two processed counters in Redis: + one for a total count, and one for the worker + 9. `Resque_Worker->doneWorking()` deletes the Redis key set in + `Resque_Worker->workingOn()`, then returns control, with no value, to + `Resque_Worker::work()` +4. `Resque_Worker::work()` returns control to the beginning of the main loop, + where it will wait for the next job to become available, and start this + process all over again \ No newline at end of file diff --git a/LICENSE b/LICENSE index 65135912..a796ebf9 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -(c) 2010 Chris Boulton +(c) Chris Boulton Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/README.markdown b/README.md similarity index 52% rename from README.markdown rename to README.md index ab490bdd..cd8f83fe 100644 --- a/README.markdown +++ b/README.md @@ -1,34 +1,33 @@ -php-resque: PHP Resque Worker (and Enqueue) +php-resque: PHP Resque Worker (and Enqueue) [![Build Status](https://secure.travis-ci.org/chrisboulton/php-resque.png)](http://travis-ci.org/chrisboulton/php-resque) =========================================== Resque is a Redis-backed library for creating background jobs, placing -those jobs on multiple queues, and processing them later. +those jobs on one or more queues, and processing them later. -Resque was pioneered and is developed by the fine folks at GitHub (yes, -I am a kiss-ass), and written in Ruby. +## Background ## -What you're seeing here is an almost direct port of the Resque worker -and enqueue system to PHP, which I've thrown together because I'm sure -my PHP developers would have a fit if they had to write a line of Ruby. +Resque was pioneered and is developed by the fine folks at GitHub (yes, +I am a kiss-ass), and written in Ruby. What you're seeing here is an +almost direct port of the Resque worker and enqueue system to PHP. For more information on Resque, visit the official GitHub project: - + -And for background information, the launch post on the GitHub blog: +For further information, see the launch post on the GitHub blog: The PHP port does NOT include its own web interface for viewing queue stats, as the data is stored in the exact same expected format as the Ruby version of Resque. -The PHP port allows for much the same as the Ruby version of Rescue: +The PHP port provides much the same features as the Ruby version: * Workers can be distributed between multiple machines * Includes support for priorities (queues) -* Resilient to memory leaks (fork) +* Resilient to memory leaks (forking) * Expects failure -In addition, it also: +It also supports the following additional features: * Has the ability to track the status of jobs * Will mark a job as failed, if a forked child running a job does @@ -36,7 +35,38 @@ not exit with a status code as 0 * Has built in support for `setUp` and `tearDown` methods, called pre and post jobs -Note: php-resque requires at least Redis 2.2. +## Requirements ## + +* PHP 5.3+ +* Redis 2.2+ +* Optional but Recommended: Composer + +## Getting Started ## + +The easiest way to work with php-resque is when it's installed as a +Composer package inside your project. Composer isn't strictly +required, but makes life a lot easier. + +If you're not familiar with Composer, please see . + +1. Add php-resque to your application's composer.json. + +```json +{ + "require": { + "chrisboulton/php-resque": "1.2.x" + } +} +``` + +2. Run `composer install`. + +3. If you haven't already, add the Composer autoload to your project's + initialization file. (example) + +```sh +require 'vendor/autoload.php'; +``` ## Jobs ## @@ -44,28 +74,30 @@ Note: php-resque requires at least Redis 2.2. Jobs are queued as follows: - require_once 'lib/Resque.php'; - - // Required if redis is located elsewhere - Resque::setBackend('localhost', 6379); +```php +// Required if redis is located elsewhere +Resque::setBackend('localhost:6379'); - $args = array( - 'name' => 'Chris' - ); - Resque::enqueue('default', 'My_Job', $args); +$args = array( + 'name' => 'Chris' + ); +Resque::enqueue('default', 'My_Job', $args); +``` ### Defining Jobs ### -Each job should be in it's own class, and include a `perform` method. +Each job should be in its own class, and include a `perform` method. - class My_Job - { - public function perform() - { - // Work work work - echo $this->args['name']; - } - } +```php +class My_Job +{ + public function perform() + { + // Work work work + echo $this->args['name']; + } +} +``` When the job is run, the class will be instantiated and any arguments will be set as an array on the instantiated object, and are accessible @@ -77,43 +109,75 @@ result in a job failing. Jobs can also have `setUp` and `tearDown` methods. If a `setUp` method is defined, it will be called before the `perform` method is run. -The `tearDown` method if defined, will be called after the job finishes. - - class My_Job - { - public function setUp() - { - // ... Set up environment for this job - } - - public function perform() - { - // .. Run job - } - - public function tearDown() - { - // ... Remove environment for this job - } - } +The `tearDown` method, if defined, will be called after the job finishes. + + +```php +class My_Job +{ + public function setUp() + { + // ... Set up environment for this job + } + + public function perform() + { + // .. Run job + } + + public function tearDown() + { + // ... Remove environment for this job + } +} +``` + +### Dequeueing Jobs ### + +This method can be used to conveniently remove a job from a queue. + +```php +// Removes job class 'My_Job' of queue 'default' +Resque::dequeue('default', ['My_Job']); + +// Removes job class 'My_Job' with Job ID '087df5819a790ac666c9608e2234b21e' of queue 'default' +Resque::dequeue('default', ['My_Job' => '087df5819a790ac666c9608e2234b21e']); + +// Removes job class 'My_Job' with arguments of queue 'default' +Resque::dequeue('default', ['My_Job' => array('foo' => 1, 'bar' => 2)]); + +// Removes multiple jobs +Resque::dequeue('default', ['My_Job', 'My_Job2']); +``` + +If no jobs are given, this method will dequeue all jobs matching the provided queue. + +```php +// Removes all jobs of queue 'default' +Resque::dequeue('default'); +``` ### Tracking Job Statuses ### php-resque has the ability to perform basic status tracking of a queued job. The status information will allow you to check if a job is in the -queue, currently being run, has finished, or failed. +queue, is currently being run, has finished, or has failed. To track the status of a job, pass `true` as the fourth argument to `Resque::enqueue`. A token used for tracking the job status will be returned: - $token = Resque::enqueue('default', 'My_Job', $args, true); - echo $token; +```php +$token = Resque::enqueue('default', 'My_Job', $args, true); +echo $token; +``` To fetch the status of a job: - $status = new Resque_Job_Status($token); - echo $status->get(); // Outputs the status +```php +$status = new Resque_Job_Status($token); +echo $status->get(); // Outputs the status +``` Job statuses are defined as constants in the `Resque_Job_Status` class. Valid statuses include: @@ -134,8 +198,9 @@ class. Workers work in the exact same way as the Ruby workers. For complete documentation on workers, see the original documentation. -A basic "up-and-running" resque.php file is included that sets up a -running worker environment is included in the root directory. +A basic "up-and-running" `bin/resque` file is included that sets up a +running worker environment. (`vendor/bin/resque` when installed +via Composer) The exception to the similarities with the Ruby version of resque is how a worker is initially setup. To work under all environments, @@ -144,25 +209,39 @@ not having a single environment such as with Ruby, the PHP port makes To start a worker, it's very similar to the Ruby version: - $ QUEUE=file_serve php resque.php +```sh +$ QUEUE=file_serve php bin/resque +``` It's your responsibility to tell the worker which file to include to get your application underway. You do so by setting the `APP_INCLUDE` environment variable: - $ QUEUE=file_serve APP_INCLUDE=../application/init.php php resque.php +```sh +$ QUEUE=file_serve APP_INCLUDE=../application/init.php php bin/resque +``` + +*Pro tip: Using Composer? More than likely, you don't need to worry about +`APP_INCLUDE`, because hopefully Composer is responsible for autoloading +your application too!* Getting your application underway also includes telling the worker your job classes, by means of either an autoloader or including them. +Alternately, you can always `include('bin/resque')` from your application and +skip setting `APP_INCLUDE` altogether. Just be sure the various environment +variables are set (`setenv`) before you do. + ### Logging ### The port supports the same environment variables for logging to STDOUT. Setting `VERBOSE` will print basic debugging information and `VVERBOSE` will print detailed information. - $ VERBOSE QUEUE=file_serve php resque.php - $ VVERBOSE QUEUE=file_serve php resque.php +```sh +$ VERBOSE=1 QUEUE=file_serve bin/resque +$ VVERBOSE=1 QUEUE=file_serve bin/resque +``` ### Priorities and Queue Lists ### @@ -173,7 +252,9 @@ checked in. As per the original example: - $ QUEUE=file_serve,warm_cache php resque.php +```sh +$ QUEUE=file_serve,warm_cache bin/resque +``` The `file_serve` queue will always be checked for new jobs on each iteration before the `warm_cache` queue is checked. @@ -183,14 +264,32 @@ iteration before the `warm_cache` queue is checked. All queues are supported in the same manner and processed in alphabetical order: - $ QUEUE=* php resque.php +```sh +$ QUEUE='*' bin/resque +``` ### Running Multiple Workers ### -Multiple workers ca be launched and automatically worked by supplying -the `COUNT` environment variable: +Multiple workers can be launched simultaneously by supplying the `COUNT` +environment variable: + +```sh +$ COUNT=5 bin/resque +``` - $ COUNT=5 php resque.php +Be aware, however, that each worker is its own fork, and the original process +will shut down as soon as it has spawned `COUNT` forks. If you need to keep +track of your workers using an external application such as `monit`, you'll +need to work around this limitation. + +### Custom prefix ### + +When you have multiple apps using the same Redis database it is better to +use a custom prefix to separate the Resque data: + +```sh +$ PREFIX=my-app-name bin/resque +``` ### Forking ### @@ -207,9 +306,9 @@ the job. Signals also work on supported platforms exactly as in the Ruby version of Resque: -* `QUIT` - Wait for child to finish processing then exit -* `TERM` / `INT` - Immediately kill child then exit -* `USR1` - Immediately kill child but don't exit +* `QUIT` - Wait for job to finish processing then exit +* `TERM` / `INT` - Immediately kill job then exit +* `USR1` - Immediately kill job but don't exit * `USR2` - Pause worker, no new jobs will be processed * `CONT` - Resume worker. @@ -221,11 +320,12 @@ and any forked children also set their process title with the job being run. This helps identify running processes on the server and their resque status. -**PHP does not have this functionality by default.** +**PHP does not have this functionality by default until 5.5.** A PECL module () exists that -adds this funcitonality to PHP, so if you'd like process titles updated, -install the PECL module as well. php-resque will detect and use it. +adds this functionality to PHP before 5.5, so if you'd like process +titles updated, install the PECL module as well. php-resque will +automatically detect and use it. ## Event/Hook System ## @@ -236,14 +336,16 @@ You listen in on events (as listed below) by registering with `Resque_Event` and supplying a callback that you would like triggered when the event is raised: - Resque_Event::listen('eventName', [callback]); +```sh +Resque_Event::listen('eventName', [callback]); +``` `[callback]` may be anything in PHP that is callable by `call_user_func_array`: * A string with the name of a function * An array containing an object and method to call * An array containing an object and a static method to call -* A closure (PHP 5.3) +* A closure (PHP 5.3+) Events may pass arguments (documented below), so your callback should accept these arguments. @@ -255,7 +357,7 @@ It is up to your application to register event listeners. When enqueuing events in your application, it should be as easy as making sure php-resque is loaded and calling `Resque_Event::listen`. -When running workers, if you run workers via the default `resque.php` script, +When running workers, if you run workers via the default `bin/resque` script, your `APP_INCLUDE` script should initialize and register any listeners required for operation. If you have rolled your own worker manager, then it is again your responsibility to register listeners. @@ -275,20 +377,20 @@ Called before php-resque forks to run a job. Argument passed contains the instan `Resque_Job` for the job about to be run. `beforeFork` is triggered in the **parent** process. Any changes made will be permanent -for as long as the worker lives. +for as long as the **worker** lives. #### afterFork #### Called after php-resque forks to run a job (but before the job is run). Argument passed contains the instance of `Resque_Job` for the job about to be run. -`afterFork` is triggered in the child process after forking out to complete a job. Any -changes made will only live as long as the job is being processed. +`afterFork` is triggered in the **child** process after forking out to complete a job. Any +changes made will only live as long as the **job** is being processed. #### beforePerform #### Called before the `setUp` and `perform` methods on a job are run. Argument passed -contains the instance of `Resque_Job` about for the job about to be run. +contains the instance of `Resque_Job` for the job about to be run. You can prevent execution of the job by throwing an exception of `Resque_Job_DontPerform`. Any other exceptions thrown will be treated as if they were thrown in a job, causing the @@ -309,20 +411,80 @@ Called whenever a job fails. Arguments passed (in this order) include: * Exception - The exception that was thrown when the job failed * Resque_Job - The job that failed +#### beforeEnqueue #### + +Called immediately before a job is enqueued using the `Resque::enqueue` method. +Arguments passed (in this order) include: + +* Class - string containing the name of the job to be enqueued +* Arguments - array of arguments for the job +* Queue - string containing the name of the queue the job is to be enqueued in +* ID - string containing the token of the job to be enqueued + +You can prevent enqueing of the job by throwing an exception of `Resque_Job_DontCreate`. + #### afterEnqueue #### Called after a job has been queued using the `Resque::enqueue` method. Arguments passed (in this order) include: -* Class - string containing the name of the class the job was scheduled in +* Class - string containing the name of scheduled job * Arguments - array of arguments supplied to the job +* Queue - string containing the name of the queue the job was added to +* ID - string containing the new token of the enqueued job + +## Step-By-Step ## + +For a more in-depth look at what php-resque does under the hood (without +needing to directly examine the code), have a look at `HOWITWORKS.md`. ## Contributors ## -* chrisboulton -* thedotedge -* hobodave -* scraton -* KevBurnsJr -* jmathai -* dceballos +### Project Lead ### + +* @chrisboulton + +### Others ### + +* @acinader +* @ajbonner +* @andrewjshults +* @atorres757 +* @benjisg +* @cballou +* @chaitanyakuber +* @charly22 +* @CyrilMazur +* @d11wtq +* @danhunsaker +* @dceballos +* @ebernhardson +* @hlegius +* @hobodave +* @humancopy +* @iskandar +* @JesseObrien +* @jjfrey +* @jmathai +* @joshhawthorne +* @KevBurnsJr +* @lboynton +* @maetl +* @matteosister +* @MattHeath +* @mickhrmweb +* @Olden +* @patrickbajao +* @pedroarnal +* @ptrofimov +* @rajibahmed +* @richardkmiller +* @Rockstar04 +* @ruudk +* @salimane +* @scragg0x +* @scraton +* @thedotedge +* @tonypiper +* @trimbletodd +* @warezthebeef diff --git a/TODO.markdown b/TODO.markdown deleted file mode 100644 index 61ea867d..00000000 --- a/TODO.markdown +++ /dev/null @@ -1,8 +0,0 @@ -* Write tests for: - * `Resque_Failure` - * `Resque_Failure_Redis` -* Change to preforking worker model -* Clean up /bin and /demo -* Add a way to store arbitrary text in job statuses (for things like progress -indicators) -* Write plugin for Ruby resque that calls setUp and tearDown methods \ No newline at end of file diff --git a/bin/resque b/bin/resque old mode 100644 new mode 100755 index 1a248525..1d604851 --- a/bin/resque +++ b/bin/resque @@ -1 +1,129 @@ -#!/bin/sh +#!/usr/bin/env php + 1) { + $count = $COUNT; +} + +$PREFIX = getenv('PREFIX'); +if(!empty($PREFIX)) { + $logger->log(Psr\Log\LogLevel::INFO, 'Prefix set to {prefix}', array('prefix' => $PREFIX)); + Resque_Redis::prefix($PREFIX); +} + +if($count > 1) { + for($i = 0; $i < $count; ++$i) { + $pid = Resque::fork(); + if($pid === false || $pid === -1) { + $logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i)); + die(); + } + // Child, start the worker + else if(!$pid) { + $queues = explode(',', $QUEUE); + $worker = new Resque_Worker($queues); + $worker->setLogger($logger); + $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); + $worker->work($interval, $BLOCKING); + break; + } + } +} +// Start a single worker +else { + $queues = explode(',', $QUEUE); + $worker = new Resque_Worker($queues); + $worker->setLogger($logger); + + $PIDFILE = getenv('PIDFILE'); + if ($PIDFILE) { + file_put_contents($PIDFILE, getmypid()) or + die('Could not write PID information to ' . $PIDFILE); + } + + $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker)); + $worker->work($interval, $BLOCKING); +} +?> diff --git a/composer.json b/composer.json new file mode 100644 index 00000000..b12fa291 --- /dev/null +++ b/composer.json @@ -0,0 +1,41 @@ +{ + "name": "chrisboulton/php-resque", + "type": "library", + "description": "Redis backed library for creating background jobs and processing them later. Based on resque for Ruby.", + "keywords": ["job", "background", "redis", "resque"], + "homepage": "http://www.github.com/chrisboulton/php-resque/", + "license": "MIT", + "authors": [ + { + "name": "Chris Boulton", + "email": "chris@bigcommerce.com" + } + ], + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/chrisboulton/credis" + } + ], + "require": { + "php": ">=5.3.0", + "ext-pcntl": "*", + "colinmollenhour/credis": "~1.7", + "psr/log": "~1.0" + }, + "suggest": { + "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.", + "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available." + }, + "require-dev": { + "phpunit/phpunit": "3.7.*" + }, + "bin": [ + "bin/resque" + ], + "autoload": { + "psr-0": { + "Resque": "lib" + } + } +} diff --git a/composer.lock b/composer.lock new file mode 100644 index 00000000..0f431b90 --- /dev/null +++ b/composer.lock @@ -0,0 +1,514 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", + "This file is @generated automatically" + ], + "hash": "41124ffd15a15b52947e430b92b8f10f", + "content-hash": "11906622d4e017ff6807c6dff51f208d", + "packages": [ + { + "name": "colinmollenhour/credis", + "version": "1.7", + "source": { + "type": "git", + "url": "https://github.com/colinmollenhour/credis.git", + "reference": "74b2b703da5c58dc07fb97e8954bc63280b469bf" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/colinmollenhour/credis/zipball/74b2b703da5c58dc07fb97e8954bc63280b469bf", + "reference": "74b2b703da5c58dc07fb97e8954bc63280b469bf", + "shasum": "" + }, + "require": { + "php": ">=5.4.0" + }, + "type": "library", + "autoload": { + "classmap": [ + "Client.php", + "Cluster.php", + "Sentinel.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Colin Mollenhour", + "email": "colin@mollenhour.com" + } + ], + "description": "Credis is a lightweight interface to the Redis key-value store which wraps the phpredis library when available for better performance.", + "homepage": "https://github.com/colinmollenhour/credis", + "time": "2016-03-24 15:50:52" + }, + { + "name": "psr/log", + "version": "1.0.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/log.git", + "reference": "fe0936ee26643249e916849d48e3a51d5f5e278b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/log/zipball/fe0936ee26643249e916849d48e3a51d5f5e278b", + "reference": "fe0936ee26643249e916849d48e3a51d5f5e278b", + "shasum": "" + }, + "type": "library", + "autoload": { + "psr-0": { + "Psr\\Log\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for logging libraries", + "keywords": [ + "log", + "psr", + "psr-3" + ], + "time": "2012-12-21 11:40:51" + } + ], + "packages-dev": [ + { + "name": "phpunit/php-code-coverage", + "version": "1.2.18", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-code-coverage.git", + "reference": "fe2466802556d3fe4e4d1d58ffd3ccfd0a19be0b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/fe2466802556d3fe4e4d1d58ffd3ccfd0a19be0b", + "reference": "fe2466802556d3fe4e4d1d58ffd3ccfd0a19be0b", + "shasum": "" + }, + "require": { + "php": ">=5.3.3", + "phpunit/php-file-iterator": ">=1.3.0@stable", + "phpunit/php-text-template": ">=1.2.0@stable", + "phpunit/php-token-stream": ">=1.1.3,<1.3.0" + }, + "require-dev": { + "phpunit/phpunit": "3.7.*@dev" + }, + "suggest": { + "ext-dom": "*", + "ext-xdebug": ">=2.0.5" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.2.x-dev" + } + }, + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Library that provides collection, processing, and rendering functionality for PHP code coverage information.", + "homepage": "https://github.com/sebastianbergmann/php-code-coverage", + "keywords": [ + "coverage", + "testing", + "xunit" + ], + "time": "2014-09-02 10:13:14" + }, + { + "name": "phpunit/php-file-iterator", + "version": "1.4.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-file-iterator.git", + "reference": "6150bf2c35d3fc379e50c7602b75caceaa39dbf0" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/6150bf2c35d3fc379e50c7602b75caceaa39dbf0", + "reference": "6150bf2c35d3fc379e50c7602b75caceaa39dbf0", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.4.x-dev" + } + }, + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "FilterIterator implementation that filters files based on a list of suffixes.", + "homepage": "https://github.com/sebastianbergmann/php-file-iterator/", + "keywords": [ + "filesystem", + "iterator" + ], + "time": "2015-06-21 13:08:43" + }, + { + "name": "phpunit/php-text-template", + "version": "1.2.1", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-text-template.git", + "reference": "31f8b717e51d9a2afca6c9f046f5d69fc27c8686" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-text-template/zipball/31f8b717e51d9a2afca6c9f046f5d69fc27c8686", + "reference": "31f8b717e51d9a2afca6c9f046f5d69fc27c8686", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "type": "library", + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sebastian@phpunit.de", + "role": "lead" + } + ], + "description": "Simple template engine.", + "homepage": "https://github.com/sebastianbergmann/php-text-template/", + "keywords": [ + "template" + ], + "time": "2015-06-21 13:50:34" + }, + { + "name": "phpunit/php-timer", + "version": "1.0.8", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-timer.git", + "reference": "38e9124049cf1a164f1e4537caf19c99bf1eb260" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-timer/zipball/38e9124049cf1a164f1e4537caf19c99bf1eb260", + "reference": "38e9124049cf1a164f1e4537caf19c99bf1eb260", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "require-dev": { + "phpunit/phpunit": "~4|~5" + }, + "type": "library", + "autoload": { + "classmap": [ + "src/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Utility class for timing", + "homepage": "https://github.com/sebastianbergmann/php-timer/", + "keywords": [ + "timer" + ], + "time": "2016-05-12 18:03:57" + }, + { + "name": "phpunit/php-token-stream", + "version": "1.2.2", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/php-token-stream.git", + "reference": "ad4e1e23ae01b483c16f600ff1bebec184588e32" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/php-token-stream/zipball/ad4e1e23ae01b483c16f600ff1bebec184588e32", + "reference": "ad4e1e23ae01b483c16f600ff1bebec184588e32", + "shasum": "" + }, + "require": { + "ext-tokenizer": "*", + "php": ">=5.3.3" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.2-dev" + } + }, + "autoload": { + "classmap": [ + "PHP/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Wrapper around PHP's tokenizer extension.", + "homepage": "https://github.com/sebastianbergmann/php-token-stream/", + "keywords": [ + "tokenizer" + ], + "time": "2014-03-03 05:10:30" + }, + { + "name": "phpunit/phpunit", + "version": "3.7.38", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/phpunit.git", + "reference": "38709dc22d519a3d1be46849868aa2ddf822bcf6" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/38709dc22d519a3d1be46849868aa2ddf822bcf6", + "reference": "38709dc22d519a3d1be46849868aa2ddf822bcf6", + "shasum": "" + }, + "require": { + "ext-ctype": "*", + "ext-dom": "*", + "ext-json": "*", + "ext-pcre": "*", + "ext-reflection": "*", + "ext-spl": "*", + "php": ">=5.3.3", + "phpunit/php-code-coverage": "~1.2", + "phpunit/php-file-iterator": "~1.3", + "phpunit/php-text-template": "~1.1", + "phpunit/php-timer": "~1.0", + "phpunit/phpunit-mock-objects": "~1.2", + "symfony/yaml": "~2.0" + }, + "require-dev": { + "pear-pear.php.net/pear": "1.9.4" + }, + "suggest": { + "phpunit/php-invoker": "~1.1" + }, + "bin": [ + "composer/bin/phpunit" + ], + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.7.x-dev" + } + }, + "autoload": { + "classmap": [ + "PHPUnit/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "", + "../../symfony/yaml/" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sebastian@phpunit.de", + "role": "lead" + } + ], + "description": "The PHP Unit Testing framework.", + "homepage": "http://www.phpunit.de/", + "keywords": [ + "phpunit", + "testing", + "xunit" + ], + "time": "2014-10-17 09:04:17" + }, + { + "name": "phpunit/phpunit-mock-objects", + "version": "1.2.3", + "source": { + "type": "git", + "url": "https://github.com/sebastianbergmann/phpunit-mock-objects.git", + "reference": "5794e3c5c5ba0fb037b11d8151add2a07fa82875" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit-mock-objects/zipball/5794e3c5c5ba0fb037b11d8151add2a07fa82875", + "reference": "5794e3c5c5ba0fb037b11d8151add2a07fa82875", + "shasum": "" + }, + "require": { + "php": ">=5.3.3", + "phpunit/php-text-template": ">=1.1.1@stable" + }, + "suggest": { + "ext-soap": "*" + }, + "type": "library", + "autoload": { + "classmap": [ + "PHPUnit/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "include-path": [ + "" + ], + "license": [ + "BSD-3-Clause" + ], + "authors": [ + { + "name": "Sebastian Bergmann", + "email": "sb@sebastian-bergmann.de", + "role": "lead" + } + ], + "description": "Mock Object library for PHPUnit", + "homepage": "https://github.com/sebastianbergmann/phpunit-mock-objects/", + "keywords": [ + "mock", + "xunit" + ], + "time": "2013-01-13 10:24:48" + }, + { + "name": "symfony/yaml", + "version": "v2.8.12", + "source": { + "type": "git", + "url": "https://github.com/symfony/yaml.git", + "reference": "e7540734bad981fe59f8ef14b6fc194ae9df8d9c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/yaml/zipball/e7540734bad981fe59f8ef14b6fc194ae9df8d9c", + "reference": "e7540734bad981fe59f8ef14b6fc194ae9df8d9c", + "shasum": "" + }, + "require": { + "php": ">=5.3.9" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.8-dev" + } + }, + "autoload": { + "psr-4": { + "Symfony\\Component\\Yaml\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Symfony Yaml Component", + "homepage": "https://symfony.com/", + "time": "2016-09-02 01:57:56" + } + ], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": { + "php": ">=5.3.0", + "ext-pcntl": "*" + }, + "platform-dev": [] +} diff --git a/demo/bad_job.php b/demo/bad_job.php index bc126209..cd719cc2 100644 --- a/demo/bad_job.php +++ b/demo/bad_job.php @@ -5,5 +5,4 @@ public function perform() { throw new Exception('Unable to run this job!'); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/check_status.php b/demo/check_status.php index c5c194c9..871dabab 100644 --- a/demo/check_status.php +++ b/demo/check_status.php @@ -3,9 +3,13 @@ die('Specify the ID of a job to monitor the status of.'); } -require '../lib/resque.php'; +require __DIR__ . '/init.php'; + date_default_timezone_set('GMT'); Resque::setBackend('127.0.0.1:6379'); +// You can also use a DSN-style format: +//Resque::setBackend('redis://user:pass@127.0.0.1:6379'); +//Resque::setBackend('redis://user:pass@a.host.name:3432/2'); $status = new Resque_Job_Status($argv[1]); if(!$status->isTracking()) { @@ -16,5 +20,4 @@ while(true) { fwrite(STDOUT, "Status of ".$argv[1]." is: ".$status->get()."\n"); sleep(1); -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/init.php b/demo/init.php new file mode 100644 index 00000000..9078bcda --- /dev/null +++ b/demo/init.php @@ -0,0 +1,25 @@ + '); + sleep(1); + fwrite(STDOUT, 'Job ended!' . PHP_EOL); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/long_job.php b/demo/long_job.php index 8c9f0f94..1cfe5cb0 100644 --- a/demo/long_job.php +++ b/demo/long_job.php @@ -5,5 +5,4 @@ public function perform() { sleep(600); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/php_error_job.php b/demo/php_error_job.php index 93bf2bca..98244059 100644 --- a/demo/php_error_job.php +++ b/demo/php_error_job.php @@ -5,5 +5,4 @@ public function perform() { callToUndefinedFunction(); } -} -?> \ No newline at end of file +} \ No newline at end of file diff --git a/demo/queue.php b/demo/queue.php index 6a94e440..5067404b 100644 --- a/demo/queue.php +++ b/demo/queue.php @@ -3,17 +3,24 @@ die('Specify the name of a job to add. e.g, php queue.php PHP_Job'); } -require '../lib/Resque.php'; +require __DIR__ . '/init.php'; date_default_timezone_set('GMT'); Resque::setBackend('127.0.0.1:6379'); +// You can also use a DSN-style format: +//Resque::setBackend('redis://user:pass@127.0.0.1:6379'); +//Resque::setBackend('redis://user:pass@a.host.name:3432/2'); + $args = array( 'time' => time(), 'array' => array( 'test' => 'test', ), ); +if (empty($argv[2])) { + $jobId = Resque::enqueue('default', $argv[1], $args, true); +} else { + $jobId = Resque::enqueue($argv[1], $argv[2], $args, true); +} -$jobId = Resque::enqueue('default', $argv[1], $args, true); echo "Queued job ".$jobId."\n\n"; -?> \ No newline at end of file diff --git a/demo/resque.php b/demo/resque.php index 5af0cf17..fcfe578b 100644 --- a/demo/resque.php +++ b/demo/resque.php @@ -4,5 +4,4 @@ require 'job.php'; require 'php_error_job.php'; -require '../resque.php'; -?> \ No newline at end of file +require '../bin/resque'; \ No newline at end of file diff --git a/extras/resque.monit b/extras/resque.monit index 654815da..b611f8f5 100644 --- a/extras/resque.monit +++ b/extras/resque.monit @@ -9,7 +9,7 @@ check process resque_worker_[QUEUE] with pidfile /var/run/resque/worker_[QUEUE].pid - start program = "/bin/sh -c 'APP_INCLUDE=[APP_INCLUDE] QUEUE=[QUEUE] VERBOSE=1 PIDFILE=/var/run/resque/worker_[QUEUE].pid nohup php -f [PATH/TO/RESQUE]/resque.php > /var/log/resque/worker_[QUEUE].log &'" as uid [UID] and gid [GID] + start program = "/bin/sh -c 'APP_INCLUDE=[APP_INCLUDE] QUEUE=[QUEUE] VERBOSE=1 PIDFILE=/var/run/resque/worker_[QUEUE].pid nohup php -f [PATH/TO/RESQUE]/bin/resque > /var/log/resque/worker_[QUEUE].log &'" as uid [UID] and gid [GID] stop program = "/bin/sh -c 'kill -s QUIT `cat /var/run/resque/worker_[QUEUE].pid` && rm -f /var/run/resque/worker_[QUEUE].pid; exit 0;'" if totalmem is greater than 300 MB for 10 cycles then restart # eating up memory? group resque_workers \ No newline at end of file diff --git a/lib/Redisent/LICENSE b/lib/Redisent/LICENSE deleted file mode 100644 index 385910fb..00000000 --- a/lib/Redisent/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -Copyright (c) 2009 Justin Poliey - -Permission is hereby granted, free of charge, to any person -obtaining a copy of this software and associated documentation -files (the "Software"), to deal in the Software without -restriction, including without limitation the rights to use, -copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the -Software is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES -OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT -HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/lib/Redisent/README.markdown b/lib/Redisent/README.markdown deleted file mode 100644 index 3edb8438..00000000 --- a/lib/Redisent/README.markdown +++ /dev/null @@ -1,67 +0,0 @@ -# Redisent - -Redisent is a simple, no-nonsense interface to the [Redis](http://code.google.com/p/redis/) key-value store for modest developers. -Due to the way it is implemented, it is flexible and tolerant of changes to the Redis protocol. - -## Getting to work - -If you're at all familiar with the Redis protocol and PHP objects, you've already mastered Redisent. -All Redisent does is map the Redis protocol to a PHP object, abstract away the nitty-gritty, and make the return values PHP compatible. - - require 'redisent.php'; - $redis = new Redisent('localhost'); - $redis->set('awesome', 'absolutely'); - echo sprintf('Is Redisent awesome? %s.\n', $redis->get('awesome')); - -You use the exact same command names, and the exact same argument order. **How wonderful.** How about a more complex example? - - require 'redisent.php'; - $redis = new Redisent('localhost'); - $redis->rpush('particles', 'proton'); - $redis->rpush('particles', 'electron'); - $redis->rpush('particles', 'neutron'); - $particles = $redis->lrange('particles', 0, -1); - $particle_count = $redis->llen('particles'); - echo "

The {$particle_count} particles that make up atoms are:

"; - echo "
    "; - foreach ($particles as $particle) { - echo "
  • {$particle}
  • "; - } - echo "
"; - -Be aware that Redis error responses will be wrapped in a RedisException class and thrown, so do be sure to use proper coding techniques. - -## Clustering your servers - -Redisent also includes a way for developers to fully utilize the scalability of Redis with multiple servers and [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing). -Using the RedisentCluster class, you can use Redisent the same way, except that keys will be hashed across multiple servers. -Here is how to set up a cluster: - - include 'redisent_cluster.php'; - - $cluster = new RedisentCluster(array( - array('host' => '127.0.0.1', 'port' => 6379), - array('host' => '127.0.0.1', 'port' => 6380) - )); - -You can then use Redisent the way you normally would, i.e., `$cluster->set('key', 'value')` or `$cluster->lrange('particles', 0, -1)`. -But what about when you need to use commands that are server specific and do not operate on keys? You can use routing, with the `RedisentCluster::to` method. -To use routing, you need to assign a server an alias in the constructor of the Redis cluster. Aliases are not required on all servers, just the ones you want to be able to access directly. - - include 'redisent_cluster.php'; - - $cluster = new RedisentCluster(array( - 'alpha' => array('host' => '127.0.0.1', 'port' => 6379), - array('host' => '127.0.0.1', 'port' => 6380) - )); - -Now there is an alias of the server running on 127.0.0.1:6379 called **alpha**, and can be interacted with like this: - - // get server info - $cluster->to('alpha')->info(); - -Now you have complete programatic control over your Redis servers. - -## About - -© 2009 [Justin Poliey](http://justinpoliey.com) \ No newline at end of file diff --git a/lib/Redisent/Redisent.php b/lib/Redisent/Redisent.php deleted file mode 100644 index ac70c81e..00000000 --- a/lib/Redisent/Redisent.php +++ /dev/null @@ -1,143 +0,0 @@ - - * @copyright 2009 Justin Poliey - * @license http://www.opensource.org/licenses/mit-license.php The MIT License - * @package Redisent - */ - -define('CRLF', sprintf('%s%s', chr(13), chr(10))); - -/** - * Wraps native Redis errors in friendlier PHP exceptions - */ -class RedisException extends Exception { -} - -/** - * Redisent, a Redis interface for the modest among us - */ -class Redisent { - - /** - * Socket connection to the Redis server - * @var resource - * @access private - */ - private $__sock; - - /** - * Host of the Redis server - * @var string - * @access public - */ - public $host; - - /** - * Port on which the Redis server is running - * @var integer - * @access public - */ - public $port; - - /** - * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}. - * @param string $host The hostname of the Redis server - * @param integer $port The port number of the Redis server - */ - function __construct($host, $port = 6379) { - $this->host = $host; - $this->port = $port; - $this->__sock = fsockopen($this->host, $this->port, $errno, $errstr); - if (!$this->__sock) { - throw new Exception("{$errno} - {$errstr}"); - } - } - - function __destruct() { - fclose($this->__sock); - } - - function __call($name, $args) { - - /* Build the Redis unified protocol command */ - array_unshift($args, strtoupper($name)); - $command = sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(array($this, 'formatArgument'), $args), CRLF), CRLF); - - /* Open a Redis connection and execute the command */ - for ($written = 0; $written < strlen($command); $written += $fwrite) { - $fwrite = fwrite($this->__sock, substr($command, $written)); - if ($fwrite === FALSE) { - throw new Exception('Failed to write entire command to stream'); - } - } - - /* Parse the response based on the reply identifier */ - $reply = trim(fgets($this->__sock, 512)); - switch (substr($reply, 0, 1)) { - /* Error reply */ - case '-': - throw new RedisException(substr(trim($reply), 4)); - break; - /* Inline reply */ - case '+': - $response = substr(trim($reply), 1); - break; - /* Bulk reply */ - case '$': - $response = null; - if ($reply == '$-1') { - break; - } - $read = 0; - $size = substr($reply, 1); - do { - $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read); - $response .= fread($this->__sock, $block_size); - $read += $block_size; - } while ($read < $size); - fread($this->__sock, 2); /* discard crlf */ - break; - /* Multi-bulk reply */ - case '*': - $count = substr($reply, 1); - if ($count == '-1') { - return null; - } - $response = array(); - for ($i = 0; $i < $count; $i++) { - $bulk_head = trim(fgets($this->__sock, 512)); - $size = substr($bulk_head, 1); - if ($size == '-1') { - $response[] = null; - } - else { - $read = 0; - $block = ""; - do { - $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read); - $block .= fread($this->__sock, $block_size); - $read += $block_size; - } while ($read < $size); - fread($this->__sock, 2); /* discard crlf */ - $response[] = $block; - } - } - break; - /* Integer reply */ - case ':': - $response = intval(substr(trim($reply), 1)); - break; - default: - throw new RedisException("invalid server response: {$reply}"); - break; - } - /* Party on */ - return $response; - } - - private function formatArgument($arg) { - return sprintf('$%d%s%s', strlen($arg), CRLF, $arg); - } -} \ No newline at end of file diff --git a/lib/Redisent/RedisentCluster.php b/lib/Redisent/RedisentCluster.php deleted file mode 100644 index ea936116..00000000 --- a/lib/Redisent/RedisentCluster.php +++ /dev/null @@ -1,138 +0,0 @@ - - * @copyright 2009 Justin Poliey - * @license http://www.opensource.org/licenses/mit-license.php The MIT License - * @package Redisent - */ - -require_once dirname(__FILE__) . '/Redisent.php'; - -/** - * A generalized Redisent interface for a cluster of Redis servers - */ -class RedisentCluster { - - /** - * Collection of Redisent objects attached to Redis servers - * @var array - * @access private - */ - private $redisents; - - /** - * Aliases of Redisent objects attached to Redis servers, used to route commands to specific servers - * @see RedisentCluster::to - * @var array - * @access private - */ - private $aliases; - - /** - * Hash ring of Redis server nodes - * @var array - * @access private - */ - private $ring; - - /** - * Individual nodes of pointers to Redis servers on the hash ring - * @var array - * @access private - */ - private $nodes; - - /** - * Number of replicas of each node to make around the hash ring - * @var integer - * @access private - */ - private $replicas = 128; - - /** - * The commands that are not subject to hashing - * @var array - * @access private - */ - private $dont_hash = array( - 'RANDOMKEY', 'DBSIZE', - 'SELECT', 'MOVE', 'FLUSHDB', 'FLUSHALL', - 'SAVE', 'BGSAVE', 'LASTSAVE', 'SHUTDOWN', - 'INFO', 'MONITOR', 'SLAVEOF' - ); - - /** - * Creates a Redisent interface to a cluster of Redis servers - * @param array $servers The Redis servers in the cluster. Each server should be in the format array('host' => hostname, 'port' => port) - */ - function __construct($servers) { - $this->ring = array(); - $this->aliases = array(); - foreach ($servers as $alias => $server) { - $this->redisents[] = new Redisent($server['host'], $server['port']); - if (is_string($alias)) { - $this->aliases[$alias] = $this->redisents[count($this->redisents)-1]; - } - for ($replica = 1; $replica <= $this->replicas; $replica++) { - $this->ring[crc32($server['host'].':'.$server['port'].'-'.$replica)] = $this->redisents[count($this->redisents)-1]; - } - } - ksort($this->ring, SORT_NUMERIC); - $this->nodes = array_keys($this->ring); - } - - /** - * Routes a command to a specific Redis server aliased by {$alias}. - * @param string $alias The alias of the Redis server - * @return Redisent The Redisent object attached to the Redis server - */ - function to($alias) { - if (isset($this->aliases[$alias])) { - return $this->aliases[$alias]; - } - else { - throw new Exception("That Redisent alias does not exist"); - } - } - - /* Execute a Redis command on the cluster */ - function __call($name, $args) { - - /* Pick a server node to send the command to */ - $name = strtoupper($name); - if (!in_array($name, $this->dont_hash)) { - $node = $this->nextNode(crc32($args[0])); - $redisent = $this->ring[$node]; - } - else { - $redisent = $this->redisents[0]; - } - - /* Execute the command on the server */ - return call_user_func_array(array($redisent, $name), $args); - } - - /** - * Routes to the proper server node - * @param integer $needle The hash value of the Redis command - * @return Redisent The Redisent object associated with the hash - */ - private function nextNode($needle) { - $haystack = $this->nodes; - while (count($haystack) > 2) { - $try = floor(count($haystack) / 2); - if ($haystack[$try] == $needle) { - return $needle; - } - if ($needle < $haystack[$try]) { - $haystack = array_slice($haystack, 0, $try + 1); - } - if ($needle > $haystack[$try]) { - $haystack = array_slice($haystack, $try + 1); - } - } - return $haystack[count($haystack)-1]; - } - -} \ No newline at end of file diff --git a/lib/Resque.php b/lib/Resque.php index 168d849a..d03b2ecf 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -1,42 +1,48 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque { - const VERSION = '1.0'; + const VERSION = '1.2'; + + const DEFAULT_INTERVAL = 5; /** * @var Resque_Redis Instance of Resque_Redis that talks to redis. */ public static $redis = null; + /** + * @var mixed Host/port conbination separated by a colon, or a nested + * array of server swith host/port pairs + */ + protected static $redisServer = null; + + /** + * @var int ID of Redis database to select. + */ + protected static $redisDatabase = 0; + /** * Given a host/port combination separated by a colon, set it as * the redis server that Resque will talk to. * - * @param mixed $server Host/port combination separated by a colon, or - * a nested array of servers with host/port pairs. + * @param mixed $server Host/port combination separated by a colon, DSN-formatted URI, or + * a callable that receives the configured database ID + * and returns a Resque_Redis instance, or + * a nested array of servers with host/port pairs. + * @param int $database */ - public static function setBackend($server) + public static function setBackend($server, $database = 0) { - if(is_array($server)) { - require_once dirname(__FILE__) . '/Resque/RedisCluster.php'; - self::$redis = new Resque_RedisCluster($server); - } - else { - list($host, $port) = explode(':', $server); - require_once dirname(__FILE__) . '/Resque/Redis.php'; - self::$redis = new Resque_Redis($host, $port); - } + self::$redisServer = $server; + self::$redisDatabase = $database; + self::$redis = null; } /** @@ -46,24 +52,65 @@ public static function setBackend($server) */ public static function redis() { - if(is_null(self::$redis)) { - self::setBackend('localhost:6379'); + if (self::$redis !== null) { + return self::$redis; + } + + if (is_callable(self::$redisServer)) { + self::$redis = call_user_func(self::$redisServer, self::$redisDatabase); + } else { + self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase); } return self::$redis; } + /** + * fork() helper method for php-resque that handles issues PHP socket + * and phpredis have with passing around sockets between child/parent + * processes. + * + * Will close connection to Redis before forking. + * + * @return int Return vars as per pcntl_fork(). False if pcntl_fork is unavailable + */ + public static function fork() + { + if(!function_exists('pcntl_fork')) { + return false; + } + + // Close the connection to Redis before forking. + // This is a workaround for issues phpredis has. + self::$redis = null; + + $pid = pcntl_fork(); + if($pid === -1) { + throw new RuntimeException('Unable to fork child worker.'); + } + + return $pid; + } + /** * Push a job to the end of a specific queue. If the queue does not * exist, then create it as well. * * @param string $queue The name of the queue to add the job to. - * @param object $item Job description as an object to be JSON encoded. + * @param array $item Job description as an array to be JSON encoded. */ public static function push($queue, $item) { + $encodedItem = json_encode($item); + if ($encodedItem === false) { + return false; + } self::redis()->sadd('queues', $queue); - self::redis()->rpush('queue:' . $queue, json_encode($item)); + $length = self::redis()->rpush('queue:' . $queue, $encodedItem); + if ($length < 1) { + return false; + } + return true; } /** @@ -71,11 +118,12 @@ public static function push($queue, $item) * return it. * * @param string $queue The name of the queue to fetch an item from. - * @return object Decoded item from the queue. + * @return array Decoded item from the queue. */ public static function pop($queue) { - $item = self::redis()->lpop('queue:' . $queue); + $item = self::redis()->lpop('queue:' . $queue); + if(!$item) { return; } @@ -83,9 +131,74 @@ public static function pop($queue) return json_decode($item, true); } + /** + * Remove items of the specified queue + * + * @param string $queue The name of the queue to fetch an item from. + * @param array $items + * @return integer number of deleted items + */ + public static function dequeue($queue, $items = Array()) + { + if(count($items) > 0) { + return self::removeItems($queue, $items); + } else { + return self::removeList($queue); + } + } + + /** + * Remove specified queue + * + * @param string $queue The name of the queue to remove. + * @return integer Number of deleted items + */ + public static function removeQueue($queue) + { + $num = self::removeList($queue); + self::redis()->srem('queues', $queue); + return $num; + } + + /** + * Pop an item off the end of the specified queues, using blocking list pop, + * decode it and return it. + * + * @param array $queues + * @param int $timeout + * @return null|array Decoded item from the queue. + */ + public static function blpop(array $queues, $timeout) + { + $list = array(); + foreach($queues AS $queue) { + $list[] = 'queue:' . $queue; + } + + $item = self::redis()->blpop($list, (int)$timeout); + + if(!$item) { + return; + } + + /** + * Normally the Resque_Redis class returns queue names without the prefix + * But the blpop is a bit different. It returns the name as prefix:queue:name + * So we need to strip off the prefix:queue: part + */ + $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:')); + + return array( + 'queue' => $queue, + 'payload' => json_decode($item[1], true) + ); + } + /** * Return the size (number of pending jobs) of the specified queue. * + * @param string $queue name of the queue to be checked for pending jobs + * * @return int The size of the queue. */ public static function size($queue) @@ -99,20 +212,30 @@ public static function size($queue) * @param string $queue The name of the queue to place the job in. * @param string $class The name of the class that contains the code to execute the job. * @param array $args Any optional arguments that should be passed when the job is executed. - * @param boolean $monitor Set to true to be able to monitor the status of a job. + * @param boolean $trackStatus Set to true to be able to monitor the status of a job. + * + * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue */ public static function enqueue($queue, $class, $args = null, $trackStatus = false) { - require_once dirname(__FILE__) . '/Resque/Job.php'; - $result = Resque_Job::create($queue, $class, $args, $trackStatus); - if ($result) { - Resque_Event::trigger('afterEnqueue', array( - 'class' => $class, - 'args' => $args, - )); + $id = Resque::generateJobId(); + $hookParams = array( + 'class' => $class, + 'args' => $args, + 'queue' => $queue, + 'id' => $id, + ); + try { + Resque_Event::trigger('beforeEnqueue', $hookParams); + } + catch(Resque_Job_DontCreate $e) { + return false; } - - return $result; + + Resque_Job::create($queue, $class, $args, $trackStatus, $id); + Resque_Event::trigger('afterEnqueue', $hookParams); + + return $id; } /** @@ -123,7 +246,6 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals */ public static function reserve($queue) { - require_once dirname(__FILE__) . '/Resque/Job.php'; return Resque_Job::reserve($queue); } @@ -140,4 +262,118 @@ public static function queues() } return $queues; } + + /** + * Remove Items from the queue + * Safely moving each item to a temporary queue before processing it + * If the Job matches, counts otherwise puts it in a requeue_queue + * which at the end eventually be copied back into the original queue + * + * @private + * + * @param string $queue The name of the queue + * @param array $items + * @return integer number of deleted items + */ + private static function removeItems($queue, $items = Array()) + { + $counter = 0; + $originalQueue = 'queue:'. $queue; + $tempQueue = $originalQueue. ':temp:'. time(); + $requeueQueue = $tempQueue. ':requeue'; + + // move each item from original queue to temp queue and process it + $finished = false; + while (!$finished) { + $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); + + if (!empty($string)) { + if(self::matchItem($string, $items)) { + self::redis()->rpop($tempQueue); + $counter++; + } else { + self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); + } + } else { + $finished = true; + } + } + + // move back from temp queue to original queue + $finished = false; + while (!$finished) { + $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue); + if (empty($string)) { + $finished = true; + } + } + + // remove temp queue and requeue queue + self::redis()->del($requeueQueue); + self::redis()->del($tempQueue); + + return $counter; + } + + /** + * matching item + * item can be ['class'] or ['class' => 'id'] or ['class' => {:foo => 1, :bar => 2}] + * @private + * + * @params string $string redis result in json + * @params $items + * + * @return (bool) + */ + private static function matchItem($string, $items) + { + $decoded = json_decode($string, true); + + foreach($items as $key => $val) { + # class name only ex: item[0] = ['class'] + if (is_numeric($key)) { + if($decoded['class'] == $val) { + return true; + } + # class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}] + } elseif (is_array($val)) { + $decodedArgs = (array)$decoded['args'][0]; + if ($decoded['class'] == $key && + count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) { + return true; + } + # class name with ID, example: item[0] = ['class' => 'id'] + } else { + if ($decoded['class'] == $key && $decoded['id'] == $val) { + return true; + } + } + } + return false; + } + + /** + * Remove List + * + * @private + * + * @params string $queue the name of the queue + * @return integer number of deleted items belongs to this list + */ + private static function removeList($queue) + { + $counter = self::size($queue); + $result = self::redis()->del('queue:' . $queue); + return ($result == 1) ? $counter : 0; + } + + /* + * Generate an identifier to attach to a job for status tracking. + * + * @return string + */ + public static function generateJobId() + { + return md5(uniqid('', true)); + } } diff --git a/lib/Resque/Event.php b/lib/Resque/Event.php index 2264ae27..20072ff9 100644 --- a/lib/Resque/Event.php +++ b/lib/Resque/Event.php @@ -3,8 +3,7 @@ * Resque event/plugin system class * * @package Resque/Event - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Event @@ -73,7 +72,7 @@ public static function stopListening($event, $callback) $key = array_search($callback, self::$events[$event]); if ($key !== false) { - unset(self::$events[$key]); + unset(self::$events[$event][$key]); } return true; @@ -86,4 +85,4 @@ public static function clearListeners() { self::$events = array(); } -} \ No newline at end of file +} diff --git a/lib/Resque/Exception.php b/lib/Resque/Exception.php index b288bf47..01217c38 100644 --- a/lib/Resque/Exception.php +++ b/lib/Resque/Exception.php @@ -3,11 +3,9 @@ * Resque exception. * * @package Resque - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Exception extends Exception { } -?> \ No newline at end of file diff --git a/lib/Resque/Failure.php b/lib/Resque/Failure.php index 844e3434..deb678f9 100644 --- a/lib/Resque/Failure.php +++ b/lib/Resque/Failure.php @@ -1,12 +1,10 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Failure @@ -19,10 +17,10 @@ class Resque_Failure /** * Create a new failed job on the backend. * - * @param object $payload The contents of the job that has just failed. - * @param object $exception The exception generated when the job failed to run. - * @param object $worker Instance of Resque_Worker that was running this job when it failed. - * @param string $queue The name of the queue that this job was fetched from. + * @param object $payload The contents of the job that has just failed. + * @param \Exception $exception The exception generated when the job failed to run. + * @param \Resque_Worker $worker Instance of Resque_Worker that was running this job when it failed. + * @param string $queue The name of the queue that this job was fetched from. */ public static function create($payload, Exception $exception, Resque_Worker $worker, $queue) { @@ -38,7 +36,6 @@ public static function create($payload, Exception $exception, Resque_Worker $wor public static function getBackend() { if(self::$backend === null) { - require dirname(__FILE__) . '/Failure/Redis.php'; self::$backend = 'Resque_Failure_Redis'; } diff --git a/lib/Resque/Failure/Interface.php b/lib/Resque/Failure/Interface.php index 863cd0b1..74de9e7b 100644 --- a/lib/Resque/Failure/Interface.php +++ b/lib/Resque/Failure/Interface.php @@ -3,8 +3,7 @@ * Interface that all failure backends should implement. * * @package Resque/Failure - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ interface Resque_Failure_Interface @@ -19,4 +18,3 @@ interface Resque_Failure_Interface */ public function __construct($payload, $exception, $worker, $queue); } -?> \ No newline at end of file diff --git a/lib/Resque/Failure/Redis.php b/lib/Resque/Failure/Redis.php index c81bfc20..69d68724 100644 --- a/lib/Resque/Failure/Redis.php +++ b/lib/Resque/Failure/Redis.php @@ -3,8 +3,7 @@ * Redis backend for storing failed Resque jobs. * * @package Resque/Failure - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ @@ -32,4 +31,3 @@ public function __construct($payload, $exception, $worker, $queue) Resque::redis()->rpush('failed', $data); } } -?> \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php old mode 100644 new mode 100755 index 9d674f5c..8508f766 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -1,14 +1,9 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Job @@ -24,20 +19,25 @@ class Resque_Job public $worker; /** - * @var object Object containing details of the job. + * @var array Array containing details of the job. */ public $payload; - + /** - * @var object Instance of the class performing work for this job. + * @var object|Resque_JobInterface Instance of the class performing work for this job. */ private $instance; + /** + * @var Resque_Job_FactoryInterface + */ + private $jobFactory; + /** * Instantiate a new instance of a job. * * @param string $queue The queue that the job belongs to. - * @param object $payload Object containing details of the job. + * @param array $payload array containing details of the job. */ public function __construct($queue, $payload) { @@ -52,19 +52,27 @@ public function __construct($queue, $payload) * @param string $class The name of the class that contains the code to execute the job. * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $monitor Set to true to be able to monitor the status of a job. + * @param string $id Unique identifier for tracking the job. Generated if not supplied. + * + * @return string + * @throws \InvalidArgumentException */ - public static function create($queue, $class, $args = null, $monitor = false) + public static function create($queue, $class, $args = null, $monitor = false, $id = null) { + if (is_null($id)) { + $id = Resque::generateJobId(); + } + if($args !== null && !is_array($args)) { throw new InvalidArgumentException( 'Supplied $args must be an array.' ); } - $id = md5(uniqid('', true)); Resque::push($queue, array( 'class' => $class, - 'args' => $args, + 'args' => array($args), 'id' => $id, + 'queue_time' => microtime(true), )); if($monitor) { @@ -79,18 +87,37 @@ public static function create($queue, $class, $args = null, $monitor = false) * instance of Resque_Job for it. * * @param string $queue The name of the queue to check for a job in. - * @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. */ public static function reserve($queue) { $payload = Resque::pop($queue); - if(!$payload) { + if(!is_array($payload)) { return false; } return new Resque_Job($queue, $payload); } + /** + * Find the next available job from the specified queues using blocking list pop + * and return an instance of Resque_Job for it. + * + * @param array $queues + * @param int $timeout + * @return false|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found. + */ + public static function reserveBlocking(array $queues, $timeout = null) + { + $item = Resque::blpop($queues, $timeout); + + if(!is_array($item)) { + return false; + } + + return new Resque_Job($item['queue'], $item['payload']); + } + /** * Update the status of the current job. * @@ -116,7 +143,7 @@ public function getStatus() $status = new Resque_Job_Status($this->payload['id']); return $status->get(); } - + /** * Get the arguments supplied to this job. * @@ -127,14 +154,14 @@ public function getArguments() if (!isset($this->payload['args'])) { return array(); } - - return $this->payload['args']; + + return $this->payload['args'][0]; } - + /** * Get the instantiated object for this job that will be performing work. - * - * @return object Instance of the object that this job belongs to. + * @return Resque_JobInterface Instance of the object that this job belongs to. + * @throws Resque_Exception */ public function getInstance() { @@ -142,36 +169,24 @@ public function getInstance() return $this->instance; } - if(!class_exists($this->payload['class'])) { - throw new Resque_Exception( - 'Could not find job class ' . $this->payload['class'] . '.' - ); - } - - if(!method_exists($this->payload['class'], 'perform')) { - throw new Resque_Exception( - 'Job class ' . $this->payload['class'] . ' does not contain a perform method.' - ); - } - - $this->instance = new $this->payload['class']; - $this->instance->job = $this; - $this->instance->args = $this->getArguments(); - return $this->instance; + $this->instance = $this->getJobFactory()->create($this->payload['class'], $this->getArguments(), $this->queue); + $this->instance->job = $this; + return $this->instance; } /** * Actually execute a job by calling the perform method on the class * associated with the job with the supplied arguments. * + * @return bool * @throws Resque_Exception When the job's class could not be found or it does not contain a perform method. */ public function perform() { - $instance = $this->getInstance(); try { Resque_Event::trigger('beforePerform', $this); - + + $instance = $this->getInstance(); if(method_exists($instance, 'setUp')) { $instance->setUp(); } @@ -188,12 +203,14 @@ public function perform() catch(Resque_Job_DontPerform $e) { return false; } - + return true; } /** * Mark the current job as having failed. + * + * @param $exception */ public function fail($exception) { @@ -203,7 +220,6 @@ public function fail($exception) )); $this->updateStatus(Resque_Job_Status::STATUS_FAILED); - require_once dirname(__FILE__) . '/Failure.php'; Resque_Failure::create( $this->payload, $exception, @@ -216,6 +232,7 @@ public function fail($exception) /** * Re-queue the current job. + * @return string */ public function recreate() { @@ -225,7 +242,7 @@ public function recreate() $monitor = true; } - return self::create($this->queue, $this->payload['class'], $this->payload['args'], $monitor); + return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor); } /** @@ -247,5 +264,26 @@ public function __toString() } return '(' . implode(' | ', $name) . ')'; } + + /** + * @param Resque_Job_FactoryInterface $jobFactory + * @return Resque_Job + */ + public function setJobFactory(Resque_Job_FactoryInterface $jobFactory) + { + $this->jobFactory = $jobFactory; + + return $this; + } + + /** + * @return Resque_Job_FactoryInterface + */ + public function getJobFactory() + { + if ($this->jobFactory === null) { + $this->jobFactory = new Resque_Job_Factory(); + } + return $this->jobFactory; + } } -?> \ No newline at end of file diff --git a/lib/Resque/Job/DirtyExitException.php b/lib/Resque/Job/DirtyExitException.php index b69413a3..108e0613 100644 --- a/lib/Resque/Job/DirtyExitException.php +++ b/lib/Resque/Job/DirtyExitException.php @@ -3,8 +3,7 @@ * Runtime exception class for a job that does not exit cleanly. * * @package Resque/Job - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Job_DirtyExitException extends RuntimeException diff --git a/lib/Resque/Job/DontCreate.php b/lib/Resque/Job/DontCreate.php new file mode 100644 index 00000000..31c33cdc --- /dev/null +++ b/lib/Resque/Job/DontCreate.php @@ -0,0 +1,12 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_DontCreate extends Exception +{ + +} \ No newline at end of file diff --git a/lib/Resque/Job/DontPerform.php b/lib/Resque/Job/DontPerform.php index 91d5c70e..553327ff 100644 --- a/lib/Resque/Job/DontPerform.php +++ b/lib/Resque/Job/DontPerform.php @@ -3,8 +3,7 @@ * Exception to be thrown if a job should not be performed/run. * * @package Resque/Job - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Job_DontPerform extends Exception diff --git a/lib/Resque/Job/Factory.php b/lib/Resque/Job/Factory.php new file mode 100644 index 00000000..cf172944 --- /dev/null +++ b/lib/Resque/Job/Factory.php @@ -0,0 +1,32 @@ +args = $args; + $instance->queue = $queue; + return $instance; + } +} diff --git a/lib/Resque/Job/FactoryInterface.php b/lib/Resque/Job/FactoryInterface.php new file mode 100644 index 00000000..b8c102cf --- /dev/null +++ b/lib/Resque/Job/FactoryInterface.php @@ -0,0 +1,12 @@ + - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Job_Status @@ -141,4 +140,3 @@ public function __toString() return 'job:' . $this->id . ':status'; } } -?> \ No newline at end of file diff --git a/lib/Resque/JobInterface.php b/lib/Resque/JobInterface.php new file mode 100644 index 00000000..be5891db --- /dev/null +++ b/lib/Resque/JobInterface.php @@ -0,0 +1,9 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Log extends Psr\Log\AbstractLogger +{ + public $verbose; + + public function __construct($verbose = false) { + $this->verbose = $verbose; + } + + /** + * Logs with an arbitrary level. + * + * @param mixed $level PSR-3 log level constant, or equivalent string + * @param string $message Message to log, may contain a { placeholder } + * @param array $context Variables to replace { placeholder } + * @return null + */ + public function log($level, $message, array $context = array()) + { + if ($this->verbose) { + fwrite( + STDOUT, + '[' . $level . '] [' . strftime('%T %Y-%m-%d') . '] ' . $this->interpolate($message, $context) . PHP_EOL + ); + return; + } + + if (!($level === Psr\Log\LogLevel::INFO || $level === Psr\Log\LogLevel::DEBUG)) { + fwrite( + STDOUT, + '[' . $level . '] ' . $this->interpolate($message, $context) . PHP_EOL + ); + } + } + + /** + * Fill placeholders with the provided context + * @author Jordi Boggiano j.boggiano@seld.be + * + * @param string $message Message to be logged + * @param array $context Array of variables to use in message + * @return string + */ + public function interpolate($message, array $context = array()) + { + // build a replacement array with braces around the context keys + $replace = array(); + foreach ($context as $key => $val) { + $replace['{' . $key . '}'] = $val; + } + + // interpolate replacement values into the message and return + return strtr($message, $replace); + } +} diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 874bf696..153bd40e 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -1,21 +1,34 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ -class Resque_Redis extends Redisent +class Resque_Redis { + /** + * Redis namespace + * @var string + */ + private static $defaultNamespace = 'resque:'; + + /** + * A default host to connect to + */ + const DEFAULT_HOST = 'localhost'; + + /** + * The default Redis port + */ + const DEFAULT_PORT = 6379; + + /** + * The default Redis Database number + */ + const DEFAULT_DATABASE = 0; + /** * @var array List of all commands in Redis that supply a key as their * first argument. Used to prefix keys with the Resque namespace. @@ -29,12 +42,13 @@ class Resque_Redis extends Redisent 'ttl', 'move', 'set', + 'setex', 'get', 'getset', 'setnx', 'incr', 'incrby', - 'decrby', + 'decr', 'decrby', 'rpush', 'lpush', @@ -45,6 +59,7 @@ class Resque_Redis extends Redisent 'lset', 'lrem', 'lpop', + 'blpop', 'rpop', 'sadd', 'srem', @@ -61,7 +76,9 @@ class Resque_Redis extends Redisent 'zcard', 'zscore', 'zremrangebyscore', - 'sort' + 'sort', + 'rename', + 'rpoplpush' ); // sinterstore // sunion @@ -70,32 +87,184 @@ class Resque_Redis extends Redisent // sdiffstore // sinter // smove - // rename - // rpoplpush // mget // msetnx // mset // renamenx + /** + * Set Redis namespace (prefix) default: resque + * @param string $namespace + */ + public static function prefix($namespace) + { + if (substr($namespace, -1) !== ':' && $namespace != '') { + $namespace .= ':'; + } + self::$defaultNamespace = $namespace; + } + + /** + * @param string|array $server A DSN or array + * @param int $database A database number to select. However, if we find a valid database number in the DSN the + * DSN-supplied value will be used instead and this parameter is ignored. + * @param object $client Optional Credis_Cluster or Credis_Client instance instantiated by you + */ + public function __construct($server, $database = null, $client = null) + { + try { + if (is_array($server)) { + $this->driver = new Credis_Cluster($server); + } + else if (is_object($client)) { + $this->driver = $client; + } + else { + list($host, $port, $dsnDatabase, $user, $password, $options) = self::parseDsn($server); + // $user is not used, only $password + + // Look for known Credis_Client options + $timeout = isset($options['timeout']) ? intval($options['timeout']) : null; + $persistent = isset($options['persistent']) ? $options['persistent'] : ''; + $maxRetries = isset($options['max_connect_retries']) ? $options['max_connect_retries'] : 0; + + $this->driver = new Credis_Client($host, $port, $timeout, $persistent); + $this->driver->setMaxConnectRetries($maxRetries); + if ($password){ + $this->driver->auth($password); + } + + // If we have found a database in our DSN, use it instead of the `$database` + // value passed into the constructor. + if ($dsnDatabase !== false) { + $database = $dsnDatabase; + } + } + + if ($database !== null) { + $this->driver->select($database); + } + } + catch(CredisException $e) { + throw new Resque_RedisException('Error communicating with Redis: ' . $e->getMessage(), 0, $e); + } + } + + /** + * Parse a DSN string, which can have one of the following formats: + * + * - host:port + * - redis://user:pass@host:port/db?option1=val1&option2=val2 + * - tcp://user:pass@host:port/db?option1=val1&option2=val2 + * - unix:///path/to/redis.sock + * + * Note: the 'user' part of the DSN is not used. + * + * @param string $dsn A DSN string + * @return array An array of DSN compotnents, with 'false' values for any unknown components. e.g. + * [host, port, db, user, pass, options] + */ + public static function parseDsn($dsn) + { + if ($dsn == '') { + // Use a sensible default for an empty DNS string + $dsn = 'redis://' . self::DEFAULT_HOST; + } + if(substr($dsn, 0, 7) === 'unix://') { + return array( + $dsn, + null, + false, + null, + null, + null, + ); + } + $parts = parse_url(http://github.com/$dsn); + + // Check the URI scheme + $validSchemes = array('redis', 'tcp'); + if (isset($parts['scheme']) && ! in_array($parts['scheme'], $validSchemes)) { + throw new \InvalidArgumentException("Invalid DSN. Supported schemes are " . implode(', ', $validSchemes)); + } + + // Allow simple 'hostname' format, which `parse_url` treats as a path, not host. + if ( ! isset($parts['host']) && isset($parts['path'])) { + $parts['host'] = $parts['path']; + unset($parts['path']); + } + + // Extract the port number as an integer + $port = isset($parts['port']) ? intval($parts['port']) : self::DEFAULT_PORT; + + // Get the database from the 'path' part of the URI + $database = false; + if (isset($parts['path'])) { + // Strip non-digit chars from path + $database = intval(preg_replace('/[^0-9]/', '', $parts['path'])); + } + + // Extract any 'user' and 'pass' values + $user = isset($parts['user']) ? $parts['user'] : false; + $pass = isset($parts['pass']) ? $parts['pass'] : false; + + // Convert the query string into an associative array + $options = array(); + if (isset($parts['query'])) { + // Parse the query string into an array + parse_str($parts['query'], $options); + } + + return array( + $parts['host'], + $port, + $database, + $user, + $pass, + $options, + ); + } + /** * Magic method to handle all function requests and prefix key based - * operations with the 'resque:' key prefix. + * operations with the {self::$defaultNamespace} key prefix. * * @param string $name The name of the method called. * @param array $args Array of supplied arguments to the method. * @return mixed Return value from Resident::call() based on the command. */ - public function __call($name, $args) { - $args = func_get_args(); - if(in_array($name, $this->keyCommands)) { - $args[1][0] = 'resque:' . $args[1][0]; + public function __call($name, $args) + { + if (in_array($name, $this->keyCommands)) { + if (is_array($args[0])) { + foreach ($args[0] AS $i => $v) { + $args[0][$i] = self::$defaultNamespace . $v; + } + } + else { + $args[0] = self::$defaultNamespace . $args[0]; + } } try { - return parent::__call($name, $args[1]); + return $this->driver->__call($name, $args); } - catch(RedisException $e) { - return false; + catch (CredisException $e) { + throw new Resque_RedisException('Error communicating with Redis: ' . $e->getMessage(), 0, $e); } } + + public static function getPrefix() + { + return self::$defaultNamespace; + } + + public static function removePrefix($string) + { + $prefix=self::getPrefix(); + + if (substr($string, 0, strlen($prefix)) == $prefix) { + $string = substr($string, strlen($prefix), strlen($string) ); + } + return $string; + } } -?> \ No newline at end of file diff --git a/lib/Resque/RedisCluster.php b/lib/Resque/RedisCluster.php deleted file mode 100644 index 6ac15e3f..00000000 --- a/lib/Resque/RedisCluster.php +++ /dev/null @@ -1,101 +0,0 @@ - - * @copyright (c) 2010 Chris Boulton - * @license http://www.opensource.org/licenses/mit-license.php - */ -class Resque_RedisCluster extends RedisentCluster -{ - /** - * @var array List of all commands in Redis that supply a key as their - * first argument. Used to prefix keys with the Resque namespace. - */ - private $keyCommands = array( - 'exists', - 'del', - 'type', - 'keys', - 'expire', - 'ttl', - 'move', - 'set', - 'get', - 'getset', - 'setnx', - 'incr', - 'incrby', - 'decrby', - 'decrby', - 'rpush', - 'lpush', - 'llen', - 'lrange', - 'ltrim', - 'lindex', - 'lset', - 'lrem', - 'lpop', - 'rpop', - 'sadd', - 'srem', - 'spop', - 'scard', - 'sismember', - 'smembers', - 'srandmember', - 'zadd', - 'zrem', - 'zrange', - 'zrevrange', - 'zrangebyscore', - 'zcard', - 'zscore', - 'zremrangebyscore', - 'sort' - ); - // sinterstore - // sunion - // sunionstore - // sdiff - // sdiffstore - // sinter - // smove - // rename - // rpoplpush - // mget - // msetnx - // mset - // renamenx - - /** - * Magic method to handle all function requests and prefix key based - * operations with the 'resque:' key prefix. - * - * @param string $name The name of the method called. - * @param array $args Array of supplied arguments to the method. - * @return mixed Return value from Resident::call() based on the command. - */ - public function __call($name, $args) { - $args = func_get_args(); - if(in_array($name, $this->keyCommands)) { - $args[1][0] = 'resque:' . $args[1][0]; - } - try { - return parent::__call($name, $args[1]); - } - catch(RedisException $e) { - return false; - } - } -} -?> diff --git a/lib/Resque/RedisException.php b/lib/Resque/RedisException.php new file mode 100644 index 00000000..ca654a00 --- /dev/null +++ b/lib/Resque/RedisException.php @@ -0,0 +1,12 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_RedisException extends Resque_Exception +{ +} +?> \ No newline at end of file diff --git a/lib/Resque/Stat.php b/lib/Resque/Stat.php index 2805376c..bc00c636 100644 --- a/lib/Resque/Stat.php +++ b/lib/Resque/Stat.php @@ -3,8 +3,7 @@ * Resque statistic management (jobs processed, failed, etc) * * @package Resque/Stat - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Stat diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index a5c70b9b..04714c1c 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -1,28 +1,20 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Worker { - const LOG_NONE = 0; - const LOG_NORMAL = 1; - const LOG_VERBOSE = 2; - /** - * @var int Current log level of this worker. - */ - public $logLevel = 0; + * @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface + */ + public $logger; /** * @var array Array of all associated queues for this worker. @@ -53,14 +45,40 @@ class Resque_Worker * @var Resque_Job Current job, if any, being processed by this worker. */ private $currentJob = null; - + /** * @var int Process ID of child worker processes. */ private $child = null; + /** + * Instantiate a new worker, given a list of queues that it should be working + * on. The list of queues should be supplied in the priority that they should + * be checked for jobs (first come, first served) + * + * Passing a single '*' allows the worker to work on all queues in alphabetical + * order. You can easily add new queues dynamically and have them worked on using + * this method. + * + * @param string|array $queues String with a single queue name, array with multiple. + */ + public function __construct($queues) + { + $this->logger = new Resque_Log(); + + if(!is_array($queues)) { + $queues = array($queues); + } + + $this->queues = $queues; + $this->hostname = php_uname('n'); + + $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); + } + /** * Return all workers known to Resque as instantiated instances. + * @return array */ public static function all() { @@ -95,7 +113,7 @@ public static function exists($workerId) */ public static function find($workerId) { - if(!self::exists($workerId)) { + if(!self::exists($workerId) || false === strpos($workerId, ":")) { return false; } @@ -116,34 +134,6 @@ public function setId($workerId) $this->id = $workerId; } - /** - * Instantiate a new worker, given a list of queues that it should be working - * on. The list of queues should be supplied in the priority that they should - * be checked for jobs (first come, first served) - * - * Passing a single '*' allows the worker to work on all queues in alphabetical - * order. You can easily add new queues dynamically and have them worked on using - * this method. - * - * @param string|array $queues String with a single queue name, array with multiple. - */ - public function __construct($queues) - { - if(!is_array($queues)) { - $queues = array($queues); - } - - $this->queues = $queues; - if(function_exists('gethostname')) { - $hostname = gethostname(); - } - else { - $hostname = php_uname('n'); - } - $this->hostname = $hostname; - $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); - } - /** * The primary loop for a worker which when called on an instance starts * the worker's life cycle. @@ -152,7 +142,7 @@ public function __construct($queues) * * @param int $interval How often to check for new jobs across the queues. */ - public function work($interval = 5) + public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) { $this->updateProcLine('Starting'); $this->startup(); @@ -165,7 +155,14 @@ public function work($interval = 5) // Attempt to find and reserve a job $job = false; if(!$this->paused) { - $job = $this->reserve(); + if($blocking === true) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); + } else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + } + + $job = $this->reserve($blocking, $interval); } if(!$job) { @@ -173,31 +170,37 @@ public function work($interval = 5) if($interval == 0) { break; } - // If no job was found, we sleep for $interval before continuing and checking again - $this->log('Sleeping for ' . $interval, true); - if($this->paused) { - $this->updateProcLine('Paused'); - } - else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + + if($blocking === false) + { + // If no job was found, we sleep for $interval before continuing and checking again + $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); + if($this->paused) { + $this->updateProcLine('Paused'); + } + else { + $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + } + + usleep($interval * 1000000); } - usleep($interval * 1000000); + continue; } - $this->log('got ' . $job); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); Resque_Event::trigger('beforeFork', $job); $this->workingOn($job); - $this->child = $this->fork(); + $this->child = Resque::fork(); // Forked and we're the child. Run the job. - if($this->child === 0 || $this->child === false) { + if ($this->child === 0 || $this->child === false) { $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); $this->perform($job); - if($this->child === 0) { + if ($this->child === 0) { exit(0); } } @@ -206,7 +209,7 @@ public function work($interval = 5) // Parent process, sit and wait $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, $status); // Wait until the child process finishes before continuing pcntl_wait($status); @@ -228,7 +231,7 @@ public function work($interval = 5) /** * Process a single job. * - * @param object|null $job The job to be processed. + * @param Resque_Job $job The job to be processed. */ public function perform(Resque_Job $job) { @@ -237,33 +240,42 @@ public function perform(Resque_Job $job) $job->perform(); } catch(Exception $e) { - $this->log($job . ' failed: ' . $e->getMessage()); + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e)); $job->fail($e); return; } $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); - $this->log('done ' . $job); + $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); } /** - * Attempt to find a job from the top of one of the queues for this worker. - * - * @return object|boolean Instance of Resque_Job if a job is found, false if not. + * @param bool $blocking + * @param int $timeout + * @return object|boolean Instance of Resque_Job if a job is found, false if not. */ - public function reserve() + public function reserve($blocking = false, $timeout = null) { $queues = $this->queues(); if(!is_array($queues)) { return; } - foreach($queues as $queue) { - $this->log('Checking ' . $queue, self::LOG_VERBOSE); - $job = Resque_Job::reserve($queue); + + if($blocking === true) { + $job = Resque_Job::reserveBlocking($queues, $timeout); if($job) { - $this->log('Found job on ' . $queue, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); return $job; } + } else { + foreach($queues as $queue) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); + $job = Resque_Job::reserve($queue); + if($job) { + $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); + return $job; + } + } } return false; @@ -291,27 +303,6 @@ public function queues($fetch = true) return $queues; } - /** - * Attempt to fork a child process from the parent to run a job in. - * - * Return values are those of pcntl_fork(). - * - * @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent. - */ - private function fork() - { - if(!function_exists('pcntl_fork')) { - return false; - } - - $pid = pcntl_fork(); - if($pid === -1) { - throw new RuntimeException('Unable to fork child worker.'); - } - - return $pid; - } - /** * Perform necessary actions to start a worker. */ @@ -332,8 +323,12 @@ private function startup() */ private function updateProcLine($status) { - if(function_exists('setproctitle')) { - setproctitle('resque-' . Resque::VERSION . ': ' . $status); + $processTitle = 'resque-' . Resque::VERSION . ': ' . $status; + if(function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') { + cli_set_process_title($processTitle); + } + else if(function_exists('setproctitle')) { + setproctitle($processTitle); } } @@ -351,14 +346,13 @@ private function registerSigHandlers() return; } - declare(ticks = 1); pcntl_signal(SIGTERM, array($this, 'shutDownNow')); pcntl_signal(SIGINT, array($this, 'shutDownNow')); pcntl_signal(SIGQUIT, array($this, 'shutdown')); pcntl_signal(SIGUSR1, array($this, 'killChild')); pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); - $this->log('Registered signals', self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); } /** @@ -366,7 +360,7 @@ private function registerSigHandlers() */ public function pauseProcessing() { - $this->log('USR2 received; pausing job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); $this->paused = true; } @@ -376,7 +370,7 @@ public function pauseProcessing() */ public function unPauseProcessing() { - $this->log('CONT received; resuming job processing'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); $this->paused = false; } @@ -387,7 +381,7 @@ public function unPauseProcessing() public function shutdown() { $this->shutdown = true; - $this->log('Exiting...'); + $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); } /** @@ -407,18 +401,18 @@ public function shutdownNow() public function killChild() { if(!$this->child) { - $this->log('No child to kill.', self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); return; } - $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { - $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); posix_kill($this->child, SIGKILL); $this->child = null; } else { - $this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE); + $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); $this->shutdown(); } } @@ -436,12 +430,14 @@ public function pruneDeadWorkers() $workerPids = $this->workerPids(); $workers = self::all(); foreach($workers as $worker) { - list($host, $pid, $queues) = explode(':', (string)$worker, 3); - if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { - continue; + if (is_object($worker)) { + list($host, $pid, $queues) = explode(':', (string)$worker, 3); + if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { + continue; + } + $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); + $worker->unregisterWorker(); } - $this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE); - $worker->unregisterWorker(); } } @@ -466,7 +462,7 @@ public function workerPids() */ public function registerWorker() { - Resque::redis()->sadd('workers', $this); + Resque::redis()->sadd('workers', (string)$this); Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y')); } @@ -527,21 +523,6 @@ public function __toString() return $this->id; } - /** - * Output a given log message to STDOUT. - * - * @param string $message Message to output. - */ - public function log($message) - { - if($this->logLevel == self::LOG_NORMAL) { - fwrite(STDOUT, "*** " . $message . "\n"); - } - else if($this->logLevel == self::LOG_VERBOSE) { - fwrite(STDOUT, "** [" . strftime('%T %Y-%m-%d') . "] " . $message . "\n"); - } - } - /** * Return an object describing the job this worker is currently working on. * @@ -568,5 +549,14 @@ public function getStat($stat) { return Resque_Stat::get($stat . ':' . $this); } + + /** + * Inject the logging object into the worker + * + * @param Psr\Log\LoggerInterface $logger + */ + public function setLogger(Psr\Log\LoggerInterface $logger) + { + $this->logger = $logger; + } } -?> \ No newline at end of file diff --git a/phpunit.xml b/phpunit.xml.dist similarity index 91% rename from phpunit.xml rename to phpunit.xml.dist index efbc7f29..61d2d7b3 100644 --- a/phpunit.xml +++ b/phpunit.xml.dist @@ -1,7 +1,8 @@ 1) { - $count = $COUNT; -} - -if($count > 1) { - for($i = 0; $i < $count; ++$i) { - $pid = pcntl_fork(); - if($pid == -1) { - die("Could not fork worker ".$i."\n"); - } - // Child, start the worker - else if(!$pid) { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); - $worker->work($interval); - break; - } - } -} -// Start a single worker -else { - $queues = explode(',', $QUEUE); - $worker = new Resque_Worker($queues); - $worker->logLevel = $logLevel; - - $PIDFILE = getenv('PIDFILE'); - if ($PIDFILE) { - file_put_contents($PIDFILE, getmypid()) or - die('Could not write PID information to ' . $PIDFILE); - } - - fwrite(STDOUT, '*** Starting worker '.$worker."\n"); - $worker->work($interval); -} -?> diff --git a/test/Resque/Tests/EventTest.php b/test/Resque/Tests/EventTest.php index 3d2a5364..6e102cf4 100644 --- a/test/Resque/Tests/EventTest.php +++ b/test/Resque/Tests/EventTest.php @@ -1,24 +1,22 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Tests_EventTest extends Resque_Tests_TestCase { private $callbacksHit = array(); - + public function setUp() { Test_Job::$called = false; - + // Register a worker to test with $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); $this->worker->registerWorker(); } @@ -33,14 +31,14 @@ public function getEventTestJob() $payload = array( 'class' => 'Test_Job', 'args' => array( - 'somevar', + array('somevar'), ), ); $job = new Resque_Job('jobs', $payload); $job->worker = $this->worker; return $job; } - + public function eventCallbackProvider() { return array( @@ -49,7 +47,7 @@ public function eventCallbackProvider() array('afterFork', 'afterForkEventCallback'), ); } - + /** * @dataProvider eventCallbackProvider */ @@ -60,10 +58,10 @@ public function testEventCallbacksFire($event, $callback) $job = $this->getEventTestJob(); $this->worker->perform($job); $this->worker->work(0); - + $this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called'); } - + public function testBeforeForkEventCallbackFires() { $event = 'beforeFork'; @@ -78,6 +76,18 @@ public function testBeforeForkEventCallbackFires() $this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called'); } + public function testBeforeEnqueueEventCallbackFires() + { + $event = 'beforeEnqueue'; + $callback = 'beforeEnqueueEventCallback'; + + Resque_Event::listen($event, array($this, $callback)); + Resque::enqueue('jobs', 'Test_Job', array( + 'somevar' + )); + $this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called'); + } + public function testBeforePerformEventCanStopWork() { $callback = 'beforePerformEventDontPerformCallback'; @@ -89,25 +99,60 @@ public function testBeforePerformEventCanStopWork() $this->assertContains($callback, $this->callbacksHit, $callback . ' callback was not called'); $this->assertFalse(Test_Job::$called, 'Job was still performed though Resque_Job_DontPerform was thrown'); } - + + public function testBeforeEnqueueEventStopsJobCreation() + { + $callback = 'beforeEnqueueEventDontCreateCallback'; + Resque_Event::listen('beforeEnqueue', array($this, $callback)); + Resque_Event::listen('afterEnqueue', array($this, 'afterEnqueueEventCallback')); + + $result = Resque::enqueue('test_job', 'TestClass'); + $this->assertContains($callback, $this->callbacksHit, $callback . ' callback was not called'); + $this->assertNotContains('afterEnqueueEventCallback', $this->callbacksHit, 'afterEnqueue was still called, even though it should not have been'); + $this->assertFalse($result); + } + public function testAfterEnqueueEventCallbackFires() { $callback = 'afterEnqueueEventCallback'; - $event = 'afterEnqueue'; - + $event = 'afterEnqueue'; + Resque_Event::listen($event, array($this, $callback)); Resque::enqueue('jobs', 'Test_Job', array( 'somevar' - )); + )); $this->assertContains($callback, $this->callbacksHit, $event . ' callback (' . $callback .') was not called'); } - + + public function testStopListeningRemovesListener() + { + $callback = 'beforePerformEventCallback'; + $event = 'beforePerform'; + + Resque_Event::listen($event, array($this, $callback)); + Resque_Event::stopListening($event, array($this, $callback)); + + $job = $this->getEventTestJob(); + $this->worker->perform($job); + $this->worker->work(0); + + $this->assertNotContains($callback, $this->callbacksHit, + $event . ' callback (' . $callback .') was called though Resque_Event::stopListening was called' + ); + } + public function beforePerformEventDontPerformCallback($instance) { $this->callbacksHit[] = __FUNCTION__; throw new Resque_Job_DontPerform; } - + + public function beforeEnqueueEventDontCreateCallback($queue, $class, $args, $track = false) + { + $this->callbacksHit[] = __FUNCTION__; + throw new Resque_Job_DontCreate; + } + public function assertValidEventCallback($function, $job) { $this->callbacksHit[] = $function; @@ -117,7 +162,7 @@ public function assertValidEventCallback($function, $job) $args = $job->getArguments(); $this->assertEquals($args[0], 'somevar'); } - + public function afterEnqueueEventCallback($class, $args) { $this->callbacksHit[] = __FUNCTION__; @@ -126,12 +171,17 @@ public function afterEnqueueEventCallback($class, $args) 'somevar', ), $args); } - + + public function beforeEnqueueEventCallback($job) + { + $this->callbacksHit[] = __FUNCTION__; + } + public function beforePerformEventCallback($job) { $this->assertValidEventCallback(__FUNCTION__, $job); } - + public function afterPerformEventCallback($job) { $this->assertValidEventCallback(__FUNCTION__, $job); @@ -141,9 +191,9 @@ public function beforeForkEventCallback($job) { $this->assertValidEventCallback(__FUNCTION__, $job); } - + public function afterForkEventCallback($job) { $this->assertValidEventCallback(__FUNCTION__, $job); } -} \ No newline at end of file +} diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index 5f0fd0f9..d751c37f 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -1,22 +1,25 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase { + /** + * @var \Resque_Worker + */ + protected $worker; + public function setUp() { parent::setUp(); // Register a worker to test with $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); } public function testJobStatusCanBeTracked() @@ -39,6 +42,7 @@ public function testQueuedJobReturnsQueuedStatus() $status = new Resque_Job_Status($token); $this->assertEquals(Resque_Job_Status::STATUS_WAITING, $status->get()); } + public function testRunningJobReturnsRunningStatus() { $token = Resque::enqueue('jobs', 'Failing_Job', null, true); diff --git a/test/Resque/Tests/JobTest.php b/test/Resque/Tests/JobTest.php index df6187b4..fb55d13b 100644 --- a/test/Resque/Tests/JobTest.php +++ b/test/Resque/Tests/JobTest.php @@ -1,12 +1,10 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Tests_JobTest extends Resque_Tests_TestCase @@ -19,6 +17,7 @@ public function setUp() // Register a worker to test with $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); $this->worker->registerWorker(); } @@ -27,6 +26,23 @@ public function testJobCanBeQueued() $this->assertTrue((bool)Resque::enqueue('jobs', 'Test_Job')); } + /** + * @expectedException Resque_RedisException + */ + public function testRedisErrorThrowsExceptionOnJobCreation() + { + $mockCredis = $this->getMockBuilder('Credis_Client') + ->setMethods(['connect', '__call']) + ->getMock(); + $mockCredis->expects($this->any())->method('__call') + ->will($this->throwException(new CredisException('failure'))); + + Resque::setBackend(function($database) use ($mockCredis) { + return new Resque_Redis('localhost:6379', $database, $mockCredis); + }); + Resque::enqueue('jobs', 'This is a test'); + } + public function testQeueuedJobCanBeReserved() { Resque::enqueue('jobs', 'Test_Job'); @@ -65,7 +81,7 @@ public function testQueuedJobReturnsExactSamePassedInArguments() Resque::enqueue('jobs', 'Test_Job', $args); $job = Resque_Job::reserve('jobs'); - $this->assertEquals($args, $job->payload['args']); + $this->assertEquals($args, $job->getArguments()); } public function testAfterJobIsReservedItIsRemoved() @@ -97,9 +113,10 @@ public function testRecreatedJobMatchesExistingJob() $newJob = Resque_Job::reserve('jobs'); $this->assertEquals($job->payload['class'], $newJob->payload['class']); - $this->assertEquals($job->payload['args'], $newJob->payload['args']); + $this->assertEquals($job->getArguments(), $newJob->getArguments()); } + public function testFailedJobExceptionsAreCaught() { $payload = array( @@ -166,4 +183,264 @@ public function testJobWithTearDownCallbackFiresTearDown() $this->assertTrue(Test_Job_With_TearDown::$called); } -} \ No newline at end of file + + public function testNamespaceNaming() { + $fixture = array( + array('test' => 'more:than:one:with:', 'assertValue' => 'more:than:one:with:'), + array('test' => 'more:than:one:without', 'assertValue' => 'more:than:one:without:'), + array('test' => 'resque', 'assertValue' => 'resque:'), + array('test' => 'resque:', 'assertValue' => 'resque:'), + ); + + foreach($fixture as $item) { + Resque_Redis::prefix($item['test']); + $this->assertEquals(Resque_Redis::getPrefix(), $item['assertValue']); + } + } + + public function testJobWithNamespace() + { + Resque_Redis::prefix('php'); + $queue = 'jobs'; + $payload = array('another_value'); + Resque::enqueue($queue, 'Test_Job_With_TearDown', $payload); + + $this->assertEquals(Resque::queues(), array('jobs')); + $this->assertEquals(Resque::size($queue), 1); + + Resque_Redis::prefix('resque'); + $this->assertEquals(Resque::size($queue), 0); + } + + public function testDequeueAll() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $this->assertEquals(Resque::dequeue($queue), 2); + $this->assertEquals(Resque::size($queue), 0); + } + + public function testDequeueMakeSureNotDeleteOthers() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $other_queue = 'other_jobs'; + Resque::enqueue($other_queue, 'Test_Job_Dequeue'); + Resque::enqueue($other_queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $this->assertEquals(Resque::size($other_queue), 2); + $this->assertEquals(Resque::dequeue($queue), 2); + $this->assertEquals(Resque::size($queue), 0); + $this->assertEquals(Resque::size($other_queue), 2); + } + + public function testDequeueSpecificItem() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue2'); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueSpecificMultipleItems() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue2', 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::dequeue($queue, $test), 2); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueNonExistingItem() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue4'); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 3); + } + + public function testDequeueNonExistingItem2() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue1'); + Resque::enqueue($queue, 'Test_Job_Dequeue2'); + Resque::enqueue($queue, 'Test_Job_Dequeue3'); + $this->assertEquals(Resque::size($queue), 3); + $test = array('Test_Job_Dequeue4', 'Test_Job_Dequeue1'); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 2); + } + + public function testDequeueItemID() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $qid = Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => $qid); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueWrongItemID() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $qid = Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + #qid right but class name is wrong + $test = array('Test_Job_Dequeue1' => $qid); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 2); + } + + public function testDequeueWrongItemID2() + { + $queue = 'jobs'; + Resque::enqueue($queue, 'Test_Job_Dequeue'); + $qid = Resque::enqueue($queue, 'Test_Job_Dequeue'); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => 'r4nD0mH4sh3dId'); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 2); + } + + public function testDequeueItemWithArg() + { + $queue = 'jobs'; + $arg = array('foo' => 1, 'bar' => 2); + Resque::enqueue($queue, 'Test_Job_Dequeue9'); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $arg); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue9' => $arg); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + #$this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueSeveralItemsWithArgs() + { + // GIVEN + $queue = 'jobs'; + $args = array('foo' => 1, 'bar' => 10); + $removeArgs = array('foo' => 1, 'bar' => 2); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $args); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); + Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs); + $this->assertEquals(Resque::size($queue), 3); + + // WHEN + $test = array('Test_Job_Dequeue9' => $removeArgs); + $removedItems = Resque::dequeue($queue, $test); + + // THEN + $this->assertEquals($removedItems, 2); + $this->assertEquals(Resque::size($queue), 1); + $item = Resque::pop($queue); + $this->assertInternalType('array', $item['args']); + $this->assertEquals(10, $item['args'][0]['bar'], 'Wrong items were dequeued from queue!'); + } + + public function testDequeueItemWithUnorderedArg() + { + $queue = 'jobs'; + $arg = array('foo' => 1, 'bar' => 2); + $arg2 = array('bar' => 2, 'foo' => 1); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue', $arg); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => $arg2); + $this->assertEquals(Resque::dequeue($queue, $test), 1); + $this->assertEquals(Resque::size($queue), 1); + } + + public function testDequeueItemWithiWrongArg() + { + $queue = 'jobs'; + $arg = array('foo' => 1, 'bar' => 2); + $arg2 = array('foo' => 2, 'bar' => 3); + Resque::enqueue($queue, 'Test_Job_Dequeue'); + Resque::enqueue($queue, 'Test_Job_Dequeue', $arg); + $this->assertEquals(Resque::size($queue), 2); + $test = array('Test_Job_Dequeue' => $arg2); + $this->assertEquals(Resque::dequeue($queue, $test), 0); + $this->assertEquals(Resque::size($queue), 2); + } + + public function testUseDefaultFactoryToGetJobInstance() + { + $payload = array( + 'class' => 'Some_Job_Class', + 'args' => null + ); + $job = new Resque_Job('jobs', $payload); + $instance = $job->getInstance(); + $this->assertInstanceOf('Some_Job_Class', $instance); + } + + public function testUseFactoryToGetJobInstance() + { + $payload = array( + 'class' => 'Some_Job_Class', + 'args' => array(array()) + ); + $job = new Resque_Job('jobs', $payload); + $factory = new Some_Stub_Factory(); + $job->setJobFactory($factory); + $instance = $job->getInstance(); + $this->assertInstanceOf('Resque_JobInterface', $instance); + } + + public function testDoNotUseFactoryToGetInstance() + { + $payload = array( + 'class' => 'Some_Job_Class', + 'args' => array(array()) + ); + $job = new Resque_Job('jobs', $payload); + $factory = $this->getMock('Resque_Job_FactoryInterface'); + $testJob = $this->getMock('Resque_JobInterface'); + $factory->expects(self::never())->method('create')->will(self::returnValue($testJob)); + $instance = $job->getInstance(); + $this->assertInstanceOf('Resque_JobInterface', $instance); + } +} + +class Some_Job_Class implements Resque_JobInterface +{ + + /** + * @return bool + */ + public function perform() + { + return true; + } +} + +class Some_Stub_Factory implements Resque_Job_FactoryInterface +{ + + /** + * @param $className + * @param array $args + * @param $queue + * @return Resque_JobInterface + */ + public function create($className, $args, $queue) + { + return new Some_Job_Class(); + } +} diff --git a/test/Resque/Tests/LogTest.php b/test/Resque/Tests/LogTest.php new file mode 100644 index 00000000..db97b160 --- /dev/null +++ b/test/Resque/Tests/LogTest.php @@ -0,0 +1,31 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_LogTest extends Resque_Tests_TestCase +{ + public function testLogInterpolate() + { + $logger = new Resque_Log(); + $actual = $logger->interpolate('string {replace}', array('replace' => 'value')); + $expected = 'string value'; + + $this->assertEquals($expected, $actual); + } + + public function testLogInterpolateMutiple() + { + $logger = new Resque_Log(); + $actual = $logger->interpolate( + 'string {replace1} {replace2}', + array('replace1' => 'value1', 'replace2' => 'value2') + ); + $expected = 'string value1 value2'; + + $this->assertEquals($expected, $actual); + } +} diff --git a/test/Resque/Tests/RedisTest.php b/test/Resque/Tests/RedisTest.php new file mode 100644 index 00000000..55b5e17d --- /dev/null +++ b/test/Resque/Tests/RedisTest.php @@ -0,0 +1,197 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_RedisTest extends Resque_Tests_TestCase +{ + /** + * @expectedException Resque_RedisException + */ + public function testRedisExceptionsAreSurfaced() + { + $mockCredis = $this->getMockBuilder('Credis_Client') + ->setMethods(['connect', '__call']) + ->getMock(); + $mockCredis->expects($this->any())->method('__call') + ->will($this->throwException(new CredisException('failure'))); + + Resque::setBackend(function($database) use ($mockCredis) { + return new Resque_Redis('localhost:6379', $database, $mockCredis); + }); + Resque::redis()->ping(); + } + + /** + * These DNS strings are considered valid. + * + * @return array + */ + public function validDsnStringProvider() + { + return array( + // Input , Expected output + array('', array( + 'localhost', + Resque_Redis::DEFAULT_PORT, + false, + false, false, + array(), + )), + array('localhost', array( + 'localhost', + Resque_Redis::DEFAULT_PORT, + false, + false, false, + array(), + )), + array('localhost:1234', array( + 'localhost', + 1234, + false, + false, false, + array(), + )), + array('localhost:1234/2', array( + 'localhost', + 1234, + 2, + false, false, + array(), + )), + array('redis://foobar', array( + 'foobar', + Resque_Redis::DEFAULT_PORT, + false, + false, false, + array(), + )), + array('redis://foobar/', array( + 'foobar', + Resque_Redis::DEFAULT_PORT, + false, + false, false, + array(), + )), + array('redis://foobar:1234', array( + 'foobar', + 1234, + false, + false, false, + array(), + )), + array('redis://foobar:1234/15', array( + 'foobar', + 1234, + 15, + false, false, + array(), + )), + array('redis://foobar:1234/0', array( + 'foobar', + 1234, + 0, + false, false, + array(), + )), + array('redis://user@foobar:1234', array( + 'foobar', + 1234, + false, + 'user', false, + array(), + )), + array('redis://user@foobar:1234/15', array( + 'foobar', + 1234, + 15, + 'user', false, + array(), + )), + array('redis://user:pass@foobar:1234', array( + 'foobar', + 1234, + false, + 'user', 'pass', + array(), + )), + array('redis://user:pass@foobar:1234?x=y&a=b', array( + 'foobar', + 1234, + false, + 'user', 'pass', + array('x' => 'y', 'a' => 'b'), + )), + array('redis://:pass@foobar:1234?x=y&a=b', array( + 'foobar', + 1234, + false, + false, 'pass', + array('x' => 'y', 'a' => 'b'), + )), + array('redis://user@foobar:1234?x=y&a=b', array( + 'foobar', + 1234, + false, + 'user', false, + array('x' => 'y', 'a' => 'b'), + )), + array('redis://foobar:1234?x=y&a=b', array( + 'foobar', + 1234, + false, + false, false, + array('x' => 'y', 'a' => 'b'), + )), + array('redis://user@foobar:1234/12?x=y&a=b', array( + 'foobar', + 1234, + 12, + 'user', false, + array('x' => 'y', 'a' => 'b'), + )), + array('tcp://user@foobar:1234/12?x=y&a=b', array( + 'foobar', + 1234, + 12, + 'user', false, + array('x' => 'y', 'a' => 'b'), + )), + ); + } + + /** + * These DSN values should throw exceptions + * @return array + */ + public function bogusDsnStringProvider() + { + return array( + array('http://foo.bar/'), + array('user:@foobar:1234?x=y&a=b'), + array('foobar:1234?x=y&a=b'), + ); + } + + /** + * @dataProvider validDsnStringProvider + */ + public function testParsingValidDsnString($dsn, $expected) + { + $result = Resque_Redis::parseDsn($dsn); + $this->assertEquals($expected, $result); + } + + /** + * @dataProvider bogusDsnStringProvider + * @expectedException InvalidArgumentException + */ + public function testParsingBogusDsnStringThrowsException($dsn) + { + // The next line should throw an InvalidArgumentException + $result = Resque_Redis::parseDsn($dsn); + } +} \ No newline at end of file diff --git a/test/Resque/Tests/StatTest.php b/test/Resque/Tests/StatTest.php index 64047948..aa418887 100644 --- a/test/Resque/Tests/StatTest.php +++ b/test/Resque/Tests/StatTest.php @@ -1,12 +1,9 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Tests_StatTest extends Resque_Tests_TestCase diff --git a/test/Resque/Tests/TestCase.php b/test/Resque/Tests/TestCase.php index f4c00dfa..a97f64bf 100644 --- a/test/Resque/Tests/TestCase.php +++ b/test/Resque/Tests/TestCase.php @@ -3,8 +3,7 @@ * Resque test case class. Contains setup and teardown methods. * * @package Resque/Tests - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Tests_TestCase extends PHPUnit_Framework_TestCase @@ -12,13 +11,20 @@ class Resque_Tests_TestCase extends PHPUnit_Framework_TestCase protected $resque; protected $redis; + public static function setUpBeforeClass() + { + date_default_timezone_set('UTC'); + } + public function setUp() { $config = file_get_contents(REDIS_CONF); preg_match('#^\s*port\s+([0-9]+)#m', $config, $matches); - $this->redis = new Redisent('localhost', $matches[1]); + $this->redis = new Credis_Client('localhost', $matches[1]); + + Resque::setBackend('redis://localhost:' . $matches[1]); // Flush redis $this->redis->flushAll(); } -} \ No newline at end of file +} diff --git a/test/Resque/Tests/WorkerTest.php b/test/Resque/Tests/WorkerTest.php index 47b02081..93c0621a 100644 --- a/test/Resque/Tests/WorkerTest.php +++ b/test/Resque/Tests/WorkerTest.php @@ -1,12 +1,9 @@ - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase @@ -14,6 +11,7 @@ class Resque_Tests_WorkerTest extends Resque_Tests_TestCase public function testWorkerRegistersInList() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); // Make sure the worker is in the list @@ -26,6 +24,7 @@ public function testGetAllWorkers() // Register a few workers for($i = 0; $i < $num; ++$i) { $worker = new Resque_Worker('queue_' . $i); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); } @@ -36,6 +35,7 @@ public function testGetAllWorkers() public function testGetWorkerById() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $newWorker = Resque_Worker::find((string)$worker); @@ -50,6 +50,7 @@ public function testInvalidWorkerDoesNotExist() public function testWorkerCanUnregister() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $worker->unregisterWorker(); @@ -61,6 +62,7 @@ public function testWorkerCanUnregister() public function testPausedWorkerDoesNotPickUpJobs() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -71,6 +73,7 @@ public function testPausedWorkerDoesNotPickUpJobs() public function testResumedWorkerPicksUpJobs() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->pauseProcessing(); Resque::enqueue('jobs', 'Test_Job'); $worker->work(0); @@ -86,6 +89,7 @@ public function testWorkerCanWorkOverMultipleQueues() 'queue1', 'queue2' )); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); Resque::enqueue('queue2', 'Test_Job_2'); @@ -104,6 +108,7 @@ public function testWorkerWorksQueuesInSpecifiedOrder() 'medium', 'low' )); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); // Queue the jobs in a different order @@ -125,6 +130,7 @@ public function testWorkerWorksQueuesInSpecifiedOrder() public function testWildcardQueueWorkerWorksAllQueues() { $worker = new Resque_Worker('*'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue1', 'Test_Job_1'); @@ -140,6 +146,7 @@ public function testWildcardQueueWorkerWorksAllQueues() public function testWorkerDoesNotWorkOnUnknownQueues() { $worker = new Resque_Worker('queue1'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); Resque::enqueue('queue2', 'Test_Job'); @@ -150,6 +157,7 @@ public function testWorkerClearsItsStatusWhenNotWorking() { Resque::enqueue('jobs', 'Test_Job'); $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $job = $worker->reserve(); $worker->workingOn($job); $worker->doneWorking(); @@ -159,6 +167,7 @@ public function testWorkerClearsItsStatusWhenNotWorking() public function testWorkerRecordsWhatItIsWorkingOn() { $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $payload = array( @@ -181,6 +190,7 @@ public function testWorkerErasesItsStatsWhenShutdown() Resque::enqueue('jobs', 'Invalid_Job'); $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->work(0); $worker->work(0); @@ -192,15 +202,18 @@ public function testWorkerCleansUpDeadWorkersOnStartup() { // Register a good worker $goodWorker = new Resque_Worker('jobs'); + $goodWorker->setLogger(new Resque_Log()); $goodWorker->registerWorker(); $workerId = explode(':', $goodWorker); // Register some bad workers $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); $worker = new Resque_Worker(array('high', 'low')); + $worker->setLogger(new Resque_Log()); $worker->setId($workerId[0].':2:high,low'); $worker->registerWorker(); @@ -216,12 +229,14 @@ public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() { // Register a bad worker on this machine $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $workerId = explode(':', $worker); $worker->setId($workerId[0].':1:jobs'); $worker->registerWorker(); // Register some other false workers $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->setId('my.other.host:1:jobs'); $worker->registerWorker(); @@ -238,6 +253,7 @@ public function testDeadWorkerCleanUpDoesNotCleanUnknownWorkers() public function testWorkerFailsUncompletedJobsOnExit() { $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); $worker->registerWorker(); $payload = array( @@ -250,4 +266,28 @@ public function testWorkerFailsUncompletedJobsOnExit() $this->assertEquals(1, Resque_Stat::get('failed')); } + + public function testBlockingListPop() + { + $worker = new Resque_Worker('jobs'); + $worker->setLogger(new Resque_Log()); + $worker->registerWorker(); + + Resque::enqueue('jobs', 'Test_Job_1'); + Resque::enqueue('jobs', 'Test_Job_2'); + + $i = 1; + while($job = $worker->reserve(true, 1)) + { + $this->assertEquals('Test_Job_' . $i, $job->payload['class']); + + if($i == 2) { + break; + } + + $i++; + } + + $this->assertEquals(2, $i); + } } \ No newline at end of file diff --git a/test/Resque/Tests/bootstrap.php b/test/bootstrap.php similarity index 80% rename from test/Resque/Tests/bootstrap.php rename to test/bootstrap.php index eb84258a..a4b68377 100644 --- a/test/Resque/Tests/bootstrap.php +++ b/test/bootstrap.php @@ -3,24 +3,15 @@ * Resque test bootstrap file - sets up a test environment. * * @package Resque/Tests - * @author Chris Boulton - * @copyright (c) 2010 Chris Boulton + * @author Chris Boulton * @license http://www.opensource.org/licenses/mit-license.php */ -define('CWD', dirname(__FILE__)); -define('RESQUE_LIB', CWD . '/../../../lib/'); -define('TEST_MISC', realpath(CWD . '/../../misc/')); -define('REDIS_CONF', TEST_MISC . '/redis.conf'); - -// Change to the directory this file lives in. This is important, due to -// how we'll be running redis. +$loader = require __DIR__ . '/../vendor/autoload.php'; +$loader->add('Resque_Tests', __DIR__); -require_once CWD . '/TestCase.php'; - -// Include Resque -require_once RESQUE_LIB . 'Resque.php'; -require_once RESQUE_LIB . 'Resque/Worker.php'; +define('TEST_MISC', realpath(__DIR__ . '/misc/')); +define('REDIS_CONF', TEST_MISC . '/redis.conf'); // Attempt to start our own redis instance for tesitng. exec('which redis-server', $output, $returnVar); @@ -58,11 +49,13 @@ function killRedis($pid) } $pidFile = TEST_MISC . '/' . $matches[1]; - $pid = trim(file_get_contents($pidFile)); - posix_kill((int) $pid, 9); + if (file_exists($pidFile)) { + $pid = trim(file_get_contents($pidFile)); + posix_kill((int) $pid, 9); - if(is_file($pidFile)) { - unlink($pidFile); + if(is_file($pidFile)) { + unlink($pidFile); + } } // Remove the redis database