Skip to content

Commit 9a150a8

Browse files
Jonathan YoungJonathan Young
authored andcommitted
Merge branch 'develop' into ARXIVCE-825
2 parents bd1efa1 + fcb89a3 commit 9a150a8

File tree

4 files changed

+70
-21
lines changed

4 files changed

+70
-21
lines changed

arxiv/db/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,15 @@
5757
from ..config import settings, Settings
5858

5959
metadata = MetaData()
60+
latexml_metadata = MetaData()
6061
_latexml_engine: Engine = None
6162
_classic_engine: Engine = None
6263

6364
class Base(DeclarativeBase):
6465
metadata=metadata
6566

6667
class LaTeXMLBase(DeclarativeBase):
67-
metadata=metadata
68+
metadata=latexml_metadata
6869

6970
logger = logging.getLogger(__name__)
7071

@@ -200,4 +201,5 @@ def init(settings: Settings=settings) -> None:
200201

201202
# late import of arxiv.db.models to avoid loops
202203
from arxiv.db.models import configure_db_engine
203-
configure_db_engine(_classic_engine, _latexml_engine)
204+
configure_db_engine(_classic_engine, _latexml_engine)
205+

arxiv/ops/db_subset/clone_subset.py

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import importlib
66
import inspect
7+
import argparse
78

89
from sqlalchemy import (
910
create_engine,
@@ -19,8 +20,8 @@
1920
make_transient
2021
)
2122

