# Source file: /home/ubuntu/codegamaai-test/voice_clone/src/app.py

import os
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI,Form,BackgroundTasks
from pydantic import BaseModel
from fastapi.responses import JSONResponse, FileResponse
from supertokens_fastapi import get_cors_allowed_headers
from fastapi import File, UploadFile, HTTPException
import uvicorn
import shutil
from fastapi import FastAPI
from typing import Union
from threading import Thread
import uuid
from src.constants import *
from src.utils import *
from main import *
from src.constants import MEDIA_DIR, TEMP_DIR


# Application instance; the route decorators further down register on it.
app = FastAPI()

# Cross-origin policy: every origin, method and header is accepted and
# credentialed requests are allowed. The header list returned by the
# SuperTokens helper is appended after the wildcard entry.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*", *get_cors_allowed_headers()],
    allow_credentials=True,
)

class AudioQuery(BaseModel):
    # Request body accepted by the POST /api/v1/query/audio handler.

    # Location of the audio to transcribe; the handler passes this value to
    # speech_to_text unchanged.
    audio_file_path: str
    # Declared audio format. Not read by any handler visible in this file.
    audio_format: str
    # Caller-supplied identifier; the handler converts it to str and uses it
    # as the user_uid for the query.
    context_id: Union[int, str]

def download_file(url, destination_path, timeout=30):
    """Download a file from a given URL to a local destination path.

    Args:
        url: Address to fetch with an HTTP GET.
        destination_path: Local path the response body is written to
            (opened in binary write mode, so an existing file is replaced).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled server
            would block the caller forever. Pass ``None`` to restore the
            old wait-forever behaviour.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``).
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    # NOTE(review): `requests` is not imported explicitly in this module; it
    # is presumably provided by one of the star imports -- verify.
    response = requests.get(url, timeout=timeout)
    # Fail before touching the destination so an error page is never saved
    # as if it were the requested file.
    response.raise_for_status()
    with open(destination_path, 'wb') as file:
        file.write(response.content)

@app.post("/api/v1/query")
async def query(data: dict):
    """Answer a text question on behalf of a user.

    Args:
        data: JSON request body. Must contain the keys ``question`` and
            ``user_uid``; both are forwarded to ``data_querying``.

    Returns:
        JSONResponse wrapping the payload returned by ``data_querying``.
        When ``data_querying`` also returns an audio path, that path is
        joined onto ``MEDIA_DIR`` and added under the ``audio_path`` key.

    Raises:
        HTTPException: 422 when a required key is absent from the body.
    """
    # Validate up front: a missing key used to surface as an unhandled
    # KeyError, i.e. an opaque 500 for what is really a client error.
    missing = [key for key in ("question", "user_uid") if key not in data]
    if missing:
        raise HTTPException(
            status_code=422,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    response_data, audio_path = data_querying(
        data["question"], data["user_uid"], is_audio=False
    )

    if audio_path is not None:
        # The payload must be a mutable mapping here since a key is added.
        # NOTE(review): the audio endpoint treats this second value as a
        # ready-to-use URL; confirm it is a MEDIA_DIR-relative name here.
        response_data["audio_path"] = os.path.join(MEDIA_DIR, audio_path)

    return JSONResponse(content=response_data)

@app.post("/api/v1/query/audio")
async def query_audio(data: AudioQuery):
    """Answer a spoken question.

    The audio referenced by ``data.audio_file_path`` is transcribed with
    ``speech_to_text`` and the transcript is passed to ``data_querying``
    with ``is_audio=True``. The response carries the audio location
    returned by ``data_querying`` (or ``None`` when it is empty) together
    with the stringified ``context_id`` as ``user_uid``.

    Raises:
        HTTPException: 500 carrying the error text when any step fails.
    """
    try:
        print(f"Received audio query: {data}")

        uid = str(data.context_id)
        transcript = speech_to_text(data.audio_file_path)
        # Only the audio location is used; the textual answer is discarded.
        _, s3_audio_url = data_querying(transcript, uid, is_audio=True)

        # The location is returned as-is, without prefixing MEDIA_DIR.
        payload = {"audio_path": s3_audio_url or None, "user_uid": uid}
        return JSONResponse(content=payload)

    except Exception as exc:
        message = str(exc)
        print(f"Error processing audio query: {message}")
        raise HTTPException(status_code=500, detail=message)

@app.get("/media/{filename}")
async def get_media(filename: str):
    """Serve a single file from ``MEDIA_DIR``.

    Args:
        filename: Name of the file, taken from the URL path.

    Returns:
        FileResponse for the requested file.

    Raises:
        HTTPException: 404 when the name does not refer to a regular file
            located inside ``MEDIA_DIR``.
    """
    media_root = os.path.abspath(MEDIA_DIR)
    file_path = os.path.abspath(os.path.join(media_root, filename))

    # Defence in depth: after normalisation the target must still live under
    # the media directory, so names such as ".." cannot escape it.
    try:
        inside_media = os.path.commonpath([media_root, file_path]) == media_root
    except ValueError:
        # commonpath rejects paths it cannot compare (e.g. different drives).
        inside_media = False

    # isfile rather than exists: a directory would pass an existence check
    # and then fail inside FileResponse as a 500 instead of a clean 404.
    if not inside_media or not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)

if __name__ == "__main__":
    # Direct execution only: serve the app on all interfaces, port 5007.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=5007,
    )
# Back to Directory File Manager