import os
from datetime import datetime

import boto3
import botocore

# Expects ACCESS_KEY, SECRET_KEY, REGION_NAME and bucket_name to be defined here.
from src.constants import *
s3 = boto3.client(
    's3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION_NAME,
)
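# Note: hard-coding keys from src.constants works, but a common alternative
# (an assumption about deployment, not this project's configuration) is to rely
# on boto3's default credential chain (environment variables, ~/.aws/credentials,
# or an attached IAM role), which avoids keeping secrets in source:
#
#     s3 = boto3.client('s3', region_name=REGION_NAME)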
def create_folder_and_store_files(user_id: str, bot_id: str):
    user_bot_data = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'scrap_data')
    bot_data_folder = 'bot_data'
    # S3 has no real folders; a zero-byte object whose key ends in '/' acts as one.
    folder_path = f'{user_id}/{bot_id}/{bot_data_folder}/'
    try:
        s3.head_object(Bucket=bucket_name, Key=folder_path)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            # The folder marker doesn't exist yet, so create it.
            s3.put_object(Bucket=bucket_name, Key=folder_path)
        else:
            print(e.response['Error']['Code'], "-", e.response['Error']['Message'])
    # Upload every file under the local scrap_data tree into the bot_data folder.
    # Note: only the base file name is kept, so subdirectory structure is flattened.
    for root, _dirs, files in os.walk(user_bot_data):
        for file in files:
            local_file_path = os.path.join(root, file)
            s3_file_path = folder_path + file
            s3.upload_file(local_file_path, bucket_name, s3_file_path)
    return "All files uploaded successfully"
def delete_files(user_id: str, bot_id: str, file_names):
    bot_data_folder = 'bot_data'
    folder_path = f'{user_id}/{bot_id}/{bot_data_folder}'
    for file_name in file_names:
        key = f"{folder_path}/{file_name}"
        print(key)
        s3.delete_object(Bucket=bucket_name, Key=key)
    return "Deleted successfully"
def download_files_from_s3_bucket_LLM_data(user_id, bot_id):
    bot_data_folder = 'bot_data'
    local_folder = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'data')
    folder_name = f'{user_id}/{bot_id}/{bot_data_folder}/'
    # Create the local folder if it doesn't exist.
    os.makedirs(local_folder, exist_ok=True)
    # List the objects under the bucket prefix. list_objects_v2 returns at most
    # 1000 keys per call; see the paginator sketch further below.
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
    for obj in response.get('Contents', []):
        # Keep only the base name, so subdirectory structure is flattened locally.
        file_name = os.path.basename(obj['Key'])
        local_path = os.path.join(local_folder, file_name)
        if obj['Key'].endswith('/'):
            # Folder marker objects become local directories.
            os.makedirs(local_path, exist_ok=True)
        else:
            # Download the file from S3 to the local folder.
            s3.download_file(bucket_name, obj['Key'], local_path)
    status = download_files_from_s3_bucket_qanda_data(user_id, bot_id)
    status2 = download_files_from_s3_bucket_mark_down(user_id, bot_id)
    if status == "Downloaded successfully" and status2 == "Downloaded successfully":
        return "Downloaded successfully"
    else:
        return "Download failed"
def download_files_from_s3_bucket_qanda_data(user_id, bot_id):
    bot_data_folder = 'bot_data/q_a_data'
    local_folder = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'q_a_data')
    folder_name = f'{user_id}/{bot_id}/{bot_data_folder}/'
    # Create the local folder if it doesn't exist.
    os.makedirs(local_folder, exist_ok=True)
    # List the objects under the bucket prefix.
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
    for obj in response.get('Contents', []):
        file_name = os.path.basename(obj['Key'])
        local_path = os.path.join(local_folder, file_name)
        if obj['Key'].endswith('/'):
            # Folder marker objects become local directories.
            os.makedirs(local_path, exist_ok=True)
        else:
            # Download the file from S3 to the local folder.
            s3.download_file(bucket_name, obj['Key'], local_path)
    return "Downloaded successfully"
def download_files_from_s3_bucket_mark_down(user_id, bot_id):
    bot_data_folder = 'markdown_document'
    local_folder = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'markdown_document')
    folder_name = f'{user_id}/{bot_id}/{bot_data_folder}/'
    # Create the local folder if it doesn't exist.
    os.makedirs(local_folder, exist_ok=True)
    # List the objects under the bucket prefix.
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
    for obj in response.get('Contents', []):
        file_name = os.path.basename(obj['Key'])
        local_path = os.path.join(local_folder, file_name)
        if obj['Key'].endswith('/'):
            # Folder marker objects become local directories.
            os.makedirs(local_path, exist_ok=True)
        else:
            # Download the file from S3 to the local folder.
            s3.download_file(bucket_name, obj['Key'], local_path)
    return "Downloaded successfully"
def check_values_match(json_list, value1, value2):
    # Return True if either value appears among the values of any dict in the list.
    for item in json_list:
        if value1 in item.values() or value2 in item.values():
            return True
    return False
def get_formatted_time_and_date():
    # Get the current local time.
    current_time = datetime.now()
    # Format the time in 12-hour format and the date as "Month DD, YYYY".
    formatted_time = current_time.strftime("%I:%M:%S %p")
    formatted_date = current_time.strftime("%B %d, %Y")
    return f"The current time is {formatted_time} and the date is {formatted_date}."