Skip to content

Commit 056d093

Browse files
committed
修改一下线程中断的逻辑
1 parent 419b180 commit 056d093

File tree

4 files changed

+146
-45
lines changed

4 files changed

+146
-45
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ allprojects {
2525
}
2626
2727
dependencies {
28-
implementation 'com.github.licheedev:Modbus4Android:2.0.0'
28+
implementation 'com.github.licheedev:Modbus4Android:2.0.1'
2929
}
3030
3131
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.licheedev.demo.utils;
2+
3+
import android.support.annotation.NonNull;
4+
import io.reactivex.Observable;
5+
import io.reactivex.ObservableSource;
6+
import io.reactivex.ObservableTransformer;
7+
import io.reactivex.Scheduler;
8+
import io.reactivex.Single;
9+
import io.reactivex.SingleSource;
10+
import io.reactivex.SingleTransformer;
11+
import io.reactivex.android.schedulers.AndroidSchedulers;
12+
import io.reactivex.functions.Function;
13+
import io.reactivex.schedulers.Schedulers;
14+
import java.util.concurrent.TimeUnit;
15+
16+
public class RxUtilEx {
17+
18+
public static <T> ObservableTransformer<T, T> rxIoMain() {
19+
return new ObservableTransformer<T, T>() {
20+
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
21+
return upstream.subscribeOn(Schedulers.io())
22+
.observeOn(AndroidSchedulers.mainThread());
23+
}
24+
};
25+
}
26+
27+
public static <T> SingleTransformer<T, T> rxSingleIoMain() {
28+
return new SingleTransformer<T, T>() {
29+
public SingleSource<T> apply(@io.reactivex.annotations.NonNull Single<T> upstream) {
30+
return upstream.subscribeOn(Schedulers.io())
31+
.observeOn(AndroidSchedulers.mainThread());
32+
}
33+
};
34+
}
35+
36+
/**
37+
* 重试和重复执行
38+
*
39+
* @param retryDelay
40+
* @param repeatDelay
41+
* @param <T>
42+
* @return
43+
*/
44+
public static <T> ObservableTransformer<T, T> retryRepeat(final long retryDelay,
45+
final long repeatDelay) {
46+
return new ObservableTransformer<T, T>() {
47+
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
48+
return upstream.retryWhen(
49+
new Function<Observable<Throwable>, ObservableSource<?>>() {
50+
@Override
51+
public ObservableSource<?> apply(Observable<Throwable> throwableObservable)
52+
throws Exception {
53+
return throwableObservable.delay(retryDelay, TimeUnit.MILLISECONDS);
54+
}
55+
})
56+
// 一定时间后重新查询
57+
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
58+
@Override
59+
public ObservableSource<?> apply(Observable<Object> objectObservable)
60+
throws Exception {
61+
return objectObservable.delay(repeatDelay, TimeUnit.MILLISECONDS);
62+
}
63+
});
64+
}
65+
};
66+
}
67+
68+
/**
69+
* 重试
70+
*
71+
* @param delay
72+
* @param <T>
73+
* @return
74+
*/
75+
public static <T> ObservableTransformer<T, T> retry(final long delay) {
76+
return new ObservableTransformer<T, T>() {
77+
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
78+
return upstream.retryWhen(
79+
new Function<Observable<Throwable>, ObservableSource<?>>() {
80+
@Override
81+
public ObservableSource<?> apply(Observable<Throwable> throwableObservable)
82+
throws Exception {
83+
return throwableObservable.delay(delay, TimeUnit.MILLISECONDS);
84+
}
85+
});
86+
}
87+
};
88+
}
89+
90+
/**
91+
* 重复执行
92+
*
93+
* @param delay
94+
* @param <T>
95+
* @return
96+
*/
97+
public static <T> ObservableTransformer<T, T> repeat(final long delay) {
98+
return new ObservableTransformer<T, T>() {
99+
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
100+
return upstream
101+
// 一定时间后重新查询
102+
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
103+
@Override
104+
public ObservableSource<?> apply(Observable<Object> objectObservable)
105+
throws Exception {
106+
return objectObservable.delay(delay, TimeUnit.MILLISECONDS);
107+
}
108+
});
109+
}
110+
};
111+
}
112+
113+
public static Scheduler io() {
114+
return Schedulers.io();
115+
}
116+
117+
public static Scheduler main() {
118+
return AndroidSchedulers.mainThread();
119+
}
120+
}

modbus4android/src/main/java/com/licheedev/modbus4android/IModbusWorker.java

+13-15
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package com.licheedev.modbus4android;
22

3-
import android.os.Handler;
43
import com.serotonin.modbus4j.ModbusMaster;
54
import com.serotonin.modbus4j.exception.ModbusInitException;
65
import io.reactivex.Observable;
7-
import io.reactivex.Scheduler;
86

97
/**
108
* Modbus工作接口,定义了Modbus初始化相关的方法
@@ -35,19 +33,19 @@ interface IModbusWorker {
3533
*/
3634
boolean isModbusOpened();
3735

