# Viewing File: /home/ubuntu/codegamaai-test/rag_drive/RagDrive/app2.py

import asyncio
import os
import shutil
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Thread

import psutil
import requests
import uvicorn
from fastapi import BackgroundTasks, FastAPI, Form
from fastapi.middleware.cors import CORSMiddleware

from src.constants import *
from src.load_test import *
from src.training import *
from supertokens_fastapi import get_cors_allowed_headers

# Module-level FastAPI application with a fully permissive CORS policy:
# every origin, method and header is accepted, credentials are allowed, and
# the header names returned by supertokens' get_cors_allowed_headers() are
# appended to the allowed-header list.
app = FastAPI()

_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"] + get_cors_allowed_headers(),
}
app.add_middleware(CORSMiddleware, **_cors_settings)

def start_server():
    """POST to the ``/start`` endpoint of the service on localhost:5005.

    The response body is printed to stdout. Any exception raised by
    ``requests`` (connection refused, etc.) propagates to the caller.
    """
    # requests refuses URLs without an explicit scheme, so the previous bare
    # "localhost:5005/start" failed before a connection was ever attempted.
    url = "http://localhost:5005/start"
    # NOTE(review): no timeout is set, so this call waits as long as the
    # service takes to answer -- confirm whether a timeout is wanted.
    response = requests.post(url, headers={}, data={})
    print(response.text)

def kill_process(port):
    """Force-kill every process that is listening on TCP port *port*.

    The previous implementation ran ``kill -9 <port>``, which treats the
    port number as a PID and therefore signals an unrelated process (or
    none at all). The listening PIDs are now looked up with psutil and
    each one is killed.

    Args:
        port: TCP port number (int, or a string that ``int()`` accepts).

    Failures are printed rather than raised, matching the original
    best-effort behaviour.
    """
    try:
        target_port = int(port)
        listener_pids = {
            conn.pid
            for conn in psutil.net_connections(kind="tcp")
            # pid is None when the owning process is not visible to us.
            if conn.pid is not None
            and conn.status == psutil.CONN_LISTEN
            and conn.laddr
            and conn.laddr.port == target_port
        }
    except (TypeError, ValueError, psutil.Error) as e:
        print(f"Error killing process: {e}")
        return

    if not listener_pids:
        print(f"Error killing process: nothing is listening on port {target_port}")
        return

    for pid in listener_pids:
        try:
            psutil.Process(pid).kill()
        except psutil.Error as e:
            # The process may have exited already or belong to another user.
            print(f"Error killing process: {e}")

def start_app():
    """Run ``python app.py`` in the current directory and wait for it to exit.

    stdout/stderr of the child are captured. If the child exits with a
    non-zero status, the error and the captured stderr are printed; if the
    interpreter cannot be launched at all, that error is printed too.
    Nothing is raised to the caller and nothing is returned.
    """
    try:
        subprocess.run(["python", "app.py"], check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        print(f"Error starting app: {e}")
        # The output was captured, so without this the reason for the failure
        # would never be visible anywhere.
        if e.stderr:
            print(e.stderr.decode(errors="replace"))
    except OSError as e:
        # e.g. no "python" executable on PATH.
        print(f"Error starting app: {e}")

def run_background_tasks():
    """Wait five seconds, kill whatever serves port 5003, then start ``app.py``.

    Intended to run in its own thread (see the cancel-training endpoint).
    The steps are called directly: ``BackgroundTasks`` is not a context
    manager, so the previous ``with BackgroundTasks() as tasks`` raised as
    soon as the sleep finished, and tasks queued on a stand-alone instance
    are never executed because only the framework runs them after sending
    a response.
    """
    time.sleep(5)  # Wait for 5 seconds

    # Execute the first command
    kill_process(port=5003)

    # Execute the second command
    start_app()
    
def move_files(user_id, bot_id):
    """Move a bot's staged files from ``temp_data`` into ``data``.

    Both directories live under ``$HAIVE_DB_DIR/<user_id>/<bot_id>/``.

    Returns one of three status strings:
      * ``"Source directory not found"`` - ``temp_data`` does not exist.
      * ``"No files found for training"`` - ``temp_data`` is empty
        (``data`` has already been created at this point).
      * ``"Files moved successfully"`` - every entry was moved.

    Raises ``KeyError`` if ``HAIVE_DB_DIR`` is not set in the environment.
    """
    bot_root = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id)
    staging_dir = os.path.join(bot_root, "temp_data")
    target_dir = os.path.join(bot_root, "data")

    # Nothing to do when the staging area was never created.
    if not os.path.exists(staging_dir):
        return "Source directory not found"

    # The target is created before the emptiness check, so it exists even
    # when there turns out to be nothing to move.
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    pending = os.listdir(staging_dir)
    if not pending:
        return "No files found for training"

    for entry in pending:
        shutil.move(
            os.path.join(staging_dir, entry),
            os.path.join(target_dir, entry),
        )

    return "Files moved successfully"
    
def process_request(data):
    """Call ``load_test`` for one query payload and return its two results.

    Args:
        data: mapping with the keys ``user_id``, ``bot_id``, ``query``,
            ``context_id``, ``prompt`` and ``file_name``. ``prompt`` is
            forwarded as ``custom_instruction``.

    The bot name is always ``"Haive AI"`` and ``general_ai`` is always ``1``.

    Returns:
        ``(response, source_dict)`` exactly as unpacked from ``load_test``.

    Raises:
        KeyError: if any of the expected keys is missing from *data*.
    """
    answer, sources = load_test(
        user_id=data['user_id'],
        bot_id=data['bot_id'],
        query=data['query'],
        context_id=data['context_id'],
        bot_name="Haive AI",
        custom_instruction=data['prompt'],
        general_ai=1,
        file_name=data['file_name'],
    )
    return answer, sources



