Fetch

class Fetch(ev)

Component class for fetching data from external sources.

Methods

nwm_operational_grids

Fetch NWM operational gridded data, calculate zonal statistics (currently only mean is available) of selected variable for given zones, and load into the TEEHR dataset.

nwm_operational_points

Fetch operational NWM point data and load into the TEEHR dataset.

nwm_retrospective_grids

Fetch NWM retrospective gridded data, calculate zonal statistics (currently only mean is available) of selected variable for given zones, and load into the TEEHR dataset.

nwm_retrospective_points

Fetch NWM retrospective point data and load into the TEEHR dataset.

usgs_streamflow

Fetch USGS gage data and load into the TEEHR dataset.

nwm_operational_grids(nwm_configuration: str, output_type: str, variable_name: str, nwm_version: SupportedNWMOperationalVersionsEnum, start_date: str | datetime | Timestamp, end_date: str | datetime | Timestamp | None = None, ingest_days: int | None = None, calculate_zonal_weights: bool = False, location_id_prefix: str | None = None, data_source: SupportedNWMDataSourcesEnum | None = 'GCS', kerchunk_method: SupportedKerchunkMethod | None = 'local', prioritize_analysis_value_time: bool | None = False, t_minus_hours: List[int] | None = None, ignore_missing_file: bool | None = True, overwrite_output: bool | None = False, timeseries_type: TimeseriesTypeEnum = 'secondary', table_name: str | None = None, starting_z_hour: int | None = None, ending_z_hour: int | None = None, write_mode: str = 'append', zonal_weights_filepath: Path | str | None = None, drop_duplicates: bool = True, drop_overlapping_assimilation_values: bool = True)

Fetch NWM operational gridded data, calculate zonal statistics (currently only mean is available) of selected variable for given zones, and load into the TEEHR dataset.

Data is fetched for the location IDs in the locations table having a given location_id_prefix. All dates and times within the files and in the cache file names are in UTC.

A zonal weights file, which contains the fraction of each grid pixel that overlaps each zone, is required. It can be calculated and saved to the cache directory if it does not already exist.
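As a rough illustration of how pixel overlap fractions turn into a zonal mean, the sketch below computes a weighted average for a single zone. It assumes the mean is weighted by the overlap fraction and normalized by the sum of the weights; the function name `zonal_mean` and the pixel IDs are hypothetical and are not part of TEEHR.

```python
def zonal_mean(pixel_values, pixel_weights):
    """Weighted mean of the gridded values for a single zone.

    pixel_values:  {pixel_id: value of the gridded variable}
    pixel_weights: {pixel_id: fraction of that pixel overlapping the zone}
    """
    weighted_sum = 0.0
    weight_total = 0.0
    for pixel_id, weight in pixel_weights.items():
        weighted_sum += weight * pixel_values[pixel_id]
        weight_total += weight
    if weight_total == 0:
        raise ValueError("zone does not overlap any pixel")
    return weighted_sum / weight_total


# Hypothetical zone covering all of pixel "a" and half of pixel "b".
# Pixel "c" has no weight for this zone, so it does not contribute.
values = {"a": 2.0, "b": 8.0, "c": 100.0}
weights = {"a": 1.0, "b": 0.5}
print(zonal_mean(values, weights))  # (1.0 * 2.0 + 0.5 * 8.0) / 1.5 = 4.0
```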

Parameters:
  • nwm_configuration (str) – NWM forecast category (e.g., “analysis_assim”, “short_range”, …).

  • output_type (str) – Output component of the nwm_configuration (e.g., “channel_rt”, “reservoir”, …).

  • variable_name (str) – Name of the NWM data variable to download (e.g., “streamflow”, “velocity”, …).

  • nwm_version (SupportedNWMOperationalVersionsEnum) – The NWM operational version. “nwm12”, “nwm20”, “nwm21”, “nwm22”, or “nwm30”. Note that there is no change in NWM configuration between version 2.1 and 2.2, and they are treated as the same version. They are both allowed here for convenience.

    Availability of each version:

    • v1.2: 2018-09-17 - 2019-06-18

    • v2.0: 2019-06-19 - 2021-04-19

    • v2.1/2.2: 2021-04-20 - 2023-09-18

    • v3.0: 2023-09-19 - present

  • start_date (Union[str, datetime, pd.Timestamp]) – Date and time to begin data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM.

  • end_date (Optional[Union[str, datetime, pd.Timestamp]]) – Date and time to end data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM. If not provided, ingest_days must be provided.

  • ingest_days (Optional[int]) – Number of days to ingest data after start date. This is deprecated in favor of end_date, and will be removed in a future release. If both are provided, ingest_days takes precedence. If not provided, end_date must be specified.

  • calculate_zonal_weights (bool) – Flag specifying whether or not to calculate zonal weights. True = calculate; False = use existing file. Default is False.

  • location_id_prefix (Optional[str]) – Prefix to include when filtering the locations table for polygon primary_location_id. Default is None, all locations are included.

  • data_source (Optional[SupportedNWMDataSourcesEnum]) – Specifies the remote location from which to fetch the data: “GCS” (default), “NOMADS”, or “DSTOR”. Currently only “GCS” is implemented.

  • kerchunk_method (Optional[SupportedKerchunkMethod]) – When data_source = “GCS”, specifies how the Kerchunk reference JSON files are obtained. “local” (default): create the JSON files from the netCDF files in GCS and save them to a local directory, skipping any that already exist locally. “remote”: read the CIROH pre-generated JSON files from S3, ignoring any that are unavailable. “auto”: read the CIROH pre-generated JSON files from S3, and create any that are unavailable, storing them locally.

  • prioritize_analysis_value_time (Optional[bool]) – A boolean flag that determines the method of fetching analysis-assimilation data. When False (default), all non-overlapping value_time hours (prioritizing the most recent reference_time) are included in the output. When True, only the hours within t_minus_hours are included.

  • t_minus_hours (Optional[List[int]]) – Specifies the look-back hours to include if an assimilation nwm_configuration is specified. Only utilized if assimilation data is requested and prioritize_analysis_value_time is True.

  • ignore_missing_file (bool) – Flag specifying whether or not to fail if a missing NWM file is encountered. True = skip and continue; False = fail.

  • overwrite_output (bool) – Flag specifying whether or not to overwrite output files if they already exist. True = overwrite; False = fail.

  • timeseries_type (str) – Whether to consider as the “primary” or “secondary” timeseries. Default is “secondary”, unless the configuration is an analysis containing assimilation, in which case the default is “primary”.

  • table_name (str) – The name of the table to load the data into. Must be either “primary_timeseries” or “secondary_timeseries”. This is redundant with, and takes precedence over, timeseries_type, which is deprecated.

  • starting_z_hour (Optional[int]) – The starting z_hour to include in the output. If None, z_hours for the first day are determined by start_date. Default is None. Must be between 0 and 23.

  • ending_z_hour (Optional[int]) – The ending z_hour to include in the output. If None, z_hours for the last day are determined by end_date if provided, otherwise all z_hours are included in the final day. Default is None. Must be between 0 and 23.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append” or “upsert”. If “append”, the Evaluation table will be appended with new data that does not already exist. If “upsert”, existing data will be replaced and new data that does not exist will be appended.

  • zonal_weights_filepath (Optional[Union[Path, str]]) – The path to the zonal weights file. If None and calculate_zonal_weights is False, the weights file must exist in the cache for the configuration. Default is None.

  • drop_duplicates (bool) – Whether to drop duplicates in the data. Default is True.

  • drop_overlapping_assimilation_values (Optional[bool] = True) – Whether to drop assimilation values that overlap in value_time. Default is True. If True, values that overlap in value_time are dropped, keeping those with the most recent reference_time. In this case, all reference_time values are set to None. If False, overlapping values are kept and reference_time is retained.
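Because each operational version is only available for a fixed window, it can help to check which nwm_version covers a requested date before fetching. The sketch below is a hypothetical helper (`nwm_version_for` is not part of TEEHR) built from the availability dates listed under nwm_version; it returns “nwm22” for the shared v2.1/2.2 window.

```python
from datetime import date

# Availability windows copied from the nwm_version description above.
# The last window is open-ended ("present"), so its end is None.
NWM_OPERATIONAL_WINDOWS = [
    ("nwm12", date(2018, 9, 17), date(2019, 6, 18)),
    ("nwm20", date(2019, 6, 19), date(2021, 4, 19)),
    ("nwm22", date(2021, 4, 20), date(2023, 9, 18)),  # "nwm21" is treated the same
    ("nwm30", date(2023, 9, 19), None),
]


def nwm_version_for(day):
    """Return the operational version string whose window contains `day`."""
    for version, first, last in NWM_OPERATIONAL_WINDOWS:
        if day >= first and (last is None or day <= last):
            return version
    raise ValueError(f"no operational NWM version is listed for {day}")


print(nwm_version_for(date(2022, 1, 1)))  # nwm22
```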

Note

Data in the cache is cleared before each call to the fetch method. So if a long-running fetch is interrupted before the data is automatically loaded into the Evaluation, it should be loaded or cached manually. This will prevent it from being deleted when the fetch job is resumed.

Notes

The NWM variables, including nwm_configuration, output_type, and variable_name, are stored as a pydantic model in grid_config_models.py.

The cached forecast and assimilation data is grouped and saved one file per reference time, using the file name convention “YYYYMMDDTHH”.

All dates and times within the files and in the file names are in UTC.
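The “YYYYMMDDTHH” naming convention can be reproduced with a standard strftime pattern. The sketch below only builds the file name stem; the file extension and directory layout are not stated here, and the helper name `reference_time_stem` is hypothetical. Naive datetimes are assumed to already be in UTC.

```python
from datetime import datetime, timedelta, timezone


def reference_time_stem(reference_time):
    """Format a reference time as "YYYYMMDDTHH" in UTC."""
    if reference_time.tzinfo is not None:
        # Convert timezone-aware values to UTC before formatting.
        reference_time = reference_time.astimezone(timezone.utc)
    return reference_time.strftime("%Y%m%dT%H")


print(reference_time_stem(datetime(2023, 3, 18, 6)))  # 20230318T06
```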

Examples

Here we will calculate mean areal precipitation using operational NWM forcing data for the polygons in the locations table. Pixel weights (fraction of pixel overlap) are calculated for each polygon and stored in the evaluation cache directory.

(see: generate_weights_file() for weights calculation).

>>> from datetime import datetime
>>> from pathlib import Path
>>> import teehr
>>> ev = teehr.LocalReadWriteEvaluation()
>>> ev.fetch.nwm_operational_grids(
>>>     nwm_configuration="forcing_short_range",
>>>     output_type="forcing",
>>>     variable_name="RAINRATE",
>>>     start_date=datetime(2022, 1, 1),
>>>     end_date=datetime(2022, 1, 2),
>>>     zonal_weights_filepath=Path(Path.home(), "nextgen_03S_weights.parquet"),
>>>     nwm_version="nwm22",
>>>     data_source="GCS",
>>>     kerchunk_method="auto"
>>> )

Note

NWM data can also be fetched outside of a TEEHR Evaluation by calling the underlying function directly.

>>> from pathlib import Path
>>> from teehr.fetching.nwm.nwm_grids import nwm_grids_to_parquet

Perform the calculations, writing to the specified directory.

>>> nwm_grids_to_parquet(
>>>     nwm_configuration="forcing_short_range",
>>>     output_type="forcing",
>>>     variable_name="RAINRATE",
>>>     start_date="2021-12-18",
>>>     end_date="2021-12-19",
>>>     zonal_weights_filepath=Path(Path.home(), "nextgen_03S_weights.parquet"),
>>>     json_dir=Path(Path.home(), "temp/parquet/jsons/"),
>>>     output_parquet_dir=Path(Path.home(), "temp/parquet"),
>>>     nwm_version="nwm21",
>>>     data_source="GCS",
>>>     kerchunk_method="auto",
>>>     t_minus_hours=[0, 1, 2],
>>>     ignore_missing_file=True,
>>>     overwrite_output=True
>>> )

See also

teehr.fetching.nwm.nwm_grids.nwm_grids_to_parquet()

nwm_operational_points(nwm_configuration: str, output_type: str, variable_name: str, nwm_version: SupportedNWMOperationalVersionsEnum, start_date: str | datetime | Timestamp, end_date: str | datetime | Timestamp | None = None, ingest_days: int | None = None, data_source: SupportedNWMDataSourcesEnum | None = 'GCS', kerchunk_method: SupportedKerchunkMethod | None = 'local', prioritize_analysis_value_time: bool | None = False, t_minus_hours: List[int] | None = None, process_by_z_hour: bool | None = True, stepsize: int | None = 100, ignore_missing_file: bool | None = True, overwrite_output: bool | None = False, timeseries_type: TimeseriesTypeEnum = 'secondary', table_name: str | None = None, starting_z_hour: int | None = None, ending_z_hour: int | None = None, write_mode: str = 'append', drop_duplicates: bool = True, drop_overlapping_assimilation_values: bool | None = True)

Fetch operational NWM point data and load into the TEEHR dataset.

Data is fetched for all secondary location IDs in the locations crosswalk table that are prefixed by the NWM version, and all dates and times within the files and in the cache file names are in UTC.

Parameters:
  • nwm_configuration (str) – NWM forecast category (e.g., “analysis_assim”, “short_range”, …).

  • output_type (str) – Output component of the nwm_configuration (e.g., “channel_rt”, “reservoir”, …).

  • variable_name (str) – Name of the NWM data variable to download (e.g., “streamflow”, “velocity”, …).

  • nwm_version (SupportedNWMOperationalVersionsEnum) – The NWM operational version. “nwm12”, “nwm20”, “nwm21”, “nwm22”, or “nwm30”. Note that there is no change in NWM configuration between version 2.1 and 2.2, and they are treated as the same version. They are both allowed here for convenience.

    Availability of each version:

    • v1.2: 2018-09-17 - 2019-06-18

    • v2.0: 2019-06-19 - 2021-04-19

    • v2.1/2.2: 2021-04-20 - 2023-09-18

    • v3.0: 2023-09-19 - present

  • start_date (Union[str, datetime, pd.Timestamp]) – Date and time to begin data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM.

  • end_date (Optional[Union[str, datetime, pd.Timestamp]]) – Date and time to end data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM. If not provided, ingest_days must be provided.

  • ingest_days (Optional[int]) – Number of days to ingest data after start date. This is deprecated in favor of end_date, and will be removed in a future release. If both are provided, ingest_days takes precedence. If not provided, end_date must be specified.

  • data_source (Optional[SupportedNWMDataSourcesEnum]) – Specifies the remote location from which to fetch the data: “GCS” (default), “NOMADS”, or “DSTOR”. Currently only “GCS” is implemented.

  • kerchunk_method (Optional[SupportedKerchunkMethod]) – When data_source = “GCS”, specifies how the Kerchunk reference JSON files are obtained. “local” (default): create the JSON files from the netCDF files in GCS and save them to a local directory, skipping any that already exist locally. “remote”: read the CIROH pre-generated JSON files from S3, ignoring any that are unavailable. “auto”: read the CIROH pre-generated JSON files from S3, and create any that are unavailable, storing them locally.

  • prioritize_analysis_value_time (Optional[bool]) – A boolean flag that determines the method of fetching analysis-assimilation data. When False (default), all non-overlapping value_time hours (prioritizing the most recent reference_time) are included in the output. When True, only the hours within t_minus_hours are included.

  • t_minus_hours (Optional[List[int]]) – Specifies the look-back hours to include if an assimilation nwm_configuration is specified. Only utilized if assimilation data is requested and prioritize_analysis_value_time is True.

  • process_by_z_hour (Optional[bool]) – A boolean flag that determines how files are grouped for processing. The default is True, which groups by day and z_hour. False groups files sequentially into chunks whose size is determined by stepsize. This can allow more data to be processed more efficiently, but runs the risk of splitting forecasts across separate output files.

  • stepsize (Optional[int]) – The number of json files to process at one time. Used if process_by_z_hour is set to False. Default value is 100. Larger values can result in greater efficiency but require more memory.

  • ignore_missing_file (Optional[bool]) – Flag specifying whether or not to fail if a missing NWM file is encountered. True = skip and continue; False = fail.

  • overwrite_output (Optional[bool]) – Flag specifying whether or not to overwrite output files if they already exist. True = overwrite; False = fail.

  • timeseries_type (str) – Whether to consider as the “primary” or “secondary” timeseries. Default is “secondary”.

  • table_name (str) – The name of the table to load the data into. Must be either “primary_timeseries” or “secondary_timeseries”. This is redundant with, and takes precedence over, timeseries_type, which is deprecated.

  • starting_z_hour (Optional[int]) – The starting z_hour to include in the output. If None, z_hours for the first day are determined by start_date. Default is None. Must be between 0 and 23.

  • ending_z_hour (Optional[int]) – The ending z_hour to include in the output. If None, z_hours for the last day are determined by end_date if provided, otherwise all z_hours are included in the final day. Default is None. Must be between 0 and 23.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append” or “upsert”. If “append”, the Evaluation table will be appended with new data that does not already exist. If “upsert”, existing data will be replaced and new data that does not exist will be appended.

  • drop_duplicates (bool) – Whether to drop duplicates in the data. Default is True.

  • drop_overlapping_assimilation_values (Optional[bool] = True) – Whether to drop assimilation values that overlap in value_time. Default is True. If True, values that overlap in value_time are dropped, keeping those with the most recent reference_time. In this case, all reference_time values are set to None. If False, overlapping values are kept and reference_time is retained.
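The effect of drop_overlapping_assimilation_values=True can be pictured with plain Python: for each location and value_time, only the value from the most recent reference_time survives, and reference_time is then cleared. This is a minimal sketch of the behavior described above, not TEEHR's implementation; the function name, the row layout, and the location ID are hypothetical.

```python
from datetime import datetime


def drop_overlapping(rows):
    """Keep one row per (location_id, value_time): the latest reference_time wins.

    rows: iterable of dicts with the keys location_id, value_time,
    reference_time, and value.
    """
    latest = {}
    for row in rows:
        key = (row["location_id"], row["value_time"])
        if key not in latest or row["reference_time"] > latest[key]["reference_time"]:
            latest[key] = row
    ordered = sorted(latest.values(), key=lambda r: (r["location_id"], r["value_time"]))
    # reference_time is set to None on the surviving rows, as described above.
    return [{**row, "reference_time": None} for row in ordered]


# Two assimilation cycles (06z and 07z) both provide a value for 05:00.
rows = [
    {"location_id": "nwm30-1", "value_time": datetime(2023, 3, 18, 5),
     "reference_time": datetime(2023, 3, 18, 6), "value": 1.0},
    {"location_id": "nwm30-1", "value_time": datetime(2023, 3, 18, 5),
     "reference_time": datetime(2023, 3, 18, 7), "value": 1.2},
    {"location_id": "nwm30-1", "value_time": datetime(2023, 3, 18, 6),
     "reference_time": datetime(2023, 3, 18, 7), "value": 1.3},
]
kept = drop_overlapping(rows)
print([row["value"] for row in kept])  # [1.2, 1.3]
```

With drop_overlapping_assimilation_values=False, all three rows would be kept and each would retain its reference_time.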

Note

Data in the cache is cleared before each call to the fetch method. So if a long-running fetch is interrupted before the data is automatically loaded into the Evaluation, it should be loaded or cached manually. This will prevent it from being deleted when the fetch job is resumed.

Notes

The NWM variables, including nwm_configuration, output_type, and variable_name, are stored as pydantic models in point_config_models.py.

The cached forecast and assimilation data is grouped and saved one file per reference time, using the file name convention “YYYYMMDDTHH”.

Examples

Here we fetch operational streamflow forecasts for NWM v2.2 from GCS, and load them into the TEEHR dataset.

>>> from datetime import datetime
>>> import teehr
>>> ev = teehr.LocalReadWriteEvaluation()
>>> ev.fetch.nwm_operational_points(
>>>     nwm_configuration="short_range",
>>>     output_type="channel_rt",
>>>     variable_name="streamflow",
>>>     start_date=datetime(2022, 1, 1),
>>>     end_date=datetime(2022, 1, 2),
>>>     nwm_version="nwm22",
>>>     data_source="GCS",
>>>     kerchunk_method="auto"
>>> )

Note

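NWM data can also be fetched outside of a TEEHR Evaluation by calling the underlying function directly.

>>> from pathlib import Path
>>> from teehr.fetching.nwm.nwm_points import nwm_to_parquet
>>> LOCATION_IDS = [7086109, 7040481]  # example NWM location IDs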

Fetch and format the data, writing to the specified directory.

>>> nwm_to_parquet(
>>>     nwm_configuration="short_range",
>>>     output_type="channel_rt",
>>>     variable_name="streamflow",
>>>     start_date="2023-03-18",
>>>     end_date="2023-03-19",
>>>     location_ids=LOCATION_IDS,
>>>     json_dir=Path(Path.home(), "temp/parquet/jsons/"),
>>>     output_parquet_dir=Path(Path.home(), "temp/parquet"),
>>>     nwm_version="nwm21",
>>>     data_source="GCS",
>>>     kerchunk_method="auto",
>>>     process_by_z_hour=True,
>>>     ignore_missing_file=True,
>>>     overwrite_output=True,
>>> )

See also

teehr.fetching.nwm.nwm_points.nwm_to_parquet()

nwm_retrospective_grids(nwm_version: SupportedNWMRetroVersionsEnum, variable_name: ForcingVariablesEnum, start_date: str | datetime | Timestamp, end_date: str | datetime | Timestamp, calculate_zonal_weights: bool = False, overwrite_output: bool | None = False, chunk_by: NWMChunkByEnum | None = None, domain: SupportedNWMRetroDomainsEnum | None = 'CONUS', location_id_prefix: str | None = None, timeseries_type: TimeseriesTypeEnum = 'primary', table_name: str | None = None, write_mode: str = 'append', zonal_weights_filepath: Path | str | None = None, drop_duplicates: bool = True)

Fetch NWM retrospective gridded data, calculate zonal statistics (currently only mean is available) of selected variable for given zones, and load into the TEEHR dataset.

Data is fetched for the location IDs in the locations table having a given location_id_prefix. All dates and times within the files and in the cache file names are in UTC.

A zonal weights file, which contains the fraction of each grid pixel that overlaps each zone, is required. It can be calculated and saved to the cache directory if it does not already exist.

Parameters:
  • nwm_version (SupportedNWMRetroVersionsEnum) – NWM retrospective version to fetch. Currently nwm21 and nwm30 are supported. Note that since there is no change in NWM configuration between version 2.1 and 2.2, no retrospective run was produced for v2.2.

  • variable_name (str) – Name of the NWM forcing data variable to download (e.g., “PRECIP”, “PSFC”, “Q2D”, …).

  • start_date (Union[str, datetime, pd.Timestamp]) – Date and time to begin data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM. Earliest available date for each retrospective version:

    • v2.0: 1993-01-01

    • v2.1: 1979-01-01

    • v3.0: 1979-02-01

  • end_date (Union[str, datetime, pd.Timestamp]) – Date and time to end data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM. Latest available date for each retrospective version:

    • v2.0: 2018-12-31

    • v2.1: 2020-12-31

    • v3.0: 2023-01-31

  • calculate_zonal_weights (bool) – Flag specifying whether or not to calculate zonal weights. True = calculate; False = use existing file. Default is False.

  • location_id_prefix (Optional[str]) – Prefix to include when filtering the locations table for polygon primary_location_id. Default is None, all locations are included.

  • overwrite_output (bool) – Flag specifying whether or not to overwrite output files if they already exist. True = overwrite; False = fail.

  • chunk_by (Optional[NWMChunkByEnum] = None) – If None (default), all timeseries are saved to a single file; otherwise the data is processed in chunks of the specified size. Can be “week” or “month” for gridded data.

  • domain (str = "CONUS") – Geographical domain. Acceptable values are “Alaska”, “CONUS” (default), “Hawaii”, and “PR”. Only relevant when nwm_version is “nwm30”.

  • timeseries_type (str) – Whether to consider as the “primary” or “secondary” timeseries. Default is “primary”.

  • table_name (str) – The name of the table to load the data into. Must be either “primary_timeseries” or “secondary_timeseries”. This is redundant with, and takes precedence over, timeseries_type, which is deprecated.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, or “create_or_replace”.

  • zonal_weights_filepath (Optional[Union[Path, str]]) – The path to the zonal weights file. If None and calculate_zonal_weights is False, the weights file must exist in the cache for the configuration. Default is None.

  • drop_duplicates (bool) – Whether to drop duplicates in the data. Default is True.

Examples

Here we will calculate mean areal precipitation using NWM forcing data for the polygons in the locations table. Pixel weights (fraction of pixel overlap) are calculated for each polygon and stored in the evaluation cache directory.

(see: generate_weights_file() for weights calculation).

>>> from datetime import datetime
>>> import teehr
>>> ev = teehr.LocalReadWriteEvaluation()
>>> ev.fetch.nwm_retrospective_grids(
>>>     nwm_version="nwm30",
>>>     variable_name="RAINRATE",
>>>     calculate_zonal_weights=True,
>>>     start_date=datetime(2000, 1, 1),
>>>     end_date=datetime(2001, 1, 1),
>>>     location_id_prefix="huc10"
>>> )

Note

NWM data can also be fetched outside of a TEEHR Evaluation by calling the underlying function directly.

>>> from pathlib import Path
>>> from teehr.fetching.nwm.retrospective_grids import nwm_retro_grids_to_parquet

Perform the calculations, writing to the specified directory.

>>> nwm_retro_grids_to_parquet(
>>>     nwm_version="nwm30",
>>>     variable_name="RAINRATE",
>>>     zonal_weights_filepath=Path(Path.home(), "nextgen_03S_weights.parquet"),
>>>     start_date="2020-12-18",
>>>     end_date="2022-12-18",
>>>     output_parquet_dir=Path(Path.home(), "temp/parquet"),
>>>     location_id_prefix="huc10",
>>> )

See also

teehr.fetching.nwm.retrospective_grids.nwm_retro_grids_to_parquet()

nwm_retrospective_points(nwm_version: SupportedNWMRetroVersionsEnum, variable_name: ChannelRtRetroVariableEnum, start_date: str | datetime | Timestamp, end_date: str | datetime | Timestamp, chunk_by: NWMChunkByEnum | None = None, overwrite_output: bool | None = False, domain: SupportedNWMRetroDomainsEnum | None = 'CONUS', timeseries_type: TimeseriesTypeEnum = 'secondary', table_name: str | None = None, write_mode: str = 'append', drop_duplicates: bool = True)

Fetch NWM retrospective point data and load into the TEEHR dataset.

Data is fetched for all secondary location IDs in the locations crosswalk table that are prefixed by the NWM version, and all dates and times within the files and in the cache file names are in UTC.

Parameters:
  • nwm_version (SupportedNWMRetroVersionsEnum) – NWM retrospective version to fetch. Currently nwm20, nwm21, and nwm30 are supported. Note that since there is no change in NWM configuration between version 2.1 and 2.2, no retrospective run was produced for v2.2.

  • variable_name (str) – Name of the NWM data variable to download (e.g., “streamflow”, “velocity”, …).

  • start_date (Union[str, datetime, pd.Timestamp]) – Date and time to begin data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM. Earliest available date for each retrospective version:

    • v2.0: 1993-01-01

    • v2.1: 1979-01-01

    • v3.0: 1979-02-01

  • end_date (Union[str, datetime, pd.Timestamp]) – Date and time to end data ingest. Str formats can include YYYY-MM-DD HH:MM or MM/DD/YYYY HH:MM. Latest available date for each retrospective version:

    • v2.0: 2018-12-31

    • v2.1: 2020-12-31

    • v3.0: 2023-01-31

  • chunk_by (Union[NWMChunkByEnum, None] = None) – If None (default), all timeseries are saved to a single file; otherwise the data is processed in chunks of the specified size. Can be “week”, “month”, or “year”.

  • overwrite_output (bool = False) – Whether to overwrite output files if they already exist. Default is False.

  • domain (str = "CONUS") – Geographical domain. Acceptable values are “Alaska”, “CONUS” (default), “Hawaii”, and “PR”. Only relevant when nwm_version is “nwm30”.

  • timeseries_type (str) – Whether to consider as the “primary” or “secondary” timeseries. Default is “secondary”.

  • table_name (str) – The name of the table to load the data into. Must be either “primary_timeseries” or “secondary_timeseries”. This is redundant with, and takes precedence over, timeseries_type, which is deprecated.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append” or “upsert”. If “append”, the Evaluation table will be appended with new data that does not already exist. If “upsert”, existing data will be replaced and new data that does not exist will be appended.

  • drop_duplicates (bool) – Whether to drop duplicates in the data. Default is True.
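To give a sense of what chunking a long retrospective request by “month” means, the sketch below splits a date range at calendar-month boundaries. It only illustrates the idea: the helper `month_chunks` is hypothetical, it assumes hourly data, and the exact chunk boundaries TEEHR uses may differ.

```python
from datetime import datetime, timedelta


def month_chunks(start, end):
    """Yield (chunk_start, chunk_end) pairs covering [start, end] month by month."""
    chunk_start = start
    while chunk_start <= end:
        # First instant of the following calendar month.
        if chunk_start.month == 12:
            next_month = datetime(chunk_start.year + 1, 1, 1)
        else:
            next_month = datetime(chunk_start.year, chunk_start.month + 1, 1)
        # Assumes hourly data: the last value in a month is at 23:00.
        chunk_end = min(end, next_month - timedelta(hours=1))
        yield chunk_start, chunk_end
        chunk_start = next_month


chunks = list(month_chunks(datetime(2000, 1, 15), datetime(2000, 3, 2, 23)))
for chunk_start, chunk_end in chunks:
    print(chunk_start, "to", chunk_end)
```

Smaller chunks keep less data in memory at once, at the cost of producing more output files.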

Note

Data in the cache is cleared before each call to the fetch method. So if a long-running fetch is interrupted before the data is automatically loaded into the Evaluation, it should be loaded or cached manually. This will prevent it from being deleted when the fetch job is resumed.

Examples

Here we fetch two days' worth of NWM hourly streamflow data. Initially the data is saved to the cache directory, then it is validated and loaded into the TEEHR dataset.

>>> from datetime import datetime
>>> import teehr
>>> ev = teehr.LocalReadWriteEvaluation()
>>> ev.fetch.nwm_retrospective_points(
>>>     nwm_version="nwm30",
>>>     variable_name="streamflow",
>>>     start_date=datetime(2000, 1, 1),
>>>     end_date=datetime(2000, 1, 2, 23)
>>> )

Note

NWM data can also be fetched outside of a TEEHR Evaluation by calling the underlying function directly.

>>> from datetime import datetime
>>> from pathlib import Path
>>> import teehr.fetching.nwm.retrospective_points as nwm_retro

Fetch and format the data, writing to the specified directory.

>>> nwm_retro.nwm_retro_to_parquet(
>>>     nwm_version="nwm20",
>>>     variable_name="streamflow",
>>>     start_date=datetime(2000, 1, 1),
>>>     end_date=datetime(2000, 1, 2, 23),
>>>     location_ids=[7086109, 7040481],
>>>     output_parquet_dir=Path(Path.home(), "nwm20_retrospective")
>>> )

See also

teehr.fetching.nwm.retrospective_points.nwm_retro_to_parquet()

usgs_streamflow(start_date: str | datetime | Timestamp, end_date: str | datetime | Timestamp, service: USGSServiceEnum = 'iv', chunk_by: USGSChunkByEnum | None = None, filter_to_hourly: bool = True, filter_no_data: bool = True, convert_to_si: bool = True, overwrite_output: bool | None = False, timeseries_type: TimeseriesTypeEnum = 'primary', table_name: str | None = None, write_mode: str = 'append', drop_duplicates: bool = True)

Fetch USGS gage data and load into the TEEHR dataset.

Data is fetched for all USGS IDs in the locations table, and all dates and times within the files and in the cached file names are in UTC.

Parameters:
  • start_date (Union[str, datetime, pd.Timestamp]) – Start date and time of data to fetch.

  • end_date (Union[str, datetime, pd.Timestamp]) – End date and time of data to fetch. Because start_date is inclusive for the USGS service, 1 minute is subtracted from this time so that consecutive calls do not overlap.

  • service (USGSServiceEnum, default = "iv") – The USGS service to use for fetching data (‘iv’ for hourly instantaneous values or ‘dv’ for daily mean values).

  • chunk_by (Union[str, None], default = None) – A string specifying how much data is fetched and read into memory at once. The default is to fetch all locations and all dates at once. Valid options = [“location_id”, “day”, “week”, “month”, “year”, None].

  • filter_to_hourly (bool = True) – Return only values that fall on the hour (i.e. drop 15 minute data).

  • filter_no_data (bool = True) – Filter out -999 values.

  • convert_to_si (bool = True) – Multiplies values by 0.3048**3 and sets measurement_units to “m3/s”.

  • overwrite_output (bool) – Flag specifying whether or not to overwrite output files if they already exist. True = overwrite; False = fail.

  • timeseries_type (str) – Whether to consider as the “primary” or “secondary” timeseries. Default is “primary”.

  • table_name (str) – The name of the table to load the data into. Must be either “primary_timeseries” or “secondary_timeseries”. This is redundant with, and takes precedence over, timeseries_type, which is deprecated.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append” or “upsert”. If “append”, the Evaluation table will be appended with new data that does not already exist. If “upsert”, existing data will be replaced and new data that does not exist will be appended.

  • drop_duplicates (bool) – Whether to drop duplicates in the data. Default is True.
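The factor used by convert_to_si follows from the definition of the foot: 1 ft = 0.3048 m, so a flow in ft³/s is multiplied by 0.3048³ to get m³/s. The sketch below reproduces that arithmetic; the helper name `cfs_to_cms` is hypothetical and is not a TEEHR function.

```python
FT_TO_M = 0.3048            # metres per foot (exact by definition)
CFS_TO_CMS = FT_TO_M ** 3   # cubic metres per cubic foot, about 0.0283168


def cfs_to_cms(flow_cfs):
    """Convert a discharge from cubic feet per second to cubic metres per second."""
    return flow_cfs * CFS_TO_CMS


print(round(cfs_to_cms(100.0), 4))  # 2.8317
```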

Note

In some edge cases, a gage site may contain one or more sub-locations that also measure discharge. To differentiate these sub-locations, the usgs_to_parquet method should be called directly, and a dictionary can be passed in for a site. Each dictionary should contain the site number and a description of the sub-location. The description is used to filter the data to the specific sub-location. For example: [{"site_no": "02449838", "description": "Main Gage"}] Note that the dictionary must contain the keywords 'site_no' and 'description'.

Note

Data in the cache is cleared before each call to the fetch method. So if a long-running fetch is interrupted before the data is automatically loaded into the Evaluation, it should be loaded or cached manually. This will prevent it from being deleted when the fetch job is resumed.

Examples

Here we fetch over a year of USGS hourly streamflow data. Initially the data is saved to the cache directory, then it is validated and loaded into the TEEHR dataset.

>>> from datetime import datetime
>>> import teehr
>>> ev = teehr.LocalReadWriteEvaluation()

Fetch the data for locations in the locations table.

>>> ev.fetch.usgs_streamflow(
>>>     start_date=datetime(2021, 1, 1),
>>>     end_date=datetime(2022, 2, 28)
>>> )

Note

USGS data can also be fetched outside of a TEEHR Evaluation by calling the underlying function directly.

>>> from datetime import datetime
>>> from pathlib import Path
>>> from teehr.fetching.usgs.usgs import usgs_to_parquet

This requires specifying a list of USGS gage IDs and an output directory in addition to the above arguments.

>>> usgs_to_parquet(
>>>     sites=["02449838", "02450825"],
>>>     start_date=datetime(2023, 2, 20),
>>>     end_date=datetime(2023, 2, 25),
>>>     output_parquet_dir=Path(Path.home(), "temp", "usgs"),
>>>     chunk_by="day",
>>>     overwrite_output=True
>>> )