Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
A music project for developing various audio/video (A/V) filters using Python and the FFmpeg library.
231 changes: 131 additions & 100 deletions flaskr/helpers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import ffmpeg
from scipy.signal import butter, lfilter
import scipy.io.wavfile as wav
from scipy.signal import wiener


import numpy as np
# from pydub import AudioSegment
import os
# Scratch WAV file the audio helpers in this module write to and read back.
_AUDIO_FILE_ = "audio.wav"
APP_ROOT = os.path.dirname(os.path.abspath(__file__))
#UPLOAD_FOLDER = os.path.join(APP_ROOT, 'static', "result.mp4")

# Directory (next to this module) where result files are placed.
UPLOAD_FOLDER = os.path.join(APP_ROOT, "static")


def pathMaker(prefix, fileName):
    """Return a path inside UPLOAD_FOLDER named ``prefix`` + ``fileName``'s extension.

    The extension is whatever follows the last ``.`` in ``fileName``; a name
    without a dot is used as the extension in full.
    """
    extension = fileName.split('.')[-1]
    return os.path.join(UPLOAD_FOLDER, f"{prefix}.{extension}")


def upscaler(tw, th, readFrom, writeTo):
Expand All @@ -27,8 +29,7 @@ def makePhoneLike(filterOrder, sideGain, readFrom, writeTo):
if os.path.exists(_AUDIO_FILE_):
os.remove(_AUDIO_FILE_)

os.system(f'ffmpeg -i {readFrom} -af "pan=2c|c0={sideGain}*c0|c1={1-sideGain}*c1" {_AUDIO_FILE_}')

os.system(f'ffmpeg -i "{readFrom}" -af "pan=2c|c0={sideGain}*c0|c1={1-sideGain}*c1" {_AUDIO_FILE_}')
sample_rate, samples_original = wav.read(_AUDIO_FILE_)
num, denom = butter(filterOrder, [800, 3400] , "bandpass", fs=sample_rate)
ot = lfilter(num, denom, samples_original)
Expand All @@ -39,109 +40,71 @@ def makePhoneLike(filterOrder, sideGain, readFrom, writeTo):
# ot = ffmpeg.output(prob, au, "video.mp4")
return

def denoise_and_delay(readFrom, writeTo, noise_power_db, delay_ms, delay_gain_percent):
    """Band-pass the audio of a video, scale it, and mix in a delayed copy.

    The audio track of ``readFrom`` is dumped to the scratch WAV file
    ``_AUDIO_FILE_``, filtered with a 4th-order 300-3400 Hz Butterworth
    band-pass, scaled by ``10 ** (noise_power_db / 20)``, summed with a copy
    of itself delayed by ``delay_ms`` and scaled by ``delay_gain_percent``,
    then muxed with the original video stream into ``writeTo``.

    Args:
        readFrom: Path of the input media file.
        writeTo: Path of the output file (overwritten if it exists).
        noise_power_db: Gain in dB applied to the band-passed signal.
        delay_ms: Delay of the mixed-in copy in milliseconds; negative
            values are treated as 0.
        delay_gain_percent: Level of the delayed copy as a percentage of the
            band-passed signal.

    Raises:
        ValueError: If ``readFrom`` contains no audio stream.
    """
    source = ffmpeg.input(readFrom)
    vid = source.video

    # Find the audio stream by type rather than assuming it is stream #1.
    info = ffmpeg.probe(readFrom, cmd="ffprobe")
    audio_info = next(
        (s for s in info["streams"] if s.get("codec_type") == "audio"), None
    )
    if audio_info is None:
        raise ValueError(f"No audio stream found in {readFrom!r}")
    source.audio.output(
        _AUDIO_FILE_, ac=audio_info["channels"]
    ).overwrite_output().run()

    sample_rate, samples_original = wav.read(_AUDIO_FILE_)

    filter_order = 4
    low_cutoff = 300
    high_cutoff = 3400
    noise_factor = 10 ** (noise_power_db / 20)

    # Work in float64 throughout: writing the filter output into a buffer of
    # the WAV's integer dtype would truncate (and can wrap) the samples.
    samples = samples_original.astype(np.float64)

    b, a = butter(
        filter_order, [low_cutoff, high_cutoff], btype="bandpass", fs=sample_rate
    )
    # axis=0 filters along time, so mono (N,) and multi-channel (N, C) data
    # share one code path.
    denoised_audio = lfilter(b, a, samples, axis=0) * noise_factor

    delay_samples = max(0, int((delay_ms / 1000) * sample_rate))
    delay_gain = delay_gain_percent / 100

    total = len(denoised_audio)
    echo = np.zeros_like(denoised_audio)
    if delay_samples < total:
        # Explicit end index: ``[:-delay_samples]`` is an empty slice when the
        # delay is 0 and would make this assignment fail.
        echo[delay_samples:] = denoised_audio[:total - delay_samples]
    delayed_audio = denoised_audio + echo * delay_gain

    # Rescale only when the mix would clip the 16-bit range.
    max_val = np.max(np.abs(delayed_audio)) if delayed_audio.size else 0
    if max_val > 32767:
        delayed_audio = delayed_audio * (32767 / max_val)

    # Convert back to int16 for saving.
    delayed_audio = np.asarray(delayed_audio, dtype=np.int16)
    wav.write(_AUDIO_FILE_, sample_rate, delayed_audio)

    # Combine the processed audio with the original video.
    auInpF = ffmpeg.input(_AUDIO_FILE_)
    ffmpeg.output(vid, auInpF, writeTo).overwrite_output().run()

    return

