# File: /home/ubuntu/codegamaai-test/voice_clone/app.py
# Description: This file is used to train and test the RVC model.
# =====================================================================================
import json
import os
import threading
import uuid
import warnings
from random import shuffle
from subprocess import PIPE, STDOUT, Popen
from typing import List
import boto3
import time
import faiss
import numpy as np
import torch
from fastapi import (Depends, FastAPI, File, Form, HTTPException, Request,
UploadFile)
from mega import Mega
from pydantic import BaseModel, HttpUrl
from src.rvc_implementation.train_index import (click_train,
rvc_train_and_save,
train_index)
from src.utils import *
from src.s3_download import download_file_from_s3, direct_download
from fastapi.staticfiles import StaticFiles
import os
# ---------------------------------------------------------------------------
# Process-wide setup. Everything below runs once, at import time.
# ---------------------------------------------------------------------------

# Export TMPDIR=/tmp for this process and anything it spawns.
os.environ['TMPDIR'] = '/tmp'

# Remember the launch directory: it is served as the /static root below,
# even if the working directory is changed afterwards.
current_directory = os.getcwd()
print(f"Current working directory: {current_directory}")

# Switch into the RVC implementation folder when it is present
# (presumably so that package's relative paths resolve -- TODO confirm).
new_directory = os.path.join(current_directory, "src", "rvc_implementation")
if os.path.exists(new_directory):
    os.chdir(new_directory)
    print(f"Updated working directory: {new_directory}")

# Deliberately imported after the optional chdir above. The original file
# repeated this statement twice; a single import binds the same names.
from src.rvc_implementation.app import *

