Skip to content

Commit

Permalink
Merge pull request #95 from pentium10/master
Browse files Browse the repository at this point in the history
Created the option to search for jobs by their data field, fixes #75
  • Loading branch information
pentium10 committed May 18, 2015
2 parents 3c92206 + 5ea3f99 commit d289b7c
Show file tree
Hide file tree
Showing 14 changed files with 1,091 additions and 823 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- You can move jobs between tubes
- Ability to Pause tubes
- Saved jobs (store sample jobs as a template, kick/edit them, very useful for development)
- Search jobs data field
- Customizable UI (code highlighter, choose columns, edit auto refresh seconds, pause tube seconds)

Change log on [Releases](https://github.com/ptrofimov/beanstalk_console/releases).
Expand Down
12 changes: 12 additions & 0 deletions lib/Pheanstalk.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ public function delete($job)
$this->_dispatch(new Pheanstalk_Command_DeleteCommand($job));
return $this;
}

/**
* Kicks job.
*
* @param object $job Pheanstalk_Job
* @chainable
*/
public function kickJob($job)
{
$this->_dispatch(new Pheanstalk_Command_KickJobCommand($job));
return $this;
}

/**
* Remove the specified tube from the watchlist
Expand Down
50 changes: 50 additions & 0 deletions lib/Pheanstalk/Command/KickJobCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

/**
* The 'kick-job' command.
* Kicks a specific buried or delayed job into a 'ready' state.
*
* A variant of kick that operates with a single job. If the given job
* exists and is in a buried or delayed state, it will be moved to the
* ready queue of the the same tube where it currently belongs.
*
* @author Matthieu Napoli
* @package Pheanstalk
* @licence http://www.opensource.org/licenses/mit-license.php
*/
class Pheanstalk_Command_KickJobCommand extends Pheanstalk_Command_AbstractCommand implements Pheanstalk_ResponseParser {

private $_job;

/**
* @param object $job Pheanstalk_Job
*/
public function __construct($job) {
$this->_job = $job;
}

/* (non-phpdoc)
* @see Pheanstalk_Command::getCommandLine()
*/

public function getCommandLine() {
return 'kick-job ' . $this->_job->getId();
}

/* (non-phpdoc)
* @see Pheanstalk_ResponseParser::parseRespose()
*/
public function parseResponse($responseLine, $responseData) {
if ($responseLine == Pheanstalk_Response::RESPONSE_NOT_FOUND) {
throw new Pheanstalk_Exception_ServerException(sprintf(
'%s: Job %d does not exist or is not in a kickable state.', $responseLine, $this->_job->getId()
));
} elseif ($responseLine == Pheanstalk_Response::RESPONSE_KICKED) {
return $this->_createResponse(Pheanstalk_Response::RESPONSE_KICKED);
} else {
throw new Exception('Unhandled response: ' . $responseLine);
}
return $this->_createResponse($responseLine);
}

}
112 changes: 100 additions & 12 deletions lib/include.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Console {
private $serversConfig = array();
private $serversEnv = array();
private $serversCookie = array();
private $searchResults = array();
private $actionTimeStart = 0;

public function __construct() {
$this->__init();
Expand Down Expand Up @@ -195,6 +197,10 @@ public function getTubeStatValues($tube) {
}
}

public function getSearchResult() {
return $this->searchResults;
}

protected function __init() {
global $server, $action, $state, $count, $tube, $config, $tplMain, $tplBlock;

Expand Down Expand Up @@ -362,6 +368,16 @@ protected function _actionKick() {
exit();
}

protected function _actionKickJob() {
$job = $this->interface->_client->peek(intval($_GET['jobid']));
if ($job) {
$this->interface->_client->kickJob($job);
}
header(
sprintf('Location: index.php?server=%s&tube=%s', $this->_globalVar['server'], $this->_globalVar['tube']));
exit();
}

protected function _actionDelete() {
switch ($this->_globalVar['state']) {
case 'ready':
Expand All @@ -378,6 +394,14 @@ protected function _actionDelete() {
$this->_postDelete();
}

protected function _actionDeleteJob() {
$job = $this->interface->_client->peek(intval($_GET['jobid']));
if ($job) {
$this->interface->_client->delete($job);
}
$this->_postDelete();
}

protected function _postDelete() {
$arr = $this->getTubeStatValues($this->_globalVar['tube']);
$availableJobs = $arr['current-jobs-urgent'] + $arr['current-jobs-ready'] + $arr['current-jobs-reserved'] + $arr['current-jobs-delayed'] + $arr['current-jobs-buried'];
Expand Down Expand Up @@ -468,19 +492,9 @@ protected function _actionAddSample() {
$success = false;
$error = '';
$response = array('result' => &$success, 'error' => &$error);
if (isset($_POST['addsamplestate']) && isset($_POST['addsamplename']) && isset($_POST['tube']) && isset($_POST['tubes'])) {
if (isset($_POST['addsamplejobid']) && isset($_POST['addsamplename']) && isset($_POST['tube']) && isset($_POST['tubes'])) {
try {
switch ($_POST['addsamplestate']) {
case 'ready':
$job = $this->interface->_client->useTube($_POST['tube'])->peekReady();
break;
case 'delayed':
$job = $this->interface->_client->useTube($_POST['tube'])->peekDelayed();
break;
case 'buried':
$job = $this->interface->_client->useTube($_POST['tube'])->peekBuried();
break;
}
$job = $this->interface->_client->peek(intval($_POST['addsamplejobid']));
if ($job) {
$res = $this->_storeSampleJob($_POST, $job->getData());
if ($res === true) {
Expand Down Expand Up @@ -652,6 +666,80 @@ protected function _actionMoveJobsTo() {
}
}

protected function _actionSearch() {
global $server, $tube, $state;
$this->actionTimeStart = microtime(true);
$timelimit_in_seconds = 10;
$searchStr = (isset($_GET['searchStr'])) ? $_GET['searchStr'] : null;
$states = array('ready', 'delayed', 'buried');
$jobList = array();
$limit = null;

if ($searchStr === null or $searchStr === '')
return false;

if (isset($_GET['limit'])) {
$limit = intval($_GET['limit']);
}

foreach ($states as $state) {
$jobList[$state] = $this->findJobsByState($tube, $state, $searchStr, $limit);
$jobList['total']+=count($jobList[$state]);
}

$this->searchResults = $jobList;
}

private function findJobsByState($tube, $state, $searchStr, $limit = 25) {
$jobList = array();
$job = null;
$total = $this->interface->getTubeStats($tube);
$totalJobs = 0;

try {
switch ($state) {
case 'ready':
$job = $this->interface->_client->useTube($tube)->peekReady();
$totalJobs = $total[2]['value'];
break;
case 'delayed':
$job = $this->interface->_client->useTube($tube)->peekDelayed();
$totalJobs = $total[4]['value'];
break;
case 'buried':
$job = $this->interface->_client->useTube($tube)->peekBuried();
$totalJobs = $total[5]['value'];
break;
}
} catch (Exception $e) {

}

if ($job === null)
return $jobList;

$jobList = array();
$lastId = $job->getId() + $totalJobs;

$added = 0;
for ($id = $job->getId(); $id < $lastId; $id++) {
try {
$job = $this->interface->_client->peek($id);
if (strpos($job->getData(), $searchStr) !== false) {
$jobList[$id] = $job;
$added++;
}
} catch (Pheanstalk_Exception_ServerException $e) {

}
if ($added >= $limit || (microtime(true) - $this->actionTimeStart) > $limit) {
break;
}
}

return $jobList;
}

private function _storeSampleJob($post, $jobData) {
$storage = new Storage($this->_globalVar['config']['storage']);
$job_array = array();
Expand Down
Loading

0 comments on commit d289b7c

Please sign in to comment.