def denoise_and_delay(noise_power, delay_ms, delay_gain, readFrom, writeTo):
    """Wiener-filter the audio of a video and mix in a delayed copy.

    The audio track of ``readFrom`` is dumped to the scratch WAV file
    ``_AUDIO_FILE_``, smoothed with ``scipy.signal.wiener``, summed with a
    delayed, scaled copy of itself, normalised to 90 % of the int16 range and
    muxed with the original video stream into ``writeTo``.

    Args:
        noise_power: Controls the Wiener window length:
            ``abs(noise_power) / 2`` clamped to 3..15 samples and forced odd.
        delay_ms: Delay of the mixed-in copy in milliseconds; negative
            values are treated as 0.
        delay_gain: Level of the delayed copy, in percent.
        readFrom: Path of the input media file.
        writeTo: Path of the output file (overwritten if it exists).

    Raises:
        ValueError: If ``readFrom`` contains no audio stream.
    """
    source = ffmpeg.input(readFrom)
    vid = source.video

    # Find the audio stream by type rather than assuming it is stream #1.
    info = ffmpeg.probe(readFrom, cmd="ffprobe")
    audio_info = next(
        (s for s in info["streams"] if s.get("codec_type") == "audio"), None
    )
    if audio_info is None:
        raise ValueError(f"No audio stream found in {readFrom!r}")
    source.audio.output(
        _AUDIO_FILE_, ac=audio_info["channels"]
    ).overwrite_output().run()

    sample_rate, sample_originals = wav.read(_AUDIO_FILE_)
    # Float samples: the filter and the mix need more precision than int16.
    sample_float = sample_originals.astype(np.float64)

    window = max(3, min(15, int(abs(noise_power) / 2)))
    window |= 1  # wiener expects odd window sizes
    # For multi-channel data keep the window 1 wide on the channel axis so
    # channels are filtered independently instead of being blended together.
    mysize = window if sample_float.ndim == 1 else (window, 1)
    denoised_audio = wiener(sample_float, mysize=mysize)

    delay_samples = max(0, int((delay_ms / 1000) * sample_rate))
    delay_gain_decimal = delay_gain / 100.0

    total = len(denoised_audio)
    delayed_signal = np.zeros_like(denoised_audio)
    if delay_samples < total:
        # Explicit end index: ``[:-delay_samples]`` is an empty slice when the
        # delay is 0 and would make this assignment fail.
        delayed_signal[delay_samples:] = denoised_audio[:total - delay_samples]
    delayed_audio = denoised_audio + delayed_signal * delay_gain_decimal

    # Normalise the peak to 90 % of the int16 range (skipped for silence).
    peak = np.max(np.abs(delayed_audio)) if delayed_audio.size else 0
    if peak > 0:
        delayed_audio = delayed_audio * (32767 / peak * 0.9)

    data2 = np.asarray(delayed_audio, dtype=np.int16)
    wav.write(_AUDIO_FILE_, sample_rate, data2)
    ffmpeg.output(vid, ffmpeg.input(_AUDIO_FILE_), writeTo).overwrite_output().run()
    return


