From fc820c6bd1b131bf41b28f1800ff4784cb37a83b Mon Sep 17 00:00:00 2001
From: Matt Brown
Date: Wed, 4 Mar 2015 14:54:51 -0500
Subject: [PATCH 1/2] adjust when "streaming finished successfully" is logged

If the BulkRecordWriter is closed due to GC errors / the task shutting down,
the "streaming finished successfully" message is logged unconditionally, which
might be confusing (especially when logged alongside "no streaming happened").
---
 .../cassandra/cql/CrunchCqlBulkRecordWriter.java   | 9 ++++++---
 .../cassandra/thrift/CrunchBulkRecordWriter.java   | 7 ++++++-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/spotify/hdfs2cass/cassandra/cql/CrunchCqlBulkRecordWriter.java b/src/main/java/com/spotify/hdfs2cass/cassandra/cql/CrunchCqlBulkRecordWriter.java
index f94aa58..4e66a3a 100644
--- a/src/main/java/com/spotify/hdfs2cass/cassandra/cql/CrunchCqlBulkRecordWriter.java
+++ b/src/main/java/com/spotify/hdfs2cass/cassandra/cql/CrunchCqlBulkRecordWriter.java
@@ -152,7 +152,12 @@ private void close() throws IOException {
       Future<StreamState> future = loader.stream(Collections.emptySet(), new ProgressIndicator());
       try {
-        Uninterruptibles.getUninterruptibly(future);
+        StreamState streamState = Uninterruptibles.getUninterruptibly(future);
+        if (streamState.hasFailedSession()) {
+          LOG.warn("Some streaming sessions failed");
+        } else {
+          LOG.info("Streaming finished successfully");
+        }
       } catch (ExecutionException e) {
         throw new RuntimeException("Streaming to the following hosts failed: " +
             loader.getFailedHosts(), e);
@@ -163,7 +168,5 @@ private void close() throws IOException {
     } finally {
       heartbeat.stopHeartbeat();
     }
-    LOG.info("Streaming finished successfully");
   }
-
 }
diff --git a/src/main/java/com/spotify/hdfs2cass/cassandra/thrift/CrunchBulkRecordWriter.java b/src/main/java/com/spotify/hdfs2cass/cassandra/thrift/CrunchBulkRecordWriter.java
index 9085ecf..0e36e42 100644
--- a/src/main/java/com/spotify/hdfs2cass/cassandra/thrift/CrunchBulkRecordWriter.java
+++ b/src/main/java/com/spotify/hdfs2cass/cassandra/thrift/CrunchBulkRecordWriter.java
@@ -227,7 +227,12 @@ private void close() throws IOException {
       Future<StreamState> future = loader.stream(Collections.emptySet(), new ProgressIndicator());
       try {
-        Uninterruptibles.getUninterruptibly(future);
+        StreamState streamState = Uninterruptibles.getUninterruptibly(future);
+        if (streamState.hasFailedSession()) {
+          LOG.warn("Some streaming sessions failed");
+        } else {
+          LOG.info("Streaming finished successfully");
+        }
       } catch (ExecutionException e) {
         throw new RuntimeException("Streaming to the following hosts failed: " +
             loader.getFailedHosts(), e);
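
Both writers now make the success log conditional on the stream result instead of logging it whenever close() runs. A minimal sketch of the same check in isolation, assuming an SLF4J-style logger and a Future<StreamState> such as the one returned by SSTableLoader.stream(...); the class and method names here are illustrative, not part of the patch:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.streaming.StreamState;
import org.slf4j.Logger;

final class StreamOutcome {
  private StreamOutcome() {
  }

  /** Blocks until streaming completes; logs success only when no session failed. */
  static StreamState logStreamOutcome(Future<StreamState> future, Logger log) throws ExecutionException {
    StreamState state = Uninterruptibles.getUninterruptibly(future);
    if (state.hasFailedSession()) {
      log.warn("Some streaming sessions failed");
    } else {
      log.info("Streaming finished successfully");
    }
    return state;
  }
}

With this, the log line reflects the actual StreamState rather than the mere fact that the writer was closed.
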
From 426b5d3a5ea2f65a833d28d85ba20113e65d4017 Mon Sep 17 00:00:00 2001
From: planvin
Date: Sat, 4 Apr 2015 18:50:06 +0200
Subject: [PATCH 2/2] Added support for parquet input

---
 README.md                                     |  1 +
 pom.xml                                       | 11 ++-
 .../java/com/spotify/hdfs2cass/Hdfs2Cass.java | 97 +++++++++++++++----
 3 files changed, 86 insertions(+), 23 deletions(-)

diff --git a/README.md b/README.md
index dfcff5a..4ca9563 100644
--- a/README.md
+++ b/README.md
@@ -73,6 +73,7 @@ If we're lucky, we should eventually see our data in C\*:
 * `--timestamp` to specify the timestamp of values in C\*, defaults to now
 * `--ttl` to specify the TTL of values in C\*, defaults to 0
 * `--ignore` to omit fields from source records, can be repeated to specify multiple fields
+* `--inputtype` to specify the input format, either `avro` or `parquet`, defaults to `avro`
 
 ## Output URI Format
diff --git a/pom.xml b/pom.xml
index 4d038d6..545a0a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,6 +1,6 @@
- 4.0.0
@@ -33,6 +33,7 @@
     2.2.0
     1.7.4
     2.0.11
+    <parquet-avro.version>1.5.0</parquet-avro.version>
@@ -98,6 +99,11 @@
       <artifactId>guava</artifactId>
       <version>15.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${parquet-avro.version}</version>
+    </dependency>
@@ -106,6 +112,7 @@
       <version>4.11</version>
       <scope>test</scope>
+
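
The new com.twitter:parquet-avro dependency is what lets the job read Parquet files as Avro GenericRecords, and the Hdfs2Cass change that follows feeds those records into the existing Crunch pipeline. A minimal sketch of that read path, assuming the input paths and the Avro schema are already known (the class and method names here are illustrative, not from the patch):

import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.io.parquet.AvroParquetFileSource;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.Path;

final class ParquetRead {
  @SuppressWarnings({"rawtypes", "unchecked"})
  static PCollection<GenericRecord> readParquet(Pipeline pipeline, List<Path> paths, Schema schema) {
    // Avros.generics(schema) tells Crunch to materialize the Parquet rows as Avro GenericRecords.
    return (PCollection<GenericRecord>) (PCollection)
        pipeline.read(new AvroParquetFileSource<>(paths, Avros.generics(schema)));
  }
}

Keeping the result typed as PCollection<GenericRecord> means the downstream AvroToThrift and AvroToCQL transforms work unchanged whichever input format is chosen.
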
diff --git a/src/main/java/com/spotify/hdfs2cass/Hdfs2Cass.java b/src/main/java/com/spotify/hdfs2cass/Hdfs2Cass.java
index 11bb839..c4a519f 100644
--- a/src/main/java/com/spotify/hdfs2cass/Hdfs2Cass.java
+++ b/src/main/java/com/spotify/hdfs2cass/Hdfs2Cass.java
@@ -25,19 +25,27 @@
 import com.spotify.hdfs2cass.crunch.cql.CQLTarget;
 import com.spotify.hdfs2cass.crunch.thrift.ThriftRecord;
 import com.spotify.hdfs2cass.crunch.thrift.ThriftTarget;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
+import org.apache.crunch.io.parquet.AvroParquetFileSource;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.BasicConfigurator;
+import parquet.avro.AvroParquetReader;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.List;
@@ -77,6 +85,9 @@ public class Hdfs2Cass extends Configured implements Tool, Serializable {
   @Parameter(names = "--ttl")
   protected static String ttl;
 
+  @Parameter(names = "--inputtype")
+  protected static String inputtype;
+
   @Parameter(names = "--ignore")
   protected static List<String> ignore = Lists.newArrayList();
@@ -86,6 +97,10 @@ public static void main(String[] args) throws Exception {
     ToolRunner.run(new Configuration(), new Hdfs2Cass(), args);
   }
 
+  private static List<Path> inputList(List<String> inputs) {
+    return Lists.newArrayList(Iterables.transform(inputs, new StringToHDFSPath()));
+  }
+
   @Override
   public int run(String[] args) throws Exception {
@@ -99,36 +114,76 @@ public int run(String[] args) throws Exception {
     // Parse & fetch info about target Cassandra cluster
     CassandraParams params = CassandraParams.parse(outputUri);
 
-    PCollection<GenericRecord> records =
-        ((PCollection<GenericRecord>)(PCollection) pipeline.read(From.avroFile(inputList(input))));
+    PCollection<GenericRecord> records = null;
+
+    if (inputtype == null || inputtype.equals("avro")) {
+      records = ((PCollection<GenericRecord>) (PCollection) pipeline.read(From.avroFile(inputList(input))));
+    } else if (inputtype.equals("parquet")) {
+      records = ((PCollection<GenericRecord>) (PCollection) pipeline.read(new AvroParquetFileSource<>(inputList(input), Avros.generics(getSchemaFromPath(inputList(input).get(0), new Configuration())))));
+    }
+
+    processAndWrite(outputUri, params, records);
+
+    // Execute the pipeline
+    PipelineResult result = pipeline.done();
+    return result.succeeded() ? 0 : 1;
+  }
 
+  private void processAndWrite(URI outputUri, CassandraParams params, PCollection<GenericRecord> records) {
     String protocol = outputUri.getScheme();
     if (protocol.equalsIgnoreCase("thrift")) {
       records
-          // First convert ByteBuffers to ThriftRecords
-          .parallelDo(new AvroToThrift(rowkey, timestamp, ttl, ignore), ThriftRecord.PTYPE)
-          // Then group the ThriftRecords in preparation for writing them
-          .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
-          .groupByKey(params.createGroupingOptions())
-          // Finally write the ThriftRecords to Cassandra
-          .write(new ThriftTarget(outputUri, params));
+        // First convert ByteBuffers to ThriftRecords
+        .parallelDo(new AvroToThrift(rowkey, timestamp, ttl, ignore), ThriftRecord.PTYPE)
+        // Then group the ThriftRecords in preparation for writing them
+        .parallelDo(new ThriftRecord.AsPair(), ThriftRecord.AsPair.PTYPE)
+        .groupByKey(params.createGroupingOptions())
+        // Finally write the ThriftRecords to Cassandra
+        .write(new ThriftTarget(outputUri, params));
     } else if (protocol.equalsIgnoreCase("cql")) {
       records
-          // In case of CQL, convert ByteBuffers to CQLRecords
-          .parallelDo(new AvroToCQL(rowkey, timestamp, ttl, ignore), CQLRecord.PTYPE)
-          .parallelDo(new CQLRecord.AsPair(), CQLRecord.AsPair.PTYPE)
-          .groupByKey(params.createGroupingOptions())
-          .write(new CQLTarget(outputUri, params));
+        // In case of CQL, convert ByteBuffers to CQLRecords
+        .parallelDo(new AvroToCQL(rowkey, timestamp, ttl, ignore), CQLRecord.PTYPE)
+        .parallelDo(new CQLRecord.AsPair(), CQLRecord.AsPair.PTYPE)
+        .groupByKey(params.createGroupingOptions())
+        .write(new CQLTarget(outputUri, params));
     }
-
-    // Execute the pipeline
-    PipelineResult result = pipeline.done();
-    return result.succeeded() ? 0 : 1;
   }
 
-  private static List<Path> inputList(List<String> inputs) {
-    return Lists.newArrayList(Iterables.transform(inputs, new StringToHDFSPath()));
+  private Schema getSchemaFromPath(Path path, Configuration conf) {
+
+    AvroParquetReader<GenericRecord> reader = null;
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (!fs.isFile(path)) {
+        FileStatus[] fstat = fs.listStatus(path, new PathFilter() {
+          @Override
+          public boolean accept(Path path) {
+            String name = path.getName();
+            return !name.startsWith("_") && !name.startsWith(".");
+          }
+        });
+        if (fstat == null || fstat.length == 0) {
+          throw new IllegalArgumentException("No valid files found in directory: " + path);
+        }
+        path = fstat[0].getPath();
+      }
+
+      reader = new AvroParquetReader<>(path);
+      return reader.read().getSchema();
+
+    } catch (IOException e) {
+      throw new RuntimeException("Error reading schema from path: " + path, e);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          // ignored
+        }
+      }
+    }
   }
 
   private static class StringToHDFSPath implements Function<String, Path> {
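
getSchemaFromPath above probes the first data file under the input path and takes the schema from the first record it reads. One possible alternative, not what this patch does, is to take the schema from the Parquet footer metadata, which avoids deserializing a record and also works for files with no rows. A sketch assuming the pre-Apache parquet 1.5 API (package parquet.*) pulled in by the pom.xml change; the class and method names below are illustrative:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;

final class ParquetSchemas {
  /** Derives the Avro schema from the Parquet footer instead of reading a record. */
  static Schema fromFooter(Path file, Configuration conf) throws IOException {
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, file);
    MessageType parquetSchema = footer.getFileMetaData().getSchema();
    return new AvroSchemaConverter().convert(parquetSchema);
  }
}

The record-probe approach in the patch is simpler, but reader.read() would return null for an empty Parquet file, so the footer-based variant is the more defensive choice for that edge case.
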