# Viewing File: /home/ubuntu/codegamaai-test/efimarket_bot/src/utils.py

import boto3
import botocore
from src.constants import *
import os
import json
from datetime import datetime
# Module-wide S3 client shared by every helper below.
# NOTE(review): ACCESS_KEY, SECRET_KEY and REGION_NAME are presumably provided
# by the star import from src.constants — confirm.
s3 = boto3.client(
    's3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION_NAME,
)


def create_folder_and_store_files(user_id: str, bot_id: str):
    """Upload a bot's locally scraped files into its ``bot_data`` folder on S3.

    Files are read from ``$DB_DIR/<user_id>/<bot_id>/scrap_data`` (walked
    recursively) and uploaded under the key prefix
    ``<user_id>/<bot_id>/bot_data/``. Only the bare file name is appended to
    the prefix, so the local sub-directory layout is not reproduced in S3.

    Args:
        user_id: Identifier used as the first path/key segment.
        bot_id: Identifier used as the second path/key segment.

    Returns:
        The string "All files uploaded successfully".

    Raises:
        KeyError: If the ``DB_DIR`` environment variable is not set.
    """
    local_scrap_dir = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'scrap_data')
    s3_prefix = f'{user_id}/{bot_id}/bot_data/'

    # Probe for the folder marker object and create it when S3 reports 404.
    # Any other client error is only printed; the uploads below still run.
    try:
        s3.head_object(Bucket=bucket_name, Key=s3_prefix)
    except botocore.exceptions.ClientError as err:
        error_info = err.response['Error']
        if error_info['Code'] != '404':
            print(error_info['Code'], "-", error_info['Message'])
        else:
            s3.put_object(Bucket=bucket_name, Key=s3_prefix)

    # Push every file found below the scrap directory to the bot_data prefix.
    for current_dir, _subdirs, filenames in os.walk(local_scrap_dir):
        for filename in filenames:
            s3.upload_file(
                os.path.join(current_dir, filename),
                bucket_name,
                s3_prefix + filename,
            )
    return "All files uploaded successfully"

    
def delete_files(user_id: str, bot_id: str, file_names):
    """Delete the named files from a bot's ``bot_data`` folder on S3.

    Args:
        user_id: Identifier used as the first key segment.
        bot_id: Identifier used as the second key segment.
        file_names: Iterable of file names located directly under
            ``<user_id>/<bot_id>/bot_data``.

    Returns:
        The string "Deleted succesfully".
    """
    prefix = f'{user_id}/{bot_id}/bot_data'
    for name in file_names:
        object_key = f"{prefix}/{name}"
        # Echo each key before removing it so deletions can be traced.
        print(object_key)
        s3.delete_object(Bucket=bucket_name, Key=object_key)
    return "Deleted succesfully"


def download_files_from_s3_bucket_LLM_data(user_id, bot_id):
    """Download a bot's S3 data into the local DB directory.

    Every object under ``<user_id>/<bot_id>/bot_data/`` is saved into
    ``$DB_DIR/<user_id>/<bot_id>/data``; afterwards the Q&A and markdown
    downloads are run as well.

    Args:
        user_id: Identifier used as the first path/key segment.
        bot_id: Identifier used as the second path/key segment.

    Returns:
        "Download successfully" when both follow-up downloads report success,
        otherwise "Download Failed".

    Raises:
        KeyError: If the ``DB_DIR`` environment variable is not set.
    """
    local_folder = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'data')
    folder_name = f'{user_id}/{bot_id}/bot_data/'
    os.makedirs(local_folder, exist_ok=True)

    # A single list_objects_v2 call returns at most 1000 keys, so iterate over
    # all result pages; otherwise larger folders are silently truncated.
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('/'):
                # Folder marker object: there is no content to download and
                # the destination directory already exists.
                continue
            # NOTE(review): the prefix also matches keys in nested folders
            # (e.g. bot_data/q_a_data/) and only the base name is kept, so
            # same-named files overwrite each other — confirm this is intended.
            local_path = os.path.join(local_folder, os.path.basename(key))
            s3.download_file(bucket_name, key, local_path)

    status = download_files_from_s3_bucket_qanda_data(user_id, bot_id)
    status2 = download_files_from_s3_bucket_mark_down(user_id, bot_id)
    if status == "Download successfully" and status2 == "Download successfully":
        return "Download successfully"
    return "Download Failed"


