initial docker setup

This commit is contained in:
GotPPay
2018-06-14 16:49:28 +02:00
parent bc80b7342e
commit b5f87f27f8
3023 changed files with 985078 additions and 1 deletions

View File

@@ -0,0 +1,103 @@
<?php
/*
Plugin Name: WP Background Processing
Plugin URI: https://github.com/A5hleyRich/wp-background-processing
Description: Asynchronous requests and background processing in WordPress.
Author: Delicious Brains Inc.
Version: 1.0
Author URI: https://deliciousbrains.com/
*/
$queue_folder_path = plugin_dir_path( __FILE__ );
require_once $queue_folder_path . 'queue/classes/wp-job.php';
require_once $queue_folder_path . 'queue/classes/wp-queue.php';
require_once $queue_folder_path . 'queue/classes/worker/wp-worker.php';
require_once $queue_folder_path . 'queue/classes/worker/wp-http-worker.php';
global $wp_queue;
$wp_queue = new WP_Queue();
// Add WP CLI commands
if (defined( 'WP_CLI' ) && WP_CLI) {
try {
/**
* Service push to MailChimp
*
* <type>
* : product_sync order_sync order product
*/
function mailchimp_cli_push_command( $args, $assoc_args ) {
if (is_array($args) && isset($args[0])) {
switch($args[0]) {
case 'product_sync':
wp_queue(new MailChimp_WooCommerce_Process_Products());
WP_CLI::success("queued up the product sync!");
break;
case 'order_sync':
wp_queue(new MailChimp_WooCommerce_Process_Orders());
WP_CLI::success("queued up the order sync!");
break;
case 'order':
if (!isset($args[1])) {
wp_die('You must specify an order id as the 2nd parameter.');
}
wp_queue(new MailChimp_WooCommerce_Single_Order($args[1]));
WP_CLI::success("queued up the order {$args[1]}!");
break;
case 'product':
if (!isset($args[1])) {
wp_die('You must specify a product id as the 2nd parameter.');
}
wp_queue(new MailChimp_WooCommerce_Single_Product($args[1]));
WP_CLI::success("queued up the product {$args[1]}!");
break;
}
}
};
WP_CLI::add_command( 'mailchimp_push', 'mailchimp_cli_push_command');
require_once $queue_folder_path . 'queue/classes/cli/queue-command.php';
WP_CLI::add_command( 'queue', 'Queue_Command' );
} catch (\Exception $e) {}
}
if (!mailchimp_running_in_console() && mailchimp_is_configured()) {
// fire up the http worker container
new WP_Http_Worker($wp_queue);
}
// if we're not running in the console, and the http_worker is not running
if (mailchimp_should_init_queue()) {
try {
// if we do not have a site transient for the queue listener
if (!get_site_transient('http_worker_queue_listen')) {
// set the site transient to expire in 50 seconds so this will not happen too many times
// but still work for cron scripts on the minute mark.
set_site_transient( 'http_worker_queue_listen', microtime(), 50);
// if we have available jobs, call the http worker manually
if ($wp_queue->available_jobs()) {
mailchimp_call_http_worker_manually();
}
}
} catch (\Exception $e) {}
}
if (!function_exists( 'wp_queue')) {
/**
* WP queue.
*
* @param WP_Job $job
* @param int $delay
*/
function wp_queue( WP_Job $job, $delay = 0 ) {
global $wp_queue;
$wp_queue->push( $job, $delay );
do_action( 'wp_queue_job_pushed', $job );
}
}

View File

