From 563b312d3f0f6370dfad171412c5a69c43c201ab Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sun, 10 Aug 2025 19:10:44 +0000 Subject: [PATCH] Implement YouTube Video Uploader and Scheduler with AgentOps integration --- .env.example | 14 +++ config.py | 90 ++++++++++++++ requirements.txt | 9 +- streamlit_app.py | 251 ++++++++++++++++++++++++++++++++++++- youtube_uploader.py | 297 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 657 insertions(+), 4 deletions(-) create mode 100644 .env.example create mode 100644 config.py create mode 100644 youtube_uploader.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..fdcbd8f --- /dev/null +++ b/.env.example @@ -0,0 +1,14 @@ +# AgentOps Configuration +AGENTOPS_API_KEY=your_agentops_api_key_here + +# YouTube API Configuration +YOUTUBE_CREDENTIALS_PATH=path/to/your/youtube_credentials.json +YOUTUBE_TOKEN_PATH=token.json + +# Optional: Default upload settings +DEFAULT_PRIVACY_STATUS=private +DEFAULT_CATEGORY_ID=22 + +# Security Settings +MAX_FILE_SIZE_MB=128 +AUTO_CLEANUP_TEMP_FILES=true \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..052ba27 --- /dev/null +++ b/config.py @@ -0,0 +1,90 @@ +import os +import json +from typing import Optional, Dict, Any +import streamlit as st + +class ConfigManager: + def __init__(self): + self.config_file = "app_config.json" + self.load_config() + + def load_config(self) -> Dict[str, Any]: + """Load configuration from file or create default""" + if os.path.exists(self.config_file): + try: + with open(self.config_file, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError): + pass + + return { + "agentops_api_key": "", + "youtube_credentials_path": "", + "default_privacy": "private", + "default_category": "People & Blogs", + "auto_cleanup_temp_files": True, + "max_file_size_mb": 128, + "supported_video_formats": ["mp4", "avi", "mov", "mkv", "webm"], + "supported_thumbnail_formats": ["png", "jpg", "jpeg", "gif"] + } + + def save_config(self, config: Dict[str, Any]): + """Save configuration to file""" + try: + with open(self.config_file, 'w') as f: + json.dump(config, f, indent=2) + except Exception as e: + st.error(f"Failed to save configuration: {str(e)}") + + def get_env_vars(self) -> Dict[str, Optional[str]]: + """Get configuration from environment variables""" + return { + "AGENTOPS_API_KEY": os.getenv("AGENTOPS_API_KEY"), + "YOUTUBE_CREDENTIALS_PATH": os.getenv("YOUTUBE_CREDENTIALS_PATH"), + "YOUTUBE_TOKEN_PATH": os.getenv("YOUTUBE_TOKEN_PATH", "token.json"), + } + + def validate_api_key(self, api_key: str) -> bool: + """Basic validation for AgentOps API key format""" + return len(api_key) > 10 and api_key.replace('-', '').replace('_', '').isalnum() + + def validate_credentials_file(self, file_content: bytes) -> bool: + """Validate YouTube API credentials file""" + try: + credentials = json.loads(file_content) + required_fields = ["client_id", "client_secret", "auth_uri", "token_uri"] + + if "installed" in credentials: + client_info = credentials["installed"] + elif "web" in credentials: + client_info = credentials["web"] + else: + return False + + return all(field in client_info for field in required_fields) + except (json.JSONDecodeError, KeyError): + return False + + def get_secure_api_key(self) -> Optional[str]: + """Get API key from session state or environment""" + if 'agentops_api_key' in st.session_state: + return st.session_state.agentops_api_key + return os.getenv("AGENTOPS_API_KEY") + + def cleanup_temp_files(self): + """Clean up temporary files""" + temp_patterns = ["temp_video_", "temp_thumbnail_", "scheduled_video_", "scheduled_thumbnail_"] + + for filename in os.listdir("."): + if any(filename.startswith(pattern) for pattern in temp_patterns): + try: + os.remove(filename) + except OSError: + pass # File might be in use + + def get_file_size_mb(self, file_obj) -> float: + """Get file size in MB""" + file_obj.seek(0, 2) # Seek to end + size = file_obj.tell() + file_obj.seek(0) # Reset to beginning + return size / (1024 * 1024) # Convert to MB \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e251330..0c0cb70 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,8 @@ -streamlit \ No newline at end of file +streamlit +agentops +google-auth +google-auth-oauthlib +google-auth-httplib2 +google-api-python-client +schedule +python-dateutil \ No newline at end of file diff --git a/streamlit_app.py b/streamlit_app.py index fde12be..bde597f 100644 --- a/streamlit_app.py +++ b/streamlit_app.py @@ -1,6 +1,251 @@ import streamlit as st +import os +import datetime +import json +from youtube_uploader import YouTubeUploaderService, VideoSchedule -st.title("🎈 My new app") -st.write( - "Let's start building! For help and inspiration, head over to [docs.streamlit.io](https://docs.streamlit.io/)." +st.set_page_config( + page_title="YouTube Video Uploader & Scheduler", + page_icon="📹", + layout="wide" ) + +st.title("📹 YouTube Video Uploader & Scheduler") +st.write("Upload videos to YouTube immediately or schedule them for later.") + +if 'uploader_service' not in st.session_state: + st.session_state.uploader_service = None + st.session_state.authenticated = False + +st.sidebar.header("🔑 Configuration") + +agentops_api_key = st.sidebar.text_input( + "AgentOps API Key", + type="password", + help="Get your API key from https://www.agentops.ai/" +) + +credentials_file = st.sidebar.file_uploader( + "YouTube API Credentials (JSON)", + type=['json'], + help="Download from Google Cloud Console" +) + +if agentops_api_key and credentials_file and not st.session_state.authenticated: + if st.sidebar.button("Initialize Service"): + try: + credentials_content = credentials_file.read() + credentials_path = "temp_credentials.json" + + with open(credentials_path, 'wb') as f: + f.write(credentials_content) + + st.session_state.uploader_service = YouTubeUploaderService( + agentops_api_key=agentops_api_key, + credentials_path=credentials_path + ) + + if st.session_state.uploader_service.initialize(): + st.session_state.authenticated = True + st.sidebar.success("✅ Service initialized successfully!") + st.rerun() + else: + st.sidebar.error("❌ Failed to authenticate with YouTube API") + + except Exception as e: + st.sidebar.error(f"❌ Initialization failed: {str(e)}") + +if st.session_state.authenticated and st.session_state.uploader_service: + tab1, tab2, tab3 = st.tabs(["📤 Upload Now", "⏰ Schedule Upload", "📊 Status"]) + + with tab1: + st.header("Upload Video Now") + + col1, col2 = st.columns(2) + + with col1: + video_file = st.file_uploader("Select Video File", type=['mp4', 'avi', 'mov', 'mkv']) + thumbnail_file = st.file_uploader("Select Thumbnail (Optional)", type=['png', 'jpg', 'jpeg']) + + title = st.text_input("Video Title", max_chars=100) + description = st.text_area("Video Description", max_chars=5000) + + with col2: + tags_input = st.text_area("Tags (one per line)") + tags = [tag.strip() for tag in tags_input.split('\n') if tag.strip()] + + category_options = { + "Film & Animation": "1", + "Autos & Vehicles": "2", + "Music": "10", + "Pets & Animals": "15", + "Sports": "17", + "Travel & Events": "19", + "Gaming": "20", + "People & Blogs": "22", + "Comedy": "23", + "Entertainment": "24", + "News & Politics": "25", + "Howto & Style": "26", + "Education": "27", + "Science & Technology": "28", + "Nonprofits & Activism": "29" + } + + category = st.selectbox("Category", list(category_options.keys())) + privacy = st.selectbox("Privacy", ["private", "unlisted", "public"]) + + if st.button("Upload Video", type="primary"): + if video_file and title: + with st.spinner("Uploading video..."): + video_path = f"temp_video_{video_file.name}" + with open(video_path, 'wb') as f: + f.write(video_file.read()) + + thumbnail_path = None + if thumbnail_file: + thumbnail_path = f"temp_thumbnail_{thumbnail_file.name}" + with open(thumbnail_path, 'wb') as f: + f.write(thumbnail_file.read()) + + try: + result = st.session_state.uploader_service.upload_video_now( + video_file_path=video_path, + title=title, + description=description, + tags=tags, + category_id=category_options[category], + privacy_status=privacy, + thumbnail_path=thumbnail_path + ) + + os.remove(video_path) + if thumbnail_path: + os.remove(thumbnail_path) + + if result['success']: + st.success(f"✅ Video uploaded successfully!") + st.info(f"Video URL: {result['video_url']}") + else: + st.error(f"❌ Upload failed: {result['error']}") + + except Exception as e: + st.error(f"❌ Upload error: {str(e)}") + else: + st.error("Please provide both video file and title") + + with tab2: + st.header("Schedule Video Upload") + + col1, col2 = st.columns(2) + + with col1: + sched_video_file = st.file_uploader("Select Video File", type=['mp4', 'avi', 'mov', 'mkv'], key="sched") + sched_thumbnail_file = st.file_uploader("Select Thumbnail (Optional)", type=['png', 'jpg', 'jpeg'], key="sched_thumb") + + sched_title = st.text_input("Video Title", max_chars=100, key="sched_title") + sched_description = st.text_area("Video Description", max_chars=5000, key="sched_desc") + + with col2: + sched_tags_input = st.text_area("Tags (one per line)", key="sched_tags") + sched_tags = [tag.strip() for tag in sched_tags_input.split('\n') if tag.strip()] + + sched_category = st.selectbox("Category", list(category_options.keys()), key="sched_cat") + sched_privacy = st.selectbox("Privacy", ["private", "unlisted", "public"], key="sched_priv") + + schedule_date = st.date_input("Upload Date", min_value=datetime.date.today()) + schedule_time = st.time_input("Upload Time") + + if st.button("Schedule Upload", type="primary"): + if sched_video_file and sched_title: + try: + sched_video_path = f"scheduled_video_{sched_video_file.name}" + with open(sched_video_path, 'wb') as f: + f.write(sched_video_file.read()) + + sched_thumbnail_path = None + if sched_thumbnail_file: + sched_thumbnail_path = f"scheduled_thumbnail_{sched_thumbnail_file.name}" + with open(sched_thumbnail_path, 'wb') as f: + f.write(sched_thumbnail_file.read()) + + scheduled_datetime = datetime.datetime.combine(schedule_date, schedule_time) + + success = st.session_state.uploader_service.schedule_video_upload( + scheduled_time=scheduled_datetime.isoformat(), + title=sched_title, + description=sched_description, + tags=sched_tags, + category_id=category_options[sched_category], + privacy_status=sched_privacy, + video_file_path=sched_video_path, + thumbnail_path=sched_thumbnail_path + ) + + if success: + st.success(f"✅ Video scheduled for {scheduled_datetime}") + else: + st.error("❌ Failed to schedule upload") + + except Exception as e: + st.error(f"❌ Scheduling error: {str(e)}") + else: + st.error("Please provide both video file and title") + + with tab3: + st.header("Upload Status") + + if st.button("Refresh Status"): + st.rerun() + + try: + status = st.session_state.uploader_service.get_upload_status() + + st.subheader("Scheduler Status") + if status['scheduler_running']: + st.success("🟢 Scheduler is running") + else: + st.info("🟡 Scheduler is stopped") + + st.subheader("Scheduled Uploads") + if status['scheduled_uploads']: + for upload in status['scheduled_uploads']: + with st.expander(f"📹 {upload['title']}"): + st.write(f"**Scheduled Time:** {upload['scheduled_time']}") + st.write(f"**Video File:** {upload['video_file']}") + + if st.button(f"Cancel", key=f"cancel_{upload['title']}"): + if st.session_state.uploader_service.uploader.cancel_scheduled_upload(upload['title']): + st.success("Upload cancelled") + st.rerun() + else: + st.error("Failed to cancel upload") + else: + st.info("No scheduled uploads") + + except Exception as e: + st.error(f"Error getting status: {str(e)}") + +else: + st.info("👆 Please configure your API keys and credentials in the sidebar to get started.") + + with st.expander("📖 Setup Instructions"): + st.markdown(""" + ### 1. Get AgentOps API Key + 1. Visit [AgentOps](https://www.agentops.ai/) + 2. Create an account + 3. Generate an API key from settings + + ### 2. Get YouTube API Credentials + 1. Go to [Google Cloud Console](https://console.cloud.google.com/) + 2. Create a new project or select existing + 3. Enable YouTube Data API v3 + 4. Create credentials (OAuth 2.0 Client ID) + 5. Download the JSON credentials file + + ### 3. Upload and Initialize + 1. Enter your AgentOps API key + 2. Upload your YouTube credentials JSON file + 3. Click "Initialize Service" + 4. Complete OAuth flow in browser if prompted + """) diff --git a/youtube_uploader.py b/youtube_uploader.py new file mode 100644 index 0000000..ea840e5 --- /dev/null +++ b/youtube_uploader.py @@ -0,0 +1,297 @@ +import os +import json +import datetime +from typing import Optional, Dict, Any +import agentops +from agentops.sdk.decorators import agent, operation, tool, trace +import google.auth +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from google_auth_oauthlib.flow import InstalledAppFlow +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from googleapiclient.http import MediaFileUpload +import schedule +import time +import threading +from dataclasses import dataclass + +SCOPES = ['https://www.googleapis.com/auth/youtube.upload'] +API_SERVICE_NAME = 'youtube' +API_VERSION = 'v3' + +@dataclass +class VideoSchedule: + title: str + description: str + tags: list + category_id: str + privacy_status: str + video_file_path: str + thumbnail_path: Optional[str] + scheduled_time: datetime.datetime + +@agent(name="YouTubeUploaderAgent") +class YouTubeUploader: + def __init__(self, credentials_path: str, token_path: str = "token.json"): + self.credentials_path = credentials_path + self.token_path = token_path + self.service = None + self.scheduled_uploads = [] + self.scheduler_thread = None + self.running = False + + @operation + def authenticate(self) -> bool: + """Authenticate with YouTube API using OAuth2""" + creds = None + + if os.path.exists(self.token_path): + creds = Credentials.from_authorized_user_file(self.token_path, SCOPES) + + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + creds.refresh(Request()) + else: + if not os.path.exists(self.credentials_path): + raise FileNotFoundError(f"Credentials file not found: {self.credentials_path}") + + flow = InstalledAppFlow.from_client_secrets_file( + self.credentials_path, SCOPES) + creds = flow.run_local_server(port=0) + + with open(self.token_path, 'w') as token: + token.write(creds.to_json()) + + self.service = build(API_SERVICE_NAME, API_VERSION, credentials=creds) + return True + + @tool(name="VideoUploadTool", cost=0.10) + def upload_video( + self, + video_file_path: str, + title: str, + description: str, + tags: list = None, + category_id: str = "22", # Default to People & Blogs + privacy_status: str = "private", + thumbnail_path: str = None + ) -> Dict[str, Any]: + """Upload a video to YouTube""" + if not self.service: + raise Exception("Not authenticated. Call authenticate() first.") + + if not os.path.exists(video_file_path): + raise FileNotFoundError(f"Video file not found: {video_file_path}") + + tags = tags or [] + + body = { + 'snippet': { + 'title': title, + 'description': description, + 'tags': tags, + 'categoryId': category_id + }, + 'status': { + 'privacyStatus': privacy_status, + 'selfDeclaredMadeForKids': False + } + } + + media = MediaFileUpload(video_file_path, chunksize=-1, resumable=True) + + try: + insert_request = self.service.videos().insert( + part=",".join(body.keys()), + body=body, + media_body=media + ) + + response = self._resumable_upload(insert_request) + + if thumbnail_path and os.path.exists(thumbnail_path): + self._upload_thumbnail(response['id'], thumbnail_path) + + return { + 'success': True, + 'video_id': response['id'], + 'video_url': f"https://www.youtube.com/watch?v={response['id']}", + 'title': title, + 'upload_time': datetime.datetime.now().isoformat() + } + + except HttpError as e: + return { + 'success': False, + 'error': f"HTTP error occurred: {e}", + 'details': e.content.decode() if e.content else None + } + except Exception as e: + return { + 'success': False, + 'error': f"An error occurred: {str(e)}" + } + + @operation + def _resumable_upload(self, insert_request): + """Execute the upload with resumable upload support""" + response = None + error = None + retry = 0 + + while response is None: + try: + status, response = insert_request.next_chunk() + if response is not None: + if 'id' in response: + return response + else: + raise Exception(f"Upload failed with unexpected response: {response}") + except HttpError as e: + if e.resp.status in [500, 502, 503, 504]: + error = f"Retriable HTTP error {e.resp.status}: {e.content}" + retry += 1 + if retry > 5: + raise Exception(f"Maximum retries exceeded: {error}") + time.sleep(2 ** retry) + else: + raise + + return response + + @tool(name="ThumbnailUploadTool", cost=0.02) + def _upload_thumbnail(self, video_id: str, thumbnail_path: str): + """Upload a custom thumbnail for the video""" + try: + self.service.thumbnails().set( + videoId=video_id, + media_body=MediaFileUpload(thumbnail_path) + ).execute() + except HttpError as e: + print(f"Thumbnail upload failed: {e}") + + @operation + def schedule_upload(self, video_schedule: VideoSchedule): + """Schedule a video upload for a specific time""" + self.scheduled_uploads.append(video_schedule) + + def upload_job(): + result = self.upload_video( + video_file_path=video_schedule.video_file_path, + title=video_schedule.title, + description=video_schedule.description, + tags=video_schedule.tags, + category_id=video_schedule.category_id, + privacy_status=video_schedule.privacy_status, + thumbnail_path=video_schedule.thumbnail_path + ) + + if result['success']: + print(f"Scheduled upload completed: {result['video_url']}") + else: + print(f"Scheduled upload failed: {result['error']}") + + self.scheduled_uploads = [s for s in self.scheduled_uploads if s != video_schedule] + + schedule.every().day.at(video_schedule.scheduled_time.strftime("%H:%M")).do(upload_job) + + if not self.running: + self.start_scheduler() + + @operation + def start_scheduler(self): + """Start the background scheduler thread""" + if self.running: + return + + self.running = True + + def run_scheduler(): + while self.running: + schedule.run_pending() + time.sleep(60) # Check every minute + + self.scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) + self.scheduler_thread.start() + + @operation + def stop_scheduler(self): + """Stop the background scheduler""" + self.running = False + if self.scheduler_thread: + self.scheduler_thread.join(timeout=5) + + @operation + def get_scheduled_uploads(self) -> list: + """Get list of scheduled uploads""" + return [ + { + 'title': s.title, + 'scheduled_time': s.scheduled_time.isoformat(), + 'video_file': s.video_file_path + } + for s in self.scheduled_uploads + ] + + @operation + def cancel_scheduled_upload(self, title: str) -> bool: + """Cancel a scheduled upload by title""" + original_count = len(self.scheduled_uploads) + self.scheduled_uploads = [s for s in self.scheduled_uploads if s.title != title] + schedule.clear(title) + return len(self.scheduled_uploads) < original_count + +@trace(name="YouTubeUploaderWorkflow", tags=["youtube", "upload", "scheduler"]) +class YouTubeUploaderService: + def __init__(self, agentops_api_key: str, credentials_path: str): + agentops.init(agentops_api_key, tags=["youtube", "uploader", "scheduler"]) + self.uploader = YouTubeUploader(credentials_path) + + @operation + def initialize(self) -> bool: + """Initialize the YouTube uploader service""" + try: + return self.uploader.authenticate() + except Exception as e: + agentops.record({ + 'event_type': 'error', + 'message': f"Failed to initialize YouTube uploader: {str(e)}" + }) + return False + + @operation + def upload_video_now(self, **kwargs) -> Dict[str, Any]: + """Upload a video immediately""" + return self.uploader.upload_video(**kwargs) + + @operation + def schedule_video_upload(self, scheduled_time: str, **kwargs) -> bool: + """Schedule a video upload for later""" + try: + schedule_dt = datetime.datetime.fromisoformat(scheduled_time) + video_schedule = VideoSchedule( + scheduled_time=schedule_dt, + **kwargs + ) + self.uploader.schedule_upload(video_schedule) + return True + except Exception as e: + agentops.record({ + 'event_type': 'error', + 'message': f"Failed to schedule upload: {str(e)}" + }) + return False + + @operation + def get_upload_status(self) -> Dict[str, Any]: + """Get the current status of uploads""" + return { + 'scheduled_uploads': self.uploader.get_scheduled_uploads(), + 'scheduler_running': self.uploader.running + } + + def shutdown(self): + """Shutdown the service""" + self.uploader.stop_scheduler() + agentops.end_trace(end_state="Success") \ No newline at end of file