This repository has been archived by the owner on Mar 31, 2022. It is now read-only.

Added support for parquet input #17

Open · wants to merge 4 commits into master
1 change: 1 addition & 0 deletions README.md
@@ -73,6 +73,7 @@ If we're lucky, we should eventually see our data in C\*:
* `--timestamp` to specify the timestamp of values in C\*, defaults to now
* `--ttl` to specify the TTL of values in C\*, defaults to 0
* `--ignore` to omit fields from source records, can be repeated to specify multiple fields
* `--inputtype` use `parquet` or `avro`, default is `avro`

## Output URI Format

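For context, a hypothetical invocation using the new option might look like the following; the jar name and the input/output arguments are placeholders standing in for the README's existing example (not shown in this diff), and only `--inputtype parquet` is the new part:

hadoop jar hdfs2cass-jar-with-dependencies.jar com.spotify.hdfs2cass.Hdfs2Cass \
  <existing input and output arguments> \
  --inputtype parquet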
11 changes: 9 additions & 2 deletions pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

@@ -33,6 +33,7 @@
<hadoop.version>2.2.0</hadoop.version>
<avro.version>1.7.4</avro.version>
<cassandra.version>2.0.11</cassandra.version>
<parquet-avro.version>1.5.0</parquet-avro.version>
<mainClass />
</properties>
<scm>
@@ -98,6 +99,11 @@
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet-avro.version}</version>
</dependency>

<!-- test scope -->
<dependency>
@@ -106,6 +112,7 @@
<version>4.11</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
97 changes: 76 additions & 21 deletions src/main/java/com/spotify/hdfs2cass/Hdfs2Cass.java
@@ -25,19 +25,27 @@
import com.spotify.hdfs2cass.crunch.cql.CQLTarget;
import com.spotify.hdfs2cass.crunch.thrift.ThriftRecord;
import com.spotify.hdfs2cass.crunch.thrift.ThriftTarget;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.parquet.AvroParquetFileSource;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.BasicConfigurator;
import parquet.avro.AvroParquetReader;

import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.List;
@@ -77,6 +85,9 @@ public class Hdfs2Cass extends Configured implements Tool, Serializable {
@Parameter(names = "--ttl")
protected static String ttl;

@Parameter(names = "--inputtype")
protected static String inputtype;

@Parameter(names = "--ignore")
protected static List<String> ignore = Lists.newArrayList();

@@ -86,6 +97,10 @@ public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new Hdfs2Cass(), args);
}

private static List<Path> inputList(List<String> inputs) {
return Lists.newArrayList(Iterables.transform(inputs, new StringToHDFSPath()));
}

@Override
public int run(String[] args) throws Exception {

@@ -99,36 +114,76 @@ public int run(String[] args) throws Exception {
// Parse & fetch info about target Cassandra cluster
CassandraParams params = CassandraParams.parse(outputUri);

PCollection<GenericRecord> records =
((PCollection<GenericRecord>)(PCollection) pipeline.read(From.avroFile(inputList(input))));
PCollection<GenericRecord> records = null;

if (inputtype == null || inputtype.equals("avro")) {
records = ((PCollection<GenericRecord>) (PCollection) pipeline.read(From.avroFile(inputList(input))));
} else if (inputtype.equals("parquet")) {
// Parquet sources need the Avro schema up front, so peek at the first input path
records = ((PCollection<GenericRecord>) (PCollection) pipeline.read(new AvroParquetFileSource<>(inputList(input), Avros.generics(getSchemaFromPath(inputList(input).get(0), new Configuration())))));
} else {
throw new IllegalArgumentException("Unknown --inputtype value: " + inputtype + " (expected avro or parquet)");
}

processAndWrite(outputUri, params, records);

// Execute the pipeline
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}

private void processAndWrite(URI outputUri, CassandraParams params, PCollection<GenericRecord> records) {
String protocol = outputUri.getScheme();
if (protocol.equalsIgnoreCase("thrift")) {
records
// First convert ByteBuffers to ThriftRecords
.parallelDo(new AvroToThrift(rowkey, timestamp, ttl, ignore), ThriftRecord.PTYPE)
// Then group the ThriftRecords in preparation for writing them
.parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
.groupByKey(params.createGroupingOptions())
// Finally write the ThriftRecords to Cassandra
.write(new ThriftTarget(outputUri, params));
}
else if (protocol.equalsIgnoreCase("cql")) {
records
// In case of CQL, convert ByteBuffers to CQLRecords
.parallelDo(new AvroToCQL(rowkey, timestamp, ttl, ignore), CQLRecord.PTYPE)
.parallelDo(new CQLRecord.AsPair(), CQLRecord.AsPair.PTYPE)
.groupByKey(params.createGroupingOptions())
.write(new CQLTarget(outputUri, params));
}

// Execute the pipeline
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}

private static List<Path> inputList(List<String> inputs) {
return Lists.newArrayList(Iterables.transform(inputs, new StringToHDFSPath()));
private Schema getSchemaFromPath(Path path, Configuration conf) {

AvroParquetReader<GenericRecord> reader = null;
try {
FileSystem fs = FileSystem.get(conf);
if (!fs.isFile(path)) {
FileStatus[] fstat = fs.listStatus(path, new PathFilter() {
@Override
public boolean accept(Path path) {
String name = path.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
});
if (fstat == null || fstat.length == 0) {
throw new IllegalArgumentException("No valid files found in directory: " + path);
}
path = fstat[0].getPath();
}

reader = new AvroParquetReader<>(path);
return reader.read().getSchema();

} catch (IOException e) {
throw new RuntimeException("Error reading schema from path: " + path, e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
// ignored
}
}
}
}

private static class StringToHDFSPath implements Function<String, Path> {
Changes to another file:
@@ -152,7 +152,12 @@ private void close() throws IOException {
Future<StreamState> future =
loader.stream(Collections.<InetAddress>emptySet(), new ProgressIndicator());
try {
Uninterruptibles.getUninterruptibly(future);
StreamState streamState = Uninterruptibles.getUninterruptibly(future);
if (streamState.hasFailedSession()) {
LOG.warn("Some streaming sessions failed");
} else {
LOG.info("Streaming finished successfully");
}
} catch (ExecutionException e) {
throw new RuntimeException("Streaming to the following hosts failed: " +
loader.getFailedHosts(), e);
@@ -163,7 +168,5 @@ } finally {
} finally {
heartbeat.stopHeartbeat();
}
LOG.info("Streaming finished successfully");
}

}
Changes to another file:
@@ -227,7 +227,12 @@ private void close() throws IOException {
Future<StreamState> future =
loader.stream(Collections.<InetAddress>emptySet(), new ProgressIndicator());
try {
Uninterruptibles.getUninterruptibly(future);
StreamState streamState = Uninterruptibles.getUninterruptibly(future);
if (streamState.hasFailedSession()) {
LOG.warn("Some streaming sessions failed");
} else {
LOG.info("Streaming finished successfully");
}
} catch (ExecutionException e) {
throw new RuntimeException("Streaming to the following hosts failed: " +
loader.getFailedHosts(), e);