diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md
index d003b6696..2b8b6333c 100644
--- a/_template_python/INDICATOR_DEV_GUIDE.md
+++ b/_template_python/INDICATOR_DEV_GUIDE.md
@@ -278,7 +278,7 @@ This example is taken from [`hhs_hosp`](https://github.com/cmu-delphi/covidcast-
 
 The column is described [here](https://cmu-delphi.github.io/delphi-epidata/api/missing_codes.html).
 
-#### Testing
+#### Local testing
 
 As a general rule, it helps to decompose your functions into operations for which you can write unit tests.
 To run the tests, use `make test` in the top-level indicator directory.
@@ -411,29 +411,159 @@ Next, the `acquisition.covidcast` component of the `delphi-epidata` codebase doe
     12. `value_updated_timestamp`: now
 2. Update the `epimetric_latest` table with any new keys or new versions of existing keys.
 
+Consider what settings to use in the `params.json.template` file based on how you want to run the indicator and acquisition.
+Pay attention to the receiving/export directory, as well as how you can store credentials in vault.
+Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info.
+
+### CI/CD
+
+* Add the module name to the `build` job in `.github/workflows/python-ci.yml`.
+  This lets GitHub Actions run unit tests and linting on the indicator code.
+* Add the top-level directory name to `indicator_list` in `Jenkinsfile`.
+  This lets your code be deployed automatically to staging after your branch is merged to `main`, and to prod after `covidcast-indicators` is released.
+* Create `ansible/templates/{top_level_directory_name}-params-prod.json.j2` based on your `params.json.template`, with a few adjustments:
+  * `"export_dir": "/common/covidcast/receiving/{data-source-name}"`
+  * `"log_filename": "/var/log/indicators/{top_level_directory_name}.log"`
+
 ### Staging
 
-After developing the pipeline code, but before deploying in development, the pipeline should be run on staging for at least a week.
-This involves setting up some cronicle jobs as follows:
+After developing the pipeline code, but before deploying to production, the pipeline should be tested on staging.
+Set up the indicator to run automatically every day for at least a week.
 
-first the indicator run
+The indicator code is automatically deployed to staging after your branch is merged into `main`.
+After merging, make sure you have access to Cronicle and to the staging server `app-mono-dev-01.delphi.cmu.edu`, _and_ that you can see your code on staging at `/home/indicators/runtime/`.
 
-Then the acquisition run
+Then, on Cronicle, create two jobs: one to run the indicator and one to load the output CSV files into the database.
 
-See [@korlaxxalrok](https://www.github.com/korlaxxalrok) or [@minhkhul](https://www.github.com/minhkhul) for more information.
+We start by setting up the acquisition job.
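+
+The two jobs communicate through Cronicle's chain data: the indicator job prints a small JSON object, which Cronicle passes to the acquisition job on its stdin.
+As a rough illustration (a sketch assembled from the example scripts below, not captured output), that object looks something like this:
+
+```
+{
+  "chain_data": {
+    "ind_name": "nchs_mortality",
+    "acq_ind_name": "nchs-mortality",
+    "user": "automation",
+    "host": "app-mono-dev-01.delphi.cmu.edu"
+  }
+}
+```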
+
-https://cronicle-prod-01.delphi.cmu.edu/#Schedule?sub=edit_event&id=elr5clgy6rs
+#### Acquisition job
 
-https://cronicle-prod-01.delphi.cmu.edu/#Schedule?sub=edit_event&id=elr5ctl7art
+The indicator job records the location of the relevant CSV output files in the chain data, and this acquisition job then loads those files into our database.
 
-Note the staging hostname and how the acquisition job is chained to run right after the indicator job.
-Do a few test runs.
+Example script:
 
-If everything goes well (check staging db if data is ingested properly), make a prod version of the indicator run job and use that to run indicator on a daily basis.
+```
+#!/usr/bin/python3
 
-Another thing to do is setting up the params.json template file in accordance with how you want to run the indicator and acquisition.
-Pay attention to the receiving directory, as well as how you can store credentials in vault.
-Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info.
+# Cronicle passes the job input, including the chain data from the indicator job, as JSON on stdin.
+import subprocess
+import json
+
+str_data = input()
+print(str_data)
+
+data = json.loads(str_data, strict=False)
+chain_data = data["chain_data"]
+user = chain_data["user"]
+host = chain_data["host"]
+acq_ind_name = chain_data["acq_ind_name"]
+
+# Run the csv_to_database acquisition step on the staging host over ssh.
+cmd = f'''ssh -T -l {user} {host} "cd ~/driver && python3 -m delphi.epidata.acquisition.covidcast.csv_to_database --data_dir=/common/covidcast --indicator_name={acq_ind_name} --log_file=/var/log/epidata/csv_upload_{acq_ind_name}.log"'''
+
+# Note: communicate() returns (stdout, stderr), in that order.
+std_out, std_err = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
+
+print(std_out.decode('UTF-8'))
+print(std_err.decode('UTF-8'))
+```
+
+#### Indicator run job
+
+This job signs into our staging server via ssh and runs the indicator, producing CSV files as output.
+
+Example script:
+
+```
+#!/bin/sh
+
+# vars
+user='automation'
+host='app-mono-dev-01.delphi.cmu.edu'
+ind_name='nchs_mortality'
+acq_ind_name='nchs-mortality'
+
+# chain_data to be sent to the acquisition job
+chain_data=$(jo chain_data=$(jo acq_ind_name=${acq_ind_name} ind_name=${ind_name} user=${user} host=${host}));
+echo "${chain_data}";
+
+ssh -T -l ${user} ${host} "sudo -u indicators -s bash -c 'cd /home/indicators/runtime/${ind_name} && env/bin/python -m delphi_${ind_name}'";
+```
+
+Note the staging hostname in `host` and how the acquisition job is chained to run right after the indicator job.
+
+Note that the `ind_name` variable here refers to the top-level directory name where the code is located, while `acq_ind_name` refers to the directory name where the output CSV files are placed, which corresponds to the `source` column in our database, as mentioned in step 3.
+
+To automatically run the acquisition job right after the indicator job finishes successfully:
+
+1. In the `Plugin` section, select `Interpret JSON in Output`.
+2. In the `Chain Reaction` section, select your acquisition job under `Run Event on Success`.
+
+You can read more about how the `chain_data` JSON object in the script above is used by the subsequent acquisition job [here](https://github.com/jhuckaby/Cronicle/blob/master/docs/Plugins.md#chain-reaction-control).
+
+#### Staging database checks
+
+Apart from checking the logs of the staging indicator and acquisition jobs to identify potential issues with the pipeline, you can also check the contents of the staging database for abnormalities.
+
+At this point, the acquisition job should have loaded data into the staging MySQL instance, specifically the `covid` database.
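+
+Before querying the database, a quick sanity check is to look at the acquisition log on the staging host.
+This is only a sketch: the log path comes from the acquisition script above, using `nchs-mortality` as the example `acq_ind_name`, and it assumes your account can read that file:
+
+```
+tail -n 50 /var/log/epidata/csv_upload_nchs-mortality.log
+```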
+
+From the staging server:
+
+```
+[user@app-mono-dev-01 ~]$ mysql -u user -p
+Enter password:
+Welcome to the MySQL monitor.  Commands end with ; or \g.
+Your MySQL connection id is 00000
+Server version: 8.0.36-28 Percona Server (GPL), Release 28, Revision 47601f19
+
+Copyright (c) 2009-2024 Percona LLC and/or its affiliates
+Copyright (c) 2000, 2024, Oracle and/or its affiliates.
+
+Oracle is a registered trademark of Oracle Corporation and/or its
+affiliates. Other names may be trademarks of their respective
+owners.
+
+Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
+
+mysql> use covid;
+Database changed
+```
+
+Check the `signal_dim` table to see if the new source and signal names are all present and look reasonable. For example:
+
+```
+mysql> select * from signal_dim where source='nssp';
++---------------+--------+----------------------------------+
+| signal_key_id | source | signal                           |
++---------------+--------+----------------------------------+
+|           817 | nssp   | pct_ed_visits_combined           |
+|           818 | nssp   | pct_ed_visits_covid              |
+|           819 | nssp   | pct_ed_visits_influenza          |
+|           820 | nssp   | pct_ed_visits_rsv                |
+|           821 | nssp   | smoothed_pct_ed_visits_combined  |
+|           822 | nssp   | smoothed_pct_ed_visits_covid     |
+|           823 | nssp   | smoothed_pct_ed_visits_influenza |
+|           824 | nssp   | smoothed_pct_ed_visits_rsv      |
++---------------+--------+----------------------------------+
+```
+
+Then, check whether the number of records ingested into the database matches the number of rows in the CSV output from a local run.
+For example, the query below filters on the `issue` date being the day the acquisition job was run and on `signal_key_id` values corresponding to signals from our new source.
+Check whether this count matches the local run result.
+
+```
+mysql> SELECT count(*) FROM epimetric_full WHERE issue=202425 AND signal_key_id > 816 AND signal_key_id < 825;
++----------+
+| count(*) |
++----------+
+|  2620872 |
++----------+
+1 row in set (0.80 sec)
+```
+
+You can also look at the data in more detail at each geo level or across the different signal names, depending on the quirks of the source.
+
+See [@korlaxxalrok](https://www.github.com/korlaxxalrok) or [@minhkhul](https://www.github.com/minhkhul) for more information.
+
+If everything goes well, make a prod version of the indicator run job and use it to run the indicator on a daily basis.
 
 ### Signal Documentation