Merge pull request #107 from Princeton-CDH/feature/ocrtext-to-json
Script to collate OCR text files by directory into one JSON file per directory (for Gale local OCR)
rlskoeser authored Nov 5, 2024
2 parents 4f9d0ca + 0913b6d commit 5b394a3
Showing 8 changed files with 499 additions and 131 deletions.
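For context, the new script is installed as a `collate-txt` console command (see the `pyproject.toml` change below). A minimal sketch of invoking it from Python, assuming this branch is installed and using hypothetical input/output paths:

```python
# Run the new collate-txt console script (a sketch: assumes corppa with
# this branch is installed; the directory paths are hypothetical examples).
import subprocess

subprocess.run(
    ["collate-txt", "gale-ocr-text/", "gale-ocr-json/", "--no-progress"],
    check=True,
)
```

The `--no-progress` flag is generated automatically by the `argparse.BooleanOptionalAction` used in the script's argument parser.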
17 changes: 15 additions & 2 deletions .github/workflows/unit_tests.yml
@@ -4,17 +4,29 @@ on:
  push: # run on every push or PR to any branch
  pull_request:

env:
  # python version used to calculate and submit code coverage
  COV_PYTHON_VERSION: "3.12"

jobs:
  python-unit:
    name: Python unit tests
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python: ["3.11", "3.12"]
    defaults:
      run:
        working-directory: .

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Setup Python
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python }}

      # base the python cache on the hash of all pyproject.toml,
      # which includes python requirements.
@@ -38,6 +50,7 @@ jobs:
        uses: codecov/codecov-action@v4
        with:
          token: ${{ secrets.CODECOV_TOKEN }} # required
        # if: ${{ matrix.python == env.COV_PYTHON_VERSION }}

      # Set the color of the slack message used in the next step based on the
      # status of the build: "danger" for failure, "good" for success,
@@ -58,7 +71,7 @@ jobs:
        env:
          SLACK_COLOR: ${{ env.SLACK_COLOR }}
          SLACK_WEBHOOK: ${{ secrets.ACTIONS_SLACK_WEBHOOK }}
          SLACK_TITLE: "Workflow `${{ github.workflow }}`: ${{ job.status }}"
          SLACK_TITLE: "Workflow `${{ github.workflow }}` (python ${{ matrix.python }}): ${{ job.status }}"
          SLACK_MESSAGE: "Run <https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}|#${{ github.run_number }}> on <https://github.com/${{ github.repository }}/|${{ github.repository }}@${{ github.ref }}>"
          SLACK_FOOTER: "<https://github.com/${{ github.repository }}/commit/${{ github.sha }}|View commit>"
          MSG_MINIMAL: true # use compact slack message format
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -5,14 +5,15 @@ build-backend = "hatchling.build"
[project]
name = "corppa"
description = "Utilities for working with Princeton Prosody Archive full-text corpus"
requires-python = ">=3.12"
requires-python = ">=3.11"
readme = "README.md"
# license TBD
#license.file = "LICENSE"
#license = {text = "Apache-2"}
classifiers = [
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.11",
    "Development Status :: 3 - Alpha",
    "Intended Audience :: Science/Research",
    "Operating System :: OS Independent",
@@ -38,6 +39,7 @@ dev = ["pre-commit", "corppa[test]", "corppa[ocr]"]
[project.scripts]
corppa-filter = "corppa.utils.filter:main"
corppa-ocr = "corppa.ocr.gvision_ocr:main"
collate-txt = "corppa.ocr.collate_txt:main"

[tool.hatch.version]
path = "src/corppa/__init__.py"
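The new `collate-txt` entry point maps the command name to `corppa.ocr.collate_txt:main`; the generated console script is a thin wrapper that calls that function. A sketch of the equivalent programmatic call, with hypothetical paths:

```python
# Equivalent of running `collate-txt <input_dir> <output_dir>` on the
# command line; a sketch assuming corppa is installed. main() parses
# sys.argv just as the generated console script would.
import sys

from corppa.ocr.collate_txt import main

sys.argv = ["collate-txt", "gale-ocr-text/", "gale-ocr-json/"]  # hypothetical paths
main()
```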
123 changes: 123 additions & 0 deletions src/corppa/ocr/collate_txt.py
@@ -0,0 +1,123 @@
#!/usr/bin/env python
"""
Script to turn directories with multiple text files into a single JSON
file containing text contents of all files with page numbers based
on text filenames. (Page number logic is currently Gale-specific).
"""

import argparse
import json
import pathlib
import sys

from tqdm import tqdm

from corppa.utils.path_utils import find_relative_paths, get_page_number


def collate_txt(
    input_dir: pathlib.Path, output_dir: pathlib.Path, show_progress: bool = True
):
    """Takes a directory that contains text files grouped by directory at any
    level of nesting under the specified `input_dir` and combines them into
    one JSON file per directory. JSON files are created in the specified
    `output_dir` using the same hierarchy found in the `input_dir`.
    """
    directories = 0
    txt_files = 0
    skipped = 0

    # stack tqdm bars so we can briefly show status
    status = tqdm(
        desc="Collating",
        bar_format="{desc}{postfix}",
        disable=not show_progress,
    )

    for ocr_dir, files in tqdm(
        find_relative_paths(input_dir, [".txt"], group_by_dir=True),
        desc="Directories with text files",
        disable=not show_progress,
    ):
        # output will be a json file based on name of the directory
        # containing text files, with parallel directory structure to the source
        output_file = output_dir / ocr_dir.parent / f"{ocr_dir.name}.json"
        # if output exists from a previous run, skip
        if output_file.exists():
            skipped += 1
            continue

        directories += 1
        txt_files += len(files)
        status.set_postfix_str(f" {ocr_dir.stem}: {len(files)} txt files")

        # combine text contents into a dictionary keyed on page number
        txt_data = {}
        for filename in files:
            with (input_dir / filename).open(encoding="utf-8") as txtfile:
                txt_data[get_page_number(filename)] = txtfile.read()

        # ensure the parent directory exists
        output_file.parent.mkdir(exist_ok=True)
        # save out text content as json
        with output_file.open("w", encoding="utf-8") as outfile:
            json.dump(txt_data, outfile)

    status.set_postfix_str("")
    status.close()

    # report a summary of what was done
    print(
        f"\nCreated JSON file{'' if directories == 1 else 's'} for "
        + f"{directories:,} director{'y' if directories == 1 else 'ies'} "
        + f"with {txt_files:,} total text files; skipped {skipped:,}."
    )


def main():
    parser = argparse.ArgumentParser(
        description="Create JSON files to group OCR text files by directory."
    )
    # Required arguments
    parser.add_argument(
        "input_dir",
        help="Top-level input directory with directories of OCR text files.",
        type=pathlib.Path,
    )
    parser.add_argument(
        "output_dir",
        help="Top-level output directory for OCR consolidated into JSON files.",
        type=pathlib.Path,
    )
    # Optional arguments
    parser.add_argument(
        "--progress",
        help="Show progress",
        action=argparse.BooleanOptionalAction,
        default=True,
    )

    args = parser.parse_args()
    # Validate arguments
    if not args.input_dir.is_dir():
        print(
            f"Error: input directory {args.input_dir} does not exist", file=sys.stderr
        )
        sys.exit(1)
    # create output dir if it doesn't exist
    if not args.output_dir.is_dir():
        try:
            args.output_dir.mkdir()
            print(f"Creating output directory {args.output_dir}")
        except (FileExistsError, FileNotFoundError) as err:
            print(
                f"Error creating output directory {args.output_dir}: {err}",
                file=sys.stderr,
            )
            sys.exit(1)

    collate_txt(args.input_dir, args.output_dir, show_progress=args.progress)


if __name__ == "__main__":
    main()
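To show the expected input and output shape, here is a minimal sketch that collates a tiny Gale-style tree programmatically; the volume id `CW0112029406` comes from the naming example in `path_utils.py`, and everything else is hypothetical:

```python
# Sketch: collate a small Gale-style directory tree (assumes corppa is
# installed). Page files follow the CW0112029406_00180.txt naming
# convention documented in path_utils.get_page_number.
import json
import pathlib
import tempfile

from corppa.ocr.collate_txt import collate_txt

with tempfile.TemporaryDirectory() as tmp:
    tmp = pathlib.Path(tmp)
    vol_dir = tmp / "input" / "CW0112029406"
    vol_dir.mkdir(parents=True)
    (vol_dir / "CW0112029406_00010.txt").write_text("page one text")
    (vol_dir / "CW0112029406_00020.txt").write_text("page two text")
    out_dir = tmp / "output"
    out_dir.mkdir()

    collate_txt(tmp / "input", out_dir, show_progress=False)

    # one JSON file per directory of text files, keyed on page number
    data = json.loads((out_dir / "CW0112029406.json").read_text())
    print(data)  # expected: {'0001': 'page one text', '0002': 'page two text'}
    # (key order may vary with directory listing order)
```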
31 changes: 2 additions & 29 deletions src/corppa/ocr/gvision_ocr.py
@@ -12,7 +12,7 @@

from tqdm import tqdm

from corppa.utils.path_utils import get_ppa_source, get_vol_dir
from corppa.utils.path_utils import find_relative_paths, get_ppa_source, get_vol_dir

# Attempt to import Google Cloud Vision Python Client
try:
@@ -24,33 +24,6 @@
os.environ["GRPC_VERBOSITY"] = "NONE"


def image_relpath_generator(image_dir, exts, follow_symlinks=True):
    """
    This generator method finds all images in image_dir with file extensions
    in exts (case insensitive). For each of these images, the method yields
    the relative path with respect to image_dir.
    For example, if image_dir = "a/b/c/images" and there are image files at the
    following paths: "a/b/c/images/alpha.jpg", "a/b/c/images/d/beta.jpg",
    the generator will produce these two items: "alpha.jpg" and "d/beta.jpg"
    """
    # Create lowercase extension set from passed in exts
    ext_set = {ext.lower() for ext in exts}

    # Using pathlib.walk over glob because (1) it allows us to find files with
    # multiple extensions in a single walk of the directory and (2) lets us
    # leverage additional functionality of pathlib.
    for dirpath, dirs, files in image_dir.walk(follow_symlinks=follow_symlinks):
        # Check the files in walked directory
        for file in files:
            ext = os.path.splitext(file)[1]
            if ext.lower() in ext_set:
                filepath = dirpath.joinpath(file)
                yield filepath.relative_to(image_dir)
        # For future walking, remove hidden directories
        dirs[:] = [d for d in dirs if d[0] != "."]


def ocr_image_via_gvision(gvision_client, input_image, out_txt, out_json):
    """
    Perform OCR for input image using the Google Cloud Vision API via the provided client.
Expand Down Expand Up @@ -126,7 +99,7 @@ def ocr_images(in_dir, out_dir, exts, ocr_limit=0, show_progress=True):

    ocr_count = 0
    skip_count = 0
    for image_relpath in image_relpath_generator(in_dir, exts):
    for image_relpath in find_relative_paths(in_dir, exts):
        # Refresh progress bar
        if show_progress:
            progress_bar.refresh()
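The removed `image_relpath_generator` is superseded by the shared `find_relative_paths` helper in `path_utils` (shown in the next file), which behaves the same at this call site. A small sketch of the replacement call with a hypothetical image directory:

```python
# find_relative_paths as a drop-in replacement for image_relpath_generator
# (a sketch: assumes corppa is installed; the directory is hypothetical).
import pathlib

from corppa.utils.path_utils import find_relative_paths

image_dir = pathlib.Path("page-images")
for relpath in find_relative_paths(image_dir, [".jpg", ".png", ".tif"]):
    print(relpath)  # e.g. alpha.jpg, d/beta.jpg -- relative to image_dir
```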
88 changes: 87 additions & 1 deletion src/corppa/utils/path_utils.py
@@ -1,8 +1,10 @@
"""
Library of general-purpose auxiliary methods for stand-alone scripts
General-purpose methods for working with paths, PPA identifiers, and directories
"""

import os
import pathlib
from typing import Iterator

_htid_encode_map = {":": "+", "/": "=", ".": ","}
_htid_encode_table = str.maketrans(_htid_encode_map)
@@ -114,3 +116,87 @@ def get_image_relpath(work_id, page_num):
        raise NotImplementedError
    else:
        raise ValueError(f"Unsupported source '{source}'")


def get_page_number(pagefile: pathlib.Path) -> str:
    """Extract and return the page number from the filename for page-level
    content (e.g., image or text). Returns the page number as a string
    with leading zeros. (Note: logic is currently
    specific to Gale/ECCO file naming conventions.)"""
    # NOTE: this logic is currently specific to Gale/ECCO files,
    # which look like CW0112029406_00180.txt

    # split the file base/stem name by _ and take the last part
    source_id, pagenum = pagefile.stem.split("_", 1)
    if get_ppa_source(source_id) != "Gale":
        raise NotImplementedError
    # return the number as a string; strip extra trailing zero
    return pagenum[:-1]  # strip trailing zero


def find_relative_paths(
    base_dir, exts, follow_symlinks=True, group_by_dir=False
) -> Iterator[pathlib.Path] | Iterator[tuple[pathlib.Path, list]]:
    """
    This method finds files anywhere under the specified base directory
    that match any of the specified file extensions (case insensitive),
    and returns a generator of path objects with a path relative to the
    base directory. File extensions should include the leading period,
    i.e. `[".jpg", ".tiff"]` rather than `["jpg", "tiff"]`.

    For example, given a base directory `a/b/c/images`, an extension list of
    `.jpg`, and files nested at different levels in the hierarchy
    `a/b/c/images/alpha.jpg`, `a/b/c/images/d/beta.jpg`:
    ```
    a/b/c/images
    |-- alpha.jpg
    +-- d
        |-- beta.jpg
    ```
    the result will include the two items: `alpha.jpg` and `d/beta.jpg`.

    When `group_by_dir` is `True`, resulting files will be returned grouped
    by the parent directory. The return result is a tuple of a single
    :class:`pathlib.Path` object for the directory and a list of
    :class:`pathlib.Path` objects for the files in that directory that
    match the specified extensions. Given a hierarchy like this:
    ```
    images/vol-a/
    |-- alpha.jpg
    |-- beta.jpg
    ```
    the method would return `(vol-a, [alpha.jpg, beta.jpg])`.
    """
    # Create lowercase extension set from passed in exts
    ext_set = {ext.lower() for ext in exts}

    # Using pathlib.Path.walk / os.walk over glob because (1) it allows us to
    # find files with multiple extensions in a single walk of the directory
    # and (2) lets us leverage additional functionality of pathlib.
    if hasattr(base_dir, "walk"):
        # As of Python 3.12, Path.walk exists
        walk_generator = base_dir.walk(follow_symlinks=follow_symlinks)
    else:
        # For Python 3.11, fall back to os.walk
        walk_generator = os.walk(base_dir, followlinks=follow_symlinks)
    for dirpath, dirnames, filenames in walk_generator:
        if isinstance(dirpath, str):
            # Convert str produced by os.walk to Path object
            dirpath = pathlib.Path(dirpath)
        # Create a generator of relevant files in the current directory
        include_files = (
            dirpath.joinpath(file).relative_to(base_dir)
            for file in filenames
            if os.path.splitext(file)[1].lower() in ext_set
        )
        # if group by dir is specified, yield dirpath and list of files,
        # but only if at least one relevant file is found
        if group_by_dir:
            include_files = list(include_files)
            if include_files:
                yield (dirpath.relative_to(base_dir), include_files)
        else:
            # otherwise yield just the files
            yield from include_files

        # modify dirnames in place to skip hidden directories
        dirnames[:] = [d for d in dirnames if not d.startswith(".")]
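A short sketch combining the two new helpers, grouping text files by directory and keying them by page number; assumes corppa is installed and uses a hypothetical Gale-style layout:

```python
# Group OCR text files by directory, then map page number -> file path.
# A sketch: assumes corppa is installed; the base directory is hypothetical.
import pathlib

from corppa.utils.path_utils import find_relative_paths, get_page_number

base_dir = pathlib.Path("gale-ocr-text")
for ocr_dir, files in find_relative_paths(base_dir, [".txt"], group_by_dir=True):
    # e.g. ocr_dir = CW0112029406, files = [CW0112029406/CW0112029406_00180.txt]
    pages = {get_page_number(f): f for f in files}
    print(ocr_dir, sorted(pages))  # page numbers like '0018'
```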