@@ -0,0 +1,391 @@
<?php
/**
* Manage queue and jobs.
*
* @package wp-cli
*/
class Queue_Command extends WP_CLI_Command {
/**
* Timestamp of when this worker started processing the queue.
*
* @var int
*/
protected $start_time;
protected $pid;
protected $command_called;
/**
* Queue_Command constructor.
*/
public function __construct()
{
$this->pid = getmypid();
register_shutdown_function(array($this, 'on_shutdown'));
}
/**
* make sure we remove the site transient
*/
public function on_shutdown()
{
switch ($this->command_called) {
case 'listen':
$this->deleteQueueTimer();
break;
}
}
/**
* Get the expiration for the single cron job
*
* @throws \WP_CLI\ExitException
*/
public function expired_at()
{
$time = $this->getQueueTimer();
if (empty($time)) {
WP_CLI::error('no timer running');
wp_die();
}
WP_CLI::success("Next iteration will happen no later than ".(string) mailchimp_date_utc($time));
wp_die();
}
/**
* Flush all of the records in the queue.
*/
public function flush()
{
global $wpdb;
$this->command_called = 'flush';
$wpdb->query("DELETE FROM {$wpdb->prefix}queue");
}
/**
* Show all the records in the queue.
*/
public function show()
{
global $wpdb;
$this->command_called = 'show';
print_r($wpdb->get_results("SELECT * FROM {$wpdb->prefix}queue"));
}
/**
* Creates the queue tables.
*
* @subcommand create-tables
*/
public function create_tables( $args, $assoc_args = array() ) {
$this->command_called = 'create_tables';
require_once( ABSPATH . 'wp-admin/includes/upgrade.php' );
global $wpdb;
$wpdb->hide_errors();
$charset_collate = $wpdb->get_charset_collate();
$sql = "CREATE TABLE {$wpdb->prefix}queue (
id bigint(20) NOT NULL AUTO_INCREMENT,
job text NOT NULL,
attempts tinyint(1) NOT NULL DEFAULT 0,
locked tinyint(1) NOT NULL DEFAULT 0,
locked_at datetime DEFAULT NULL,
available_at datetime NOT NULL,
created_at datetime NOT NULL,
PRIMARY KEY (id)
) $charset_collate;";
dbDelta( $sql );
$sql = "CREATE TABLE {$wpdb->prefix}failed_jobs (
id bigint(20) NOT NULL AUTO_INCREMENT,
job text NOT NULL,
failed_at datetime NOT NULL,
PRIMARY KEY (id)
) $charset_collate;";
dbDelta( $sql );
WP_CLI::success( "Table {$wpdb->prefix}queue created." );
}
/**
* Run the queue listener to process jobs
*
* ## OPTIONS
*
* [--force=<0>]
* : Force the listener to ignore the transient and run
*
* [--daemon=<0>]
* : Running the command as a true process using a manager to keep alive.
* If using WP CRON use --daemon=0
* If using a process manager, do nothing or pass in 1
*
* [--multiple=<0>]
* : Allow multiple processes to run at the same time. ( default is 0 )
*
* [--sleep_processing=<1>]
* : How long to sleep between jobs. ( default is 1 second )
*
* [--sleep_empty=<5>]
* : How long to sleep between jobs when nothing is in the queue. ( default is 5 seconds )
* ---
*
* ## EXAMPLES
*
* wp queue listen --daemon=1
* wp queue listen --daemon=0 --sleep_empty=10
*
* ---
*
* @subcommand listen
* @param $args
* @param array $assoc_args
*/
public function listen( $args, $assoc_args = array() ) {
global $wp_queue;
$this->command_called = 'listen';
$this->start_time = time(); // Set start time of current command
$allow_multiple = (isset($assoc_args['multiple']) ? (bool) $assoc_args['multiple'] : null) === true;
$running_as_daemon = (isset($assoc_args['daemon']) ? (bool) $assoc_args['daemon'] : null) === true;
$force = (isset($assoc_args['force']) ? (bool) $assoc_args['force'] : null) === true;
$sleep_between_jobs = isset($assoc_args['sleep_processing']) ? (int) $assoc_args['sleep_processing'] : 1;
$sleep_when_empty = isset($assoc_args['sleep_empty']) ? (int) $assoc_args['sleep_empty'] : 5;
$expire_time = $this->getQueueTimer();
if (!$force && !$allow_multiple) {
if (!empty($expire_time) && ($expire_time+100) > $this->start_time) {
WP_CLI::log('Currently running in another process');
//mailchimp_debug("queue", $message = "wp queue listen is running in another process or waiting to restart at [{$expire_time}] but clock says [{$this->start_time}].");
wp_die();
}
}
$this->updateQueueTimer();
mailchimp_debug("queue", $message = "[start] queue listen process_id [{$this->pid}] :: max_time [{$this->getServerMaxExecutionTime()}] :: memory limit [{$this->getServerMemoryLimit()}]");
WP_CLI::log($message);
$worker = new WP_Worker( $wp_queue );
$loop_counter = 0;
// if the user specifies that they want to run as a daemon we need to allow that.
while ($running_as_daemon || $this->all_good_under_the_hood()) {
$loop_counter++;
// if we're doing single processing only, set the transient
if (!$allow_multiple) {
if ($loop_counter % 5 === 0) {
$this->updateQueueTimer(time() + 300);
}
}
// allow queue to break out of the forever loop if something is going wrong by adding a transient
if ((bool) get_site_transient('kill_wp_queue_listener')) {
break;
}
// log it in increments of 20 to be lighter on the log file
if ($loop_counter % 20 === 0) {
mailchimp_debug("queue listen", $message = "process id {$this->pid} :: loop #{$loop_counter}");
WP_CLI::log($message);
}
$sleep = $sleep_when_empty;
// if the worker has a job, apply the sleep between job timeout
if ($worker->should_run() && $worker->process_next_job()) {
$sleep = $sleep_between_jobs;
if (($job_name = $worker->get_job_name()) !== 'WP_Worker') {
WP_CLI::success('Processed: ' . $job_name);
}
}
sleep($sleep);
}
if (!$allow_multiple) {
$this->deleteQueueTimer();
}
mailchimp_debug("queue", $message = '[end] queue listen process_id = '.$this->pid);
WP_CLI::log($message);
exit;
}
/**
* Process the next job in the queue.
* @subcommand work
*/
public function work( $args, $assoc_args = array() ) {
global $wp_queue;
$this->command_called = 'work';
$worker = new WP_Worker( $wp_queue );
if ( $worker->should_run() ) {
if ( $worker->process_next_job() ) {
WP_CLI::success( 'Processed: ' . $worker->get_job_name() );
} else {
WP_CLI::warning( 'Failed: ' . $worker->get_job_name() );
}
} else {
WP_CLI::log( 'No jobs to process...' );
}
}
/**
* Show queue status.
*/
public function status( $args, $assoc_args = array() ) {
global $wp_queue;
$this->command_called = 'status';
WP_CLI::log( $wp_queue->available_jobs() . ' jobs in the queue' );
WP_CLI::log( $wp_queue->failed_jobs() . ' failed jobs' );
}
/**
* Push failed jobs back onto the queue.
*
* @subcommand restart-failed
*/
public function restart_failed( $args, $assoc_args = array() ) {
global $wp_queue;
$this->command_called = 'restart_failed';
if ( ! $wp_queue->failed_jobs() ) {
WP_CLI::log( 'No failed jobs to restart...' );
return;
}
$count = $wp_queue->restart_failed_jobs();
WP_CLI::success( $count . ' failed jobs pushed to the queue' );
}
/**
* @return mixed
*/
protected function deleteQueueTimer()
{
global $wpdb;
$key = 'mailchimp_woocommerce_queue_listen';
$sql = $wpdb->prepare("DELETE FROM {$wpdb->options} WHERE option_name = %s", $key);
return $wpdb->query($sql);
}
/**
* @return null
*/
protected function getQueueTimer()
{
global $wpdb;
$key = 'mailchimp_woocommerce_queue_listen';
$row = $wpdb->get_row($wpdb->prepare("SELECT option_value FROM $wpdb->options WHERE option_name = %s LIMIT 1", $key));
return is_object($row) ? (int) unserialize($row->option_value) : null;
}
/**
* @param null $time
* @return mixed
*/
protected function updateQueueTimer($time = null)
{
global $wpdb;
if (empty($this->start_time)) {
$this->start_time = time();
}
$value = $time ?: $this->start_time+600;
$values = array(
'option_value' => serialize($value),
'autoload' => 'no',
);
$key = 'mailchimp_woocommerce_queue_listen';
$updated = $wpdb->update($wpdb->options, $values, array('option_name' => $key));
if ($updated) {
return $updated;
}
$values['option_name'] = $key;
return $wpdb->insert($wpdb->options, $values);
}
/**
* @return bool
*/
protected function all_good_under_the_hood()
{
return !$this->time_exceeded() && !$this->memory_exceeded();
}
/**
* Memory exceeded
*
* Ensures the worker process never exceeds 80%
* of the maximum allowed PHP memory.
*
* @return bool
*/
protected function memory_exceeded() {
return memory_get_usage( true ) >= ($this->get_memory_limit() * 0.8);
}
/**
* Get memory limit
*
* @return int
*/
protected function get_memory_limit() {
return intval($this->getServerMemoryLimit()) * 1024 * 1024;
}
/**
* Time exceeded
*
* Ensures the worker never exceeds a sensible time limit (50s by default).
* A timeout limit of 30s is common on shared hosting.
*
* @return bool
*/
protected function time_exceeded() {
return time() >= $this->start_time + apply_filters( 'cli_worker_default_time_limit', ($this->getServerMaxExecutionTime() - 10));
}
/**
* @return int
*/
protected function getServerMaxExecutionTime()
{
$time_limit = (int) function_exists( 'ini_get' ) ? ini_get( 'max_execution_time' ) : 30;
if (!$time_limit || -1 == $time_limit) {
$time_limit = 1800;
}
return $time_limit;
}
/**
* @return string
*/
protected function getServerMemoryLimit()
{
$memory_limit = function_exists( 'ini_get' ) ? ini_get( 'memory_limit' ) : '128M';
if (!$memory_limit || -1 == $memory_limit) {
$memory_limit = '32000M';
}
return (int) preg_replace_callback('/(\-?\d+)(.?)/', function ($m) {
return $m[1] * pow(1024, strpos('BKMG', $m[2]));
}, strtoupper($memory_limit));
}
}

