Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for gzip compression #58

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions openaddr/conform.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import os
import errno
import gzip
import tempfile
import mimetypes
import json
Expand Down Expand Up @@ -53,6 +54,7 @@ def gdal_error_handler(err_class, err_num, err_msg):
]

UNZIPPED_DIRNAME = 'unzipped'
UNGZIPPED_DIRNAME = 'ungzipped'

# extracts:
# - '123' from '123 Main St'
Expand Down Expand Up @@ -156,6 +158,8 @@ def from_format_string(clz, format_string):
return GuessDecompressTask()
elif format_string.lower() == 'zip':
return ZipDecompressTask()
elif format_string.lower() == 'gzip':
return GzipDecompressTask()
else:
raise KeyError("I don't know how to decompress for format {}".format(format_string))

Expand All @@ -173,6 +177,10 @@ def decompress(self, source_paths, workdir, filenames):
substitute_task = ZipDecompressTask()
_L.info('Guessing zip compression based on file names')
return substitute_task.decompress(source_paths, workdir, filenames)
elif 'gzip' in types:
substitute_task = GzipDecompressTask()
_L.info('Guessing gzip compression based on file names')
return substitute_task.decompress(source_paths, workdir, filenames)

_L.warning('Could not guess a single compression from file names')
return source_paths
Expand Down Expand Up @@ -221,6 +229,26 @@ def decompress(self, source_paths, workdir, filenames):

return output_files

class GzipDecompressTask(DecompressionTask):
    ''' Decompression task that expands gzip-compressed source files.

        Each source file is expanded into the UNGZIPPED_DIRNAME
        subdirectory of the working directory.
    '''
    def decompress(self, source_paths, workdir, filenames):
        ''' Expand each gzip file in source_paths and return the new paths.

            source_paths: iterable of paths to gzip-compressed files.
            workdir: directory under which UNGZIPPED_DIRNAME is created.
            filenames: not used by this task; present so the signature
                matches the other decompress() methods.

            Returns a list of paths to the expanded files, in the same
            order as source_paths.
        '''
        # Read and write in fixed-size pieces so a large source file is
        # never held in memory all at once.
        chunk_size = 64 * 1024

        output_files = []
        expand_path = os.path.join(workdir, UNGZIPPED_DIRNAME)
        mkdirsp(expand_path)

        for source_path in source_paths:
            # Build a file name for the decompressed file without the .gz
            # extension. Only strip the suffix when it really is ".gz", so
            # that other names are kept whole instead of losing their last
            # three characters.
            base_name = os.path.basename(source_path)
            stem, extension = os.path.splitext(base_name)
            if extension.lower() == '.gz':
                base_name = stem

            expanded_path = os.path.join(expand_path, base_name)

            # Open the archive by path and let gzip manage the underlying
            # file handle, rather than wrapping a separately opened file.
            with gzip.open(source_path, 'rb') as gz_fp:
                with open(expanded_path, 'wb') as temp_fp:
                    while True:
                        chunk = gz_fp.read(chunk_size)
                        if not chunk:
                            break
                        temp_fp.write(chunk)

            output_files.append(expanded_path)
            _L.debug("Ungzipped file {}".format(output_files[-1]))

        return output_files

def elaborate_filenames(filename):
''' Return a list of filenames for a single name from conform file tag.

Expand Down
Loading