{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# NWM Point Data Loading" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each river reach in the National Water Model is represented as a point, therefore output variables such as `streamflow` and `velocity` correspond to a single river reach via the `feature_id`. \n", "\n", "This notebook demonstrates the use of TEEHR to fetch operational NWM streamflow forecasts from Google Cloud Storage (GCS) and load them into the TEEHR data model as parquet files.\n", "\n", "TEEHR makes use of Dask when it can to parallelize data fetching and improve performance.\n", "\n", "The steps are fairly straightforward:\n", "* Import the TEEHR point loading module and other necessary libraries.\n", "* Specify input variables such a NWM version, forecast configuration, start date, etc.\n", "* Call the function to fetch and format the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Import the required packages.\n", "import os\n", "\n", "import teehr.loading.nwm.nwm_points as tlp\n", "\n", "from pathlib import Path\n", "from dask.distributed import Client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Specify input variables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can define the variables that determining what NWM data to fetch, the NWM version, the source, how it should be processed, and the time span. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "CONFIGURATION = \"short_range\" # analysis_assim, short_range, analysis_assim_hawaii, medium_range_mem1\n", "OUTPUT_TYPE = \"channel_rt\"\n", "VARIABLE_NAME = \"streamflow\"\n", "T_MINUS = [0, 1, 2] # Only used if an assimilation run is selected\n", "\n", "NWM_VERSION = \"nwm22\" # Currently accepts \"nwm22\" or \"nwm30\"\n", " # Use \"nwm22\" for dates prior to 09-19-2023\n", "\n", "DATA_SOURCE = \"GCS\" # Specifies the remote location from which to fetch the data\n", " # (\"GCS\", \"NOMADS\", \"DSTOR\")\n", "\n", "KERCHUNK_METHOD = \"auto\" # When data_source = \"GCS\", specifies the preference in creating Kerchunk reference json files.\n", " # \"local\" - always create new json files from netcdf files in GCS and save locally, if they do not already exist\n", " # \"remote\" - read the CIROH pre-generated jsons from s3, ignoring any that are unavailable\n", " # \"auto\" - read the CIROH pre-generated jsons from s3, and create any that are unavailable, storing locally\n", "\n", "PROCESS_BY_Z_HOUR = True # If True, NWM files will be processed by z-hour per day. If False, files will be\n", " # processed in chunks (defined by STEPSIZE). This can help if you want to read many reaches\n", " # at once (all ~2.7 million for medium range for example).\n", "\n", "STEPSIZE = 100 # Only used if PROCESS_BY_Z_HOUR = False. Controls how many files are processed in memory at once\n", " # Higher values can increase performance at the expense on memory (default value: 100)\n", "\n", "IGNORE_MISSING_FILE = True # If True, the missing file(s) will be skipped and the process will resume\n", " # If False, TEEHR will fail if a missing NWM file is encountered\n", "\n", "OVERWRITE_OUTPUT = True # If True, existing output files will be overwritten\n", " # If False (default), existing files are retained\n", "\n", "START_DATE = \"2023-03-18\"\n", "INGEST_DAYS = 1\n", "\n", "OUTPUT_ROOT = Path(Path().home(), \"temp\")\n", "JSON_DIR = Path(OUTPUT_ROOT, \"zarr\", CONFIGURATION)\n", "OUTPUT_DIR = Path(OUTPUT_ROOT, \"timeseries\", CONFIGURATION)\n", "\n", "# For this simple example, we'll get data for 10 NWM reaches that coincide with USGS gauges\n", "LOCATION_IDS = [7086109, 7040481, 7053819, 7111205, 7110249, 14299781, 14251875, 14267476, 7152082, 14828145]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start a local dask cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "TEEHR takes advantage of Dask under the hood, so starting a local Dask cluster will improve performance." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "n_workers = max(os.cpu_count() - 1, 1)\n", "client = Client(n_workers=n_workers)\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fetch the NWM data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we just need to make the call to the function." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%time\n", "tlp.nwm_to_parquet(\n", " configuration=CONFIGURATION,\n", " output_type=OUTPUT_TYPE,\n", " variable_name=VARIABLE_NAME,\n", " start_date=START_DATE,\n", " ingest_days=INGEST_DAYS,\n", " location_ids=LOCATION_IDS,\n", " json_dir=JSON_DIR,\n", " output_parquet_dir=OUTPUT_DIR,\n", " nwm_version=NWM_VERSION,\n", " data_source=DATA_SOURCE,\n", " kerchunk_method=KERCHUNK_METHOD,\n", " t_minus_hours=T_MINUS,\n", " process_by_z_hour=PROCESS_BY_Z_HOUR,\n", " stepsize=STEPSIZE,\n", " ignore_missing_file=IGNORE_MISSING_FILE,\n", " overwrite_output=OVERWRITE_OUTPUT,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The resulting parquet file should look something like this:\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 4 }