View File

@@ -0,0 +1,340 @@
<?php
if ( ! class_exists( 'WP_Http_Worker' ) ) {
class WP_Http_Worker extends WP_Worker {
/**
* Has the worker been dispatched in this request?
*
* @var bool
*/
protected $dispatched = false;
/**
* Timestamp of when this worker started processing the queue.
*
* @var int
*/
protected $start_time;
/**
* WP_Http_Worker constructor
*
* @param WP_Queue $queue
*/
public function __construct( $queue ) {
parent::__construct( $queue );
// Cron health check
add_action( 'http_worker_cron', array( $this, 'handle_cron' ) );
add_filter( 'cron_schedules', array( $this, 'schedule_cron' ) );
$this->maybe_schedule_cron();
// Dispatch handlers
add_action( 'wp_ajax_http_worker', array( $this, 'maybe_handle' ) );
add_action( 'wp_ajax_nopriv_http_worker', array( $this, 'maybe_handle' ) );
// Dispatch listener
add_action( 'wp_queue_job_pushed', array( $this, 'maybe_dispatch_worker' ) );
if (isset($_REQUEST['action']) && $_REQUEST['action'] === 'http_worker' && check_ajax_referer( 'http_worker', 'nonce', false)) {
add_action('init', array($this, 'handle'));
}
}
/**
* Maybe handle
*
* Process the queue if no other HTTP worker is running and
* the current worker is within server memory and time limit constraints.
* Automatically dispatch another worker and kill the current process if
* jobs remain in the queue and server limits reached.
*/
public function maybe_handle() {
check_ajax_referer( 'http_worker', 'nonce' );
$this->handle();
}
/**
*
*/
public function handle()
{
if ( $this->is_worker_running() ) {
// Worker already running, die
wp_die();
}
// Lock worker to prevent multiple instances spawning
$this->lock_worker();
$processed_something = false;
// Loop over jobs while within server limits
while ( ! $this->time_exceeded() && ! $this->memory_exceeded() ) {
if ( $this->should_run() ) {
$this->process_next_job();
$processed_something = true;
} else {
break;
}
}
// Unlock worker to allow another instance to be spawned
$this->unlock_worker();
$available_jobs = $this->queue->available_jobs();
if (!$processed_something && $available_jobs) {
mailchimp_debug('queue_tracer', "HTTPWorker@handle", array(
'jobs' => $available_jobs,
'time_exceeded' => $this->time_exceeded(),
'memory_exceeded' => $this->memory_exceeded(),
'memory_limit' => $this->get_memory_limit(),
'memory_usage' => memory_get_usage(true),
'ini_memory' => ini_get('memory_limit'),
'php_version' => phpversion(),
));
wp_die();
}
if ($available_jobs) {
// Job queue not empty, dispatch async worker request
$this->dispatch();
}
wp_die();
}
/**
* Memory exceeded
*
* Ensures the worker process never exceeds 80%
* of the maximum allowed PHP memory.
*
* @return bool
*/
protected function memory_exceeded() {
$memory_limit = $this->get_memory_limit() * 0.8; // 80% of max memory
$current_memory = memory_get_usage( true );
$return = false;
if ( $current_memory >= $memory_limit ) {
$return = true;
}
return apply_filters( 'http_worker_memory_exceeded', $return );
}
/**
* Get memory limit
*
* @return int
*/
protected function get_memory_limit() {
if ( function_exists( 'ini_get' ) ) {
$memory_limit = ini_get( 'memory_limit' );
} else {
// Sensible default
$memory_limit = '128M';
}
if ( ! $memory_limit || -1 == $memory_limit ) {
// Unlimited, set to 32GB
$memory_limit = '32000M';
}
return (int) preg_replace_callback('/(\-?\d+)(.?)/', function ($m) {
return $m[1] * pow(1024, strpos('BKMG', $m[2]));
}, strtoupper($memory_limit));
}
/**
* Time exceeded
*
* Ensures the worker never exceeds a sensible time limit (20s by default).
* A timeout limit of 30s is common on shared hosting.
*
* @return bool
*/
protected function time_exceeded() {
$finish = $this->start_time + apply_filters( 'http_worker_default_time_limit', 20 ); // 20 seconds
$return = false;
if ( time() >= $finish ) {
$return = true;
}
return apply_filters( 'http_worker_time_exceeded', $return );
}
/**
* Maybe dispatch worker
*
* Dispatch a worker process if we haven't already in this request
* and no other HTTP workers are running.
*
* @param WP_Job $job
*/
public function maybe_dispatch_worker( $job ) {
if ( $this->is_worker_running() ) {
// HTTP worker already running, return
return;
}
// Dispatch async worker request
$this->dispatch();
}
/**
* Is worker running
*
* Check if another instance of the HTTP worker is running.
*
* @return bool
*/
protected function is_worker_running() {
if ( get_site_transient( 'http_worker_lock' ) ) {
// Process already running
return true;
}
return false;
}
/**
* Lock worker
*
* Lock the HTTP worker to prevent multiple instances running.
*/
protected function lock_worker() {
$this->start_time = time(); // Set start time of current worker
$lock_duration = apply_filters( 'http_worker_lock_time', 60 ); // 60 seconds
set_site_transient( 'http_worker_lock', microtime(), $lock_duration );
}
/**
* Unlock worker
*
* Unlock the HTTP worker to allow other instances to be spawned.
*/
protected function unlock_worker() {
delete_site_transient( 'http_worker_lock' );
}
/**
* Dispatch
*
* Fire off a non-blocking async request if we haven't already
* in this request.
*/
protected function dispatch() {
if ( $this->is_http_worker_disabled() ) {
return;
}
if ( ! $this->dispatched ) {
$this->async_request();
}
$this->dispatched = true;
}
/**
* Is HTTP worker disabled
*
* @return bool
*/
protected function is_http_worker_disabled() {
if ( ! defined( 'DISABLE_WP_HTTP_WORKER' ) || true !== DISABLE_WP_HTTP_WORKER ) {
return false;
}
return true;
}
/**
* Async request
*
* Fire off a non-blocking request to admin-ajax.php.
*
* @return array|WP_Error
*/
protected function async_request() {
$action = 'http_worker';
$query_args = apply_filters( 'http_worker_query_args', array(
'action' => $action,
'nonce' => wp_create_nonce( $action ),
) );
$query_url = apply_filters( 'http_worker_query_url', admin_url( 'admin-ajax.php' ) );
$post_args = apply_filters( 'http_worker_post_args', array(
'timeout' => 0.01,
'blocking' => false,
'cookies' => $_COOKIE,
'sslverify' => apply_filters( 'https_local_ssl_verify', false ),
) );
$url = add_query_arg( $query_args, $query_url );
return wp_remote_post( esc_url_raw( $url ), $post_args );
}
/**
* @return bool
*/
public function handle_cron() {
if ($this->is_worker_running()) {
wp_die();
}
if ($this->queue->available_jobs()) {
$this->dispatch();
return true;
}
return false;
}
/**
* Cron schedules
*
* @param $schedules
*
* @return mixed
*/
public function schedule_cron( $schedules ) {
$interval = apply_filters( 'http_worker_cron_interval', 3 );
// Adds every 3 minutes to the existing schedules.
$schedules[ 'http_worker_cron_interval' ] = array(
'interval' => MINUTE_IN_SECONDS * $interval,
'display' => sprintf( __( 'Every %d Minutes' ), $interval ),
);
return $schedules;
}
/**
* Maybe schedule cron
*
* Schedule health check cron if not disabled. Remove schedule if
* disabled and already scheduled.
*/
public function maybe_schedule_cron() {
if ( !$this->is_http_worker_disabled() && ! wp_next_scheduled( 'http_worker_cron' )) {
// Schedule health check
wp_schedule_event( time(), 'http_worker_cron_interval', 'http_worker_cron' );
}
}
}
}