38-
/**
39-
* 给RxJava用的,Modbus工作线程调度器
40-
*
41-
* @return
42-
*/
43-
Scheduler getModbusScheduler();
44-
45-
/**
46-
* Modbus工作线程的Handler
47-
*
48-
* @return
49-
*/
50-
Handler getModbusHandler();
36+
///**
37+
// * 给RxJava用的,Modbus工作线程调度器
38+
// *
39+
// * @return
40+
// */
41+
//Scheduler getModbusScheduler();
42+
//
43+
///**
44+
// * Modbus工作线程的Handler
45+
// *
46+
// * @return
47+
// */
48+
//Handler getModbusHandler();
5149

5250
/**
5351
* [RX]初始化modbus

modbus4android/src/main/java/com/licheedev/modbus4android/ModbusWorker.java

+12-29
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package com.licheedev.modbus4android;
22

3-
import android.os.Build;
4-
import android.os.Handler;
5-
import android.os.HandlerThread;
63
import android.support.annotation.NonNull;
74
import android.util.Log;
85
import com.serotonin.modbus4j.ModbusMaster;
@@ -29,16 +26,17 @@
2926
import io.reactivex.ObservableEmitter;
3027
import io.reactivex.ObservableOnSubscribe;
3128
import io.reactivex.Observer;
32-
import io.reactivex.Scheduler;
3329
import io.reactivex.android.schedulers.AndroidSchedulers;
3430
import io.reactivex.disposables.Disposable;
3531
import io.reactivex.exceptions.CompositeException;
3632
import io.reactivex.functions.Action;
3733
import io.reactivex.plugins.RxJavaPlugins;
34+
import io.reactivex.schedulers.Schedulers;
3835
import java.util.concurrent.Callable;
3936
import java.util.concurrent.ExecutionException;
4037
import java.util.concurrent.ExecutorService;
4138
import java.util.concurrent.Executors;
39+
import java.util.concurrent.Future;
4240

4341
/**
4442
* ModbusWorker实现,实现了初始化modbus,并增加了线圈、离散量输入、寄存器的读写方法
@@ -50,22 +48,13 @@ public class ModbusWorker implements IModbusWorker {
5048
private static final String NO_INIT_MESSAGE = "ModbusMaster hasn't been inited!";
5149

5250
private final ExecutorService mRequestExecutor;
53-
protected final Scheduler mModbusScheduler;
54-
private final HandlerThread mModbusThread;
55-
private final Handler mModbusHandler;
5651

5752
protected ModbusMaster mModbusMaster;
5853

5954
public ModbusWorker() {
6055

6156
// modbus请求用的单一线程池
6257
mRequestExecutor = Executors.newSingleThreadExecutor();
63-
64-
// 辅助用的工作线程
65-
mModbusThread = new HandlerThread("modbus-working-thread");
66-
mModbusThread.start();
67-
mModbusHandler = new Handler(mModbusThread.getLooper());
68-
mModbusScheduler = AndroidSchedulers.from(mModbusThread.getLooper());
6958
}
7059

7160
/**
@@ -91,12 +80,6 @@ public synchronized void release() {
9180
mModbusMaster = null;
9281
}
9382

94-
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.JELLY_BEAN_MR2) {
95-
mModbusThread.quitSafely();
96-
} else {
97-
mModbusThread.quit();
98-
}
99-
10083
mRequestExecutor.shutdown();
10184
}
10285

@@ -110,14 +93,6 @@ public synchronized boolean isModbusOpened() {
11093
return getModbusMaster() != null;
11194
}
11295

113-
public Scheduler getModbusScheduler() {
114-
return mModbusScheduler;
115-
}
116-
117-
public Handler getModbusHandler() {
118-
return mModbusHandler;
119-
}
120-
12196
@Override
12297
public void checkWorkingState() throws ModbusInitException, IllegalStateException {
12398

@@ -144,8 +119,16 @@ private <T> T doSync(Callable<T> callable)
144119
throws InterruptedException, ModbusInitException, ModbusTransportException,
145120
ModbusRespException, ExecutionException {
146121

122+
Future<T> submit = null;
147123
try {
148-
return mRequestExecutor.submit(callable).get();
124+
submit = mRequestExecutor.submit(callable);
125+
return submit.get();
126+
} catch (InterruptedException e) {
127+
if (submit != null) {
128+
submit.cancel(true);
129+
}
130+
Thread.currentThread().interrupt();
131+
throw e;
149132
} catch (ExecutionException e) {
150133
//e.printStackTrace();
151134
Throwable cause = e.getCause();
@@ -191,7 +174,7 @@ public void subscribe(ObservableEmitter<T> emitter) throws Exception {
191174
}
192175
}
193176
}
194-
}).subscribeOn(mModbusScheduler);
177+
}).subscribeOn(Schedulers.io());
195178
}
196179

197180
/**

0 commit comments

Comments
 (0)