# Source file: /home/ubuntu/codegamaai-test/voice_clone/src/utils.py
import json
import os
import uuid
import numpy as np
import librosa
import requests
from pydub import AudioSegment
from scipy.io import wavfile
from scipy.io.wavfile import write
# ---------------------------------------------------------------------------
# Module-level configuration.
#
# Every path below is derived from the process's current working directory
# at import time and is mirrored into os.environ under the same name, so
# code that reads the environment sees the same values.
# ---------------------------------------------------------------------------
current_directory = os.getcwd()
user_config_files = f"{current_directory}/src/user_config_files/"
dataset_store = f"{current_directory}/data/"
model_weights_dir = f"{current_directory}/src/rvc_implementation/assets/weights/"
model_index_dir = f"{current_directory}/src/rvc_implementation/logs/"
model_base_dir = f"{current_directory}/data/"  # same location as dataset_store
new_directory = f"{current_directory}/src/rvc_implementation"
assets_dir = f"{current_directory}/src/rvc_implementation/assets/"

os.environ['current_directory'] = current_directory
os.environ['user_config_files'] = user_config_files
os.environ['dataset_store'] = dataset_store
os.environ['model_weights_dir'] = model_weights_dir
os.environ['model_index_dir'] = model_index_dir
os.environ['model_base_dir'] = model_base_dir
os.environ['new_directory'] = new_directory
os.environ['assets_dir'] = assets_dir