View File

@@ -0,0 +1,96 @@
<?php
if ( ! class_exists( 'WP_Worker' ) ) {
class WP_Worker {
/**
* @var WP_Queue
*/
protected $queue;
/**
* @var WP_Job
*/
protected $payload;
/**
* WP_Worker constructor.
*
* @param WP_Queue $queue
*/
public function __construct( $queue ) {
$this->queue = $queue;
}
/**
* Should run
*
* @return bool
*/
public function should_run() {
if ( $this->queue->available_jobs() ) {
return true;
}
return false;
}
/**
* Process next job.
*
* @return bool
*/
public function process_next_job() {
$job = $this->queue->get_next_job();
if (empty($job)) {
return false;
}
$this->payload = unserialize( $job->job );
$this->queue->lock_job( $job );
$this->payload->set_job( $job );
try {
$this->payload->handle();
if ( $this->payload->is_deleted() ) {
// Job manually deleted, delete from queue
$this->queue->delete( $job );
return true;
}
if ( $this->payload->is_released() ) {
// Job manually released, release back onto queue
$this->queue->release( $job, $this->payload->get_delay() );
}
if ( ! $this->payload->is_deleted_or_released() ) {
// Job completed, delete from queue
$this->queue->delete( $job );
}
} catch ( Exception $e ) {
mailchimp_log('queue.error', "{$e->getMessage()} on {$e->getLine()} in {$e->getFile()}", array('job' => get_class($this->payload)));
$this->queue->release( $job );
return false;
}
if (defined('WP_CLI') && WP_CLI && property_exists($this->payload, 'should_kill_queue_listener') && $this->payload->should_kill_queue_listener === true) {
wp_die('killing queue listener');
}
return true;
}
/**
* Get job name.
*
* @return object
*/
public function get_job_name() {
return get_class( $this->payload );
}
}
}

