# File: /home/ubuntu/codegamaai-test/voice_clone/src/utils.py

import json
import os
import uuid
import numpy as np

import librosa
import requests
from pydub import AudioSegment
from scipy.io import wavfile
from scipy.io.wavfile import write

# Filesystem layout of the service. Every path is anchored at the directory
# the process was started from (os.getcwd()), not at this file's location.
current_directory = os.getcwd()
new_directory = current_directory + "/src/rvc_implementation"
assets_dir = new_directory + "/assets/"
user_config_files = current_directory + "/src/user_config_files/"
dataset_store = current_directory + "/data/"
model_base_dir = current_directory + "/data/"
model_weights_dir = assets_dir + "weights/"
model_index_dir = new_directory + "/logs/"

# Mirror each path into the process environment under the same name, so code
# that looks the locations up through os.environ sees the same values.
os.environ.update({
    'current_directory': current_directory,
    'user_config_files': user_config_files,
    'dataset_store': dataset_store,
    'model_weights_dir': model_weights_dir,
    'model_index_dir': model_index_dir,
    'model_base_dir': model_base_dir,
    'new_directory': new_directory,
    'assets_dir': assets_dir,
})


# SECURITY: AWS credentials must never be committed to source control. The key
# pair is read from the process environment (export AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY before starting the service, or rely on the SDK's own
# credential sources). Any key pair that was ever committed to this file should
# be treated as compromised and rotated.
# The module-level names are kept so existing references to them keep
# resolving; each is None when the corresponding variable is not set.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

