diff --git a/ARGO/oceancurrent/README_FATAL_LOG_CONFIG.md b/ARGO/oceancurrent/README_FATAL_LOG_CONFIG.md new file mode 100644 index 00000000..9d43dd55 --- /dev/null +++ b/ARGO/oceancurrent/README_FATAL_LOG_CONFIG.md @@ -0,0 +1,120 @@ +# Fatal Log Monitoring Configuration + +## Overview + +The `oceancurrent_file_server_api.py` script supports sending fatal error notifications to a monitoring API endpoint using EC2 instance identity authentication. + +## Configuration + +The API endpoint is configured via a simple text file: `oc_api_endpoint.conf` + +### Setup on EC2 + +Create the configuration file with your monitoring API endpoint: + +```bash +# Create directory +sudo mkdir -p /etc/imos + +# Create config file with API endpoint +echo "https://replace-to-the-production-domain/api/v1/monitoring/fatal-log" | sudo tee /etc/imos/oc_api_endpoint.conf + +# Set proper permissions (readable by all, writable only by root) +sudo chown root:root /etc/imos/oc_api_endpoint.conf +sudo chmod 644 /etc/imos/oc_api_endpoint.conf +``` + +### Verify Setup + +```bash +# Check file exists and has correct permissions +ls -l /etc/imos/oc_api_endpoint.conf +# Expected: -rw-r--r-- 1 root root 69 Dec 10 10:00 /etc/imos/oc_api_endpoint.conf + +# Check content +cat /etc/imos/oc_api_endpoint.conf +# Should display: https://replace-to-the-production-domain/api/v1/monitoring/fatal-log + +# Test if cron user can read it +sudo -u projectofficer cat /etc/imos/oc_api_endpoint.conf +``` + +## How It Works + +The script automatically looks for the config file in these locations (in order): + +1. `/etc/imos/oc_api_endpoint.conf` (system-wide - **recommended**) +2. `./oc_api_endpoint.conf` (local to script) +3. `OC_API_ENDPOINT` environment variable (fallback) + +If no configuration is found, fatal log notifications are **gracefully disabled** and the script continues to run normally. 
+ +## Authentication + +The script uses **EC2 Instance Identity Document** (PKCS7 signature) for authentication: + +- Automatically fetched from EC2 metadata service (IMDSv2 with IMDSv1 fallback) +- Cryptographically verifiable by the backend +- Cannot be spoofed from outside AWS +- No credentials needed in the config file + +## Behavior + +### When Configured ✅ + +On script startup, you'll see: + +``` +INFO: EC2 instance identity fetched successfully +INFO: Fatal log notifications enabled - API endpoint: https://... +``` + +When errors occur: + +``` +INFO: Sending fatal log notification to monitoring API: https://... +INFO: ✓ Fatal log notification sent successfully (HTTP 200) +``` + +### When Not Configured ⚠️ + +On script startup, you'll see: + +``` +INFO: Fatal log notifications disabled - API endpoint not configured +INFO: To enable: Create /etc/imos/oc_api_endpoint.conf or ./oc_api_endpoint.conf with the API endpoint URL +``` + +The script **continues to run normally** - only the notifications are disabled. 
# EC2 Instance Metadata Service endpoints (IMDSv2)
METADATA_BASE_URL = "http://169.254.169.254/latest"
IMDS_TOKEN_URL = f"{METADATA_BASE_URL}/api/token"
INSTANCE_IDENTITY_PKCS7_URL = f"{METADATA_BASE_URL}/dynamic/instance-identity/pkcs7"


