
Kaiyang expansion project 2022

Open kaiyang-code opened this issue 3 years ago • 1 comment

Description

@leahecole @bradmiro This is the draft PR for Kaiyang Yu's expansion project. The DAG script expands data_analytics_dag.py and the PySpark code expands data_analytics_process.py. The entire workflow tries to answer two questions: "How have rainfall and snowfall patterns in the western US changed over the past 25 years?" and "How have rainfall and snowfall patterns in Phoenix changed over the past 25 years?" The instructions are written specifically for @leahecole and @bradmiro, as others may not have access to the resources.

How to run

The expansion project can be run in the following way:

  1. Download the ghcnd-stations-new.txt file (if the link doesn't work, you can find it in the workshop_example_bucket GCS bucket) and upload it to your desired GCS bucket. This is the dataset after pre-processing.
  2. Upload the data_analytics_process_expansion.py to the same GCS bucket as the last step.
  3. Create a Cloud Composer environment with the latest version of Composer and Airflow. Add the following variables using Airflow UI:
    • dataproc_service_account: #######[email protected]
    • gce_region: us-central1
    • gcp_project: <your_gcp_project>
    • gcs_bucket: the bucket you created in step one
  4. Upload the data_analytics_dag_expansion.py to the Composer environment you just created and trigger the DAG
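The DAG depends on the four Airflow variables set in step 3, so it can be worth validating them up front. The sketch below is hypothetical: in the real DAG, `get_var` would be `airflow.models.Variable.get`; here it is any callable that returns `None` for unset names, so the snippet runs without an Airflow installation, and `load_dag_config` is a made-up helper name.

```python
# Hypothetical sketch: check that the four Airflow variables from step 3 are
# set before the DAG builds its Dataproc/BigQuery operators. `get_var` stands
# in for airflow.models.Variable.get so this runs without Airflow installed.
REQUIRED_VARS = (
    "dataproc_service_account",
    "gce_region",
    "gcp_project",
    "gcs_bucket",
)

def load_dag_config(get_var):
    """Return a dict of the required variables, raising if any are unset."""
    config = {name: get_var(name) for name in REQUIRED_VARS}
    missing = sorted(name for name, value in config.items() if value is None)
    if missing:
        raise KeyError(f"unset Airflow variables: {missing}")
    return config
```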

Alternatively, you can also directly run it with the same environment that I'm using:

  1. Navigate to the Cloud Composer console and select the environment called expansion_project
  2. Select data_analytics_dag
  3. Trigger the DAG

If you just want to run the PySpark code:

  1. In data_analytics_process_expansion.py, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
  2. Upload the new data_analytics_process_expansion.py to a GCS bucket.
  3. Using Cloud Shell, run `gcloud dataproc jobs submit pyspark gs://path_to_your_file_from_last_step --cluster=your_cluster --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar`.

Alternatively, you can run the PySpark code using my cluster:

  1. In data_analytics_process_expansion.py, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
  2. Re-upload the data_analytics_process_expansion.py to the workshop_example_bucket GCS bucket, overwriting the original file.
  3. In Cloud Shell, run `gcloud dataproc jobs submit pyspark gs://workshop_example_bucket/data_analytics_process_expansion.py --cluster=cluster-d630 --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar`.

View results

  • You can view the results in BigQuery, under the dataset expansion_project:
    • ghcnd_stations_joined: merged dataset after the BigQuery query job.
    • ghcnd_stations_normalization: dataset after row filtering and unit normalization.
    • ghcnd_stations_prcp_mean: arithmetic mean of annual precipitation in the western US over the past 25 years.
    • ghcnd_stations_snow_mean: arithmetic mean of annual snowfall in the western US over the past 25 years.
    • phx_annual_prcp: annual precipitation in Phoenix over the past 25 years (result of the distance weighting algorithm).
    • phx_annual_snow: annual snowfall in Phoenix over the past 25 years (result of the distance weighting algorithm).
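The two Phoenix tables are produced by a distance weighting step. A minimal pure-Python sketch of inverse distance weighting is shown below; the function name, the Euclidean lat/lon distance, and the power of 2 are all assumptions for illustration, and the actual PySpark implementation in data_analytics_process_expansion.py may differ.

```python
import math

def idw_estimate(target, stations, power=2):
    """Inverse-distance-weighted estimate at `target` from (lat, lon, value) stations.

    Hypothetical sketch of a distance weighting algorithm: nearby stations
    contribute more than distant ones, with weight 1 / distance**power.
    """
    numerator = denominator = 0.0
    for lat, lon, value in stations:
        distance = math.hypot(lat - target[0], lon - target[1])
        if distance == 0:
            return value  # a station sits exactly at the target point
        weight = 1.0 / distance ** power
        numerator += weight * value
        denominator += weight
    return numerator / denominator
```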
:warning: CAUTION: Before running the DAG, be sure to delete the ghcnd_stations_joined table: the DAG code uses a WRITE_APPEND write disposition, so the table doubles in size every time the DAG runs. This does not apply if you are only running the PySpark program; in that case, just make sure the ghcnd_stations_joined table already exists in BQ.
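For context on the ghcnd_stations_normalization table: GHCN-Daily records PRCP in tenths of a millimetre while SNOW is in millimetres, so a normalization pass typically converts both to a common unit. The helpers below are a hypothetical sketch only; the function names and the 1997–2021 "past 25 years" window are assumptions, not taken from the actual job.

```python
import datetime

# Hypothetical sketch of the row-filtering and unit-normalization step.
# Assumptions: GHCN-Daily stores PRCP in tenths of mm and SNOW in mm, and the
# "past 25 years" window is 1997-2021; the real PySpark job may differ.
ELEMENTS = ("PRCP", "SNOW")

def keep_row(element, date, start_year=1997, end_year=2021):
    """Keep only PRCP/SNOW observations inside the 25-year window."""
    return element in ELEMENTS and start_year <= date.year <= end_year

def to_millimetres(element, value):
    """Normalize a GHCN-Daily value to millimetres."""
    if element == "PRCP":
        return value / 10.0  # PRCP is recorded in tenths of mm
    return float(value)      # SNOW is already in mm
```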

Next step

  • Add a test for data_analytics_process_expansion.py.
  • Delete the print() calls; they are only there for debugging purposes.

Checklist

kaiyang-code avatar Aug 01 '22 21:08 kaiyang-code

Ok I haven't even started reviewing but this PR description is 🔥 and because of that it has me excited to review it, well done @kaiyang-code

leahecole avatar Aug 02 '22 14:08 leahecole