Skip to content

Commit fe19779

Browse files
lixiaocuicodingYunhuiChen
authored andcommitted
【chunkserver】split read and write thread in concurrentApplyModel
1 parent 5387615 commit fe19779

30 files changed

+642
-666
lines changed

conf/chunkserver.conf.example

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,14 @@ storeng.sync_write=false
141141

142142
#
143143
# Concurrent apply module
144-
#
145-
# 并发模块的并发度,一般是10
146-
concurrentapply.size=10
147-
# 并发模块线程的队列深度
148-
concurrentapply.queuedepth=1
144+
# 并发模块写线程的并发度,一般是10
145+
wconcurrentapply.size=10
146+
# 并发模块写线程的队列深度
147+
wconcurrentapply.queuedepth=1
148+
# 并发模块读线程的并发度,一般是5
149+
rconcurrentapply.size=5
150+
# 并发模块读线程的队列深度
151+
rconcurrentapply.queuedepth=1
149152

150153
#
151154
# Chunkfile pool

coverage/test.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

coverage/tmp.py

Lines changed: 0 additions & 9 deletions
This file was deleted.

curve-ansible/roles/generate_config/defaults/main.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,10 @@ chunkserver_s3_config_path: /etc/curve/cs_s3.conf
114114
chunkserver_fs_enable_renameat2: true
115115
chunkserver_metric_onoff: true
116116
chunkserver_storeng_sync_write: false
117-
chunkserver_concurrentapply_size: 10
118-
chunkserver_concurrentapply_queuedepth: 1
117+
chunkserver_wconcurrentapply_size: 10
118+
chunkserver_wconcurrentapply_queuedepth: 1
119+
chunkserver_rconcurrentapply_size: 5
120+
chunkserver_rconcurrentapply_queuedepth: 1
119121
chunkserver_chunkfilepool_enable_get_chunk_from_pool: true
120122
chunkserver_chunkfilepool_chunk_file_pool_dir: ./0/
121123
chunkserver_chunkfilepool_cpmeta_file_size: 4096

curve-ansible/roles/generate_config/templates/chunkserver.conf.j2

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,13 @@ storeng.sync_write={{ chunkserver_storeng_sync_write }}
149149
# Concurrent apply module
150150
#
151151
# 并发模块的并发度,一般是10
152-
concurrentapply.size={{ chunkserver_concurrentapply_size }}
152+
wconcurrentapply.size={{ chunkserver_wconcurrentapply_size }}
153153
# 并发模块线程的队列深度
154-
concurrentapply.queuedepth={{ chunkserver_concurrentapply_queuedepth }}
154+
wconcurrentapply.queuedepth={{ chunkserver_wconcurrentapply_queuedepth }}
155+
# 并发模块读线程的并发度,一般是5
156+
rconcurrentapply.size={{ chunkserver_rconcurrentapply_size }}
157+
# 并发模块读线程的队列深度
158+
rconcurrentapply.queuedepth={{ chunkserver_rconcurrentapply_queuedepth }}
155159

156160
#
157161
# Chunkfile pool

deploy/local/chunkserver/conf/chunkserver.conf.0

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ storeng.sync_write=false
113113
#
114114
concurrentapply.size=10
115115
concurrentapply.queuedepth=1
116+
wconcurrentapply.size=10
117+
wconcurrentapply.queuedepth=1
118+
rconcurrentapply.size=5
119+
rconcurrentapply.queuedepth=1
120+
116121

117122
#
118123
# Chunkfile pool

deploy/local/chunkserver/conf/chunkserver.conf.1

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,10 @@ storeng.sync_write=false
111111
#
112112
# Concurrent apply module
113113
#
114-
concurrentapply.size=10
115-
concurrentapply.queuedepth=1
114+
wconcurrentapply.size=10
115+
wconcurrentapply.queuedepth=1
116+
rconcurrentapply.size=5
117+
rconcurrentapply.queuedepth=1
116118

117119
#
118120
# Chunkfile pool

deploy/local/chunkserver/conf/chunkserver.conf.2

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,10 @@ storeng.sync_write=false
111111
#
112112
# Concurrent apply module
113113
#
114-
concurrentapply.size=10
115-
concurrentapply.queuedepth=1
114+
wconcurrentapply.size=10
115+
wconcurrentapply.queuedepth=1
116+
rconcurrentapply.size=5
117+
rconcurrentapply.queuedepth=1
116118

