Skip to content

Commit 17acda7

Browse files
authored
Merge pull request #317 from dimitri-yatsenko/master
Fix #288 (add connection id to jobs table) and modify `key_source` handling
2 parents 9cb16a8 + 79d77f9 commit 17acda7

File tree

11 files changed

+37
-11
lines changed

11 files changed

+37
-11
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ MANIFEST
1414
.vagrant/
1515
dj_local_conf.json
1616
build/
17+
.coverage
18+
./tests/.coverage

datajoint/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from .version import __version__
1919

2020
__author__ = "Dimitri Yatsenko, Edgar Y. Walker, and Fabian Sinz at Baylor College of Medicine"
21-
__date__ = "March 8, 2017"
21+
__date__ = "June 1, 2017"
2222
__all__ = ['__author__', '__version__',
2323
'config', 'conn', 'kill', 'BaseRelation',
2424
'Connection', 'Heading', 'FreeRelation', 'Not', 'schema',

datajoint/autopopulate.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,21 +76,20 @@ def populate(self, *restrictions, suppress_errors=False, reserve_jobs=False, ord
7676
todo = self.key_source
7777
if not isinstance(todo, RelationalOperand):
7878
raise DataJointError('Invalid key_source value')
79-
todo = todo & AndList(restrictions)
79+
todo = todo.proj() & AndList(restrictions)
8080

8181
error_list = [] if suppress_errors else None
8282

8383
jobs = self.connection.jobs[self.target.database] if reserve_jobs else None
8484

85-
8685
# define and setup signal handler for SIGTERM
8786
if reserve_jobs:
8887
def handler(signum, frame):
8988
logger.info('Populate terminated by SIGTERM')
9089
raise SystemExit('SIGTERM received')
9190
old_handler = signal.signal(signal.SIGTERM, handler)
9291

93-
todo -= self.target.proj()
92+
todo -= self.target
9493
keys = list(todo.fetch.keys())
9594
if order == "reverse":
9695
keys.reverse()
@@ -142,7 +141,7 @@ def progress(self, *restrictions, display=True):
142141
"""
143142
todo = self.key_source & AndList(restrictions)
144143
total = len(todo)
145-
remaining = len(todo - self.target.proj())
144+
remaining = len(todo.proj() - self.target)
146145
if display:
147146
print('%-20s' % self.__class__.__name__,
148147
'Completed %d of %d (%2.1f%%) %s' % (

datajoint/base_relation.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ def insert(self, rows, replace=False, ignore_errors=False, skip_duplicates=False
155155
return
156156

157157
heading = self.heading
158+
if heading.attributes is None:
159+
logger.warning('Could not access table {table}'.format(table=self.full_table_name))
160+
return
161+
158162
field_list = None # ensures that all rows have the same attributes in the same order as the first row.
159163

160164
def make_row_to_insert(row):

datajoint/connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def __init__(self, host, user, password, init_fun=None):
7070
self.connect()
7171
if self.is_connected:
7272
logger.info("Connected {user}@{host}:{port}".format(**self.conn_info))
73+
self.connection_id = self.query('SELECT connection_id()').fetchone()[0]
7374
else:
7475
raise DataJointError('Connection failed.')
7576
self._conn.autocommit(True)
@@ -129,6 +130,10 @@ def query(self, query, args=(), as_dict=False):
129130
cur.execute(query, args)
130131
else:
131132
raise
133+
except err.ProgrammingError as e:
134+
print('Error in query:')
135+
print(query)
136+
raise
132137
return cur
133138

134139
def get_user(self):

datajoint/fetch.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from . import DataJointError
88
from . import key as PRIMARY_KEY
99

10+
1011
def update_dict(d1, d2):
1112
return {k: (d2[k] if k in d2 else d1[k]) for k in d1}
1213

@@ -29,7 +30,6 @@ def copy(self):
2930
"""
3031
return self.__class__(self)
3132

32-
3333
def _initialize_behavior(self):
3434
self.sql_behavior = {}
3535
self.ext_behavior = dict(squeeze=False)
@@ -90,7 +90,6 @@ def order_by(self, *args):
9090
self.sql_behavior['order_by'] = args
9191
return self
9292

93-
9493
@property
9594
def as_dict(self):
9695
"""

datajoint/hash.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import hashlib
2+
import base64
3+
4+
5+
def filehash(filename):
6+
s = hashlib.sha256()
7+
with open(filename, 'rb') as f:
8+
for block in iter(lambda: f.read(65536), b''):
9+
s.update(block)
10+
return base64.b64encode(s.digest(), b'-_')[0:43].decode()

datajoint/heading.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def init_from_database(self, conn, database, table_name):
132132
if info is None:
133133
if table_name == '~log':
134134
logger.warning('Could not create the ~log table')
135+
return
135136
else:
136137
raise DataJointError('The table `{database}`.`{table_name}` is not defined.'.format(
137138
table_name=table_name, database=database))

datajoint/jobs.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def __init__(self, arg, database=None):
4444
user="" :varchar(255) # database user
4545
host="" :varchar(255) # system hostname
4646
pid=0 :int unsigned # system process id
47+
connection_id = 0 : bigint unsigned # connection_id()
4748
timestamp=CURRENT_TIMESTAMP :timestamp # automatic timestamp
4849
""".format(database=database, error_message_length=ERROR_MESSAGE_LENGTH)
4950
if not self.is_declared:
@@ -80,10 +81,11 @@ def reserve(self, table_name, key):
8081
status='reserved',
8182
host=os.uname().nodename,
8283
pid=os.getpid(),
84+
connection_id=self.connection.connection_id,
8385
key=key,
8486
user=self._user)
8587
try:
86-
self.insert1(job)
88+
self.insert1(job, ignore_extra_fields=True)
8789
except pymysql.err.IntegrityError:
8890
return False
8991
return True
@@ -113,6 +115,7 @@ def error(self, table_name, key, error_message):
113115
status="error",
114116
host=os.uname().nodename,
115117
pid=os.getpid(),
118+
connection_id=self.connection.connection_id,
116119
user=self._user,
117120
key=key,
118121
error_message=error_message), replace=True, ignore_extra_fields=True)

datajoint/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.6.0"
1+
__version__ = "0.6.1"

tests/test_jobs.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ def test_reserve_job():
3333
'failed to reserve new jobs')
3434
# finish with error
3535
for key in subjects.fetch.keys():
36-
schema.schema.jobs.error(table_name, key, "error message")
36+
schema.schema.jobs.error(table_name, key,
37+
"error message")
3738
# refuse jobs with errors
3839
for key in subjects.fetch.keys():
3940
assert_false(schema.schema.jobs.reserve(table_name, key),
@@ -43,6 +44,7 @@ def test_reserve_job():
4344
assert_false(schema.schema.jobs,
4445
'failed to clear error jobs')
4546

47+
4648
def test_restrictions():
4749
# clear out jobs table
4850
jobs = schema.schema.jobs
@@ -73,6 +75,7 @@ def test_sigint():
7375
assert_equals(error_message, 'KeyboardInterrupt')
7476
schema.schema.jobs.delete()
7577

78+
7679
def test_sigterm():
7780
# clear out job table
7881
schema.schema.jobs.delete()
@@ -113,4 +116,4 @@ def test_long_error_message():
113116
error_message = schema.schema.jobs.fetch1['error_message']
114117
assert_true(error_message == short_error_message, 'error messages do not agree')
115118
assert_false(error_message.endswith(TRUNCATION_APPENDIX), 'error message should not be truncated')
116-
schema.schema.jobs.delete()
119+
schema.schema.jobs.delete()

0 commit comments

Comments
 (0)