Viewing File: /home/ubuntu/codegamaai-test/rag_drive/RagDrive/app.py

import json
import os
import shutil
import subprocess
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from typing import Any, Dict, List

import uvicorn
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from src.constants import *
from supertokens_fastapi import get_cors_allowed_headers

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"] + get_cors_allowed_headers(),
)




def move_files2(user_id, bot_id):
    # source_path = f"./Local_DB/{user_id}/{bot_id}/data"
    # destination_path = f"./Local_DB/{user_id}/{bot_id}/temp_data"

    source_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "data")
    destination_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "temp_data")

    # Check if source directory exists
    if not os.path.exists(source_path):
        return "Source directory not found"

    # Check if destination directory exists, create if not
    if not os.path.exists(destination_path):
        os.makedirs(destination_path)

    # Get a list of files in the source directory
    files = os.listdir(source_path)

    # Check if there are files to move
    if not files:
        return "No files found for training"

    # Move each file to the destination directory
    for file in files:
        source_file_path = os.path.join(source_path, file)
        destination_file_path = os.path.join(destination_path, file)
        shutil.move(source_file_path, destination_file_path)

    return "Files moved successfully"



def read_json2(user_id: str, uuid: str) -> Dict[str, Any]:
    try:
        file_path = os.path.join(os.environ['HAIVE_TMP2_DIR'], user_id, f"{uuid}.json")
        with open(file_path, 'r', encoding="UTF-8") as openfile:
            json_object = json.load(openfile)
        return {
            "success": True,
            "message": "data retrieved successfully",
            "code": 200,
            "data": json_object
        }
    except:
        return {
            "success": False,
            "message": "data retrieved failed",
            "code": 200}

def delete_json2(user_id: str, uuid: str) -> Dict[str, Any]:
    try:
        # file_path = f"/usr/local/bin/haive/src/tmp2/{uuid}.json"
        file_path = os.path.join(os.environ['HAIVE_TMP2_DIR'], user_id, f"{uuid}.json")
        os.remove(file_path)
        return {
            "success": True,
            "message": f"context deleted successfully",
            "code": 200
        }
    except :
        return {
            "success": False,
            "message": f"context deleted failed",
            "code": 200
        }


def list_files(folder_path: str) -> Dict[str, Any]:
    try:
        files = []
        for index, f in enumerate(os.listdir(folder_path)):
            if os.path.isfile(os.path.join(folder_path, f)):
                # Read the Json file
                with open(os.path.join(folder_path, f), 'r', encoding="UTF-8") as openfile:
                    json_object = json.load(openfile)
                # Access the file name from the Json file
                file_name = json_object["file_name"]
                file_info = {
                    "id": f.split(".")[0],  # Extracting ID from the file name
                    "title": f"Conversation {index + 1}",  # Generating title as "Message 1", "Message 2", etc.
                    "file_name": file_name
                }
                files.append(file_info)
        
        return {
            "success": True,
            "message": "context id retrieved successfully",
            "code": 200,
            "data": {"context_id": files}
        }
    except FileNotFoundError:
        return {
            "success": False,
            "message": "context id retrieved failed",
            "code": 200
        }


def read_config():
    file_path = os.path.join(os.environ['DATA_DIR'], "config.json")
    with open(file_path, "r") as config_file:
        config_data = json.load(config_file)
    return config_data['status']


# def run_server():
#     try:
#         # Get the directory from the environment variable
#         path_dir = os.environ['HAIVE_BUILD_DIR']
#         print(f"Path for Build: {path_dir}")
        
#         # Change the working directory to the specified path
#         os.chdir(path_dir)
        
#         # Set the server address and port
#         server_address = ('', 5253)  # '' means localhost
        
#         # Create HTTP server with the handler
#         httpd = HTTPServer(server_address, SimpleHTTPRequestHandler)
        
#         # Start the server
#         print(f"Serving HTTP on port 5253 from {path_dir}...")
#         httpd.serve_forever()

#         # Change the working directory back to the original directory
#         os.chdir(os.environ['HAIVE_DIR'])
#         print(f"Changed directory to {os.environ['HAIVE_DIR']}")
#     except Exception as e:
#         print(f"Error: {e}")


# # Create a thread and start it
# server_thread = threading.Thread(target=run_server)
# server_thread.start()