117119
#
118120
# Chunkfile pool

src/chunkserver/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ cc_library(
6565
"//proto:chunkserver-cc-protos",
6666
"//proto:topology_cc_proto",
6767
"//src/chunkserver/datastore:chunkserver_datastore",
68+
"//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply",
6869
"//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot",
6970
"//src/common:curve_common",
7071
"//src/common:curve_s3_adapter",
@@ -108,6 +109,7 @@ cc_library(
108109
"//proto:chunkserver-cc-protos",
109110
"//proto:topology_cc_proto",
110111
"//src/chunkserver/datastore:chunkserver_datastore",
112+
"//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply",
111113
"//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot",
112114
"//src/common:curve_common",
113115
"//src/common:curve_s3_adapter",
@@ -144,6 +146,7 @@ cc_binary(
144146
"//proto:chunkserver-cc-protos",
145147
"//src/chunkserver:chunkserver-lib",
146148
"//src/chunkserver/datastore:chunkserver_datastore",
149+
"//src/chunkserver/concurrent_apply:chunkserver_concurrent_apply",
147150
"//src/chunkserver/raftsnapshot:chunkserver-raft-snapshot",
148151
"//src/common:curve_common",
149152
"//src/common:curve_s3_adapter",

src/chunkserver/chunkserver.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ using ::curve::fs::LocalFileSystem;
4747
using ::curve::fs::LocalFileSystemOption;
4848
using ::curve::fs::LocalFsFactory;
4949
using ::curve::fs::FileSystemType;
50+
using ::curve::chunkserver::concurrent::ConcurrentApplyModule;
5051

5152
DEFINE_string(conf, "ChunkServer.conf", "Path of configuration file");
5253
DEFINE_string(chunkServerIp, "127.0.0.1", "chunkserver ip");
@@ -104,11 +105,9 @@ int ChunkServer::Run(int argc, char** argv) {
104105

105106
// 初始化并发持久模块
106107
ConcurrentApplyModule concurrentapply;
107-
int size;
108-
LOG_IF(FATAL, !conf.GetIntValue("concurrentapply.size", &size));
109-
int qdepth;
110-
LOG_IF(FATAL, !conf.GetIntValue("concurrentapply.queuedepth", &qdepth));
111-
LOG_IF(FATAL, false == concurrentapply.Init(size, qdepth))
108+
ConcurrentApplyOption concurrentApplyOptions;
109+
InitConcurrentApplyOptions(&conf, &concurrentApplyOptions);
110+
LOG_IF(FATAL, false == concurrentapply.Init(concurrentApplyOptions))
112111
<< "Failed to initialize concurrentapply module!";
113112

114113
// 初始化本地文件系统
@@ -380,6 +379,8 @@ void ChunkServer::Stop() {
380379
brpc::AskToQuit();
381380
}
382381

382+
383+
383384
void ChunkServer::InitChunkFilePoolOptions(
384385
common::Configuration *conf, ChunkfilePoolOptions *chunkFilePoolOptions) {
385386
LOG_IF(FATAL, !conf->GetUInt32Value("global.chunk_size",
@@ -408,6 +409,18 @@ void ChunkServer::InitChunkFilePoolOptions(
408409
}
409410
}
410411

412+
void ChunkServer::InitConcurrentApplyOptions(common::Configuration *conf,
413+
ConcurrentApplyOption *concurrentApplyOptions) {
414+
LOG_IF(FATAL, !conf->GetIntValue(
415+
"rconcurrentapply.size", &concurrentApplyOptions->rconcurrentsize));
416+
LOG_IF(FATAL, !conf->GetIntValue(
417+
"wconcurrentapply.size", &concurrentApplyOptions->wconcurrentsize));
418+
LOG_IF(FATAL, !conf->GetIntValue(
419+
"rconcurrentapply.queuedepth", &concurrentApplyOptions->rqueuedepth));
420+
LOG_IF(FATAL, !conf->GetIntValue(
421+
"wconcurrentapply.queuedepth", &concurrentApplyOptions->wqueuedepth));
422+
}
423+
411424
void ChunkServer::InitCopysetNodeOptions(
412425
common::Configuration *conf, CopysetNodeOptions *copysetNodeOptions) {
413426
LOG_IF(FATAL, !conf->GetStringValue("global.ip", &copysetNodeOptions->ip));

0 commit comments

Comments
 (0)