Viewing File: /home/ubuntu/combine_ai/video_effect/mainv2.py
import requests
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends
from pydantic import BaseModel, HttpUrl
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import cv2
import numpy as np
from typing import Tuple
import shutil
import os
import uuid
from pathlib import Path
import subprocess
import boto3
from botocore.exceptions import NoCredentialsError
from dotenv import load_dotenv
import os
import requests
class VideoProcessRequest(BaseModel):
file: HttpUrl
effect: str
final_zoom_factor: float
blend_size: int
bucket_name: str = "combineai-s3"
folder_name: str = "video"
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
output_dir = "processed_videos"
Path(output_dir).mkdir(exist_ok=True)
load_dotenv()
access_key = os.getenv("AWS_ACCESS_KEY_ID")
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
region = os.getenv("AWS_DEFAULT_REGION")
bucket_name = os.getenv("AWS_BUCKET_NAME")
def upload_to_s3(file_name, bucket_name, folder_name):
s3 = boto3.client('s3')
object_name = f"{folder_name}/{os.path.basename(file_name)}" # This line constructs the object key
try:
s3.upload_file(file_name, bucket_name, object_name)
location = s3.get_bucket_location(Bucket=bucket_name)['LocationConstraint']
url = f"https://{bucket_name}.s3-{location}.amazonaws.com/{object_name}"
return url
except FileNotFoundError:
return "The file was not found"
except NoCredentialsError:
return "Credentials not available"
def zoom_frame(frame, frame_count, total_frames, zoom_settings: Tuple[float, float]):
initial_zoom, final_zoom = zoom_settings
zoom_factor = initial_zoom + (final_zoom - initial_zoom) * (frame_count / total_frames)
height, width = frame.shape[:2]
new_height, new_width = int(height * zoom_factor), int(width * zoom_factor)
resized = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
y_center = new_height // 2
x_center = new_width // 2
start_x = max(x_center - width // 2, 0)
start_y = max(y_center - height // 2, 0)
end_x = start_x + width
end_y = start_y + height
cropped = resized[start_y:end_y, start_x:end_x]
return cropped
@app.post("/process_video/")
async def process_video(data: VideoProcessRequest):
if not (1.0 <= data.final_zoom_factor <= 3.0):
raise HTTPException(status_code=400, detail="Final zoom factor must be between 1.0 and 3.0")
if not (3 <= data.blend_size <= 20):
raise HTTPException(status_code=400, detail="Blend size must be between 3 and 20")
session_id = uuid.uuid4().hex
input_path = os.path.join(output_dir, f"{session_id}_input.mp4")
response = requests.get(data.file)
with open(input_path, 'wb') as f:
f.write(response.content)
output_video_path = os.path.join(output_dir, f"{session_id}_output_video.mp4")
output_path = os.path.join(output_dir, f"{session_id}_output.mp4")
audio_path = os.path.join(output_dir, f"{session_id}_audio.mp3")
subprocess.run(["ffmpeg", "-i", input_path, "-q:a", "0", "-map", "a", audio_path, "-y"], check=True)
cap = cv2.VideoCapture(input_path)
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'MP4V'), 30, (frame_width, frame_height))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_count = 0
if data.effect == "Motion Blend":
frames = []
while True:
ret, frame = cap.read()
if not ret:
break
frames.append(frame)
if len(frames) > data.blend_size:
frames.pop(0)
if len(frames) == data.blend_size:
blended_frame = np.mean(frames, axis=0).astype(np.uint8)
out.write(blended_frame)
elif data.effect == "Infinite Zoom":
initial_zoom = 1.0
while True:
ret, frame = cap.read()
if not ret:
break
zoomed_frame = zoom_frame(frame, frame_count, total_frames, (initial_zoom, data.final_zoom_factor))
out.write(zoomed_frame)
frame_count += 1
cap.release()
out.release()
subprocess.run(
["ffmpeg", "-i", output_video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-strict", "experimental",
output_path, "-y"], check=True)
# Upload to S3 and return URL
url = upload_to_s3(output_path, data.bucket_name, data.folder_name)
return JSONResponse(content={"url": url})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8005, ssl_keyfile="privkey.pem", ssl_certfile="fullchain.pem")
Back to Directory
File Manager