Skip to content

Commit

Permalink
Merge branch 'release/0.6.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
tidal committed Oct 3, 2016
2 parents 4cc39f6 + 9b4d7ab commit d5b9c54
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 65 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@

### [0.6.4] - 2016-10-03

* refactoring

### [0.6.3] - 2016-10-02

* cs fixes
Expand Down
5 changes: 2 additions & 3 deletions src/Tidal/WampWatch/Adapter/React/DeferredAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
namespace Tidal\WampWatch\Adapter\React;

use Tidal\WampWatch\Async\DeferredInterface;
use Tidal\WampWatch\Async\PromiseInterface;
use React\Promise\Deferred;

class DeferredAdapter implements DeferredInterface
Expand Down Expand Up @@ -70,7 +69,7 @@ public function getPromiseClass()
}

/**
* @return PromiseInterface
* @return PromiseAdapter
*/
public function promise()
{
Expand Down Expand Up @@ -102,7 +101,7 @@ public function notify($update = null)
}

/**
* @return PromiseInterface
* @return PromiseAdapter
*/
private function createPromise()
{
Expand Down
40 changes: 35 additions & 5 deletions src/Tidal/WampWatch/Adapter/React/PromiseAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,19 @@ public function getAdaptee()
*/
public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null)
{
$this->adaptee->then($onFulfilled, $onRejected, $onProgress);
$this->adaptee->then(function () use ($onFulfilled) {
if ($onFulfilled !== null) {
return call_user_func_array($onFulfilled, func_get_args());
}
}, function () use ($onRejected) {
if ($onRejected !== null) {
return call_user_func_array($onRejected, func_get_args());
}
}, function () use ($onProgress) {
if ($onProgress !== null) {
return call_user_func_array($onProgress, func_get_args());
}
});

return $this;
}
Expand All @@ -70,7 +82,19 @@ public function then(callable $onFulfilled = null, callable $onRejected = null,
*/
public function done(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null)
{
$this->adaptee->done($onFulfilled, $onRejected, $onProgress);
$this->adaptee->done(function () use ($onFulfilled) {
if ($onFulfilled !== null) {
return call_user_func_array($onFulfilled, func_get_args());
}
}, function () use ($onRejected) {
if ($onRejected !== null) {
return call_user_func_array($onRejected, func_get_args());
}
}, function () use ($onProgress) {
if ($onProgress !== null) {
return call_user_func_array($onProgress, func_get_args());
}
});

return $this;
}
Expand All @@ -82,7 +106,9 @@ public function done(callable $onFulfilled = null, callable $onRejected = null,
*/
public function otherwise(callable $onRejected)
{
$this->adaptee->otherwise($onRejected);
$this->adaptee->otherwise(function () use ($onRejected) {
return call_user_func_array($onRejected, func_get_args());
});

return $this;
}
Expand All @@ -94,7 +120,9 @@ public function otherwise(callable $onRejected)
*/
public function always(callable $onAlways)
{
$this->adaptee->always($onAlways);
$this->adaptee->always(function () use ($onAlways) {
return call_user_func_array($onAlways, func_get_args());
});

return $this;
}
Expand All @@ -106,7 +134,9 @@ public function always(callable $onAlways)
*/
public function progress(callable $onProgress)
{
$this->adaptee->progress($onProgress);
$this->adaptee->progress(function () use ($onProgress) {
return call_user_func_array($onProgress, func_get_args());
});

return $this;
}
Expand Down
45 changes: 35 additions & 10 deletions src/Tidal/WampWatch/Adapter/Thruway/ClientSession.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

use Thruway\ClientSession as ThruwaySession;
use Tidal\WampWatch\ClientSessionInterface;
use React\Promise\Promise as ReactPromise;
use Tidal\WampWatch\Adapter\React\PromiseAdapter;

class ClientSession implements ClientSessionInterface
{
Expand All @@ -33,11 +35,13 @@ public function __construct(ThruwaySession $thruwaySession)
* @param callable $callback
* @param $options array
*
* @return \React\Promise\Promise
* @return PromiseAdapter
*/
public function subscribe($topicName, callable $callback, $options = null)
{
return $this->thruwaySession->subscribe($topicName, $callback, $options);
return $this->createPromiseAdapter(
$this->thruwaySession->subscribe($topicName, $callback, $options)
);
}

/**
Expand All @@ -48,11 +52,13 @@ public function subscribe($topicName, callable $callback, $options = null)
* @param array|mixed $argumentsKw
* @param array|mixed $options
*
* @return \React\Promise\Promise
* @return PromiseAdapter
*/
public function publish($topicName, $arguments = null, $argumentsKw = null, $options = null)
{
return $this->thruwaySession->publish($topicName, $arguments, $argumentsKw, $options);
return $this->createPromiseAdapter(
$this->thruwaySession->publish($topicName, $arguments, $argumentsKw, $options)
);
}

/**
Expand All @@ -62,23 +68,27 @@ public function publish($topicName, $arguments = null, $argumentsKw = null, $opt
* @param callable $callback
* @param array|mixed $options
*
* @return \React\Promise\Promise
* @return PromiseAdapter
*/
public function register($procedureName, callable $callback, $options = null)
{
return $this->thruwaySession->register($procedureName, $callback, $options);
return $this->createPromiseAdapter(
$this->thruwaySession->register($procedureName, $callback, $options)
);
}

/**
* Unregister.
*
* @param string $procedureName
*
* @return \React\Promise\Promise
* @return PromiseAdapter
*/
public function unregister($procedureName)
{
return $this->thruwaySession->unregister($procedureName);
return $this->createPromiseAdapter(
$this->thruwaySession->unregister($procedureName)
);
}

/**
Expand All @@ -89,11 +99,13 @@ public function unregister($procedureName)
* @param array|mixed $argumentsKw
* @param array|mixed $options
*
* @return \React\Promise\Promise
* @return PromiseAdapter
*/
public function call($procedureName, $arguments = null, $argumentsKw = null, $options = null)
{
return $this->thruwaySession->call($procedureName, $arguments, $argumentsKw, $options);
return $this->createPromiseAdapter(
$this->thruwaySession->call($procedureName, $arguments, $argumentsKw, $options)
);
}

/**
Expand All @@ -112,8 +124,21 @@ public function getSessionId()
return $this->thruwaySession->getSessionId();
}

/**
* @param $msg
*/
public function sendMessage($msg)
{
$this->thruwaySession->sendMessage($msg);
}

/**
* @param ReactPromise $promise
*
* @return PromiseAdapter
*/
private function createPromiseAdapter(ReactPromise $promise)
{
return new PromiseAdapter($promise);
}
}
2 changes: 1 addition & 1 deletion src/Tidal/WampWatch/Model/Contract/RouterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
interface RouterInterface
{
/**
* @return string;
* @return string
*/
public function getUri();

Expand Down
38 changes: 38 additions & 0 deletions src/Tidal/WampWatch/MonitorTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
use React\Promise\Promise;
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
use Tidal\WampWatch\Subscription\Collection as SubscriptionCollection;
use React\Promise\Deferred;
use Tidal\WampWatch\Adapter\React\PromiseAdapter;
use Tidal\WampWatch\Adapter\React\DeferredAdapter;

/**
* Description of MonitorTrait.
Expand Down Expand Up @@ -188,4 +191,39 @@ private function getErrorCallback()
return $error;
};
}

private function retrieveCallData($procedure, callable $filter = null, $arguments = [])
{
$deferred = new DeferredAdapter(
new Deferred()
);

$filter = $filter ?: function ($res) {
return $res;
};

$this->session->call($procedure, $arguments)
->then(
function ($res) use ($deferred, $filter) {
$deferred->resolve($filter($res));
},
$this->getErrorCallback()
);

return $deferred->promise();
}

/**
* @param callable $callback
*
* @return PromiseAdapter
*/
private function createPromiseAdapter(callable $callback)
{
return new PromiseAdapter(
new Promise(
$callback
)
);
}
}
49 changes: 30 additions & 19 deletions src/Tidal/WampWatch/SessionMonitor.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
namespace Tidal\WampWatch;

use Evenement\EventEmitterInterface;
use React\Promise\Promise;
use Tidal\WampWatch\ClientSessionInterface as ClientSession;
use Tidal\WampWatch\Adapter\React\PromiseAdapter;

/**
* Description of SessionMonitor.
Expand Down Expand Up @@ -62,7 +62,7 @@ public function __construct(ClientSession $session)
*
* @param $sessionId
*
* @return \React\Promise\Promise;
* @return PromiseAdapter
*/
public function getSessionInfo($sessionId)
{
Expand All @@ -83,17 +83,19 @@ function ($error) {
* registered on the wamp-router in the monitor's realm
* and populates the data via given callback,.
*
* @return Promise
* @return PromiseAdapter
*/
public function getSessionIds()
{
if (!count($this->sessionIds)) {
return $this->retrieveSessionIds();
}

return new Promise(function (callable $resolve) {
$resolve($this->sessionIds);
});
return $this->createPromiseAdapter(
function (callable $resolve) {
$resolve($this->getList());
}
);
}

/**
Expand Down Expand Up @@ -189,34 +191,42 @@ protected function initSetupCalls()
/**
* Retrieves the list of current sessionIds on the router.
*
* @return \React\Promise\Promise;
* @return PromiseAdapter
*/
protected function retrieveSessionIds()
{
return $this->session->call(self::SESSION_LIST_TOPIC, [])
->then(
$this->getSessionIdRetrievalCallback(),
$this->getErrorCallback()
);
return $this->retrieveCallData(
self::SESSION_LIST_TOPIC,
$this->getSessionIdRetrievalCallback(),
[]
);
}

/**
* @return \Closure
*/
protected function getSessionIdRetrievalCallback()
{
return function ($res) {
// remove our own sessionID from the tracked sessions
$sessionIds = $this->removeOwnSessionId($res[0]);
$this->setList($sessionIds);
$this->emit('list', [$this->getList()]);
$this->setList($res[0]);
$sessionIds = $this->getList();
$this->emit('list', [$sessionIds]);

return $this->getList();
return $sessionIds;
};
}

/**
* @param $list
*/
protected function setList($list)
{
$this->sessionIds = $list;
$this->sessionIds = $this->removeOwnSessionId($list);
}

/**
* @return array
*/
protected function getList()
{
return $this->sessionIds;
Expand All @@ -232,7 +242,8 @@ protected function getList()
protected function removeOwnSessionId(array $sessionsIds)
{
$key = array_search($this->session->getSessionId(), $sessionsIds);
if ($key >= 0) {

if ($key !== false && $key >= 0) {
unset($sessionsIds[$key]);
$sessionsIds = array_values($sessionsIds);
}
Expand Down
Loading

0 comments on commit d5b9c54

Please sign in to comment.