diff --git a/api/csv_sync_sample/README.md b/api/csv_sync_sample/README.md new file mode 100644 index 0000000..f98ff97 --- /dev/null +++ b/api/csv_sync_sample/README.md @@ -0,0 +1,180 @@ +# CSV 처리 프로그램 + +이 프로젝트는 CSV 파일을 읽고 처리하는 파이썬 프로그램입니다. + +## 기능 + +### 1. 사용자 등록 프로그램 + +- CSV 파일에서 사용자 정보 읽기 +- 사용자 존재 여부 확인 +- API를 통한 새 사용자 등록 + +### 2. 서버 및 서버 그룹 관리 프로그램 + +- CSV 파일에서 서버 정보 읽기 +- 서버 등록 및 태그 추가 +- 서버 그룹 생성 +- 서버 그룹에 계정 추가 + +### 3. 정책 및 역할 관리 프로그램 + +- 서버 그룹별 계정 추출 +- 서버 그룹별 정책 생성 및 내용 업데이트 +- 서버 그룹별 역할 생성 및 정책 연결 + +### 4. 역할 할당 프로그램 + +- CSV 파일에서 사용자 정보 및 역할 읽기 +- API를 통한 사용자 검색 +- 사용자에게 지정된 역할 부여 +- 역할이 이미 할당된 사용자 건너뛰기 + +### 5. 통합 처리 프로그램 + +- 사용자 및 서버 CSV 파일 두 개를 입력으로 받음 +- 위 4가지 프로그램을 순차적으로 실행 +- 중간 실패 시 전체 프로세스 중단 +- 과정별 진행 상황 로깅 + +## 설치 방법 + +```bash +# 프로젝트 클론 +git clone [repository_url] +cd csv-processor + +# 필요한 패키지 설치 +pip install -r requirements.txt +``` + +## 사용 방법 + +### 사용자 등록 프로그램 + +```bash +# API URL과 API Key를 명령행 인자로 제공 +python process_users.py data/sample_users.csv --api-url https://example.com --api-key your-api-key + +# 또는 환경 변수로 제공 +export API_BASE_URL=https://example.com +export API_KEY=your-api-key +python process_users.py data/sample_users.csv +``` + +### 서버 및 서버 그룹 관리 프로그램 + +```bash +# API URL과 API Key를 명령행 인자로 제공 +python process_servers.py data/sample_servers.csv --api-url https://example.com --api-key your-api-key + +# 또는 환경 변수로 제공 +export API_BASE_URL=https://example.com +export API_KEY=your-api-key +python process_servers.py data/sample_servers.csv +``` + +### 정책 및 역할 관리 프로그램 + +```bash +# API URL과 API Key를 명령행 인자로 제공 +python process_policies.py data/sample_servers.csv --api-url https://example.com --api-key your-api-key + +# 또는 환경 변수로 제공 +export API_BASE_URL=https://example.com +export API_KEY=your-api-key +python process_policies.py data/sample_servers.csv +``` + +### 역할 할당 프로그램 + +```bash +# API URL과 API Key를 명령행 인자로 제공 +python process_roles.py data/sample_users.csv --api-url https://example.com --api-key your-api-key + +# 또는 환경 변수로 제공 +export API_BASE_URL=https://example.com +export API_KEY=your-api-key +python process_roles.py data/sample_users.csv +``` + +### 통합 처리 프로그램 + +```bash +# API URL과 API Key를 명령행 인자로 제공 +python process_all.py data/sample_users.csv data/sample_servers.csv --api-url https://example.com --api-key your-api-key + +# 또는 환경 변수로 제공 +export API_BASE_URL=https://example.com +export API_KEY=your-api-key +python process_all.py data/sample_users.csv data/sample_servers.csv +``` + +## 프로젝트 구조 + +- `process_users.py`: 사용자 등록 프로그램 진입점 +- `user_processor.py`: 사용자 CSV 처리 로직 +- `process_servers.py`: 서버 관리 프로그램 진입점 +- `server_processor.py`: 서버 CSV 처리 로직 +- `process_policies.py`: 정책 및 역할 관리 프로그램 진입점 +- `policy_processor.py`: 정책 및 역할 처리 로직 +- `process_roles.py`: 역할 할당 프로그램 진입점 +- `role_assigner.py`: 역할 할당 처리 로직 +- `process_all.py`: 모든 처리를 순차 실행하는 통합 프로그램 +- `tests/`: 테스트 코드 +- `data/`: 샘플 CSV 파일 + +## CSV 파일 형식 + +### 사용자 CSV 형식 +``` +email,loginId,name,password,role +user1@example.com,user1,사용자1,password123,ADMIN +``` +(role 필드는 선택 사항이며, 세미콜론(;)으로 구분하여 여러 역할을 지정할 수 있습니다) + +### 서버 CSV 형식 +``` +host,name,osType,sshport,server_group,account_name +10.10.10.10,server1,AWS_LINUX,22,WEB_GROUP,ec2-user +``` + +### 역할 할당 CSV 형식 +``` +email,loginId,name,password,role +user1@example.com,user1,User 1,password123,ADMIN +user2@example.com,user2,User 2,password456,USER;MANAGER +``` +(여기서 'role' 값에 'role'을 붙여 역할명이 생성되며, 세미콜론(;)으로 구분하여 여러 역할을 지정할 수 있습니다. 예: 'ADMIN role', 'USER role', 'MANAGER role') + +## 정책 형식 + +정책은 다음과 같은 YAML 형식으로 생성됩니다: + +```yaml +apiVersion: server.rbac.querypie.com/v1 +kind: SacPolicy + +spec: + allow: + resources: + - serverGroup: {{server_group}} + account: {{account_name}} + actions: + protocols: ["ssh", "sftp"] + commandsRef: "Default Policy" + conditions: + accessTime: "00:00-23:59" + accessWeekday: ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"] + ipAddresses: ["0.0.0.0/0"] + options: + commandAudit: true + commandDetection: false + useProxy: true + maxSessions: 5 + sessionTimeout: 10 +``` + +## 요구 사항 + +Python 3.6 이상 diff --git a/api/csv_sync_sample/data/sample_roles.csv b/api/csv_sync_sample/data/sample_roles.csv new file mode 100644 index 0000000..e3f6a07 --- /dev/null +++ b/api/csv_sync_sample/data/sample_roles.csv @@ -0,0 +1,6 @@ +email,loginId,name,password,role +user1@example.com,user1,사용자1,password123,ADMIN +user2@example.com,user2,사용자2,password456,USER +user3@example.com,user3,사용자3,password789,MANAGER +user4@example.com,user4,사용자4,password010,ADMIN;USER +user5@example.com,user5,사용자5,password011,USER;MANAGER;ADMIN \ No newline at end of file diff --git a/api/csv_sync_sample/data/sample_servers.csv b/api/csv_sync_sample/data/sample_servers.csv new file mode 100644 index 0000000..25bf5e4 --- /dev/null +++ b/api/csv_sync_sample/data/sample_servers.csv @@ -0,0 +1,7 @@ +host,name,osType,sshport,server_group,account_name +10.10.10.10,server1,AWS_LINUX,22,WEB_GROUP,ec2-user +10.10.10.11,server2,AWS_LINUX,22,WEB_GROUP,ec2-user +10.10.10.12,server3,CENTOS,22,DB_GROUP,ec2-user +10.10.10.13,server4,CENTOS,22,DB_GROUP,ec2-user +10.10.10.14,server5,WINDOWS,3389,APP_GROUP,ec2-user +10.10.10.15,server6,WINDOWS,3389,APP_GROUP,ec2-user diff --git a/api/csv_sync_sample/data/sample_users.csv b/api/csv_sync_sample/data/sample_users.csv new file mode 100644 index 0000000..84fea7d --- /dev/null +++ b/api/csv_sync_sample/data/sample_users.csv @@ -0,0 +1,4 @@ +email,loginId,name,password,role +user1@example.com,user1,사용자1,password123,ADMIN +user2@example.com,user2,사용자2,password456,USER +user3@example.com,user3,사용자3,password789,USER diff --git a/api/csv_sync_sample/policy_processor.py b/api/csv_sync_sample/policy_processor.py new file mode 100644 index 0000000..2220be3 --- /dev/null +++ b/api/csv_sync_sample/policy_processor.py @@ -0,0 +1,386 @@ +import csv +import requests +import logging +import yaml +import json +from collections import defaultdict + +class PolicyProcessor: + def __init__(self, api_base_url, api_key): + """ + API를 통해 정책 및 역할을 처리하는 클래스 초기화 + + Args: + api_base_url (str): API 기본 URL (예: 'https://example.com') + api_key (str): API 인증에 사용할 키 + """ + self.api_base_url = api_base_url + self.headers = { + "Authorization": api_key, + "Content-Type": "application/json" + } + # 로깅 설정 + self.logger = logging.getLogger(__name__) + # 정책 UUID를 저장할 딕셔너리 (name -> uuid) + self.policy_uuids = {} + # 역할 UUID를 저장할 딕셔너리 (name -> uuid) + self.role_uuids = {} + + def policy_exists(self, policy_name): + """ + 정책 존재 여부 확인 및 UUID 반환 + + Args: + policy_name (str): 확인할 정책 이름 + + Returns: + str: 정책 UUID 또는 None (존재하지 않는 경우) + """ + # 이미 캐시된 경우 바로 반환 + if policy_name in self.policy_uuids: + return self.policy_uuids[policy_name] + + url = f"{self.api_base_url}/api/external/v2/sac/policies?name={policy_name}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + policies = data.get("list", []) + + if policies and len(policies) > 0: + policy_uuid = policies[0]["uuid"] + self.policy_uuids[policy_name] = policy_uuid + return policy_uuid + return None + except requests.exceptions.RequestException as e: + self.logger.error(f"정책 확인 중 오류 발생: {e}") + return None + + def add_policy(self, policy_name, description): + """ + 새 정책 추가 + + Args: + policy_name (str): 정책 이름 + description (str): 정책 설명 + + Returns: + str: 정책 UUID 또는 None (오류 발생 시) + """ + url = f"{self.api_base_url}/api/external/v2/sac/policies" + + data = { + "name": policy_name, + "description": description + } + + try: + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + result = response.json() + policy_uuid = result.get("uuid") + + if policy_uuid: + self.policy_uuids[policy_name] = policy_uuid + + return policy_uuid + except requests.exceptions.RequestException as e: + self.logger.error(f"정책 추가 중 오류 발생: {e}") + return None + + def update_policy_content(self, policy_uuid, server_group, account_names, justification="initial"): + """ + 정책 내용 업데이트 + + Args: + policy_uuid (str): 정책 UUID + server_group (str): 서버 그룹 이름 + account_names (list): 계정 이름 목록 + justification (str, optional): 변경 사유. 기본값은 "initial" + + Returns: + bool: 성공 여부 + """ + url = f"{self.api_base_url}/api/external/v2/sac/policies/{policy_uuid}/content" + + # 계정이 1개인 경우와 여러 개인 경우 처리 + if len(account_names) == 1: + account = account_names[0] + else: + account = account_names + + # 정책 내용 생성 (YAML 형식) + policy_content = { + "apiVersion": "server.rbac.querypie.com/v1", + "kind": "SacPolicy", + "spec": { + "allow": { + "resources": [ + { + "serverGroup": server_group, + "account": account + } + ], + "actions": { + "protocols": ["ssh", "sftp"], + "commandsRef": "Default Policy" + }, + "conditions": { + "accessTime": "00:00-23:59", + "accessWeekday": ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"], + "ipAddresses": ["0.0.0.0/0"] + }, + "options": { + "commandAudit": True, + "commandDetection": False, + "useProxy": True, + "maxSessions": 5, + "sessionTimeout": 10 + } + } + } + } + + # YAML로 변환 + policy_yaml = yaml.dump(policy_content, default_flow_style=False) + + data = { + "content": policy_yaml, + "justification": justification + } + + try: + response = requests.put(url, headers=self.headers, json=data) + response.raise_for_status() + return True + except requests.exceptions.RequestException as e: + self.logger.error(f"정책 내용 업데이트 중 오류 발생: {e}") + return False + + def role_exists(self, role_name): + """ + 역할 존재 여부 확인 및 UUID 반환 + + Args: + role_name (str): 확인할 역할 이름 + + Returns: + str: 역할 UUID 또는 None (존재하지 않는 경우) + """ + # 이미 캐시된 경우 바로 반환 + if role_name in self.role_uuids: + return self.role_uuids[role_name] + + url = f"{self.api_base_url}/api/external/v2/sac/roles" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + roles = data.get("list", []) + + for role in roles: + if role.get("name") == role_name: + role_uuid = role["uuid"] + self.role_uuids[role_name] = role_uuid + return role_uuid + return None + except requests.exceptions.RequestException as e: + self.logger.error(f"역할 확인 중 오류 발생: {e}") + return None + + def add_role(self, role_name, description): + """ + 새 역할 추가 + + Args: + role_name (str): 역할 이름 + description (str): 역할 설명 + + Returns: + str: 역할 UUID 또는 None (오류 발생 시) + """ + url = f"{self.api_base_url}/api/external/v2/sac/roles" + + data = { + "name": role_name, + "description": description + } + + try: + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + result = response.json() + role_uuid = result.get("uuid") + + if role_uuid: + self.role_uuids[role_name] = role_uuid + + return role_uuid + except requests.exceptions.RequestException as e: + self.logger.error(f"역할 추가 중 오류 발생: {e}") + return None + + def add_policies_to_role(self, role_uuid, policy_uuids): + """ + 역할에 정책 추가 + + Args: + role_uuid (str): 역할 UUID + policy_uuids (list): 정책 UUID 목록 + + Returns: + bool: 성공 여부 + """ + url = f"{self.api_base_url}/api/external/v2/sac/roles/{role_uuid}/policies" + + data = { + "serverPolicyIdentifiers": policy_uuids + } + + try: + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + return True + except requests.exceptions.RequestException as e: + self.logger.error(f"역할에 정책 추가 중 오류 발생: {e}") + return False + +def process_policies_from_csv(csv_file_path, api_base_url, api_key): + """ + CSV 파일을 처리하여 정책 및 역할 생성 + + Args: + csv_file_path (str): CSV 파일 경로 + api_base_url (str): API 기본 URL + api_key (str): API 키 + + Returns: + bool: 성공 여부 + """ + processor = PolicyProcessor(api_base_url, api_key) + + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + # 서버 그룹별 계정 이름 매핑 + server_group_accounts = defaultdict(set) + + logger.info(f"CSV 파일 처리 시작: {csv_file_path}") + + # 1단계: CSV 파일을 읽고 서버 그룹별 계정 목록 수집 + try: + with open(csv_file_path, 'r', encoding='utf-8') as csvfile: + csv_reader = csv.DictReader(csvfile) + + for row_num, row in enumerate(csv_reader, start=2): # 헤더를 제외하고 2부터 시작 + try: + # 필요한 필드가 모두 있는지 확인 + required_fields = ['server_group', 'account_name'] + if not all(field in row for field in required_fields): + missing = [f for f in required_fields if f not in row] + logger.warning(f"행 {row_num}: 필수 필드 누락 - {', '.join(missing)}") + continue + + server_group = row['server_group'] + account_name = row['account_name'] + + # 계정 이름을 서버 그룹별로 수집 + server_group_accounts[server_group].add(account_name) + + except Exception as e: + logger.error(f"행 {row_num} 처리 중 오류 발생: {e}") + + # 2단계: 각 서버 그룹에 대한 정책 및 역할 생성 + policy_success_count = 0 + policy_skip_count = 0 + policy_error_count = 0 + role_success_count = 0 + role_skip_count = 0 + role_error_count = 0 + + for server_group, account_names in server_group_accounts.items(): + try: + # 정책 이름 및 설명 설정 + policy_name = f"{server_group} policy" + policy_description = server_group + + # 정책이 이미 존재하는지 확인 + policy_uuid = processor.policy_exists(policy_name) + + if policy_uuid: + logger.info(f"정책 '{policy_name}'이(가) 이미 존재합니다. UUID: {policy_uuid}") + policy_skip_count += 1 + else: + # 정책 추가 + policy_uuid = processor.add_policy(policy_name, policy_description) + + if not policy_uuid: + logger.error(f"정책 '{policy_name}' 추가 실패") + policy_error_count += 1 + continue + + logger.info(f"정책 '{policy_name}' 추가 성공 (UUID: {policy_uuid})") + policy_success_count += 1 + + # 정책 내용 업데이트 + account_list = list(account_names) + update_result = processor.update_policy_content(policy_uuid, server_group, account_list) + + if update_result: + logger.info(f"정책 '{policy_name}' 내용 업데이트 성공") + else: + logger.error(f"정책 '{policy_name}' 내용 업데이트 실패") + + # 역할 이름 및 설명 설정 + role_name = f"{server_group} role" + role_description = server_group + + # 역할이 이미 존재하는지 확인 + role_uuid = processor.role_exists(role_name) + + if role_uuid: + logger.info(f"역할 '{role_name}'이(가) 이미 존재합니다. UUID: {role_uuid}") + role_skip_count += 1 + else: + # 역할 추가 + role_uuid = processor.add_role(role_name, role_description) + + if not role_uuid: + logger.error(f"역할 '{role_name}' 추가 실패") + role_error_count += 1 + continue + + logger.info(f"역할 '{role_name}' 추가 성공 (UUID: {role_uuid})") + role_success_count += 1 + + # 역할에 정책 추가 + policies_result = processor.add_policies_to_role(role_uuid, [policy_uuid]) + + if policies_result: + logger.info(f"역할 '{role_name}'에 정책 추가 성공") + else: + logger.error(f"역할 '{role_name}'에 정책 추가 실패") + + except Exception as e: + logger.error(f"서버 그룹 '{server_group}' 처리 중 오류 발생: {e}") + policy_error_count += 1 + role_error_count += 1 + + logger.info(f"정책 처리 결과: 성공={policy_success_count}, 스킵={policy_skip_count}, 오류={policy_error_count}") + logger.info(f"역할 처리 결과: 성공={role_success_count}, 스킵={role_skip_count}, 오류={role_error_count}") + + return True + + except Exception as e: + logger.error(f"CSV 파일 처리 중 오류 발생: {e}") + return False \ No newline at end of file diff --git a/api/csv_sync_sample/process_all.py b/api/csv_sync_sample/process_all.py new file mode 100644 index 0000000..1f48600 --- /dev/null +++ b/api/csv_sync_sample/process_all.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +사용자 및 서버 CSV 파일을 처리하여 등록, 서버 그룹 생성, 정책 및 역할 관리, 역할 할당까지 모든 과정을 순차적으로 수행하는 프로그램 +""" + +import os +import sys +import argparse +import logging +import subprocess +import time + +def run_process(command, description): + """ + 주어진 명령을 실행하고 결과를 반환 + + Args: + command (list): 실행할 명령 (subprocess.run에 전달할 형식) + description (str): 명령에 대한 설명 + + Returns: + bool: 성공 여부 + """ + logger = logging.getLogger(__name__) + + logger.info(f"{description} 시작") + try: + result = subprocess.run( + command, + stdout=None, # stdout을 None으로 설정하여 터미널에 직접 출력 + stderr=None, # stderr을 None으로 설정하여 터미널에 직접 출력 + text=True, + check=False # 명령이 실패해도 예외를 발생시키지 않음 + ) + + if result.returncode == 0: + logger.info(f"{description} 성공") + return True + else: + logger.error(f"{description} 실패") + return False + except Exception as e: + logger.error(f"{description} 실행 중 오류 발생: {str(e)}") + return False + +def main(): + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + # 명령행 인자 파싱 + parser = argparse.ArgumentParser( + description='사용자 및 서버 CSV 파일을 처리하여 모든 과정을 순차적으로 수행합니다.' + ) + parser.add_argument('users_csv', help='사용자 정보가 담긴 CSV 파일 경로') + parser.add_argument('servers_csv', help='서버 정보가 담긴 CSV 파일 경로') + parser.add_argument('--api-url', dest='api_url', default=os.environ.get('API_BASE_URL'), + help='API 기본 URL (기본값: 환경 변수 API_BASE_URL)') + parser.add_argument('--api-key', dest='api_key', default=os.environ.get('API_KEY'), + help='API 인증 키 (기본값: 환경 변수 API_KEY)') + + args = parser.parse_args() + + # 필수 매개변수 확인 + if not args.users_csv or not args.servers_csv: + logger.error("사용자 CSV 파일과 서버 CSV 파일 모두 필요합니다.") + parser.print_help() + return 1 + + if not args.api_url: + logger.error("API 기본 URL이 필요합니다. --api-url 옵션을 사용하거나 API_BASE_URL 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + if not args.api_key: + logger.error("API 키가 필요합니다. --api-key 옵션을 사용하거나 API_KEY 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + # CSV 파일 존재 확인 + if not os.path.isfile(args.users_csv): + logger.error(f"사용자 CSV 파일을 찾을 수 없습니다: {args.users_csv}") + return 1 + + if not os.path.isfile(args.servers_csv): + logger.error(f"서버 CSV 파일을 찾을 수 없습니다: {args.servers_csv}") + return 1 + + logger.info("모든 처리 프로세스 시작") + logger.info(f"API URL: {args.api_url}") + logger.info(f"사용자 CSV 파일: {args.users_csv}") + logger.info(f"서버 CSV 파일: {args.servers_csv}") + + # 공통 명령 인자 + api_args = [ + '--api-url', args.api_url, + '--api-key', args.api_key + ] + + # 1. 사용자 등록 처리 + user_cmd = [sys.executable, 'process_users.py', args.users_csv] + api_args + if not run_process(user_cmd, "사용자 등록 처리"): + logger.error("사용자 등록 처리 실패로 전체 프로세스를 중단합니다.") + return 1 + + # 처리 간 딜레이 (API 호출 부하 방지) + time.sleep(1) + + # 2. 서버 등록 및 그룹 처리 + server_cmd = [sys.executable, 'process_servers.py', args.servers_csv] + api_args + if not run_process(server_cmd, "서버 등록 및 그룹 처리"): + logger.error("서버 등록 처리 실패로 전체 프로세스를 중단합니다.") + return 1 + + # 처리 간 딜레이 (API 호출 부하 방지) + time.sleep(1) + + # 3. 정책 및 역할 생성 처리 + policy_cmd = [sys.executable, 'process_policies.py', args.servers_csv] + api_args + if not run_process(policy_cmd, "정책 및 역할 생성 처리"): + logger.error("정책 및 역할 생성 처리 실패로 전체 프로세스를 중단합니다.") + return 1 + + # 처리 간 딜레이 (API 호출 부하 방지) + time.sleep(1) + + # 4. 역할 할당 처리 + role_cmd = [sys.executable, 'process_roles.py', args.users_csv] + api_args + if not run_process(role_cmd, "역할 할당 처리"): + logger.error("역할 할당 처리 실패로 전체 프로세스를 중단합니다.") + return 1 + + logger.info("모든 처리 프로세스 성공적으로 완료") + return 0 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/api/csv_sync_sample/process_policies.py b/api/csv_sync_sample/process_policies.py new file mode 100644 index 0000000..aabce5a --- /dev/null +++ b/api/csv_sync_sample/process_policies.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +""" +CSV 파일에서 서버 그룹 정보를 읽고 API를 통해 정책 및 역할을 생성하는 프로그램 +""" + +import os +import sys +import argparse +import logging +from policy_processor import process_policies_from_csv + +def main(): + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + # 명령행 인자 파싱 + parser = argparse.ArgumentParser(description='CSV 파일에서 서버 그룹 정보를 읽고 API를 통해 정책 및 역할을 생성합니다.') + parser.add_argument('csv_file', help='처리할 CSV 파일 경로') + parser.add_argument('--api-url', dest='api_url', default=os.environ.get('API_BASE_URL'), + help='API 기본 URL (기본값: 환경 변수 API_BASE_URL)') + parser.add_argument('--api-key', dest='api_key', default=os.environ.get('API_KEY'), + help='API 인증 키 (기본값: 환경 변수 API_KEY)') + + args = parser.parse_args() + + # 필수 매개변수 확인 + if not args.csv_file: + logger.error("CSV 파일 경로가 필요합니다.") + parser.print_help() + return 1 + + if not args.api_url: + logger.error("API 기본 URL이 필요합니다. --api-url 옵션을 사용하거나 API_BASE_URL 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + if not args.api_key: + logger.error("API 키가 필요합니다. --api-key 옵션을 사용하거나 API_KEY 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + # CSV 파일 존재 확인 + if not os.path.isfile(args.csv_file): + logger.error(f"CSV 파일을 찾을 수 없습니다: {args.csv_file}") + return 1 + + logger.info(f"API URL: {args.api_url}") + logger.info(f"CSV 파일: {args.csv_file}") + + # CSV 처리 시작 + success = process_policies_from_csv(args.csv_file, args.api_url, args.api_key) + + return 0 if success else 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/api/csv_sync_sample/process_roles.py b/api/csv_sync_sample/process_roles.py new file mode 100644 index 0000000..ce88f9e --- /dev/null +++ b/api/csv_sync_sample/process_roles.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +""" +CSV 파일에서 사용자 정보를 읽고 API를 통해 역할을 할당하는 프로그램 +""" + +import os +import sys +import argparse +import logging +from role_assigner import process_role_assignment + +def main(): + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + # 명령행 인자 파싱 + parser = argparse.ArgumentParser(description='CSV 파일에서 사용자 정보를 읽고 API를 통해 역할을 할당합니다.') + parser.add_argument('csv_file', help='처리할 CSV 파일 경로') + parser.add_argument('--api-url', dest='api_url', default=os.environ.get('API_BASE_URL'), + help='API 기본 URL (기본값: 환경 변수 API_BASE_URL)') + parser.add_argument('--api-key', dest='api_key', default=os.environ.get('API_KEY'), + help='API 인증 키 (기본값: 환경 변수 API_KEY)') + + args = parser.parse_args() + + # 필수 매개변수 확인 + if not args.csv_file: + logger.error("CSV 파일 경로가 필요합니다.") + parser.print_help() + return 1 + + if not args.api_url: + logger.error("API 기본 URL이 필요합니다. --api-url 옵션을 사용하거나 API_BASE_URL 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + if not args.api_key: + logger.error("API 키가 필요합니다. --api-key 옵션을 사용하거나 API_KEY 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + # CSV 파일 존재 확인 + if not os.path.isfile(args.csv_file): + logger.error(f"CSV 파일을 찾을 수 없습니다: {args.csv_file}") + return 1 + + logger.info(f"API URL: {args.api_url}") + logger.info(f"CSV 파일: {args.csv_file}") + + # CSV 처리 시작 + success = process_role_assignment(args.csv_file, args.api_url, args.api_key) + + return 0 if success else 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/api/csv_sync_sample/process_servers.py b/api/csv_sync_sample/process_servers.py new file mode 100644 index 0000000..eaea0ad --- /dev/null +++ b/api/csv_sync_sample/process_servers.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +""" +CSV 파일에서 서버 정보를 읽고 API를 통해 서버, 서버 그룹 및 계정을 등록하는 프로그램 +""" + +import os +import sys +import argparse +import logging +from server_processor import process_server_csv + +def main(): + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + # 명령행 인자 파싱 + parser = argparse.ArgumentParser(description='CSV 파일에서 서버 정보를 읽고 API를 통해 서버, 서버 그룹 및 계정을 등록합니다.') + parser.add_argument('csv_file', help='처리할 CSV 파일 경로') + parser.add_argument('--api-url', dest='api_url', default=os.environ.get('API_BASE_URL'), + help='API 기본 URL (기본값: 환경 변수 API_BASE_URL)') + parser.add_argument('--api-key', dest='api_key', default=os.environ.get('API_KEY'), + help='API 인증 키 (기본값: 환경 변수 API_KEY)') + + args = parser.parse_args() + + # 필수 매개변수 확인 + if not args.csv_file: + logger.error("CSV 파일 경로가 필요합니다.") + parser.print_help() + return 1 + + if not args.api_url: + logger.error("API 기본 URL이 필요합니다. --api-url 옵션을 사용하거나 API_BASE_URL 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + if not args.api_key: + logger.error("API 키가 필요합니다. --api-key 옵션을 사용하거나 API_KEY 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + # CSV 파일 존재 확인 + if not os.path.isfile(args.csv_file): + logger.error(f"CSV 파일을 찾을 수 없습니다: {args.csv_file}") + return 1 + + logger.info(f"API URL: {args.api_url}") + logger.info(f"CSV 파일: {args.csv_file}") + + # CSV 처리 시작 + success = process_server_csv(args.csv_file, args.api_url, args.api_key) + + return 0 if success else 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/api/csv_sync_sample/process_users.py b/api/csv_sync_sample/process_users.py new file mode 100644 index 0000000..6d321c9 --- /dev/null +++ b/api/csv_sync_sample/process_users.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +""" +CSV 파일에서 사용자 정보를 읽고 API를 통해 사용자를 등록하는 프로그램 +""" + +import os +import sys +import argparse +import logging +from user_processor import process_users_csv + +def main(): + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + # 명령행 인자 파싱 + parser = argparse.ArgumentParser(description='CSV 파일에서 사용자 정보를 읽고 API를 통해 사용자를 등록합니다.') + parser.add_argument('csv_file', help='처리할 CSV 파일 경로') + parser.add_argument('--api-url', dest='api_url', default=os.environ.get('API_BASE_URL'), + help='API 기본 URL (기본값: 환경 변수 API_BASE_URL)') + parser.add_argument('--api-key', dest='api_key', default=os.environ.get('API_KEY'), + help='API 인증 키 (기본값: 환경 변수 API_KEY)') + + args = parser.parse_args() + + # 필수 매개변수 확인 + if not args.csv_file: + logger.error("CSV 파일 경로가 필요합니다.") + parser.print_help() + return 1 + + if not args.api_url: + logger.error("API 기본 URL이 필요합니다. --api-url 옵션을 사용하거나 API_BASE_URL 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + if not args.api_key: + logger.error("API 키가 필요합니다. --api-key 옵션을 사용하거나 API_KEY 환경 변수를 설정하세요.") + parser.print_help() + return 1 + + # CSV 파일 존재 확인 + if not os.path.isfile(args.csv_file): + logger.error(f"CSV 파일을 찾을 수 없습니다: {args.csv_file}") + return 1 + + logger.info(f"API URL: {args.api_url}") + logger.info(f"CSV 파일: {args.csv_file}") + + # CSV 처리 시작 + success = process_users_csv(args.csv_file, args.api_url, args.api_key) + + return 0 if success else 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/api/csv_sync_sample/requirements.txt b/api/csv_sync_sample/requirements.txt new file mode 100644 index 0000000..6b7adad --- /dev/null +++ b/api/csv_sync_sample/requirements.txt @@ -0,0 +1,2 @@ +requests==2.31.0 +PyYAML==6.0 \ No newline at end of file diff --git a/api/csv_sync_sample/role_assigner.py b/api/csv_sync_sample/role_assigner.py new file mode 100644 index 0000000..7889017 --- /dev/null +++ b/api/csv_sync_sample/role_assigner.py @@ -0,0 +1,283 @@ +import csv +import requests +import logging +import datetime +from collections import defaultdict + +class RoleAssigner: + def __init__(self, api_base_url, api_key): + """ + API를 통해 사용자에게 역할을 할당하는 클래스 초기화 + + Args: + api_base_url (str): API 기본 URL (예: 'https://example.com') + api_key (str): API 인증에 사용할 키 + """ + self.api_base_url = api_base_url + self.headers = { + "Authorization": api_key, + "Content-Type": "application/json" + } + # 로깅 설정 + self.logger = logging.getLogger(__name__) + # 사용자 UUID를 저장할 딕셔너리 (loginId -> uuid) + self.user_uuids = {} + # 역할 UUID를 저장할 딕셔너리 (roleName -> uuid) + self.role_uuids = {} + # 사용자에게 할당된 역할을 저장할 딕셔너리 (user_uuid -> {role_uuid}) + self.assigned_roles = defaultdict(set) + + def find_user(self, login_id): + """ + 사용자 찾기 + + Args: + login_id (str): 사용자 로그인 ID + + Returns: + str: 사용자 UUID 또는 None (존재하지 않는 경우) + """ + # 이미 캐시된 경우 바로 반환 + if login_id in self.user_uuids: + return self.user_uuids[login_id] + + url = f"{self.api_base_url}/api/external/v2/users?loginId={login_id}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + users = data.get("list", []) + + if users and len(users) > 0: + user = users[0] + if user.get("loginId") == login_id: + user_uuid = user.get("uuid") + self.user_uuids[login_id] = user_uuid + return user_uuid + return None + except requests.exceptions.RequestException as e: + self.logger.error(f"사용자 검색 중 오류 발생: {e}") + return None + + def find_role(self, role_name): + """ + 역할 찾기 + + Args: + role_name (str): 역할 이름 + + Returns: + str: 역할 UUID 또는 None (존재하지 않는 경우) + """ + # 이미 캐시된 경우 바로 반환 + if role_name in self.role_uuids: + return self.role_uuids[role_name] + + url = f"{self.api_base_url}/api/external/v2/sac/roles?name={role_name}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + roles = data.get("list", []) + + for role in roles: + if role.get("name") == role_name: + role_uuid = role.get("uuid") + self.role_uuids[role_name] = role_uuid + return role_uuid + return None + except requests.exceptions.RequestException as e: + self.logger.error(f"역할 검색 중 오류 발생: {e}") + return None + + def get_user_roles(self, user_uuid): + """ + 사용자에게 할당된 역할 목록 가져오기 + + Args: + user_uuid (str): 사용자 UUID + + Returns: + set: 역할 UUID 집합 + """ + # 이미 캐시된 경우 바로 반환 + if user_uuid in self.assigned_roles: + return self.assigned_roles[user_uuid] + + url = f"{self.api_base_url}/api/external/v2/sac/access-controls/{user_uuid}/roles" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + roles = data.get("list", []) + + role_uuids = set() + for role in roles: + role_uuid = role.get("serverRoleUuid") + role_uuids.add(role_uuid) + + self.assigned_roles[user_uuid] = role_uuids + return role_uuids + except requests.exceptions.RequestException as e: + self.logger.error(f"사용자 역할 목록 조회 중 오류 발생: {e}") + return set() + + def assign_role(self, user_uuid, role_uuid, expiry_years=10): + """ + 사용자에게 역할 할당 + + Args: + user_uuid (str): 사용자 UUID + role_uuid (str): 역할 UUID + expiry_years (int, optional): 만료 기간(년). 기본값은 10 + + Returns: + bool: 성공 여부 + """ + url = f"{self.api_base_url}/api/external/v2/sac/access-controls/{user_uuid}/roles" + + # 10년 후 날짜 계산 + today = datetime.date.today() + expiry_date = today.replace(year=today.year + expiry_years) + expiry_str = expiry_date.isoformat() + + data = { + "expiryAt": expiry_str, + "serverRoleUuids": [role_uuid] + } + + try: + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + # 할당 성공 시 캐시 업데이트 + if user_uuid in self.assigned_roles: + self.assigned_roles[user_uuid].add(role_uuid) + else: + self.assigned_roles[user_uuid] = {role_uuid} + + return True + except requests.exceptions.RequestException as e: + self.logger.error(f"역할 할당 중 오류 발생: {e}") + return False + +def process_role_assignment(csv_file_path, api_base_url, api_key): + """ + CSV 파일을 처리하여 사용자에게 역할 할당 + + Args: + csv_file_path (str): CSV 파일 경로 + api_base_url (str): API 기본 URL + api_key (str): API 키 + + Returns: + bool: 성공 여부 + """ + assigner = RoleAssigner(api_base_url, api_key) + + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + success_count = 0 + skip_count = 0 + error_count = 0 + + logger.info(f"CSV 파일 처리 시작: {csv_file_path}") + + try: + with open(csv_file_path, 'r', encoding='utf-8') as csvfile: + csv_reader = csv.DictReader(csvfile) + + for row_num, row in enumerate(csv_reader, start=2): # 헤더를 제외하고 2부터 시작 + try: + # 필요한 필드가 모두 있는지 확인 + required_fields = ['loginId', 'role'] + if not all(field in row for field in required_fields): + missing = [f for f in required_fields if f not in row] + logger.warning(f"행 {row_num}: 필수 필드 누락 - {', '.join(missing)}") + error_count += 1 + continue + + login_id = row['loginId'] + role_values = row['role'].split(';') + + # 역할 값이 비어있으면 건너뛰기 + if not any(role_value.strip() for role_value in role_values): + logger.warning(f"행 {row_num}: 역할 값이 비어 있습니다. 건너뜁니다.") + skip_count += 1 + continue + + # 사용자 찾기 + user_uuid = assigner.find_user(login_id) + if not user_uuid: + logger.error(f"행 {row_num}: 사용자 '{login_id}'를 찾을 수 없습니다.") + error_count += 1 + continue + + # 사용자에게 이미 할당된 역할 가져오기 + existing_roles = assigner.get_user_roles(user_uuid) + + role_success = 0 + role_skip = 0 + role_error = 0 + + # 각 역할에 대해 처리 + for role_value in role_values: + role_value = role_value.strip() + if not role_value: + continue + + role_name = f"{role_value} role" + + # 역할 찾기 + role_uuid = assigner.find_role(role_name) + if not role_uuid: + logger.error(f"행 {row_num}: 역할 '{role_name}'을 찾을 수 없습니다.") + role_error += 1 + continue + + # 이미 할당된 역할인지 확인 + if role_uuid in existing_roles: + logger.info(f"행 {row_num}: 사용자 '{login_id}'에게 이미 역할 '{role_name}'이 할당되어 있습니다.") + role_skip += 1 + continue + + # 역할 할당 + result = assigner.assign_role(user_uuid, role_uuid) + if result: + logger.info(f"행 {row_num}: 사용자 '{login_id}'에게 역할 '{role_name}' 할당 성공") + role_success += 1 + else: + logger.error(f"행 {row_num}: 사용자 '{login_id}'에게 역할 '{role_name}' 할당 실패") + role_error += 1 + + # 행 처리 결과 집계 + if role_success > 0: + success_count += 1 + elif role_skip > 0: + skip_count += 1 + else: + error_count += 1 + + logger.info(f"행 {row_num}: 처리 완료 (성공: {role_success}, 스킵: {role_skip}, 오류: {role_error})") + + except Exception as e: + logger.error(f"행 {row_num} 처리 중 오류 발생: {e}") + error_count += 1 + + except Exception as e: + logger.error(f"CSV 파일 처리 중 오류 발생: {e}") + return False + + logger.info(f"CSV 파일 처리 완료: 성공={success_count}, 스킵={skip_count}, 오류={error_count}") + return True \ No newline at end of file diff --git a/api/csv_sync_sample/server_processor.py b/api/csv_sync_sample/server_processor.py new file mode 100644 index 0000000..194ae47 --- /dev/null +++ b/api/csv_sync_sample/server_processor.py @@ -0,0 +1,409 @@ +import csv +import requests +import logging +from collections import defaultdict + +class ServerProcessor: + def __init__(self, api_base_url, api_key): + """ + API를 통해 서버 및 서버 그룹을 처리하는 클래스 초기화 + + Args: + api_base_url (str): API 기본 URL (예: 'https://example.com') + api_key (str): API 인증에 사용할 키 + """ + self.api_base_url = api_base_url + self.headers = { + "Authorization": api_key, + "Content-Type": "application/json" + } + # 로깅 설정 + self.logger = logging.getLogger(__name__) + # 서버 UUID를 저장할 딕셔너리 (name -> uuid) + self.server_uuids = {} + # 서버 그룹 UUID를 저장할 딕셔너리 (name -> uuid) + self.server_group_uuids = {} + # Secret Store UUID + self.secret_store_uuid = None + + def get_secret_store_uuid(self): + """ + 첫 번째 Secret Store의 UUID를 가져옴 + + Returns: + str: Secret Store UUID 또는 None (오류 발생 시) + """ + if self.secret_store_uuid: + return self.secret_store_uuid + + url = f"{self.api_base_url}/api/external/v2/security/secret-stores" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + secret_stores = response.json() + if secret_stores and len(secret_stores) > 0: + self.secret_store_uuid = secret_stores[0]["uuid"] + self.logger.info(f"Secret Store UUID: {self.secret_store_uuid}") + return self.secret_store_uuid + else: + self.logger.error("Secret Store를 찾을 수 없습니다.") + return None + except requests.exceptions.RequestException as e: + self.logger.error(f"Secret Store 조회 중 오류 발생: {e}") + return None + + def server_exists(self, name): + """ + 서버 존재 여부 확인 및 UUID 반환 + + Args: + name (str): 확인할 서버 이름 + + Returns: + str: 서버 UUID 또는 None (존재하지 않는 경우) + """ + # 이미 캐시된 경우 바로 반환 + if name in self.server_uuids: + return self.server_uuids[name] + + url = f"{self.api_base_url}/api/external/v2/sac/servers?name={name}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + servers = data.get("list", []) + + if servers and len(servers) > 0: + server_uuid = servers[0]["server"]["uuid"] + self.server_uuids[name] = server_uuid + return server_uuid + return None + except requests.exceptions.RequestException as e: + self.logger.error(f"서버 확인 중 오류 발생: {e}") + return None + + def add_server(self, server_data): + """ + 새 서버 추가 + + Args: + server_data (dict): 서버 정보 (host, name, osType, sshPort 등) + + Returns: + str: 서버 UUID 또는 None (오류 발생 시) + """ + url = f"{self.api_base_url}/api/external/v2/sac/servers" + + try: + response = requests.post(url, headers=self.headers, json=server_data) + response.raise_for_status() + + result = response.json() + server_uuid = result.get("uuid") + + if server_uuid: + self.server_uuids[server_data["name"]] = server_uuid + + return server_uuid + except requests.exceptions.RequestException as e: + self.logger.error(f"서버 추가 중 오류 발생: {e}") + return None + + def add_server_tag(self, server_uuid, key, value): + """ + 서버에 태그 추가 + + Args: + server_uuid (str): 서버 UUID + key (str): 태그 키 + value (str): 태그 값 + + Returns: + bool: 성공 여부 + """ + url = f"{self.api_base_url}/api/external/v2/sac/servers/{server_uuid}/tags" + + data = { + "customTags": [ + { + "key": key, + "value": value + } + ], + "overwrite": True + } + + try: + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + return True + except requests.exceptions.RequestException as e: + self.logger.error(f"서버 태그 추가 중 오류 발생: {e}") + return False + + def server_group_exists(self, name): + """ + 서버 그룹 존재 여부 확인 및 UUID 반환 + + Args: + name (str): 확인할 서버 그룹 이름 + + Returns: + str: 서버 그룹 UUID 또는 None (존재하지 않는 경우) + """ + # 이미 캐시된 경우 바로 반환 + if name in self.server_group_uuids: + return self.server_group_uuids[name] + + url = f"{self.api_base_url}/api/external/v2/sac/server-groups?name={name}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + groups = data.get("list", []) + + if groups and len(groups) > 0: + group_uuid = groups[0]["uuid"] + self.server_group_uuids[name] = group_uuid + return group_uuid + return None + except requests.exceptions.RequestException as e: + self.logger.error(f"서버 그룹 확인 중 오류 발생: {e}") + return None + + def add_server_group(self, name, server_group_tag, description=""): + """ + 새 서버 그룹 추가 + + Args: + name (str): 서버 그룹 이름 + server_group_tag (str): 서버 그룹 태그 값 + description (str, optional): 설명 + + Returns: + str: 서버 그룹 UUID 또는 None (오류 발생 시) + """ + secret_store_uuid = self.get_secret_store_uuid() or "" + + url = f"{self.api_base_url}/api/external/v2/sac/server-groups" + + data = { + "name": name, + "description": description, + "filterTags": [ + { + "key": "server_group", + "operator": "=", + "value": server_group_tag + } + ], + "secretStoreUuid": secret_store_uuid + } + + try: + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + result = response.json() + group_uuid = result.get("uuid") + + if group_uuid: + self.server_group_uuids[name] = group_uuid + + return group_uuid + except requests.exceptions.RequestException as e: + self.logger.error(f"서버 그룹 추가 중 오류 발생: {e}") + return None + + def add_server_group_account(self, server_group_uuid, account_name): + """ + 서버 그룹에 계정 추가 + + Args: + server_group_uuid (str): 서버 그룹 UUID + account_name (str): 계정 이름 + + Returns: + str: 계정 UUID 또는 None (오류 발생 시) + """ + url = f"{self.api_base_url}/api/external/v2/sac/server-groups/{server_group_uuid}/accounts" + + data = { + "auth": { + "accountId": account_name, + "authType": "PASSWORD", + "password": None, + "sshKeyUuid": "" + }, + "accountType": "QUERYPIE" + } + + try: + response = requests.post(url, headers=self.headers, json=data) + response.raise_for_status() + + result = response.json() + return result.get("uuid") + except requests.exceptions.RequestException as e: + self.logger.error(f"서버 그룹 계정 추가 중 오류 발생: {e}") + return None + +def process_server_csv(csv_file_path, api_base_url, api_key): + """ + CSV 파일을 처리하여 서버, 서버 그룹 및 계정 등록 + + Args: + csv_file_path (str): CSV 파일 경로 + api_base_url (str): API 기본 URL + api_key (str): API 키 + + Returns: + bool: 성공 여부 + """ + processor = ServerProcessor(api_base_url, api_key) + + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + server_success_count = 0 + server_skip_count = 0 + server_error_count = 0 + + # 서버 그룹별 계정 이름 매핑 + server_group_accounts = defaultdict(set) + + logger.info(f"CSV 파일 처리 시작: {csv_file_path}") + + # 1단계: CSV 파일을 읽고 서버 생성 및 태그 추가 + try: + with open(csv_file_path, 'r', encoding='utf-8') as csvfile: + csv_reader = csv.DictReader(csvfile) + + for row_num, row in enumerate(csv_reader, start=2): # 헤더를 제외하고 2부터 시작 + try: + # 필요한 필드가 모두 있는지 확인 + required_fields = ['host', 'name', 'osType', 'sshport', 'server_group', 'account_name'] + if not all(field in row for field in required_fields): + missing = [f for f in required_fields if f not in row] + logger.warning(f"행 {row_num}: 필수 필드 누락 - {', '.join(missing)}") + server_error_count += 1 + continue + + name = row['name'] + server_group = row['server_group'] + account_name = row['account_name'] + + # 계정 이름을 서버 그룹별로 수집 + server_group_accounts[server_group].add(account_name) + + # 서버가 이미 존재하는지 확인 + server_uuid = processor.server_exists(name) + if server_uuid: + logger.info(f"행 {row_num}: 서버 '{name}'가 이미 존재합니다. UUID: {server_uuid}") + server_skip_count += 1 + else: + # 서버 추가 + server_data = { + 'host': row['host'], + 'name': name, + 'osType': row['osType'], + 'sshPort': int(row['sshport']) + } + + server_uuid = processor.add_server(server_data) + if not server_uuid: + logger.error(f"행 {row_num}: 서버 '{name}' 추가 실패") + server_error_count += 1 + continue + + logger.info(f"행 {row_num}: 서버 '{name}' 추가 성공 (UUID: {server_uuid})") + server_success_count += 1 + + # 서버에 태그 추가 + tag_result = processor.add_server_tag( + server_uuid, + "server_role", + server_group + ) + + # 서버 그룹 필터를 위한 태그 추가 + tag_result = processor.add_server_tag( + server_uuid, + "server_group", + server_group + ) + + if tag_result: + logger.info(f"행 {row_num}: 서버 '{name}' 태그 추가 성공") + else: + logger.warning(f"행 {row_num}: 서버 '{name}' 태그 추가 실패") + + except Exception as e: + logger.error(f"행 {row_num} 처리 중 오류 발생: {e}") + server_error_count += 1 + + # 2단계: 고유한 서버 그룹을 생성하고 계정 설정 + group_success_count = 0 + group_skip_count = 0 + group_error_count = 0 + account_success_count = 0 + account_error_count = 0 + + for server_group, account_names in server_group_accounts.items(): + try: + # 서버 그룹이 이미 존재하는지 확인 + group_uuid = processor.server_group_exists(server_group) + + if group_uuid: + logger.info(f"서버 그룹 '{server_group}'가 이미 존재합니다. UUID: {group_uuid}") + group_skip_count += 1 + else: + # 서버 그룹 추가 + group_uuid = processor.add_server_group( + server_group, + server_group, + f"Auto created server group for {server_group}" + ) + + if group_uuid: + logger.info(f"서버 그룹 '{server_group}' 추가 성공 (UUID: {group_uuid})") + group_success_count += 1 + else: + logger.error(f"서버 그룹 '{server_group}' 추가 실패") + group_error_count += 1 + continue + + # 계정 설정 + for account_name in account_names: + account_uuid = processor.add_server_group_account(group_uuid, account_name) + + if account_uuid: + logger.info(f"서버 그룹 '{server_group}' 계정 추가 성공 (UUID: {account_uuid})") + account_success_count += 1 + else: + logger.error(f"서버 그룹 '{server_group}' 계정 추가 실패") + account_error_count += 1 + + except Exception as e: + logger.error(f"서버 그룹 '{server_group}' 처리 중 오류 발생: {e}") + group_error_count += 1 + + logger.info(f"서버 처리 결과: 성공={server_success_count}, 스킵={server_skip_count}, 오류={server_error_count}") + logger.info(f"서버 그룹 처리 결과: 성공={group_success_count}, 스킵={group_skip_count}, 오류={group_error_count}") + logger.info(f"계정 처리 결과: 성공={account_success_count}, 오류={account_error_count}") + + return True + + except Exception as e: + logger.error(f"CSV 파일 처리 중 오류 발생: {e}") + return False diff --git a/api/csv_sync_sample/tests/test_policy_processor.py b/api/csv_sync_sample/tests/test_policy_processor.py new file mode 100644 index 0000000..164f045 --- /dev/null +++ b/api/csv_sync_sample/tests/test_policy_processor.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +""" +정책 프로세서 테스트 +""" + +import unittest +import tempfile +import os +from unittest.mock import patch, MagicMock +from policy_processor import process_policies_from_csv, PolicyProcessor + +class TestPolicyProcessor(unittest.TestCase): + """PolicyProcessor 클래스 테스트""" + + def setUp(self): + """테스트 설정""" + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + self.processor = PolicyProcessor(self.api_url, self.api_key) + + @patch('requests.get') + def test_policy_exists(self, mock_get): + """policy_exists 메서드 테스트""" + # 정책이 존재하는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "list": [{"name": "existing_policy", "uuid": "policy-uuid"}], + "page": {"totalElements": 1} + } + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.policy_exists("existing_policy") + + # 결과 확인 + self.assertEqual(result, "policy-uuid") + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/policies?name=existing_policy", + headers=self.processor.headers + ) + + @patch('requests.get') + def test_policy_not_exists(self, mock_get): + """policy_exists 메서드 테스트 - 정책이 존재하지 않는 경우""" + # 정책이 존재하지 않는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = {"list": [], "page": {"totalElements": 0}} + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.policy_exists("non_existing_policy") + + # 결과 확인 + self.assertIsNone(result) + + @patch('requests.post') + def test_add_policy(self, mock_post): + """add_policy 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "uuid": "new-policy-uuid" + } + mock_post.return_value = mock_response + + # 테스트 실행 + result = self.processor.add_policy("new_policy", "New Policy Description") + + # 결과 확인 + self.assertEqual(result, "new-policy-uuid") + mock_post.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/policies", + headers=self.processor.headers, + json={ + "name": "new_policy", + "description": "New Policy Description" + } + ) + + @patch('requests.put') + def test_update_policy_content(self, mock_put): + """update_policy_content 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_put.return_value = mock_response + + # 테스트 실행 + result = self.processor.update_policy_content( + "policy-uuid", + "test-group", + ["account1"], + "test-justification" + ) + + # 결과 확인 + self.assertTrue(result) + mock_put.assert_called_once() + + # 여러 계정으로 테스트 + mock_put.reset_mock() + result = self.processor.update_policy_content( + "policy-uuid", + "test-group", + ["account1", "account2"], + "test-justification" + ) + + # 결과 확인 + self.assertTrue(result) + mock_put.assert_called_once() + + @patch('requests.get') + def test_role_exists(self, mock_get): + """role_exists 메서드 테스트""" + # 역할이 존재하는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "list": [{"name": "existing_role", "uuid": "role-uuid"}] + } + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.role_exists("existing_role") + + # 결과 확인 + self.assertEqual(result, "role-uuid") + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/roles", + headers=self.processor.headers + ) + + @patch('requests.post') + def test_add_role(self, mock_post): + """add_role 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "uuid": "new-role-uuid" + } + mock_post.return_value = mock_response + + # 테스트 실행 + result = self.processor.add_role("new_role", "New Role Description") + + # 결과 확인 + self.assertEqual(result, "new-role-uuid") + mock_post.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/roles", + headers=self.processor.headers, + json={ + "name": "new_role", + "description": "New Role Description" + } + ) + + @patch('requests.post') + def test_add_policies_to_role(self, mock_post): + """add_policies_to_role 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_post.return_value = mock_response + + # 테스트 실행 + result = self.processor.add_policies_to_role( + "role-uuid", + ["policy-uuid1", "policy-uuid2"] + ) + + # 결과 확인 + self.assertTrue(result) + mock_post.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/roles/role-uuid/policies", + headers=self.processor.headers, + json={ + "serverPolicyIdentifiers": ["policy-uuid1", "policy-uuid2"] + } + ) + +class TestProcessPoliciesFromCSV(unittest.TestCase): + """process_policies_from_csv 함수 테스트""" + + def setUp(self): + """테스트 설정""" + # 임시 CSV 파일 생성 + self.temp_dir = tempfile.mkdtemp() + self.csv_file = os.path.join(self.temp_dir, "test_servers.csv") + + with open(self.csv_file, "w", encoding="utf-8") as f: + f.write("host,name,osType,sshport,server_group,account_name\n") + f.write("10.10.10.10,server1,AWS_LINUX,22,WEB_GROUP,account1\n") + f.write("10.10.10.11,server2,AWS_LINUX,22,WEB_GROUP,account2\n") + f.write("10.10.10.12,server3,CENTOS,22,DB_GROUP,account3\n") + + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + + def tearDown(self): + """테스트 정리""" + # 임시 파일 삭제 + if os.path.exists(self.csv_file): + os.remove(self.csv_file) + if os.path.exists(self.temp_dir): + os.rmdir(self.temp_dir) + + @patch('policy_processor.PolicyProcessor') + def test_process_policies_from_csv(self, MockPolicyProcessor): + """process_policies_from_csv 함수 테스트""" + # PolicyProcessor 모의 객체 설정 + mock_processor = MagicMock() + MockPolicyProcessor.return_value = mock_processor + + # policy_exists 메서드가 정책이 존재하지 않도록 설정 + mock_processor.policy_exists.return_value = None + + # add_policy 메서드는 정책 UUID를 반환하도록 설정 + mock_processor.add_policy.side_effect = ["policy1-uuid", "policy2-uuid"] + + # update_policy_content 메서드는 항상 성공하도록 설정 + mock_processor.update_policy_content.return_value = True + + # role_exists 메서드가 역할이 존재하지 않도록 설정 + mock_processor.role_exists.return_value = None + + # add_role 메서드는 역할 UUID를 반환하도록 설정 + mock_processor.add_role.side_effect = ["role1-uuid", "role2-uuid"] + + # add_policies_to_role 메서드는 항상 성공하도록 설정 + mock_processor.add_policies_to_role.return_value = True + + # 테스트 실행 + result = process_policies_from_csv(self.csv_file, self.api_url, self.api_key) + + # 결과 확인 + self.assertTrue(result) + + # 정확한 수의 호출이 이루어졌는지 확인 + self.assertEqual(mock_processor.policy_exists.call_count, 2) # 두 개의 서버 그룹이 있음 + self.assertEqual(mock_processor.add_policy.call_count, 2) # 두 개의 정책이 생성됨 + self.assertEqual(mock_processor.update_policy_content.call_count, 2) # 두 개의 정책 내용이 업데이트됨 + self.assertEqual(mock_processor.role_exists.call_count, 2) # 두 개의 역할을 확인함 + self.assertEqual(mock_processor.add_role.call_count, 2) # 두 개의 역할이 생성됨 + self.assertEqual(mock_processor.add_policies_to_role.call_count, 2) # 두 개의 역할에 정책이 추가됨 + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/api/csv_sync_sample/tests/test_process_all.py b/api/csv_sync_sample/tests/test_process_all.py new file mode 100644 index 0000000..54d66d1 --- /dev/null +++ b/api/csv_sync_sample/tests/test_process_all.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +""" +통합 프로세스 테스트 +""" + +import unittest +import tempfile +import os +from unittest.mock import patch, MagicMock, call + +# process_all 모듈을 임포트하기 전에 현재 디렉토리를 테스트용 모듈 검색 경로에 추가합니다. +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from process_all import run_process + +class TestProcessAll(unittest.TestCase): + """통합 프로세스 프로그램 테스트""" + + def setUp(self): + """테스트 설정""" + # 임시 CSV 파일 생성 + self.temp_dir = tempfile.mkdtemp() + self.users_csv = os.path.join(self.temp_dir, "test_users.csv") + self.servers_csv = os.path.join(self.temp_dir, "test_servers.csv") + + # 사용자 CSV 파일 생성 + with open(self.users_csv, "w", encoding="utf-8") as f: + f.write("email,loginId,name,password,role\n") + f.write("test1@example.com,test1,Test User 1,password1,ADMIN\n") + + # 서버 CSV 파일 생성 + with open(self.servers_csv, "w", encoding="utf-8") as f: + f.write("host,name,osType,sshport,server_group,account_name\n") + f.write("10.0.0.1,server1,LINUX,22,TEST_GROUP,testuser\n") + + def tearDown(self): + """테스트 정리""" + # 임시 파일 삭제 + for file_path in [self.users_csv, self.servers_csv]: + if os.path.exists(file_path): + os.remove(file_path) + if os.path.exists(self.temp_dir): + os.rmdir(self.temp_dir) + + @patch('subprocess.run') + def test_run_process_success(self, mock_run): + """run_process 함수 성공 테스트""" + # 성공적인 프로세스 실행 모의 + mock_result = MagicMock() + mock_result.returncode = 0 + mock_run.return_value = mock_result + + # 테스트 실행 + command = ["echo", "test"] + result = run_process(command, "테스트 명령") + + # 결과 확인 + self.assertTrue(result) + mock_run.assert_called_once_with( + command, + stdout=unittest.mock.ANY, + stderr=unittest.mock.ANY, + text=True, + check=False + ) + + @patch('subprocess.run') + def test_run_process_failure(self, mock_run): + """run_process 함수 실패 테스트""" + # 실패한 프로세스 실행 모의 + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stderr = "오류 메시지" + mock_run.return_value = mock_result + + # 테스트 실행 + command = ["echo", "test"] + result = run_process(command, "테스트 명령") + + # 결과 확인 + self.assertFalse(result) + mock_run.assert_called_once() + + @patch('subprocess.run') + def test_run_process_exception(self, mock_run): + """run_process 함수 예외 발생 테스트""" + # 예외 발생 모의 + mock_run.side_effect = Exception("테스트 예외") + + # 테스트 실행 + command = ["echo", "test"] + result = run_process(command, "테스트 명령") + + # 결과 확인 + self.assertFalse(result) + mock_run.assert_called_once() + + @patch('process_all.run_process') + @patch('sys.executable', 'python') + def test_main_function_success(self, mock_run_process): + """main 함수의 성공적인 실행 테스트""" + # 모든 프로세스가 성공적으로 실행되도록 설정 + mock_run_process.return_value = True + + # 테스트를 위한 명령행 인자 설정 + test_args = [ + 'process_all.py', + self.users_csv, + self.servers_csv, + '--api-url', 'https://api.example.com', + '--api-key', 'test-api-key' + ] + + # main 함수 실행 모의 + with patch('sys.argv', test_args): + from process_all import main + result = main() + + # 결과 확인 + self.assertEqual(result, 0) + + # 각 프로세스에 대한 호출 확인 + expected_calls = [ + call(['python', 'process_users.py', self.users_csv, '--api-url', 'https://api.example.com', '--api-key', 'test-api-key'], "사용자 등록 처리"), + call(['python', 'process_servers.py', self.servers_csv, '--api-url', 'https://api.example.com', '--api-key', 'test-api-key'], "서버 등록 및 그룹 처리"), + call(['python', 'process_policies.py', self.servers_csv, '--api-url', 'https://api.example.com', '--api-key', 'test-api-key'], "정책 및 역할 생성 처리"), + call(['python', 'process_roles.py', self.users_csv, '--api-url', 'https://api.example.com', '--api-key', 'test-api-key'], "역할 할당 처리") + ] + + self.assertEqual(mock_run_process.call_count, 4) + mock_run_process.assert_has_calls(expected_calls) + + @patch('process_all.run_process') + def test_main_function_failure(self, mock_run_process): + """main 함수의 실패 테스트 (두 번째 프로세스 실패)""" + # 두 번째 프로세스가 실패하도록 설정 + mock_run_process.side_effect = [True, False] + + # 테스트를 위한 명령행 인자 설정 + test_args = [ + 'process_all.py', + self.users_csv, + self.servers_csv, + '--api-url', 'https://api.example.com', + '--api-key', 'test-api-key' + ] + + # main 함수 실행 모의 + with patch('sys.argv', test_args): + from process_all import main + result = main() + + # 결과 확인 + self.assertEqual(result, 1) + + # 두 번의 호출만 발생해야 함 + self.assertEqual(mock_run_process.call_count, 2) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/api/csv_sync_sample/tests/test_role_assigner.py b/api/csv_sync_sample/tests/test_role_assigner.py new file mode 100644 index 0000000..f08d736 --- /dev/null +++ b/api/csv_sync_sample/tests/test_role_assigner.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 +""" +역할 할당 프로세서 테스트 +""" + +import unittest +import tempfile +import os +from unittest.mock import patch, MagicMock +from role_assigner import process_role_assignment, RoleAssigner + +class TestRoleAssigner(unittest.TestCase): + """RoleAssigner 클래스 테스트""" + + def setUp(self): + """테스트 설정""" + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + self.assigner = RoleAssigner(self.api_url, self.api_key) + + @patch('requests.get') + def test_find_user(self, mock_get): + """find_user 메서드 테스트""" + # 사용자가 존재하는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "list": [{"loginId": "existing_user", "uuid": "user-uuid"}], + "page": {"totalElements": 1} + } + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.assigner.find_user("existing_user") + + # 결과 확인 + self.assertEqual(result, "user-uuid") + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/users?loginId=existing_user", + headers=self.assigner.headers + ) + + @patch('requests.get') + def test_find_user_not_exists(self, mock_get): + """find_user 메서드 테스트 - 사용자가 존재하지 않는 경우""" + # 사용자가 존재하지 않는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = {"list": [], "page": {"totalElements": 0}} + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.assigner.find_user("non_existing_user") + + # 결과 확인 + self.assertIsNone(result) + + @patch('requests.get') + def test_find_role(self, mock_get): + """find_role 메서드 테스트""" + # 역할이 존재하는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "list": [{"name": "existing_role", "uuid": "role-uuid"}] + } + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.assigner.find_role("existing_role") + + # 결과 확인 + self.assertEqual(result, "role-uuid") + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/roles?name=existing_role", + headers=self.assigner.headers + ) + + @patch('requests.get') + def test_find_role_not_exists(self, mock_get): + """find_role 메서드 테스트 - 역할이 존재하지 않는 경우""" + # 역할이 존재하지 않는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = {"list": []} + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.assigner.find_role("non_existing_role") + + # 결과 확인 + self.assertIsNone(result) + + @patch('requests.get') + def test_get_user_roles(self, mock_get): + """get_user_roles 메서드 테스트""" + # 사용자에게 할당된 역할이 있는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "list": [ + {"serverRoleUuid": "role1-uuid"}, + {"serverRoleUuid": "role2-uuid"} + ] + } + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.assigner.get_user_roles("user-uuid") + + # 결과 확인 + self.assertEqual(result, {"role1-uuid", "role2-uuid"}) + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/access-controls/user-uuid/roles", + headers=self.assigner.headers + ) + + @patch('datetime.date') + @patch('requests.post') + def test_assign_role(self, mock_post, mock_date): + """assign_role 메서드 테스트""" + # 날짜 모의 + today = MagicMock() + today.year = 2023 + today_replace_mock = MagicMock() + today_replace_mock.isoformat.return_value = '2023-11-06T00:00:00Z' + today.replace.return_value = today_replace_mock + mock_date.today.return_value = today + + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_post.return_value = mock_response + + # 테스트 실행 + result = self.assigner.assign_role("user-uuid", "role-uuid") + + # 결과 확인 + self.assertTrue(result) + mock_post.assert_called_once() + + # 호출 인자 확인 + call_args = mock_post.call_args + url = call_args[0][0] + self.assertEqual(url, f"{self.api_url}/api/external/v2/sac/access-controls/user-uuid/roles") + + # 캐시 업데이트 확인 + self.assertIn("user-uuid", self.assigner.assigned_roles) + self.assertIn("role-uuid", self.assigner.assigned_roles["user-uuid"]) + +class TestProcessRoleAssignment(unittest.TestCase): + """process_role_assignment 함수 테스트""" + + def setUp(self): + """테스트 설정""" + # 임시 CSV 파일 생성 + self.temp_dir = tempfile.mkdtemp() + self.csv_file = os.path.join(self.temp_dir, "test_users.csv") + + with open(self.csv_file, "w", encoding="utf-8") as f: + f.write("email,loginId,name,password,role\n") + f.write("test1@example.com,test1,Test User 1,password1,ADMIN\n") + f.write("test2@example.com,test2,Test User 2,password2,USER\n") + f.write("test3@example.com,test3,Test User 3,password3,ADMIN;USER;MANAGER\n") + + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + + def tearDown(self): + """테스트 정리""" + # 임시 파일 삭제 + if os.path.exists(self.csv_file): + os.remove(self.csv_file) + if os.path.exists(self.temp_dir): + os.rmdir(self.temp_dir) + + @patch('role_assigner.RoleAssigner') + def test_process_role_assignment(self, MockRoleAssigner): + """process_role_assignment 함수 테스트""" + # RoleAssigner 모의 객체 설정 + mock_assigner = MagicMock() + MockRoleAssigner.return_value = mock_assigner + + # find_user 메서드가 사용자를 찾도록 설정 + mock_assigner.find_user.side_effect = ["user1-uuid", "user2-uuid", "user3-uuid"] + + # find_role 메서드가 역할을 찾거나 찾지 못하도록 설정 + mock_assigner.find_role.side_effect = [ + "admin-role-uuid", # test1의 ADMIN 역할 + None, # test2의 USER 역할 (찾지 못함) + "admin-role-uuid", # test3의 ADMIN 역할 + "user-role-uuid", # test3의 USER 역할 + "manager-role-uuid" # test3의 MANAGER 역할 + ] + + # get_user_roles 메서드가 기존 역할을 반환하도록 설정 + # 첫 번째 사용자는 역할이 없음 + # 세 번째 사용자는 ADMIN 역할이 이미 있음 + mock_assigner.get_user_roles.side_effect = [ + set(), # test1 (역할 없음) + set(), # test2 (역할 없음) + {"admin-role-uuid"}, # test3 (ADMIN 역할 이미 있음) + ] + + # assign_role 메서드는 성공적으로 역할을 할당하도록 설정 + mock_assigner.assign_role.side_effect = [True, True, True] + + # 테스트 실행 + result = process_role_assignment(self.csv_file, self.api_url, self.api_key) + + # 결과 확인 + self.assertTrue(result) + + # 메서드 호출 확인 + self.assertEqual(mock_assigner.find_user.call_count, 3) + self.assertEqual(mock_assigner.find_role.call_count, 5) + self.assertEqual(mock_assigner.get_user_roles.call_count, 3) + self.assertEqual(mock_assigner.assign_role.call_count, 3) + + # 첫 번째 사용자에 대한 호출 확인 + mock_assigner.find_user.assert_any_call("test1") + mock_assigner.find_role.assert_any_call("ADMIN role") + + # 두 번째 사용자에 대한 호출 확인 + mock_assigner.find_user.assert_any_call("test2") + mock_assigner.find_role.assert_any_call("USER role") + + # 세 번째 사용자에 대한 호출 확인 + mock_assigner.find_user.assert_any_call("test3") + mock_assigner.find_role.assert_any_call("ADMIN role") + mock_assigner.find_role.assert_any_call("USER role") + mock_assigner.find_role.assert_any_call("MANAGER role") + + # 역할 할당 호출 확인 + mock_assigner.assign_role.assert_any_call("user1-uuid", "admin-role-uuid") + mock_assigner.assign_role.assert_any_call("user3-uuid", "user-role-uuid") + mock_assigner.assign_role.assert_any_call("user3-uuid", "manager-role-uuid") + + @patch('role_assigner.RoleAssigner') + def test_process_empty_role(self, MockRoleAssigner): + """빈 역할 값이나 공백만 있는 역할 값 처리 테스트""" + # 빈 역할 값이나 공백만 있는 역할 값을 가진 CSV 파일 생성 + empty_role_csv = os.path.join(self.temp_dir, "empty_roles.csv") + with open(empty_role_csv, "w", encoding="utf-8") as f: + f.write("email,loginId,name,password,role\n") + f.write("test1@example.com,test1,Test User 1,password1,\n") + f.write("test2@example.com,test2,Test User 2,password2, \n") + f.write("test3@example.com,test3,Test User 3,password3,;\n") + f.write("test4@example.com,test4,Test User 4,password4, ; \n") + + # RoleAssigner 모의 객체 설정 + mock_assigner = MagicMock() + MockRoleAssigner.return_value = mock_assigner + + # 테스트 실행 + result = process_role_assignment(empty_role_csv, self.api_url, self.api_key) + + # 결과 확인 + self.assertTrue(result) + + # find_user 메서드가 호출되지 않아야 함 (빈 역할 값은 건너뛰기 됨) + mock_assigner.find_user.assert_not_called() + mock_assigner.find_role.assert_not_called() + mock_assigner.get_user_roles.assert_not_called() + mock_assigner.assign_role.assert_not_called() + + # 임시 파일 삭제 + if os.path.exists(empty_role_csv): + os.remove(empty_role_csv) + +if __name__ == '__main__': + unittest.main() diff --git a/api/csv_sync_sample/tests/test_server_processor.py b/api/csv_sync_sample/tests/test_server_processor.py new file mode 100644 index 0000000..f74362b --- /dev/null +++ b/api/csv_sync_sample/tests/test_server_processor.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +""" +서버 프로세서 테스트 +""" + +import unittest +import tempfile +import os +from unittest.mock import patch, MagicMock +from server_processor import process_server_csv, ServerProcessor + +class TestServerProcessor(unittest.TestCase): + """ServerProcessor 클래스 테스트""" + + def setUp(self): + """테스트 설정""" + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + self.processor = ServerProcessor(self.api_url, self.api_key) + + @patch('requests.get') + def test_server_exists(self, mock_get): + """server_exists 메서드 테스트""" + # 서버가 존재하는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "list": [{"server": {"name": "existing_server", "uuid": "server-uuid"}}], + "page": {"totalElements": 1} + } + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.server_exists("existing_server") + + # 결과 확인 + self.assertEqual(result, "server-uuid") + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/servers?name=existing_server", + headers=self.processor.headers + ) + + @patch('requests.get') + def test_server_not_exists(self, mock_get): + """server_exists 메서드 테스트 - 서버가 존재하지 않는 경우""" + # 서버가 존재하지 않는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = {"list": [], "page": {"totalElements": 0}} + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.server_exists("non_existing_server") + + # 결과 확인 + self.assertIsNone(result) + + @patch('requests.post') + def test_add_server(self, mock_post): + """add_server 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "uuid": "new-server-uuid", + "name": "new_server", + "host": "10.10.10.10", + "osType": "AWS_LINUX" + } + mock_post.return_value = mock_response + + # 테스트 데이터 + server_data = { + "host": "10.10.10.10", + "name": "new_server", + "osType": "AWS_LINUX", + "sshPort": 22 + } + + # 테스트 실행 + result = self.processor.add_server(server_data) + + # 결과 확인 + self.assertEqual(result, "new-server-uuid") + mock_post.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/servers", + headers=self.processor.headers, + json=server_data + ) + + @patch('requests.get') + def test_get_secret_store_uuid(self, mock_get): + """get_secret_store_uuid 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = [ + {"name": "Secret Store 1", "uuid": "secret-store-uuid"} + ] + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.get_secret_store_uuid() + + # 결과 확인 + self.assertEqual(result, "secret-store-uuid") + self.assertEqual(self.processor.secret_store_uuid, "secret-store-uuid") # 캐시 확인 + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/security/secret-stores", + headers=self.processor.headers + ) + + # 두 번째 호출은 캐시된 값을 반환해야 함 + mock_get.reset_mock() + result = self.processor.get_secret_store_uuid() + self.assertEqual(result, "secret-store-uuid") + mock_get.assert_not_called() # API 호출이 발생하지 않아야 함 + + @patch('requests.post') + def test_add_server_tag(self, mock_post): + """add_server_tag 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_post.return_value = mock_response + + # 테스트 실행 + result = self.processor.add_server_tag("server-uuid", "test-key", "test-value") + + # 결과 확인 + self.assertTrue(result) + mock_post.assert_called_once_with( + f"{self.api_url}/api/external/v2/sac/servers/server-uuid/tags", + headers=self.processor.headers, + json={ + "customTags": [ + { + "key": "test-key", + "value": "test-value" + } + ], + "overwrite": True + } + ) + +class TestProcessServerCSV(unittest.TestCase): + """process_server_csv 함수 테스트""" + + def setUp(self): + """테스트 설정""" + # 임시 CSV 파일 생성 + self.temp_dir = tempfile.mkdtemp() + self.csv_file = os.path.join(self.temp_dir, "test_servers.csv") + + with open(self.csv_file, "w", encoding="utf-8") as f: + f.write("host,name,osType,sshport,server_group,account_name\n") + f.write("10.10.10.10,server1,AWS_LINUX,22,WEB_GROUP,ec2-user\n") + f.write("10.10.10.11,server2,AWS_LINUX,22,WEB_GROUP,ec2-user\n") + + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + + def tearDown(self): + """테스트 정리""" + # 임시 파일 삭제 + if os.path.exists(self.csv_file): + os.remove(self.csv_file) + if os.path.exists(self.temp_dir): + os.rmdir(self.temp_dir) + + @patch('server_processor.ServerProcessor') + def test_process_server_csv(self, MockServerProcessor): + """process_server_csv 함수 테스트""" + # ServerProcessor 모의 객체 설정 + mock_processor = MagicMock() + MockServerProcessor.return_value = mock_processor + + # server_exists 메서드가 첫 번째 서버는 이미 존재하고, 두 번째 서버는 존재하지 않도록 설정 + mock_processor.server_exists.side_effect = ["server1-uuid", None] + + # add_server 메서드는 성공적으로 서버를 추가하도록 설정 + mock_processor.add_server.return_value = "server2-uuid" + + # add_server_tag 메서드는 항상 성공하도록 설정 + mock_processor.add_server_tag.return_value = True + + # get_secret_store_uuid 메서드는 Secret Store UUID를 반환하도록 설정 + mock_processor.get_secret_store_uuid.return_value = "secret-store-uuid" + + # server_group_exists 메서드는 서버 그룹이 존재하지 않도록 설정 + mock_processor.server_group_exists.return_value = None + + # add_server_group 메서드는 성공적으로 서버 그룹을 추가하도록 설정 + mock_processor.add_server_group.return_value = "server-group-uuid" + + # add_server_group_account 메서드는 성공적으로 계정을 추가하도록 설정 + mock_processor.add_server_group_account.return_value = "account-uuid" + + # 테스트 실행 + result = process_server_csv(self.csv_file, self.api_url, self.api_key) + + # 결과 확인 + self.assertTrue(result) + + # server_exists가 두 번 호출되었는지 확인 + self.assertEqual(mock_processor.server_exists.call_count, 2) + + # add_server가 한 번만 호출되었는지 확인 (두 번째 서버만) + self.assertEqual(mock_processor.add_server.call_count, 1) + expected_server_data = { + "host": "10.10.10.11", + "name": "server2", + "osType": "AWS_LINUX", + "sshPort": 22 + } + mock_processor.add_server.assert_called_once_with(expected_server_data) + + # add_server_tag가 총 4번 호출되었는지 확인 (각 서버마다 2번씩) + self.assertEqual(mock_processor.add_server_tag.call_count, 4) + + # server_group_exists가 한 번 호출되었는지 확인 + self.assertEqual(mock_processor.server_group_exists.call_count, 1) + + # add_server_group이 한 번 호출되었는지 확인 + self.assertEqual(mock_processor.add_server_group.call_count, 1) + + # add_server_group_account가 한 번 호출되었는지 확인 + self.assertEqual(mock_processor.add_server_group_account.call_count, 1) + +if __name__ == '__main__': + unittest.main() diff --git a/api/csv_sync_sample/tests/test_user_processor.py b/api/csv_sync_sample/tests/test_user_processor.py new file mode 100644 index 0000000..d91136f --- /dev/null +++ b/api/csv_sync_sample/tests/test_user_processor.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +""" +사용자 프로세서 테스트 +""" + +import unittest +import tempfile +import os +from unittest.mock import patch, MagicMock +from user_processor import process_users_csv, UserProcessor + +class TestUserProcessor(unittest.TestCase): + """UserProcessor 클래스 테스트""" + + def setUp(self): + """테스트 설정""" + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + self.processor = UserProcessor(self.api_url, self.api_key) + + @patch('requests.get') + def test_user_exists(self, mock_get): + """user_exists 메서드 테스트""" + # 사용자가 존재하는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "list": [{"loginId": "existing_user", "uuid": "user-uuid"}], + "page": {"totalElements": 1} + } + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.user_exists("existing_user") + + # 결과 확인 + self.assertTrue(result) + mock_get.assert_called_once_with( + f"{self.api_url}/api/external/v2/users?loginId=existing_user", + headers=self.processor.headers + ) + + @patch('requests.get') + def test_user_not_exists(self, mock_get): + """user_exists 메서드 테스트 - 사용자가 존재하지 않는 경우""" + # 사용자가 존재하지 않는 경우 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = {"list": [], "page": {"totalElements": 0}} + mock_get.return_value = mock_response + + # 테스트 실행 + result = self.processor.user_exists("non_existing_user") + + # 결과 확인 + self.assertFalse(result) + + @patch('requests.post') + def test_add_user(self, mock_post): + """add_user 메서드 테스트""" + # 성공적인 응답 모의 + mock_response = MagicMock() + mock_response.json.return_value = { + "uuid": "new-user-uuid", + "loginId": "new_user", + "email": "new_user@example.com", + "name": "New User" + } + mock_post.return_value = mock_response + + # 테스트 데이터 + user_data = { + "email": "new_user@example.com", + "loginId": "new_user", + "name": "New User", + "password": "password123" + } + + # 테스트 실행 + result = self.processor.add_user(user_data) + + # 결과 확인 + self.assertEqual(result["uuid"], "new-user-uuid") + mock_post.assert_called_once_with( + f"{self.api_url}/api/external/v2/users", + headers=self.processor.headers, + json=user_data + ) + +class TestProcessUsersCSV(unittest.TestCase): + """process_users_csv 함수 테스트""" + + def setUp(self): + """테스트 설정""" + # 임시 CSV 파일 생성 + self.temp_dir = tempfile.mkdtemp() + self.csv_file = os.path.join(self.temp_dir, "test_users.csv") + + with open(self.csv_file, "w", encoding="utf-8") as f: + f.write("email,loginId,name,password,role\n") + f.write("test1@example.com,test1,Test User 1,password1,USER\n") + f.write("test2@example.com,test2,Test User 2,password2,ADMIN\n") + + self.api_url = "https://api.example.com" + self.api_key = "test-api-key" + + def tearDown(self): + """테스트 정리""" + # 임시 파일 삭제 + if os.path.exists(self.csv_file): + os.remove(self.csv_file) + if os.path.exists(self.temp_dir): + os.rmdir(self.temp_dir) + + @patch('user_processor.UserProcessor') + def test_process_users_csv(self, MockUserProcessor): + """process_users_csv 함수 테스트""" + # UserProcessor 모의 객체 설정 + mock_processor = MagicMock() + MockUserProcessor.return_value = mock_processor + + # user_exists 메서드가 첫 번째 사용자는 이미 존재하고, 두 번째 사용자는 존재하지 않도록 설정 + mock_processor.user_exists.side_effect = [True, False] + + # add_user 메서드는 성공적으로 사용자를 추가하도록 설정 + mock_processor.add_user.return_value = {"uuid": "new-user-uuid"} + + # 테스트 실행 + result = process_users_csv(self.csv_file, self.api_url, self.api_key) + + # 결과 확인 + self.assertTrue(result) + + # user_exists가 두 번 호출되었는지 확인 + self.assertEqual(mock_processor.user_exists.call_count, 2) + + # add_user가 한 번만 호출되었는지 확인 (두 번째 사용자만) + self.assertEqual(mock_processor.add_user.call_count, 1) + + # 두 번째 사용자에 대해 add_user가 올바른 데이터로 호출되었는지 확인 + expected_user_data = { + "email": "test2@example.com", + "loginId": "test2", + "name": "Test User 2", + "password": "password2", + "role": ["ADMIN"] + } + mock_processor.add_user.assert_called_once_with(expected_user_data) + +if __name__ == '__main__': + unittest.main() diff --git a/api/csv_sync_sample/user_processor.py b/api/csv_sync_sample/user_processor.py new file mode 100644 index 0000000..f083917 --- /dev/null +++ b/api/csv_sync_sample/user_processor.py @@ -0,0 +1,151 @@ +import csv +import requests +import logging + +class UserProcessor: + def __init__(self, api_base_url, api_key): + """ + API를 통해 사용자를 처리하는 클래스 초기화 + + Args: + api_base_url (str): API 기본 URL (예: 'https://example.com') + api_key (str): API 인증에 사용할 키 + """ + self.api_base_url = api_base_url + self.headers = { + "Authorization": api_key, + "Content-Type": "application/json" + } + # 로깅 설정 + self.logger = logging.getLogger(__name__) + + def user_exists(self, login_id): + """ + 사용자 존재 여부 확인 + + Args: + login_id (str): 확인할 사용자 ID + + Returns: + bool: 사용자 존재 여부 + """ + url = f"{self.api_base_url}/api/external/v2/users?loginId={login_id}" + + try: + response = requests.get(url, headers=self.headers) + response.raise_for_status() + + data = response.json() + user_list = data.get("list", []) + + # loginId가 정확히 일치하는 사용자 찾기 + for user in user_list: + if user.get("loginId") == login_id: + return True + return False + + except requests.exceptions.RequestException as e: + self.logger.error(f"사용자 확인 중 오류 발생: {e}") + return False + def add_user(self, user_data): + """ + 새 사용자 추가 + + Args: + user_data (dict): 사용자 정보 (email, loginId, name, password) + + Returns: + dict: 응답 데이터 또는 None (오류 발생 시) + """ + url = f"{self.api_base_url}/api/external/v2/users" + + try: + response = requests.post(url, headers=self.headers, json=user_data) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as e: + self.logger.error(f"사용자 추가 중 오류 발생: {e}") + return None + +def process_users_csv(csv_file_path, api_base_url, api_key): + """ + CSV 파일을 처리하여 사용자 등록 + + Args: + csv_file_path (str): CSV 파일 경로 + api_base_url (str): API 기본 URL + api_key (str): API 키 + """ + processor = UserProcessor(api_base_url, api_key) + + # 로깅 설정 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + success_count = 0 + error_count = 0 + skip_count = 0 + + logger.info(f"CSV 파일 처리 시작: {csv_file_path}") + + try: + with open(csv_file_path, 'r', encoding='utf-8') as csvfile: + csv_reader = csv.DictReader(csvfile) + + for row_num, row in enumerate(csv_reader, start=2): # 헤더를 제외하고 2부터 시작 + try: + # 필요한 필드가 모두 있는지 확인 + required_fields = ['email', 'loginId', 'name', 'password'] + if not all(field in row for field in required_fields): + missing = [f for f in required_fields if f not in row] + logger.warning(f"행 {row_num}: 필수 필드 누락 - {', '.join(missing)}") + error_count += 1 + continue + + login_id = row['loginId'] + + # 사용자가 이미 존재하는지 확인 + if processor.user_exists(login_id): + logger.warning(f"행 {row_num}: 사용자 '{login_id}'가 이미 존재합니다. 건너뜁니다.") + skip_count += 1 + continue + + # 사용자 추가 + user_data = { + 'email': row['email'], + 'loginId': login_id, + 'name': row['name'], + 'password': row['password'] + } + + # role 필드가 있으면 추가 + if 'role' in row and row['role']: + # 세미콜론으로 구분된 역할들을 리스트로 변환 + if ';' in row['role']: + roles = [role.strip() for role in row['role'].split(';')] + logger.info(f"행 {row_num}: {len(roles)}개의 역할이 지정되었습니다: {roles}") + user_data['role'] = roles + else: + user_data['role'] = [row['role']] + + result = processor.add_user(user_data) + if result: + logger.info(f"행 {row_num}: 사용자 '{login_id}' 추가 성공 (UUID: {result.get('uuid', 'N/A')})") + success_count += 1 + else: + logger.error(f"행 {row_num}: 사용자 '{login_id}' 추가 실패") + error_count += 1 + + except Exception as e: + logger.error(f"행 {row_num} 처리 중 오류 발생: {e}") + error_count += 1 + + except Exception as e: + logger.error(f"CSV 파일 처리 중 오류 발생: {e}") + return False + + logger.info(f"CSV 파일 처리 완료: 성공={success_count}, 스킵={skip_count}, 오류={error_count}") + return True \ No newline at end of file