Viewing File: /home/ubuntu/combine_ai/video_effect/main.py
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import FileResponse
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import cv2
import numpy as np
from typing import Tuple
import shutil
import os
import uuid
from pathlib import Path
import subprocess
import boto3
from botocore.exceptions import NoCredentialsError
from dotenv import load_dotenv
import os
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
output_dir = "processed_videos"
Path(output_dir).mkdir(exist_ok=True)
load_dotenv()
access_key = os.getenv("AWS_ACCESS_KEY_ID")
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
region = os.getenv("AWS_DEFAULT_REGION")
bucket_name = os.getenv("AWS_BUCKET_NAME")
def upload_to_s3(file_name, bucket_name, folder_name):
s3 = boto3.client('s3')
object_name = f"{folder_name}/{os.path.basename(file_name)}" # This line constructs the object key
try:
s3.upload_file(file_name, bucket_name, object_name)
location = s3.get_bucket_location(Bucket=bucket_name)['LocationConstraint']
url = f"https://{bucket_name}.s3-{location}.amazonaws.com/{object_name}"
return url
except FileNotFoundError:
return "The file was not found"
except NoCredentialsError:
return "Credentials not available"
def zoom_frame(frame, frame_count, total_frames, zoom_settings: Tuple[float, float]):
initial_zoom, final_zoom = zoom_settings
zoom_factor = initial_zoom + (final_zoom - initial_zoom) * (frame_count / total_frames)
height, width = frame.shape[:2]
new_height, new_width = int(height * zoom_factor), int(width * zoom_factor)
resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
y_center = new_height // 2
x_center = new_width // 2
start_x = max(x_center - width // 2, 0)
start_y = max(y_center - height // 2, 0)
end_x = start_x + width
end_y = start_y + height
cropped = resized[start_y:end_y, start_x:end_x]
return cropped
@app.post("/process_video/")
async def process_video(
file: UploadFile = File(...),
effect: str = Form(...),
final_zoom_factor: float = Form(...),
blend_size: int = Form(...),
bucket_name: str = Form("combineai-s3"),
folder_name: str = Form("video")):
if final_zoom_factor < 1.0 or final_zoom_factor > 3.0:
raise HTTPException(status_code=400, detail="Final zoom factor must be between 1.0 and 3.0")
if blend_size < 3 or blend_size > 20:
raise HTTPException(status_code=400, detail="Blend size must be between 3 and 20")
session_id = uuid.uuid4().hex
input_path = os.path.join(output_dir, f"{session_id}_input.mp4")
output_video_path = os.path.join(output_dir, f"{session_id}_output_video.mp4")
output_path = os.path.join(output_dir, f"{session_id}_output.mp4")
audio_path = os.path.join(output_dir, f"{session_id}_audio.mp3")
with open(input_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
subprocess.run(["ffmpeg","-i", input_path, "-q:a", "0", "-map", "a", audio_path, "-y"], check=True)
cap = cv2.VideoCapture(input_path)
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'MP4V'), 30, (frame_width, frame_height))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_count = 0
if effect == "Motion Blend":
frames = []
while True:
ret, frame = cap.read()
if not ret:
break
frames.append(frame)
if len(frames) > blend_size:
frames.pop(0)
if len(frames) == blend_size:
blended_frame = np.mean(frames, axis=0).astype(np.uint8)
out.write(blended_frame)
elif effect == "Infinite Zoom":
initial_zoom = 1.0
while True:
ret, frame = cap.read()
if not ret:
break
zoomed_frame = zoom_frame(frame, frame_count, total_frames, (initial_zoom, final_zoom_factor))
out.write(zoomed_frame)
frame_count += 1
cap.release()
out.release()
subprocess.run(["ffmpeg", "-i", output_video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", output_path, "-y"], check=True)
# Upload to S3 and return URL
url = upload_to_s3(output_path, bucket_name, folder_name)
return JSONResponse(content={"url": url})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8005, ssl_keyfile="privkey.pem", ssl_certfile="fullchain.pem")
Back to Directory
File Manager