import os
from pathlib import Path

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError, NoCredentialsError


class S3BucketManager:
    """Thin convenience wrapper around a boto3 S3 client bound to one bucket.

    Most methods report failure by printing a message and returning a
    falsy value (``False``, ``None`` or ``[]``) instead of raising; only
    ``__init__`` re-raises.
    """

    def __init__(self, bucket_name, region_name='us-east-1'):
        """
        Initialize S3 client and set bucket name

        Args:
            bucket_name (str): Name of the S3 bucket
            region_name (str): AWS region name (default: us-east-1)

        Raises:
            NoCredentialsError: Re-raised after printing a hint if boto3
                reports missing credentials.
            Exception: Any other error raised while creating the client or
                resource is printed and re-raised unchanged.
        """
        self.bucket_name = bucket_name
        self.region_name = region_name

        try:
            # Initialize S3 client
            self.s3_client = boto3.client('s3', region_name=region_name)
            self.s3_resource = boto3.resource('s3', region_name=region_name)
            self.bucket = self.s3_resource.Bucket(bucket_name)
        except NoCredentialsError:
            print("Error: AWS credentials not found. Please configure your credentials.")
            raise
        except Exception as e:
            print(f"Error initializing S3 client: {e}")
            raise

    def list_objects(self, prefix=''):
        """
        List all objects in the bucket with optional prefix filter

        Args:
            prefix (str): Optional prefix to filter objects

        Returns:
            list: One dict per object with the keys ``'key'``, ``'size'``
                and ``'last_modified'``; an empty list if the listing
                fails with a ClientError.
        """
        try:
            objects = []
            paginator = self.s3_client.get_paginator('list_objects_v2')
            pages = paginator.paginate(Bucket=self.bucket_name, Prefix=prefix)

            for page in pages:
                # A page without matches carries no 'Contents' entry.
                objects.extend(
                    {
                        'key': obj['Key'],
                        'size': obj['Size'],
                        'last_modified': obj['LastModified']
                    }
                    for obj in page.get('Contents', [])
                )

            return objects

        except ClientError as e:
            print(f"Error listing objects: {e}")
            return []

    def upload_file(self, local_file_path, s3_key=None):
        """
        Upload a file to S3 bucket

        Args:
            local_file_path (str): Path to local file
            s3_key (str): S3 object key (if None, uses filename)

        Returns:
            bool: True if successful, False otherwise
        """
        if s3_key is None:
            s3_key = Path(local_file_path).name

        try:
            self.s3_client.upload_file(local_file_path, self.bucket_name, s3_key)
            print(f"Successfully uploaded {local_file_path} to s3://{self.bucket_name}/{s3_key}")
            return True
        except FileNotFoundError:
            print(f"Error: File {local_file_path} not found")
            return False
        except (ClientError, S3UploadFailedError) as e:
            # boto3's managed transfer re-raises service-side ClientErrors as
            # S3UploadFailedError, which is not a ClientError subclass; both
            # must be caught for the "False on failure" contract to hold.
            print(f"Error uploading file: {e}")
            return False

    def download_file(self, s3_key, local_file_path):
        """
        Download a file from S3 bucket

        Args:
            s3_key (str): S3 object key
            local_file_path (str): Local path to save file

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            self.s3_client.download_file(self.bucket_name, s3_key, local_file_path)
            print(f"Successfully downloaded s3://{self.bucket_name}/{s3_key} to {local_file_path}")
            return True
        except ClientError as e:
            print(f"Error downloading file: {e}")
            return False

    def delete_object(self, s3_key):
        """
        Delete an object from S3 bucket

        Args:
            s3_key (str): S3 object key to delete

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=s3_key)
            print(f"Successfully deleted s3://{self.bucket_name}/{s3_key}")
            return True
        except ClientError as e:
            print(f"Error deleting object: {e}")
            return False

    def get_object_url(self, s3_key, expiration=3600):
        """
        Generate a presigned URL for an S3 object

        Args:
            s3_key (str): S3 object key
            expiration (int): URL expiration time in seconds (default: 1 hour)

        Returns:
            str: Presigned URL or None if error
        """
        try:
            url = self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': s3_key},
                ExpiresIn=expiration
            )
            return url
        except ClientError as e:
            print(f"Error generating presigned URL: {e}")
            return None

    def object_exists(self, s3_key):
        """
        Check if an object exists in the bucket

        Args:
            s3_key (str): S3 object key

        Returns:
            bool: True if object exists, False otherwise. Any ClientError
                other than a 404 is printed and also reported as False.
        """
        try:
            self.s3_client.head_object(Bucket=self.bucket_name, Key=s3_key)
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                return False
            else:
                print(f"Error checking object existence: {e}")
                return False


# Example usage
if __name__ == "__main__":
    # Replace with your bucket name
    BUCKET_NAME = "your-bucket-name"
    REGION = "us-east-1"  # Replace with your region

    # Initialize S3 manager
    s3_manager = S3BucketManager(BUCKET_NAME, REGION)

    # List objects in bucket
    print("Objects in bucket:")
    objects = s3_manager.list_objects()
    for obj in objects:
        print(f" {obj['key']} ({obj['size']} bytes)")

    # Example operations (uncomment to use):

    # Upload a file
    # s3_manager.upload_file("local_file.txt", "folder/remote_file.txt")

    # Download a file
    # s3_manager.download_file("folder/remote_file.txt", "downloaded_file.txt")

    # Check if object exists
    # if s3_manager.object_exists("folder/remote_file.txt"):
    #     print("Object exists!")

    # Generate presigned URL
    # url = s3_manager.get_object_url("folder/remote_file.txt", expiration=7200)
    # print(f"Presigned URL: {url}")

    # Delete an object
    # s3_manager.delete_object("folder/remote_file.txt")