async def process_request_async(data):
    """Run the blocking ``process_request(data)`` without blocking the event loop.

    The call is handed to the event loop's default executor and awaited.

    Returns:
        The ``(response, source_dict)`` pair produced by ``process_request``.
    """
    # get_running_loop() is the correct accessor inside a coroutine.
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor. The previous code
    # also built a ThreadPoolExecutor on every call but never submitted
    # anything to it, so that pool was pure overhead and has been removed.
    response, source_dict = await loop.run_in_executor(None, process_request, data)
    return response, source_dict



def _is_safe_path_component(value):
    """Return True when *value* can be used as a single directory name.

    Rejects empty strings, ``.``/``..``, NUL bytes and anything containing a
    path separator, so a client-supplied id cannot make ``os.path.join``
    point outside the configured base directory.
    """
    if not value or value in (".", "..") or "\x00" in value:
        return False
    separators = [sep for sep in (os.sep, os.altsep) if sep]
    return not any(sep in value for sep in separators)


def create_app():
    """Build and return a FastAPI application with the training/query API.

    Routes registered:
      * ``GET  /api/v1/cancel_training`` - starts ``run_background_tasks``
        in a separate thread.
      * ``POST /api/v1/train`` - moves staged files into place, resets the
        bot's knowledge-base directory and calls ``train_save``.
      * ``POST /api/v1/query`` - runs ``process_request_async`` for a query.

    Every route answers with a JSON object carrying ``success``, ``message``
    and ``code``; ``code`` is 200 even on failure, which is kept as-is
    because clients may rely on it.

    The directory settings are read from the ``HAIVE_DB_DIR``,
    ``HAIVE_TMP_DIR`` and ``HAIVE_TMP2_DIR`` environment variables at
    request time; a missing variable raises ``KeyError`` inside the handler.
    """
    app = FastAPI()
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"] + get_cors_allowed_headers(),
    )

    @app.get("/api/v1/cancel_training")
    async def cancel_training(background_tasks: BackgroundTasks):
        """Kick off ``run_background_tasks`` in a thread and return at once."""
        try:
            # Start the background tasks in a separate thread
            thread = Thread(target=run_background_tasks)
            thread.start()
        except Exception as e:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit and hid the reason for the failure.
            print(e)
            return {"success": False, "message":"Cancel training initiated failed" ,"code": 200}
        return {"success": True, "message":"Cancel training initiated" ,"code": 200}

    @app.post("/api/v1/train")
    async def create_item2(user_id: str = Form(...), bot_id: str = Form(...)):
        """Prepare the bot's directories and run ``train_save``."""
        # The ids come straight from the request and are joined into paths
        # that are later passed to shutil.rmtree, so refuse anything that
        # could reach outside HAIVE_DB_DIR.
        if not (_is_safe_path_component(user_id) and _is_safe_path_component(bot_id)):
            return {"success": False, "message":"Invalid user_id or bot_id" ,"code": 200}

        bot_root = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id)
        user_bot_data = os.path.join(bot_root, "data")
        user_bot_qanda = os.path.join(bot_root, "q_a_data")
        user_bot_KB = os.path.join(bot_root, "knowledge_base")

        if move_files(user_id, bot_id) != "Files moved successfully":
            return {"success": False, "message":"No files for training" ,"code": 200}

        # makedirs creates the user and bot directories on the way down.
        os.makedirs(user_bot_data, exist_ok=True)

        # The knowledge base is always rebuilt from an empty directory.
        if os.path.exists(user_bot_KB):
            shutil.rmtree(user_bot_KB)
        os.makedirs(user_bot_KB)

        os.makedirs(user_bot_qanda, exist_ok=True)

        # NOTE(review): train_save is called synchronously inside an async
        # handler; if it is long-running it will hold the event loop for the
        # duration -- confirm whether that serialisation is intended.
        train_save(user_id, bot_id)
        return {"success": True, "message":"Training completed successfully" ,"code": 200}

    @app.post("/api/v1/query")
    async def create_item(user_id: str = Form(...),
        bot_id: str = Form(...),
        query: str = Form(...),
        context_id: str = Form(...),
        prompt: str = Form(...),
        file_name: str = Form(...)
    ):
        """Answer a query for a bot via ``process_request_async``."""
        # Same reasoning as in the train route: the ids become path parts.
        if not (_is_safe_path_component(user_id) and _is_safe_path_component(bot_id)):
            return {"success": False, "message":"Invalid user_id or bot_id" ,"code": 200}

        # Create the per-user tmp directories (and their parents) under both
        # temporary roots.
        tmp_roots = (os.environ['HAIVE_TMP_DIR'], os.environ['HAIVE_TMP2_DIR'])
        for tmp_root in tmp_roots:
            os.makedirs(os.path.join(tmp_root, user_id), exist_ok=True)

        try:
            data = {
                "user_id": user_id,
                "bot_id": bot_id,
                "query": query,
                "context_id": context_id,
                "prompt": prompt,
                "file_name": file_name
            }
            print(data)

            response, source_dict = await process_request_async(data)
            return {"success": True, "message":'query api success' ,"code": 200,"data":{"response":response, "file_names":source_dict}}

        except Exception as e:
            print(e)
            return {"success": False, "message":'query api failed' ,"code": 200}

    return app





# if __name__ == "__main__":
#     uvicorn.run(app, host="0.0.0.0",port=5004)
# Back to Directory File Manager