Viewing File: /home/ubuntu/codegamaai-test/girlfriend_bot/app.py

import os
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Form, BackgroundTasks
from pydantic import BaseModel, Field
from fastapi.responses import JSONResponse, FileResponse
from supertokens_fastapi import get_cors_allowed_headers
from fastapi import File, UploadFile, HTTPException
import uvicorn
import shutil
from fastapi import FastAPI
from typing import Union
from threading import Thread
import uuid
from src.constants import *
from src.utils import *
from main import *
from src.constants import MEDIA_DIR, TEMP_DIR
from typing import List, Optional
from fastapi import UploadFile

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"] + get_cors_allowed_headers(),
)


class Choice(BaseModel):
    id: str
    text: str


class Question(BaseModel):
    question_number: int
    question: str
    choices: List[Choice]
    selected_choice: str = Field(default="")


class SurveyData(BaseModel):
    user_uid: str
    questions: List[Question]


class AudioQuery(BaseModel):
    audio_file_path: str
    audio_format: str
    context_id: Union[int, str]


class UserRequest(BaseModel):
    user_uid: str


def download_file(url, destination_path):
    """Download a file from a given URL to a local destination path."""
    response = requests.get(url)
    response.raise_for_status()  # Ensure we got a successful response
    with open(destination_path, 'wb') as file:
        file.write(response.content)


@app.post("/api/v1/query")
async def query(data: dict):
    try:
        question = data['question']
        user_uid = data['user_uid']
        response_message, audio_path = data_querying(question, user_uid, is_audio=False)
        response_data = {"message": response_message}
        if audio_path is not None:
            response_audio_path = os.path.join(MEDIA_DIR, audio_path)
            response_data["audio_path"] = response_audio_path
        return JSONResponse(status_code=200, content={"success": True, "message": "Query processed successfully", "code": 100, "data": response_data})
    except Exception as e:
        return JSONResponse(status_code=500, content={"success": False, "error": str(e), "error_code": 500})


@app.post("/api/v1/query/audio")
async def query_audio(data: AudioQuery):
    try:
        print(f"Received audio query: {data}")
        temp_audio_path = data.audio_file_path

        user_uid = str(data.context_id)
        user_query_text = speech_to_text(temp_audio_path)
        response_text, s3_audio_url = data_querying(user_query_text, user_uid, is_audio=True)

        # Directly return the S3 URL without appending it to MEDIA_DIR
        return JSONResponse(content={"audio_path": s3_audio_url if s3_audio_url else None, "user_uid": user_uid})

    except Exception as e:
        print(f"Error processing audio query: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/submit_mcq")
async def process_survey(survey_data: SurveyData):
    try:
        user_traits = extract_user_traits(survey_data.dict())
        save_file = os.path.join(os.environ['DB_DIR'], "user_preference")
        save_traits_to_file(user_traits, save_file)
        # save_traits_to_file(user_traits, "/home/zrlhowsqpnco/codegama_bot/GF_replika/girlfriend_bot/Local_DB/user_preference/")
        return JSONResponse(status_code=200, content={"success": True, "message": "Answer received and file saved", "code": 100, "data": user_traits})
    except Exception as e:
        return JSONResponse(status_code=500, content={"success": False, "error": str(e), "error_code": 500})

def extract_user_traits(survey_data):
    user_traits = {
        "user_uid": survey_data["user_uid"],
        "user_traits": {
            "familiarity_with_ai": "",
            "download_reason": "",
            "free_time_activities": [],
            "solitude_perception": "",
            "loneliness_coping": "",
            "interaction_preferences": {
                "desired_traits": [],
                "interaction_style": "",
                "preferred_bot_gender": ""
            },
            "love_language": "",
            "turn_offs": []
        },
        "bot_settings": {
            "proactive_interaction": True,
            "emotional_depth": True,
            "humor_enabled": True,
            "supportive_conversations": True
        }
    }

    for question in survey_data["questions"]:
        selected_choice = next(
            (choice["text"] for choice in question["choices"] if choice["id"] == question["selected_choice"]), "")

        if question["question_number"] == 1:
            user_traits["user_traits"]["familiarity_with_ai"] = selected_choice
        elif question["question_number"] == 3:
            user_traits["user_traits"]["download_reason"] = selected_choice
        elif question["question_number"] == 5:
            user_traits["user_traits"]["free_time_activities"].append(selected_choice)
        elif question["question_number"] == 6:
            user_traits["user_traits"]["solitude_perception"] = selected_choice
        elif question["question_number"] == 7:
            user_traits["user_traits"]["loneliness_coping"] = selected_choice
        elif question["question_number"] in [9, 11]:
            user_traits["user_traits"]["interaction_preferences"]["desired_traits"].append(selected_choice)
        elif question["question_number"] == 8:
            user_traits["user_traits"]["interaction_preferences"]["interaction_style"] = selected_choice
        elif question["question_number"] == 10:
            user_traits["user_traits"]["interaction_preferences"]["preferred_bot_gender"] = selected_choice
        elif question["question_number"] == 13:
            user_traits["user_traits"]["love_language"] = selected_choice
        elif question["question_number"] == 15:
            user_traits["user_traits"]["turn_offs"].append(selected_choice)

    return user_traits


def save_traits_to_file(user_traits, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    output_file_path = f"{output_dir}/{user_traits['user_uid']}_traits.json"
    with open(output_file_path, 'w') as file:
        json.dump(user_traits, file, indent=4)


@app.post("/get_user_history")
async def get_user_history(request: UserRequest):
    try:
        user_id = request.user_uid
        # file_path = f"/home/zrlhowsqpnco/codegama_bot/GF_replika/girlfriend_bot/Local_DB/user_data/{user_id}_full_history.json"
        file_path = os.path.join(os.environ['USER_DATA_DIR'], f"{user_id}_full_history.json")
        if os.path.exists(file_path):
            with open(file_path, "r") as file:
                data = json.load(file)
            return JSONResponse(status_code=200, content={"success": True, "message": "User history retrieved successfully", "code": 100, "data": data})
        else:
            return JSONResponse(status_code=404, content={"success": False, "error": "User data not found", "error_code": 404})
    except Exception as e:
        return JSONResponse(status_code=500, content={"success": False, "error": str(e), "error_code": 500})

@app.get("/media/{filename}")
async def get_media(filename: str):
    file_path = os.path.join(MEDIA_DIR, filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=5007,ssl_keyfile="privkey.pem", ssl_certfile="fullchain.pem")
Back to Directory File Manager