Viewing File: /home/ubuntu/codegamaai-test/rag_drive/RagDrive/app2.py
import asyncio
import os
import shutil
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
import psutil
import requests
import uvicorn
from fastapi import BackgroundTasks, FastAPI, Form
from fastapi.middleware.cors import CORSMiddleware
from src.constants import *
from src.load_test import *
from src.training import *
from supertokens_fastapi import get_cors_allowed_headers
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"] + get_cors_allowed_headers(),
)
def start_server():
url = "localhost:5005/start"
payload = {}
headers = {}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
def kill_process(port):
try:
subprocess.run(["kill", "-9", str(port)], check=True)
except subprocess.CalledProcessError as e:
print(f"Error killing process: {e}")
def start_app():
try:
subprocess.run(["python", "app.py"], check=True, capture_output=True)
except subprocess.CalledProcessError as e:
print(f"Error starting app: {e}")
def run_background_tasks():
time.sleep(5) # Wait for 5 seconds
with BackgroundTasks() as tasks:
# Execute the first command
tasks.add_task(kill_process, port=5003)
# Execute the second command
tasks.add_task(start_app)
def move_files(user_id, bot_id):
# source_path = f"./Local_DB/{user_id}/{bot_id}/temp_data"
# destination_path = f"./Local_DB/{user_id}/{bot_id}/data"
source_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "temp_data")
destination_path = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "data")
# Check if source directory exists
if not os.path.exists(source_path):
return "Source directory not found"
# Check if destination directory exists, create if not
if not os.path.exists(destination_path):
os.makedirs(destination_path)
# Get a list of files in the source directory
files = os.listdir(source_path)
# Check if there are files to move
if not files:
return "No files found for training"
# Move each file to the destination directory
for file in files:
source_file_path = os.path.join(source_path, file)
destination_file_path = os.path.join(destination_path, file)
shutil.move(source_file_path, destination_file_path)
return "Files moved successfully"
def process_request(data):
user_id = data['user_id']
bot_id = data['bot_id']
query = data['query']
context_id = data['context_id']
bot_name = "Haive AI"
prompt = data['prompt']
file_name = data['file_name']
general_ai = 1
response, source_dict = load_test(user_id=user_id, bot_id=bot_id, query=query, context_id=context_id, bot_name=bot_name,
custom_instruction=prompt,general_ai=general_ai, file_name=file_name)
return response, source_dict
async def process_request_async(data):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
response, source_dict = await loop.run_in_executor(None, process_request, data)
return response, source_dict
def create_app():
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"] + get_cors_allowed_headers(),
)
@app.get("/api/v1/cancel_training")
async def cancel_training(background_tasks: BackgroundTasks):
try:
# Start the background tasks in a separate thread
thread = Thread(target=run_background_tasks)
thread.start()
return {"success": True, "message":"Cancel training initiated" ,"code": 200}
except:
return {"success": False, "message":"Cancel training initiated failed" ,"code": 200}
@app.post("/api/v1/train")
async def create_item2(user_id: str = Form(...), bot_id: str = Form(...)):
# try:
# user_folder = f"./Local_DB/{user_id}"
# user_bot_folder = f"./Local_DB/{user_id}/{bot_id}"
# user_bot_data = f"./Local_DB/{user_id}/{bot_id}/data"
# user_bot_qanda = f"./Local_DB/{user_id}/{bot_id}/q_a_data/"
# user_bot_KB = f"./Local_DB/{user_id}/{bot_id}/knowledge_base"
user_folder = os.path.join(os.environ['HAIVE_DB_DIR'], user_id)
user_bot_folder = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id)
user_bot_data = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "data")
user_bot_qanda = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "q_a_data")
user_bot_KB = os.path.join(os.environ['HAIVE_DB_DIR'], user_id, bot_id, "knowledge_base")
if move_files(user_id, bot_id) == "Files moved successfully":
if not os.path.exists(user_folder):
os.makedirs(user_folder)
if not os.path.exists(user_bot_folder):
os.makedirs(user_bot_folder)
if not os.path.exists(user_bot_data):
os.makedirs(user_bot_data)
if not os.path.exists(user_bot_KB):
os.makedirs(user_bot_KB)
else:
shutil.rmtree(user_bot_KB)
os.makedirs(user_bot_KB)
if not os.path.exists(user_bot_qanda):
os.makedirs(user_bot_qanda)
train_save(user_id, bot_id)
return {"success": True, "message":"Training completed successfully" ,"code": 200}
else:
return {"success": False, "message":"No files for training" ,"code": 200}
@app.post("/api/v1/query")
async def create_item(user_id: str = Form(...),
bot_id: str = Form(...),
query: str = Form(...),
context_id: str = Form(...),
prompt: str = Form(...),
file_name: str = Form(...)
):
# user_folder = f"/usr/local/bin/haive/src/tmp"
# user_folder1 = f"/usr/local/bin/haive/src/tmp2"
user_folder = os.environ['HAIVE_TMP_DIR']
user_folder1 = os.environ['HAIVE_TMP2_DIR']
if not os.path.exists(user_folder):
os.makedirs(user_folder)
if not os.path.exists(user_folder1):
os.makedirs(user_folder1)
# Create Tmp directory
user_tmp = os.path.join(os.environ['HAIVE_TMP_DIR'], user_id)
user_tmp2 = os.path.join(os.environ['HAIVE_TMP2_DIR'], user_id)
if not os.path.exists(user_tmp):
os.makedirs(user_tmp)
if not os.path.exists(user_tmp2):
os.makedirs(user_tmp2)
try:
data = {
"user_id": user_id,
"bot_id": bot_id,
"query": query,
"context_id": context_id,
"prompt": prompt,
"file_name": file_name
}
print(data)
response, source_dict = await process_request_async(data)
return {"success": True, "message":'query api success' ,"code": 200,"data":{"response":response, "file_names":source_dict}}
except Exception as e:
print(e)
return {"success": False, "message":'query api failed' ,"code": 200}
return app
# if __name__ == "__main__":
# uvicorn.run(app, host="0.0.0.0",port=5004)
Back to Directory
File Manager