
Commit 3c0dad5

Authored Feb 22, 2025

GH-510: Add support for asynchronously reading data from disk using multiple threads (#518)

* update accent4j version
* add configuration to enable_async_data_reads
* update documentation
* add async data reading
* update changelog
* fix bug in how ToggleQueue checks for whether it is read optimized or not
* fix copyright year
* upgrade to ccl 3.2.0 for contains/not_contains support
* set accent4j dependency to release version of 1.14.0

1 parent a8be25b commit 3c0dad5

File tree

8 files changed: +414 −94 lines changed

CHANGELOG.md (+2 −1)

@@ -41,7 +41,8 @@ We made several changes to improve the safety, scalability and operational effic
 
 ##### New Functionality and Enhancements
 * Reduced the amount of heap space required for essential storage metadata.
-* Added the `enable_efficient_metadata` configuration option to further reduce the amount of heap space required for essential storage metadata. When this option is set to `true`, metadata will occupy approximately one-third less heap space and likely improve overall system performance due to a decrease in garbage collection pauses (although per-operation performance may be slightly affected by additional overhead).
+* **Efficient Metadata:** Added the `enable_efficient_metadata` configuration option to further reduce the amount of heap space required for essential storage metadata. When this option is set to `true`, metadata will occupy approximately one-third less heap space and likely improve overall system performance due to a decrease in garbage collection pauses (although per-operation performance may be slightly affected by additional overhead).
+* **Asynchronous Data Reads:** Added the `enable_async_data_reads` configuration option to allow Concourse Server to *potentially* use multiple threads to read data from disk. When data records are either no longer cached or not eligible to ever be cached (due to space limitations), Concourse Server streams the relevant information from disk on-demand. By default, this is a synchronous process and the performance is linear based on the number of Segment files in the database. With this new configuration option, Concourse Server can now stream the data using multiple threads. Even under high contention, the read performance should be no worse than the default synchronous performance, but there may be additional overhead that reduces peak performance on a per-operation basis.
 * Improved write performance of the `set` method in large transactions by creating normalized views of existing data, which are consulted during the method's implicit `select` read operation.
 * Improved the performance of the `verifyOrSet` method by removing redundant internal verification that occurred while finalizing the write.
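The changelog entry above describes fanning the per-Segment disk reads out to multiple threads and merging the partial results. A minimal, self-contained sketch of that idea, using plain `java.util.concurrent` stand-ins rather than Concourse's internal classes (a "segment" here is just a list of strings):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Sketch of the read strategy described above: each Segment file is
 * scanned independently, so the per-segment work can be fanned out to a
 * thread pool and the partial results merged afterwards.
 */
public class AsyncReadSketch {

    /** Sequential baseline: stream every segment one after another. */
    static List<String> readSync(List<List<String>> segments) {
        List<String> result = new ArrayList<>();
        for (List<String> segment : segments) {
            result.addAll(segment); // simulate seeking data from disk
        }
        return result;
    }

    /** Fan out one task per segment, then merge in segment order. */
    static List<String> readAsync(List<List<String>> segments)
            throws Exception {
        ExecutorService reader = Executors.newFixedThreadPool(
                Math.max(3, Runtime.getRuntime().availableProcessors()));
        try {
            List<Callable<List<String>>> tasks = new ArrayList<>();
            for (List<String> segment : segments) {
                tasks.add(() -> new ArrayList<>(segment));
            }
            List<String> result = new ArrayList<>();
            // invokeAll returns futures in task (i.e. segment) order, so
            // the merged view is identical to the synchronous one.
            for (Future<List<String>> future : reader.invokeAll(tasks)) {
                result.addAll(future.get());
            }
            return result;
        }
        finally {
            reader.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> segments = List.of(
                List.of("a", "b"), List.of("c"), List.of("d", "e"));
        if(!readSync(segments).equals(readAsync(segments))) {
            throw new AssertionError("views diverged");
        }
    }
}
```

Because the futures come back in submission order, the merged view is deterministic even though the per-segment scans race, which is why the async path can be a drop-in substitute for the synchronous loop.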

build.gradle (+1 −1)

@@ -121,7 +121,7 @@ subprojects {
 compile 'joda-time:joda-time:2.2'
 compile 'org.apache.thrift:libthrift:0.20.0'
 compile 'commons-configuration:commons-configuration:1.9'
-compile group: 'com.cinchapi', name: 'accent4j', version: '1.13.1', changing:true
+compile group: 'com.cinchapi', name: 'accent4j', version: '1.14.0', changing:true
 compile 'com.cinchapi:lib-config:1.5.1'
 compile group: 'com.cinchapi', name: 'lib-cli', version: '1.1.1', changing:true

concourse-server/conf/concourse.yaml (+101 −82)

@@ -4,7 +4,7 @@
 
 # The path to the file where access credentials for Concourse Server are
 # stored. For optimal security, this file should be placed in a separate
-# directory from Concourse Server with more restrictive operating system
+# directory from Concourse Server with more restrictive operating system
 # permissions
 #
 # DEFAULT {$concourse.home}/.access

@@ -26,8 +26,8 @@ buffer_directory:
 buffer_page_size:
 
 # The listener port (1-65535) for client connections. Choose a port between
-# 49152 and 65535 to minimize the possibility of conflicts with other services
-# on this host.
+# 49152 and 65535 to minimize the possibility of conflicts with other
+# services on this host.
 #
 # DEFAULT: 1717
 client_port:

@@ -40,9 +40,9 @@ client_port:
 # DEFAULT: {$user.home}/concourse/db
 database_directory:
 
-# The default environment that is automatically loaded when the server
-# starts and is used whenever a client does not specify an environment
-# for the connection.
+# The default environment that is automatically loaded when the server starts
+# and is used whenever a client does not specify an environment for the
+# connection.
 #
 # DEFAULT: default
 default_environment:

@@ -53,17 +53,17 @@ default_environment:
 # DEFAULT: false
 enable_console_logging:
 
-# The amount of memory that is allocated to the Concourse Server JVM.
-# Concourse requires a minimum heap size of 256MB to start, but much
-# more is recommended to ensure that read and write operations avoid
-# expensive disk seeks where possible. Concourse generally sets both
-# the initial and maximum heap sizes to the specified value, so there
-# must be enough system memory available for Concourse Server to start.
-#
-# Be careful and avoid setting the heap size too large because this may
-# cause longer garbage collection (gc) pauses or interfere with the ability
-# of Concourse Server to memory map (mmap) certain data files. We
-# recommend the following sizing guidelines:
+# The amount of memory that is allocated to the Concourse Server JVM. Concourse
+# requires a minimum heap size of 256MB to start, but much more is recommended
+# to ensure that read and write operations avoid expensive disk seeks where
+# possible. Concourse generally sets both the initial and maximum heap sizes to
+# the specified value, so there must be enough system memory available for
+# Concourse Server to start.
+#
+# Be careful and avoid setting the heap size too large because this may cause
+# longer garbage collection (gc) pauses or interfere with the ability of
+# Concourse Server to memory map (mmap) certain data files. We recommend the
+# following sizing guidelines:
 #
 # SYSTEM MEMORY | Recommended heap_size
 # -----------------------------------------------------------

@@ -89,46 +89,46 @@ http_port:
 http_enable_cors:
 
 # A comma separated list of default URIs that are permitted to access HTTP
-# endpoints. By default (if enabled), the value of this preference is set
-# to the wildcard character '*' which means that any origin is allowed access.
+# endpoints. By default (if enabled), the value of this preference is set to
+# the wildcard character '*' which means that any origin is allowed access.
 # Changing this value to a discrete list will set the default origins that are
 # permitted, but individual endpoints may override this value.
 #
 # DEFAULT: (allow any origin)
 http_cors_default_allow_origin:
 
-# A comma separated list of default headers that are sent in response to a
-# CORS preflight request to indicate which HTTP headers can be used when
-# making the actual request. By default (if enabled), the value of this
-# preference is set to the wildcard character '*' which means that any headers
-# specified in the preflight request are allowed. Changing this value to a
-# discrete list will set the default headers that are permitted, but individual
-# endpoints may override this value.
+# A comma separated list of default headers that are sent in response to a CORS
+# preflight request to indicate which HTTP headers can be used when making the
+# actual request. By default (if enabled), the value of this preference is set
+# to the wildcard character '*' which means that any headers specified in the
+# preflight request are allowed. Changing this value to a discrete list will
+# set the default headers that are permitted, but individual endpoints may
+# override this value.
 #
 # DEFAULT: (allow any headers)
 http_cors_default_allow_headers:
 
 # A comma separated list of default methods that are sent in response to a
-# CORS preflight request to indicate which HTTP methods can be used when making
-# the actual request. By default (if enabled), the value of this preference is
-# set to the wildcard character '*' which means that any method specified in the
-# preflight request is allowed. Changing this value to a discrete list will set
-# the default methods that are permitted, but individual endpoints may override
-# this value.
+# CORS preflight request to indicate which HTTP methods can be used when
+# making the actual request. By default (if enabled), the value of this
+# preference is set to the wildcard character '*' which means that any method
+# specified in the preflight request is allowed. Changing this value to a
+# discrete list will set the default methods that are permitted, but individual
+# endpoints may override this value.
 #
 # DEFAULT: (allow any method)
 http_cors_default_allow_methods:
 
-# The initial root password for Concourse Server. This password is used to set
-# up the initial administrator account when the server is first run. It is strongly
-# recommended to change this password immediately after the initial setup to maintain
-# security.
+# The initial root password for Concourse Server. This password is used to set
+# up the initial administrator account when the server is first run. It is
+# strongly recommended to change this password immediately after the initial
+# setup to maintain security.
 #
 # DEFAULT: "admin"
 init_root_password:
 
-# The initial root username for Concourse Server. This username is associated with the
-# initial administrator account. It is strongly
+# The initial root username for Concourse Server. This username is associated
+# with the initial administrator account.
 #
 # DEFAULT: "admin"
 init_root_username:

@@ -148,8 +148,8 @@ jmx_port:
 #
 # ERROR: critical information when the system reaches a potentially fatal
 # state and may not operate normally.
-# WARN: useful information when the system reaches a less than ideal state but
-# can continue to operate normally.
+# WARN: useful information when the system reaches a less than ideal state
+# but can continue to operate normally.
 # INFO: status information about the system that can be used for sanity
 # checking.
 # DEBUG: detailed information about the system that can be used to diagnose

@@ -164,16 +164,16 @@ log_level:
 
 # The length of the longest substring that will be indexed for fulltext search.
 #
-# This value does not mean that longer words will not be indexed. It simply means
-# that, for any indexable value (e.g. a String), any substring that is longer than
-# the value of this preference will not be added to the search index. The effect is
-# that search strings containing any words with a length greater than the value of
-# this preference will return 0 results.
-
-# For best performance, this value should be set to the longest word length of any
-# possible search string. To be safe, we recommend setting this value to be the
-# length of the longest possible word in the search language. For example, the longest
-# possible word in English is about 40 characters long.
+# This value does not mean that longer words will not be indexed. It simply
+# means that, for any indexable value (e.g. a String), any substring that is
+# longer than the value of this preference will not be added to the search
+# index. The effect is that search strings containing any words with a length
+# greater than the value of this preference will return 0 results.
+#
+# For best performance, this value should be set to the longest word length of
+# any possible search string. To be safe, we recommend setting this value to be
+# the length of the longest possible word in the search language. For example,
+# the longest possible word in English is about 40 characters long.
 #
 # DEFAULT: 40
 max_search_substring_length:

@@ -187,10 +187,10 @@ max_search_substring_length:
 # DEFAULT: automatically chosen based on the client_port
 shutdown_port:
 
-# The listener port (1-65535) for remote debugger connections. Choose a port between
-# 49152 and 65535 to minimize the possibility of conflicts with other services
-# on this host. If the value of this preference is set to 0, then remote debugging for
-# Concourse Server is disabled.
+# The listener port (1-65535) for remote debugger connections. Choose a port
+# between 49152 and 65535 to minimize the possibility of conflicts with other
+# services on this host. If the value of this preference is set to 0, then
+# remote debugging for Concourse Server is disabled.
 #
 # DEFAULT: 0
 remote_debugger_port:

@@ -199,12 +199,31 @@ remote_debugger_port:
 ### EXPERIMENTAL CONFIGURATION FOR CONCOURSE SERVER ###
 #########################################################
 
-# Automatically use a combination of defragmentation, garbage collection and load
-# balancing within the data files to optimize storage for read performance.
+# Potentially use multiple threads to asynchronously read data from disk.
+#
+# When enabled, reads will typically be faster when accessing data that is too
+# large to ever fit in memory or no longer cached due to memory constraints.
+#
+# This setting is particularly useful for search data since those indexes are
+# not cached by default (unless ENABLE_SEARCH_CACHE is enabled). Even if search
+# records are cached, this setting may still provide a performance boost if the
+# size of some search metadata exceeds the limits of what is cacheable in
+# memory.
+#
+# NOTE: There might be some overhead that could make some reads slower if all
+# their relevant segment metadata is cached and there is high contention.
+#
+# DEFAULT: false
+enable_async_data_reads:
+
+# Automatically use a combination of defragmentation, garbage collection and
+# load balancing within the data files to optimize storage for read
+# performance.
 #
-# The compaction process may run continuously in the background without disrupting
-# reads or writes. The storage engine uses a specific strategy to determine how
-# data files should be reorganized to improve the performance of read operations.
+# The compaction process may run continuously in the background without
+# disrupting reads or writes. The storage engine uses a specific strategy to
+# determine how data files should be reorganized to improve the performance of
+# read operations.
 #
 # DEFAULT: false
 enable_compaction:

@@ -215,29 +234,29 @@ enable_compaction:
 # for essential metadata by a third. As a result, overall system performance may
 # improve due to a reduction in garbage collection pauses.
 #
-# However, this setting may increase CPU usage and slightly reduce peak performance
-# on a per-operation basis due to weaker reference locality.
+# However, this setting may increase CPU usage and slightly reduce peak
+# performance on a per-operation basis due to weaker reference locality.
 #
 # DEFAULT: false
 enable_efficient_metadata:
 
-# Maintain and in-memory cache of the data indexes used to respond to search commands.
-# Search indexes tend to be much larger than those used for primary and secondary
-# lookups, so enabling the search cache may cause memory issues (and overall
-# performance degradation) if search is heavily used. Furthermore, indexing and
-# write performance may suffer if cached search indexes must be incrementally kept
-# current.
+# Maintain and in-memory cache of the data indexes used to respond to search
+# commands. Search indexes tend to be much larger than those used for primary
+# and secondary lookups, so enabling the search cache may cause memory issues
+# (and overall performance degradation) if search is heavily used. Furthermore,
+# indexing and write performance may suffer if cached search indexes must be
+# incrementally kept current.
 #
 # DEFAULT: false
 enable_search_cache:
 
 # Attempt to optimize verify commands by using special lookup records.
 #
-# A lookup record only contains data for a single field. The database does not cache
-# lookup records, so, while generating one is theoretically faster than generating a
-# full or partial record, repeated attempts to verify data in the same field (e.g. a
-# counter whose value is stored against a single locator/key) or record may be slower
-# due to lack of caching.
+# A lookup record only contains data for a single field. The database does not
+# cache lookup records, so, while generating one is theoretically faster than
+# generating a full or partial record, repeated attempts to verify data in the
+# same field (e.g. a counter whose value is stored against a single
+# locator/key) or record may be slower due to lack of caching.
 #
 # DEFAULT: false
 enable_verify_by_lookup:

@@ -247,22 +266,22 @@ enable_verify_by_lookup:
 #############################################
 init:
 
-# Configuration for the root user. If provided, will override values for flat config
-# options that are prefixed with "init_"
+# Configuration for the root user. If provided, will override values for flat
+# config options that are prefixed with "init_"
 root:
 
-# The initial root password for Concourse Server. This password is used to set
-# up the initial administrator account when the server is first run. It is
-# strongly recommended to change this password immediately after the initial setup
-# to maintain security.
+# The initial root password for Concourse Server. This password is used to
+# set up the initial administrator account when the server is first run. It
+# is strongly recommended to change this password immediately after the
+# initial setup to maintain security.
 #
 # DEFAULT: the value of the init_root_password option, if available.
 # Otherwise "admin"
 password:
 
-# The initial root username for Concourse Server. This username is associated
-# with the initial administrator account. It is strongly
+# The initial root username for Concourse Server. This username is
+# associated with the initial administrator account.
 #
-# DEFAULT: the value of the init_root_username option, if available.
+# DEFAULT: the value of the init_root_username option, if available.
 # Otherwise "admin"
-username:
+username:

concourse-server/src/main/java/com/cinchapi/concourse/server/GlobalState.java (+25)

@@ -290,6 +290,28 @@ public final class GlobalState extends Constants {
      */
     public static String INIT_ROOT_USERNAME = "admin";
 
+    /**
+     * Potentially use multiple threads to asynchronously read data from disk.
+     * <p>
+     * When enabled, reads will typically be faster when accessing data too
+     * large to fit in memory or no longer cached due to memory constraints.
+     * </p>
+     * <p>
+     * This setting is particularly useful for search data since those indexes
+     * are not cached by default (unless {@link #ENABLE_SEARCH_CACHE} is
+     * enabled). Even if search records are cached, this setting may still
+     * provide a performance boost if the size of some search metadata exceeds
+     * the limits of what is cacheable in memory.
+     * </p>
+     * <p>
+     * <strong>NOTE:</strong> There might be some overhead that could make some
+     * reads slower if all their relevant segment metadata is cached and there
+     * is high contention.
+     * </p>
+     */
+    @Experimental
+    public static boolean ENABLE_ASYNC_DATA_READS = false;
+
     /**
      * Automatically use a combination of defragmentation, garbage collection
      * and load balancing within the data files to optimize storage for read

@@ -421,6 +443,9 @@ public final class GlobalState extends Constants {
         MAX_SEARCH_SUBSTRING_LENGTH = config.getOrDefault(
                 "max_search_substring_length", MAX_SEARCH_SUBSTRING_LENGTH);
 
+        ENABLE_ASYNC_DATA_READS = config.getOrDefault("enable_async_data_reads",
+                ENABLE_ASYNC_DATA_READS);
+
         ENABLE_COMPACTION = config.getOrDefault("enable_compaction",
                 ENABLE_COMPACTION);
concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java (+144 −8)

@@ -55,6 +55,7 @@
 import com.cinchapi.common.base.CheckedExceptions;
 import com.cinchapi.common.base.Verify;
 import com.cinchapi.common.collect.concurrent.ThreadFactories;
+import com.cinchapi.common.concurrent.JoinableExecutorService;
 import com.cinchapi.concourse.annotate.Restricted;
 import com.cinchapi.concourse.server.GlobalState;
 import com.cinchapi.concourse.server.concurrent.AwaitableExecutorService;

@@ -181,6 +182,12 @@ private static Segment findSegment(Collection<Segment> segments,
      */
     private static final Cache<Composite, CorpusRecord> DISABLED_CORPUS_CACHE = new NoOpCache<>();
 
+    /**
+     * Global flag to indicate if async data reads are enabled
+     */
+    // Copied here as a final variable for (hopeful) performance gains.
+    private static final boolean ENABLE_ASYNC_DATA_READS = GlobalState.ENABLE_ASYNC_DATA_READS;
+
     /**
      * Global flag that indicates if compaction is enabled.
      */

@@ -299,6 +306,12 @@ private static Segment findSegment(Collection<Segment> segments,
      */
     private transient boolean running = false;
 
+    /**
+     * A {@link JoinableExecutorService} that facilitates
+     * {@link #ENABLE_ASYNC_DATA_READS async data reading} if enabled.
+     */
+    private transient JoinableExecutorService reader;
+
     /**
      * We hold direct references to the current Segment. This pointer changes
      * whenever the database triggers a sync operation.

@@ -373,6 +386,11 @@ private static Segment findSegment(Collection<Segment> segments,
      */
     private transient AwaitableExecutorService writer;
 
+    /**
+     * Dynamic configuration.
+     */
+    private transient Options options = new Options();
+
     /**
      * Construct a Database that is backed by the default location which is in
      * {@link GlobalState#DATABASE_DIRECTORY}.

@@ -919,6 +937,11 @@ public void start() {
             // @formatter:on
             Logger.info("Database is running with compaction {}.",
                     ENABLE_COMPACTION ? "ON" : "OFF");
+
+            reader = ENABLE_ASYNC_DATA_READS ? new JoinableExecutorService(
+                    Math.max(3, Runtime.getRuntime().availableProcessors()),
+                    ThreadFactories.namingThreadFactory("DatabaseReader"))
+                    : null;
         }
 
     }

@@ -944,6 +967,9 @@ public void stop() {
             }
             fullCompaction.shutdownNow();
             incrementalCompaction.shutdownNow();
+            if(reader != null) {
+                reader.shutdown();
+            }
         }
     }
 

@@ -1000,6 +1026,7 @@ public boolean verify(Write write, long timestamp) {
      * @param toks {@code query} split by whitespace
      * @return the CorpusRecord
      */
+    @SuppressWarnings("unchecked")
     private CorpusRecord getCorpusRecord(Text key, Text infix) {
         masterLock.readLock().lock();
         try {

@@ -1009,8 +1036,25 @@ private CorpusRecord getCorpusRecord(Text key, Text infix) {
                     : DISABLED_CORPUS_CACHE;
             return cache.get(composite, () -> {
                 CorpusRecord $ = CorpusRecord.createPartial(key, infix);
-                for (Segment segment : segments) {
-                    segment.corpus().seek(composite, $);
+                if(options.enableAsyncCorpusDataReads()) {
+                    int i = 0;
+                    Fragment<Text, Text, Position>[] fragments = new Fragment[segments
+                            .size()];
+                    Runnable[] tasks = new Runnable[segments.size()];
+                    for (Segment segment : segments) {
+                        Fragment<Text, Text, Position> fragment = new Fragment<>(
+                                key, infix);
+                        fragments[i] = fragment;
+                        tasks[i++] = () -> segment.corpus().seek(composite,
+                                fragment);
+                    }
+                    reader.join(tasks);
+                    $.append(fragments);
+                }
+                else {
+                    for (Segment segment : segments) {
+                        segment.corpus().seek(composite, $);
+                    }
                 }
                 return $;
             });

@@ -1029,14 +1073,32 @@ private CorpusRecord getCorpusRecord(Text key, Text infix) {
      * @param key
      * @return the IndexRecord
      */
+    @SuppressWarnings("unchecked")
     private IndexRecord getIndexRecord(Text key) {
         masterLock.readLock().lock();
         try {
             Composite composite = Composite.create(key);
             return indexCache.get(composite, () -> {
                 IndexRecord $ = IndexRecord.create(key);
-                for (Segment segment : segments) {
-                    segment.index().seek(composite, $);
+                if(options.enableAsyncIndexDataReads()) {
+                    int i = 0;
+                    Fragment<Text, Value, Identifier>[] fragments = new Fragment[segments
+                            .size()];
+                    Runnable[] tasks = new Runnable[segments.size()];
+                    for (Segment segment : segments) {
+                        Fragment<Text, Value, Identifier> fragment = new Fragment<>(
+                                key, null);
+                        fragments[i] = fragment;
+                        tasks[i++] = () -> segment.index().seek(composite,
+                                fragment);
+                    }
+                    reader.join(tasks);
+                    $.append(fragments);
+                }
+                else {
+                    for (Segment segment : segments) {
+                        segment.index().seek(composite, $);
+                    }
                 }
                 return $;
             });

@@ -1112,14 +1174,32 @@ private Record<Identifier, Text, Value> getLookupRecord(Identifier record,
      * @param identifier
      * @return the TableRecord
      */
+    @SuppressWarnings("unchecked")
     private TableRecord getTableRecord(Identifier identifier) {
         masterLock.readLock().lock();
         try {
             Composite composite = Composite.create(identifier);
             return tableCache.get(composite, () -> {
                 TableRecord $ = TableRecord.create(identifier);
-                for (Segment segment : segments) {
-                    segment.table().seek(composite, $);
+                if(options.enableAsyncTableDataReads()) {
+                    int i = 0;
+                    Fragment<Identifier, Text, Value>[] fragments = new Fragment[segments
+                            .size()];
+                    Runnable[] tasks = new Runnable[segments.size()];
+                    for (Segment segment : segments) {
+                        Fragment<Identifier, Text, Value> fragment = new Fragment<>(
+                                identifier, null);
+                        fragments[i] = fragment;
+                        tasks[i++] = () -> segment.table().seek(composite,
+                                fragment);
+                    }
+                    reader.join(tasks);
+                    $.append(fragments);
+                }
+                else {
+                    for (Segment segment : segments) {
+                        segment.table().seek(composite, $);
+                    }
                 }
                 return $;
             });

@@ -1146,6 +1226,7 @@ private TableRecord getTableRecord(Identifier identifier) {
      * @param key
      * @return the TableRecord
      */
+    @SuppressWarnings("unchecked")
     private TableRecord getTableRecord(Identifier identifier, Text key) {
         masterLock.readLock().lock();
         try {

@@ -1157,8 +1238,25 @@ private TableRecord getTableRecord(Identifier identifier, Text key) {
             Composite composite = Composite.create(identifier, key);
             table = tablePartialCache.get(composite, () -> {
                 TableRecord $ = TableRecord.createPartial(identifier, key);
-                for (Segment segment : segments) {
-                    segment.table().seek(composite, $);
+                if(options.enableAsyncTableDataReads()) {
+                    int i = 0;
+                    Fragment<Identifier, Text, Value>[] fragments = new Fragment[segments
+                            .size()];
+                    Runnable[] tasks = new Runnable[segments.size()];
+                    for (Segment segment : segments) {
+                        Fragment<Identifier, Text, Value> fragment = new Fragment<>(
+                                identifier, key);
+                        fragments[i] = fragment;
+                        tasks[i++] = () -> segment.table().seek(composite,
+                                fragment);
+                    }
+                    reader.join(tasks);
+                    $.append(fragments);
+                }
+                else {
+                    for (Segment segment : segments) {
+                        segment.table().seek(composite, $);
+                    }
                 }
                 return $;
             });

@@ -1312,6 +1410,44 @@ public boolean contains(String key, long record) {
 
     }
 
+    /**
+     * Dynamic runtime configuration options.
+     *
+     * @author Jeff Nelson
+     */
+    private final class Options {
+
+        /**
+         * Return {@code true} if {@link CorpusRecord corpus records} should be
+         * read from disk asynchronously.
+         *
+         * @return the optional configuration value.
+         */
+        boolean enableAsyncCorpusDataReads() {
+            return running && ENABLE_ASYNC_DATA_READS;
+        }
+
+        /**
+         * Return {@code true} if {@link IndexRecord index records} should be
+         * read from disk asynchronously.
+         *
+         * @return the optional configuration value.
+         */
+        boolean enableAsyncIndexDataReads() {
+            return running && ENABLE_ASYNC_DATA_READS;
+        }
+
+        /**
+         * Return {@code true} if {@link TableRecord table records} should be
+         * read from disk asynchronously.
+         *
+         * @return the optional configuration value.
+         */
+        boolean enableAsyncTableDataReads() {
+            return running && ENABLE_ASYNC_DATA_READS;
+        }
+    }
+
     /**
      * {@link Cache} wrapper that is aware of whether the {@link Database} is
      * running and behaves accordingly.
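The read paths above rely on `JoinableExecutorService#join(Runnable...)` from accent4j blocking the caller until every per-segment task has completed. Assuming that blocking contract (an assumption about the real accent4j API, not its actual implementation), the behavior can be sketched with a `CountDownLatch`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Stand-in for the assumed blocking behavior of
 * JoinableExecutorService#join: run every task on the pool and do not
 * return until all of them have finished.
 */
public class JoinSketch {

    static void join(ExecutorService executor, Runnable... tasks)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.length);
        for (Runnable task : tasks) {
            executor.execute(() -> {
                try {
                    task.run();
                }
                finally {
                    latch.countDown(); // signal even if the task fails
                }
            });
        }
        latch.await(); // block the caller until every task has run
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        int[] counter = new int[3];
        join(executor,
                () -> counter[0] = 1,
                () -> counter[1] = 2,
                () -> counter[2] = 3);
        executor.shutdown();
        System.out.println(counter[0] + counter[1] + counter[2]); // prints "6"
    }
}
```

The `countDown`/`await` pair also gives the caller a happens-before edge on each task's writes, which is what makes it safe for `getCorpusRecord` and friends to read the fragments immediately after the join.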
Fragment.java (new file, +127)

/*
 * Copyright (c) 2013-2025 Cinchapi Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cinchapi.concourse.server.storage.db;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.concurrent.NotThreadSafe;

import com.cinchapi.concourse.server.io.Byteable;
import com.google.common.collect.ImmutableMap;

/**
 * A {@link Fragment} represents a subset of another {@link Record}.
 * <p>
 * {@link Fragment Fragments} are used to asynchronously read {@link Revision
 * revisions} into memory to later be {@link Record#append(Fragment...) merged}
 * into a complete {@link Record}.
 * </p>
 * <p>
 * Regardless of their locator or key type, {@link Fragment Fragments} are
 * optimized for O(1) accumulation of {@link Revision revisions} in a
 * <strong>single thread</strong> and do not support any query operations.
 * </p>
 *
 * @author Jeff Nelson
 */
@NotThreadSafe
public class Fragment<L extends Byteable & Comparable<L>, K extends Byteable & Comparable<K>, V extends Byteable & Comparable<V>>
        extends Record<L, K, V> {

    /**
     * The accumulated {@link Revision revisions}.
     */
    private final List<Revision<L, K, V>> revisions = new ArrayList<>();

    /**
     * Construct a new instance.
     *
     * @param locator
     * @param key
     */
    protected Fragment(L locator, K key) {
        super(locator, key);
    }

    @Override
    public void append(Revision<L, K, V> revision) {
        revisions.add(revision);
    }

    @Override
    public int cardinality() {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean contains(K key, V value) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean contains(K key, V value, long timestamp) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Set<V> get(K key) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Map<K, Set<V>> getAll() {
        throw new UnsupportedOperationException();
    }

    @Override
    public Map<K, Set<V>> getAll(long timestamp) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isEmpty() {
        return revisions.isEmpty();
    }

    @Override
    public Set<K> keys() {
        throw new UnsupportedOperationException();
    }

    @Override
    public Set<K> keys(long timestamp) {
        throw new UnsupportedOperationException();
    }

    @Override
    protected Map<K, Set<V>> $createDataMap() {
        return ImmutableMap.of();
    }

    /**
     * Return the accumulated {@link Revision revisions} in this
     * {@link Fragment}.
     *
     * @return the {@link Revision revisions}
     */
    List<Revision<L, K, V>> revisions() {
        return revisions;
    }

}

concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Record.java (+12)

@@ -127,6 +127,18 @@ protected Record(L locator, @Nullable K key) {
         this.partial = key != null;
     }
 
+    /**
+     * {@link #append(Revision) Append} every {@link Fragment#revisions()
+     * revision} from each of the {@code fragments}.
+     *
+     * @param fragments
+     */
+    public void append(Fragment<L, K, V>... fragments) {
+        for (Fragment<L, K, V> fragment : fragments) {
+            fragment.revisions().forEach(this::append);
+        }
+    }
+
     /**
      * Append {@code revision} to the record by updating the in-memory indices.
      * The {@code revision} must have:

concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/TableRecord.java (+2 −2)

@@ -41,8 +41,8 @@
  * A logical grouping of data for a single entity.
  * <p>
  * This is the primary view of stored data within Concourse, similar to a Row in
- * a traditional database. PrimaryRecords are designed to efficiently handle
- * direct/non-query reads.
+ * a traditional database. A {@link TableRecord} is designed to efficiently
+ * handle direct/non-query reads.
  * </p>
  *
  * @author Jeff Nelson
