# Viewing File: /home/ubuntu/codegamaai-test/voice_clone/app.py

# Description: FastAPI service exposing endpoints to train an RVC model and to test (run inference with) it.
# =====================================================================================
import json
import os
import threading
import uuid
import warnings
from random import shuffle
from subprocess import PIPE, STDOUT, Popen

from typing import List
import boto3
import time

import faiss
import numpy as np
import torch
from fastapi import (Depends, FastAPI, File, Form, HTTPException, Request,
                     UploadFile)
from mega import Mega
from pydantic import BaseModel, HttpUrl


from src.rvc_implementation.train_index import (click_train,
                                                rvc_train_and_save,
                                                train_index)
from src.utils import *
from src.s3_download import download_file_from_s3, direct_download
from fastapi.staticfiles import StaticFiles


# --- Process bootstrap -------------------------------------------------------
# Everything below runs at import time and is order-dependent: the working
# directory is switched BEFORE the RVC implementation module is imported.
import os
# Force temporary files into /tmp for this process and its children.
os.environ['TMPDIR'] = '/tmp'


# Remember the directory the process was started from; it is reused further
# down as the root of the "/static" mount.
current_directory = os.getcwd()
print(f"Current working directory: {current_directory}")

# Location of the bundled RVC implementation, relative to the start directory.
new_directory = os.path.join(current_directory, "src", "rvc_implementation")


# Switch into the RVC implementation directory when it exists.
# NOTE(review): presumably the RVC code opens files relative to the CWD --
# confirm before changing the order of these statements.
if os.path.exists(new_directory):
    os.chdir(new_directory)
    print(f"Updated working directory: {new_directory}")


# Wildcard import: helpers referenced by the endpoints below but never defined
# in this file (e.g. `vc`) are expected to arrive through this import or
# through `from src.utils import *` above.
from src.rvc_implementation.app import *

# NOTE(review): exact duplicate of the import above; it only re-binds the same
# names and looks like a copy/paste leftover.
from src.rvc_implementation.app import *

# Silence every Python warning for the whole process.
warnings.filterwarnings("ignore")
# NOTE(review): this first FastAPI instance is discarded -- `app` is rebound a
# few lines below before any route or mount is registered on it.
app = FastAPI()
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The application object actually served; all routes below attach to it.
app = FastAPI()
# NOTE(review): this serves the ENTIRE start directory over HTTP under /static.
# If source files, configs or TLS keys live there they become downloadable --
# confirm this is intended or narrow the mount to a dedicated output folder.
app.mount("/static", StaticFiles(directory=current_directory), name="static")

class AudioPayload(BaseModel):
    """Request body accepted by ``POST /upload_audio/``."""

    # user_id and vid are used as path components of the download folder:
    # <dataset_store>/<user_id>/<vid>/audio
    user_id: str
    vid: str
    # URLs of the audio files to fetch; validated as HTTP(S) URLs by pydantic.
    audio_files: List[HttpUrl]
    # 's3' selects download_file_from_s3; any other value uses direct_download.
    download_type: str


@app.post("/upload_audio/")
async def download_audios(payload: AudioPayload):
    """Download every audio file listed in the payload into the dataset folder.

    Files are stored under ``<dataset_store>/<user_id>/<vid>/audio`` where
    ``dataset_store`` is read from the environment (a missing variable raises
    ``KeyError`` before any download is attempted, as before).

    Args:
        payload: Validated request body; ``download_type == 's3'`` routes each
            URL through ``download_file_from_s3``, anything else through
            ``direct_download``.

    Returns:
        ``{"status": 1, ...}`` with the target directory once ALL files have
        been downloaded, or ``{"status": 0, ...}`` with the error text as soon
        as one download fails (remaining URLs are not attempted).
    """
    directory = os.path.join(os.environ['dataset_store'], payload.user_id, payload.vid, "audio")
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(directory, exist_ok=True)

    use_s3 = payload.download_type == 's3'
    for url in payload.audio_files:
        try:
            if use_s3:
                download_file_from_s3(url, directory)
            else:
                direct_download(url, directory)
            print(f"Downloaded and saved {url} to {directory}")
        except Exception as e:
            print(f"Failed to download {url}: {str(e)}")
            return {"status": 0, "message": "Failed to download files", "directory": None, "error": str(e)}

    # Report success only after the loop: the previous version returned from
    # inside the loop, so only the first URL was ever downloaded and an empty
    # list produced a null response.
    return {"status": 1, "message": "Files downloaded successfully", "directory": directory}