View File

@@ -0,0 +1,105 @@
<?php
if ( ! class_exists( 'WP_Job' ) ) {
abstract class WP_Job {
public $should_kill_queue_listener = false;
/**
* @var stdClass
*/
private $job;
/**
* @var int
*/
private $delay = 0;
/**
* @var bool
*/
private $deleted = false;
/**
* @var bool
*/
private $released = false;
/**
* Set job
*
* @param $job
*/
public function set_job( $job ) {
$this->job = $job;
}
/**
* Delete the job from the queue
*/
protected function delete() {
$this->deleted = true;
}
/**
* Release a job back onto the queue
*
* @param int $delay
*/
protected function release( $delay = 0 ) {
$this->released = true;
$this->delay = $delay;
}
/**
* Attempts
*
* @return int
*/
protected function attempts() {
return (int) $this->job->attempts;
}
/**
* Is deleted.
*
* @return bool
*/
public function is_deleted() {
return $this->deleted;
}
/**
* Is released.
*
* @return bool
*/
public function is_released() {
return $this->released;
}
/**
* Is deleted for released
*
* @return bool
*/
public function is_deleted_or_released() {
return $this->is_deleted() || $this->is_released();
}
/**
* Get delay.
*
* @return int
*/
public function get_delay() {
return $this->delay;
}
/**
* Handle the job.
*/
abstract public function handle();
}
}