def applyGainCompression(threshold_db, limiter_db, readFrom, writeTo):
    """Compress the audio of a video with FFmpeg's ``compand`` filter.

    A three-point transfer curve (threshold, mid point, limiter) is built from
    the magnitude of ``threshold_db`` and from ``limiter_db``; the compressed
    audio is muxed with the original video stream into ``writeTo``.

    Args:
        threshold_db: Compression threshold in dB; only its magnitude is used.
        limiter_db: Limiter level in dB, written into the curve as a negative
            output level.
        readFrom: Path of the input media file.
        writeTo: Path of the output file (overwritten if it exists).
    """
    stream = ffmpeg.input(readFrom)

    # Only the magnitude matters; the minus sign is re-applied in the strings.
    abs_threshold = abs(threshold_db)
    threshold_point = f"-{abs_threshold}/-{abs_threshold}"

    # NOTE(review): for a positive limiter_db the mid point's output level,
    # -(threshold + limiter/2), lies below the threshold point's output level
    # (-threshold) - confirm this is the intended curve.
    mid_point = f"-{abs_threshold/2}/-{abs_threshold+limiter_db/2}"
    limiter_point = f"0/-{limiter_db}"

    # Combine the points into the compression curve.
    points = f"{threshold_point}|{mid_point}|{limiter_point}"

    compressed_audio = stream.audio.filter(
        'compand',
        attacks='0.01',
        decays='0.5',
        points=points,
        gain='0'
    )

    # Combine the original video with the compressed audio.
    ffmpeg.output(stream.video, compressed_audio, writeTo).overwrite_output().run()
    return

def _sourceFps(video_stream, default=30):
    """Return the frame rate from an ffprobe video-stream dict, or ``default``."""
    rate = video_stream.get('avg_frame_rate')
    if rate is None:
        return default
    if '/' in rate:
        num, den = map(int, rate.split('/'))
        # ffprobe reports "0/0" when the rate is unknown.
        return num / den if den != 0 else 0
    return float(rate)


def frameInterpolation(targetFps, readFrom, writeTo):
    """Re-time a video to ``targetFps`` and encode it with libx264/aac.

    When ``targetFps`` is at least the source frame rate, new frames are
    generated with FFmpeg's ``minterpolate`` filter; otherwise the ``fps``
    filter is used to reduce the frame rate. The audio track, when the input
    has one, is carried over.

    Args:
        targetFps: Desired output frame rate.
        readFrom: Path of the input media file.
        writeTo: Path of the output file (overwritten if it exists).

    Raises:
        ValueError: If ``readFrom`` contains no video stream.
    """
    stream = ffmpeg.input(readFrom)

    info = ffmpeg.probe(readFrom, cmd="ffprobe")
    streams = info['streams']
    video_stream = next(
        (s for s in streams if s.get('codec_type') == 'video'), None
    )
    if video_stream is None:
        raise ValueError(f"No video stream found in {readFrom!r}")
    original_fps = _sourceFps(video_stream)

    if targetFps >= original_fps:
        video = stream.video.filter(
            'minterpolate',
            fps=targetFps,
            mi_mode='mci',
            mc_mode='aobmc',
            me_mode='bidir',
        )
    else:
        video = stream.video.filter('fps', fps=targetFps)

    # Map the audio only when the input has an audio stream; requesting a
    # missing stream makes the ffmpeg run fail.
    outputs = [video]
    if any(s.get('codec_type') == 'audio' for s in streams):
        outputs.append(stream.audio)

    ffmpeg.output(
        *outputs, writeTo, vcodec='libx264', acodec='aac'
    ).overwrite_output().run()

    return


def voiceEnhancement(preEmphasisAlpha, filterOrder, readFrom, writeTo):
Expand Down Expand Up @@ -212,6 +175,7 @@ def voiceEnhancement(preEmphasisAlpha, filterOrder, readFrom, writeTo):