@app.post("/train_rvc")
def train_rvc(json_data: dict):
    """Validate a training request and launch RVC training in a background thread.

    Request keys read from ``json_data`` (``None`` when absent): ``user_id``,
    ``vid``, ``epochs``, ``save_frequency`` and ``f0method``.

    Returns:
        * the dict produced by ``check_config_file`` when its ``status`` is 2;
        * whatever ``check_dataset_path`` returns when that value is truthy;
        * ``{'status': 1, ...}`` once the training thread has been started;
        * ``{'status': 0, ..., 'error': str(e)}`` if anything above raises, in
          which case ``send_callback_to_frontend`` is also called with
          ``status=0``.
    """
    user_id, vid, epochs, save_frequency, f0method = (
        json_data.get(key)
        for key in ('user_id', 'vid', 'epochs', 'save_frequency', 'f0method')
    )

    try:
        # Plain concatenation on purpose: a missing user_id/vid raises here and
        # is reported through the failure path below.
        model_name = user_id + '_' + vid

        config_result = check_config_file(model_name, vid, epochs, save_frequency)
        if config_result.get('status') == 2:
            return config_result

        dataset_folder = dataset_store + user_id + '/' + vid + '/audio'
        dataset_problem = check_dataset_path(dataset_folder, model_name)
        if dataset_problem:
            return dataset_problem

        # Training runs in its own thread so the request can return right away.
        worker_args = (model_name, f0method, epochs, save_frequency, user_id, vid, dataset_folder)
        threading.Thread(target=rvc_train_and_save, args=worker_args).start()
        return {'status': 1, 'message': 'Training and saving will start in a separate thread.'}

    except Exception as e:
        print(f"Failed to train the model: {str(e)}")

        # Tell the frontend that training failed.
        send_callback_to_frontend(user_id, vid, status = 0)
        return {'status': 0, 'message': 'Failed to train the model.', 'error': str(e)}


@app.post("/rvc_text")
async def test_rvc_text(json_data: dict):
    """Turn text into speech via ``main_TTS`` and convert it with the RVC model.

    Request keys read from ``json_data`` (``None`` when absent): ``user_id``,
    ``vid``, ``text``, ``speaker_iden``, ``pitch_extraction_algorithm``,
    ``transpose``, ``median_filtering``, ``audio_resampling``,
    ``feature_ratio``, ``volume_envelope_scaling``,
    ``protect_voiceless_consonants`` and ``f0_file``.

    Returns:
        * the dict from ``check_test_config_file`` unchanged when its
          ``status`` is 2 or 0;
        * ``{'status': 1, 'output_file': ..., 'message': ...}`` on success;
        * ``{'status': 0, ..., 'output_file': None, 'error': str(e)}`` when any
          step raises.
    """
    read = json_data.get
    user_id = read('user_id')
    vid = read('vid')
    text = read('text')
    speaker_iden = read('speaker_iden')
    pitch_extraction_algorithm = read('pitch_extraction_algorithm')
    transpose = read('transpose')
    median_filtering = read('median_filtering')
    audio_resampling = read('audio_resampling')
    feature_ratio = read('feature_ratio')
    volume_envelope_scaling = read('volume_envelope_scaling')
    protect_voiceless_consonants = read('protect_voiceless_consonants')
    f0_file = read('f0_file')

    try:
        model_name = f"{user_id}_{vid}"

        # Stop early when the model check reports status 2 or 0.
        model_check = check_test_config_file(model_name)
        if model_check.get('status') in (2, 0):
            return model_check

        # Only the generated audio path is needed; the media URL is ignored.
        tts_audio_path, _ = main_TTS(text,model_name, speaker_iden)

        sid, index_file = get_index_file(model_name)
        voice_input = get_voice_input(tts_audio_path)

        # Positional arguments for vc.vc_single, in the order it expects.
        conversion_args = (
            0,                             # spk_item
            voice_input,                   # vc_input3
            transpose,                     # vc_transform0
            f0_file,                       # f0_file
            pitch_extraction_algorithm,    # f0method0
            index_file,                    # file_index1
            None,                          # file_index2
            feature_ratio,                 # index_rate1
            median_filtering,              # filter_radius0
            audio_resampling,              # resample_sr0
            volume_envelope_scaling,       # rms_mix_rate0
            protect_voiceless_consonants,  # protect0
        )

        vc.get_vc(sid,user_id, vid, None,None)
        # vc_single returns a pair; only the second element is written out.
        _, converted_audio = vc.vc_single(*conversion_args)

        output_file_add = audio_output_file(user_id, vid, converted_audio)
        return {'status': 1, 'output_file': output_file_add, 'message': 'Audio file created successfully.'}
    except Exception as e:
        print(f"Failed to Create file: {str(e)}")
        return {'status': 0, 'message': 'Failed to create audio file.', 'output_file': None, 'error': str(e)}


class AudioInput(BaseModel):
    """Conversion parameters for ``POST /rvc_audio``.

    Injected into ``test_rvc_audio`` through ``Depends()`` next to the
    uploaded file. All fields are required.
    NOTE(review): with ``Depends()`` FastAPI presumably reads these fields
    from the query string rather than from a JSON body -- confirm against
    the clients calling this endpoint.
    """

    # user_id and vid together form the model name "<user_id>_<vid>".
    user_id: str
    vid: str
    # The remaining fields are forwarded positionally to vc.vc_single; the
    # name in parentheses is the vc_single argument each one feeds.
    pitch_extraction_algorithm: str       # (f0method0)
    transpose: int                        # (vc_transform0)
    median_filtering: int                 # (filter_radius0)
    audio_resampling: int                 # (resample_sr0)
    feature_ratio: float                  # (index_rate1)
    volume_envelope_scaling: float        # (rms_mix_rate0)
    protect_voiceless_consonants: float   # (protect0)
    f0_file: str                          # (f0_file)


@app.post("/rvc_audio")
async def test_rvc_audio(audio_data: AudioInput = Depends(), files: UploadFile = File(...)):
    """Convert an uploaded audio file with the user's RVC model.

    The upload is first written to
    ``<dataset_store><user_id>/<vid>/raw_audio/<model_name><unique>.wav``
    (``dataset_store`` comes from the environment), then passed through the
    voice-conversion pipeline.

    Args:
        audio_data: Conversion parameters (see ``AudioInput``).
        files: The audio file to convert.

    Returns:
        * the dict from ``check_test_config_file`` unchanged when its
          ``status`` is 2 or 0 (the upload has already been saved by then);
        * ``{'status': 1, 'output_file': ..., 'message': ...}`` on success;
        * ``{'status': 0, ..., 'output_file': None, 'error': str(e)}`` when any
          step raises.
    """
    user_id = audio_data.user_id
    vid = audio_data.vid

    try:
        model_name = f"{audio_data.user_id}_{audio_data.vid}"
        unique_suffix = generate_unique_filename()
        upload_name = f"{model_name}{unique_suffix!s}.wav"
        upload_dir = f"{os.environ['dataset_store']}{user_id}/{vid}/raw_audio"

        # Create the per-model upload folder on first use.
        if not os.path.exists(upload_dir):
            os.makedirs(upload_dir)

        # Persist the upload so the conversion helpers can read it from disk.
        upload_path = upload_dir + "/" + upload_name
        with open(upload_path, "wb") as sink:
            sink.write(await files.read())

        # Stop early when the model check reports status 2 or 0.
        model_check = check_test_config_file(model_name)
        if model_check.get('status') in (2, 0):
            return model_check

        sid, index_file = get_index_file(model_name)
        voice_input = get_voice_input(upload_path)

        # Positional arguments for vc.vc_single, in the order it expects.
        conversion_args = (
            0,                                        # spk_item
            voice_input,                              # vc_input3
            audio_data.transpose,                     # vc_transform0
            audio_data.f0_file,                       # f0_file
            audio_data.pitch_extraction_algorithm,    # f0method0
            index_file,                               # file_index1
            None,                                     # file_index2
            audio_data.feature_ratio,                 # index_rate1
            audio_data.median_filtering,              # filter_radius0
            audio_data.audio_resampling,              # resample_sr0
            audio_data.volume_envelope_scaling,       # rms_mix_rate0
            audio_data.protect_voiceless_consonants,  # protect0
        )

        vc.get_vc(sid,user_id, vid, None,None)
        # vc_single returns a pair; only the second element is written out.
        _, converted_audio = vc.vc_single(*conversion_args)

        output_file_add = audio_output_file(user_id, vid, converted_audio)
        return {'status': 1, 'output_file': output_file_add, 'message': 'Audio file created successfully.'}
    except Exception as e:
        print(f"Failed to Create file: {str(e)}")
        return {'status': 0, 'message': 'Failed to create audio file.', 'output_file': None, 'error': str(e)}



@app.post("/rvc_audio_url")
async def test_rvc_audio_url(json_data: dict):
    """Convert audio referenced by ``audio_file_path`` with the user's RVC model.

    Request keys read from ``json_data`` (``None`` when absent): ``user_id``,
    ``vid``, ``audio_file_path``, ``pitch_extraction_algorithm``,
    ``transpose``, ``median_filtering``, ``audio_resampling``,
    ``feature_ratio``, ``volume_envelope_scaling``,
    ``protect_voiceless_consonants`` and ``f0_file``.

    The input is obtained through ``direct_save(audio_file_path, user_id,
    vid)`` before the model check runs.

    Returns:
        * the dict from ``check_test_config_file`` unchanged when its
          ``status`` is 2 or 0;
        * ``{'status': 1, 'output_file': ..., 'message': ...}`` on success;
        * ``{'status': 0, ..., 'output_file': None, 'error': str(e)}`` when any
          step raises.
    """
    read = json_data.get
    user_id = read('user_id')
    vid = read('vid')
    audio_file_path = read('audio_file_path')
    pitch_extraction_algorithm = read('pitch_extraction_algorithm')
    transpose = read('transpose')
    median_filtering = read('median_filtering')
    audio_resampling = read('audio_resampling')
    feature_ratio = read('feature_ratio')
    volume_envelope_scaling = read('volume_envelope_scaling')
    protect_voiceless_consonants = read('protect_voiceless_consonants')
    f0_file = read('f0_file')
    model_name = f"{user_id}_{vid}"

    try:
        saved_audio_path = direct_save(audio_file_path, user_id, vid)

        # Stop early when the model check reports status 2 or 0.
        model_check = check_test_config_file(model_name)
        if model_check.get('status') in (2, 0):
            return model_check

        sid, index_file = get_index_file(model_name)
        voice_input = get_voice_input(saved_audio_path)

        # Positional arguments for vc.vc_single, in the order it expects.
        conversion_args = (
            0,                             # spk_item
            voice_input,                   # vc_input3
            transpose,                     # vc_transform0
            f0_file,                       # f0_file
            pitch_extraction_algorithm,    # f0method0
            index_file,                    # file_index1
            None,                          # file_index2
            feature_ratio,                 # index_rate1
            median_filtering,              # filter_radius0
            audio_resampling,              # resample_sr0
            volume_envelope_scaling,       # rms_mix_rate0
            protect_voiceless_consonants,  # protect0
        )

        vc.get_vc(sid,user_id, vid, None,None)
        # vc_single returns a pair; only the second element is written out.
        _, converted_audio = vc.vc_single(*conversion_args)

        output_file_add = audio_output_file(user_id, vid, converted_audio)
        return {'status': 1, 'output_file': output_file_add, 'message': 'Audio file created successfully.'}
    except Exception as e:
        print(f"Failed to Create file: {str(e)}")
        return {'status': 0, 'message': 'Failed to create audio file.', 'output_file': None, 'error': str(e)}


@app.post("/check_status")
def train_status(json_data: dict):
    """Report the training status stored in the model's JSON config file.

    The config is looked up at ``<user_config_files>/<user_id>_<vid>.json``
    and its ``status`` field is translated into a response.

    Request keys read from ``json_data``: ``user_id`` and ``vid``.

    Returns:
        * ``{'status': 2, ...}`` when the config file does not exist;
        * ``{'status': 0, ...}`` when the stored status is 0 (still training);
        * ``{'status': 1, ...}`` when the stored status is 1 (trained);
        * ``{'status': 2, ..., 'error': ...}`` when the stored status is any
          other value, or when reading the config raises.
    """
    user_id = json_data.get('user_id')
    vid = json_data.get('vid')

    try:
        model_name = user_id + '_' + vid
        config_path = os.path.join(user_config_files, model_name + '.json')

        if not os.path.exists(config_path):
            return {'status': 2, 'message': 'The model does not exist. Or the model name is incorrect.'}

        with open(config_path) as f:
            status = json.load(f).get('status')

        if status == 0:
            return {'status': 0, 'message': 'Model is being trained. Please wait. You will be notified when the model is trained.'}
        if status == 1:
            return {'status': 1, 'message': 'Model is trained. You can use the model now.'}

        # Any other stored value (missing key, failure marker, ...) used to
        # fall through and produce a null response; answer explicitly instead,
        # using the same shape as the error path below.
        return {
            'status': 2,
            'message': 'Failed to check the status.',
            'error': f'Unexpected status value in model config: {status!r}',
        }

    except Exception as e:
        print(f"Failed to check the status: {str(e)}")
        return {'status': 2, 'message': 'Failed to check the status.', 'error': str(e)}


# TLS key and certificate are looked up in the directory named by the
# `current_directory` ENVIRONMENT variable (not the Python variable of the
# same name); a missing variable raises KeyError at import time.
_tls_dir = os.environ['current_directory']
ssl_keyfile = os.path.join(_tls_dir, "privkey.pem")
ssl_certfile = os.path.join(_tls_dir, "fullchain.pem")
print(f"SSL Keyfile: {ssl_keyfile}")
print(f"SSL Certfile: {ssl_certfile}")

# Serve over HTTPS on all interfaces, port 8012, when run as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        host='0.0.0.0',
        port=8012,
        ssl_keyfile=ssl_keyfile,
        ssl_certfile=ssl_certfile,
    )
# Back to Directory File Manager