warnings.filterwarnings("ignore")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create the application exactly once (the original built a FastAPI instance,
# discarded it, and built another). It is created after the star import so
# that this `app` is the one the route decorators in this file bind to.
app = FastAPI()
app.mount("/static", StaticFiles(directory=current_directory), name="static")
class AudioPayload(BaseModel):
    """Request body for ``POST /upload_audio/``."""

    # First path component below the dataset store for the downloaded files.
    user_id: str

    # Second path component below the dataset store (under ``user_id``).
    vid: str

    # URLs of the audio files to fetch.
    audio_files: List[HttpUrl]

    # 's3' selects the S3 downloader; any other value uses the direct one.
    download_type: str
@app.post("/upload_audio/")
async def download_audios(payload: AudioPayload):
    """Download every audio file in the payload into the dataset folder.

    Files are stored under ``<dataset_store>/<user_id>/<vid>/audio``, where
    ``dataset_store`` is read from the environment.

    Args:
        payload: Target identifiers, the list of URLs, and the download type
            ('s3' uses ``download_file_from_s3``; anything else uses
            ``direct_download``).

    Returns:
        ``{"status": 1, ...}`` with the target directory once *all* files
        have been downloaded, or ``{"status": 0, ...}`` with an ``error``
        string as soon as one download fails or when no URLs were supplied.
    """
    directory = os.path.join(os.environ['dataset_store'], payload.user_id, payload.vid, "audio")
    os.makedirs(directory, exist_ok=True)

    # Nothing to download: answer explicitly instead of falling off the end
    # of the function (which made the endpoint respond with null).
    if not payload.audio_files:
        return {"status": 0, "message": "Failed to download files", "directory": None, "error": "No audio files were provided"}

    # The download type is fixed for the whole request, so pick the
    # downloader once rather than on every iteration.
    fetch = download_file_from_s3 if payload.download_type == 's3' else direct_download

    for url in payload.audio_files:
        try:
            fetch(url, directory)
        except Exception as e:
            print(f"Failed to download {url}: {str(e)}")
            return {"status": 0, "message": "Failed to download files", "directory": None, "error": str(e)}
        print(f"Downloaded and saved {url} to {directory}")

    # Report success only after the loop: the original returned from inside
    # the loop, so only the first URL was ever downloaded.
    return {"status": 1, "message": "Files downloaded successfully", "directory": directory}
@app.post("/train_rvc")
def train_rvc(json_data: dict):
    """Validate a training request and launch RVC training in a thread.

    Reads ``user_id``, ``vid``, ``epochs``, ``save_frequency`` and
    ``f0method`` from the JSON body.

    Returns:
        * the result of ``check_config_file`` when its status is 2;
        * the result of ``check_dataset_path`` when it is truthy;
        * ``{'status': 1, ...}`` once the training thread has been started;
        * ``{'status': 0, ...}`` with an ``error`` string if anything raised
          (a failure callback is sent to the frontend first).
    """
    user_id, vid, epochs, save_frequency, f0method = (
        json_data.get(field)
        for field in ('user_id', 'vid', 'epochs', 'save_frequency', 'f0method')
    )

    try:
        model_name = user_id + '_' + vid

        config_result = check_config_file(model_name, vid, epochs, save_frequency)
        if config_result.get('status') == 2:
            return config_result

        dataset_folder = dataset_store + user_id + '/' + vid + '/audio'
        dataset_problem = check_dataset_path(dataset_folder, model_name)
        if dataset_problem:
            return dataset_problem

        # Training runs in its own thread so this request returns at once.
        worker_args = (model_name, f0method, epochs, save_frequency, user_id, vid, dataset_folder)
        threading.Thread(target=rvc_train_and_save, args=worker_args).start()
        return {'status': 1, 'message': 'Training and saving will start in a separate thread.'}
    except Exception as e:
        print(f"Failed to train the model: {str(e)}")
        # Tell the frontend that training failed.
        send_callback_to_frontend(user_id, vid, status = 0)
        return {'status': 0, 'message': 'Failed to train the model.', 'error': str(e)}
@app.post("/rvc_text")
async def test_rvc_text(json_data: dict):
    """Produce audio from text with ``main_TTS`` and run it through the model.

    Reads ``user_id``, ``vid``, ``text``, ``speaker_iden`` and the conversion
    settings from the JSON body. The model name is ``<user_id>_<vid>``.

    Returns:
        * the result of ``check_test_config_file`` when its status is 0 or 2;
        * ``{'status': 1, 'output_file': ...}`` on success;
        * ``{'status': 0, ...}`` with an ``error`` string if anything raised.
    """
    user_id, vid, text, speaker_iden = map(
        json_data.get, ('user_id', 'vid', 'text', 'speaker_iden')
    )
    # Conversion settings, listed in the positional order vc_single expects.
    (
        transpose,
        f0_file,
        pitch_extraction_algorithm,
        feature_ratio,
        median_filtering,
        audio_resampling,
        volume_envelope_scaling,
        protect_voiceless_consonants,
    ) = map(
        json_data.get,
        (
            'transpose',
            'f0_file',
            'pitch_extraction_algorithm',
            'feature_ratio',
            'median_filtering',
            'audio_resampling',
            'volume_envelope_scaling',
            'protect_voiceless_consonants',
        ),
    )

    try:
        model_name = f"{user_id}_{vid}"

        # Stop early when the model is missing or not usable yet.
        model_check = check_test_config_file(model_name)
        if model_check.get('status') in (0, 2):
            return model_check

        tts_audio_path, _tts_media_url = main_TTS(text,model_name, speaker_iden)
        sid, index_file = get_index_file(model_name)
        vc_input = get_voice_input(tts_audio_path)

        vc.get_vc(sid,user_id, vid, None,None)
        _info, converted_audio = vc.vc_single(
            0,                             # spk_item
            vc_input,                      # vc_input3
            transpose,                     # vc_transform0
            f0_file,                       # f0_file
            pitch_extraction_algorithm,    # f0method0
            index_file,                    # file_index1
            None,                          # file_index2
            feature_ratio,                 # index_rate1
            median_filtering,              # filter_radius0
            audio_resampling,              # resample_sr0
            volume_envelope_scaling,       # rms_mix_rate0
            protect_voiceless_consonants,  # protect0
        )

        output_file_add = audio_output_file(user_id, vid, converted_audio)
        return {'status': 1, 'output_file': output_file_add, 'message': 'Audio file created successfully.'}
    except Exception as e:
        print(f"Failed to Create file: {str(e)}")
        return {'status': 0, 'message': 'Failed to create audio file.', 'output_file': None, 'error': str(e)}
class AudioInput(BaseModel):
    """Conversion settings accepted by ``POST /rvc_audio``.

    The endpoint receives this model through ``Depends()``, next to the
    uploaded audio file.
    """

    # Together these form the model name "<user_id>_<vid>".
    user_id: str
    vid: str

    # Forwarded to vc.vc_single as f0method0.
    pitch_extraction_algorithm: str
    # Forwarded to vc.vc_single as vc_transform0.
    transpose: int
    # Forwarded to vc.vc_single as filter_radius0.
    median_filtering: int
    # Forwarded to vc.vc_single as resample_sr0.
    audio_resampling: int
    # Forwarded to vc.vc_single as index_rate1.
    feature_ratio: float
    # Forwarded to vc.vc_single as rms_mix_rate0.
    volume_envelope_scaling: float
    # Forwarded to vc.vc_single as protect0.
    protect_voiceless_consonants: float
    # Forwarded to vc.vc_single as f0_file.
    f0_file: str
@app.post("/rvc_audio")
async def test_rvc_audio(audio_data: AudioInput = Depends(), files: UploadFile = File(...)):
    """Convert an uploaded audio file with the ``<user_id>_<vid>`` model.

    The upload is first written to
    ``<dataset_store><user_id>/<vid>/raw_audio/<model_name><unique>.wav``
    (``dataset_store`` comes from the environment), then passed through the
    voice-conversion pipeline.

    Returns:
        * the result of ``check_test_config_file`` when its status is 0 or 2;
        * ``{'status': 1, 'output_file': ...}`` on success;
        * ``{'status': 0, ...}`` with an ``error`` string if anything raised.
    """
    user_id = audio_data.user_id
    vid = audio_data.vid

    try:
        model_name = f"{audio_data.user_id}_{audio_data.vid}"

        # Persist the upload under a unique name inside the raw_audio folder.
        unique_suffix = generate_unique_filename()
        raw_audio_dir = f"{os.environ['dataset_store']}{user_id}/{vid}/raw_audio"
        if not os.path.exists(raw_audio_dir):
            os.makedirs(raw_audio_dir)
        input_audio_file_path = f"{raw_audio_dir}/{model_name}{unique_suffix}.wav"
        with open(input_audio_file_path, "wb") as audio:
            audio.write(await files.read())

        # Stop early when the model is missing or not usable yet.
        model_check = check_test_config_file(model_name)
        if model_check.get('status') in (0, 2):
            return model_check

        sid, index_file = get_index_file(model_name)
        vc_input = get_voice_input(input_audio_file_path)

        vc.get_vc(sid,user_id, vid, None,None)
        _info, converted_audio = vc.vc_single(
            0,                                        # spk_item
            vc_input,                                 # vc_input3
            audio_data.transpose,                     # vc_transform0
            audio_data.f0_file,                       # f0_file
            audio_data.pitch_extraction_algorithm,    # f0method0
            index_file,                               # file_index1
            None,                                     # file_index2
            audio_data.feature_ratio,                 # index_rate1
            audio_data.median_filtering,              # filter_radius0
            audio_data.audio_resampling,              # resample_sr0
            audio_data.volume_envelope_scaling,       # rms_mix_rate0
            audio_data.protect_voiceless_consonants,  # protect0
        )

        output_file_add = audio_output_file(user_id, vid, converted_audio)
        return {'status': 1, 'output_file': output_file_add, 'message': 'Audio file created successfully.'}
    except Exception as e:
        print(f"Failed to Create file: {str(e)}")
        return {'status': 0, 'message': 'Failed to create audio file.', 'output_file': None, 'error': str(e)}
@app.post("/rvc_audio_url")
async def test_rvc_audio_url(json_data: dict):
    """Convert the audio referenced by ``audio_file_path`` with the model.

    Reads ``user_id``, ``vid``, ``audio_file_path`` and the conversion
    settings from the JSON body. The input is obtained via ``direct_save``
    and the model name is ``<user_id>_<vid>``.

    Returns:
        * the result of ``check_test_config_file`` when its status is 0 or 2;
        * ``{'status': 1, 'output_file': ...}`` on success;
        * ``{'status': 0, ...}`` with an ``error`` string if anything raised.
    """
    user_id, vid, audio_file_path = map(
        json_data.get, ('user_id', 'vid', 'audio_file_path')
    )
    # Conversion settings, listed in the positional order vc_single expects.
    (
        transpose,
        f0_file,
        pitch_extraction_algorithm,
        feature_ratio,
        median_filtering,
        audio_resampling,
        volume_envelope_scaling,
        protect_voiceless_consonants,
    ) = map(
        json_data.get,
        (
            'transpose',
            'f0_file',
            'pitch_extraction_algorithm',
            'feature_ratio',
            'median_filtering',
            'audio_resampling',
            'volume_envelope_scaling',
            'protect_voiceless_consonants',
        ),
    )
    model_name = f"{user_id}_{vid}"

    try:
        input_audio_file_path = direct_save(audio_file_path, user_id, vid)

        # Stop early when the model is missing or not usable yet.
        model_check = check_test_config_file(model_name)
        if model_check.get('status') in (0, 2):
            return model_check

        sid, index_file = get_index_file(model_name)
        vc_input = get_voice_input(input_audio_file_path)

        vc.get_vc(sid,user_id, vid, None,None)
        _info, converted_audio = vc.vc_single(
            0,                             # spk_item
            vc_input,                      # vc_input3
            transpose,                     # vc_transform0
            f0_file,                       # f0_file
            pitch_extraction_algorithm,    # f0method0
            index_file,                    # file_index1
            None,                          # file_index2
            feature_ratio,                 # index_rate1
            median_filtering,              # filter_radius0
            audio_resampling,              # resample_sr0
            volume_envelope_scaling,       # rms_mix_rate0
            protect_voiceless_consonants,  # protect0
        )

        output_file_add = audio_output_file(user_id, vid, converted_audio)
        return {'status': 1, 'output_file': output_file_add, 'message': 'Audio file created successfully.'}
    except Exception as e:
        print(f"Failed to Create file: {str(e)}")
        return {'status': 0, 'message': 'Failed to create audio file.', 'output_file': None, 'error': str(e)}
@app.post("/check_status")
def train_status(json_data: dict):
    """Report the training status stored in a model's JSON config file.

    Reads ``user_id`` and ``vid`` from the JSON body and looks up
    ``<user_config_files>/<user_id>_<vid>.json``.

    Returns:
        * ``{'status': 2, ...}`` when the config file does not exist;
        * ``{'status': 0, ...}`` when the stored status is 0 (still training);
        * ``{'status': 1, ...}`` when the stored status is 1 (trained);
        * ``{'status': 2, ...}`` with an ``error`` string when the stored
          status is anything else, or when an exception was raised.
    """
    user_id = json_data.get('user_id')
    vid = json_data.get('vid')
    try:
        model_name = user_id + '_' + vid
        # Build the path once; it is needed for both the check and the read.
        config_path = os.path.join(user_config_files, model_name + '.json')

        if not os.path.exists(config_path):
            return {'status': 2, 'message': 'The model does not exist. Or the model name is incorrect.'}

        with open(config_path) as f:
            data = json.load(f)

        status = data.get('status')
        if status == 0:
            return {'status': 0, 'message': 'Model is being trained. Please wait. You will be notified when the model is trained.'}
        if status == 1:
            return {'status': 1, 'message': 'Model is trained. You can use the model now.'}

        # Any other stored value used to fall through and make the endpoint
        # answer with null; report it explicitly instead.
        return {'status': 2, 'message': 'Failed to check the status.', 'error': f'Unexpected status value in model config: {status!r}'}
    except Exception as e:
        print(f"Failed to check the status: {str(e)}")
        return {'status': 2, 'message': 'Failed to check the status.', 'error': str(e)}
# TLS key and certificate live in the directory named by the
# `current_directory` environment variable (not the module-level variable
# of the same name).
ssl_keyfile, ssl_certfile = (
    os.path.join(os.environ['current_directory'], pem_name)
    for pem_name in ("privkey.pem", "fullchain.pem")
)
print(f"SSL Keyfile: {ssl_keyfile}")
print(f"SSL Certfile: {ssl_certfile}")

# Serve over HTTPS on all interfaces when the file is run as a script.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        host='0.0.0.0',
        port=8012,
        ssl_keyfile=ssl_keyfile,
        ssl_certfile=ssl_certfile,
    )
# Back to Directory
# File Manager