def applyGrayscale(readFrom, writeTo):
# Load video input
stream = ffmpeg.input(readFrom)
Expand All @@ -238,9 +202,76 @@ def colorInvert(readFrom, writeTo):
acodec='copy' # Copy audio to avoid re-encoding
)
# Run the ffmpeg command
out, err = output.overwrite_output().run(capture_stdout=True, capture_stderr=True)
print("FFmpeg stdout:", out)
print("FFmpeg stderr:", err)
output.overwrite_output().run()

return



def makeCarLike(sideGain_db, filterOrder, readFrom, writeTo):
    """Apply a "car stereo" effect: adjust stereo width, then low-pass at 10 kHz.

    The audio of ``readFrom`` is extracted to the scratch WAV file
    ``_AUDIO_FILE_`` through an FFmpeg ``pan`` filter that rebuilds each
    output channel as ``mid + gain * side``, low-pass filtered with a
    Butterworth filter, and muxed with the original video into ``writeTo``.

    Args:
        sideGain_db: Gain in dB applied to the side (left minus right)
            component; 0 dB leaves the stereo image unchanged.
        filterOrder: Order of the Butterworth low-pass filter.
        readFrom: Path of the input media file.
        writeTo: Path of the output file (overwritten if it exists).

    Raises:
        subprocess.CalledProcessError: If the ffmpeg extraction step fails.
    """
    # Local import so this block does not depend on the file's import header.
    import subprocess

    # Extract the video stream.
    vid = ffmpeg.input(readFrom).video

    # Remove a stale scratch file so ffmpeg does not stop to ask about
    # overwriting it.
    if os.path.exists(_AUDIO_FILE_):
        os.remove(_AUDIO_FILE_)

    # Convert the side gain from dB to a linear factor.
    sideGain_linear = 10 ** (sideGain_db / 20)

    # Each output channel is 0.5*(L+R) +/- gain*0.5*(L-R); expanded, that is
    # a "same side" and a "cross" coefficient per channel.
    same_coeff = 0.5 + sideGain_linear * 0.5
    cross_coeff = 0.5 - sideGain_linear * 0.5
    pan_filter = (
        f"pan=2c|c0={same_coeff}*c0+{cross_coeff}*c1"
        f"|c1={cross_coeff}*c0+{same_coeff}*c1"
    )

    # Argument list instead of a shell string: paths containing quotes or
    # shell metacharacters are passed through safely, and check=True surfaces
    # an ffmpeg failure here rather than as a missing-file error later.
    subprocess.run(
        ["ffmpeg", "-i", readFrom, "-af", pan_filter, _AUDIO_FILE_],
        check=True,
    )

    # samples has shape (num_samples,) for mono or (num_samples, channels).
    sample_rate, samples = wav.read(_AUDIO_FILE_)

    # Scale to floats for filtering.
    # NOTE(review): the 32768 divisor assumes 16-bit PCM samples - confirm.
    samples_float = samples.astype(np.float64) / 32768.0

    cutoff_hz = 10000
    if cutoff_hz < sample_rate / 2:
        num, denom = butter(filterOrder, cutoff_hz, btype="low", fs=sample_rate)
        # axis=0 filters along time for both mono and multi-channel data.
        filtered_samples = lfilter(num, denom, samples_float, axis=0)
    else:
        # The cutoff is at or above Nyquist: butter() rejects such a cutoff
        # and there is nothing above it to remove, so skip the filter.
        filtered_samples = samples_float

    # Rescale only if the filter pushed the signal outside [-1, 1].
    max_val = np.max(np.abs(filtered_samples)) if filtered_samples.size else 0.0
    if max_val > 1.0:
        filtered_samples = filtered_samples / max_val

    # Convert back to int16 and write the processed audio.
    output_samples = (filtered_samples * 32767).astype(np.int16)
    wav.write(_AUDIO_FILE_, sample_rate, output_samples)

    # Combine the processed audio with the original video.
    auInpF = ffmpeg.input(_AUDIO_FILE_)
    ffmpeg.output(vid, auInpF, writeTo).overwrite_output().run()

    return


Loading