# SECURITY: an AWS access key and secret used to be hard-coded at this point.
# Secrets must never be committed to source control. They are now taken from
# the process environment (or whatever credential source the AWS SDK is
# configured with); the names stay defined for code that imports them.
# The key pair that was previously committed should be treated as leaked and
# rotated.
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_DEFAULT_REGION = "us-east-1"
os.environ['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION

# Service endpoints and media locations exported for other modules.
os.environ['TTS_API_URL'] = "https://164.52.211.150:5001/text-to-audio-msen"
os.environ['OUTPUT_MEDIA_URL'] = "https://vc-ai.botfingers.com:8012/static"
os.environ['INPUT_MEDIA_URL'] = "https://yggzyuqumjv.haive.online:5001/static_vits/data/processed_audio/output_speech_"
os.environ['INPUT_MEDIA_FILE_PATH'] = "/home/haive_user_1/abhinav/text_to_speech/vits_ms/data/processed_audio/output_speech_"

# Asset roots inside the RVC implementation directory.
os.environ["rmvpe_root"] = f"{os.environ['new_directory']}/assets/rmvpe"
os.environ["hubert_root"] = f"{os.environ['new_directory']}/assets/hubert"
def direct_save(url, user_id, vid, timeout=60):
    """Download ``url`` into the ``raw_audio`` folder of a user's voice.

    The file is stored under
    ``<model_base_dir>/<user_id>/<vid>/raw_audio/<last URL segment>``, where
    ``model_base_dir`` is read from ``os.environ``.

    Args:
        url: HTTP(S) location of the file to fetch.
        user_id: Path component identifying the user (must be a string,
            since it is passed to ``os.path.join``).
        vid: Path component identifying the voice (string, same reason).
        timeout: Seconds passed to ``requests.get`` so a stalled server
            cannot block the caller forever.

    Returns:
        The local file path on success, or ``None`` if the download or the
        write failed (the error is printed, not raised).
    """
    directory = os.path.join(os.environ['model_base_dir'], user_id, vid, "raw_audio")
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(directory, exist_ok=True)

    # NOTE(review): the file name is the raw last URL segment, so a query
    # string (if any) ends up in the name - confirm callers never pass one.
    local_file_path = os.path.join(directory, url.split('/')[-1])

    try:
        response = requests.get(url, timeout=timeout)
        # Without this check a 4xx/5xx error page would be saved as audio.
        response.raise_for_status()
        with open(local_file_path, 'wb') as f:
            f.write(response.content)
    except Exception as e:
        # Best-effort contract: report the failure and signal it with None.
        print(f"Failed to download file: {e}")
        return None

    print(f"File downloaded successfully: {local_file_path}")
    return local_file_path
def check_config_file(model_name, vid, epochs, save_frequency):
    """Create the JSON config for ``model_name`` or report on an existing one.

    The config lives at ``<user_config_files>/<model_name>.json``.

    Args:
        model_name: Base name of the config file and value of ``model_name``.
        vid: Stored in the new config under the ``dataset_path`` key.
        epochs: Stored in the new config under ``epochs``.
        save_frequency: Stored in the new config under ``save_frequency``.

    Returns:
        A dict with ``status`` and ``message``:
        * status 0, "Created Config File." - no config existed; one was written.
        * status 0, "still training" - a config exists whose status is 0.
        * status 2 - a config exists with any other status.
    """
    config_path = os.path.join(user_config_files, model_name + '.json')

    # No config yet: write a fresh one marked as "training" (status 0).
    if not os.path.exists(config_path):
        with open(config_path, 'w') as handle:
            new_config = {
                'status': 0,
                'model_name': model_name,
                'dataset_path': vid,
                'epochs': epochs,
                'save_frequency': save_frequency,
                'index_file': None,
                'model_weights' : None,
            }
            json.dump(new_config, handle)
        print("Created Config File.")
        return {'status': 0, 'message': 'Created Config File.'}

    # A config already exists: its stored status decides the answer.
    with open(config_path, 'r') as handle:
        existing = json.load(handle)
    if existing.get('status') == 0:
        return {'status': 0, 'message': 'The model is still training.'}
    return {'status': 2, 'message': 'The model already exists. Please use a different VID.'}
def generate_unique_filename():
    """Return a freshly generated random UUID (version 4) as a string."""
    return str(uuid.uuid4())
def check_dataset_path(dataset_folder, model_name):
    """Validate that ``dataset_folder`` is a non-empty directory.

    Args:
        dataset_folder: Path of the dataset directory to check.
        model_name: Unused; kept so existing callers keep working.

    Returns:
        ``None`` when the folder exists and contains at least one entry
        (this implicit success value is preserved for existing callers),
        otherwise a dict with ``status`` 2 and an explanatory ``message``.
    """
    # isdir (rather than exists) also rejects a path that points at a regular
    # file, which would otherwise make os.listdir raise NotADirectoryError.
    if not os.path.isdir(dataset_folder):
        print("The dataset path does not exist.")
        return {'status': 2, 'message': 'The dataset path does not exist. Please create a Dataset.'}

    if not os.listdir(dataset_folder):
        print("Your dataset folder is empty.")
        return {'status': 2, 'message': 'Your dataset folder is empty.'}

    return None
def check_test_config_file(model_name):
    """Report whether the model described by ``model_name`` can be tested.

    Reads ``<user_config_files>/<model_name>.json`` and maps its ``status``
    field to a result dict.

    Returns:
        A dict with ``status`` and ``message``:
        * status 1 - config status is 1 (ready for testing).
        * status 0 - config status is 0 (still training).
        * status 2 - the config file is missing, or holds any other status.
    """
    config_path = os.path.join(user_config_files , model_name + '.json')

    # Guard clause: without a config file there is no model to test.
    if not os.path.exists(config_path):
        print("The provided model does not exist.")
        return {'status': 2, 'message': 'The provided model does not exist.'}

    print("Model exists")
    with open(config_path, 'r') as handle:
        stored_status = json.load(handle).get('status')

    if stored_status == 1:
        print("The model is ready for testing.")
        return {'status': 1, 'message': 'The model is ready for testing.'}
    if stored_status == 0:
        print("The model is still training. Please try again later.")
        return {'status': 0, 'message': 'The model is still training. Please try again later.'}

    # Any other stored status is reported the same way as a missing model.
    print("The provided model does not exist. Please train the model first.")
    return {'status': 2, 'message': 'The provided model does not exist.'}
def main_TTS(text, id, speaker_iden):
    """Request text-to-speech synthesis and return where the MP3 is expected.

    A random UUID is generated for the request, and the payload is handed to
    ``text_to_audio_api_request``. The outcome of that call is not inspected
    here, so the returned locations are where the file is *expected* to be,
    not a confirmation that it exists.

    Args:
        text: Text to synthesise.
        id: Identifier used as the file-name prefix. It is sent unchanged in
            the payload. (The name shadows the builtin but is kept because
            it is part of the public signature.)
        speaker_iden: Speaker identifier forwarded to the TTS service.

    Returns:
        ``(output_file_mp3, media_url)`` - the server-side file path and the
        public URL, both ending in ``<id><uid>.mp3``.
    """
    # Create a unique id for the audio file
    uid = str(uuid.uuid4())
    payload = {
        "text": text,
        "speaker_iden": speaker_iden,
        "id": id,
        "uid": uid,
    }
    text_to_audio_api_request(payload=payload)

    # Build the stem once with an f-string so a non-string ``id`` (e.g. an
    # int) no longer raises TypeError, and both paths always agree.
    file_stem = f"{id}{uid}"
    media_url = f"https://cloud.haive.online/public/AI_voice/{file_stem}.mp3"
    output_file_mp3 = f"/var/www/html/output/public/AI_voice/{file_stem}.mp3"
    return output_file_mp3, media_url
def text_to_audio_api_request(payload, timeout=(10, 300)):
    """POST ``payload`` as JSON to the text-to-audio service.

    This is a best-effort call: every failure is printed and swallowed.

    Args:
        payload: JSON-serialisable object sent as the request body.
        timeout: ``requests`` timeout (connect, read) in seconds, so a
            stalled service cannot hang the caller indefinitely.

    Returns:
        Always the string ``"File Created"``, whether or not the request
        succeeded; callers cannot use it to detect failure.
    """
    # NOTE(review): the module also exports os.environ['TTS_API_URL'], but
    # this function uses its own hard-coded endpoint - confirm which is right.
    url = "http://162.244.83.121:8008/text-to-audio-msen"
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(
            url, headers=headers, data=json.dumps(payload), timeout=timeout
        )
        # Error responses are not guaranteed to be JSON. Fall back to the raw
        # text so the status code below is still reported.
        try:
            response_data = response.json()
        except ValueError:
            response_data = response.text

        if response.status_code == 200:
            print("API request successful.")
        else:
            print("API request failed. Status code:", response.status_code)
        print("Response:", response_data)
    except Exception as e:
        print("An error occurred:", str(e))
    return "File Created"
def audio_output_file(user_id, vid, vc_output2):
    """Write converted audio to a WAV file and return its public URL.

    The file is written to
    ``<dataset_store>/<user_id>/<vid>/processed_audio/<user_id>_<vid>_<uuid>.wav``
    and the matching URL is built from ``OUTPUT_MEDIA_URL``; both values are
    read from ``os.environ``.

    Args:
        user_id: User identifier (converted with ``str``).
        vid: Voice identifier (converted with ``str``).
        vc_output2: Indexable pair where ``[0]`` is the sample rate and
            ``[1]`` is the audio data handed to ``scipy.io.wavfile.write``
            (presumably a NumPy array - confirm against the caller).

    Returns:
        The URL under which the written WAV file is expected to be served.
    """
    import posixpath  # URLs always use "/", regardless of the host OS

    model_name = str(user_id) + "_" + str(vid)
    folder_name = os.path.join(
        os.environ['dataset_store'], str(user_id), str(vid), 'processed_audio'
    )
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(folder_name, exist_ok=True)

    file_name = model_name + "_" + str(uuid.uuid4()) + ".wav"
    output_file = folder_name + "/" + file_name
    output_file_add = posixpath.join(
        os.environ['OUTPUT_MEDIA_URL'], 'data', str(user_id), str(vid),
        'processed_audio', file_name,
    )
    print("output_file: ", output_file)

    sample_rate = vc_output2[0]
    speech = vc_output2[1]
    print("sample_rate: ", sample_rate)
    print("speech: ", speech)

    # Save the audio as a .wav file
    # NOTE: an MP3 export step (pydub) used to follow and is currently disabled.
    wavfile.write(output_file, sample_rate, speech)
    print(f'Audio content written to file "{output_file}".')
    print("Converting text to audio and saving to server")
    print("Audio File Created at: ", output_file_add)
    return output_file_add
def get_index_file(model_name):
    """Look up the weights and index entries stored in a model's config.

    Reads ``<user_config_files>/<model_name>.json``.

    Returns:
        ``(sid, index_file)`` where ``sid`` is the config's ``model_weights``
        value and ``index_file`` its ``index_file`` value; either is ``None``
        when the key is absent.
    """
    config_path = os.path.join(user_config_files, model_name + '.json')
    with open(config_path, 'r') as handle:
        config = json.load(handle)
    return config.get('model_weights'), config.get('index_file')
def get_voice_input(input_audio_file_path, sr=22050):
    """Load an audio file and convert it to 16-bit integer samples.

    Args:
        input_audio_file_path: Path of the audio file passed to
            ``librosa.load``.
        sr: Sample rate requested from ``librosa.load``. Defaults to 22050,
            the value that was previously hard-coded.

    Returns:
        ``(sample_rate, audio)`` where ``audio`` is an ``np.int16`` array.
    """
    audio, sample_rate = librosa.load(input_audio_file_path, sr=sr)
    # Scale float samples to the int16 range. Clip first so that any sample
    # outside [-1, 1] saturates instead of wrapping around in the int16 cast.
    audio = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
    return (sample_rate, audio)
def rvc_clone_api_request(user_id, vid, text, timeout=(10, 600)):
    """Call the ``test_rvc`` endpoint to synthesise ``text`` in a cloned voice.

    Args:
        user_id: User identifier sent to the service (e.g. ``"user_id_2"``).
        vid: Voice identifier sent to the service (e.g. ``"vid_2"``).
        text: Text to be spoken.
        timeout: ``requests`` timeout (connect, read) in seconds. The read
            limit is generous on purpose; it only exists so a stalled
            service cannot hang the caller forever.

    Returns:
        The raw response body as text. The HTTP status is not checked.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    url = "http://162.244.83.121:8020/test_rvc"
    # Apart from the three caller-supplied fields, the conversion settings
    # are fixed values.
    payload = json.dumps({
        "user_id": user_id,
        "vid": vid,
        "text": text,
        "speaker_iden": 99,
        "pitch_extraction_algorithm": "pm",
        "transpose": 0,
        "median_filtering": 3,
        "audio_resampling": 0,
        "feature_ratio": 0.66,
        "volume_envelope_scaling": 0.21,
        "protect_voiceless_consonants": 0.33,
        "f0_file": "",
    })
    headers = {
        'Content-Type': 'application/json'
    }
    response = requests.request(
        "POST", url, headers=headers, data=payload, timeout=timeout
    )
    print(response.text)
    return response.text
def send_callback_to_frontend(user_id, vid, status, timeout=30):
    """Notify the frontend whether model training succeeded.

    Args:
        user_id: User identifier included in the callback.
        vid: Voice identifier included in the callback.
        status: Training outcome; ``1`` means success, anything else is
            reported as failure.
        timeout: Seconds passed to ``requests.post`` so an unresponsive
            frontend cannot block the caller indefinitely.

    Returns:
        A dict describing the delivery of the callback (not the training
        result): ``status`` is 1 when the frontend answered HTTP 200 and 0
        otherwise, and ``response`` holds the HTTP status code.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    callback_url = 'https://cms-voxsync.botfingers.com/api/user/call_back'

    # The success and failure paths differ only in the log line and in two
    # payload fields, so they are derived from one flag.
    succeeded = status == 1
    if succeeded:
        print("Model weights and Index File paths validated.")
    else:
        print("Model weights and Index File paths not validated.")

    payload = {
        'user_id': user_id,
        'vid': vid,
        'status': 1 if succeeded else 0,
        'message': 'Model training successful.' if succeeded else 'Model training failed.',
    }

    # data= sends the payload form-encoded, exactly as before.
    response = requests.post(callback_url, data=payload, timeout=timeout)

    if response.status_code == 200:
        print("Callback sent successfully.")
        return {'status': 1, 'message': 'Callback sent successfully.', 'response': response.status_code}

    print("Callback failed. Status code:", response.status_code)
    return {'status': 0, 'message': 'Callback failed. Status code:', 'response' : response.status_code}
# (File-viewer navigation text, not part of the module: "Back to Directory" / "File Manager")