Skip to content

Commit 4bfe46f

Browse files
committed
Updated to RxPHP v2
1 parent 851febe commit 4bfe46f

File tree

4 files changed

+53
-45
lines changed

4 files changed

+53
-45
lines changed

composer.json

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,16 @@
2222
],
2323
"autoload": {
2424
"psr-4": {
25-
"Rx\\Extra\\": "src/"
25+
"Rx\\": "src/"
2626
}
2727
},
2828
"require": {
29-
"reactivex/rxphp": "^1.0.0"
29+
"php":"^7.0",
30+
"reactivex/rxphp": "2.x-dev",
31+
"async-interop/event-loop": "^0.4"
32+
},
33+
"dev-require": {
34+
"async-interop/event-loop-implementation": "^0.4",
35+
"wyrihaximus/react-async-interop-loop": "^0.2.1"
3036
}
3137
}

src/Observable/FromEventEmitterObservable.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Rx\Extra\Observable;
3+
namespace Rx\Observable;
44

55
use Rx\Disposable\EmptyDisposable;
66
use Rx\DisposableInterface;
@@ -41,10 +41,9 @@ public function __construct($object, $nextAction = 'data', $errorAction = 'error
4141

4242
/**
4343
* @param ObserverInterface $observer
44-
* @param \Rx\SchedulerInterface|null $scheduler
4544
* @return DisposableInterface
4645
*/
47-
public function subscribe(ObserverInterface $observer, \Rx\SchedulerInterface $scheduler = null)
46+
public function _subscribe(ObserverInterface $observer): DisposableInterface
4847
{
4948
$this->object->on($this->nextAction, function () use ($observer) {
5049
$observer->onNext(func_get_args());

src/Operator/CutOperator.php

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
<?php
22

3-
namespace Rx\Extra\Operator;
3+
namespace Rx\Operator;
44

5+
use Rx\DisposableInterface;
56
use Rx\Observable;
67
use Rx\ObservableInterface;
78
use Rx\ObserverInterface;
8-
use Rx\Operator\OperatorInterface;
9+
use Rx\Scheduler;
910
use Rx\SchedulerInterface;
1011

1112
/**
@@ -17,39 +18,40 @@
1718
class CutOperator implements OperatorInterface
1819
{
1920

20-
private $delimiter;
21+
private $delimiter, $scheduler;
2122

22-
public function __construct($delimiter = PHP_EOL)
23+
public function __construct($delimiter = PHP_EOL, SchedulerInterface $scheduler = null)
2324
{
2425
$this->delimiter = $delimiter;
26+
$this->scheduler = $scheduler ?: Scheduler::getDefault();
2527
}
2628

2729
/**
2830
* @param \Rx\ObservableInterface $observable
2931
* @param \Rx\ObserverInterface $observer
30-
* @param \Rx\SchedulerInterface $scheduler
3132
* @return \Rx\DisposableInterface
33+
* @throws \InvalidArgumentException
3234
*/
33-
public function __invoke(ObservableInterface $observable, ObserverInterface $observer, SchedulerInterface $scheduler = null)
35+
public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
3436
{
35-
36-
$buffer = "";
37+
$buffer = '';
3738

39+
/** @var $observable Observable */
3840
return $observable
39-
->defaultIfEmpty(Observable::just(null))
40-
->concat(Observable::just($this->delimiter))
41+
->defaultIfEmpty(Observable::of(null, $this->scheduler))
42+
->concat(Observable::of($this->delimiter, $this->scheduler))
4143
->concatMap(function ($x) use (&$buffer) {
4244

4345
if ($x === null || $buffer === null) {
4446
$buffer = null;
45-
return Observable::emptyObservable();
47+
return Observable::empty($this->scheduler);
4648
}
4749

4850
$items = explode($this->delimiter, $buffer . $x);
4951
$buffer = array_pop($items);
5052

51-
return Observable::fromArray($items);
53+
return Observable::fromArray($items, $this->scheduler);
5254
})
53-
->subscribe($observer, $scheduler);
55+
->subscribe($observer);
5456
}
5557
}

tests/Functional/Operator/CutTest.php

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,22 @@
44

55
use Rx\Functional\FunctionalTestCase;
66
use Rx\Observable;
7-
use Rx\Extra\Operator\CutOperator;
7+
use Rx\Operator\CutOperator;
8+
89

910
class CutTest extends FunctionalTestCase
1011
{
1112

1213
/**
1314
* @test
1415
*/
15-
function cut_never()
16+
public function cut_never()
1617
{
1718
$xs = Observable::never();
1819

1920
$results = $this->scheduler->startWithCreate(function () use ($xs) {
2021
return $xs->lift(function () {
21-
return new CutOperator();
22+
return new CutOperator(PHP_EOL, $this->scheduler);
2223
});
2324
});
2425
$this->assertMessages([], $results->getMessages());
@@ -27,7 +28,7 @@ function cut_never()
2728
/**
2829
* @test
2930
*/
30-
function cut_empty()
31+
public function cut_empty()
3132
{
3233
$xs = $this->createHotObservable([
3334
onNext(150, 1),
@@ -36,7 +37,7 @@ function cut_empty()
3637

3738
$results = $this->scheduler->startWithCreate(function () use ($xs) {
3839
return $xs->lift(function () {
39-
return new CutOperator();
40+
return new CutOperator(PHP_EOL, $this->scheduler);
4041
});
4142
});
4243
$this->assertMessages([onCompleted(233)], $results->getMessages());
@@ -45,7 +46,7 @@ function cut_empty()
4546
/**
4647
* @test
4748
*/
48-
function cut_default_delimiter()
49+
public function cut_default_delimiter()
4950
{
5051
$xs = $this->createHotObservable([
5152
onNext(150, 1),
@@ -55,7 +56,7 @@ function cut_default_delimiter()
5556

5657
$results = $this->scheduler->startWithCreate(function () use ($xs) {
5758
return $xs->lift(function () {
58-
return new CutOperator();
59+
return new CutOperator(PHP_EOL, $this->scheduler);
5960
});
6061
});
6162
$this->assertMessages([
@@ -68,7 +69,7 @@ function cut_default_delimiter()
6869
/**
6970
* @test
7071
*/
71-
function cut_comma_delimiter()
72+
public function cut_comma_delimiter()
7273
{
7374
$xs = $this->createHotObservable([
7475
onNext(150, 1),
@@ -78,7 +79,7 @@ function cut_comma_delimiter()
7879

7980
$results = $this->scheduler->startWithCreate(function () use ($xs) {
8081
return $xs->lift(function () {
81-
return new CutOperator(',');
82+
return new CutOperator(',', $this->scheduler);
8283
});
8384
});
8485
$this->assertMessages([
@@ -96,7 +97,7 @@ function cut_comma_delimiter()
9697
/**
9798
* @test
9899
*/
99-
function cut_comma_delimiter_empty_last()
100+
public function cut_comma_delimiter_empty_last()
100101
{
101102
$xs = $this->createHotObservable([
102103
onNext(150, 1),
@@ -106,7 +107,7 @@ function cut_comma_delimiter_empty_last()
106107

107108
$results = $this->scheduler->startWithCreate(function () use ($xs) {
108109
return $xs->lift(function () {
109-
return new CutOperator(',');
110+
return new CutOperator(',', $this->scheduler);
110111
});
111112
});
112113
$this->assertMessages([
@@ -124,7 +125,7 @@ function cut_comma_delimiter_empty_last()
124125
/**
125126
* @test
126127
*/
127-
function cut_comma_delimiter_empty_first()
128+
public function cut_comma_delimiter_empty_first()
128129
{
129130
$xs = $this->createHotObservable([
130131
onNext(150, 1),
@@ -134,7 +135,7 @@ function cut_comma_delimiter_empty_first()
134135

135136
$results = $this->scheduler->startWithCreate(function () use ($xs) {
136137
return $xs->lift(function () {
137-
return new CutOperator(',');
138+
return new CutOperator(',', $this->scheduler);
138139
});
139140
});
140141
$this->assertMessages([
@@ -152,7 +153,7 @@ function cut_comma_delimiter_empty_first()
152153
/**
153154
* @test
154155
*/
155-
function cut_comma_delimiter_skip_time()
156+
public function cut_comma_delimiter_skip_time()
156157
{
157158
$xs = $this->createHotObservable([
158159
onNext(150, 1),
@@ -163,7 +164,7 @@ function cut_comma_delimiter_skip_time()
163164

164165
$results = $this->scheduler->startWithCreate(function () use ($xs) {
165166
return $xs->lift(function () {
166-
return new CutOperator(',');
167+
return new CutOperator(',', $this->scheduler);
167168
});
168169
});
169170
$this->assertMessages([
@@ -181,7 +182,7 @@ function cut_comma_delimiter_skip_time()
181182
/**
182183
* @test
183184
*/
184-
function cut_comma_delimiter_buffer_all()
185+
public function cut_comma_delimiter_buffer_all()
185186
{
186187
$xs = $this->createHotObservable([
187188
onNext(150, 1),
@@ -194,7 +195,7 @@ function cut_comma_delimiter_buffer_all()
194195

195196
$results = $this->scheduler->startWithCreate(function () use ($xs) {
196197
return $xs->lift(function () {
197-
return new CutOperator(',');
198+
return new CutOperator(',', $this->scheduler);
198199
});
199200
});
200201
$this->assertMessages([
@@ -209,7 +210,7 @@ function cut_comma_delimiter_buffer_all()
209210
/**
210211
* @test
211212
*/
212-
function cut_empty_string()
213+
public function cut_empty_string()
213214
{
214215
$xs = $this->createHotObservable([
215216
onNext(150, 1),
@@ -219,7 +220,7 @@ function cut_empty_string()
219220

220221
$results = $this->scheduler->startWithCreate(function () use ($xs) {
221222
return $xs->lift(function () {
222-
return new CutOperator();
223+
return new CutOperator(PHP_EOL, $this->scheduler);
223224
});
224225
});
225226
$this->assertMessages([
@@ -231,7 +232,7 @@ function cut_empty_string()
231232
/**
232233
* @test
233234
*/
234-
function cut_just_delimiter()
235+
public function cut_just_delimiter()
235236
{
236237
$xs = $this->createHotObservable([
237238
onNext(150, 1),
@@ -241,7 +242,7 @@ function cut_just_delimiter()
241242

242243
$results = $this->scheduler->startWithCreate(function () use ($xs) {
243244
return $xs->lift(function () {
244-
return new CutOperator();
245+
return new CutOperator(PHP_EOL, $this->scheduler);
245246
});
246247
});
247248
$this->assertMessages([
@@ -254,7 +255,7 @@ function cut_just_delimiter()
254255
/**
255256
* @test
256257
*/
257-
function cut_split_delimiter()
258+
public function cut_split_delimiter()
258259
{
259260
$xs = $this->createHotObservable([
260261
onNext(150, 1),
@@ -266,7 +267,7 @@ function cut_split_delimiter()
266267

267268
$results = $this->scheduler->startWithCreate(function () use ($xs) {
268269
return $xs->lift(function () {
269-
return new CutOperator("--");
270+
return new CutOperator("--", $this->scheduler);
270271
});
271272
});
272273
$this->assertMessages([
@@ -280,7 +281,7 @@ function cut_split_delimiter()
280281
/**
281282
* @test
282283
*/
283-
function cut_error()
284+
public function cut_error()
284285
{
285286
$error = new \Exception();
286287

@@ -292,7 +293,7 @@ function cut_error()
292293

293294
$results = $this->scheduler->startWithCreate(function () use ($xs) {
294295
return $xs->lift(function () {
295-
return new CutOperator();
296+
return new CutOperator(PHP_EOL, $this->scheduler);
296297
});
297298
});
298299
$this->assertMessages([
@@ -303,7 +304,7 @@ function cut_error()
303304
/**
304305
* @test
305306
*/
306-
function cut_dispose()
307+
public function cut_dispose()
307308
{
308309
$xs = $this->createHotObservable([
309310
onNext(150, 1),
@@ -313,7 +314,7 @@ function cut_dispose()
313314

314315
$results = $this->scheduler->startWithDispose(function () use ($xs) {
315316
return $xs->lift(function () {
316-
return new CutOperator();
317+
return new CutOperator(PHP_EOL, $this->scheduler);
317318
});
318319
}, 204);
319320

0 commit comments

Comments
 (0)