From 830388a0d2a4e1afceb77a8ad576099e7a119b5f Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Wed, 3 Jul 2024 14:49:02 -0400
Subject: [PATCH 1/7] deployment + staging details

---
 _template_python/INDICATOR_DEV_GUIDE.md | 121 +++++++++++++++++++++---
 1 file changed, 110 insertions(+), 11 deletions(-)

diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md
index a1c42b49f..a6bb5f84a 100644
--- a/_template_python/INDICATOR_DEV_GUIDE.md
+++ b/_template_python/INDICATOR_DEV_GUIDE.md
@@ -238,7 +238,7 @@ This example is taken from [`hhs_hosp`](https://github.com/cmu-delphi/covidcast-
 
 The column is described [here](https://cmu-delphi.github.io/delphi-epidata/api/missing_codes.html).
 
-#### Testing
+#### Local testing
 
 As a general rule, it helps to decompose your functions into operations for which you can write unit tests. To run the tests, use `make test` in the top-level indicator directory.
@@ -355,25 +355,124 @@ Next, the `acquisition.covidcast` component of the `delphi-epidata` codebase doe
 12. `value_updated_timestamp`: now
 2. Update the `epimetric_latest` table with any new keys or new versions of existing keys.
 
-### Staging
+### CI/CD:
+* Add module name to the `build` job in `.github/workflows/python-ci.yml`. This allows github actions to run on this indicator code, which includes unit tests and linting.
 
-After developing the pipeline code, but before deploying in development, the pipeline should be run on staging for at least a week. This involves setting up some cronicle jobs as follows:
+* Add top-level directory name to `indicator_list` in `Jenkinsfile`. This allows your code to be automatically deployed to staging after your branch is merged to main, and deployed to prod after `cocivcast-indicators` is released.
 
-first the indicator run
+* Create `ansible/templates/{top_level_directory_name}-params-prod.json.j2` based on your `params.json.template` with some adjustment:
+  * "export_dir": "/common/covidcast/receiving/{data-source-name}"
+  * "log_filename": "/var/log/indicators/{top_level_directory_name}.log"
 
-Then the acquisition run
+Pay attention to the receiving/export directory, as well as how you can store credentials in vault. Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info.
 
-See [@korlaxxalrok](https://www.github.com/korlaxxalrok) or [@minhkhul](https://www.github.com/minhkhul) for more information.
+### Staging:
+
+After developing the pipeline code, but before release to prod, the pipeline should be run on staging for at least a week.
+
+The indicator run code is automatically deployed on staging after your branch is merged into main. After merging, make sure you 1. have proper access to Cronicle and staging server `app-mono-dev-01.delphi.cmu.edu` and 2. can see your code on staging at `/home/indicators/runtime/`.
+
+Then, on Cronicle, create two jobs: First one to run the indicator and second one to load the output csv files into database.
+
+#### Indicator run job:
+This job ssh into our staging server and run the indicator, producing csv files output.
+ +Example script: +``` +#!/bin/sh + +# vars +user='automation' +host='app-mono-dev-01.delphi.cmu.edu' +ind_name='nchs_mortality' +acq_ind_name='nchs-mortality' + +# chain_data to be sent to acquisition job +chain_data=$(jo chain_data=$(jo acq_ind_name=${acq_ind_name} ind_name=${ind_name} user=${user} host=${host})); +echo "${chain_data}"; + +ssh -T -l ${user} ${host} "sudo -u indicators -s bash -c 'cd /home/indicators/runtime/${ind_name} && env/bin/python -m delphi_${ind_name}'"; +``` + +Note the staging hostname in `host`. + +Note that `ind_name` variable here refer to the top-level directory name where code is located, while `acq_ind_name` refer to the directory name where output csv files are located, which corresponds to the name of `source` column in our database, as mentioned in step 3. + +To automatically run acquisition job right after indicator job finishes successfully: +1. In `Plugin` section, select `Interpret JSON in Output`. +2. In `Chain Reaction` section, select your acquisition run job below to `Run Event on Success` +You can read more about how the `chain_data` json object in the script above can be used in our subsequent acquisition job [here](https://github.com/jhuckaby/Cronicle/blob/master/docs/Plugins.md#chain-reaction-control). + +#### Acquisition job: + +The acquisition job use chained data from indicator job to determine where csv output files are, then load them into our database. + +Example script: +``` +#!/usr/bin/python3 + +import subprocess +import json + +str_data = input() +print(str_data) + +data = json.loads(str_data, strict=False) +chain_data = data["chain_data"] +user = chain_data["user"] +host = chain_data["host"] +acq_ind_name = chain_data["acq_ind_name"] + +cmd = f'''ssh -T -l {user} {host} "cd ~/driver && python3 -m delphi.epidata.acquisition.covidcast.csv_to_database --data_dir=/common/covidcast --indicator_name={acq_ind_name} --log_file=/var/log/epidata/csv_upload_{acq_ind_name}.log"''' -https://cronicle-prod-01.delphi.cmu.edu/#Schedule?sub=edit_event&id=elr5clgy6rs +std_err, std_out = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() -https://cronicle-prod-01.delphi.cmu.edu/#Schedule?sub=edit_event&id=elr5ctl7art +print(std_err.decode('UTF-8')) +print(std_out.decode('UTF-8')) +``` + +#### Staging database checks: +Apart from checking the logs of staging indicator run and acquisition jobs to identify potential issues with the pipeline, one can also check the contents of staging database for abnormalities. + +At this point, acquisition job should have loaded data onto staging mysql db, specifically the `covid` database. +``` +mysql> use covid; +Database changed +``` +Check `signal_dim` table to see if new source and signal names are all present and reasonable. 
For example:
+```
+mysql> select * from signal_dim where source='nssp';
++---------------+--------+----------------------------------+
+| signal_key_id | source | signal                           |
++---------------+--------+----------------------------------+
+|           817 | nssp   | pct_ed_visits_combined           |
+|           818 | nssp   | pct_ed_visits_covid              |
+|           819 | nssp   | pct_ed_visits_influenza          |
+|           820 | nssp   | pct_ed_visits_rsv                |
+|           821 | nssp   | smoothed_pct_ed_visits_combined  |
+|           822 | nssp   | smoothed_pct_ed_visits_covid     |
+|           823 | nssp   | smoothed_pct_ed_visits_influenza |
+|           824 | nssp   | smoothed_pct_ed_visits_rsv       |
++---------------+--------+----------------------------------+
+```
+
+Then, check if number of records ingested in db matches with number of rows in csv when running locally. For example, the below query sets the `issue` date being the day acquisition job was run, and `signal_key_id` correspond with signals from our new source. Check if this count matches with local run result.
+```
+mysql> SELECT count(*) FROM epimetric_full WHERE issue=202425 AND signal_key_id > 816 AND signal_key_id < 825;
++----------+
+| count(*) |
++----------+
+|  2620872 |
++----------+
+1 row in set (0.80 sec)
+```
+You can also check how data looks more specifically at each geo level or among different signal names depending on the quirks of the source.
+
+See [@korlaxxalrok](https://www.github.com/korlaxxalrok) or [@minhkhul](https://www.github.com/minhkhul) for more information.
 
-Note the staging hostname and how the acquisition job is chained to run right after the indicator job. Do a few test runs.
-If everything goes well (check staging db if data is ingested properly), make a prod version of the indicator run job and use that to run indicator on a daily basis.
+If everything goes well make a prod version of the indicator run job and use that to run indicator on a daily basis.
-Another thing to do is setting up the params.json template file in accordance with how you want to run the indicator and acquisition. Pay attention to the receiving directory, as well as how you can store credentials in vault. Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info.
 
 ### Signal Documentation
 

From 57cb4ec7c18e72a3fca9b650ab70f1ccf9904f5a Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Mon, 8 Jul 2024 16:00:53 -0400
Subject: [PATCH 2/7] wrap lines

---
 _template_python/INDICATOR_DEV_GUIDE.md | 40 ++++++++++++++++---------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md
index a6bb5f84a..6ab3b5c29 100644
--- a/_template_python/INDICATOR_DEV_GUIDE.md
+++ b/_template_python/INDICATOR_DEV_GUIDE.md
@@ -355,29 +355,33 @@ Next, the `acquisition.covidcast` component of the `delphi-epidata` codebase doe
 12. `value_updated_timestamp`: now
 2. Update the `epimetric_latest` table with any new keys or new versions of existing keys.
 
-### CI/CD:
-* Add module name to the `build` job in `.github/workflows/python-ci.yml`. This allows github actions to run on this indicator code, which includes unit tests and linting.
-
-* Add top-level directory name to `indicator_list` in `Jenkinsfile`. This allows your code to be automatically deployed to staging after your branch is merged to main, and deployed to prod after `cocivcast-indicators` is released.
+### CI/CD
+* Add module name to the `build` job in `.github/workflows/python-ci.yml`.
+  This allows github actions to run on this indicator code, which includes unit tests and linting.
+* Add top-level directory name to `indicator_list` in `Jenkinsfile`.
+  This allows your code to be automatically deployed to staging after your branch is merged to main, and deployed to prod after `cocivcast-indicators` is released.
 * Create `ansible/templates/{top_level_directory_name}-params-prod.json.j2` based on your `params.json.template` with some adjustment:
   * "export_dir": "/common/covidcast/receiving/{data-source-name}"
   * "log_filename": "/var/log/indicators/{top_level_directory_name}.log"
 
-Pay attention to the receiving/export directory, as well as how you can store credentials in vault. Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info.
+Pay attention to the receiving/export directory, as well as how you can store credentials in vault.
+Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info.
 
-### Staging:
+### Staging
 
-After developing the pipeline code, but before release to prod, the pipeline should be run on staging for at least a week. 
+After developing the pipeline code, but before release to prod, the pipeline should be run on staging for at least a week.
 
-The indicator run code is automatically deployed on staging after your branch is merged into main. After merging, make sure you 1. have proper access to Cronicle and staging server `app-mono-dev-01.delphi.cmu.edu` and 2. can see your code on staging at `/home/indicators/runtime/`.
+The indicator run code is automatically deployed on staging after your branch is merged into main. After merging, make sure you have proper access to Cronicle and staging server `app-mono-dev-01.delphi.cmu.edu` _and_ can see your code on staging at `/home/indicators/runtime/`.
 
 Then, on Cronicle, create two jobs: First one to run the indicator and second one to load the output csv files into database.
 
-#### Indicator run job:
+#### Indicator run job
+
 This job ssh into our staging server and run the indicator, producing csv files output.
 
 Example script:
+
 ```
 #!/bin/sh
 
@@ -396,18 +400,21 @@ ssh -T -l ${user} ${host} "sudo -u indicators -s bash -c 'cd /home/indicators/ru
 
 Note the staging hostname in `host`.
 
-Note that `ind_name` variable here refer to the top-level directory name where code is located, while `acq_ind_name` refer to the directory name where output csv files are located, which corresponds to the name of `source` column in our database, as mentioned in step 3. 
+Note that the `ind_name` variable here refers to the top-level directory name where the code is located, while `acq_ind_name` refers to the directory name where the output csv files are located, which corresponds to the `source` column in our database, as mentioned in step 3.
 
 To automatically run acquisition job right after indicator job finishes successfully:
+
 1. In `Plugin` section, select `Interpret JSON in Output`.
 2. In `Chain Reaction` section, select your acquisition run job below to `Run Event on Success`
+
 You can read more about how the `chain_data` json object in the script above can be used in our subsequent acquisition job [here](https://github.com/jhuckaby/Cronicle/blob/master/docs/Plugins.md#chain-reaction-control).
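+
+For illustration, the chained JSON that `echo "${chain_data}"` prints above (and that Cronicle forwards to the chained acquisition job) should look roughly like this with the example `nchs_mortality` values:
+
+```
+{"chain_data":{"acq_ind_name":"nchs-mortality","ind_name":"nchs_mortality","user":"automation","host":"app-mono-dev-01.delphi.cmu.edu"}}
+```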
-#### Acquisition job: +#### Acquisition job The acquisition job use chained data from indicator job to determine where csv output files are, then load them into our database. Example script: + ``` #!/usr/bin/python3 @@ -431,10 +438,12 @@ print(std_err.decode('UTF-8')) print(std_out.decode('UTF-8')) ``` -#### Staging database checks: +#### Staging database checks + Apart from checking the logs of staging indicator run and acquisition jobs to identify potential issues with the pipeline, one can also check the contents of staging database for abnormalities. At this point, acquisition job should have loaded data onto staging mysql db, specifically the `covid` database. + ``` mysql> use covid; Database changed @@ -456,7 +465,10 @@ mysql> select * from signal_dim where source='nssp'; +---------------+--------+----------------------------------+ ``` -Then, check if number of records ingested in db matches with number of rows in csv when running locally. For example, the below query sets the `issue` date being the day acquisition job was run, and `signal_key_id` correspond with signals from our new source. Check if this count matches with local run result. +Then, check if number of records ingested in db matches with number of rows in csv when running locally. +For example, the below query sets the `issue` date being the day acquisition job was run, and `signal_key_id` correspond with signals from our new source. +Check if this count matches with local run result. + ``` mysql> SELECT count(*) FROM epimetric_full WHERE issue=202425 AND signal_key_id > 816 AND signal_key_id < 825; +----------+ @@ -466,11 +478,11 @@ mysql> SELECT count(*) FROM epimetric_full WHERE issue=202425 AND signal_key_id +----------+ 1 row in set (0.80 sec) ``` + You can also check how data looks more specifically at each geo level or among different signal names depending on the quirks of the source. See [@korlaxxalrok](https://www.github.com/korlaxxalrok) or [@minhkhul](https://www.github.com/minhkhul) for more information. - If everything goes well make a prod version of the indicator run job and use that to run indicator on a daily basis. From 6a289fdb7dde472b54aff3a5205a159b6dd38d4e Mon Sep 17 00:00:00 2001 From: minhkhul <118945681+minhkhul@users.noreply.github.com> Date: Tue, 9 Jul 2024 19:10:23 -0400 Subject: [PATCH 3/7] Update _template_python/INDICATOR_DEV_GUIDE.md Co-authored-by: David Weber --- _template_python/INDICATOR_DEV_GUIDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md index 6ab3b5c29..66b63fde7 100644 --- a/_template_python/INDICATOR_DEV_GUIDE.md +++ b/_template_python/INDICATOR_DEV_GUIDE.md @@ -411,7 +411,7 @@ You can read more about how the `chain_data` json object in the script above can #### Acquisition job -The acquisition job use chained data from indicator job to determine where csv output files are, then load them into our database. +The indicator job loads the location of the relevant csv output files into chained data, which the acquisition job then loads into our database. 
 
 Example script:
 

From d2fafd11c4d913a4e6e3a63f14cf27c76c3d1738 Mon Sep 17 00:00:00 2001
From: minhkhul <118945681+minhkhul@users.noreply.github.com>
Date: Tue, 9 Jul 2024 19:10:48 -0400
Subject: [PATCH 4/7] Update _template_python/INDICATOR_DEV_GUIDE.md

Co-authored-by: David Weber
---
 _template_python/INDICATOR_DEV_GUIDE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md
index 66b63fde7..7808d7693 100644
--- a/_template_python/INDICATOR_DEV_GUIDE.md
+++ b/_template_python/INDICATOR_DEV_GUIDE.md
@@ -378,7 +378,7 @@ Then, on Cronicle, create two jobs: First one to run the indicator and second on
 
 #### Indicator run job
 
-This job ssh into our staging server and run the indicator, producing csv files output.
+This job signs into our staging server via ssh, runs the indicator, producing csv files as output.
 
 Example script:
 

From 67152f7b71f7be7667bcb8e6cd1e76d4aadab9d6 Mon Sep 17 00:00:00 2001
From: minhkhul <118945681+minhkhul@users.noreply.github.com>
Date: Tue, 9 Jul 2024 19:10:53 -0400
Subject: [PATCH 5/7] Update _template_python/INDICATOR_DEV_GUIDE.md

Co-authored-by: David Weber
---
 _template_python/INDICATOR_DEV_GUIDE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md
index 7808d7693..18788de0c 100644
--- a/_template_python/INDICATOR_DEV_GUIDE.md
+++ b/_template_python/INDICATOR_DEV_GUIDE.md
@@ -360,7 +360,7 @@ Next, the `acquisition.covidcast` component of the `delphi-epidata` codebase doe
 * Add module name to the `build` job in `.github/workflows/python-ci.yml`.
   This allows github actions to run on this indicator code, which includes unit tests and linting.
 * Add top-level directory name to `indicator_list` in `Jenkinsfile`.
-  This allows your code to be automatically deployed to staging after your branch is merged to main, and deployed to prod after `cocivcast-indicators` is released.
+  This allows your code to be automatically deployed to staging after your branch is merged to main, and deployed to prod after `covidcast-indicators` is released.
 * Create `ansible/templates/{top_level_directory_name}-params-prod.json.j2` based on your `params.json.template` with some adjustment:
   * "export_dir": "/common/covidcast/receiving/{data-source-name}"
   * "log_filename": "/var/log/indicators/{top_level_directory_name}.log"
 

From 519582acb483ab26b9a516d0c1cabaa6aa7bb436 Mon Sep 17 00:00:00 2001
From: minhkhul <118945681+minhkhul@users.noreply.github.com>
Date: Tue, 9 Jul 2024 19:29:52 -0400
Subject: [PATCH 6/7] Update INDICATOR_DEV_GUIDE.md with mysql syntax

---
 _template_python/INDICATOR_DEV_GUIDE.md | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md
index 18788de0c..7b3992d58 100644
--- a/_template_python/INDICATOR_DEV_GUIDE.md
+++ b/_template_python/INDICATOR_DEV_GUIDE.md
@@ -444,7 +444,23 @@ Apart from checking the logs of staging indicator run and acquisition jobs to id
 
 At this point, acquisition job should have loaded data onto staging mysql db, specifically the `covid` database.
 
+From staging:
 ```
+[user@app-mono-dev-01 ~]$ mysql -u user -p
+Enter password:
+Welcome to the MySQL monitor. Commands end with ; or \g.
+Your MySQL connection id is 00000 +Server version: 8.0.36-28 Percona Server (GPL), Release 28, Revision 47601f19 + +Copyright (c) 2009-2024 Percona LLC and/or its affiliates +Copyright (c) 2000, 2024, Oracle and/or its affiliates. + +Oracle is a registered trademark of Oracle Corporation and/or its +affiliates. Other names may be trademarks of their respective +owners. + +Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. + mysql> use covid; Database changed ``` From 9972728a223fd6eec1f72bff2aac5a43ae261e6f Mon Sep 17 00:00:00 2001 From: minhkhul <118945681+minhkhul@users.noreply.github.com> Date: Tue, 9 Jul 2024 19:34:13 -0400 Subject: [PATCH 7/7] Update INDICATOR_DEV_GUIDE.md reorder instruction on cron jobs --- _template_python/INDICATOR_DEV_GUIDE.md | 62 +++++++++++++------------ 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md index 7b3992d58..37aca8ff7 100644 --- a/_template_python/INDICATOR_DEV_GUIDE.md +++ b/_template_python/INDICATOR_DEV_GUIDE.md @@ -374,7 +374,38 @@ After developing the pipeline code, but before release to prod, the pipeline sho The indicator run code is automatically deployed on staging after your branch is merged into main. After merging, make sure you have proper access to Cronicle and staging server `app-mono-dev-01.delphi.cmu.edu` _and_ can see your code on staging at `/home/indicators/runtime/`. -Then, on Cronicle, create two jobs: First one to run the indicator and second one to load the output csv files into database. +Then, on Cronicle, create two jobs: First one to run the indicator and second one to load the output csv files into database. + +We start by setting up the acquisition job. + +#### Acquisition job + +The indicator job loads the location of the relevant csv output files into chained data, which this acquisition job then loads into our database. + +Example script: + +``` +#!/usr/bin/python3 + +import subprocess +import json + +str_data = input() +print(str_data) + +data = json.loads(str_data, strict=False) +chain_data = data["chain_data"] +user = chain_data["user"] +host = chain_data["host"] +acq_ind_name = chain_data["acq_ind_name"] + +cmd = f'''ssh -T -l {user} {host} "cd ~/driver && python3 -m delphi.epidata.acquisition.covidcast.csv_to_database --data_dir=/common/covidcast --indicator_name={acq_ind_name} --log_file=/var/log/epidata/csv_upload_{acq_ind_name}.log"''' + +std_err, std_out = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() + +print(std_err.decode('UTF-8')) +print(std_out.decode('UTF-8')) +``` #### Indicator run job @@ -409,35 +440,6 @@ To automatically run acquisition job right after indicator job finishes successf You can read more about how the `chain_data` json object in the script above can be used in our subsequent acquisition job [here](https://github.com/jhuckaby/Cronicle/blob/master/docs/Plugins.md#chain-reaction-control). -#### Acquisition job - -The indicator job loads the location of the relevant csv output files into chained data, which the acquisition job then loads into our database. 
- -Example script: - -``` -#!/usr/bin/python3 - -import subprocess -import json - -str_data = input() -print(str_data) - -data = json.loads(str_data, strict=False) -chain_data = data["chain_data"] -user = chain_data["user"] -host = chain_data["host"] -acq_ind_name = chain_data["acq_ind_name"] - -cmd = f'''ssh -T -l {user} {host} "cd ~/driver && python3 -m delphi.epidata.acquisition.covidcast.csv_to_database --data_dir=/common/covidcast --indicator_name={acq_ind_name} --log_file=/var/log/epidata/csv_upload_{acq_ind_name}.log"''' - -std_err, std_out = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() - -print(std_err.decode('UTF-8')) -print(std_out.decode('UTF-8')) -``` - #### Staging database checks Apart from checking the logs of staging indicator run and acquisition jobs to identify potential issues with the pipeline, one can also check the contents of staging database for abnormalities.