Celery, a Python task queue, runs scheduled tasks to scrape and process data. To run Celery, install Redis (which serves as the message broker and data store for Celery). The other dependencies are installed with pip when you first install the app's requirements with pip install -r requirements.txt.
The main Celery app is in server_py/flatgov/flatgov/celery.py, and the schedule of tasks is defined in app.conf.beat_schedule there. Logs for Celery tasks are stored in /var/log/celery, e.g. /var/log/celery/link_celery.log, /var/log/celery/link_celery-2.log, /var/log/celery/link_celery-3.log, etc.
This repository includes five main tasks; the uscongress task has several subtasks:
- uscongress: updates US Congress bills and metadata
- statementAdminPolicy: updates Statements of Administration Policy from the current White House (OMB) website
- committeereport: updates committee reports associated with bills
- cbo: updates CBO reports associated with bills
- crs: updates CRS reports associated with bills
Within the US Congress task, there are six Celery tasks that run sequentially: update_bill_task, bill_data_task, process_bill_meta_task, related_bill_task, elastic_load_task, bill_similarity_task.
The tasks after update_bill_task are triggered upon save of the job status, starting with the status of update_bill_task. After each task runs, its status is saved in the database in the UscongressUpdateJob table (see below). The status of the tasks is displayed on the /admin/ page. The six tasks are:
- update_bill_task: downloads US Congress bills and metadata, using sitemaps to efficiently determine what needs to be updated.
  With the open source scraper itself, we run ./run govinfo --collections=BILLS --congress=117 --extract=mods,xml,premis --bulkdata=BILLSTATUS. The status of this process is stored as fdsys_status.
  Then we run ./run bills, which creates data.json out of data.xml and text_versions. The status of this step is stored as data_status.
- bill_data_task: this task and the following one are now run from a Go executable.
  Creates billList.json and billsMeta.json. Once the Celery task update_bill_task has finished (that is, the bill text and metadata have been downloaded), bill_data_task runs. This is triggered in the save function in models.py of the uscongress app:
    def save(self, *args, **kwargs):
        super().save(*args, **kwargs)
        # data.json has been created: queue the task that builds billList.json and billsMeta.json.
        if self.pk and self.data_status == self.SUCCESS and self.bill_status == self.PENDING:
            current_app.send_task(
                'uscongress.tasks.bill_data_task',
                args=(self.pk, ),
                queue='bill'
            )
        # The bill lists have been built: queue the metadata processing task.
        if self.pk and self.bill_status == self.SUCCESS and self.meta_status == self.PENDING:
            current_app.send_task(
                'uscongress.tasks.process_bill_meta_task',
                args=(self.pk, ),
                queue='bill'
            )
        ...
  bill_data_task creates the billList.json and billsMeta.json files using the list in the saved field, and dumps the related-bill JSON files.
  The data.json files at the top level (not the data.json in the text versions) are used to create the metadata.
- process_bill_meta_task: runs after bill_data_task completes. It processes and organizes the metadata for bills.
- related_bill_task: creates bill instances in the Bill table in the database.
- elastic_load_task: parses XML into sections and loads the bill and sections into Elasticsearch. The XML for bill similarity is in the text_versions field.
- bill_similarity_task: runs the similarity task on new bills, which are found in the new bill list of the saved field in the UscongressUpdateJob table record.
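The sitemap check mentioned for update_bill_task can be pictured as comparing each sitemap entry's lastmod value with the value recorded on the previous run. The sketch below is an illustration only, not the scraper's actual code: the function name urls_needing_update, the last_seen mapping, and the sample URLs are hypothetical. Only the sitemap XML namespace is standard.

```python
import xml.etree.ElementTree as ET

# Standard namespace used by sitemap files.
SITEMAP_NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}


def urls_needing_update(sitemap_xml, last_seen):
    """Return URLs whose <lastmod> differs from the value seen on the last run.

    last_seen maps URL -> lastmod string recorded previously (hypothetical store).
    """
    root = ET.fromstring(sitemap_xml)
    stale = []
    for entry in root.findall("sm:url", SITEMAP_NS):
        loc = entry.findtext("sm:loc", namespaces=SITEMAP_NS)
        lastmod = entry.findtext("sm:lastmod", namespaces=SITEMAP_NS)
        # New URLs and URLs with a changed lastmod both need a download.
        if loc and last_seen.get(loc) != lastmod:
            stale.append(loc)
    return stale


# Made-up sample data for illustration.
SAMPLE = """<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.org/BILLS-117hr1</loc><lastmod>2021-03-01T00:00:00Z</lastmod></url>
  <url><loc>https://example.org/BILLS-117hr2</loc><lastmod>2021-04-01T00:00:00Z</lastmod></url>
</urlset>"""

seen = {"https://example.org/BILLS-117hr1": "2021-03-01T00:00:00Z"}
print(urls_needing_update(SAMPLE, seen))
```

Only the second URL is reported here, because the first one's lastmod matches the recorded value.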
When Celery tasks run, a UscongressUpdateJob table record is created in the database to track the task status.
The fields in UscongressUpdateJob are:
- job_id: the Celery task ID.
- fdsys_status: choice field (pending, success, failed). Represents the status of the US Congress bill download (fdsys and text versions) performed by the Celery task update_bill_task. Once the download finishes, the value changes to success or failed.
- saved: the list of bill congress numbers downloaded by the Celery task update_bill_task.
- skips: the list of bill congress numbers skipped by the Celery task update_bill_task.
- data_status: choice field (pending, success, failed). Once the download finishes, update_bill_task creates data.json out of data.xml and text_versions, exactly as ./run bills does.
- bill_status: choice field (pending, success, failed). After data.json is created, another Celery task, bill_data_task, creates billList.json and billsMeta.json. This field shows whether that task has finished (succeeded or failed).
- meta_status: choice field (pending, success, failed). After billList.json and billsMeta.json are created, the Celery task process_bill_meta_task processes the metadata. This field shows whether that task has finished or is still pending.
- related_status: choice field (pending, success, failed). Once process_bill_meta_task finishes, the Celery task related_bill_task runs to get related bills. This field represents the status of that task.
- elastic_status: choice field (pending, success, failed). Once related_bill_task finishes, the Celery task elastic_load_task runs to load the new bills into Elasticsearch. This field represents the status of that task.
- similarity: choice field (pending, success, failed). After elastic_load_task finishes, we update the es_similarity field of each bill in the database. This field holds the status of that task.
- created: date-time field recording when the record was created.
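The way these status fields chain the tasks together can be sketched in plain Python. This is a simplified model, not the project's code: the first two rows of CHAIN mirror the save() excerpt shown earlier, while the remaining rows and their dotted task names are assumptions that follow the same pattern.

```python
PENDING, SUCCESS, FAILED = "pending", "success", "failed"

# (field that must be SUCCESS, field that must be PENDING, task to send).
# Rows 1-2 mirror the save() excerpt; rows 3-5 are assumed, including the
# dotted task names.
CHAIN = [
    ("data_status", "bill_status", "uscongress.tasks.bill_data_task"),
    ("bill_status", "meta_status", "uscongress.tasks.process_bill_meta_task"),
    ("meta_status", "related_status", "uscongress.tasks.related_bill_task"),
    ("related_status", "elastic_status", "uscongress.tasks.elastic_load_task"),
    ("elastic_status", "similarity", "uscongress.tasks.bill_similarity_task"),
]


def tasks_to_send(job):
    """Return the task names that a save() would dispatch for these statuses."""
    return [
        task
        for done_field, next_field, task in CHAIN
        if job.get(done_field) == SUCCESS and job.get(next_field) == PENDING
    ]


job = {
    "fdsys_status": SUCCESS,
    "data_status": SUCCESS,
    "bill_status": SUCCESS,
    "meta_status": PENDING,
    "related_status": PENDING,
    "elastic_status": PENDING,
    "similarity": PENDING,
}
print(tasks_to_send(job))  # ['uscongress.tasks.process_bill_meta_task']
```

Because each step requires the previous field to be success, a failed status stops the chain: no later task is queued until the failed step is rerun.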
The Statements of Administration Policy (SAP) task (currently 'biden_statements.py'), found in server_py/flatgov/common/management, scrapes SAP links from the White House website and stores them in the database, using original_pdf_link as a unique field to avoid duplicates.
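The duplicate check can be illustrated with a small, database-free sketch. It assumes each scraped item is a dict with an original_pdf_link key; the function name new_statements and the sample links are hypothetical, and in the real app the uniqueness is enforced on the database field.

```python
def new_statements(scraped, existing_links):
    """Return scraped items whose original_pdf_link is not already stored.

    Duplicates within the scraped batch itself are skipped as well.
    """
    seen = set(existing_links)
    fresh = []
    for item in scraped:
        link = item["original_pdf_link"]
        if link not in seen:
            seen.add(link)
            fresh.append(item)
    return fresh


# Made-up sample data for illustration.
stored = {"https://example.org/sap/hr1.pdf"}
batch = [
    {"title": "SAP on H.R. 1", "original_pdf_link": "https://example.org/sap/hr1.pdf"},
    {"title": "SAP on H.R. 2", "original_pdf_link": "https://example.org/sap/hr2.pdf"},
    {"title": "SAP on H.R. 2", "original_pdf_link": "https://example.org/sap/hr2.pdf"},
]
print([item["title"] for item in new_statements(batch, stored)])  # ['SAP on H.R. 2']
```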
The CBO Scores task (in common/tasks.py, referring to common/cbo.py) processes metadata from the bill status XML to retrieve the 'cboCostEstimates' field. Once all of these are collected, it checks the database for each entry and stores any new entries.
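As a rough illustration of that step, the sketch below pulls cost estimates out of a bill status document with the standard library. The item/pubDate/title/url layout inside cboCostEstimates is an assumption about the XML, the sample document is made up, and the function name cbo_estimates is hypothetical; common/cbo.py may work differently.

```python
import xml.etree.ElementTree as ET

# Made-up sample; the element layout under cboCostEstimates is assumed.
SAMPLE = """<billStatus><bill>
  <cboCostEstimates>
    <item>
      <pubDate>2021-03-01T00:00:00Z</pubDate>
      <title>H.R. 1, Example Act</title>
      <url>https://example.org/cbo/1</url>
    </item>
  </cboCostEstimates>
</bill></billStatus>"""


def cbo_estimates(bill_status_xml):
    """Return one dict per <item> found under any <cboCostEstimates> element."""
    root = ET.fromstring(bill_status_xml)
    estimates = []
    for block in root.iter("cboCostEstimates"):
        for item in block.findall("item"):
            estimates.append({
                "pub_date": item.findtext("pubDate"),
                "title": item.findtext("title"),
                "url": item.findtext("url"),
            })
    return estimates


already_stored = set()  # URLs already in the database (hypothetical)
new_entries = [e for e in cbo_estimates(SAMPLE) if e["url"] not in already_stored]
print(new_entries)
```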
The CRS Reports task runs the CRS scraper, described in CRS_REPORTS.
The Celery tasks are run on a schedule by celery-beat. The schedule is defined in https://github.com/aih/FlatGov/blob/main/server_py/flatgov/flatgov/celery.py
For example, a CSV of the CRS documents is created every night, at 01:00 according to the crontab entry here:
'crs_scraper_daily': {
    'task': 'bills.tasks.crs_task',
    'schedule': crontab(minute=0, hour=1),
    'options': {'queue': 'bill'}
},
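To make the timing of an entry like crontab(minute=0, hour=1) concrete, the following standard-library sketch computes the next daily run for a given hour and minute. It is only an illustration of the schedule's meaning, not how celery-beat computes it, and it ignores time zones; the function name next_daily_run is hypothetical.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour, minute):
    """Return the next datetime at hour:minute strictly after now."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2021, 4, 1, 18, 27)
print(next_daily_run(now, hour=1, minute=0))  # 2021-04-02 01:00:00
```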
To run a Celery worker, open a terminal, go to the Django project root directory (in our case …/Flatgov/server_py/flatgov), then activate the virtual environment.
Run the command below to start the Celery worker:
celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info
To run the Celery scheduler, open another terminal, go to the Django project root directory (in our case …/Flatgov/server_py/flatgov), then activate the virtual environment.
Run the command below to start Celery beat with RedBeat:
celery beat -S redbeat.RedBeatScheduler -A flatgov.celery:app --loglevel=info
The background tasks (Celery tasks) then run daily at the times set in the schedule.
1. Init-script: celeryd
Before configuring it, open deployment_scripts/conf_celeryd and replace all the paths with absolute paths.
Copy the deployment_scripts/bill_celeryd file to /etc/init.d/celeryd.
Make celeryd executable by running the following commands from the terminal:
sudo chmod 755 /etc/init.d/celeryd
sudo chown root:root /etc/init.d/celeryd
For configuration, copy the deployment_scripts/conf_celeryd file to /etc/default/celeryd.
You can start the worker and check that it is active as follows:
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo chown -R ubuntu:ubuntu /var/run/celery/
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo chown -R ubuntu:ubuntu /var/log/celery/
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo /etc/init.d/celeryd start
celery init v10.1.
Using config script: /etc/default/celeryd
celery multi v4.4.2 (cliffs)
> Starting nodes...
> celery@ip-172-31-58-205: OK
Note: On Ubuntu, using the default ubuntu user, the settings are as follows.
CELERY_BIN="/home/ubuntu/.pyenv/versions/flatgov/bin/celery"
CELERY_APP="flatgov.celery:app"
CELERYD_CHDIR="/opt/flatgov/FlatGov/server_py/flatgov/"
CELERYD_OPTS="--time-limit=300 --concurrency=3 -Q bill -l INFO"
CELERYD_LOG_FILE="/var/log/celery/link_%n%I.log"
CELERYD_PID_FILE="/var/run/celery/link_%n.pid"
CELERYD_USER="ubuntu"
CELERYD_GROUP="ubuntu"
CELERY_CREATE_DIRS=1
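The CELERYD_LOG_FILE pattern explains the log file names in /var/log/celery: %n stands for the node name and %I for the pool process index with a separator, which is empty for the main process. The sketch below mimics only these two format codes to show the resulting names; it is a simplification, not Celery's own expansion code, and the function name expand_log_name is hypothetical.

```python
def expand_log_name(pattern, node_name, process_index=0):
    """Expand %n and %I in a log file pattern.

    process_index 0 stands for the main process, which gets no suffix;
    pool processes get "-1", "-2", and so on.
    """
    suffix = "" if process_index == 0 else "-{}".format(process_index)
    return pattern.replace("%n", node_name).replace("%I", suffix)


pattern = "/var/log/celery/link_%n%I.log"
for index in range(3):
    print(expand_log_name(pattern, "celery", index))
```

With a node named celery this prints link_celery.log, link_celery-1.log and link_celery-2.log under /var/log/celery.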
To test:
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo /etc/init.d/celeryd status
celery init v10.1.
Using config script: /etc/default/celeryd
celeryd (node link_celery) (pid 26679) is up...
2. Init-script: celerybeat
Before configuring it, open deployment_scripts/conf_celerybeat and replace all the paths with absolute paths.
Copy the deployment_scripts/celerybeat file to /etc/init.d/celerybeat.
Make celerybeat executable by running the following commands from the terminal:
sudo chmod 755 /etc/init.d/celerybeat
sudo chown root:root /etc/init.d/celerybeat
For configuration, copy the deployment_scripts/conf_celerybeat file to /etc/default/celerybeat.
Then set its ownership and permissions:
sudo chown root:root '/etc/default/celerybeat'
sudo chmod 640 '/etc/default/celerybeat'
You can check if the beat is active by:
sudo /etc/init.d/celerybeat start
sudo /etc/init.d/celerybeat status
On Ubuntu, with a 'flatgov' virtualenv, the settings in /etc/default/celerybeat are as follows:
CELERY_BIN="/home/ubuntu/.pyenv/versions/3.8.3/envs/flatgov/bin/celery"
CELERY_APP="flatgov.celery:app"
CELERYBEAT_CHDIR="/opt/flatgov/FlatGov/server_py/flatgov"
CELERYBEAT_USER="ubuntu"
CELERYBEAT_GROUP="ubuntu"
CELERYBEAT_OPTS="--schedule=/var/run/celery/celerybeat-schedule"
3. Maintenance
As was shown, the following commands control worker and beat:
/etc/init.d/celeryd {start|stop|restart}
/etc/init.d/celerybeat {start|stop|restart}
The celerybeat user may also need to be set to ubuntu.
4. Run a task manually
If you need to run a task manually (e.g. to test, or to get data off schedule), run a separate Celery worker:
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov$ celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info
-------------- celery@flatgov.%ip-... v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1041-aws-x86_64-with-glibc2.27 2021-04-01 18:18:04
- *** --- * ---
- ** ---------- [config]
Then, in a separate terminal, run pyenv activate flatgov. Then:
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov$ python manage.py shell
Python 3.8.3 (default, Sep 24 2020, 22:52:34)
[GCC 7.5.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from bills.tasks import sap_biden_task
>>> from celery import current_app
>>> current_app.send_task('bills.tasks.sap_biden_task', queue='bill')
<AsyncResult: a5d7d336-0125-4bdf-8819-5628b2341081>
Or, for the uscongress update task:
>>> from uscongress.tasks import update_bill_task
>>> from celery import current_app
>>> current_app.send_task('uscongress.tasks.update_bill_task', queue='bill')
<AsyncResult: f05d3449-d473-498f-b6f0-87f663cd20e3>
Then you can track the task by looking in the Celery logs, or in the original Celery terminal, e.g.:
[2021-04-01 18:27:47,069: WARNING/ForkPoolWorker-1] 2021-04-01 18:27:47 [scrapy.statscollectors] INFO: Dumping Scrapy stats:
{'downloader/request_bytes': 486,
'downloader/request_count': 2,
'downloader/request_method_count/GET': 2,
'downloader/response_bytes': 27570,
'downloader/response_count': 2,
'downloader/response_status_count/200': 2,
'elapsed_time_seconds': 0.393221,
'finish_reason': 'finished',
'finish_time': datetime.datetime(2021, 4, 1, 18, 27, 47, 68878),
'item_scraped_count': 10,
'log_count/DEBUG': 12,
'log_count/INFO': 10,
'log_count/WARNING': 22,
'memusage/max': 83107840,
'memusage/startup': 83107840,
'response_received_count': 2,
'robotstxt/request_count': 1,
'robotstxt/response_count': 1,
'robotstxt/response_status_count/200': 1,
'scheduler/dequeued': 1,
'scheduler/dequeued/memory': 1,
'scheduler/enqueued': 1,
'scheduler/enqueued/memory': 1,
'start_time': datetime.datetime(2021, 4, 1, 18, 27, 46, 675657)}
[2021-04-01 18:27:47,069: INFO/ForkPoolWorker-1] Spider closed (finished)
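When following a task through the log files rather than the terminal, it can help to split each worker line into its parts. The sketch below parses lines in the format shown above (timestamp, level, pool process, message) with a regular expression. The pattern is inferred from this sample output only, and the function name parse_log_line is hypothetical.

```python
import re

# Matches lines such as:
# [2021-04-01 18:27:47,069: INFO/ForkPoolWorker-1] Spider closed (finished)
LOG_LINE = re.compile(
    r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}): "
    r"(?P<level>[A-Z]+)/(?P<process>[^\]]+)\] (?P<message>.*)$"
)


def parse_log_line(line):
    """Return the parts of a worker log line as a dict, or None if no match."""
    match = LOG_LINE.match(line.rstrip("\n"))
    return match.groupdict() if match else None


sample = "[2021-04-01 18:27:47,069: INFO/ForkPoolWorker-1] Spider closed (finished)"
print(parse_log_line(sample))
```

Continuation lines, such as the body of the Scrapy stats dictionary, do not match and return None, so a caller can attach them to the preceding entry.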
For a different task, e.g. CommitteeDocument, the commands are:
celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info
Open another shell and run the Django shell:
python manage.py shell
from celery import current_app
current_app.send_task("bills.tasks.committee_report_scrapy_task", queue="bill")
See image::media/celery-task-manual.png[Manual Celery Task,300,200]
You can then track the task status in the terminal where Celery is running, or view the CommitteeDocument records in the Django admin dashboard. The initial data load takes a long time; there are about 17,000 records.
Running Celery requires Redis. To set up Redis, see below; also see the instructions on the [Redis website](https://redis.io/).
On Ubuntu:
Install
$ sudo apt update
$ sudo apt install redis-server
Reading package lists... Done
Fetched 634 kB in 0s (24.3 MB/s)...
Start
sudo systemctl restart redis.service
To confirm that it is running:
sudo systemctl status redis
If necessary, edit /etc/redis/redis.conf. Our setup should not require any special settings.
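Besides systemctl, a quick check from Python can confirm that Redis is answering before Celery is started. The sketch below sends an inline PING over a plain socket using only the standard library. It assumes Redis listens on the default 127.0.0.1:6379 without a password (with authentication enabled the reply is an error and the check reports False); the function names are hypothetical.

```python
import socket


def is_pong(reply):
    """True if the raw reply bytes are the Redis answer to PING."""
    return reply.strip() == b"+PONG"


def redis_ping(host="127.0.0.1", port=6379, timeout=1.0):
    """Send an inline PING and report whether Redis answered with PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(b"PING\r\n")
            return is_pong(conn.recv(64))
    except OSError:
        # Connection refused, timed out, or otherwise unreachable.
        return False


print("Redis reachable:", redis_ping())
```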