Source code for prescient_sdk.upload

"""Upload files to bucket."""

from __future__ import annotations

import logging
import os
from collections.abc import Iterator
from pathlib import Path, PurePath
from typing import Optional

import boto3
import botocore.exceptions

from prescient_sdk.client import PrescientClient

FileList = list[PurePath]

logger = logging.getLogger(__name__)


[docs] def iter_files(input_dir: Path, exclude: Optional[list[str]] = None) -> Iterator[Path]: """Return an iterator of Path""" glob_pattern = "**/*" for path in input_dir.glob(glob_pattern): if path.is_dir(): continue if exclude: if any(path.match(e) for e in exclude): continue yield path
def _upload( file: str, bucket: str, key: str, session: boto3.Session, overwrite: bool = True ) -> None: s3 = session.client("s3") if not overwrite: try: _ = s3.head_object(Bucket=bucket, Key=key) logger.info( "skipping file %s as it already exists at s3://%s%s", file, bucket, key ) return except botocore.exceptions.ClientError as e: if "Not Found" in e.args[0]: pass else: raise e logger.info("uploading file %s to s3://%s%s", file, bucket, key) s3.upload_file(Filename=file, Bucket=bucket, Key=key)
[docs] def upload( input_dir: str | os.PathLike, exclude: Optional[list[str]] = None, prescient_client: Optional[PrescientClient] = None, overwrite: bool = True, ) -> None: """ Upload files from input directory to the location defined by PRESCIENT_UPLOAD_BUCKET Args: input_dir (str | os.PathLike): Input directory containing file(s) to be uploaded. By default will upload all files contained in input directory. exclude (Optional[list[str]]): A list of glob patterns to exclude from uploading. For example `exclude=["*.txt", "*.csv"] would skip any matched files that end with a .txt or .csv suffix. If not provided by default all files will be uploaded. prescient_client (Optional[PrescientClient]): A PrescientClient instance. If not provided a default PrescientClient instance will be created. overwrite (bool): Whether to overwrite objects if they already exist. If False, upload is skipped. Useful for continuing an upload that was started previously. Defaults to True. """ if overwrite: logger.info("overwrite=%s, thus will overwrite any existing objects", overwrite) input_path = Path(input_dir) if not input_path.exists(): raise FileNotFoundError(input_dir) prescient_client = prescient_client or PrescientClient() files = list(iter_files(input_path, exclude=exclude)) logger.info("found %s files to upload", len(files)) for file in files: _upload( file=str(file), bucket=prescient_client.settings.prescient_upload_bucket, key=file.as_posix(), session=prescient_client.upload_session, overwrite=overwrite, )