def download_files_from_s3_bucket_qanda_data(user_id, bot_id):
    """Download a bot's Q&A files from S3 into the local DB directory.

    Every object under ``<user_id>/<bot_id>/bot_data/q_a_data/`` is saved into
    ``$DB_DIR/<user_id>/<bot_id>/q_a_data`` using only its base name.

    Args:
        user_id: Identifier used as the first path/key segment.
        bot_id: Identifier used as the second path/key segment.

    Returns:
        The string "Download successfully".

    Raises:
        KeyError: If the ``DB_DIR`` environment variable is not set.
    """
    local_folder = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'q_a_data')
    folder_name = f'{user_id}/{bot_id}/bot_data/q_a_data/'
    os.makedirs(local_folder, exist_ok=True)

    # A single list_objects_v2 call returns at most 1000 keys, so iterate over
    # all result pages; otherwise larger folders are silently truncated.
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('/'):
                # Folder marker object: there is no content to download and
                # the destination directory already exists.
                continue
            local_path = os.path.join(local_folder, os.path.basename(key))
            s3.download_file(bucket_name, key, local_path)
    return "Download successfully"

def download_files_from_s3_bucket_mark_down(user_id, bot_id):
    """Download a bot's markdown documents from S3 into the local DB directory.

    Every object under ``<user_id>/<bot_id>/markdown_document/`` is saved into
    ``$DB_DIR/<user_id>/<bot_id>/markdown_document`` using only its base name.

    Args:
        user_id: Identifier used as the first path/key segment.
        bot_id: Identifier used as the second path/key segment.

    Returns:
        The string "Download successfully".

    Raises:
        KeyError: If the ``DB_DIR`` environment variable is not set.
    """
    local_folder = os.path.join(os.environ['DB_DIR'], user_id, bot_id, 'markdown_document')
    folder_name = f'{user_id}/{bot_id}/markdown_document/'
    os.makedirs(local_folder, exist_ok=True)

    # A single list_objects_v2 call returns at most 1000 keys, so iterate over
    # all result pages; otherwise larger folders are silently truncated.
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('/'):
                # Folder marker object: there is no content to download and
                # the destination directory already exists.
                continue
            local_path = os.path.join(local_folder, os.path.basename(key))
            s3.download_file(bucket_name, key, local_path)
    return "Download successfully"

def check_values_match(json_list, value1, value2):
    """Report whether any mapping in ``json_list`` holds one of two values.

    Args:
        json_list: Iterable of dict-like objects exposing ``.values()``.
        value1: First value to look for among each entry's values.
        value2: Second value to look for among each entry's values.

    Returns:
        True as soon as an entry contains ``value1`` or ``value2`` as a value;
        False when no entry does (including when ``json_list`` is empty).
    """
    return any(
        value1 in entry.values() or value2 in entry.values()
        for entry in json_list
    )


def get_formatted_time_and_date():
    """Describe the current local time and date in a single sentence.

    The time is rendered in 12-hour form (``%I:%M:%S %p``) and the date as
    full month name, day and year (``%B %d, %Y``), both taken from one
    ``datetime.now()`` reading.

    Returns:
        A string of the form
        "The current time is <time> and the date is <date>."
    """
    now = datetime.now()
    # A format spec inside an f-string is handed to datetime's strftime.
    return f"The current time is {now:%I:%M:%S %p} and the date is {now:%B %d, %Y}."
# Back to Directory File Manager