# The region is not a secret, so it is still pinned here as before.
AWS_DEFAULT_REGION = "us-east-1"
os.environ['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION


# Service endpoints, media locations and model asset roots, published through
# the process environment. The two *_root entries are derived from the
# 'new_directory' environment entry, which must already be set at this point.
os.environ.update({
    'TTS_API_URL': "https://164.52.211.150:5001/text-to-audio-msen",
    'OUTPUT_MEDIA_URL': "https://vc-ai.botfingers.com:8012/static",
    'INPUT_MEDIA_URL': "https://yggzyuqumjv.haive.online:5001/static_vits/data/processed_audio/output_speech_",
    'INPUT_MEDIA_FILE_PATH': "/home/haive_user_1/abhinav/text_to_speech/vits_ms/data/processed_audio/output_speech_",
    'rmvpe_root': os.environ['new_directory'] + "/assets/rmvpe",
    'hubert_root': os.environ['new_directory'] + "/assets/hubert",
})

def direct_save(url, user_id, vid):
    """Download ``url`` into the ``raw_audio`` folder of a user's voice.

    The file is stored as ``<model_base_dir>/<user_id>/<vid>/raw_audio/<name>``
    where ``model_base_dir`` is read from ``os.environ`` and ``<name>`` is the
    last ``/``-separated segment of ``url``.

    Args:
        url: Address of the file to download.
        user_id: Path component identifying the user (must be a string).
        vid: Path component identifying the voice (must be a string).

    Returns:
        The local path of the saved file, or ``None`` when the download or
        the write failed (the reason is printed).
    """
    directory = os.path.join(os.environ['model_base_dir'], user_id, vid, "raw_audio")
    # exist_ok avoids a crash when the folder is created concurrently between
    # an existence check and the makedirs call.
    os.makedirs(directory, exist_ok=True)
    local_file_path = os.path.join(directory, url.split('/')[-1])

    try:
        # Without a timeout an unresponsive host would block this call forever.
        response = requests.get(url, timeout=30)
        # Treat 4xx/5xx as a failure so an HTTP error page is never written
        # to disk under the audio file's name.
        response.raise_for_status()
        with open(local_file_path, 'wb') as f:
            f.write(response.content)
    except (requests.RequestException, OSError) as e:
        print(f"Failed to download file: {e}")
        return None

    print(f"File downloaded successfully: {local_file_path}")
    return local_file_path



def check_config_file(model_name, vid, epochs, save_frequency):
    """Look up, or create, the JSON config file for ``model_name``.

    The config lives at ``<user_config_files>/<model_name>.json``.

    Args:
        model_name: Base name of the config file.
        vid: Stored in the new config under the ``dataset_path`` key.
        epochs: Stored in the new config under the ``epochs`` key.
        save_frequency: Stored in the new config under ``save_frequency``.

    Returns:
        A dict with ``status`` and ``message``:
        * status 0, 'Created Config File.' - no config existed; one was written
          with status 0 and empty ``index_file`` / ``model_weights`` entries.
        * status 0, 'The model is still training.' - a config exists and its
          stored status is 0.
        * status 2 - a config exists with any other stored status.
    """
    config_path = os.path.join(user_config_files, model_name + '.json')

    if not os.path.exists(config_path):
        # No config yet: write the initial record for this model.
        fresh_config = {
            'status': 0,
            'model_name': model_name,
            'dataset_path': vid,
            'epochs': epochs,
            'save_frequency': save_frequency,
            'index_file': None,
            'model_weights' : None,
        }
        with open(config_path, 'w') as handle:
            json.dump(fresh_config, handle)
        print("Created Config File.")
        return {'status': 0, 'message': 'Created Config File.'}

    # A config already exists: report according to its stored status.
    with open(config_path, 'r') as handle:
        stored_status = json.load(handle).get('status')

    if stored_status == 0:
        return {'status': 0, 'message': 'The model is still training.'}
    return {'status': 2, 'message': 'The model already exists. Please use a different VID.'}

def generate_unique_filename():
    """Return a freshly generated random (version 4) UUID as a string."""
    return str(uuid.uuid4())


def check_dataset_path(dataset_folder, model_name):
    """Check that ``dataset_folder`` is an existing, non-empty directory.

    Args:
        dataset_folder: Path of the folder expected to hold the dataset.
        model_name: Not used by this function; kept so existing callers
            keep working.

    Returns:
        ``None`` when the folder exists and contains at least one entry.
        Otherwise a dict ``{'status': 2, 'message': ...}`` describing the
        problem.
    """
    # isdir (rather than exists) also rejects a path that points at a regular
    # file, which would otherwise make os.listdir raise NotADirectoryError.
    if not os.path.isdir(dataset_folder):
        print("The dataset path does not exist.")
        return {'status': 2, 'message': 'The dataset path does not exist. Please create a Dataset.'}
    if not os.listdir(dataset_folder):
        print("Your dataset folder is empty.")
        return {'status': 2, 'message': 'Your dataset folder is empty.'}
    # Success is signalled by None, as before.
    return None


def check_test_config_file(model_name):
    """Report whether the model described by ``model_name`` can be tested.

    Reads ``<user_config_files>/<model_name>.json`` and inspects its
    ``status`` entry.

    Args:
        model_name: Base name of the config file.

    Returns:
        A dict with ``status`` and ``message``:
        * status 1 - the stored status is 1 (ready for testing).
        * status 0 - the stored status is 0 (still training).
        * status 2 - the config file is missing, or holds any other status.
    """
    config_path = os.path.join(user_config_files , model_name + '.json')

    if not os.path.exists(config_path):
        print("The provided model does not exist.")
        return {'status': 2, 'message': 'The provided model does not exist.'}

    print("Model exists")
    with open(config_path, 'r') as handle:
        stored_status = json.load(handle).get('status')

    if stored_status == 1:
        print("The model is ready for testing.")
        return {'status': 1, 'message': 'The model is ready for testing.'}
    if stored_status == 0:
        print("The model is still training. Please try again later.")
        return {'status': 0, 'message': 'The model is still training. Please try again later.'}

    # Any other stored status is reported the same way as a missing model.
    print("The provided model does not exist. Please train the model first.")
    return {'status': 2, 'message': 'The provided model does not exist.'}



def main_TTS(text,id, speaker_iden):
    """Request a text-to-speech rendering and return where its MP3 is expected.

    A fresh UUID is generated for the request, the payload is handed to
    ``text_to_audio_api_request`` and the file path / URL are derived from
    ``id`` followed by that UUID.

    Args:
        text: Text to synthesise; forwarded unchanged in the payload.
        id: Caller-supplied identifier; forwarded unchanged in the payload and
            used (as text) as the prefix of the output file name.
        speaker_iden: Speaker identifier; forwarded unchanged in the payload.

    Returns:
        ``(output_file_mp3, media_url)`` - a server-side file path and a public
        URL, both ending in ``<id><uuid>.mp3``. This function does not check
        that the file exists (presumably the remote service writes it - confirm).
    """
    # Create a unique id for the audio file
    uid = str(uuid.uuid4())
    payload = {
        "text": text,
        "speaker_iden" : speaker_iden,
        "id" : id,
        "uid": uid
    }

    # Format the name before the request is sent: plain "+" concatenation
    # raised TypeError for a non-string id (e.g. an int), and did so only
    # after the remote call had already been made.
    file_stem = f"{id}{uid}"

    text_to_audio_api_request(payload = payload)
    media_url = "https://cloud.haive.online/public/AI_voice/" + file_stem + ".mp3"
    output_file_mp3 = "/var/www/html/output/public/AI_voice/" + file_stem + ".mp3"
    return output_file_mp3, media_url

def text_to_audio_api_request(payload, timeout=300):
    """POST ``payload`` as JSON to the text-to-audio endpoint (best effort).

    The outcome is only printed; no exception escapes this function.

    Args:
        payload: JSON-serialisable object sent as the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the caller forever.

    Returns:
        Always the string ``"File Created"`` - including when the request
        failed. NOTE(review): kept for backward compatibility; callers cannot
        detect failure from the return value.
    """
    url = "http://162.244.83.121:8008/text-to-audio-msen"
    headers = {"Content-Type": "application/json"}

    try:
        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=timeout)
        try:
            response_data = response.json()
        except ValueError:
            # Non-JSON body (e.g. an HTML error page): fall back to the raw
            # text so the status code below is still reported.
            response_data = response.text

        if response.status_code == 200:
            print("API request successful.")
            print("Response:", response_data)
        else:
            print("API request failed. Status code:", response.status_code)
            print("Response:", response_data)
    except Exception as e:
        # Deliberately broad: this call is best-effort and must not raise.
        print("An error occurred:", str(e))
    return "File Created"


