# Kaiyang expansion project 2022

## Description
@leahecole @bradmiro This is the draft PR for Kaiyang Yu's expansion project. The DAG script is an expansion of `data_analytics_dag.py`, and the PySpark code is an expansion of `data_analytics_process.py`. The workflow is designed to answer two questions: "How have rainfall and snowfall patterns in the western US changed over the past 25 years?" and "How have rainfall and snowfall patterns in Phoenix changed over the past 25 years?". The instructions are written specifically for @leahecole and @bradmiro, as others may not have access to the resources.
## How to run
The expansion project can be run in the following way:

- Download the `ghcnd-stations-new.txt` file (if the link doesn't work, you can find it in the `workshop_example_bucket` GCS bucket) and upload it to your desired GCS bucket. This is the dataset after pre-processing.
- Upload `data_analytics_process_expansion.py` to the same GCS bucket as in the last step.
- Create a Cloud Composer environment with the latest version of Composer and Airflow. Add the following variables using the Airflow UI (see the sketch after this list for how the DAG consumes them):
  - `dataproc_service_account`: `#######[email protected]`
  - `gce_region`: `us-central1`
  - `gcp_project`: `<your_gcp_project>`
  - `gcs_bucket`: the bucket you created in the first step
- Upload `data_analytics_dag_expansion.py` to the Composer environment you just created and trigger the DAG.
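For context, here is a minimal sketch of how a DAG like `data_analytics_dag_expansion.py` typically consumes these Airflow variables. The exact wiring in the real DAG may differ, so treat this as illustrative only:

```python
# Illustrative sketch only -- the real DAG may wire these up differently.
from airflow import models

# Jinja-templated references are resolved by Airflow at runtime, so the
# variables set in the UI can be passed straight into operator arguments.
PROJECT_ID = "{{ var.value.gcp_project }}"
REGION = "{{ var.value.gce_region }}"
BUCKET_NAME = "{{ var.value.gcs_bucket }}"
DATAPROC_SERVICE_ACCOUNT = "{{ var.value.dataproc_service_account }}"

# A variable can also be fetched eagerly at DAG-parse time:
GCS_BUCKET = models.Variable.get("gcs_bucket")
```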
Alternatively, you can run it directly in the same environment that I'm using:

- Navigate to the Cloud Composer console and select the environment called `expansion_project`.
- Select `data_analytics_dag`.
- Trigger the DAG.
If you just want to run the PySpark code:

- In `data_analytics_process_expansion.py`, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
- Upload the new `data_analytics_process_expansion.py` to a GCS bucket.
- Using Cloud Shell, run `gcloud dataproc jobs submit pyspark gs://path_to_your_file_from_last_step --cluster=your_cluster --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar` (a programmatic alternative is sketched after the next list).
Alternatively, you can run the PySpark code using my cluster:

- In `data_analytics_process_expansion.py`, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
- Re-upload `data_analytics_process_expansion.py` to the `workshop_example_bucket` GCS bucket, overwriting the original file.
- In Cloud Shell, run `gcloud dataproc jobs submit pyspark gs://workshop_example_bucket/data_analytics_process_expansion.py --cluster=cluster-d630 --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar`.
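If you prefer Python over the `gcloud` CLI, the same job can be submitted with the Dataproc client library. This is a minimal sketch; the project, cluster, and file URI below are placeholders, not values from this PR:

```python
# Minimal sketch: submit the PySpark job with the Dataproc client library.
# Placeholders: replace project, cluster, and file URI with your own values.
from google.cloud import dataproc_v1

client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
)
job = {
    "placement": {"cluster_name": "your_cluster"},
    "pyspark_job": {
        "main_python_file_uri": "gs://your_bucket/data_analytics_process_expansion.py",
        "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
    },
}
operation = client.submit_job_as_operation(
    request={"project_id": "your_gcp_project", "region": "us-central1", "job": job}
)
operation.result()  # blocks until the job finishes
```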
## View results
You can view the results in BigQuery, under the dataset `expansion_project`:

- `ghcnd_stations_joined`: merged dataset after the BigQuery query job.
- `ghcnd_stations_normalization`: dataset after row filtering and unit normalization.
- `ghcnd_stations_prcp_mean`: arithmetic mean of annual precipitation in the western US over the past 25 years.
- `ghcnd_stations_snow_mean`: arithmetic mean of annual snowfall in the western US over the past 25 years.
- `phx_annual_prcp`: annual precipitation in Phoenix over the past 25 years (result of the distance-weighting algorithm).
- `phx_annual_snow`: annual snowfall in Phoenix over the past 25 years (result of the distance-weighting algorithm).
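To inspect any of these tables from Python, a query like the one below works. The dataset reference assumes your default project, and `SELECT *` is used to avoid guessing the schema the PySpark job wrote:

```python
# Minimal sketch: peek at one of the result tables.
from google.cloud import bigquery

client = bigquery.Client()  # uses your default project and credentials
rows = client.query(
    "SELECT * FROM `expansion_project.phx_annual_prcp` LIMIT 10"
).result()
for row in rows:
    print(dict(row))
```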
| :warning: CAUTION: Before running the DAG, be sure to remove the `ghcnd_stations_joined` table, since the DAG code uses a `WRITE_APPEND` write disposition and the table will double in size every time the DAG runs. You don't have to worry about this if you are only running the PySpark program; however, make sure the `ghcnd_stations_joined` table exists in BigQuery if you're only running the PySpark code. |
|---|
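One way to do that cleanup from Python is shown below; `bq rm` or the BigQuery console work just as well. The fully qualified table ID is an assumption based on the dataset name above:

```python
# Minimal sketch: drop the table so WRITE_APPEND starts from a clean slate.
from google.cloud import bigquery

client = bigquery.Client()
client.delete_table(
    "your_gcp_project.expansion_project.ghcnd_stations_joined",
    not_found_ok=True,  # don't raise if the table is already gone
)
```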
## Next steps
- Add tests for `data_analytics_process_expansion.py` (a possible starting point is sketched below).
- Delete the `print()` calls, which are there only for debugging purposes.
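As a starting point for that test, here is a hedged sketch that only checks the output tables exist and are non-empty after a run. The table names come from the results list above, the dataset reference assumes the default project, and a real test would likely create and tear down its own resources:

```python
# Hypothetical starting point for the missing test, not part of this PR:
# after the job runs, each expected output table should exist and have rows.
import pytest
from google.cloud import bigquery

EXPECTED_TABLES = [
    "ghcnd_stations_joined",
    "ghcnd_stations_normalization",
    "ghcnd_stations_prcp_mean",
    "ghcnd_stations_snow_mean",
    "phx_annual_prcp",
    "phx_annual_snow",
]

@pytest.mark.parametrize("table_name", EXPECTED_TABLES)
def test_output_table_exists(table_name):
    client = bigquery.Client()  # assumes default project credentials
    # get_table raises NotFound if the table is missing.
    table = client.get_table(f"expansion_project.{table_name}")
    assert table.num_rows > 0
```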
## Checklist
- [x] I have followed Sample Guidelines from AUTHORING_GUIDE.MD
- [x] README is updated to include all relevant information
- [x] Tests pass: `nox -s py-3.9` (see Test Environment Setup)
- [x] Lint pass: `nox -s lint` (see Test Environment Setup)
- [ ] These samples need a new API enabled in testing projects to pass (let us know which ones)
- [ ] These samples need new/updated env vars in testing projects set to pass (let us know which ones)
- [x] Please merge this PR for me once it is approved.
- [ ] This sample adds a new sample directory, and I updated the CODEOWNERS file with the codeowners for this sample
Ok I haven't even started reviewing, but this PR description is 🔥 and because of that it has me excited to review it. Well done @kaiyang-code