Viewing File: /home/ubuntu/codegamaai-test/voice_clone/src/app.py
import os
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI,Form,BackgroundTasks
from pydantic import BaseModel
from fastapi.responses import JSONResponse, FileResponse
from supertokens_fastapi import get_cors_allowed_headers
from fastapi import File, UploadFile, HTTPException
import uvicorn
import shutil
from fastapi import FastAPI
from typing import Union
from threading import Thread
import uuid
from src.constants import *
from src.utils import *
from main import *
from src.constants import MEDIA_DIR, TEMP_DIR
# ASGI application object; the route decorators below register on it.
app = FastAPI()

# Cross-origin configuration: every origin, method and header is accepted,
# and the SuperTokens-specific headers are appended to the header list.
# NOTE(review): the FastAPI CORS docs advise against combining wildcard
# origins/methods/headers with allow_credentials=True -- confirm whether an
# explicit origin list is intended for deployment.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*", *get_cors_allowed_headers()],
)
class AudioQuery(BaseModel):
    """Request body for the ``/api/v1/query/audio`` endpoint."""

    # Path of the audio to transcribe; the audio endpoint hands it to
    # speech_to_text() unchanged.
    audio_file_path: str

    # Declared audio format. Not read by the handlers visible in this module.
    audio_format: str

    # Caller-supplied identifier (integer or string); the audio endpoint
    # converts it to str and uses it as the user id.
    context_id: Union[int, str]
def download_file(url, destination_path, timeout=30, chunk_size=64 * 1024):
    """Download a file from a given URL to a local destination path.

    The response body is streamed to disk in chunks rather than being held
    in memory in full, and the request is bounded by a timeout so a stalled
    server cannot block the caller forever.

    Args:
        url: Address fetched with an HTTP GET.
        destination_path: Local path the body is written to. The file is
            opened in binary write mode, so an existing file is replaced.
        timeout: Value forwarded to ``requests.get(timeout=...)``, in
            seconds. Pass ``None`` to wait indefinitely (the previous
            behaviour).
        chunk_size: Number of bytes requested per streamed chunk.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
        requests.RequestException: For connection failures and timeouts.
        OSError: If the destination file cannot be opened or written.
    """
    # NOTE(review): `requests` is not imported by name in this module;
    # presumably it arrives through one of the star imports -- confirm.
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Check the status before opening the destination so that a failed
        # request does not create or truncate the target file.
        response.raise_for_status()
        with open(destination_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                file.write(chunk)
@app.post("/api/v1/query")
async def query(data: dict):
    """Answer a text question for a given user.

    Args:
        data: JSON object from the request body. It must contain the keys
            ``question`` and ``user_uid``; both are forwarded to
            ``data_querying`` unchanged.

    Returns:
        A ``JSONResponse`` whose content is the message returned by
        ``data_querying``. When an audio path is also returned, it is joined
        onto ``MEDIA_DIR`` and stored under the ``audio_path`` key.

    Raises:
        HTTPException: 422 when ``question`` or ``user_uid`` is missing, so
            the client gets a clear validation error instead of a 500
            caused by a ``KeyError``.
    """
    missing = [key for key in ("question", "user_uid") if key not in data]
    if missing:
        raise HTTPException(
            status_code=422,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    # NOTE(review): data_querying is invoked synchronously inside an async
    # handler; if it blocks, it stalls the event loop -- confirm.
    response_message, audio_path = data_querying(
        data["question"], data["user_uid"], is_audio=False
    )

    response_data = response_message
    if audio_path is not None:
        # Item assignment assumes the message is a dict-like object.
        response_data["audio_path"] = os.path.join(MEDIA_DIR, audio_path)
    return JSONResponse(content=response_data)
@app.post("/api/v1/query/audio")
async def query_audio(data: AudioQuery):
    """Transcribe an audio question and answer it for the given context.

    Args:
        data: Validated request body. ``audio_file_path`` is passed to
            ``speech_to_text`` and ``context_id`` (converted to ``str``) is
            used as the user id for ``data_querying``.

    Returns:
        A ``JSONResponse`` with ``audio_path`` (the URL returned by
        ``data_querying``, or ``None`` when it is empty) and ``user_uid``.

    Raises:
        HTTPException: Any ``HTTPException`` raised while processing is
            propagated as-is; every other error is reported as a 500 whose
            detail is the error text, chained to the original exception.
    """
    try:
        print(f"Received audio query: {data}")
        user_uid = str(data.context_id)
        # NOTE(review): audio_file_path comes straight from the request body
        # and is handed to speech_to_text without any validation -- confirm
        # that callers are trusted or restrict the accepted locations.
        user_query_text = speech_to_text(data.audio_file_path)
        # Only the audio URL is used; the text part of the answer is dropped.
        _, s3_audio_url = data_querying(user_query_text, user_uid, is_audio=True)
        # Directly return the S3 URL without appending it to MEDIA_DIR
        return JSONResponse(
            content={"audio_path": s3_audio_url or None, "user_uid": user_uid}
        )
    except HTTPException:
        # Keep a deliberate status code instead of rewriting it to 500.
        raise
    except Exception as e:
        print(f"Error processing audio query: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/media/{filename}")
async def get_media(filename: str):
    """Serve a single file stored under ``MEDIA_DIR``.

    Args:
        filename: Name of the requested file, taken from the URL path.

    Returns:
        A ``FileResponse`` for the requested file.

    Raises:
        HTTPException: 404 when the name does not resolve to a regular file
            inside ``MEDIA_DIR``. This covers missing files, directories
            (for example ``.`` or ``..``) and any name that would escape
            the media directory.
    """
    media_root = os.path.abspath(MEDIA_DIR)
    # abspath collapses "." / ".." segments without resolving symlinks, so
    # symlinked media files keep working.
    file_path = os.path.abspath(os.path.join(media_root, filename))

    try:
        inside_media_dir = os.path.commonpath([media_root, file_path]) == media_root
    except ValueError:
        # commonpath refuses paths that cannot be compared (e.g. different
        # drives); treat that as "outside the media directory".
        inside_media_dir = False

    # isfile (not exists) so that directories are rejected with a 404
    # instead of reaching FileResponse.
    if not inside_media_dir or not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
if __name__ == "__main__":
    # Started directly as a script: serve the app on all interfaces, port 5007.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=5007,
    )
Back to Directory
File Manager