# Backend API endpoint for fatal log notifications.
# Read from a config file or the OC_API_ENDPOINT environment variable
# (a readable, non-empty config file takes precedence).
# Config file locations: /etc/imos/oc_api_endpoint.conf or ./oc_api_endpoint.conf
def _load_api_endpoint():
    """
    Load the fatal-log API endpoint from a config file or the environment.

    The config files are tried in order; the first one that exists, can be
    read and is non-empty after stripping whitespace wins. Otherwise the
    OC_API_ENDPOINT environment variable is used.

    Returns:
        Endpoint URL string, or None when nothing usable is configured
    """
    # Try config file locations (in order of preference)
    config_locations = [
        "/etc/imos/oc_api_endpoint.conf",  # System-wide config
        os.path.join(os.path.dirname(__file__), "oc_api_endpoint.conf"),  # Local to script
    ]

    for config_file in config_locations:
        if not os.path.exists(config_file):
            continue
        try:
            with open(config_file, "r", encoding="utf-8") as f:
                endpoint = f.read().strip()
        except (OSError, UnicodeDecodeError):
            # Best effort: an unreadable or undecodable file must not stop the
            # script from starting, so fall through to the next location.
            continue
        if endpoint:
            return endpoint

    # Fallback to environment variable. It is stripped like the file content,
    # and an empty/blank value is reported as "not configured" (None).
    env_endpoint = os.getenv("OC_API_ENDPOINT", "").strip()
    return env_endpoint or None


OC_API_ENDPOINT = _load_api_endpoint()

# Metadata service timeout
METADATA_TIMEOUT = 2  # seconds
IMDS_TOKEN_TTL_SECONDS = 21600  # 6 hours

# Global cache for PKCS7 signature (fetched once at startup)
_cached_pkcs7 = None


def get_imds_token():
    """
    Fetch IMDSv2 session token for secure metadata access.

    Issues a PUT to IMDS_TOKEN_URL with the TTL header and a
    METADATA_TIMEOUT-second timeout.

    Returns:
        Token string or None on any requests-level error (including a
        non-success HTTP status)
    """
    try:
        response = requests.put(
            IMDS_TOKEN_URL,
            headers={"X-aws-ec2-metadata-token-ttl-seconds": str(IMDS_TOKEN_TTL_SECONDS)},
            timeout=METADATA_TIMEOUT,
        )
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException:
        return None


def fetch_instance_identity():
    """
    Fetch PKCS7 signature from EC2 metadata service.

    A session token is requested first; when none is obtained the request is
    sent without the token header (IMDSv1-style fallback).

    Returns:
        PKCS7 signature string or None on any requests-level error
    """
    try:
        # Token is optional: without it the request carries no auth header.
        token = get_imds_token()
        headers = {"X-aws-ec2-metadata-token": token} if token else {}

        pkcs7_response = requests.get(
            INSTANCE_IDENTITY_PKCS7_URL,
            headers=headers,
            timeout=METADATA_TIMEOUT,
        )
        pkcs7_response.raise_for_status()
        return pkcs7_response.text
    except requests.exceptions.RequestException:
        return None