def audio_output_file(user_id, vid, vc_output2):
    """Write converted audio to a WAV file and return its public URL.

    The file is written to
    ``<dataset_store>/<user_id>/<vid>/processed_audio/<user_id>_<vid>_<uuid>.wav``
    (``dataset_store`` is read from ``os.environ``); the folder is created when
    missing.

    Args:
        user_id: User identifier; converted with ``str()``.
        vid: Voice identifier; converted with ``str()``.
        vc_output2: Indexable pair whose element 0 is the sample rate and
            element 1 the sample data, both passed to
            ``scipy.io.wavfile.write``.

    Returns:
        The URL of the written file: ``OUTPUT_MEDIA_URL`` (from ``os.environ``)
        followed by ``data/<user_id>/<vid>/processed_audio/<file name>``.
    """
    # URLs always use "/" as separator, so join them with posixpath instead
    # of os.path (which would use "\\" on Windows).
    import posixpath

    model_name = str(user_id) + "_" + str(vid)
    folder_name = os.path.join(os.environ['dataset_store'], str(user_id), str(vid), 'processed_audio')
    # exist_ok avoids a crash when the folder is created concurrently.
    os.makedirs(folder_name, exist_ok=True)

    file_name = model_name + "_" + str(uuid.uuid4()) + ".wav"
    output_file = folder_name + "/" + file_name
    output_file_add = posixpath.join(
        os.environ['OUTPUT_MEDIA_URL'], 'data', str(user_id), str(vid), 'processed_audio', file_name
    )
    print("output_file: ", output_file)

    sample_rate = vc_output2[0]
    speech = vc_output2[1]
    print("sample_rate: ", sample_rate)
    print("speech: ", speech)

    # Save the audio as a .wav file
    wavfile.write(output_file, sample_rate, speech)
    print(f'Audio content written to file "{output_file}".')

    print("Converting text to audio and saving to server")
    print("Audio File Created at: ", output_file_add)

    return output_file_add

def get_index_file(model_name):
    """Read the weights and index entries from a model's config file.

    Args:
        model_name: Base name of ``<user_config_files>/<model_name>.json``.

    Returns:
        ``(model_weights, index_file)`` as stored in the config; either value
        is ``None`` when its key is absent.
    """
    config_path = os.path.join(user_config_files, model_name + '.json')
    with open(config_path, 'r') as handle:
        config = json.load(handle)
    return config.get('model_weights'), config.get('index_file')


def get_voice_input(input_audio_file_path):
    """Load an audio file and return it as 16-bit integer samples.

    Args:
        input_audio_file_path: Path of the audio file, passed to
            ``librosa.load`` with ``sr=22050``.

    Returns:
        ``(sample_rate, audio)`` where ``audio`` is a NumPy ``int16`` array
        obtained by scaling the loaded samples by 32767.
    """
    audio, sample_rate = librosa.load(input_audio_file_path, sr=22050)
    # Samples outside [-1, 1] would overflow the int16 range when scaled,
    # so clamp them before the conversion.
    audio = np.clip(audio, -1.0, 1.0)
    audio = (audio * 32767).astype(np.int16)
    vc_input = (sample_rate, audio)
    return vc_input


def rvc_clone_api_request(user_id, vid, text):
    """POST a voice-conversion test request and return the raw response body.

    ``user_id``, ``vid`` and ``text`` are sent together with a fixed set of
    conversion parameters as a JSON document to the ``/test_rvc`` endpoint.

    Args:
        user_id: Sent as the ``user_id`` field.
        vid: Sent as the ``vid`` field.
        text: Sent as the ``text`` field.

    Returns:
        The response body as text (it is also printed).
    """
    endpoint = "http://162.244.83.121:8020/test_rvc"

    # Caller-supplied fields first, then the fixed conversion settings.
    request_fields = {
        "user_id": user_id,
        "vid": vid,
        "text": text,
        "speaker_iden": 99,
        "pitch_extraction_algorithm": "pm",
        "transpose": 0,
        "median_filtering": 3,
        "audio_resampling": 0,
        "feature_ratio": 0.66,
        "volume_envelope_scaling": 0.21,
        "protect_voiceless_consonants": 0.33,
        "f0_file": "",
    }

    response = requests.post(
        endpoint,
        headers={'Content-Type': 'application/json'},
        data=json.dumps(request_fields),
    )

    body = response.text
    print(body)
    return body


def send_callback_to_frontend(user_id, vid, status, timeout=30):
    """Notify the frontend callback URL about the outcome of a training run.

    Args:
        user_id: Sent as the ``user_id`` form field.
        vid: Sent as the ``vid`` form field.
        status: ``1`` reports a successful training; any other value reports
            a failure.
        timeout: Seconds to wait for the callback endpoint. Without a timeout
            an unresponsive endpoint would block the caller forever.

    Returns:
        ``{'status': 1, ...}`` when the endpoint answered with HTTP 200,
        otherwise ``{'status': 0, ...}``; ``'response'`` holds the HTTP status
        code in both cases.

    Raises:
        requests.RequestException: If the request cannot be completed
            (connection error, timeout, ...).
    """
    callback_url = 'https://cms-voxsync.botfingers.com/api/user/call_back'

    # The success and failure callbacks differ only in the log line and in the
    # status/message fields, so build one payload instead of two copies.
    succeeded = status == 1
    if succeeded:
        print("Model weights and Index File paths validated.")
    else:
        print("Model weights and Index File paths not validated.")

    payload = {
        'user_id': user_id,
        'vid': vid,
        'status': 1 if succeeded else 0,
        'message': 'Model training successful.' if succeeded else 'Model training failed.',
    }

    # Sent form-encoded (data=), as before.
    response = requests.post(callback_url, data=payload, timeout=timeout)

    if response.status_code == 200:
        print("Callback sent successfully.")
        return {'status': 1, 'message': 'Callback sent successfully.', 'response': response.status_code}

    print("Callback failed. Status code:", response.status_code)
    return {'status': 0, 'message': 'Callback failed. Status code:', 'response' : response.status_code}


# [file-viewer page footer: "Back to Directory File Manager"]