Viewing File: /home/ubuntu/codegamaai-test/girlfriend_bot/app.py
import os
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Form, BackgroundTasks
from pydantic import BaseModel, Field
from fastapi.responses import JSONResponse, FileResponse
from supertokens_fastapi import get_cors_allowed_headers
from fastapi import File, UploadFile, HTTPException
import uvicorn
import shutil
from fastapi import FastAPI
from typing import Union
from threading import Thread
import uuid
from src.constants import *
from src.utils import *
from main import *
from src.constants import MEDIA_DIR, TEMP_DIR
from typing import List, Optional
from fastapi import UploadFile
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"] + get_cors_allowed_headers(),
)
class Choice(BaseModel):
id: str
text: str
class Question(BaseModel):
question_number: int
question: str
choices: List[Choice]
selected_choice: str = Field(default="")
class SurveyData(BaseModel):
user_uid: str
questions: List[Question]
class AudioQuery(BaseModel):
audio_file_path: str
audio_format: str
context_id: Union[int, str]
class UserRequest(BaseModel):
user_uid: str
def download_file(url, destination_path):
"""Download a file from a given URL to a local destination path."""
response = requests.get(url)
response.raise_for_status() # Ensure we got a successful response
with open(destination_path, 'wb') as file:
file.write(response.content)
@app.post("/api/v1/query")
async def query(data: dict):
try:
question = data['question']
user_uid = data['user_uid']
response_message, audio_path = data_querying(question, user_uid, is_audio=False)
response_data = {"message": response_message}
if audio_path is not None:
response_audio_path = os.path.join(MEDIA_DIR, audio_path)
response_data["audio_path"] = response_audio_path
return JSONResponse(status_code=200, content={"success": True, "message": "Query processed successfully", "code": 100, "data": response_data})
except Exception as e:
return JSONResponse(status_code=500, content={"success": False, "error": str(e), "error_code": 500})
@app.post("/api/v1/query/audio")
async def query_audio(data: AudioQuery):
try:
print(f"Received audio query: {data}")
temp_audio_path = data.audio_file_path
user_uid = str(data.context_id)
user_query_text = speech_to_text(temp_audio_path)
response_text, s3_audio_url = data_querying(user_query_text, user_uid, is_audio=True)
# Directly return the S3 URL without appending it to MEDIA_DIR
return JSONResponse(content={"audio_path": s3_audio_url if s3_audio_url else None, "user_uid": user_uid})
except Exception as e:
print(f"Error processing audio query: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/submit_mcq")
async def process_survey(survey_data: SurveyData):
try:
user_traits = extract_user_traits(survey_data.dict())
save_file = os.path.join(os.environ['DB_DIR'], "user_preference")
save_traits_to_file(user_traits, save_file)
# save_traits_to_file(user_traits, "/home/zrlhowsqpnco/codegama_bot/GF_replika/girlfriend_bot/Local_DB/user_preference/")
return JSONResponse(status_code=200, content={"success": True, "message": "Answer received and file saved", "code": 100, "data": user_traits})
except Exception as e:
return JSONResponse(status_code=500, content={"success": False, "error": str(e), "error_code": 500})
def extract_user_traits(survey_data):
user_traits = {
"user_uid": survey_data["user_uid"],
"user_traits": {
"familiarity_with_ai": "",
"download_reason": "",
"free_time_activities": [],
"solitude_perception": "",
"loneliness_coping": "",
"interaction_preferences": {
"desired_traits": [],
"interaction_style": "",
"preferred_bot_gender": ""
},
"love_language": "",
"turn_offs": []
},
"bot_settings": {
"proactive_interaction": True,
"emotional_depth": True,
"humor_enabled": True,
"supportive_conversations": True
}
}
for question in survey_data["questions"]:
selected_choice = next(
(choice["text"] for choice in question["choices"] if choice["id"] == question["selected_choice"]), "")
if question["question_number"] == 1:
user_traits["user_traits"]["familiarity_with_ai"] = selected_choice
elif question["question_number"] == 3:
user_traits["user_traits"]["download_reason"] = selected_choice
elif question["question_number"] == 5:
user_traits["user_traits"]["free_time_activities"].append(selected_choice)
elif question["question_number"] == 6:
user_traits["user_traits"]["solitude_perception"] = selected_choice
elif question["question_number"] == 7:
user_traits["user_traits"]["loneliness_coping"] = selected_choice
elif question["question_number"] in [9, 11]:
user_traits["user_traits"]["interaction_preferences"]["desired_traits"].append(selected_choice)
elif question["question_number"] == 8:
user_traits["user_traits"]["interaction_preferences"]["interaction_style"] = selected_choice
elif question["question_number"] == 10:
user_traits["user_traits"]["interaction_preferences"]["preferred_bot_gender"] = selected_choice
elif question["question_number"] == 13:
user_traits["user_traits"]["love_language"] = selected_choice
elif question["question_number"] == 15:
user_traits["user_traits"]["turn_offs"].append(selected_choice)
return user_traits
def save_traits_to_file(user_traits, output_dir):
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_file_path = f"{output_dir}/{user_traits['user_uid']}_traits.json"
with open(output_file_path, 'w') as file:
json.dump(user_traits, file, indent=4)
@app.post("/get_user_history")
async def get_user_history(request: UserRequest):
try:
user_id = request.user_uid
# file_path = f"/home/zrlhowsqpnco/codegama_bot/GF_replika/girlfriend_bot/Local_DB/user_data/{user_id}_full_history.json"
file_path = os.path.join(os.environ['USER_DATA_DIR'], f"{user_id}_full_history.json")
if os.path.exists(file_path):
with open(file_path, "r") as file:
data = json.load(file)
return JSONResponse(status_code=200, content={"success": True, "message": "User history retrieved successfully", "code": 100, "data": data})
else:
return JSONResponse(status_code=404, content={"success": False, "error": "User data not found", "error_code": 404})
except Exception as e:
return JSONResponse(status_code=500, content={"success": False, "error": str(e), "error_code": 500})
@app.get("/media/{filename}")
async def get_media(filename: str):
file_path = os.path.join(MEDIA_DIR, filename)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(file_path)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5007,ssl_keyfile="privkey.pem", ssl_certfile="fullchain.pem")
Back to Directory
File Manager