22-
from ...db import Base, LaTeXMLBase, session_factory, _classic_engine as classic_engine
23-
from ...db.models import (
23+
from arxiv.db import Base, LaTeXMLBase, session_factory, _classic_engine as classic_engine
24+
from arxiv.db.models import (
2425
DBLaTeXMLDocuments,
2526
DBLaTeXMLSubmissions,
2627
TapirUser,
@@ -43,6 +44,7 @@ class Edge:
4344
to_table: str
4445
to_column: str
4546

47+
4648

4749
def generate_relationship_graph(models: List[Type]):
4850
adjacency_list = {}
@@ -142,13 +144,15 @@ def _write_subquery (table: Any, subq: Subquery, classic_session: Session, new_s
142144
new_session.commit()
143145
new_session.commit()
144146

147+
145148
def _insert_latexml_tables (query_map: Dict[str, Subquery], classic_session: Session, new_session: Session):
146149
documents = classic_session.execute(select(query_map['arXiv_metadata'])).all()
147150
ids = [(x[2], x[-4]) for x in documents]
148-
for i in range(0, len(ids), 500):
151+
n_docs = 10000 # This was 500, and not sure of this magic number.
152+
for i in range(0, len(ids), n_docs):
149153
latexml_docs = classic_session.execute(
150154
select(DBLaTeXMLDocuments)
151-
.filter(tuple_(DBLaTeXMLDocuments.paper_id, DBLaTeXMLDocuments.document_version).in_(ids[i: min(len(ids), i+500)]))
155+
.filter(tuple_(DBLaTeXMLDocuments.paper_id, DBLaTeXMLDocuments.document_version).in_(ids[i: min(len(ids), i+n_docs)]))
152156
).scalars().all()
153157
for row in latexml_docs:
154158
make_transient(row)
@@ -157,10 +161,10 @@ def _insert_latexml_tables (query_map: Dict[str, Subquery], classic_session: Ses
157161

158162
submissions = classic_session.execute(select(query_map['arXiv_submissions'])).all()
159163
sub_ids = [x[0] for x in submissions]
160-
for i in range(0, len(sub_ids), 500):
164+
for i in range(0, len(sub_ids), n_docs):
161165
latexml_subs = classic_session.execute(
162166
select(DBLaTeXMLSubmissions)
163-
.filter(DBLaTeXMLSubmissions.submission_id.in_(sub_ids[i: min(len(sub_ids), i+500)]))
167+
.filter(DBLaTeXMLSubmissions.submission_id.in_(sub_ids[i: min(len(sub_ids), i+n_docs)]))
164168
).scalars().all()
165169
for row in latexml_subs:
166170
make_transient(row)
@@ -182,9 +186,13 @@ def _invert_db_graph_edges (db_graph: Dict[str, List[Edge]]) -> Dict[str, List[E
182186
inverted_db_graph[next.to_table] = [reversed_edge]
183187
return inverted_db_graph
184188

185-
def _make_subset (db_graph: Dict[str, List[Edge]],
186-
special_cases: Dict[str, SpecialCase],
187-
size: int):
189+
def _make_subset (
190+
db_graph: Dict[str, List[Edge]],
191+
special_cases: Dict[str, SpecialCase],
192+
size: int,
193+
create_arxiv_db_schema: bool,
194+
create_latexml_db_schema: bool,
195+
):
188196
"""
189197
algorithm:
190198
@@ -198,11 +206,18 @@ def _make_subset (db_graph: Dict[str, List[Edge]],
198206
classic_session = session_factory()
199207
new_session = NewSessionLocal()
200208

201-
Base.metadata.drop_all(new_engine)
202-
Base.metadata.create_all(new_engine)
203-
LaTeXMLBase.metadata.drop_all(new_engine)
204-
LaTeXMLBase.metadata.create_all(new_engine)
205-
209+
if create_arxiv_db_schema:
210+
Base.metadata.drop_all(new_engine)
211+
Base.metadata.create_all(new_engine)
212+
213+
if create_latexml_db_schema:
214+
LaTeXMLBase.metadata.drop_all(new_engine)
215+
LaTeXMLBase.metadata.create_all(new_engine)
216+
217+
# check db connections
218+
_any_tapir_user = classic_session.execute(select(TapirUser).limit(1)).scalars().all()
219+
_any_latexml_doc = classic_session.execute(select(DBLaTeXMLDocuments).limit(1)).scalars().all()
220+
206221
### Do algorithm ###
207222
table_lookup = { i.__tablename__: i for i in get_tables() }
208223
processing_order = topological_sort({ k: list(map(lambda x: x.to_table, v)) for k,v in db_graph.items() })
@@ -244,7 +259,8 @@ def _make_subset (db_graph: Dict[str, List[Edge]],
244259
new_session.commit()
245260
new_session.close()
246261

247-
def clone_db_subset (n_users: int, config_directory: Optional[str] = None):
262+
def clone_db_subset (n_users: int, config_directory: Optional[str] = None,
263+
create_arxiv_db_schema: bool = True, create_latexml_db_schema: bool = True,):
248264
config_directory = config_directory or \
249265
os.path.abspath(
250266
os.path.join(
@@ -255,4 +271,32 @@ def clone_db_subset (n_users: int, config_directory: Optional[str] = None):
255271
graph = json.loads(open(os.path.join(config_directory, 'graph.json')).read())
256272
special_cases = json.loads(open(os.path.join(config_directory, 'special_cases.json')).read())
257273
graph_with_edges = { k: list(map(lambda x: Edge(**x), v)) for k,v in graph.items() }
258-
_make_subset(graph_with_edges, special_cases, n_users)
274+
_make_subset(graph_with_edges, special_cases, n_users, create_arxiv_db_schema, create_latexml_db_schema)
275+
276+
277+
def main():
278+
# Set up argument parser
279+
parser = argparse.ArgumentParser(description="Clone a subset of the classic DB to a new DB.")
280+
281+
# Define arguments with environment variables as defaults
282+
parser.add_argument('--n-users', type=int, default=os.environ.get('N_USERS', 2000),
283+
help='Number of users to copy (default: N_USERS environment variable or 2000)')
284+
parser.add_argument('--config-directory', type=str, default=os.environ.get('CONFIG_DIRECTORY'),
285+
help='Configuration directory (default: CONFIG_DIRECTORY environment variable)')
286+
parser.add_argument('--create-arxiv-db-schema', type=lambda x: x.lower() == 'true',
287+
default=os.environ.get('CREATE_ARXIV_DB_SCHEMA', 'true').lower() == 'true',
288+
help='Whether to create the arXiv DB schema (default: CREATE_ARXIV_DB_SCHEMA environment variable or true)')
289+
parser.add_argument('--create-latexml-db-schema', type=lambda x: x.lower() == 'true',
290+
default=os.environ.get('CREATE_LATEXML_DB_SCHEMA', 'true').lower() == 'true',
291+
help='Whether to create the LaTeXML DB schema (default: CREATE_LATEXML_DB_SCHEMA environment variable or true)')
292+
293+
# Parse arguments
294+
args = parser.parse_args()
295+
296+
# Call the function with the parsed arguments
297+
clone_db_subset(args.n_users, args.config_directory,
298+
args.create_arxiv_db_schema, args.create_latexml_db_schema)
299+
300+
301+
if __name__ == '__main__':
302+
main()

development/load_arxiv_db_schema.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
def main(mysql_port: int, db_name: str, root_password: str="rootpassword", schema_sql: str="arxiv_db_schema.sql",
1919
use_ssl: bool = False,
2020
) -> None:
21+
logger = logging.getLogger()
2122
conn_argv = [f"--port={mysql_port}", "-h", "127.0.0.1", "-u", "root", f"--password={root_password}"]
2223
if not use_ssl:
2324
conn_argv.append("--ssl-mode=DISABLED")
2425

2526
if not is_port_open("127.0.0.1", mysql_port):
27+
logger.warning("Starting fake-arxiv-db")
2628
run_mysql_container(mysql_port, container_name="fake-arxiv-db", db_name=db_name, root_password=root_password)
2729

2830
cli = ["mysql"] + conn_argv + [db_name]

gcp/cloud_functions/aggregate_hourly_downloads/src/main.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
from typing import Set, Dict, List, Literal, Tuple, Any, Union
55
from datetime import datetime, timedelta, timezone
6+
from dateutil import parser
67

78
from arxiv.taxonomy.category import Category
89
from arxiv.taxonomy.definitions import CATEGORIES
@@ -243,8 +244,7 @@ def aggregate_hourly_downloads(cloud_event: CloudEvent):
243244
if state!="SUCCEEDED":
244245
logging.warning(f"recieved state other than SUCCEEDED: {state}")
245246
return
246-
247-
pubsub_timestamp = datetime.strptime(cloud_event['time'], "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
247+
pubsub_timestamp =parser.isoparse(cloud_event['time']).replace(tzinfo=timezone.utc)
248248

249249
#get and check enviroment data
250250
enviro=os.environ.get('ENVIRONMENT')
@@ -420,7 +420,8 @@ def manual_aggregate(starttime:datetime, endtime: datetime):
420420

421421
endtime=datetime.now()
422422
if len(failed_hours)>0:
423-
logging.critical(f"All failed time periods: {failed_hours}")
423+
formatted_hours = ", ".join(dt.strftime("%Y-%m-%d %H") for dt in failed_hours)
424+
logging.critical(f"All failed time periods: {formatted_hours}")
424425
total_time=str(endtime-starttime).split(".")[0]
425426
logging.info(f" Finished processing! total time: {total_time}, started: {starttime.strftime('%H:%M')}, ended: {endtime.strftime('%H:%M')}")
426427

0 commit comments

Comments
 (0)