
Commit 39ad1ff

Author: Rohit Rai
Committed: May 11, 2014

    Getting in the Hadoop Handler code from Cassandra to Calliope

1 parent: 62455dc

27 files changed, +5,578 / -25 lines
 

‎.gitignore

+1

@@ -5,3 +5,4 @@ project/project
 .idea_modules
 _sites
 lib_managed
+*~

‎project/CalliopeBuild.scala

+7 -16

@@ -1,31 +1,20 @@
 import sbt._
 import sbt.Keys._
-import scala.xml.NodeSeq

 object CalliopeBuild extends Build {

   lazy val USE_CASV2 = System.getenv("USE_CASV2") != null && System.getenv("USE_CASV2").equalsIgnoreCase("true")

-  lazy val VERSION = "0.9.0-U1-" + (if (USE_CASV2) "C2-EA" else "EA")
+  lazy val VERSION = "0.9.1-U1-C2-EA"

-  lazy val CAS_VERSION = if (USE_CASV2) "2.0.5" else "1.2.16"
+  lazy val CAS_VERSION = "2.0.7"

-  lazy val THRIFT_VERSION = if (USE_CASV2) "0.9.1" else "0.7.0"
+  lazy val THRIFT_VERSION = "0.9.1"

   lazy val SCALA_VERSION = "2.10.3"

   lazy val DS_DRIVER_VERSION = "2.0.1"

-  def sparkDependency(scalaVer: String) =
-    scalaVer match {
-      case "2.10.3" =>
-        Seq("org.apache.spark" %% "spark-core" % "0.9.0-incubating",
-          "org.apache.spark" %% "spark-streaming" % "0.9.0-incubating" % "provided")
-
-      case x =>
-        Seq("org.apache.spark" %% "spark-core" % "0.9.0-incubating",
-          "org.apache.spark" %% "spark-streaming" % "0.9.0-incubating" % "provided")
-    }

   lazy val calliope = {
     val dependencies = Seq(
@@ -34,6 +23,10 @@ object CalliopeBuild extends Build {
       "org.apache.thrift" % "libthrift" % THRIFT_VERSION exclude("org.slf4j", "slf4j-api") exclude("javax.servlet", "servlet-api"),
       "com.datastax.cassandra" % "cassandra-driver-core" % DS_DRIVER_VERSION,
       "org.slf4j" % "slf4j-jdk14" % "1.7.5",
+      "org.apache.spark" %% "spark-core" % "0.9.1" exclude("org.apache.hadoop", "hadoop-core"),
+      "org.apache.spark" %% "spark-streaming" % "0.9.1" % "provided",
+      "org.apache.hadoop" % "hadoop-core" % "1.0.3",
+      "org.apache.commons" % "commons-lang3" % "3.1",
       "org.scalatest" %% "scalatest" % "1.9.1" % "test"
     )

@@ -71,8 +64,6 @@ object CalliopeBuild extends Build {

       libraryDependencies ++= dependencies,

-      libraryDependencies <++= (scalaVersion)(sparkDependency),
-
       parallelExecution in Test := false,

       pomExtra := pom,
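This change pins the build to Cassandra 2.0.7 and Thrift 0.9.1 and moves Spark 0.9.1 from the scalaVersion-keyed sparkDependency helper into the main dependency list, excluding hadoop-core from spark-core so that Hadoop 1.0.3 can be pinned explicitly alongside it. A downstream build that needs a different Hadoop release could apply the same exclusion to Calliope itself; the sketch below is only illustrative, and the Calliope coordinates and Hadoop version shown are assumptions, not taken from this commit.

// build.sbt sketch -- the coordinates and the Hadoop version are illustrative assumptions
libraryDependencies ++= Seq(
  // pull in Calliope but drop its pinned hadoop-core 1.0.3
  "com.tuplejump" %% "calliope" % "0.9.1-U1-C2-EA" exclude("org.apache.hadoop", "hadoop-core"),
  // supply the Hadoop artifact that matches your cluster instead
  "org.apache.hadoop" % "hadoop-core" % "1.2.1"
)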
src/main/java/com/tuplejump/calliope/hadoop/AbstractColumnFamilyInputFormat.java (new file, +290)
@@ -0,0 +1,290 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
import com.google.common.collect.ImmutableList;
21+
import com.google.common.collect.Lists;
22+
import org.apache.cassandra.auth.IAuthenticator;
23+
import org.apache.cassandra.dht.IPartitioner;
24+
import org.apache.cassandra.dht.Range;
25+
import org.apache.cassandra.dht.Token;
26+
import org.apache.cassandra.thrift.*;
27+
import org.apache.commons.lang3.StringUtils;
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.mapred.JobConf;
30+
import org.apache.hadoop.mapreduce.*;
31+
import org.apache.thrift.TApplicationException;
32+
import org.apache.thrift.TException;
33+
import org.apache.thrift.protocol.TBinaryProtocol;
34+
import org.apache.thrift.protocol.TProtocol;
35+
import org.apache.thrift.transport.TTransport;
36+
import org.apache.thrift.transport.TTransportException;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import java.io.IOException;
41+
import java.util.*;
42+
import java.util.concurrent.*;
43+
44+
45+
public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y> {
46+
private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyInputFormat.class);
47+
48+
public static final String MAPRED_TASK_ID = "mapred.task.id";
49+
// We need a maximum key size because the old Hadoop API has us "write"
50+
// into caller-supplied key and value objects, whereas the new API has the reader create them.
51+
// The default maximum key size is 8 KB (instantiated only once), but you can
52+
// override it in your job Configuration with this setting.
53+
public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
54+
public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
55+
56+
private String keyspace;
57+
private String cfName;
58+
private IPartitioner partitioner;
59+
60+
protected void validateConfiguration(Configuration conf) {
61+
if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null) {
62+
throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setInputColumnFamily()");
63+
}
64+
if (ConfigHelper.getInputInitialAddress(conf) == null)
65+
throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress");
66+
if (ConfigHelper.getInputPartitioner(conf) == null)
67+
throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
68+
}
69+
70+
public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception {
71+
logger.debug("Creating authenticated client for CF input format");
72+
TTransport transport;
73+
try {
74+
transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
75+
} catch (Exception e) {
76+
throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
77+
}
78+
TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
79+
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
80+
81+
// log in
82+
client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
83+
if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null)) {
84+
Map<String, String> creds = new HashMap<String, String>();
85+
creds.put(IAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
86+
creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
87+
AuthenticationRequest authRequest = new AuthenticationRequest(creds);
88+
client.login(authRequest);
89+
}
90+
logger.debug("Authenticated client for CF input format created successfully");
91+
return client;
92+
}
93+
94+
public List<InputSplit> getSplits(JobContext context) throws IOException {
95+
Configuration conf = HadoopCompat.getConfiguration(context);
97+
98+
validateConfiguration(conf);
99+
100+
// canonical ranges and nodes holding replicas
101+
List<TokenRange> masterRangeNodes = getRangeMap(conf);
102+
103+
keyspace = ConfigHelper.getInputKeyspace(conf);
104+
cfName = ConfigHelper.getInputColumnFamily(conf);
105+
partitioner = ConfigHelper.getInputPartitioner(conf);
106+
logger.debug("partitioner is " + partitioner);
107+
108+
109+
// canonical ranges, split into pieces, fetching the splits in parallel
110+
ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
111+
List<InputSplit> splits = new ArrayList<InputSplit>();
112+
113+
try {
114+
List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
115+
KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
116+
Range<Token> jobRange = null;
117+
if (jobKeyRange != null) {
118+
if (jobKeyRange.start_key != null) {
119+
if (!partitioner.preservesOrder())
120+
throw new UnsupportedOperationException("KeyRange based on keys can only be used with an order preserving partitioner");
121+
if (jobKeyRange.start_token != null)
122+
throw new IllegalArgumentException("only start_key supported");
123+
if (jobKeyRange.end_token != null)
124+
throw new IllegalArgumentException("only start_key supported");
125+
jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key),
126+
partitioner.getToken(jobKeyRange.end_key),
127+
partitioner);
128+
} else if (jobKeyRange.start_token != null) {
129+
jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
130+
partitioner.getTokenFactory().fromString(jobKeyRange.end_token),
131+
partitioner);
132+
} else {
133+
logger.warn("ignoring jobKeyRange specified without start_key or start_token");
134+
}
135+
}
136+
137+
for (TokenRange range : masterRangeNodes) {
138+
if (jobRange == null) {
139+
// for each range, pick a live owner and ask it to compute bite-sized splits
140+
splitfutures.add(executor.submit(new SplitCallable(range, conf)));
141+
} else {
142+
Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
143+
partitioner.getTokenFactory().fromString(range.end_token),
144+
partitioner);
145+
146+
if (dhtRange.intersects(jobRange)) {
147+
for (Range<Token> intersection : dhtRange.intersectionWith(jobRange)) {
148+
range.start_token = partitioner.getTokenFactory().toString(intersection.left);
149+
range.end_token = partitioner.getTokenFactory().toString(intersection.right);
150+
// for each range, pick a live owner and ask it to compute bite-sized splits
151+
splitfutures.add(executor.submit(new SplitCallable(range, conf)));
152+
}
153+
}
154+
}
155+
}
156+
157+
// wait until we have all the results back
158+
for (Future<List<InputSplit>> futureInputSplits : splitfutures) {
159+
try {
160+
splits.addAll(futureInputSplits.get());
161+
} catch (Exception e) {
162+
throw new IOException("Could not get input splits", e);
163+
}
164+
}
165+
} finally {
166+
executor.shutdownNow();
167+
}
168+
169+
assert splits.size() > 0;
170+
Collections.shuffle(splits, new Random(System.nanoTime()));
171+
return splits;
172+
}
173+
174+
/**
175+
* Gets a token range and splits it up according to the suggested
176+
* size into input splits that Hadoop can use.
177+
*/
178+
class SplitCallable implements Callable<List<InputSplit>> {
179+
180+
private final TokenRange range;
181+
private final Configuration conf;
182+
183+
public SplitCallable(TokenRange tr, Configuration conf) {
184+
this.range = tr;
185+
this.conf = conf;
186+
}
187+
188+
public List<InputSplit> call() throws Exception {
189+
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
190+
List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
191+
assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
192+
// turn the sub-ranges into InputSplits
193+
String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
194+
195+
int endpointIndex = 0;
196+
for (String endpoint : range.rpc_endpoints) {
197+
String endpoint_address = endpoint;
198+
if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
199+
endpoint_address = range.endpoints.get(endpointIndex);
200+
endpoints[endpointIndex++] = endpoint_address;
201+
}
202+
203+
Token.TokenFactory factory = partitioner.getTokenFactory();
204+
for (CfSplit subSplit : subSplits) {
205+
Token left = factory.fromString(subSplit.getStart_token());
206+
Token right = factory.fromString(subSplit.getEnd_token());
207+
Range<Token> range = new Range<Token>(left, right, partitioner);
208+
List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
209+
for (Range<Token> subrange : ranges) {
210+
ColumnFamilySplit split =
211+
new ColumnFamilySplit(
212+
factory.toString(subrange.left),
213+
factory.toString(subrange.right),
214+
subSplit.getRow_count(),
215+
endpoints);
216+
217+
logger.debug("adding " + split);
218+
splits.add(split);
219+
}
220+
}
221+
return splits;
222+
}
223+
}
224+
225+
private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException {
226+
int splitsize = ConfigHelper.getInputSplitSize(conf);
227+
for (int i = 0; i < range.rpc_endpoints.size(); i++) {
228+
String host = range.rpc_endpoints.get(i);
229+
230+
if (host == null || host.equals("0.0.0.0"))
231+
host = range.endpoints.get(i);
232+
233+
try {
234+
Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
235+
client.set_keyspace(keyspace);
236+
237+
try {
238+
return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
239+
} catch (TApplicationException e) {
240+
// fallback to guessing split size if talking to a server without describe_splits_ex method
241+
if (e.getType() == TApplicationException.UNKNOWN_METHOD) {
242+
List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
243+
return tokenListToSplits(splitPoints, splitsize);
244+
}
245+
throw e;
246+
}
247+
} catch (IOException e) {
248+
logger.debug("failed to connect to endpoint " + host, e);
249+
} catch (InvalidRequestException e) {
250+
throw new RuntimeException(e);
251+
} catch (TException e) {
252+
throw new RuntimeException(e);
253+
}
254+
}
255+
throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
256+
}
257+
258+
private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize) {
259+
List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
260+
for (int j = 0; j < splitTokens.size() - 1; j++)
261+
splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
262+
return splits;
263+
}
264+
265+
private List<TokenRange> getRangeMap(Configuration conf) throws IOException {
266+
Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
267+
268+
List<TokenRange> map;
269+
try {
270+
map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
271+
} catch (InvalidRequestException e) {
272+
throw new RuntimeException(e);
273+
} catch (TException e) {
274+
throw new RuntimeException(e);
275+
}
276+
return map;
277+
}
278+
279+
//
280+
// Old Hadoop API
281+
//
282+
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
283+
TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
284+
List<InputSplit> newInputSplits = this.getSplits(tac);
285+
org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
286+
for (int i = 0; i < newInputSplits.size(); i++)
287+
oldInputSplits[i] = (ColumnFamilySplit) newInputSplits.get(i);
288+
return oldInputSplits;
289+
}
290+
}
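validateConfiguration above requires a keyspace/column family, an initial contact address and a partitioner before getSplits will do any work. A minimal job-driver sketch satisfying those checks is shown below; the ConfigHelper setter names are taken from the error messages, but their exact signatures are assumed to mirror Cassandra's original ConfigHelper, and the address, partitioner and keyspace/CF names are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import com.tuplejump.calliope.hadoop.ColumnFamilyInputFormat;
import com.tuplejump.calliope.hadoop.ConfigHelper;

public class InputFormatSetup {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // the three settings validateConfiguration() insists on
        ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");   // any live Cassandra node
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
        ConfigHelper.setInputColumnFamily(conf, "my_keyspace", "my_cf");
        Job job = new Job(conf, "calliope-cf-read");
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        return job;
    }
}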
src/main/java/com/tuplejump/calliope/hadoop/AbstractColumnFamilyOutputFormat.java (new file, +158)
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
21+
import org.apache.cassandra.auth.IAuthenticator;
22+
import org.apache.cassandra.thrift.AuthenticationRequest;
23+
import org.apache.cassandra.thrift.Cassandra;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.mapreduce.JobContext;
26+
import org.apache.hadoop.mapreduce.OutputCommitter;
27+
import org.apache.hadoop.mapreduce.OutputFormat;
28+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
29+
import org.apache.thrift.protocol.TBinaryProtocol;
30+
import org.apache.thrift.protocol.TProtocol;
31+
import org.apache.thrift.transport.TTransport;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import java.io.IOException;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
39+
/**
40+
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
41+
* OutputFormat that allows reduce tasks to store keys (and corresponding
42+
* values) as Cassandra rows (and respective columns) in a given
43+
* ColumnFamily.
44+
* <p/>
45+
* <p>
46+
* As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
47+
* Keyspace and ColumnFamily in your
48+
* Hadoop job Configuration. The {@link ConfigHelper} class, through its
49+
* {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
50+
* simple.
51+
* </p>
52+
* <p/>
53+
* <p>
54+
* For the sake of performance, this class employs a lazy write-back caching
55+
* mechanism, where its record writer batches mutations created based on the
56+
* reduce's inputs (in a task-specific map), and periodically makes the changes
57+
* official by sending a batch mutate request to Cassandra.
58+
* </p>
59+
*
60+
* @param <Y>
61+
*/
62+
public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y> {
63+
public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
64+
public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
65+
private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class);
66+
67+
68+
/**
69+
* Check for validity of the output-specification for the job.
70+
*
71+
* @param context information about the job
72+
* @throws java.io.IOException when output should not be attempted
73+
*/
74+
public void checkOutputSpecs(JobContext context) {
75+
checkOutputSpecs(HadoopCompat.getConfiguration(context));
76+
}
77+
78+
protected void checkOutputSpecs(Configuration conf) {
79+
if (ConfigHelper.getOutputKeyspace(conf) == null)
80+
throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
81+
if (ConfigHelper.getOutputPartitioner(conf) == null)
82+
throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
83+
if (ConfigHelper.getOutputInitialAddress(conf) == null)
84+
throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
85+
}
86+
87+
/**
88+
* Fills the deprecated OutputFormat interface for streaming.
89+
*/
90+
@Deprecated
91+
public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException {
92+
checkOutputSpecs(job);
93+
}
94+
95+
/**
96+
* The OutputCommitter for this format does not write any data to the DFS.
97+
*
98+
* @param context the task context
99+
* @return an output committer
100+
* @throws java.io.IOException
101+
* @throws InterruptedException
102+
*/
103+
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
104+
return new NullOutputCommitter();
105+
}
106+
107+
/**
108+
* Connects to the given server:port and returns a client based on the given socket that points to the configured
109+
* keyspace, and is logged in with the configured credentials.
110+
*
111+
* @param host fully qualified host name to connect to
112+
* @param port RPC port of the server
113+
* @param conf a job configuration
114+
* @return a cassandra client
115+
* @throws Exception set of thrown exceptions may be implementation defined,
116+
* depending on the used transport factory
117+
*/
118+
public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception {
119+
logger.debug("Creating authenticated client for CF output format");
120+
TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
121+
TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
122+
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
123+
client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
124+
if ((ConfigHelper.getOutputKeyspaceUserName(conf) != null) && (ConfigHelper.getOutputKeyspacePassword(conf) != null)) {
125+
Map<String, String> creds = new HashMap<String, String>();
126+
creds.put(IAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(conf));
127+
creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(conf));
128+
AuthenticationRequest authRequest = new AuthenticationRequest(creds);
129+
client.login(authRequest);
130+
}
131+
logger.debug("Authenticated client for CF output format created successfully");
132+
return client;
133+
}
134+
135+
/**
136+
* An {@link org.apache.hadoop.mapreduce.OutputCommitter} that does nothing.
137+
*/
138+
private static class NullOutputCommitter extends OutputCommitter {
139+
public void abortTask(TaskAttemptContext taskContext) {
140+
}
141+
142+
public void cleanupJob(JobContext jobContext) {
143+
}
144+
145+
public void commitTask(TaskAttemptContext taskContext) {
146+
}
147+
148+
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
149+
return false;
150+
}
151+
152+
public void setupJob(JobContext jobContext) {
153+
}
154+
155+
public void setupTask(TaskAttemptContext taskContext) {
156+
}
157+
}
158+
}
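BATCH_THRESHOLD and QUEUE_SIZE are plain Configuration keys, so the write-back batching can be tuned per job rather than recompiled. A small sketch with arbitrary values; the defaults noted in the comments come from AbstractColumnFamilyRecordWriter below.

import org.apache.hadoop.conf.Configuration;
import com.tuplejump.calliope.hadoop.AbstractColumnFamilyOutputFormat;

public class OutputTuning {
    public static void tune(Configuration conf) {
        // cap the number of keys batched into a single batch_mutate call (default 32)
        conf.setLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 64L);
        // bound the per-range mutation queue (default 32 * available processors)
        conf.setInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 256);
    }
}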
src/main/java/com/tuplejump/calliope/hadoop/AbstractColumnFamilyRecordWriter.java (new file, +179)
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
21+
import org.apache.cassandra.client.RingCache;
22+
import org.apache.cassandra.thrift.Cassandra;
23+
import org.apache.cassandra.thrift.ConsistencyLevel;
24+
import org.apache.cassandra.utils.FBUtilities;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.mapreduce.RecordWriter;
27+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
28+
import org.apache.hadoop.util.Progressable;
29+
import org.apache.thrift.transport.TTransport;
30+
31+
import java.io.IOException;
32+
import java.net.InetAddress;
33+
import java.util.List;
34+
import java.util.concurrent.ArrayBlockingQueue;
35+
import java.util.concurrent.BlockingQueue;
36+
import java.util.concurrent.TimeUnit;
37+
38+
39+
/**
40+
* The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
41+
* pairs to a Cassandra column family. In particular, it applies all mutations
42+
* in the value, which it associates with the key, and in turn the responsible
43+
* endpoint.
44+
* <p/>
45+
* <p>
46+
* Furthermore, this writer groups the mutations by the endpoint responsible for
47+
* the rows being affected. This allows the mutations to be executed in parallel,
48+
* directly to a responsible endpoint.
49+
* </p>
50+
*
51+
* @see ColumnFamilyOutputFormat
52+
*/
53+
public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWriter<K, Y> implements org.apache.hadoop.mapred.RecordWriter<K, Y> {
54+
// The configuration this writer is associated with.
55+
protected final Configuration conf;
56+
57+
// The ring cache that describes the token ranges each node in the ring is
58+
// responsible for. This is what allows us to group the mutations by
59+
// the endpoints they should be targeted at. The targeted endpoint
60+
// essentially
61+
// acts as the primary replica for the rows being affected by the mutations.
62+
protected final RingCache ringCache;
63+
64+
// The number of mutations to buffer per endpoint
65+
protected final int queueSize;
66+
67+
protected final long batchThreshold;
68+
69+
protected final ConsistencyLevel consistencyLevel;
70+
protected Progressable progressable;
71+
protected TaskAttemptContext context;
72+
73+
protected AbstractColumnFamilyRecordWriter(Configuration conf) {
74+
this.conf = conf;
75+
this.ringCache = new RingCache(conf);
76+
this.queueSize = conf.getInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
77+
batchThreshold = conf.getLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
78+
consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
79+
}
80+
81+
/**
82+
* Close this <code>RecordWriter</code> to future operations, but not before
83+
* flushing out the batched mutations.
84+
*
85+
* @param context the context of the task
86+
* @throws java.io.IOException
87+
*/
88+
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
89+
close();
90+
}
91+
92+
/**
93+
* Fills the deprecated RecordWriter interface for streaming.
94+
*/
95+
@Deprecated
96+
public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException {
97+
close();
98+
}
99+
100+
protected abstract void close() throws IOException;
101+
102+
/**
103+
* A client that runs in a threadpool and connects to the list of endpoints for a particular
104+
* range. Mutations for keys in that range are sent to this client via a queue.
105+
*/
106+
public abstract class AbstractRangeClient<K> extends Thread {
107+
// The list of endpoints for this range
108+
protected final List<InetAddress> endpoints;
109+
// A bounded queue of incoming mutations for this range
110+
protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
111+
112+
protected volatile boolean run = true;
113+
// we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
114+
// so we can throw it on the caller's stack when they call put() again or, if there are no more put calls,
115+
// when the client is closed.
116+
protected volatile IOException lastException;
117+
118+
protected Cassandra.Client client;
119+
120+
/**
121+
* Constructs an {@link AbstractColumnFamilyRecordWriter.AbstractRangeClient} for the given endpoints.
122+
*
123+
* @param endpoints the possible endpoints to execute the mutations on
124+
*/
125+
public AbstractRangeClient(List<InetAddress> endpoints) {
126+
super("client-" + endpoints);
127+
this.endpoints = endpoints;
128+
}
129+
130+
/**
131+
* enqueues the given value to Cassandra
132+
*/
133+
public void put(K value) throws IOException {
134+
while (true) {
135+
if (lastException != null)
136+
throw lastException;
137+
try {
138+
if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
139+
break;
140+
} catch (InterruptedException e) {
141+
throw new AssertionError(e);
142+
}
143+
}
144+
}
145+
146+
public void close() throws IOException {
147+
// stop the run loop. this will result in closeInternal being called by the time join() finishes.
148+
run = false;
149+
interrupt();
150+
try {
151+
this.join();
152+
} catch (InterruptedException e) {
153+
throw new AssertionError(e);
154+
}
155+
156+
if (lastException != null)
157+
throw lastException;
158+
}
159+
160+
protected void closeInternal() {
161+
if (client != null) {
162+
TTransport transport = client.getOutputProtocol().getTransport();
163+
if (transport.isOpen())
164+
transport.close();
165+
}
166+
}
167+
168+
/**
169+
* Loops collecting mutations from the queue and sending to Cassandra
170+
*/
171+
public abstract void run();
172+
173+
@Override
174+
public String toString() {
175+
return "#<Client for " + endpoints.toString() + ">";
176+
}
177+
}
178+
}
179+
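AbstractRangeClient is a bounded producer/consumer handoff: writer threads put() values into an ArrayBlockingQueue, a per-range thread drains it, and any unrecoverable failure is parked in lastException and re-thrown to the producer on its next put() or on close(). The stripped-down analogue below is purely illustrative and is not part of the commit.

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal analogue of AbstractRangeClient: a bounded queue feeding a worker thread,
// with the worker's failure surfaced to the producer on its next put() or on close().
class QueueWorker<T> extends Thread {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(32);
    private volatile boolean run = true;
    private volatile IOException lastException;

    public void put(T value) throws IOException {
        while (true) {
            if (lastException != null)
                throw lastException;                       // surface the worker's failure
            try {
                if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
                    break;                                 // enqueued; otherwise re-check and retry
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }
    }

    @Override
    public void run() {
        while (run || !queue.isEmpty()) {
            try {
                T next = queue.poll(100, TimeUnit.MILLISECONDS);
                if (next != null)
                    process(next);
            } catch (InterruptedException e) {
                // close() interrupts us; the loop re-checks `run`
            } catch (IOException e) {
                lastException = e;                         // remember the unrecoverable error
                return;
            }
        }
    }

    protected void process(T value) throws IOException {
        // the real writer sends the value to Cassandra here
    }

    public void close() throws IOException {
        run = false;
        interrupt();
        try {
            join();
        } catch (InterruptedException e) {
            throw new AssertionError(e);
        }
        if (lastException != null)
            throw lastException;
    }
}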
src/main/java/com/tuplejump/calliope/hadoop/BulkOutputFormat.java (new file, +91)
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
21+
import org.apache.cassandra.thrift.Mutation;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.mapreduce.JobContext;
24+
import org.apache.hadoop.mapreduce.OutputCommitter;
25+
import org.apache.hadoop.mapreduce.OutputFormat;
26+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
27+
28+
import java.io.IOException;
29+
import java.nio.ByteBuffer;
30+
import java.util.List;
31+
32+
public class BulkOutputFormat extends OutputFormat<ByteBuffer, List<Mutation>>
33+
implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer, List<Mutation>> {
34+
@Override
35+
public void checkOutputSpecs(JobContext context) {
36+
checkOutputSpecs(HadoopCompat.getConfiguration(context));
37+
}
38+
39+
private void checkOutputSpecs(Configuration conf) {
40+
if (ConfigHelper.getOutputKeyspace(conf) == null) {
41+
throw new UnsupportedOperationException("you must set the keyspace with setOutputColumnFamily()");
42+
}
43+
}
44+
45+
@Override
46+
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
47+
return new NullOutputCommitter();
48+
}
49+
50+
/**
51+
* Fills the deprecated OutputFormat interface for streaming.
52+
*/
53+
@Deprecated
54+
public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException {
55+
checkOutputSpecs(job);
56+
}
57+
58+
/**
59+
* Fills the deprecated OutputFormat interface for streaming.
60+
*/
61+
@Deprecated
62+
public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException {
63+
return new BulkRecordWriter(job, progress);
64+
}
65+
66+
@Override
67+
public BulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
68+
return new BulkRecordWriter(context);
69+
}
70+
71+
public static class NullOutputCommitter extends OutputCommitter {
72+
public void abortTask(TaskAttemptContext taskContext) {
73+
}
74+
75+
public void cleanupJob(JobContext jobContext) {
76+
}
77+
78+
public void commitTask(TaskAttemptContext taskContext) {
79+
}
80+
81+
public boolean needsTaskCommit(TaskAttemptContext taskContext) {
82+
return false;
83+
}
84+
85+
public void setupJob(JobContext jobContext) {
86+
}
87+
88+
public void setupTask(TaskAttemptContext taskContext) {
89+
}
90+
}
91+
}
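BulkOutputFormat builds SSTables locally and streams them into the cluster, so a job only needs the output keyspace/column family plus the usual format wiring. A hedged driver sketch; the ConfigHelper setter names are assumed to mirror Cassandra's originals, and all values are placeholders.

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import com.tuplejump.calliope.hadoop.BulkOutputFormat;
import com.tuplejump.calliope.hadoop.ConfigHelper;

public class BulkLoadJob {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        ConfigHelper.setOutputColumnFamily(conf, "my_keyspace", "my_cf");  // satisfies checkOutputSpecs()
        ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");           // node(s) to stream SSTables to
        ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
        Job job = new Job(conf, "calliope-bulk-load");
        job.setOutputFormatClass(BulkOutputFormat.class);
        job.setOutputKeyClass(ByteBuffer.class);   // row key
        job.setOutputValueClass(List.class);       // List<Mutation>
        return job;
    }
}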
src/main/java/com/tuplejump/calliope/hadoop/BulkRecordWriter.java (new file, +329)
@@ -0,0 +1,329 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
import org.apache.cassandra.auth.IAuthenticator;
21+
import org.apache.cassandra.config.CFMetaData;
22+
import org.apache.cassandra.config.Config;
23+
import org.apache.cassandra.config.DatabaseDescriptor;
24+
import org.apache.cassandra.db.marshal.AbstractType;
25+
import org.apache.cassandra.db.marshal.BytesType;
26+
import org.apache.cassandra.dht.Range;
27+
import org.apache.cassandra.dht.Token;
28+
import org.apache.cassandra.io.sstable.SSTableLoader;
29+
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
30+
import org.apache.cassandra.streaming.StreamState;
31+
import org.apache.cassandra.thrift.*;
32+
import org.apache.cassandra.utils.OutputHandler;
33+
import org.apache.hadoop.conf.Configuration;
34+
import org.apache.hadoop.mapreduce.RecordWriter;
35+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
36+
import org.apache.hadoop.util.Progressable;
37+
import org.apache.thrift.protocol.TBinaryProtocol;
38+
import org.apache.thrift.protocol.TProtocol;
39+
import org.apache.thrift.transport.TFramedTransport;
40+
import org.apache.thrift.transport.TSocket;
41+
import org.apache.thrift.transport.TTransport;
42+
import org.apache.thrift.transport.TTransportException;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
import java.io.File;
47+
import java.io.IOException;
48+
import java.net.InetAddress;
49+
import java.net.UnknownHostException;
50+
import java.nio.ByteBuffer;
51+
import java.util.*;
52+
import java.util.concurrent.ExecutionException;
53+
import java.util.concurrent.Future;
54+
import java.util.concurrent.TimeUnit;
55+
import java.util.concurrent.TimeoutException;
56+
57+
final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>>
58+
implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>> {
59+
private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
60+
private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
61+
private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
62+
private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
63+
private final Configuration conf;
64+
private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
65+
private SSTableSimpleUnsortedWriter writer;
66+
private SSTableLoader loader;
67+
private File outputdir;
68+
private Progressable progress;
69+
private TaskAttemptContext context;
70+
private int maxFailures;
71+
72+
private enum CFType {
73+
NORMAL,
74+
SUPER,
75+
}
76+
77+
private enum ColType {
78+
NORMAL,
79+
COUNTER
80+
}
81+
82+
private CFType cfType;
83+
private ColType colType;
84+
85+
BulkRecordWriter(TaskAttemptContext context) {
86+
this(HadoopCompat.getConfiguration(context));
87+
this.context = context;
88+
}
89+
90+
BulkRecordWriter(Configuration conf, Progressable progress) {
91+
this(conf);
92+
this.progress = progress;
93+
}
94+
95+
BulkRecordWriter(Configuration conf) {
96+
Config.setClientMode(true);
97+
Config.setOutboundBindAny(true);
98+
this.conf = conf;
99+
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
100+
maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
101+
}
102+
103+
private String getOutputLocation() throws IOException {
104+
String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
105+
if (dir == null)
106+
throw new IOException("Output directory not defined; if Hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
107+
return dir;
108+
}
109+
110+
private void setTypes(Mutation mutation) {
111+
if (cfType == null) {
112+
if (mutation.getColumn_or_supercolumn().isSetSuper_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
113+
cfType = CFType.SUPER;
114+
else
115+
cfType = CFType.NORMAL;
116+
if (mutation.getColumn_or_supercolumn().isSetCounter_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column())
117+
colType = ColType.COUNTER;
118+
else
119+
colType = ColType.NORMAL;
120+
}
121+
}
122+
123+
private void prepareWriter() throws IOException {
124+
if (outputdir == null) {
125+
String keyspace = ConfigHelper.getOutputKeyspace(conf);
126+
//dir must be named by ks/cf for the loader
127+
outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
128+
outputdir.mkdirs();
129+
}
130+
131+
if (writer == null) {
132+
AbstractType<?> subcomparator = null;
133+
ExternalClient externalClient = null;
134+
String username = ConfigHelper.getOutputKeyspaceUserName(conf);
135+
String password = ConfigHelper.getOutputKeyspacePassword(conf);
136+
137+
if (cfType == CFType.SUPER)
138+
subcomparator = BytesType.instance;
139+
140+
this.writer = new SSTableSimpleUnsortedWriter(
141+
outputdir,
142+
ConfigHelper.getOutputPartitioner(conf),
143+
ConfigHelper.getOutputKeyspace(conf),
144+
ConfigHelper.getOutputColumnFamily(conf),
145+
BytesType.instance,
146+
subcomparator,
147+
Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
148+
ConfigHelper.getOutputCompressionParamaters(conf));
149+
150+
externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
151+
ConfigHelper.getOutputRpcPort(conf),
152+
username,
153+
password);
154+
155+
this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
156+
}
157+
}
158+
159+
@Override
160+
public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException {
161+
setTypes(value.get(0));
162+
prepareWriter();
163+
writer.newRow(keybuff);
164+
for (Mutation mut : value) {
165+
if (cfType == CFType.SUPER) {
166+
writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
167+
if (colType == ColType.COUNTER)
168+
for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
169+
writer.addCounterColumn(column.name, column.value);
170+
else {
171+
for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns) {
172+
if (column.ttl == 0)
173+
writer.addColumn(column.name, column.value, column.timestamp);
174+
else
175+
writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long) column.ttl * 1000));
176+
}
177+
}
178+
} else {
179+
if (colType == ColType.COUNTER)
180+
writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
181+
else {
182+
if (mut.getColumn_or_supercolumn().column.ttl == 0)
183+
writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
184+
else
185+
writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long) (mut.getColumn_or_supercolumn().column.ttl) * 1000));
186+
}
187+
}
188+
if (null != progress)
189+
progress.progress();
190+
if (null != context)
191+
HadoopCompat.progress(context);
192+
}
193+
}
194+
195+
@Override
196+
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
197+
close();
198+
}
199+
200+
/**
201+
* Fills the deprecated RecordWriter interface for streaming.
202+
*/
203+
@Deprecated
204+
public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException {
205+
close();
206+
}
207+
208+
private void close() throws IOException {
209+
if (writer != null) {
210+
writer.close();
211+
Future<StreamState> future = loader.stream();
212+
while (true) {
213+
try {
214+
future.get(1000, TimeUnit.MILLISECONDS);
215+
break;
216+
} catch (ExecutionException | TimeoutException te) {
217+
if (null != progress)
218+
progress.progress();
219+
if (null != context)
220+
HadoopCompat.progress(context);
221+
} catch (InterruptedException e) {
222+
throw new IOException(e);
223+
}
224+
}
225+
if (loader.getFailedHosts().size() > 0) {
226+
if (loader.getFailedHosts().size() > maxFailures)
227+
throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
228+
else
229+
logger.warn("Some hosts failed: " + loader.getFailedHosts());
230+
}
231+
}
232+
}
233+
234+
static class ExternalClient extends SSTableLoader.Client {
235+
private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
236+
private final String hostlist;
237+
private final int rpcPort;
238+
private final String username;
239+
private final String password;
240+
241+
public ExternalClient(String hostlist, int port, String username, String password) {
242+
super();
243+
this.hostlist = hostlist;
244+
this.rpcPort = port;
245+
this.username = username;
246+
this.password = password;
247+
}
248+
249+
public void init(String keyspace) {
250+
Set<InetAddress> hosts = new HashSet<InetAddress>();
251+
String[] nodes = hostlist.split(",");
252+
for (String node : nodes) {
253+
try {
254+
hosts.add(InetAddress.getByName(node));
255+
} catch (UnknownHostException e) {
256+
throw new RuntimeException(e);
257+
}
258+
}
259+
Iterator<InetAddress> hostiter = hosts.iterator();
260+
while (hostiter.hasNext()) {
261+
try {
262+
InetAddress host = hostiter.next();
263+
Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
264+
265+
// log in
266+
client.set_keyspace(keyspace);
267+
if (username != null) {
268+
Map<String, String> creds = new HashMap<String, String>();
269+
creds.put(IAuthenticator.USERNAME_KEY, username);
270+
creds.put(IAuthenticator.PASSWORD_KEY, password);
271+
AuthenticationRequest authRequest = new AuthenticationRequest(creds);
272+
client.login(authRequest);
273+
}
274+
275+
List<TokenRange> tokenRanges = client.describe_ring(keyspace);
276+
List<KsDef> ksDefs = client.describe_keyspaces();
277+
278+
setPartitioner(client.describe_partitioner());
279+
Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
280+
281+
for (TokenRange tr : tokenRanges) {
282+
Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
283+
for (String ep : tr.endpoints) {
284+
addRangeForEndpoint(range, InetAddress.getByName(ep));
285+
}
286+
}
287+
288+
for (KsDef ksDef : ksDefs) {
289+
Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
290+
for (CfDef cfDef : ksDef.cf_defs)
291+
cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
292+
knownCfs.put(ksDef.name, cfs);
293+
}
294+
break;
295+
} catch (Exception e) {
296+
if (!hostiter.hasNext())
297+
throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
298+
}
299+
}
300+
}
301+
302+
public CFMetaData getCFMetaData(String keyspace, String cfName) {
303+
Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
304+
return cfs != null ? cfs.get(cfName) : null;
305+
}
306+
307+
private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException {
308+
TSocket socket = new TSocket(host, port);
309+
TTransport trans = new TFramedTransport(socket);
310+
trans.open();
311+
TProtocol protocol = new TBinaryProtocol(trans);
312+
return new Cassandra.Client(protocol);
313+
}
314+
}
315+
316+
static class NullOutputHandler implements OutputHandler {
317+
public void output(String msg) {
318+
}
319+
320+
public void debug(String msg) {
321+
}
322+
323+
public void warn(String msg) {
324+
}
325+
326+
public void warn(String msg, Throwable th) {
327+
}
328+
}
329+
}
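The four mapreduce.output.bulkoutputformat.* properties above are read straight off the Configuration as strings, so the bulk writer can be tuned per job. For example (values are illustrative):

import org.apache.hadoop.conf.Configuration;

public class BulkWriterTuning {
    public static void tune(Configuration conf) {
        conf.set("mapreduce.output.bulkoutputformat.localdir", "/data/tmp/sstables");  // where SSTables are built before streaming
        conf.set("mapreduce.output.bulkoutputformat.buffersize", "128");               // writer buffer in MB (default 64)
        conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "200");      // outbound stream throttle in Mbit/s (default 0 = unthrottled)
        conf.set("mapreduce.output.bulkoutputformat.maxfailedhosts", "1");             // tolerated failed stream targets (default 0)
    }
}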
src/main/java/com/tuplejump/calliope/hadoop/ColumnFamilyInputFormat.java (new file, +80)
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
import org.apache.cassandra.db.Column;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.mapred.JobConf;
23+
import org.apache.hadoop.mapred.Reporter;
24+
import org.apache.hadoop.mapreduce.InputSplit;
25+
import org.apache.hadoop.mapreduce.RecordReader;
26+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
27+
import org.apache.hadoop.mapreduce.TaskAttemptID;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.SortedMap;
32+
33+
/**
34+
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
35+
* <p/>
36+
* At minimum, you need to set the CF and predicate (description of columns to extract from each row)
37+
* in your Hadoop job Configuration. The ConfigHelper class is provided to make this
38+
* simple:
39+
* ConfigHelper.setInputColumnFamily
40+
* ConfigHelper.setInputSlicePredicate
41+
* <p/>
42+
* You can also configure the number of rows per InputSplit with
43+
* ConfigHelper.setInputSplitSize
44+
* This should be "as big as possible, but no bigger." Each InputSplit is read from Cassandra
45+
* with multiple get_slice_range queries, and the per-call overhead of get_slice_range is high,
46+
* so larger split sizes are better -- but if it is too large, you will run out of memory.
47+
* <p/>
48+
* The default split size is 64k rows.
49+
*/
50+
public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>> {
51+
52+
public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
53+
return new ColumnFamilyRecordReader();
54+
}
55+
56+
public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException {
57+
TaskAttemptContext tac = HadoopCompat.newMapContext(
58+
jobConf,
59+
TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
60+
null,
61+
null,
62+
null,
63+
new ReporterWrapper(reporter),
64+
null);
65+
66+
ColumnFamilyRecordReader recordReader = new ColumnFamilyRecordReader(jobConf.getInt(CASSANDRA_HADOOP_MAX_KEY_SIZE, CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT));
67+
recordReader.initialize((InputSplit) split, tac);
68+
return recordReader;
69+
}
70+
71+
@Override
72+
protected void validateConfiguration(Configuration conf) {
73+
super.validateConfiguration(conf);
74+
75+
if (ConfigHelper.getInputSlicePredicate(conf) == null) {
76+
throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate");
77+
}
78+
}
79+
80+
}
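Beyond the settings required by the abstract class, this concrete input format needs a slice predicate and, optionally, a split size, as the javadoc above notes. A sketch of that extra configuration; the predicate uses the standard Thrift types, and the ConfigHelper setter signatures are assumed to mirror Cassandra's originals.

import java.nio.ByteBuffer;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.conf.Configuration;
import com.tuplejump.calliope.hadoop.ConfigHelper;

public class InputPredicateSetup {
    public static void configure(Configuration conf) {
        // ask for every column of each row, paging 100 columns at a time
        SliceRange allColumns = new SliceRange(ByteBuffer.allocate(0), ByteBuffer.allocate(0), false, 100);
        SlicePredicate predicate = new SlicePredicate();
        predicate.setSlice_range(allColumns);
        ConfigHelper.setInputSlicePredicate(conf, predicate);  // required by validateConfiguration()
        ConfigHelper.setInputSplitSize(conf, 65536);           // rows per InputSplit (64k is the default)
    }
}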
src/main/java/com/tuplejump/calliope/hadoop/ColumnFamilyOutputFormat.java (new file, +67)
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
21+
import org.apache.cassandra.thrift.Mutation;
22+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
23+
24+
import java.nio.ByteBuffer;
25+
import java.util.List;
26+
27+
/**
28+
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
29+
* OutputFormat that allows reduce tasks to store keys (and corresponding
30+
* values) as Cassandra rows (and respective columns) in a given
31+
* ColumnFamily.
32+
* <p/>
33+
* <p>
34+
* As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
35+
* Keyspace and ColumnFamily in your
36+
* Hadoop job Configuration. The {@link ConfigHelper} class, through its
37+
* {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
38+
* simple.
39+
* </p>
40+
* <p/>
41+
* <p>
42+
* For the sake of performance, this class employs a lazy write-back caching
43+
* mechanism, where its record writer batches mutations created based on the
44+
* reduce's inputs (in a task-specific map), and periodically makes the changes
45+
* official by sending a batch mutate request to Cassandra.
46+
* </p>
47+
*/
48+
public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<ByteBuffer, List<Mutation>> {
49+
/**
50+
* Fills the deprecated OutputFormat interface for streaming.
51+
*/
52+
@Deprecated
53+
public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) {
54+
return new ColumnFamilyRecordWriter(job, progress);
55+
}
56+
57+
/**
58+
* Get the {@link org.apache.hadoop.mapreduce.RecordWriter} for the given task.
59+
*
60+
* @param context the information about the current task.
61+
* @return a {@link org.apache.hadoop.mapreduce.RecordWriter} to write the output for the job.
62+
* @throws java.io.IOException
63+
*/
64+
public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws InterruptedException {
65+
return new ColumnFamilyRecordWriter(context);
66+
}
67+
}
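As the javadoc above says, a reduce task stores each key as a Cassandra row by emitting the row key together with a list of mutations. A minimal reducer body showing the shape of that output; the column name, value encoding and timestamp convention are placeholders, not taken from this commit.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CountReducer extends Reducer<Text, LongWritable, ByteBuffer, List<Mutation>> {
    @Override
    protected void reduce(Text word, Iterable<LongWritable> counts, Context ctx)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable c : counts)
            sum += c.get();

        Column col = new Column();
        col.setName(ByteBufferUtil.bytes("count"));           // column name
        col.setValue(ByteBufferUtil.bytes(sum));               // column value
        col.setTimestamp(System.currentTimeMillis() * 1000);   // microsecond timestamp

        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setColumn(col);
        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(cosc);

        // row key -> mutations; the record writer routes them to the owning range's endpoints
        ctx.write(ByteBufferUtil.bytes(word.toString()), Collections.singletonList(mutation));
    }
}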

‎src/main/java/com/tuplejump/calliope/hadoop/ColumnFamilyRecordReader.java

+590
Large diffs are not rendered by default.
src/main/java/com/tuplejump/calliope/hadoop/ColumnFamilyRecordWriter.java (new file, +232)
@@ -0,0 +1,232 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.tuplejump.calliope.hadoop;
19+
20+
21+
import java.io.IOException;
22+
import java.net.InetAddress;
23+
import java.nio.ByteBuffer;
24+
import java.util.*;
25+
26+
import org.apache.cassandra.dht.Range;
27+
import org.apache.cassandra.dht.Token;
28+
import org.apache.cassandra.thrift.*;
29+
import org.apache.cassandra.utils.Pair;
30+
import org.apache.hadoop.conf.Configuration;
31+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
32+
import org.apache.thrift.TException;
33+
import org.apache.hadoop.util.Progressable;
34+
35+
36+
/**
37+
 * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
 * pairs to a Cassandra column family. In particular, it applies all mutations
 * in the value, which it associates with the key, and in turn the responsible
 * endpoint.
 *
 * <p>
 * Furthermore, this writer groups the mutations by the endpoint responsible for
 * the rows being affected. This allows the mutations to be executed in parallel,
 * directly to a responsible endpoint.
 * </p>
 *
 * @see ColumnFamilyOutputFormat
 */
final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<ByteBuffer, List<Mutation>>
{
    // handles for clients for each range running in the threadpool
    private final Map<Range, RangeClient> clients;

    /**
     * Upon construction, obtain the map that this writer will use to collect
     * mutations, and the ring cache for the given keyspace.
     *
     * @param context the task attempt context
     * @throws java.io.IOException
     */
    ColumnFamilyRecordWriter(TaskAttemptContext context)
    {
        this(HadoopCompat.getConfiguration(context));
        this.context = context;
    }

    ColumnFamilyRecordWriter(Configuration conf, Progressable progressable)
    {
        this(conf);
        this.progressable = progressable;
    }

    ColumnFamilyRecordWriter(Configuration conf)
    {
        super(conf);
        this.clients = new HashMap<Range, RangeClient>();
    }

    @Override
    public void close() throws IOException
    {
        // close all the clients before throwing anything
        IOException clientException = null;
        for (RangeClient client : clients.values())
        {
            try
            {
                client.close();
            }
            catch (IOException e)
            {
                clientException = e;
            }
        }
        if (clientException != null)
            throw clientException;
    }

    /**
     * If the key is to be associated with a valid value, a mutation is created
     * for it with the given column family and columns. In the event the value
     * in the column is missing (i.e., null), then it is marked for
     * {@link org.apache.cassandra.thrift.Deletion}. Similarly, if the entire value for a key is missing
     * (i.e., null), then the entire key is marked for {@link org.apache.cassandra.thrift.Deletion}.
     *
     * @param keybuff
     *            the key to write.
     * @param value
     *            the value to write.
     * @throws java.io.IOException
     */
    @Override
    public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
    {
        Range<Token> range = ringCache.getRange(keybuff);

        // get the client for the given range, or create a new one
        RangeClient client = clients.get(range);
        if (client == null)
        {
            // haven't seen keys for this range: create new client
            client = new RangeClient(ringCache.getEndpoint(range));
            client.start();
            clients.put(range, client);
        }

        for (Mutation amut : value)
            client.put(Pair.create(keybuff, amut));
        if (progressable != null)
            progressable.progress();
        if (context != null)
            HadoopCompat.progress(context);
    }

    /**
     * A client that runs in a threadpool and connects to the list of endpoints for a particular
     * range. Mutations for keys in that range are sent to this client via a queue.
     */
    public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
    {
        public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);

        /**
         * Constructs an {@link ColumnFamilyRecordWriter.RangeClient} for the given endpoints.
         * @param endpoints the possible endpoints to execute the mutations on
         */
        public RangeClient(List<InetAddress> endpoints)
        {
            super(endpoints);
        }

        /**
         * Loops collecting mutations from the queue and sending to Cassandra
         */
        public void run()
        {
            outer:
            while (run || !queue.isEmpty())
            {
                Pair<ByteBuffer, Mutation> mutation;
                try
                {
                    mutation = queue.take();
                }
                catch (InterruptedException e)
                {
                    // re-check loop condition after interrupt
                    continue;
                }

                Map<ByteBuffer, Map<String, List<Mutation>>> batch = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
                while (mutation != null)
                {
                    Map<String, List<Mutation>> subBatch = batch.get(mutation.left);
                    if (subBatch == null)
                    {
                        subBatch = Collections.singletonMap(columnFamily, (List<Mutation>) new ArrayList<Mutation>());
                        batch.put(mutation.left, subBatch);
                    }

                    subBatch.get(columnFamily).add(mutation.right);
                    if (batch.size() >= batchThreshold)
                        break;

                    mutation = queue.poll();
                }

                Iterator<InetAddress> iter = endpoints.iterator();
                while (true)
                {
                    // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
                    try
                    {
                        client.batch_mutate(batch, consistencyLevel);
                        break;
                    }
                    catch (Exception e)
                    {
                        closeInternal();
                        if (!iter.hasNext())
                        {
                            lastException = new IOException(e);
                            break outer;
                        }
                    }

                    // attempt to connect to a different endpoint
                    try
                    {
                        InetAddress address = iter.next();
                        String host = address.getHostName();
                        int port = ConfigHelper.getOutputRpcPort(conf);
                        client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf);
                    }
                    catch (Exception e)
                    {
                        closeInternal();
                        // TException means something unexpected went wrong to that endpoint, so
                        // we should try again to another. Other exceptions (auth or invalid request) are fatal.
                        if ((!(e instanceof TException)) || !iter.hasNext())
                        {
                            lastException = new IOException(e);
                            break outer;
                        }
                    }
                }
            }
        }
    }
}
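
Editor's note — usage sketch (not part of this commit): the writer above resolves each key's token range through the ring cache and batches Thrift mutations per responsible endpoint. A minimal Scala wiring of a job that would exercise it could look like the following; the keyspace, column family, contact point and port are hypothetical, and it assumes the ConfigHelper copied into this package keeps Cassandra's setter names (setOutputInitialAddress, setOutputRpcPort, setOutputPartitioner, setOutputColumnFamily).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import com.tuplejump.calliope.hadoop.{ColumnFamilyOutputFormat, ConfigHelper}

object ThriftOutputWiringSketch {
  def main(args: Array[String]): Unit = {
    val job = new Job(new Configuration()) // Hadoop 1 style constructor, matching hadoop-core 1.0.3
    val conf = job.getConfiguration

    // Hypothetical cluster contact point and target column family.
    ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1")
    ConfigHelper.setOutputRpcPort(conf, "9160")
    ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner")
    ConfigHelper.setOutputColumnFamily(conf, "demo_ks", "demo_cf")

    // ColumnFamilyOutputFormat hands reduce output to the ColumnFamilyRecordWriter above,
    // which groups List<Mutation> values by the endpoint owning each key's token range.
    job.setOutputFormatClass(classOf[ColumnFamilyOutputFormat])
  }
}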
src/main/java/com/tuplejump/calliope/hadoop/ColumnFamilySplit.java
@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tuplejump.calliope.hadoop;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

public class ColumnFamilySplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit
{
    private String startToken;
    private String endToken;
    private long length;
    private String[] dataNodes;

    @Deprecated
    public ColumnFamilySplit(String startToken, String endToken, String[] dataNodes)
    {
        this(startToken, endToken, Long.MAX_VALUE, dataNodes);
    }

    public ColumnFamilySplit(String startToken, String endToken, long length, String[] dataNodes)
    {
        assert startToken != null;
        assert endToken != null;
        this.startToken = startToken;
        this.endToken = endToken;
        this.length = length;
        this.dataNodes = dataNodes;
    }

    public String getStartToken()
    {
        return startToken;
    }

    public String getEndToken()
    {
        return endToken;
    }

    // getLength and getLocations satisfy the InputSplit abstraction

    public long getLength()
    {
        return length;
    }

    public String[] getLocations()
    {
        return dataNodes;
    }

    // This should only be used by KeyspaceSplit.read();
    protected ColumnFamilySplit() {}

    // These three methods are for serializing and deserializing
    // KeyspaceSplits as needed by the Writable interface.
    public void write(DataOutput out) throws IOException
    {
        out.writeUTF(startToken);
        out.writeUTF(endToken);
        out.writeInt(dataNodes.length);
        for (String endpoint : dataNodes)
        {
            out.writeUTF(endpoint);
        }
    }

    public void readFields(DataInput in) throws IOException
    {
        startToken = in.readUTF();
        endToken = in.readUTF();
        int numOfEndpoints = in.readInt();
        dataNodes = new String[numOfEndpoints];
        for (int i = 0; i < numOfEndpoints; i++)
        {
            dataNodes[i] = in.readUTF();
        }
    }

    @Override
    public String toString()
    {
        return "ColumnFamilySplit(" +
               "(" + startToken
               + ", '" + endToken + ']'
               + " @" + (dataNodes == null ? null : Arrays.asList(dataNodes)) + ')';
    }

    public static ColumnFamilySplit read(DataInput in) throws IOException
    {
        ColumnFamilySplit w = new ColumnFamilySplit();
        w.readFields(in);
        return w;
    }
}
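
Editor's note — usage sketch (not part of this commit): ColumnFamilySplit carries a token range plus replica endpoints between the job client and its tasks via the Writable contract; note that the length field is not part of the serialized form. A small round trip in Scala, using Hadoop's in-memory buffers and made-up tokens/addresses:

import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer}
import com.tuplejump.calliope.hadoop.ColumnFamilySplit

object ColumnFamilySplitRoundTrip extends App {
  // Hypothetical token range and replica addresses.
  val split = new ColumnFamilySplit("-9223372036854775808", "0", 64L * 1024, Array("10.0.0.1", "10.0.0.2"))

  val out = new DataOutputBuffer()
  split.write(out) // serializes startToken, endToken and the endpoint list

  val in = new DataInputBuffer()
  in.reset(out.getData, out.getLength)
  val copy = ColumnFamilySplit.read(in) // readFields() on a fresh instance

  assert(copy.getStartToken == split.getStartToken)
  assert(copy.getEndToken == split.getEndToken)
  println(copy) // ColumnFamilySplit((-9223372036854775808, '0] @[10.0.0.1, 10.0.0.2])
}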

‎src/main/java/com/tuplejump/calliope/hadoop/ConfigHelper.java

+599 lines (large diff not rendered by default)
src/main/java/com/tuplejump/calliope/hadoop/HadoopCompat.java
@@ -0,0 +1,309 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.tuplejump.calliope.hadoop;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

/*
 * This is based on ContextFactory.java from hadoop-2.0.x sources.
 */

/**
 * Utility methods that allow applications to deal with inconsistencies between
 * the MapReduce Context Objects API in Hadoop 1.x and 2.x.
 */
public class HadoopCompat {

    private static final boolean useV21;

    private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
    private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
    private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
    private static final Constructor<?> GENERIC_COUNTER_CONSTRUCTOR;

    private static final Field READER_FIELD;
    private static final Field WRITER_FIELD;

    private static final Method GET_CONFIGURATION_METHOD;
    private static final Method SET_STATUS_METHOD;
    private static final Method GET_COUNTER_METHOD;
    private static final Method INCREMENT_COUNTER_METHOD;
    private static final Method GET_TASK_ATTEMPT_ID;
    private static final Method PROGRESS_METHOD;

    static {
        boolean v21 = true;
        final String PACKAGE = "org.apache.hadoop.mapreduce";
        try {
            Class.forName(PACKAGE + ".task.JobContextImpl");
        } catch (ClassNotFoundException cnfe) {
            v21 = false;
        }
        useV21 = v21;
        Class<?> jobContextCls;
        Class<?> taskContextCls;
        Class<?> taskIOContextCls;
        Class<?> mapContextCls;
        Class<?> genericCounterCls;
        try {
            if (v21) {
                jobContextCls =
                        Class.forName(PACKAGE + ".task.JobContextImpl");
                taskContextCls =
                        Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
                taskIOContextCls =
                        Class.forName(PACKAGE + ".task.TaskInputOutputContextImpl");
                mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
                genericCounterCls = Class.forName(PACKAGE + ".counters.GenericCounter");
            } else {
                jobContextCls =
                        Class.forName(PACKAGE + ".JobContext");
                taskContextCls =
                        Class.forName(PACKAGE + ".TaskAttemptContext");
                taskIOContextCls =
                        Class.forName(PACKAGE + ".TaskInputOutputContext");
                mapContextCls = Class.forName(PACKAGE + ".MapContext");
                genericCounterCls =
                        Class.forName("org.apache.hadoop.mapred.Counters$Counter");
            }
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Can't find class", e);
        }
        try {
            JOB_CONTEXT_CONSTRUCTOR =
                    jobContextCls.getConstructor(Configuration.class, JobID.class);
            JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
            TASK_CONTEXT_CONSTRUCTOR =
                    taskContextCls.getConstructor(Configuration.class,
                                                  TaskAttemptID.class);
            TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
            GENERIC_COUNTER_CONSTRUCTOR =
                    genericCounterCls.getDeclaredConstructor(String.class,
                                                             String.class,
                                                             Long.TYPE);
            GENERIC_COUNTER_CONSTRUCTOR.setAccessible(true);

            if (useV21) {
                MAP_CONTEXT_CONSTRUCTOR =
                        mapContextCls.getDeclaredConstructor(Configuration.class,
                                                             TaskAttemptID.class,
                                                             RecordReader.class,
                                                             RecordWriter.class,
                                                             OutputCommitter.class,
                                                             StatusReporter.class,
                                                             InputSplit.class);
                Method get_counter;
                try {
                    get_counter = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter", String.class,
                                                                                           String.class);
                } catch (Exception e) {
                    get_counter = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter",
                                                                                               String.class, String.class);
                }
                GET_COUNTER_METHOD = get_counter;
            } else {
                MAP_CONTEXT_CONSTRUCTOR =
                        mapContextCls.getConstructor(Configuration.class,
                                                     TaskAttemptID.class,
                                                     RecordReader.class,
                                                     RecordWriter.class,
                                                     OutputCommitter.class,
                                                     StatusReporter.class,
                                                     InputSplit.class);
                GET_COUNTER_METHOD = Class.forName(PACKAGE + ".TaskInputOutputContext")
                        .getMethod("getCounter", String.class, String.class);
            }
            MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
            READER_FIELD = mapContextCls.getDeclaredField("reader");
            READER_FIELD.setAccessible(true);
            WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
            WRITER_FIELD.setAccessible(true);
            GET_CONFIGURATION_METHOD = Class.forName(PACKAGE + ".JobContext")
                    .getMethod("getConfiguration");
            SET_STATUS_METHOD = Class.forName(PACKAGE + ".TaskAttemptContext")
                    .getMethod("setStatus", String.class);
            GET_TASK_ATTEMPT_ID = Class.forName(PACKAGE + ".TaskAttemptContext")
                    .getMethod("getTaskAttemptID");
            INCREMENT_COUNTER_METHOD = Class.forName(PACKAGE + ".Counter")
                    .getMethod("increment", Long.TYPE);
            PROGRESS_METHOD = Class.forName(PACKAGE + ".TaskAttemptContext")
                    .getMethod("progress");

        } catch (SecurityException e) {
            throw new IllegalArgumentException("Can't run constructor ", e);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Can't find constructor ", e);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("Can't find field ", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Can't find class", e);
        }
    }

    /**
     * True if the runtime Hadoop version is 2.x, false otherwise.
     */
    public static boolean isVersion2x() {
        return useV21;
    }

    private static Object newInstance(Constructor<?> constructor, Object... args) {
        try {
            return constructor.newInstance(args);
        } catch (InstantiationException e) {
            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
        } catch (InvocationTargetException e) {
            throw new IllegalArgumentException("Can't instantiate " + constructor, e);
        }
    }

    /**
     * Creates a JobContext from a Configuration and JobID using the correct
     * constructor for the Hadoop version in use. <code>jobId</code> may be null.
     */
    public static JobContext newJobContext(Configuration conf, JobID jobId) {
        return (JobContext) newInstance(JOB_CONTEXT_CONSTRUCTOR, conf, jobId);
    }

    /**
     * Creates a TaskAttemptContext from a Configuration and TaskAttemptID using
     * the correct constructor for the Hadoop version in use.
     */
    public static TaskAttemptContext newTaskAttemptContext(
            Configuration conf, TaskAttemptID taskAttemptId) {
        return (TaskAttemptContext)
                newInstance(TASK_CONTEXT_CONSTRUCTOR, conf, taskAttemptId);
    }

    /**
     * Instantiates MapContext under Hadoop 1 and MapContextImpl under Hadoop 2.
     */
    public static MapContext newMapContext(Configuration conf,
                                           TaskAttemptID taskAttemptID,
                                           RecordReader recordReader,
                                           RecordWriter recordWriter,
                                           OutputCommitter outputCommitter,
                                           StatusReporter statusReporter,
                                           InputSplit inputSplit) {
        return (MapContext) newInstance(MAP_CONTEXT_CONSTRUCTOR,
                conf, taskAttemptID, recordReader, recordWriter, outputCommitter,
                statusReporter, inputSplit);
    }

    /**
     * @return with Hadoop 2 : <code>new GenericCounter(args)</code>,<br>
     *         with Hadoop 1 : <code>new Counter(args)</code>
     */
    public static Counter newGenericCounter(String name, String displayName, long value) {
        try {
            return (Counter)
                    GENERIC_COUNTER_CONSTRUCTOR.newInstance(name, displayName, value);
        } catch (InstantiationException e) {
            throw new IllegalArgumentException("Can't instantiate Counter", e);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("Can't instantiate Counter", e);
        } catch (InvocationTargetException e) {
            throw new IllegalArgumentException("Can't instantiate Counter", e);
        }
    }

    /**
     * Invokes a method and rethrows any exception as a runtime exception.
     */
    private static Object invoke(Method method, Object obj, Object... args) {
        try {
            return method.invoke(obj, args);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
        } catch (InvocationTargetException e) {
            throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
        }
    }

    /**
     * Invokes getConfiguration() on JobContext. Works with both
     * Hadoop 1 and 2.
     */
    public static Configuration getConfiguration(JobContext context) {
        return (Configuration) invoke(GET_CONFIGURATION_METHOD, context);
    }

    /**
     * Invokes setStatus() on TaskAttemptContext. Works with both
     * Hadoop 1 and 2.
     */
    public static void setStatus(TaskAttemptContext context, String status) {
        invoke(SET_STATUS_METHOD, context, status);
    }

    /**
     * Returns TaskAttemptContext.getTaskAttemptID(). Works with both
     * Hadoop 1 and 2.
     */
    public static TaskAttemptID getTaskAttemptID(TaskAttemptContext taskContext) {
        return (TaskAttemptID) invoke(GET_TASK_ATTEMPT_ID, taskContext);
    }

    /**
     * Invokes getCounter() on TaskInputOutputContext. Works with both
     * Hadoop 1 and 2.
     */
    public static Counter getCounter(TaskInputOutputContext context,
                                     String groupName, String counterName) {
        return (Counter) invoke(GET_COUNTER_METHOD, context, groupName, counterName);
    }

    /**
     * Invokes TaskAttemptContext.progress(). Works with both
     * Hadoop 1 and 2.
     */
    public static void progress(TaskAttemptContext context) {
        invoke(PROGRESS_METHOD, context);
    }

    /**
     * Increments the counter. Works with both Hadoop 1 and 2.
     */
    public static void incrementCounter(Counter counter, long increment) {
        // Incrementing a counter might be called often and could be affected by the
        // cost of invoke(); it might be a good candidate to handle in a shim.
        // (TODO Raghu) figure out how to achieve such a build with maven
        invoke(INCREMENT_COUNTER_METHOD, counter, increment);
    }
}
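
Editor's note — usage sketch (not part of this commit): HadoopCompat hides the Hadoop 1 vs. Hadoop 2 differences behind reflection, so calling code never names JobContextImpl/TaskAttemptContextImpl directly. A small Scala illustration; the attempt id string is just an example of the standard format.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID}
import com.tuplejump.calliope.hadoop.HadoopCompat

object HadoopCompatSketch extends App {
  val conf = new Configuration()

  // Example attempt id in the standard "attempt_<jobid>_<type>_<task>_<attempt>" format.
  val attemptId = TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0")

  // Constructed via the reflected constructor appropriate for the Hadoop version on the classpath.
  val tac: TaskAttemptContext = HadoopCompat.newTaskAttemptContext(conf, attemptId)

  // Accessors likewise go through cached Method handles.
  val cfg: Configuration = HadoopCompat.getConfiguration(tac)
  HadoopCompat.progress(tac)
  println(s"Hadoop 2.x context classes on classpath: ${HadoopCompat.isVersion2x}")
}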
src/main/java/com/tuplejump/calliope/hadoop/ReporterWrapper.java
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tuplejump.calliope.hadoop;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.StatusReporter;

/**
 * A reporter that works with both mapred and mapreduce APIs.
 */
public class ReporterWrapper extends StatusReporter implements Reporter {
    private Reporter wrappedReporter;

    public ReporterWrapper(Reporter reporter) {
        wrappedReporter = reporter;
    }

    @Override
    public Counters.Counter getCounter(Enum<?> anEnum) {
        return wrappedReporter.getCounter(anEnum);
    }

    @Override
    public Counters.Counter getCounter(String s, String s1) {
        return wrappedReporter.getCounter(s, s1);
    }

    @Override
    public void incrCounter(Enum<?> anEnum, long l) {
        wrappedReporter.incrCounter(anEnum, l);
    }

    @Override
    public void incrCounter(String s, String s1, long l) {
        wrappedReporter.incrCounter(s, s1, l);
    }

    @Override
    public InputSplit getInputSplit() throws UnsupportedOperationException {
        return wrappedReporter.getInputSplit();
    }

    @Override
    public void progress() {
        wrappedReporter.progress();
    }

    // @Override
    public float getProgress() {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setStatus(String s) {
        wrappedReporter.setStatus(s);
    }
}
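
Editor's note — usage sketch (not part of this commit): ReporterWrapper lets old-API (mapred) Reporter instances stand in wherever a mapreduce StatusReporter is needed, for example as the reporter argument of HadoopCompat.newMapContext above. Using the built-in no-op Reporter.NULL:

import org.apache.hadoop.mapred.Reporter
import com.tuplejump.calliope.hadoop.ReporterWrapper

object ReporterWrapperSketch extends App {
  val wrapped = new ReporterWrapper(Reporter.NULL) // both a StatusReporter and a Reporter

  wrapped.setStatus("reading split 0")
  wrapped.progress()
  wrapped.incrCounter("calliope", "rows_read", 42L)
}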

‎src/main/java/com/tuplejump/calliope/hadoop/cql3/CqlConfigHelper.java

+548 lines (large diff not rendered by default)
src/main/java/com/tuplejump/calliope/hadoop/cql3/CqlInputFormat.java
@@ -0,0 +1,72 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tuplejump.calliope.hadoop.cql3;

import com.datastax.driver.core.Row;
import com.tuplejump.calliope.hadoop.AbstractColumnFamilyInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

import java.io.IOException;

/**
 * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
 * <p/>
 * At minimum, you need to set the keyspace and column family in your Hadoop job Configuration.
 * The ConfigHelper class is provided to make this simple:
 * ConfigHelper.setInputColumnFamily
 * <p/>
 * You can also configure the number of rows per InputSplit with
 * ConfigHelper.setInputSplitSize. The default split size is 64k rows.
 * <p/>
 * The number of CQL rows per page is set with
 * CqlConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. Set it
 * "as big as possible, but no bigger": it sets the LIMIT for the CQL query, so it should be
 * large enough to minimize network overhead but not so large that it risks running out of memory.
 * <p/>
 * Other native protocol connection parameters can be set via CqlConfigHelper.
 */
public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row> {
    public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
            throws IOException {
        TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID))) {
            @Override
            public void progress() {
                reporter.progress();
            }
        };

        CqlRecordReader recordReader = new CqlRecordReader();
        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit) split, tac);
        return recordReader;
    }

    @Override
    public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader(
            org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
            InterruptedException {
        return new CqlRecordReader();
    }
}
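
Editor's note — usage sketch (not part of this commit): configuring a job for CqlInputFormat boils down to the ConfigHelper/CqlConfigHelper calls the javadoc above mentions. The names below are hypothetical, and the setter signatures are assumed to match the Cassandra originals these helpers were copied from (ConfigHelper.java and CqlConfigHelper.java are collapsed in this diff).

import org.apache.hadoop.conf.Configuration
import com.tuplejump.calliope.hadoop.ConfigHelper
import com.tuplejump.calliope.hadoop.cql3.CqlConfigHelper

object CqlInputWiringSketch extends App {
  val conf = new Configuration()

  // Keyspace and column family to scan, plus cluster contact details (all hypothetical).
  ConfigHelper.setInputColumnFamily(conf, "demo_ks", "events")
  ConfigHelper.setInputInitialAddress(conf, "127.0.0.1")
  ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner")

  // Partitions per InputSplit (the "64k rows" default mentioned above).
  ConfigHelper.setInputSplitSize(conf, 64 * 1024)

  // CQL rows per page: large enough to amortise round trips, small enough to avoid OOM.
  CqlConfigHelper.setInputCQLPageRowSize(conf, "1000")
}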
src/main/java/com/tuplejump/calliope/hadoop/cql3/CqlOutputFormat.java
@@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tuplejump.calliope.hadoop.cql3;


import com.tuplejump.calliope.hadoop.AbstractColumnFamilyOutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

/**
 * The <code>CqlOutputFormat</code> acts as a Hadoop-specific
 * OutputFormat that allows reduce tasks to store keys (and corresponding
 * bound variable values) as CQL rows (and respective columns) in a given
 * ColumnFamily.
 * <p/>
 * <p>
 * As is the case with the {@link com.tuplejump.calliope.hadoop.ColumnFamilyInputFormat},
 * you need to set the prepared statement in your Hadoop job Configuration.
 * The {@link CqlConfigHelper} class, through its
 * {@link CqlConfigHelper#setOutputCql} method, is provided to make this simple.
 * You also need to set the keyspace and column family. The
 * {@link com.tuplejump.calliope.hadoop.ConfigHelper} class, through its
 * {@link com.tuplejump.calliope.hadoop.ConfigHelper#setOutputColumnFamily} method,
 * is provided to make this simple.
 * </p>
 * <p/>
 * <p>
 * For the sake of performance, this class employs a lazy write-back caching
 * mechanism, where its record writer collects the prepared statement's bound
 * variable values from the reducer's inputs (in a task-specific map) and periodically
 * makes the changes official by sending prepared-statement execution requests
 * to Cassandra.
 * </p>
 */
public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>> {
    /**
     * Fills the deprecated OutputFormat interface for streaming.
     */
    @Deprecated
    public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException {
        return new CqlRecordWriter(job, progress);
    }

    /**
     * Get the {@link org.apache.hadoop.mapreduce.RecordWriter} for the given task.
     *
     * @param context the information about the current task.
     * @return a {@link org.apache.hadoop.mapreduce.RecordWriter} to write the output for the job.
     * @throws java.io.IOException
     */
    public CqlRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
        return new CqlRecordWriter(context);
    }
}
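
Editor's note — usage sketch (not part of this commit): the output CQL is supplied without a WHERE clause; CqlRecordWriter (later in this diff) appends the key-based WHERE clause and rejects INSERT statements. Assuming the copied CqlConfigHelper exposes Cassandra's setOutputCql setter, a hypothetical word-count style wiring:

import org.apache.hadoop.conf.Configuration
import com.tuplejump.calliope.hadoop.ConfigHelper
import com.tuplejump.calliope.hadoop.cql3.CqlConfigHelper

object CqlOutputWiringSketch extends App {
  val conf = new Configuration()

  ConfigHelper.setOutputColumnFamily(conf, "demo_ks", "word_count")
  ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1")
  ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner")

  // Only the SET part is given; the writer appends WHERE <partition key> = ? [AND <clustering column> = ? ...].
  CqlConfigHelper.setOutputCql(conf, "UPDATE demo_ks.word_count SET count = ?")
}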
src/main/java/com/tuplejump/calliope/hadoop/cql3/CqlPagingInputFormat.java
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tuplejump.calliope.hadoop.cql3;

import com.tuplejump.calliope.hadoop.AbstractColumnFamilyInputFormat;
import com.tuplejump.calliope.hadoop.HadoopCompat;
import com.tuplejump.calliope.hadoop.ReporterWrapper;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

/**
 * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
 * <p/>
 * At minimum, you need to set the keyspace and column family in your Hadoop job Configuration.
 * The ConfigHelper class is provided to make this simple:
 * ConfigHelper.setInputColumnFamily
 * <p/>
 * You can also configure the number of rows per InputSplit with
 * ConfigHelper.setInputSplitSize. The default split size is 64k rows.
 * <p/>
 * The number of CQL rows per page is set with
 * CqlConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. Set it
 * "as big as possible, but no bigger": it sets the LIMIT for the CQL query, so it should be
 * large enough to minimize network overhead but not so large that it risks running out of memory.
 * <p/>
 * The column names of the select CQL query can be restricted with
 * CqlConfigHelper.setInputColumns. The default is all columns.
 * <p/>
 * A user-defined where clause can be set with
 * CqlConfigHelper.setInputWhereClauses. The default is no user-defined where clause.
 */
public class CqlPagingInputFormat extends AbstractColumnFamilyInputFormat<Map<String, ByteBuffer>, Map<String, ByteBuffer>> {
    public RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
            throws IOException {
        TaskAttemptContext tac = HadoopCompat.newMapContext(
                jobConf,
                TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)),
                null,
                null,
                null,
                new ReporterWrapper(reporter),
                null);

        CqlPagingRecordReader recordReader = new CqlPagingRecordReader();
        recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit) split, tac);
        return recordReader;
    }

    @Override
    public org.apache.hadoop.mapreduce.RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> createRecordReader(
            org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException,
            InterruptedException {
        return new CqlPagingRecordReader();
    }
}
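
Editor's note — usage sketch (not part of this commit): CqlPagingInputFormat takes the same keyspace/column-family wiring as CqlInputFormat above; in addition, the projection and a pushed-down predicate can be set through the CqlConfigHelper calls named in the javadoc. The column and predicate values below are hypothetical, and the predicate is assumed to follow the usual Cassandra rule of targeting indexed columns.

import org.apache.hadoop.conf.Configuration
import com.tuplejump.calliope.hadoop.ConfigHelper
import com.tuplejump.calliope.hadoop.cql3.CqlConfigHelper

object CqlPagingInputWiringSketch extends App {
  val conf = new Configuration()

  ConfigHelper.setInputColumnFamily(conf, "demo_ks", "events")
  ConfigHelper.setInputInitialAddress(conf, "127.0.0.1")
  ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner")

  // Select only these columns instead of the default "all columns".
  CqlConfigHelper.setInputColumns(conf, "event_id,user_id,ts")

  // User-defined where clause (default is none).
  CqlConfigHelper.setInputWhereClauses(conf, "user_id = 42")
}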

‎src/main/java/com/tuplejump/calliope/hadoop/cql3/CqlPagingRecordReader.java

+783 lines (large diff not rendered by default)

‎src/main/java/com/tuplejump/calliope/hadoop/cql3/CqlRecordReader.java

+486 lines (large diff not rendered by default)
src/main/java/com/tuplejump/calliope/hadoop/cql3/CqlRecordWriter.java
@@ -0,0 +1,396 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.tuplejump.calliope.hadoop.cql3;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import com.tuplejump.calliope.hadoop.AbstractColumnFamilyRecordWriter;
import com.tuplejump.calliope.hadoop.ConfigHelper;
import com.tuplejump.calliope.hadoop.HadoopCompat;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;

/**
 * The <code>CqlRecordWriter</code> maps the output &lt;key, value&gt;
 * pairs to a Cassandra column family. In particular, it applies the bound variables
 * in the value to the prepared statement, which it associates with the key, and in
 * turn the responsible endpoint.
 *
 * <p>
 * Furthermore, this writer groups the CQL queries by the endpoint responsible for
 * the rows being affected. This allows the queries to be executed in parallel,
 * directly to a responsible endpoint.
 * </p>
 *
 * @see CqlOutputFormat
 */
final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>
{
    private static final Logger logger = LoggerFactory.getLogger(CqlRecordWriter.class);

    // handles for clients for each range running in the threadpool
    private final Map<InetAddress, RangeClient> clients;

    // host to prepared statement id mappings
    private ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();

    private final String cql;

    private AbstractType<?> keyValidator;
    private String[] partitionKeyColumns;
    private List<String> clusterColumns;

    /**
     * Upon construction, obtain the map that this writer will use to collect
     * mutations, and the ring cache for the given keyspace.
     *
     * @param context the task attempt context
     * @throws java.io.IOException
     */
    CqlRecordWriter(TaskAttemptContext context) throws IOException
    {
        this(HadoopCompat.getConfiguration(context));
        this.context = context;
    }

    CqlRecordWriter(Configuration conf, Progressable progressable) throws IOException
    {
        this(conf);
        this.progressable = progressable;
    }

    CqlRecordWriter(Configuration conf)
    {
        super(conf);
        this.clients = new HashMap<>();

        try
        {
            Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
            retrievePartitionKeyValidator(client);
            String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
            if (cqlQuery.toLowerCase().startsWith("insert"))
                throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
            cql = appendKeyWhereClauses(cqlQuery);

            if (client != null)
            {
                TTransport transport = client.getOutputProtocol().getTransport();
                if (transport.isOpen())
                    transport.close();
            }
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws IOException
    {
        // close all the clients before throwing anything
        IOException clientException = null;
        for (RangeClient client : clients.values())
        {
            try
            {
                client.close();
            }
            catch (IOException e)
            {
                clientException = e;
            }
        }

        if (clientException != null)
            throw clientException;
    }

    /**
     * Enqueues the given bound values for the endpoint responsible for the key's
     * token range. The RangeClient thread for that endpoint executes the prepared
     * statement with these values followed by the partition key and clustering
     * column values, appended in that order.
     *
     * @param keyColumns
     *            the key to write.
     * @param values
     *            the values to write.
     * @throws java.io.IOException
     */
    @Override
    public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
    {
        Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));

        // get the client for the given range, or create a new one
        final InetAddress address = ringCache.getEndpoint(range).get(0);
        RangeClient client = clients.get(address);
        if (client == null)
        {
            // haven't seen keys for this range: create new client
            client = new RangeClient(ringCache.getEndpoint(range));
            client.start();
            clients.put(address, client);
        }

        // add primary key columns to the bind variables
        List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
        for (String column : partitionKeyColumns)
            allValues.add(keyColumns.get(column));
        for (String column : clusterColumns)
            allValues.add(keyColumns.get(column));

        client.put(allValues);

        if (progressable != null)
            progressable.progress();
        if (context != null)
            HadoopCompat.progress(context);
    }

    /**
     * A client that runs in a threadpool and connects to the list of endpoints for a particular
     * range. Bound variables for keys in that range are sent to this client via a queue.
     */
    public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
    {
        /**
         * Constructs an {@link CqlRecordWriter.RangeClient} for the given endpoints.
         * @param endpoints the possible endpoints to execute the mutations on
         */
        public RangeClient(List<InetAddress> endpoints)
        {
            super(endpoints);
        }

        /**
         * Loops collecting CQL bound variable values from the queue and sending them to Cassandra
         */
        public void run()
        {
            outer:
            while (run || !queue.isEmpty())
            {
                List<ByteBuffer> bindVariables;
                try
                {
                    bindVariables = queue.take();
                }
                catch (InterruptedException e)
                {
                    // re-check loop condition after interrupt
                    continue;
                }

                Iterator<InetAddress> iter = endpoints.iterator();
                while (true)
                {
                    // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
                    try
                    {
                        int i = 0;
                        int itemId = preparedStatement(client);
                        while (bindVariables != null)
                        {
                            client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
                            i++;

                            if (i >= batchThreshold)
                                break;

                            bindVariables = queue.poll();
                        }

                        break;
                    }
                    catch (Exception e)
                    {
                        closeInternal();
                        if (!iter.hasNext())
                        {
                            lastException = new IOException(e);
                            break outer;
                        }
                    }

                    // attempt to connect to a different endpoint
                    try
                    {
                        InetAddress address = iter.next();
                        String host = address.getHostName();
                        int port = ConfigHelper.getOutputRpcPort(conf);
                        client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
                    }
                    catch (Exception e)
                    {
                        closeInternal();
                        // TException means something unexpected went wrong to that endpoint, so
                        // we should try again to another. Other exceptions (auth or invalid request) are fatal.
                        if ((!(e instanceof TException)) || !iter.hasNext())
                        {
                            lastException = new IOException(e);
                            break outer;
                        }
                    }
                }
            }
        }

        /** get prepared statement id from cache, otherwise prepare it from Cassandra server */
        private int preparedStatement(Cassandra.Client client)
        {
            Integer itemId = preparedStatements.get(client);
            if (itemId == null)
            {
                CqlPreparedResult result;
                try
                {
                    result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
                }
                catch (InvalidRequestException e)
                {
                    throw new RuntimeException("failed to prepare cql query " + cql, e);
                }
                catch (TException e)
                {
                    throw new RuntimeException("failed to prepare cql query " + cql, e);
                }

                Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId));
                itemId = previousId == null ? result.itemId : previousId;
            }
            return itemId;
        }
    }

    private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
    {
        ByteBuffer partitionKey;
        if (keyValidator instanceof CompositeType)
        {
            ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
            for (int i = 0; i < keys.length; i++)
                keys[i] = keyColumns.get(partitionKeyColumns[i]);

            partitionKey = CompositeType.build(keys);
        }
        else
        {
            partitionKey = keyColumns.get(partitionKeyColumns[0]);
        }
        return partitionKey;
    }

    /** retrieve the key validator from system.schema_columnfamilies table */
    private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
    {
        String keyspace = ConfigHelper.getOutputKeyspace(conf);
        String cfName = ConfigHelper.getOutputColumnFamily(conf);
        String query = "SELECT key_validator," +
                       "       key_aliases," +
                       "       column_aliases " +
                       "FROM system.schema_columnfamilies " +
                       "WHERE keyspace_name='%s' and columnfamily_name='%s'";
        String formatted = String.format(query, keyspace, cfName);
        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE);

        Column rawKeyValidator = result.rows.get(0).columns.get(0);
        String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
        keyValidator = parseType(validator);

        Column rawPartitionKeys = result.rows.get(0).columns.get(1);
        String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
        logger.debug("partition keys: " + keyString);

        List<String> keys = FBUtilities.fromJsonList(keyString);
        partitionKeyColumns = new String[keys.size()];
        int i = 0;
        for (String key : keys)
        {
            partitionKeyColumns[i] = key;
            i++;
        }

        Column rawClusterColumns = result.rows.get(0).columns.get(2);
        String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));

        logger.debug("cluster columns: " + clusterColumnString);
        clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
    }

    private AbstractType<?> parseType(String type) throws ConfigurationException
    {
        try
        {
            // always treat counters like longs, specifically CCT.serialize is not what we need
            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
                return LongType.instance;
            return TypeParser.parse(type);
        }
        catch (SyntaxException e)
        {
            throw new ConfigurationException(e.getMessage(), e);
        }
    }

    /**
     * add where clauses for partition keys and cluster columns
     */
    private String appendKeyWhereClauses(String cqlQuery)
    {
        String keyWhereClause = "";

        for (String partitionKey : partitionKeyColumns)
            keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey) : (" AND " + quote(partitionKey)));
        for (String clusterColumn : clusterColumns)
            keyWhereClause += " AND " + quote(clusterColumn) + " = ?";

        return cqlQuery + " WHERE " + keyWhereClause;
    }

    /** Quoting for working with uppercase */
    private String quote(String identifier)
    {
        return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
    }
}
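
Editor's note — worked example (not part of this commit): the private appendKeyWhereClauses/quote pair above turns the configured UPDATE into the statement that is actually prepared. The standalone Scala re-implementation below mirrors that logic with hypothetical column names; the bound values sent by write() line up with the placeholders in order: the SET values first, then the partition key columns, then the clustering columns.

object KeyWhereClauseSketch extends App {
  // Mirrors CqlRecordWriter.quote: double-quote identifiers, escaping embedded quotes.
  def quote(identifier: String): String = "\"" + identifier.replaceAll("\"", "\"\"") + "\""

  // Mirrors CqlRecordWriter.appendKeyWhereClauses for a given key layout.
  def appendKeyWhereClauses(cql: String, partitionKeys: Seq[String], clusterColumns: Seq[String]): String = {
    val partitionPart = partitionKeys.map(c => s"${quote(c)} = ?").mkString(" AND ")
    val clusterPart = clusterColumns.map(c => s" AND ${quote(c)} = ?").mkString
    s"$cql WHERE $partitionPart$clusterPart"
  }

  val prepared = appendKeyWhereClauses(
    "UPDATE demo_ks.events SET payload = ?",
    partitionKeys = Seq("user_id"),
    clusterColumns = Seq("ts"))

  // UPDATE demo_ks.events SET payload = ? WHERE "user_id" = ? AND "ts" = ?
  println(prepared)
}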

‎src/main/scala/com/tuplejump/calliope/CasBuilder.scala

+2-2
@@ -19,15 +19,15 @@
 
 package com.tuplejump.calliope
 
-import org.apache.cassandra.hadoop.ConfigHelper
+import com.tuplejump.calliope.hadoop.ConfigHelper
 import org.apache.cassandra.thrift.{SliceRange, SlicePredicate}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.cassandra.utils.ByteBufferUtil
 
 import scala.collection.JavaConversions._
 import com.tuplejump.calliope.queries.FinalQuery
 import org.apache.hadoop.conf.Configuration
-import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
+import com.tuplejump.calliope.hadoop.cql3.CqlConfigHelper
 
 trait CasBuilder extends Serializable {
   def configuration: Configuration

‎src/main/scala/com/tuplejump/calliope/CassandraRDDFunctions.scala

+2-2
@@ -22,12 +22,12 @@ package com.tuplejump.calliope
 
 import org.apache.spark.Logging
 import org.apache.cassandra.thrift.{Column, Mutation, ColumnOrSuperColumn}
-import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
+import com.tuplejump.calliope.hadoop.ColumnFamilyOutputFormat
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 
-import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
+import com.tuplejump.calliope.hadoop.cql3.CqlOutputFormat
 
 import scala.collection.JavaConversions._
 
‎src/main/scala/com/tuplejump/calliope/cql3/Cql3CassandraRDD.scala

+1-1
@@ -23,7 +23,7 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, JobID, InputSplit}
 import scala.collection.JavaConversions._
 import java.text.SimpleDateFormat
 import java.util.Date
-import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
+import com.tuplejump.calliope.hadoop.cql3.CqlPagingInputFormat
 import com.tuplejump.calliope.CasBuilder
 import org.apache.spark._
 import org.apache.spark.rdd.RDD

‎src/main/scala/com/tuplejump/calliope/thrift/ThriftCassandraRDD.scala

+1-1
@@ -19,7 +19,7 @@
 
 package com.tuplejump.calliope.thrift
 
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
+import com.tuplejump.calliope.hadoop.ColumnFamilyInputFormat
 import scala.collection.JavaConversions._
 import java.text.SimpleDateFormat
 import java.util.Date

‎src/main/scala/com/tuplejump/calliope/utils/RichByteBuffer.scala

+10-3
@@ -24,10 +24,10 @@ import org.apache.cassandra.utils.ByteBufferUtil
 import scala.language.implicitConversions
 import java.util.Date
 import java.util.UUID
-import org.apache.cassandra.cql.jdbc.JdbcDecimal
 import java.net.InetAddress
 import com.datastax.driver.core.DataType
 import java.math.BigInteger
+import scala.collection.JavaConverters._
 
 
 object RichByteBuffer {
@@ -53,7 +53,7 @@
 
   implicit def ByteBuffer2ByteArray(buffer: ByteBuffer): Array[Byte] = ByteBufferUtil.getArray(buffer)
 
-  implicit def ByteBuffer2BigDecimal(buffer: ByteBuffer): BigDecimal = JdbcDecimal.instance.compose(buffer)
+  implicit def ByteBuffer2BigDecimal(buffer: ByteBuffer): BigDecimal = DataType.decimal().deserialize(buffer).asInstanceOf[BigDecimal]
 
   implicit def ByteBuffer2BigInteger(buffer: ByteBuffer): BigInteger = new BigInteger(ByteBufferUtil.getArray(buffer))
 
@@ -111,7 +111,7 @@
 
   implicit def ByteArray2ByteBuffer(bytes: Array[Byte]): ByteBuffer = ByteBuffer.wrap(bytes)
 
-  implicit def BigDecimal2ByteBuffer(bigDec: BigDecimal): ByteBuffer = JdbcDecimal.instance.decompose(bigDec.bigDecimal)
+  implicit def BigDecimal2ByteBuffer(bigDec: BigDecimal): ByteBuffer = DataType.decimal().serialize(bigDec)
 
   implicit def BigInteger2ByteBuffer(bigInt: BigInteger): ByteBuffer = bigInt.toByteArray
 
@@ -130,5 +130,12 @@
   implicit def MapSS2MapBB(m: Map[String, String]) = m.map {
     case (k, v) => new Tuple2[ByteBuffer, ByteBuffer](k, v)
   }.toMap
+
+  /* Conversions for Collections */
+
+  implicit def MapSS2ByteBuffer(map: Map[String, String]): ByteBuffer = DataType.map(DataType.text(), DataType.text()).serialize(map.asJava)
+
+  implicit def ByteBuffer2MapSS(buffer: ByteBuffer): Map[String, String] = DataType.map(DataType.text(), DataType.text()).deserialize(buffer).asInstanceOf[java.util.Map[String, String]].asScala.toMap
+
 }
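
Editor's note — usage sketch (not part of this commit): the new collection conversions round-trip a Scala Map[String, String] through the DataStax driver's map<text, text> codec via the 2.0.x driver's DataType.serialize/deserialize pair. A small sketch, assuming the default codec behaviour of driver 2.0.1:

import java.nio.ByteBuffer
import com.tuplejump.calliope.utils.RichByteBuffer._

object RichByteBufferSketch extends App {
  val tags: Map[String, String] = Map("lang" -> "scala", "store" -> "cassandra")

  // Implicit MapSS2ByteBuffer: Scala map -> serialized map<text, text> bytes.
  val asBytes: ByteBuffer = tags

  // Implicit ByteBuffer2MapSS: bytes -> java.util.Map -> immutable Scala map.
  val roundTripped: Map[String, String] = asBytes

  assert(roundTripped == tags)
}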