View File

@@ -0,0 +1,226 @@
<?php
if ( ! class_exists( 'WP_Queue' ) ) {
class WP_Queue {
/**
* @var string
*/
public $table;
/**
* @var string
*/
public $failed_table;
/**
* @var int
*/
public $release_time = 60;
/**
* WP_Queue constructor
*/
public function __construct() {
global $wpdb;
$this->table = $wpdb->prefix . 'queue';
$this->failed_table = $wpdb->prefix . 'failed_jobs';
}
/**
* Push a job onto the queue.
*
* @param WP_Job $job
* @param int $delay
*
* @return $this
*/
public function push( WP_Job $job, $delay = 0 ) {
global $wpdb;
$data = array(
'job' => maybe_serialize( $job ),
'available_at' => $this->datetime( $delay ),
'created_at' => $this->datetime(),
);
$id = $wpdb->insert( $this->table, $data );
return $this;
}
/**
* Release.
*
* @param object $job
* @param int $delay
*/
public function release( $job, $delay = 0 ) {
if ( $job->attempts >= 3 ) {
$this->failed( $job );
return;
}
global $wpdb;
$data = array(
'attempts' => $job->attempts + 1,
'locked' => 0,
'locked_at' => null,
'available_at' => $this->datetime( $delay ),
);
$where = array(
'id' => $job->id,
);
$wpdb->update( $this->table, $data, $where );
}
/**
* Failed
*
* @param stdClass $job
*/
protected function failed( $job ) {
global $wpdb;
$wpdb->insert( $this->failed_table, array(
'job' => $job->job,
'failed_at' => $this->datetime(),
) );
$payload = unserialize($job->job);
if (method_exists($payload, 'failed')) {
$payload->failed();
}
$this->delete( $job );
}
/**
* Delete.
*
* @param object $job
*/
public function delete( $job ) {
global $wpdb;
$wpdb->delete($this->table, array('id' => $job->id));
}
/**
* Get MySQL datetime.
*
* @param int $offset Seconds, can pass negative int.
*
* @return string
*/
protected function datetime($offset = 0) {
$timestamp = time() + $offset;
return gmdate( 'Y-m-d H:i:s', $timestamp );
}
/**
* Available jobs.
*/
public function available_jobs() {
global $wpdb;
$now = $this->datetime();
$sql = $wpdb->prepare( "
SELECT COUNT(*) FROM {$this->table}
WHERE available_at <= %s"
, $now );
return $wpdb->get_var( $sql );
}
/**
* Available jobs.
*/
public function failed_jobs() {
global $wpdb;
return $wpdb->get_var( "SELECT COUNT(*) FROM {$this->failed_table}" );
}
/**
* Restart failed jobs.
*/
public function restart_failed_jobs() {
global $wpdb;
$count = 0;
$jobs = $wpdb->get_results( "SELECT * FROM {$this->failed_table}" );
foreach ( $jobs as $job ) {
$this->push( maybe_unserialize( $job->job ) );
$wpdb->delete( $this->failed_table, array(
'id' => $job->id,
) );
$count++;
}
return $count;
}
/**
* Get next job.
*/
public function get_next_job() {
global $wpdb;
$this->maybe_release_locked_jobs();
$now = $this->datetime();
$sql = $wpdb->prepare( "
SELECT * FROM {$this->table}
WHERE locked = 0
AND available_at <= %s"
, $now );
return $wpdb->get_row( $sql );
}
/**
* Maybe release locked jobs.
*/
protected function maybe_release_locked_jobs() {
global $wpdb;
$expired = $this->datetime( - $this->release_time );
$sql = $wpdb->prepare( "
UPDATE {$this->table}
SET attempts = attempts + 1, locked = 0, locked_at = NULL
WHERE locked = 1
AND locked_at <= %s"
, $expired );
$wpdb->query( $sql );
}
/**
* Lock job.
*
* @param object $job
*/
public function lock_job( $job ) {
global $wpdb;
$data = array(
'locked' => 1,
'locked_at' => $this->datetime(),
);
$where = array(
'id' => $job->id,
);
$wpdb->update( $this->table, $data, $where );
}
}
}