Commit
S3: working now for writing too
rhanka committed Dec 22, 2019
1 parent e825648 commit 7818f19
Showing 3 changed files with 106 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -4,7 +4,7 @@ ENV http_proxy $proxy
ENV https_proxy $proxy

RUN pip install --upgrade pip
RUN pip install `echo $proxy | sed 's/\(\S\S*\)/--proxy \1/'` simplejson tslib Flask Flask-OAuth flask-login enum34 flask_restplus rauth PyYAML nltk elasticsearch pandas Werkzeug scikit-learn[alldeps] geopy jellyfish networkx sqlalchemy psycopg2-binary redisearch s3fs
RUN pip install `echo $proxy | sed 's/\(\S\S*\)/--proxy \1/'` simplejson tslib Flask Flask-OAuth flask-login enum34 flask_restplus rauth PyYAML nltk elasticsearch pandas Werkzeug scikit-learn[alldeps] geopy jellyfish networkx sqlalchemy psycopg2-binary redisearch boto3 smart-open typing

RUN mkdir -p /matchid/code /matchid/conf/run /matchid/log /matchid/referential_data /data/matchID_test/ /matchid/upload

2 changes: 1 addition & 1 deletion Makefile
@@ -70,7 +70,7 @@ endif

install-prerequisites:
ifeq ("$(wildcard /usr/bin/envsubst)","")
sudo apt-get update; true
sudo apt-get update; true
sudo apt install -y gettext; true
endif

174 changes: 104 additions & 70 deletions code/recipes.py
@@ -12,7 +12,9 @@
import unicodedata
import shutil
import csv
import s3fs
import boto3
from smart_open import open

from werkzeug.utils import secure_filename
from cStringIO import StringIO

@@ -31,6 +33,7 @@

# interact with datasets
import gzip

#from pandasql import sqldf
import elasticsearch
from elasticsearch import Elasticsearch, helpers
@@ -72,7 +75,6 @@
import automata
from tools import *


def fwf_format(row, widths, sep=""):
return sep.join([row[col].ljust(widths[i] - len(sep)) for i, col in enumerate(row.keys())])

@@ -112,6 +114,11 @@ class Connector(Configured):
def __init__(self, name=None):
Configured.__init__(self, "connectors", name)

try:
self.thread_count = self.conf["thread_count"]
except:
self.thread_count = 1

try:
self.type = self.conf["type"]
except:
@@ -138,40 +145,37 @@ def __init__(self, name=None):
sys.exit("Ooops: bucket of {} connector {} has to be defined".format(
self.type, self.name))
try:
self.endpoint_url = self.conf["endpoint_url"]
self.aws_access_key_id = self.conf["aws_access_key_id"]
except:
self.endpoint_url = None
self.aws_access_key_id = os.getenv('aws_access_key_id')
try:
self.region_name = self.conf["region_name"]
self.aws_secret_access_key = self.conf["aws_secret_access_key"]
except:
self.region_name = None
self.aws_secret_access_key = os.getenv('aws_secret_access_key')
try:
self.signature_version = self.conf["signature_version"]
self.endpoint_url = self.conf["endpoint_url"]
except:
self.signature_version = None

self.endpoint_url = None
try:
self.key = self.conf["key"]
self.signature_version = self.conf["signature_version"]
except:
try:
self.key = os.getenv("aws_access_key_id")
except:
sys.exit("Ooops: aws_access_key_id of {} connector {} has to be defined directly or through env".format(
self.type, self.name))
self.signature_version = None
try:
self.secret = self.conf["secret"]
self.region_name = self.conf["region_name"]
except:
try:
self.secret = os.getenv("aws_secret_access_key")
except:
sys.exit("Ooops: aws_secret_access_key of {} connector {} has to be defined directly or through env".format(
self.type, self.name))
self.s3fs = s3fs.S3FileSystem(
key=self.key,
secret=self.secret,
use_ssl=True,
client_kwargs={"endpoint_url": self.endpoint_url, "region_name": self.region_name}
self.region_name = None
self.s3_session = boto3.Session(
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.region_name
)
#self.client = session.client('s3', endpoint_url=self.endpoint_url)
self.s3_resource = self.s3_session.resource('s3', endpoint_url = self.endpoint_url)
self.transport_params={
"session": self.s3_session,
"resource_kwargs": {"endpoint_url": self.endpoint_url}
}

try:
self.timeout = self.conf["timeout"]
except:
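
For context, a minimal sketch of the access pattern this connector now builds on: a boto3 Session plus smart_open transport_params, as in the lines above (credentials, endpoint and bucket below are placeholders, not values from the commit):

import boto3
from smart_open import open

# placeholder credentials and endpoint; the connector reads these from its
# conf or from the environment as shown above
session = boto3.Session(
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
    region_name=None
)
transport_params = {
    "session": session,
    "resource_kwargs": {"endpoint_url": "https://s3.example.com"}
}

# stream a small object out, then back in, through smart_open
with open("s3://my-bucket/hello.txt", "wb", transport_params=transport_params) as f:
    f.write(b"hello\n")
with open("s3://my-bucket/hello.txt", "rb", transport_params=transport_params) as f:
    print(f.read())
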
@@ -216,11 +220,6 @@ def __init__(self, name=None):
except:
self.sample = 500

try:
self.thread_count = self.conf["thread_count"]
except:
self.thread_count = 1

if (self.type == "sql"):
try:
self.encoding = self.conf["encoding"]
@@ -256,7 +255,7 @@ def __init__(self, name=None, parent=None):
self.filter = None
return
else:
log.write(error="no conf for dataset {}".format(self.name))
config.log.write(error="no conf for dataset {}".format(self.name))

try:
self.parent = parent
@@ -364,15 +363,18 @@ def __init__(self, name=None, parent=None):

if (self.connector.type == "s3"):
self.select = None
try:
self.files = [f
for f in self.connector.s3fs.ls(self.connector.bucket)
if re.match(r'^' + self.connector.bucket + '\/' + self.table['regex'] + '$', f)]
self.file = self.files[0]
except:
self.file = os.path.join(self.connector.bucket, self.table)
if (type(self.table) == str):
self.file = "s3://" + os.path.join(self.connector.bucket,self.table)
self.files = [self.file]
#log.write("Ooops: couldn't set filename for dataset {}, connector {}".format(self.name,self.connector.name),exit=True)
else:
try:
self.files = ["s3://" + os.path.join(f.bucket_name,f.key)
for f in self.connector.s3_resource.Bucket(self.connector.bucket).objects.all()
if re.match(r'^' + self.table['regex'] + '$', f.key)]
self.file = self.files[0]
except:
self.files= [str(err())]
# config.log.write("Ooops: couldn't match files for {} in connector {}".format(self.name,self.connector.name),exit=True)


if (self.connector.type == "filesystem") | (self.connector.type == "s3"):
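
A rough, self-contained sketch of the object-listing logic used above to resolve a table regex to s3:// URIs (bucket name, endpoint and regex are placeholder values):

import re
import boto3

session = boto3.Session()  # credentials resolved from the environment here
s3_resource = session.resource("s3", endpoint_url=None)

bucket = "my-bucket"
regex = r"extract_.*\.csv\.gz"

# keep only the keys that match the dataset regex, as s3:// URIs
files = ["s3://" + obj.bucket_name + "/" + obj.key
         for obj in s3_resource.Bucket(bucket).objects.all()
         if re.match(r"^" + regex + r"$", obj.key)]
print(files)
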
@@ -464,34 +466,34 @@ def init_reader(self, df = None, test = False, test_chunk_size=None):
if (self.type == "csv"):
self.reader = itertools.chain.from_iterable(
pd.read_csv(
file if (self.connector.type == "filesystem") else self.connector.s3fs.open(file),
self.open(file),
sep=self.sep, usecols=self.select, chunksize=self.chunk,
compression=self.compression, encoding=self.encoding, dtype=object, header=self.header, names=self.names, skiprows=self.skiprows,
encoding=self.encoding,dtype=object, header=self.header, names=self.names, skiprows=self.skiprows,
prefix=self.prefix, iterator=True, index_col=False, keep_default_na=False
)
for file in self.files)
elif (self.type == "fwf"):
self.reader = itertools.chain.from_iterable(
pd.read_fwf(
file if (self.connector.type == "filesystem") else self.connector.s3fs.open(file),
self.open(file),
chunksize=self.connector.chunk, skiprows=self.skiprows,
encoding=(self.encoding if (self.compression == 'infer') else None),
delimiter=self.sep, compression=self.compression, dtype=object, names=self.names, widths=self.widths,
encoding=self.encoding,
delimiter=self.sep, compression='infer', dtype=object, names=self.names, widths=self.widths,
iterator=True, keep_default_na=False
)
for file in self.files)
elif (self.type == "hdf"):
self.reader = itertools.chain.from_iterable(
pd.read_hdf(
file if (self.connector.type == "filesystem") else self.connector.s3fs.open(file),
self.open(file),
chunksize=self.chunk
)
for file in self.files)
elif (self.type == "msgpack"):
self.reader = itertools.chain.from_iterable(
self.iterator_chunk(
pd.read_msgpack(
file if (self.connector.type == "filesystem") else self.connector.s3fs.open(file),
self.open(file),
iterator=True, encoding=self.encoding
)
)
@@ -541,6 +543,10 @@ def init_reader(self, df = None, test = False, test_chunk_size=None):
self.log.write(msg="couldn't initiate dataset {}".format(
self.name), error=err(), exit=True)

def open(self, file, mode='rb'):
if (self.connector.type == "s3"):
return open(file, mode, transport_params=self.connector.transport_params)
return open(file, mode)

def iterator_chunk(self, iterator):
df_list=[]
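
A minimal sketch of the chunked read path that self.open enables, assuming smart_open decompresses *.gz keys transparently (URIs, chunk size and session are placeholders):

import itertools
import boto3
import pandas as pd
from smart_open import open

# placeholder session, endpoint and object names
transport_params = {"session": boto3.Session(),
                    "resource_kwargs": {"endpoint_url": None}}
files = ["s3://my-bucket/part-0.csv.gz", "s3://my-bucket/part-1.csv.gz"]

# one lazy iterator of DataFrame chunks over all matched files
reader = itertools.chain.from_iterable(
    pd.read_csv(open(f, "rb", transport_params=transport_params),
                chunksize=10000, dtype=object, iterator=True,
                keep_default_na=False)
    for f in files)

for chunk in reader:
    print(chunk.shape)
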
@@ -617,6 +623,15 @@ def init_writer(self):
# further better except should make difference btw no
# existing file and unwritable
pass
self.fs = open(self.file, 'wb')
else:
self.fs = open(self.file, 'ab')
self.log.write(msg="initiated stream output {}".format(self.name))
elif (self.connector.type == "s3"):
self.fs = open(self.file,
'wb', transport_params=self.connector.transport_params)
self.log.write(msg="initiated stream output {}".format(self.name))

elif (self.connector.type == "sql"):
if (self.mode == 'create'):
self.connector.sql.execute(
@@ -725,7 +740,7 @@ def write(self, chunk=0, df=None):
self.log.write(msg="elasticsearch bulk of subchunk {} failed {}/{}".format(
i, self.connector.name, self.table), error=err())

elif (self.connector.type == "filesystem"):
elif (self.connector.type in ["filesystem", "s3"]):
# self.log.write("filesystem write {}".format(self.name))
if (self.type == "csv"):
try:
@@ -734,13 +749,16 @@
if (chunk == 0):
header = self.header
else:
header = None
header = False
if (self.names != None):
df=df[self.names]
else:
df.sort_index(axis=1, inplace=True)
df.to_csv(self.file, mode='a', index=False, sep=self.sep, quoting=self.quoting,
compression=self.compression, encoding=self.encoding, header=header)

df.to_csv(
self.fs,
mode='a', index=False, sep=self.sep, quoting=self.quoting,
encoding=self.encoding, header=header)
except:
self.log.write(
"write to csv failed writing {}".format(self.file), err())
@@ -750,30 +768,37 @@
else:
header = False
try:
to_fwf(df, self.file, names=self.names, header=header, sep=self.sep,
widths=self.widths, append=True, encoding=self.encoding, log=self.log)
to_fwf(df,
self.fs,
names=self.names, header=header, sep=self.sep,
widths=self.widths, append=True, encoding=self.encoding, log=self.log)
except:
self.log.write(
"write to fwf failed writing {}".format(self.file), err())
pass
elif (self.type == "hdf"):
try:
df.to_hdf(self.file, key=self.name,
mode='a', format='table')
df.to_hdf(
self.fs,
key=self.name,
mode='a', format='table')
except:
self.log.write(
"write to hdf failed writing {}".format(self.file), err())
elif (self.type == "msgpack"):
try:
df.to_msgpack(self.file, append=True,
encoding=self.encoding)
df.to_msgpack(
self.fs,
append=True,
encoding=self.encoding)
except:
self.log.write(
"write to msgpack failed writing {}".format(self.file), err())
else:
self.log.write("no method for writing to {} with type {}".format(
self.file, self.type))

if self.connector.type == "filesystem":
self.fs.flush()
elif (self.connector.type == "sql"):
try:
self.log.write(
@@ -825,6 +850,9 @@ def write(self, chunk=0, df=None):

return processed

def close(self):
if self.connector.type in ["filesystem", "s3"]:
self.fs.close()

class Recipe(Configured):

@@ -1012,12 +1040,6 @@ def init(self, df=None, parent=None, test=False, callback=None):
if ((len(self.before) + len(self.after)) == 0):
self.log.write(msg="couldn't init input {} of recipe {}".format(
self.input.name, self.name), error=err())
if (self.test == False):
try:
self.output.init_writer()
except:
self.log.write(msg="couldn't init output {} of recipe {}".format(
self.output.name, self.name), error=err())

def set_job(self, job=None):
self.job = job
@@ -1063,12 +1085,18 @@ def write(self, i, df, supervisor=None):

def write_queue(self, queue, supervisor=None):
exit = False
if (self.test == False):
try:
self.output.init_writer()
except:
self.log.write(msg="couldn't init output {} of recipe {}".format(
self.output.name, self.name), error=err())
w_queue = []
try:
max_threads = self.output.thread_count
self.log.write("initiating queue with {} threads".format(max_threads))
except:
max_threads = 1
self.log.write("initiating queue with {} threads".format(max_threads))
while (exit == False):
try:
res = queue.get()
@@ -1083,11 +1111,15 @@ def write_queue(self, queue, supervisor=None):
t[1].is_alive() & (supervisor[t[0]] == "writing"))]
time.sleep(0.05)
supervisor[res[0]] = "writing"
thread = Process(target=self.write, args=[
res[0], res[1], supervisor])
thread.start()
w_queue.append([res[0], thread])
time.sleep(0.05)
if ((max_threads == 1) | (self.output.connector.type in ["filesystem", "s3"])):
# filestream can't be parallelized
self.write(res[0], res[1], supervisor)
else:
thread = Process(target=self.write, args=[
res[0], res[1], supervisor])
thread.start()
w_queue.append([res[0], thread])
time.sleep(0.05)
except:
# self.log.write("waiting to write - {}, {}".format(self.name, w_queue))
time.sleep(1)
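
A toy sketch of the sequential path taken for stream outputs, since (per the comment above) the single open file handle cannot be parallelized across worker Processes (file name and payloads are placeholders):

from multiprocessing import Queue

def drain_queue(queue, fs):
    # chunks are written one by one in the parent process instead of being
    # handed to worker Processes
    while True:
        item = queue.get()
        if item is None:                # sentinel: no more chunks
            break
        chunk_id, payload = item
        fs.write(payload)
    fs.close()

q = Queue()
q.put((0, b"chunk 0\n"))
q.put((1, b"chunk 1\n"))
q.put(None)
drain_queue(q, open("stream_output.bin", "wb"))
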
@@ -1098,6 +1130,7 @@ def write_queue(self, queue, supervisor=None):
t[1].is_alive() & (supervisor[t[0]] == "writing"))]
except:
pass
self.output.close()

def run_chunk(self, i, df, queue=None, supervisor=None):
if (supervisor != None):
@@ -1407,6 +1440,7 @@ def run(self, head=None, write_queue=None, supervisor=None):
except:
self.log.write(msg="SQL statement ended with error, please check error message above")

self.log.write("end of all")

def select_columns(self, df=None, arg="select"):
try:
