Viewing File: /home/ubuntu/combine_ai/video_effect/main.py

from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import FileResponse
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import cv2
import numpy as np
from typing import Tuple
import shutil
import os
import uuid
from pathlib import Path
import subprocess
import boto3
from botocore.exceptions import NoCredentialsError
from dotenv import load_dotenv
import os


app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
output_dir = "processed_videos"
Path(output_dir).mkdir(exist_ok=True)


load_dotenv()

access_key = os.getenv("AWS_ACCESS_KEY_ID")
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
region = os.getenv("AWS_DEFAULT_REGION")
bucket_name = os.getenv("AWS_BUCKET_NAME")



def upload_to_s3(file_name, bucket_name, folder_name):
    s3 = boto3.client('s3')
    object_name = f"{folder_name}/{os.path.basename(file_name)}"  # This line constructs the object key
    try:
        s3.upload_file(file_name, bucket_name, object_name)
        location = s3.get_bucket_location(Bucket=bucket_name)['LocationConstraint']
        url = f"https://{bucket_name}.s3-{location}.amazonaws.com/{object_name}"
        return url
    except FileNotFoundError:
        return "The file was not found"
    except NoCredentialsError:
        return "Credentials not available"


def zoom_frame(frame, frame_count, total_frames, zoom_settings: Tuple[float, float]):
    initial_zoom, final_zoom = zoom_settings
    zoom_factor = initial_zoom + (final_zoom - initial_zoom) * (frame_count / total_frames)
    height, width = frame.shape[:2]
    new_height, new_width = int(height * zoom_factor), int(width * zoom_factor)
    resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
    y_center = new_height // 2
    x_center = new_width // 2
    start_x = max(x_center - width // 2, 0)
    start_y = max(y_center - height // 2, 0)
    end_x = start_x + width
    end_y = start_y + height
    cropped = resized[start_y:end_y, start_x:end_x]
    return cropped


@app.post("/process_video/")
async def process_video(
        file: UploadFile = File(...),
        effect: str = Form(...),
        final_zoom_factor: float = Form(...),
        blend_size: int = Form(...),
        bucket_name: str = Form("combineai-s3"),
        folder_name: str = Form("video")):
    if final_zoom_factor < 1.0 or final_zoom_factor > 3.0:
        raise HTTPException(status_code=400, detail="Final zoom factor must be between 1.0 and 3.0")
    if blend_size < 3 or blend_size > 20:
        raise HTTPException(status_code=400, detail="Blend size must be between 3 and 20")

    session_id = uuid.uuid4().hex
    input_path = os.path.join(output_dir, f"{session_id}_input.mp4")
    output_video_path = os.path.join(output_dir, f"{session_id}_output_video.mp4")
    output_path = os.path.join(output_dir, f"{session_id}_output.mp4")
    audio_path = os.path.join(output_dir, f"{session_id}_audio.mp3")

    with open(input_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    subprocess.run(["ffmpeg","-i", input_path, "-q:a", "0", "-map", "a", audio_path, "-y"], check=True)

    cap = cv2.VideoCapture(input_path)
    frame_width = int(cap.get(3))
    frame_height = int(cap.get(4))
    out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'MP4V'), 30, (frame_width, frame_height))

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_count = 0

    if effect == "Motion Blend":
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
            if len(frames) > blend_size:
                frames.pop(0)
            if len(frames) == blend_size:
                blended_frame = np.mean(frames, axis=0).astype(np.uint8)
                out.write(blended_frame)
    elif effect == "Infinite Zoom":
        initial_zoom = 1.0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            zoomed_frame = zoom_frame(frame, frame_count, total_frames, (initial_zoom, final_zoom_factor))
            out.write(zoomed_frame)
            frame_count += 1

    cap.release()
    out.release()

    subprocess.run(["ffmpeg", "-i", output_video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", output_path, "-y"], check=True)

    # Upload to S3 and return URL
    url = upload_to_s3(output_path, bucket_name, folder_name)
    return JSONResponse(content={"url": url})

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8005, ssl_keyfile="privkey.pem", ssl_certfile="fullchain.pem")

Back to Directory File Manager