# File: /home/ubuntu/combine_ai/video_effect/mainv2.py

import requests
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends
from pydantic import BaseModel, HttpUrl
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import cv2
import numpy as np
from typing import Tuple
import shutil
import os
import uuid
from pathlib import Path
import subprocess
import boto3
from botocore.exceptions import NoCredentialsError
from dotenv import load_dotenv
import os
import requests


class VideoProcessRequest(BaseModel):
    """JSON request body accepted by the ``/process_video/`` endpoint."""

    # URL of the source video; the handler downloads it with ``requests.get``.
    file: HttpUrl
    # Effect to apply; the handler branches on "Motion Blend" and
    # "Infinite Zoom".
    effect: str
    # Zoom factor the "Infinite Zoom" effect ramps towards, starting from 1.0;
    # the handler rejects values outside 1.0-3.0.
    final_zoom_factor: float
    # Number of consecutive frames averaged by "Motion Blend"; the handler
    # rejects values outside 3-20.
    blend_size: int
    # Destination S3 bucket passed to ``upload_to_s3``.
    bucket_name: str = "combineai-s3"
    # Key prefix ("folder") under which the result is stored in the bucket.
    folder_name: str = "video"


# ASGI application; the /process_video/ route below is registered on it.
app = FastAPI()
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): confirm this wide-open policy (wildcard origins together with
# allow_credentials=True) is really intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Working directory (relative to the process CWD) for downloaded inputs and
# rendered outputs; created at import time if it does not exist yet.
output_dir = "processed_videos"
Path(output_dir).mkdir(exist_ok=True)

# Must run before the os.getenv() calls below so that values supplied through
# a .env file (python-dotenv) are visible in the process environment.
load_dotenv()

# AWS settings read from the environment; each is None when the variable is
# unset.
# NOTE(review): none of these four names is referenced by the visible code --
# upload_to_s3 builds its client with boto3.client('s3') and takes the bucket
# name as a parameter. Confirm whether they are still needed.
access_key = os.getenv("AWS_ACCESS_KEY_ID")
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
region = os.getenv("AWS_DEFAULT_REGION")
bucket_name = os.getenv("AWS_BUCKET_NAME")


def upload_to_s3(file_name, bucket_name, folder_name):
    """Upload a local file to S3 and return the URL of the stored object.

    Args:
        file_name: Path of the local file to upload.
        bucket_name: Name of the destination bucket.
        folder_name: Key prefix; the object key is
            ``"<folder_name>/<basename of file_name>"``.

    Returns:
        The virtual-hosted-style HTTPS URL of the object on success. For
        backward compatibility the two handled failures are still reported as
        plain strings rather than exceptions: ``"The file was not found"`` and
        ``"Credentials not available"``. Callers should therefore check that
        the result starts with ``"https://"``.

    Any other boto3/botocore error (for example an access-denied
    ``ClientError``) propagates to the caller unchanged.
    """
    from urllib.parse import quote

    s3 = boto3.client('s3')
    # Object key: the file keeps its own name under the requested prefix.
    object_name = f"{folder_name}/{os.path.basename(file_name)}"
    try:
        s3.upload_file(file_name, bucket_name, object_name)
        location = s3.get_bucket_location(Bucket=bucket_name)['LocationConstraint']
    except FileNotFoundError:
        return "The file was not found"
    except NoCredentialsError:
        return "Credentials not available"

    # GetBucketLocation reports None for us-east-1 and the legacy value "EU"
    # for eu-west-1; interpolating those verbatim produced hosts such as
    # "s3-None.amazonaws.com".
    bucket_region = {None: "us-east-1", "EU": "eu-west-1"}.get(location, location)
    # Use the dot-separated regional endpoint ("s3.<region>"); the dashed
    # "s3-<region>" form is a legacy alias that only exists for older regions.
    # The key is percent-encoded so prefixes with spaces etc. yield a valid URL.
    return f"https://{bucket_name}.s3.{bucket_region}.amazonaws.com/{quote(object_name)}"


def zoom_frame(frame, frame_count, total_frames, zoom_settings: Tuple[float, float]):
    """Return ``frame`` zoomed towards its centre for one step of a zoom ramp.

    The zoom factor is interpolated linearly from ``initial_zoom`` to
    ``final_zoom`` according to ``frame_count / total_frames``. The frame is
    scaled by that factor and a window of the original size is cropped from
    the centre of the scaled image.

    Args:
        frame: Image array; only ``frame.shape[:2]`` (height, width) is used
            for the geometry.
        frame_count: Index of the current frame within the clip.
        total_frames: Number of frames in the clip. A value <= 0 (unknown
            length) is treated as "no progress" instead of dividing by zero.
        zoom_settings: ``(initial_zoom, final_zoom)`` scale factors.

    Returns:
        The zoomed image. For zoom factors >= 1 it has the same height and
        width as ``frame``; for factors < 1 the scaled image is smaller than
        the crop window, so the result is smaller than ``frame``.
    """
    initial_zoom, final_zoom = zoom_settings
    if total_frames > 0:
        # Clamp so a frame index beyond the reported length (container
        # metadata can under-count) never overshoots the final zoom.
        progress = min(max(frame_count / total_frames, 0.0), 1.0)
    else:
        progress = 0.0
    zoom_factor = initial_zoom + (final_zoom - initial_zoom) * progress

    height, width = frame.shape[:2]
    new_height, new_width = int(height * zoom_factor), int(width * zoom_factor)
    if (new_height, new_width) == (height, width):
        # Nothing to scale or crop; hand back a copy so the caller never
        # receives an alias of its own input.
        return frame.copy()

    resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
    # Top-left corner of the centred crop window, clamped at the image edge.
    start_x = max(new_width // 2 - width // 2, 0)
    start_y = max(new_height // 2 - height // 2, 0)
    return resized[start_y:start_y + height, start_x:start_x + width]


_SUPPORTED_EFFECTS = ("Motion Blend", "Infinite Zoom")
# Frame rate used only when the container does not report a usable one.
_FALLBACK_FPS = 30.0


def _download_source(url: str, destination: str) -> None:
    """Stream the remote video at ``url`` into the local file ``destination``."""
    try:
        # (connect, read) timeouts so a stalled server cannot hang the worker.
        with requests.get(url, stream=True, timeout=(10, 300)) as response:
            response.raise_for_status()
            with open(destination, "wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    handle.write(chunk)
    except requests.RequestException as exc:
        raise HTTPException(status_code=400, detail="Could not download the source video") from exc


def _extract_audio(input_path: str, audio_path: str) -> bool:
    """Extract the audio track with ffmpeg; return False if none was produced."""
    # check=False: "-map a" makes ffmpeg exit non-zero for a silent clip, which
    # is a valid input rather than a server error.
    result = subprocess.run(
        ["ffmpeg", "-y", "-i", input_path, "-q:a", "0", "-map", "a", audio_path],
        check=False,
    )
    return result.returncode == 0 and os.path.isfile(audio_path)


def _count_frames(video_path: str) -> int:
    """Count frames by decoding the file, for containers without a frame count."""
    cap = cv2.VideoCapture(video_path)
    try:
        count = 0
        while cap.grab():
            count += 1
        return count
    finally:
        cap.release()


def _render_effect(data: VideoProcessRequest, input_path: str, output_video_path: str) -> None:
    """Apply ``data.effect`` to every frame of the input and write a silent video."""
    from collections import deque
    import math

    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        if not cap.isOpened():
            raise HTTPException(status_code=400, detail="The downloaded file is not a readable video")

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the source frame rate so the re-attached audio stays in sync.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not math.isfinite(fps) or fps <= 0:
            fps = _FALLBACK_FPS

        out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'MP4V'), fps,
                              (frame_width, frame_height))
        if not out.isOpened():
            raise HTTPException(status_code=500, detail="Could not open the video encoder")

        frames_written = 0
        if data.effect == "Motion Blend":
            # Sliding window of the most recent frames. Every input frame
            # yields one output frame (the first ones blend a shorter window),
            # so the video keeps its length and stays aligned with the audio.
            window = deque(maxlen=data.blend_size)
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                window.append(frame)
                out.write(np.mean(list(window), axis=0).astype(np.uint8))
                frames_written += 1
        else:  # "Infinite Zoom"
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total_frames <= 0:
                # Some containers do not store a frame count; count by decoding.
                total_frames = _count_frames(input_path)
            frame_index = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Cap the index in case the reported count is too low, so the
                # zoom never runs past the requested final factor.
                zoomed = zoom_frame(frame, min(frame_index, total_frames), total_frames,
                                    (1.0, data.final_zoom_factor))
                out.write(zoomed)
                frame_index += 1
                frames_written += 1

        if frames_written == 0:
            raise HTTPException(status_code=400, detail="The video contains no decodable frames")
    finally:
        cap.release()
        if out is not None:
            out.release()


def _run_pipeline(data: VideoProcessRequest) -> str:
    """Download, render, mux and upload one video; return the uploaded URL."""
    session_id = uuid.uuid4().hex
    input_path = os.path.join(output_dir, f"{session_id}_input.mp4")
    output_video_path = os.path.join(output_dir, f"{session_id}_output_video.mp4")
    output_path = os.path.join(output_dir, f"{session_id}_output.mp4")
    audio_path = os.path.join(output_dir, f"{session_id}_audio.mp3")

    try:
        # str(): with pydantic v2 HttpUrl is a Url object, not a str subclass.
        _download_source(str(data.file), input_path)
        has_audio = _extract_audio(input_path, audio_path)
        _render_effect(data, input_path, output_video_path)

        if has_audio:
            subprocess.run(
                ["ffmpeg", "-y", "-i", output_video_path, "-i", audio_path, "-c:v", "copy",
                 "-c:a", "aac", "-strict", "experimental", output_path],
                check=True)
        else:
            # Silent source: the rendered video already is the final result.
            os.replace(output_video_path, output_path)

        url = upload_to_s3(output_path, data.bucket_name, data.folder_name)
    finally:
        # Best-effort removal of this request's working files so the output
        # directory does not grow without bound.
        for path in (input_path, audio_path, output_video_path, output_path):
            try:
                os.remove(path)
            except OSError:
                pass

    # upload_to_s3 reports its handled failures as plain strings; do not hand
    # such a message to the client as if it were a URL.
    if not (isinstance(url, str) and url.startswith("https://")):
        raise HTTPException(status_code=502, detail=f"Upload to S3 failed: {url}")
    return url


@app.post("/process_video/")
async def process_video(data: VideoProcessRequest):
    """Apply a video effect to a remote video and return the S3 URL of the result.

    The source at ``data.file`` is downloaded, its audio track (if any) is
    extracted with ffmpeg, the frames are re-rendered with the selected
    effect, the audio is muxed back in, and the result is uploaded through
    ``upload_to_s3``.

    Args:
        data: Request body; see ``VideoProcessRequest``.

    Returns:
        ``JSONResponse`` with ``{"url": <uploaded object URL>}``.

    Raises:
        HTTPException: 400 for out-of-range parameters, an unsupported effect,
            a failed download or an undecodable video; 500 if the encoder
            cannot be opened; 502 if the upload does not yield a URL.
        subprocess.CalledProcessError: if the final ffmpeg mux fails.
    """
    if not (1.0 <= data.final_zoom_factor <= 3.0):
        raise HTTPException(status_code=400, detail="Final zoom factor must be between 1.0 and 3.0")
    if not (3 <= data.blend_size <= 20):
        raise HTTPException(status_code=400, detail="Blend size must be between 3 and 20")
    if data.effect not in _SUPPORTED_EFFECTS:
        # Previously an unknown effect silently produced an empty video.
        raise HTTPException(status_code=400,
                            detail="Effect must be one of: " + ", ".join(_SUPPORTED_EFFECTS))

    # NOTE(security): data.file is fetched server-side (SSRF risk) and
    # bucket_name / folder_name are caller-controlled; restrict them before
    # exposing this endpoint to untrusted clients.

    # The pipeline is blocking (network, ffmpeg, OpenCV); run it in a worker
    # thread so the event loop can keep serving other requests.
    from fastapi.concurrency import run_in_threadpool

    url = await run_in_threadpool(_run_pipeline, data)
    return JSONResponse(content={"url": url})


# Script entry point: serve the app directly with uvicorn over HTTPS.
if __name__ == "__main__":
    import uvicorn

    # Listens on all interfaces, port 8005. The key and certificate paths are
    # relative, so they are resolved against the current working directory.
    uvicorn.run(app, host="0.0.0.0", port=8005, ssl_keyfile="privkey.pem", ssl_certfile="fullchain.pem")
# Back to Directory File Manager