diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/json/VariantJsonWriter.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/json/VariantJsonWriter.java index 2a8437099b..75c210ce93 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/json/VariantJsonWriter.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/json/VariantJsonWriter.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.opencb.biodata.formats.variant.io.VariantWriter; @@ -128,6 +129,7 @@ public boolean pre() { try { variantsGenerator = factory.createGenerator(variantsStream); + variantsGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); if (fileStream != null || fileMetadata != null) { fileGenerator = factory.createGenerator(fileStream); } @@ -164,7 +166,6 @@ public boolean write(Variant variant) { } } variantsGenerator.writeObject(variant); - variantsGenerator.writeRaw('\n'); numVariantsWritten++; } catch (IOException ex) { logger.error(variant.toString(), ex); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/variant_walker.py b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/variant_walker.py index fa3ea798e5..b711202404 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/variant_walker.py +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/variant_walker.py @@ -1,5 +1,7 @@ import sys import importlib +import importlib.machinery +import importlib.util import os from abc import ABC, abstractmethod @@ -115,37 +117,50 @@ def main(module_name, class_name, *args): print(f"An error occurred during setup: {e}", file=sys.stderr) raise - num_entries = 0 - size_entries = 0 + input_lines_count = 0 + header_lines_count = 0 + header_size_bytes = 0 + variant_count = 0 + variant_size_bytes = 0 header_read = False header = [] for line in sys.stdin: - num_entries = num_entries + 1 - size_entries = size_entries + len(line) + input_lines_count = input_lines_count + 1 # Now 'line' does not have trailing '\n' or '\r' line = line.rstrip() ## The line will be a header line if it starts with '#' or if it's the first line if not header_read: - if line.startswith("#") or num_entries == 1: + if line.startswith("#") or input_lines_count == 1: header.append(line) + header_lines_count = header_lines_count + 1 + header_size_bytes = header_size_bytes + len(line) + ## Keep reading header lines until we find a non-header line + continue else: + ## Process the header header_read = True try: walker.header(header) except Exception as e: print(f"An error occurred while processing the header: {e}", file=sys.stderr) raise - else: - try: - walker.map(line) - except Exception as e: - print(f"An error occurred while processing a line: {e}", file=sys.stderr) - raise - - walker.count("num_entries", num_entries) - walker.count("size_entries", size_entries) + header = None + + variant_count = variant_count + 1 + variant_size_bytes = variant_size_bytes + len(line) + try: + walker.map(line) + except Exception as e: + print(f"An error occurred while processing a line: {e}", file=sys.stderr) + raise + + walker.count("input_lines_count", input_lines_count) + walker.count("header_lines_count", header_lines_count) + walker.count("header_size_bytes", header_size_bytes) + walker.count("variant_count", variant_count) + walker.count("variant_size_bytes", variant_size_bytes) try: walker.cleanup() except Exception as e: @@ -157,4 +172,4 @@ def main(module_name, class_name, *args): if len(sys.argv) < 3: print("Usage: python variant_walker.py [args...]", file=sys.stderr) sys.exit(1) - sys.exit(main(sys.argv[1], sys.argv[2], *sys.argv[3:])) \ No newline at end of file + sys.exit(main(sys.argv[1], sys.argv[2], *sys.argv[3:])) diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/walker_example.py b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/walker_example.py index 2c5c92fd6a..c85a2514bd 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/walker_example.py +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/python/walker_example.py @@ -1,4 +1,6 @@ import argparse +import json + from variant_walker import VariantWalker class Echo(VariantWalker): @@ -42,10 +44,29 @@ def header(self, header): self.write(header[-1]) def map(self, line): - # Split line by tab - fields = line.split('\t') - # Write fields 0, 1, 3, 4 joined by ':' - self.write(':'.join([fields[0], fields[1], fields[3], fields[4]])) + if line.startswith('{'): + # Parse JSON and write 'id' field + variant = json.loads(line) + self.write(variant['id']) + else: + # Split line by tab + fields = line.split('\t') + # Write fields 0, 1, 3, 4 joined by ':' + self.write(':'.join([fields[0], fields[1], fields[3], fields[4]])) def cleanup(self): pass + + +class Count(VariantWalker): + def setup(self, *arg): + self.count('count_setup_calls', 1) + + def header(self, header): + self.count('count_header_calls', 1) + + def map(self, line): + self.count('count_variant_map_calls', 1) + + def cleanup(self): + self.count('count_cleanup_calls', 1)