def create_app():
    app = FastAPI()
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"] + get_cors_allowed_headers(),
    )

    @app.post('/api/v1/delete_file')
    async def delete_file(file_name: str = Form(...), user_id: str = Form(...), bot_id: str = Form(...)):
        # Create file paths
        # non_trained_path = f"./Local_DB/{user_id}/{bot_id}/temp_data/{file_name}"
        # trained_path = f"./Local_DB/{user_id}/{bot_id}/data/{file_name}"

        non_trained_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "temp_data", file_name)
        trained_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "data", file_name)

        print(trained_path)
        # Check if the file exists in the non_trained directory
        if os.path.exists(non_trained_path):
            os.remove(non_trained_path)
            return {"success": True, "message":"File deleted successfully" ,"code": 200}

        # Check if the file exists in the trained directory
        elif os.path.exists(trained_path):
            os.remove(trained_path)
            return {"success": True, "message":"File deleted successfully" ,"code": 200}

        # If the file does not exist in either directory
        else:
            raise {"success": False, "message":'delete_file api failed' ,"code": 200}
        
    @app.post('/api/v1/file_info')
    async def get_all_user_bot_files(user_id: str = Form(...), bot_id: str = Form(...)):
        # Construct folder path
        # non_trained = f"./Local_DB/{user_id}/{bot_id}/temp_data"
        # trained = f"./Local_DB/{user_id}/{bot_id}/data"

        non_trained = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "temp_data")
        trained = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "data")

        # Check if the folder exists
        if os.path.exists(non_trained) and os.path.exists(trained):
            # Get all file names in the folder
            file_names = [f for f in os.listdir(trained) if os.path.isfile(os.path.join(trained, f))]
            non_trained_file_names = [f for f in os.listdir(non_trained) if os.path.isfile(os.path.join(non_trained, f))]

            response = {
                'trained_file_names': file_names,
                'non_trained_file_names':non_trained_file_names
                
            }
            return {"success": True, "message":'file_info api failed' ,"code": 200,"data":response}
        else:
            return {"success": False, "message":'file_info api failed' ,"code": 200}

        # except:
        #     return {"message":'Training API failed',"status_code":500}

    @app.post("/uploadfile/")
    async def create_upload_files(user_id: str, bot_id: str, files: List[UploadFile] = File(...)):
        try:
            # user_folder = f"./Local_DB/{user_id}"
            # user_bot_folder = f"./Local_DB/{user_id}/{bot_id}"
            # user_bot_data = f"./Local_DB/{user_id}/{bot_id}/data"
            # user_bot_qanda = f"./Local_DB/{user_id}/{bot_id}/q_a_data/"
            # user_bot_KB = f"./Local_DB/{user_id}/{bot_id}/knowledge_base"

            user_folder = os.path.join(os.environ['HAIVE_DB_DIR'], user_id)
            user_bot_folder = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id)
            user_bot_data = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "data")
            user_bot_qanda = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "q_a_data")
            user_bot_KB = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "knowledge_base")

            # Create Tmp directory
            user_tmp = os.path.join(os.environ['HAIVE_TMP_DIR'], user_id)
            user_tmp2 = os.path.join(os.environ['HAIVE_TMP2_DIR'], user_id)

            if not os.path.exists(user_tmp):
                os.makedirs(user_tmp)
            if not os.path.exists(user_tmp2):
                os.makedirs(user_tmp2)

            if not os.path.exists(user_folder):
                os.makedirs(user_folder)
            if not os.path.exists(user_bot_folder):
                os.makedirs(user_bot_folder)
            if not os.path.exists(user_bot_data):
                os.makedirs(user_bot_data)
            if not os.path.exists(user_bot_KB):
                os.makedirs(user_bot_KB)
            else:
                shutil.rmtree(user_bot_KB)
                os.makedirs(user_bot_KB)
            if not os.path.exists(user_bot_qanda):
                os.makedirs(user_bot_qanda)
            # path = f"./Local_DB/{user_id}/{bot_id}/temp_data"
            path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "temp_data")
            os.makedirs(path, exist_ok=True)

            for file in files:
                file_path = f"{path}/{file.filename}"
                with open(file_path, "wb") as f:
                    f.write(file.file.read())

            return {"success": True, "message":"successfully file uploaded" ,"code": 200}
        except:
            return {"success": False, "message":"file uploaded failed" ,"code": 200}
    
    @app.post('/api/v1/delete_file')
    async def delete_file(file_name: str = Form(...), user_id: str = Form(...), bot_id: str = Form(...)):
        # Create file paths
        # non_trained_path = f"./Local_DB/{user_id}/{bot_id}/temp_data/{file_name}"
        # trained_path = f"./Local_DB/{user_id}/{bot_id}/data/{file_name}"

        non_trained_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "temp_data", file_name)
        trained_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "data", file_name)

        print(trained_path)
        # Check if the file exists in the non_trained directory
        if os.path.exists(non_trained_path):
            os.remove(non_trained_path)
            return {"success": True, "message":"File deleted successfully" ,"code": 200}

        # Check if the file exists in the trained directory
        elif os.path.exists(trained_path):
            os.remove(trained_path)
            return {"success": True, "message":"File deleted successfully" ,"code": 200}

        # If the file does not exist in either directory
        else:
            raise {"success": False, "message":'delete_file api failed' ,"code": 200}
        

    @app.post("/api/v1/get_context_data", response_model=Dict[str, Any])
    async def read_json(user_id: str = Form(...), uuid: str = Form(...)):
        return read_json2(user_id, uuid)
        
    @app.post("/api/v1/delete_context_data", response_model=Dict[str, Any])
    async def delete_json(user_id: str = Form(...), uuid: str = Form(...)):
        return delete_json2(user_id, uuid)

    @app.get("/api/v1/get_all_context_id")
    async def list_files_endpoint(user_id: str = Form(...)):
        # folder_path = "/usr/local/bin/haive/src/tmp2"  # Adjust the folder path as needed
        folder_path = os.environ['HAIVE_TMP2_DIR']
        folder_path = os.path.join(folder_path, user_id)
        return list_files(folder_path)
    
    @app.get("/status")
    async def get_status():
        config_status = read_config()
        
        if config_status.lower() == "disable":
            return {"status": "API is disabled"}
        else:
            return {"status": "API is working"}
        
    return app

# if __name__ == "__main__":
#     uvicorn.run(app, host="0.0.0.0",port=5003)



Back to Directory File Manager