def send_fatal_log(error_message, source_type=None, additional_context=None):
    """
    Send fatal log notification to the monitoring API.
    Uses the cached PKCS7 signature for authentication.

    The notification is skipped (returning False) when no endpoint is
    configured or no PKCS7 signature has been cached. Request failures are
    logged as warnings and reported through the return value, never raised.

    Args:
        error_message: The error message to send
        source_type: Optional source type identifier (e.g., 'startup', 'scan', 'config')
        additional_context: Optional additional context information

    Returns:
        True if the API answered with HTTP 200, False otherwise
    """
    # Local import so the timezone-aware clock can replace the deprecated
    # datetime.utcnow() without relying on the file-level import list.
    from datetime import timezone

    # Skip if API endpoint is not configured
    if not OC_API_ENDPOINT:
        logger.debug("Skipping fatal log notification - API endpoint not configured")
        return False

    # Skip if we don't have a valid PKCS7 signature (not running on EC2 or disabled)
    if not _cached_pkcs7:
        logger.debug("Skipping fatal log notification - No PKCS7 signature available")
        return False

    # Determine source identifier
    script_name = os.path.basename(__file__)
    if source_type:
        source = f"oceancurrent-file-server-api/{source_type}"
    else:
        source = "oceancurrent-file-server-api"

    # Build context with additional info
    context_parts = [
        f"script={script_name}",
        f"pid={os.getpid()}",
        f"python={sys.version.split()[0]}",
    ]
    if additional_context:
        # str() keeps a non-string context from raising inside an error path.
        context_parts.append(str(additional_context))
    context = ", ".join(context_parts)

    payload = {
        "pkcs7": _cached_pkcs7,
        # Same "YYYY-MM-DDTHH:MM:SSZ" UTC format as before.
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "errorMessage": error_message,
        "source": source,
        "context": context,
    }

    try:
        logger.info(f"Sending fatal log notification to monitoring API: {OC_API_ENDPOINT}")
        logger.debug(f"Fatal log message: {error_message[:100]}...")  # Log first 100 chars

        response = requests.post(
            OC_API_ENDPOINT,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=10,
        )

        if response.status_code == 200:
            logger.info(f"✓ Fatal log notification sent successfully (HTTP {response.status_code})")
            return True

        logger.warning(f"✗ Fatal log notification failed with HTTP {response.status_code}")
        try:
            error_detail = response.json()
            logger.warning(f"API response: {error_detail.get('message', 'Unknown error')}")
        except (ValueError, AttributeError):
            # ValueError: body is not JSON; AttributeError: JSON is not an
            # object. Either way fall back to the raw (truncated) body.
            logger.warning(f"API response: {response.text[:200]}")
        return False

    except requests.exceptions.Timeout:
        logger.warning("✗ Fatal log notification failed - Request timeout")
        return False
    except requests.exceptions.RequestException as e:
        logger.warning(f"✗ Fatal log notification failed - Request error: {str(e)}")
        return False
{script_name}") @@ -614,29 +795,31 @@ def main(): logger.info(f"Total configurations to process: {len(FILE_PATH_CONFIG)}") logger.info(f"Initial memory usage: {initial_memory:.1f} MB") logger.info("=" * 80) - + parent_folder = Path(OCEAN_CURRENT_FILE_ROOT_PATH) - + # Verify root path exists if not parent_folder.exists(): - logger.error(f"Root path does not exist: {parent_folder}") + error_msg = f"Root path does not exist: {parent_folder}" + logger.error(error_msg) + send_fatal_log(error_msg, source_type="startup", additional_context="validation=root_path") sys.exit(1) - + logger.info(f"Root path verified: {parent_folder}") - + total_files_processed = 0 successful_configs = 0 failed_configs = [] - + try: for idx, config in enumerate(FILE_PATH_CONFIG, 1): product_id = config['productId'] progress_pct = (idx / len(FILE_PATH_CONFIG)) * 100 elapsed_time = time.time() - start_time - + logger.info(f"[{progress_pct:.1f}%] Processing configuration {idx}/{len(FILE_PATH_CONFIG)}: {product_id}") logger.debug(f"Elapsed time: {elapsed_time:.1f}s, Files processed so far: {total_files_processed}") - + try: config_start_time = time.time() scanned_files = scan_files_from_config( @@ -644,13 +827,13 @@ def main(): config=config ) config_duration = time.time() - config_start_time - + if len(scanned_files) > 0: logger.info(f"✓ Configuration {product_id}: Found {len(scanned_files)} files in {config_duration:.2f} seconds") save_result_as_json(scanned_files, config, parent_folder) successful_configs += 1 total_files_processed += len(scanned_files) - + # Log progress every 5 configurations if idx % 5 == 0 or idx == len(FILE_PATH_CONFIG): current_memory = get_memory_usage() @@ -663,23 +846,29 @@ def main(): f"Memory: {current_memory:.1f}MB (+{memory_delta:+.1f}MB)") else: logger.warning(f"⚠ Configuration {product_id}: No files found matching criteria") - + except Exception as e: - logger.error(f"✗ Configuration {product_id} failed: {str(e)}") - logger.error(f"Configuration {product_id} 
traceback: {traceback.format_exc()}") + error_msg = f"✗ Configuration {product_id} failed: {str(e)}" + logger.error(error_msg) + send_fatal_log(error_msg, source_type="scan", additional_context=f"product_id={product_id}") + traceback_msg = f"Configuration {product_id} traceback: {traceback.format_exc()}" + logger.error(traceback_msg) failed_configs.append(product_id) - + except Exception as e: - logger.error(f"Critical error during processing: {str(e)}") - logger.error(f"Critical error traceback: {traceback.format_exc()}") + error_msg = f"Critical error during processing: {str(e)}" + logger.error(error_msg) + send_fatal_log(error_msg, source_type="scan", additional_context="error_type=critical") + traceback_msg = f"Critical error traceback: {traceback.format_exc()}" + logger.error(traceback_msg) sys.exit(1) - + # Summary and shutdown logging end_time = time.time() total_duration = end_time - start_time final_memory = get_memory_usage() peak_memory_delta = final_memory - initial_memory - + logger.info("=" * 80) logger.info("EXECUTION SUMMARY:") logger.info(f"Total execution time: {total_duration:.2f} seconds") @@ -694,9 +883,11 @@ def main(): logger.info(f"Memory usage - Initial: {initial_memory:.1f}MB, Final: {final_memory:.1f}MB, Delta: {peak_memory_delta:+.1f}MB") logger.info(f"COMPLETED: {script_name}") logger.info("=" * 80) - + if failed_configs: - logger.error(f"Script completed with {len(failed_configs)} failed configurations") + error_msg = f"Script completed with {len(failed_configs)} failed configurations" + logger.error(error_msg) + send_fatal_log(error_msg, source_type="completion", additional_context=f"failed_count={len(failed_configs)}, failed_ids={','.join(failed_configs[:5])}") sys.exit(1) else: logger.info("Script completed successfully") @@ -709,6 +900,9 @@ def main(): logger.warning("Script interrupted by user (SIGINT)") sys.exit(130) except Exception as e: - logger.error(f"Unhandled exception: {str(e)}") - logger.error(f"Unhandled exception 
class TestMonitoringFunctions(unittest.TestCase):
    """Tests for EC2 monitoring and fatal log notification functions.

    Every network call is replaced with a mock, so no request reaches the
    EC2 metadata service or the monitoring API.
    """

    @patch('oceancurrent_file_server_api.requests.put')
    def test_get_imds_token_success(self, mock_put):
        """Test successful IMDSv2 token retrieval."""
        from oceancurrent_file_server_api import get_imds_token

        mock_response = MagicMock()
        mock_response.text = "test-token-123"
        mock_response.raise_for_status = MagicMock()
        mock_put.return_value = mock_response

        token = get_imds_token()

        self.assertEqual(token, "test-token-123")
        mock_put.assert_called_once()

    @patch('oceancurrent_file_server_api.requests.put')
    def test_get_imds_token_failure(self, mock_put):
        """Test IMDSv2 token retrieval failure."""
        from oceancurrent_file_server_api import get_imds_token
        import requests

        # Use proper requests exception type
        mock_put.side_effect = requests.exceptions.RequestException("Connection failed")

        token = get_imds_token()

        self.assertIsNone(token)

    @patch('oceancurrent_file_server_api.requests.get')
    @patch('oceancurrent_file_server_api.get_imds_token')
    def test_fetch_instance_identity_success(self, mock_get_token, mock_get):
        """Test successful EC2 instance identity fetch."""
        from oceancurrent_file_server_api import fetch_instance_identity

        mock_get_token.return_value = "test-token"
        mock_response = MagicMock()
        mock_response.text = "test-pkcs7-signature"
        mock_response.raise_for_status = MagicMock()
        mock_get.return_value = mock_response

        pkcs7 = fetch_instance_identity()

        self.assertEqual(pkcs7, "test-pkcs7-signature")
        mock_get.assert_called_once()

    @patch('oceancurrent_file_server_api.requests.get')
    @patch('oceancurrent_file_server_api.get_imds_token')
    def test_fetch_instance_identity_failure(self, mock_get_token, mock_get):
        """Test EC2 instance identity fetch failure."""
        from oceancurrent_file_server_api import fetch_instance_identity
        import requests

        mock_get_token.return_value = None
        # Use proper requests exception type
        mock_get.side_effect = requests.exceptions.RequestException("Connection failed")

        pkcs7 = fetch_instance_identity()

        self.assertIsNone(pkcs7)

    @patch('oceancurrent_file_server_api.requests.post')
    @patch('oceancurrent_file_server_api._cached_pkcs7', 'test-pkcs7')
    @patch('oceancurrent_file_server_api.OC_API_ENDPOINT', 'https://test.example.com/api')
    def test_send_fatal_log_success(self, mock_post):
        """Test successful fatal log notification."""
        from oceancurrent_file_server_api import send_fatal_log

        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_post.return_value = mock_response

        result = send_fatal_log("Test error message", source_type="test", additional_context="unit_test=true")

        self.assertTrue(result)
        mock_post.assert_called_once()

        # Verify payload structure
        payload = mock_post.call_args[1]['json']
        for key in ('pkcs7', 'timestamp', 'errorMessage', 'source', 'context'):
            self.assertIn(key, payload)
        self.assertEqual(payload['errorMessage'], "Test error message")
        self.assertIn('test', payload['source'])

    @patch('oceancurrent_file_server_api.requests.post')
    @patch('oceancurrent_file_server_api._cached_pkcs7', 'test-pkcs7')
    @patch('oceancurrent_file_server_api.OC_API_ENDPOINT', 'https://test.example.com/api')
    def test_send_fatal_log_http_error(self, mock_post):
        """Test fatal log notification when the API answers with a non-200 status."""
        from oceancurrent_file_server_api import send_fatal_log

        mock_response = MagicMock()
        mock_response.status_code = 500
        mock_response.json.return_value = {"message": "boom"}
        mock_post.return_value = mock_response

        result = send_fatal_log("Test error message")

        self.assertFalse(result)
        mock_post.assert_called_once()

    @patch('oceancurrent_file_server_api.requests.post')
    @patch('oceancurrent_file_server_api._cached_pkcs7', 'test-pkcs7')
    @patch('oceancurrent_file_server_api.OC_API_ENDPOINT', 'https://test.example.com/api')
    def test_send_fatal_log_timeout(self, mock_post):
        """Test fatal log notification when the request times out."""
        from oceancurrent_file_server_api import send_fatal_log
        import requests

        mock_post.side_effect = requests.exceptions.Timeout("timed out")

        result = send_fatal_log("Test error message")

        self.assertFalse(result)

    @patch('oceancurrent_file_server_api._cached_pkcs7', None)
    @patch('oceancurrent_file_server_api.OC_API_ENDPOINT', 'https://test.example.com/api')
    def test_send_fatal_log_no_pkcs7(self):
        """Test fatal log notification when PKCS7 signature is not available."""
        from oceancurrent_file_server_api import send_fatal_log

        result = send_fatal_log("Test error message")

        self.assertFalse(result)

    @patch('oceancurrent_file_server_api._cached_pkcs7', 'test-pkcs7')
    @patch('oceancurrent_file_server_api.OC_API_ENDPOINT', None)
    def test_send_fatal_log_no_endpoint(self):
        """Test fatal log notification when API endpoint is not configured."""
        from oceancurrent_file_server_api import send_fatal_log

        result = send_fatal_log("Test error message")

        self.assertFalse(result)

    def test_load_api_endpoint_from_env(self):
        """Test loading API endpoint from environment variable."""
        from oceancurrent_file_server_api import _load_api_endpoint

        # Report every config file as missing so the environment variable is used
        with patch.dict(os.environ, {'OC_API_ENDPOINT': 'https://env.example.com/api'}, clear=False):
            with patch('oceancurrent_file_server_api.os.path.exists', return_value=False):
                endpoint = _load_api_endpoint()

        self.assertEqual(endpoint, 'https://env.example.com/api')

    def test_load_api_endpoint_from_file(self):
        """Test loading API endpoint from config file."""
        from oceancurrent_file_server_api import _load_api_endpoint

        def exists_side_effect(path):
            # Only the system-wide location is reported as present
            return path == '/etc/imos/oc_api_endpoint.conf'

        # open() is mocked, so no real file is needed on disk
        with patch.dict(os.environ, {'OC_API_ENDPOINT': ''}, clear=False):
            with patch('oceancurrent_file_server_api.os.path.exists', side_effect=exists_side_effect):
                with patch('builtins.open', mock_open(read_data='https://file.example.com/api')):
                    endpoint = _load_api_endpoint()

        self.assertEqual(endpoint, 'https://file.example.com/api')


if __name__ == '__main__':
    unittest.main()