diff --git a/cbrain_cli/cli_utils.py b/cbrain_cli/cli_utils.py
index a79aadb..48b17b5 100644
--- a/cbrain_cli/cli_utils.py
+++ b/cbrain_cli/cli_utils.py
@@ -1,6 +1,6 @@
-import datetime
import functools
import json
+import re
import urllib.error
# import importlib.metadata
@@ -27,13 +27,6 @@ def is_authenticated():
"""
Check if the user is authenticated.
"""
-
- if cbrain_timestamp:
- timestamp_obj = datetime.datetime.fromisoformat(cbrain_timestamp)
- if datetime.datetime.now() - timestamp_obj > datetime.timedelta(days=1):
- print("Session expired. Please log in again using 'cbrain login'.")
- CREDENTIALS_FILE.unlink()
- return False
# Check if user is logged in.
if not api_token or not cbrain_url or not user_id:
print("Not logged in. Use 'cbrain login' to login first.")
@@ -41,6 +34,37 @@ def is_authenticated():
return True
+def get_status_code_description(status_code):
+ """
+ Get a user-friendly description for HTTP status codes.
+
+ Parameters
+ ----------
+ status_code : int
+ HTTP status code
+
+ Returns
+ -------
+ str
+ Description of the status code category
+ """
+ if 400 <= status_code < 500:
+ if status_code == 401:
+ return "Authentication error (401)"
+ elif status_code == 403:
+ return "Access forbidden (403)"
+ elif status_code == 404:
+ return "Resource not found (404)"
+ elif status_code == 422:
+ return "Validation error (422)"
+ else:
+ return f"Client error ({status_code})"
+ elif 500 <= status_code < 600:
+ return f"Server error ({status_code})"
+ else:
+ return f"HTTP error ({status_code})"
+
+
def handle_connection_error(error):
"""
Handle connection errors with informative messages including server URL.
@@ -55,20 +79,75 @@ def handle_connection_error(error):
None
Prints appropriate error messages
"""
- if isinstance(error, urllib.error.URLError):
+ if isinstance(error, urllib.error.HTTPError):
+ status_description = get_status_code_description(error.code)
+
+ if error.code == 401:
+ print(f"{status_description}: {error.reason}")
+ print("Try with Authorized Access")
+ elif error.code == 404 or error.code == 422 or error.code == 500:
+ # Try to extract specific error message from response
+ try:
+ # Check if the error response has already been read
+ if hasattr(error, "read"):
+ error_response = error.read().decode("utf-8")
+ elif hasattr(error, "response") and hasattr(error.response, "read"):
+ error_response = error.response.read().decode("utf-8")
+ else:
+ error_response = ""
+
+ # Try to parse as JSON first (for 422 validation errors)
+ try:
+ error_data = json.loads(error_response)
+ if isinstance(error_data, dict):
+ # Look for common error message fields
+ error_msg = (
+ error_data.get("message")
+ or error_data.get("error")
+ or error_data.get("notice")
+ or str(error_data)
+ )
+ print(f"{status_description}: {error_msg}")
+ return
+ except json.JSONDecodeError:
+ # Not JSON, try HTML parsing
+ pass
+
+ # Extract header h1 and h2 content from HTML
+ header_pattern = r""
+ header_match = re.search(header_pattern, error_response, re.DOTALL)
+ h2_match = re.search(r"
(.*?)
", error_response)
+
+ error_parts = []
+
+ if header_match:
+ header_text = header_match.group(1).strip()
+ # Decode HTML entities
+ header_text = header_text.replace("'", "'").replace(""", '"')
+ error_parts.append(header_text)
+
+ if h2_match:
+ h2_text = h2_match.group(1).strip()
+ # Decode HTML entities
+ h2_text = h2_text.replace("'", "'").replace(""", '"')
+ # Remove SQL WHERE clause details
+ h2_text = re.sub(r"\s*\[WHERE.*?\]", "", h2_text)
+ error_parts.append(h2_text)
+
+ if error_parts:
+ print(f"{status_description}: " + " - ".join(error_parts))
+ else:
+ print(f"{status_description}: {error.reason}")
+ except Exception:
+ print(f"{status_description}: {error.reason}")
+ else:
+ print(f"{status_description}: {error.reason}")
+ elif isinstance(error, urllib.error.URLError):
if "Connection refused" in str(error):
print(f"Error: Cannot connect to CBRAIN server at {cbrain_url}")
print("Please check if the CBRAIN server is running and accessible.")
else:
print(f"Connection failed: {error.reason}")
- elif isinstance(error, urllib.error.HTTPError):
- print(f"Request failed: HTTP {error.code} - {error.reason}")
- if error.code == 401:
- print("Invalid username or password")
- elif error.code == 404:
- print("Resource not found")
- elif error.code == 500:
- print("Internal server error")
else:
print(f"Connection error: {str(error)}")
diff --git a/cbrain_cli/data/background_activities.py b/cbrain_cli/data/background_activities.py
index 0c82ad2..d8973d7 100644
--- a/cbrain_cli/data/background_activities.py
+++ b/cbrain_cli/data/background_activities.py
@@ -64,18 +64,7 @@ def show_background_activity(args):
background_activity_endpoint, data=None, headers=headers, method="GET"
)
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- activity_data = json.loads(data)
- return activity_data
-
- except urllib.error.HTTPError as e:
- if e.code == 404:
- print(f"Error: Background activity with ID {activity_id} not found")
- else:
- print(f"Error: HTTP {e.code} - {e.reason}")
- return None
- except Exception as e:
- print(f"Error getting background activity details: {str(e)}")
- return None
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ activity_data = json.loads(data)
+ return activity_data
diff --git a/cbrain_cli/data/data_providers.py b/cbrain_cli/data/data_providers.py
index c88fb2a..56c26af 100644
--- a/cbrain_cli/data/data_providers.py
+++ b/cbrain_cli/data/data_providers.py
@@ -34,24 +34,16 @@ def show_data_provider(args):
data_provider_endpoint, data=None, headers=headers, method="GET"
)
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- provider_data = json.loads(data)
-
- if provider_data.get("error"):
- print(f"Error: {provider_data.get('error')}")
- return None
-
- return provider_data
-
- except urllib.error.HTTPError as e:
- if e.code == 404:
- print(f"Error: Data provider with ID {data_provider_id} not found")
- else:
- print(f"Error: HTTP {e.code} - {e.reason}")
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ provider_data = json.loads(data)
+
+ if provider_data.get("error"):
+ print(f"Error: {provider_data.get('error')}")
return None
+ return provider_data
+
def list_data_providers(args):
"""
diff --git a/cbrain_cli/data/files.py b/cbrain_cli/data/files.py
index bb6d491..d5a0203 100644
--- a/cbrain_cli/data/files.py
+++ b/cbrain_cli/data/files.py
@@ -48,7 +48,7 @@ def upload_file(args):
Parameters
----------
args : argparse.Namespace
- Command line arguments including data_provider, file_type, file_path, and group_id
+ Command line arguments including data_provider, file_path, and group_id
Returns
-------
@@ -94,12 +94,6 @@ def upload_file(args):
body_parts.append("")
body_parts.append(str(group_id))
- # Add file_type field.
- body_parts.append(f"--{boundary}")
- body_parts.append('Content-Disposition: form-data; name="file_type"')
- body_parts.append("")
- body_parts.append(args.file_type)
-
# Add file data.
body_parts.append(f"--{boundary}")
body_parts.append(f'Content-Disposition: form-data; name="upload_file"; filename="{file_name}"')
@@ -127,21 +121,10 @@ def upload_file(args):
upload_endpoint = f"{cbrain_url}/userfiles"
request = urllib.request.Request(upload_endpoint, data=body, headers=headers, method="POST")
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- response_data = json.loads(data)
- return response_data, response.status, file_name, file_size, args.data_provider
-
- except urllib.error.HTTPError as e:
- # Handle HTTP errors (like 422) that contain JSON responses.
- try:
- error_data = e.read().decode("utf-8")
- error_response = json.loads(error_data)
- return error_response, e.code, file_name, file_size, args.data_provider
- except (json.JSONDecodeError, UnicodeDecodeError):
- error_data = e.read().decode("utf-8", errors="ignore")
- return {"error": error_data}, e.code, file_name, file_size, args.data_provider
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ response_data = json.loads(data)
+ return response_data, response.status, file_name, file_size, args.data_provider
def copy_file(args):
@@ -187,20 +170,10 @@ def copy_file(args):
request = urllib.request.Request(
change_provider_endpoint, data=json_data, headers=headers, method="POST"
)
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- response_data = json.loads(data)
- return response_data, response.status
-
- except urllib.error.HTTPError as e:
- try:
- error_data = e.read().decode("utf-8")
- error_response = json.loads(error_data)
- return error_response, e.code
- except (json.JSONDecodeError, UnicodeDecodeError):
- error_data = e.read().decode("utf-8", errors="ignore")
- return {"error": error_data}, e.code
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ response_data = json.loads(data)
+ return response_data, response.status
def move_file(args):
@@ -248,20 +221,10 @@ def move_file(args):
change_provider_endpoint, data=json_data, headers=headers, method="POST"
)
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- response_data = json.loads(data)
- return response_data, response.status
-
- except urllib.error.HTTPError as e:
- try:
- error_data = e.read().decode("utf-8")
- error_response = json.loads(error_data)
- return error_response, e.code
- except (json.JSONDecodeError, UnicodeDecodeError):
- error_data = e.read().decode("utf-8", errors="ignore")
- return {"error": error_data}, e.code
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ response_data = json.loads(data)
+ return response_data, response.status
def list_files(args):
@@ -345,20 +308,7 @@ def delete_file(args):
delete_endpoint, data=json_data, headers=headers, method="DELETE"
)
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- try:
- response_data = json.loads(data)
- except json.JSONDecodeError:
- response_data = data
- return response_data
-
- except urllib.error.HTTPError as e:
- try:
- error_data = e.read().decode("utf-8")
- error_response = json.loads(error_data)
- return error_response, e.code
- except (json.JSONDecodeError, UnicodeDecodeError):
- error_data = e.read().decode("utf-8", errors="ignore")
- return {"error": error_data}, e.code
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ response_data = json.loads(data)
+ return response_data
diff --git a/cbrain_cli/data/projects.py b/cbrain_cli/data/projects.py
index 2122831..d639dc2 100644
--- a/cbrain_cli/data/projects.py
+++ b/cbrain_cli/data/projects.py
@@ -33,47 +33,28 @@ def switch_project(args):
# Create the request
request = urllib.request.Request(switch_endpoint, data=None, headers=headers, method="POST")
- # Make the switch request
- try:
- with urllib.request.urlopen(request) as response:
- if response.status == 200:
- # Step 2: Get group details
- group_endpoint = f"{cbrain_url}/groups/{group_id}"
- group_request = urllib.request.Request(
- group_endpoint, data=None, headers=headers, method="GET"
- )
-
- with urllib.request.urlopen(group_request) as group_response:
- group_data_text = group_response.read().decode("utf-8")
- group_data = json.loads(group_data_text)
+ with urllib.request.urlopen(request):
+ group_endpoint = f"{cbrain_url}/groups/{group_id}"
+ group_request = urllib.request.Request(
+ group_endpoint, data=None, headers=headers, method="GET"
+ )
- # Step 3: Update credentials file with current group_id
- if CREDENTIALS_FILE.exists():
- with open(CREDENTIALS_FILE) as f:
- credentials = json.load(f)
+ with urllib.request.urlopen(group_request) as group_response:
+ group_data_text = group_response.read().decode("utf-8")
+ group_data = json.loads(group_data_text)
- credentials["current_group_id"] = group_id
- credentials["current_group_name"] = group_data.get("name", "Unknown")
+ # Step 3: Update credentials file with current group_id
+ if CREDENTIALS_FILE.exists():
+ with open(CREDENTIALS_FILE) as f:
+ credentials = json.load(f)
- with open(CREDENTIALS_FILE, "w") as f:
- json.dump(credentials, f, indent=2)
+ credentials["current_group_id"] = group_id
+ credentials["current_group_name"] = group_data.get("name", "Unknown")
- return group_data
- else:
- print(f"Project switch failed with status: {response.status}")
- return None
+ with open(CREDENTIALS_FILE, "w") as f:
+ json.dump(credentials, f, indent=2)
- except urllib.error.HTTPError as e:
- if e.code == 404:
- print(f"Error: Project with ID {group_id} not found")
- elif e.code == 403:
- print(f"Error: Access denied to project {group_id}")
- else:
- print(f"Project switch failed with status: {e.code}")
- return None
- except Exception as e:
- print(f"Error switching project: {str(e)}")
- return None
+ return group_data
def show_project(args):
@@ -117,12 +98,9 @@ def show_project(args):
credentials.pop("current_group_name", None)
with open(CREDENTIALS_FILE, "w") as f:
json.dump(credentials, f, indent=2)
+ return None
else:
- print(f"Error getting project details: HTTP {e.code}")
- return None
- except Exception as e:
- print(f"Error getting project details: {str(e)}")
- return None
+ raise
def list_projects(args):
diff --git a/cbrain_cli/data/remote_resources.py b/cbrain_cli/data/remote_resources.py
index b122a4f..9133992 100644
--- a/cbrain_cli/data/remote_resources.py
+++ b/cbrain_cli/data/remote_resources.py
@@ -57,15 +57,7 @@ def show_remote_resource(args):
request = urllib.request.Request(bourreau_endpoint, data=None, headers=headers, method="GET")
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- bourreau_data = json.loads(data)
- return bourreau_data
-
- except urllib.error.HTTPError as e:
- if e.code == 404:
- print(f"Error: Remote resource with ID {resource_id} not found")
- else:
- print(f"Error: HTTP {e.code} - {e.reason}")
- return None
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ bourreau_data = json.loads(data)
+ return bourreau_data
diff --git a/cbrain_cli/data/tags.py b/cbrain_cli/data/tags.py
index d041824..1105cc4 100644
--- a/cbrain_cli/data/tags.py
+++ b/cbrain_cli/data/tags.py
@@ -1,10 +1,10 @@
import json
import urllib.error
+import urllib.parse
import urllib.request
from cbrain_cli.cli_utils import api_token, cbrain_url, pagination
from cbrain_cli.config import auth_headers
-from cbrain_cli.formatter.tags_fmt import print_interactive_prompts
def list_tags(args):
@@ -63,18 +63,10 @@ def show_tag(args):
request = urllib.request.Request(tag_endpoint, data=None, headers=headers, method="GET")
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- tag_data = json.loads(data)
- return tag_data
-
- except urllib.error.HTTPError as e:
- if e.code == 404:
- print(f"Error: Tag with ID {tag_id} not found")
- else:
- print(f"Error: HTTP {e.code} - {e.reason}")
- return None
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ tag_data = json.loads(data)
+ return tag_data
def create_tag(args):
@@ -91,33 +83,22 @@ def create_tag(args):
tuple
(response_data, success, error_msg, response_status)
"""
- # Check if interactive mode is enabled
- interactive = getattr(args, "interactive", False)
-
- # Get tag details
- if interactive:
- inputs = print_interactive_prompts("create")
- if not inputs:
- return None, False, None, None
- tag_name = inputs["tag_name"]
- user_id = inputs["user_id"]
- group_id = inputs["group_id"]
- else:
- tag_name = getattr(args, "name", None)
- user_id = getattr(args, "user_id", None)
- group_id = getattr(args, "group_id", None)
-
- if not tag_name:
- print("Error: Tag name is required. Use --name flag or -i for interactive mode")
- return None, False, None, None
-
- if not user_id:
- print("Error: User ID is required. Use --user-id flag or -i for interactive mode")
- return None, False, None, None
-
- if not group_id:
- print("Error: Group ID is required. Use --group-id flag or -i for interactive mode")
- return None, False, None, None
+ # Get tag details from command line arguments
+ tag_name = getattr(args, "name", None)
+ user_id = getattr(args, "user_id", None)
+ group_id = getattr(args, "group_id", None)
+
+ if not tag_name:
+ print("Error: Tag name is required. Use --name flag")
+ return None, False, None, None
+
+ if not user_id:
+ print("Error: User ID is required. Use --user-id flag")
+ return None, False, None, None
+
+ if not group_id:
+ print("Error: Group ID is required. Use --group-id flag")
+ return None, False, None, None
# Prepare the API request
tags_endpoint = f"{cbrain_url}/tags"
@@ -130,20 +111,10 @@ def create_tag(args):
request = urllib.request.Request(tags_endpoint, data=json_data, headers=headers, method="POST")
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- response_data = json.loads(data)
- return response_data, True, None, response.status
-
- except urllib.error.HTTPError as e:
- try:
- error_data = e.read().decode("utf-8")
- error_response = json.loads(error_data)
- error_msg = error_response.get("notice", error_data)
- except (json.JSONDecodeError, UnicodeDecodeError):
- error_msg = e.read().decode("utf-8", errors="ignore")
- return None, False, error_msg, e.code
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ response_data = json.loads(data)
+ return response_data, True, None, response.status
def update_tag(args):
@@ -160,42 +131,27 @@ def update_tag(args):
tuple
(response_data, success, error_msg, response_status)
"""
- # Check if interactive mode is enabled
- interactive = getattr(args, "interactive", False)
-
- # Get tag ID and details
- if interactive:
- inputs = print_interactive_prompts("update")
- if not inputs:
- return None, False, None, None
- tag_id = inputs["tag_id"]
- tag_name = inputs["tag_name"]
- user_id = inputs["user_id"]
- group_id = inputs["group_id"]
- else:
- tag_id = getattr(args, "tag_id", None)
- if not tag_id:
- print(
- "Error: Tag ID is required. Use -i flag for interactive mode "
- "or provide tag_id argument"
- )
- return None, False, None, None
-
- tag_name = getattr(args, "name", None)
- user_id = getattr(args, "user_id", None)
- group_id = getattr(args, "group_id", None)
-
- if not tag_name:
- print("Error: Tag name is required. Use --name flag or -i for interactive mode")
- return None, False, None, None
-
- if not user_id:
- print("Error: User ID is required. Use --user-id flag or -i for interactive mode")
- return None, False, None, None
-
- if not group_id:
- print("Error: Group ID is required. Use --group-id flag or -i for interactive mode")
- return None, False, None, None
+ # Get tag ID and details from command line arguments
+ tag_id = getattr(args, "tag_id", None)
+ if not tag_id:
+ print("Error: Tag ID is required. Provide tag_id argument")
+ return None, False, None, None
+
+ tag_name = getattr(args, "name", None)
+ user_id = getattr(args, "user_id", None)
+ group_id = getattr(args, "group_id", None)
+
+ if not tag_name:
+ print("Error: Tag name is required. Use --name flag")
+ return None, False, None, None
+
+ if not user_id:
+ print("Error: User ID is required. Use --user-id flag")
+ return None, False, None, None
+
+ if not group_id:
+ print("Error: Group ID is required. Use --group-id flag")
+ return None, False, None, None
# Prepare the API request
tag_endpoint = f"{cbrain_url}/tags/{tag_id}"
@@ -208,23 +164,10 @@ def update_tag(args):
request = urllib.request.Request(tag_endpoint, data=json_data, headers=headers, method="PUT")
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- response_data = json.loads(data)
- return response_data, True, None, response.status
-
- except urllib.error.HTTPError as e:
- try:
- error_data = e.read().decode("utf-8")
- error_response = json.loads(error_data)
- if e.code == 404:
- error_msg = f"Error: Tag with ID {tag_id} not found"
- else:
- error_msg = error_response.get("notice", error_data)
- except (json.JSONDecodeError, UnicodeDecodeError):
- error_msg = e.read().decode("utf-8", errors="ignore")
- return None, False, error_msg, e.code
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ response_data = json.loads(data)
+ return response_data, True, None, response.status
def delete_tag(args):
@@ -241,23 +184,11 @@ def delete_tag(args):
tuple
(success, error_msg, response_status)
"""
- # Check if interactive mode is enabled
- interactive = getattr(args, "interactive", False)
-
- # Get tag ID
- if interactive:
- inputs = print_interactive_prompts("delete")
- if not inputs:
- return False, None, None
- tag_id = inputs["tag_id"]
- else:
- tag_id = getattr(args, "tag_id", None)
- if not tag_id:
- print(
- "Error: Tag ID is required. Use -i flag for interactive mode "
- "or provide tag_id argument"
- )
- return False, None, None
+ # Get tag ID from command line arguments
+ tag_id = getattr(args, "tag_id", None)
+ if not tag_id:
+ print("Error: Tag ID is required. Provide tag_id argument")
+ return False, None, None
# Prepare the API request
tag_endpoint = f"{cbrain_url}/tags/{tag_id}"
@@ -265,13 +196,5 @@ def delete_tag(args):
request = urllib.request.Request(tag_endpoint, data=None, headers=headers, method="DELETE")
- try:
- with urllib.request.urlopen(request) as response:
- return True, None, response.status
-
- except urllib.error.HTTPError as e:
- if e.code == 404:
- error_msg = f"Error: Tag with ID {tag_id} not found"
- else:
- error_msg = f"Tag deletion failed with status: {e.code}"
- return False, error_msg, e.code
+ with urllib.request.urlopen(request) as response:
+ return True, None, response.status
diff --git a/cbrain_cli/data/tasks.py b/cbrain_cli/data/tasks.py
index ffcd3a5..63fa235 100644
--- a/cbrain_cli/data/tasks.py
+++ b/cbrain_cli/data/tasks.py
@@ -78,17 +78,9 @@ def show_task(args):
request = urllib.request.Request(task_endpoint, data=None, headers=headers, method="GET")
- try:
- with urllib.request.urlopen(request) as response:
- data = response.read().decode("utf-8")
- task_data = json.loads(data)
- except urllib.error.HTTPError as e:
- if e.code == 404:
- print(f"Error: Task with ID {task_id} not found")
- return 1
- else:
- print(f"Error: HTTP {e.code} - {e.reason}")
- return 1
+ with urllib.request.urlopen(request) as response:
+ data = response.read().decode("utf-8")
+ task_data = json.loads(data)
return task_data
diff --git a/cbrain_cli/formatter/files_fmt.py b/cbrain_cli/formatter/files_fmt.py
index d412dd0..391e80a 100644
--- a/cbrain_cli/formatter/files_fmt.py
+++ b/cbrain_cli/formatter/files_fmt.py
@@ -84,12 +84,6 @@ def print_upload_result(response_data, response_status, file_name, file_size, da
print("File uploaded successfully!")
if response_data.get("notice"):
print(f"Server response: {response_data['notice']}")
- else:
- print(f"Upload failed with status: {response_status}")
- if response_data.get("notice"):
- print(f"Error: {response_data['notice']}")
- else:
- print(f"Response: {response_data}")
def print_move_copy_result(response_data, response_status, operation="move"):
diff --git a/cbrain_cli/formatter/tags_fmt.py b/cbrain_cli/formatter/tags_fmt.py
index 5031709..29d9d4b 100644
--- a/cbrain_cli/formatter/tags_fmt.py
+++ b/cbrain_cli/formatter/tags_fmt.py
@@ -24,14 +24,14 @@ def print_tags_list(tags_data, args):
return
print("TAGS")
- print("-" * 60)
+ print("-" * 40)
# Use the reusable dynamic table formatter
dynamic_table_print(
tags_data, ["id", "name", "user_id", "group_id"], ["ID", "Name", "User", "Group"]
)
- print("-" * 60)
+ print("-" * 40)
print(f"Total: {len(tags_data)} tag(s)")
@@ -88,91 +88,13 @@ def print_tag_operation_result(
"""
if success:
if operation == "create":
- print("TAG CREATED SUCCESSFULLY!")
+ print("Tag created successfully!")
elif operation == "update":
- print(f"\nTag {tag_id} updated successfully!")
+ print(f"Tag {tag_id} updated successfully!")
elif operation == "delete":
- print(f"\nTag {tag_id} deleted successfully!")
+ print(f"Tag {tag_id} deleted successfully!")
else:
if error_msg:
print(error_msg)
else:
print(f"Tag {operation} failed with status: {response_status}")
-
-
-def print_interactive_prompts(operation="create"):
- """
- Print interactive prompts for tag operations.
-
- Parameters
- ----------
- operation : str
- Operation type ("create", "update", or "delete")
-
- Returns
- -------
- dict
- Dictionary containing user inputs
- """
- if operation == "delete":
- tag_id_input = input("Enter tag ID to delete: ").strip()
- if not tag_id_input:
- print("Error: Tag ID is required")
- return None
- try:
- tag_id = int(tag_id_input)
- confirm = (
- input(f"\nAre you sure you want to delete tag {tag_id}? (y/N): ").strip().lower()
- )
- if confirm not in ["y", "yes"]:
- print("Tag deletion cancelled.")
- return None
- return {"tag_id": tag_id}
- except ValueError:
- print("Error: Tag ID must be a number")
- return None
-
- if operation == "update":
- tag_id_input = input("Enter tag ID to update: ").strip()
- if not tag_id_input:
- print("Error: Tag ID is required")
- return None
- try:
- tag_id = int(tag_id_input)
- print(f"\nUpdating tag {tag_id}...")
- print("Enter new values:")
- except ValueError:
- print("Error: Tag ID must be a number")
- return None
- else:
- tag_id = None
-
- # Get tag details
- tag_name = input(f"Enter {'new ' if operation == 'update' else ''}tag name: ").strip()
- if not tag_name:
- print("Error: Tag name is required")
- return None
-
- user_id_input = input(f"Enter {'new ' if operation == 'update' else ''}user ID: ").strip()
- if not user_id_input:
- print("Error: User ID is required")
- return None
-
- try:
- user_id = int(user_id_input)
- except ValueError:
- print("Error: User ID must be a number")
- return None
-
- group_id_input = input(f"Enter {'new ' if operation == 'update' else ''}group ID: ").strip()
- if not group_id_input:
- print("Error: Group ID is required")
- return None
-
- try:
- group_id = int(group_id_input)
- except ValueError:
- print("Error: Group ID must be a number")
- return None
-
- return {"tag_id": tag_id, "tag_name": tag_name, "user_id": user_id, "group_id": group_id}
diff --git a/cbrain_cli/formatter/tool_configs_fmt.py b/cbrain_cli/formatter/tool_configs_fmt.py
index 16b2ca4..bd9215d 100644
--- a/cbrain_cli/formatter/tool_configs_fmt.py
+++ b/cbrain_cli/formatter/tool_configs_fmt.py
@@ -56,10 +56,28 @@ def print_tool_config_details(tool_config, args):
"""
Pretty print the details of a tool configuration.
"""
+ if getattr(args, "json", False):
+ json_printer(tool_config)
+ return
+ elif getattr(args, "jsonl", False):
+ jsonl_printer(tool_config)
+ return
+
if not tool_config:
print("No tool configuration found.")
return
- json_printer(tool_config)
+
+ print(
+ f"id: {tool_config.get('id', 'N/A')}\n"
+ f"version_name: {tool_config.get('version_name', 'N/A')}\n"
+ f"tool_id: {tool_config.get('tool_id', 'N/A')}\n"
+ f"bourreau_id: {tool_config.get('bourreau_id', 'N/A')}\n"
+ f"group_id: {tool_config.get('group_id', 'N/A')}\n"
+ f"ncpus: {tool_config.get('ncpus', 'N/A')}"
+ )
+
+ if tool_config.get("description"):
+ print(f"description: {tool_config.get('description')}")
def print_boutiques_descriptor(boutiques_descriptor, args):
diff --git a/cbrain_cli/main.py b/cbrain_cli/main.py
index 1210187..4cfc7e8 100644
--- a/cbrain_cli/main.py
+++ b/cbrain_cli/main.py
@@ -85,13 +85,6 @@ def main():
help="Output in JSONL format (one JSON object per line)",
)
- parser.add_argument(
- "-i",
- "--interactive",
- action="store_true",
- help="Use interactive mode for commands",
- )
-
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# Version command
@@ -130,7 +123,9 @@ def main():
)
file_list_parser.set_defaults(
func=handle_errors(
- lambda args: print_files_list(list_files(args), args) if list_files(args) else None
+ lambda args: (lambda result: print_files_list(result, args) if result else None)(
+ list_files(args)
+ )
)
)
@@ -139,7 +134,9 @@ def main():
file_show_parser.add_argument("file", type=int, help="File ID")
file_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_file_details(show_file(args), args) if show_file(args) else None
+ lambda args: (lambda result: print_file_details(result, args) if result else None)(
+ show_file(args)
+ )
)
)
@@ -173,9 +170,9 @@ def main():
)
file_copy_parser.set_defaults(
func=handle_errors(
- lambda args: print_move_copy_result(*copy_file(args), operation="copy")
- if copy_file(args)
- else None
+ lambda args: (
+ lambda result: print_move_copy_result(*result, operation="copy") if result else None
+ )(copy_file(args))
)
)
@@ -195,9 +192,9 @@ def main():
)
file_move_parser.set_defaults(
func=handle_errors(
- lambda args: print_move_copy_result(*move_file(args), operation="move")
- if move_file(args)
- else None
+ lambda args: (
+ lambda result: print_move_copy_result(*result, operation="move") if result else None
+ )(move_file(args))
)
)
@@ -206,7 +203,9 @@ def main():
file_delete_parser.add_argument("file_id", type=int, help="ID of the file to delete")
file_delete_parser.set_defaults(
func=handle_errors(
- lambda args: json_printer(delete_file(args)) if delete_file(args) else None
+ lambda args: (lambda result: json_printer(result) if result else None)(
+ delete_file(args)
+ )
)
)
@@ -221,7 +220,11 @@ def main():
"list", help="List data providers"
)
dataprovider_list_parser.set_defaults(
- func=handle_errors(lambda args: print_providers_list(list_data_providers(args), args))
+ func=handle_errors(
+ lambda args: (lambda result: print_providers_list(result, args))(
+ list_data_providers(args)
+ )
+ )
)
dataprovider_list_parser.add_argument(
@@ -239,7 +242,11 @@ def main():
)
dataprovider_show_parser.add_argument("id", type=int, help="Data provider ID")
dataprovider_show_parser.set_defaults(
- func=handle_errors(lambda args: print_provider_details(show_data_provider(args), args))
+ func=handle_errors(
+ lambda args: (lambda result: print_provider_details(result, args))(
+ show_data_provider(args)
+ )
+ )
)
# dataprovider is_alive
@@ -248,7 +255,7 @@ def main():
)
dataprovider_is_alive_parser.add_argument("id", type=int, help="Data provider ID")
dataprovider_is_alive_parser.set_defaults(
- func=handle_errors(lambda args: json_printer(is_alive(args)))
+ func=handle_errors(lambda args: (lambda result: json_printer(result))(is_alive(args)))
)
# dataprovider delete-unregistered-files
@@ -260,7 +267,9 @@ def main():
"id", type=int, help="Data provider ID"
)
dataprovider_delete_unregistered_files_parser.set_defaults(
- func=handle_errors(lambda args: json_printer(delete_unregistered_files(args)))
+ func=handle_errors(
+ lambda args: (lambda result: json_printer(result))(delete_unregistered_files(args))
+ )
)
# Project commands
@@ -270,7 +279,9 @@ def main():
# project list
project_list_parser = project_subparsers.add_parser("list", help="List projects")
project_list_parser.set_defaults(
- func=handle_errors(lambda args: print_projects_list(list_projects(args), args))
+ func=handle_errors(
+ lambda args: (lambda result: print_projects_list(result, args))(list_projects(args))
+ )
)
# project switch
@@ -278,9 +289,9 @@ def main():
project_switch_parser.add_argument("group_id", type=int, help="Project/Group ID")
project_switch_parser.set_defaults(
func=handle_errors(
- lambda args: print_current_project(switch_project(args))
- if switch_project(args)
- else None
+ lambda args: (lambda result: print_current_project(result) if result else None)(
+ switch_project(args)
+ )
)
)
@@ -288,9 +299,9 @@ def main():
project_show_parser = project_subparsers.add_parser("show", help="Show current project")
project_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_current_project(show_project(args))
- if show_project(args)
- else print_no_project()
+ lambda args: (
+ lambda result: print_current_project(result) if result else print_no_project()
+ )(show_project(args))
)
)
@@ -303,7 +314,9 @@ def main():
tool_show_parser.add_argument("id", type=int, help="Tool ID")
tool_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_tool_details(list_tools(args), args) if list_tools(args) else None
+ lambda args: (lambda result: print_tool_details(result, args) if result else None)(
+ list_tools(args)
+ )
)
)
@@ -315,7 +328,9 @@ def main():
)
tool_list_parser.set_defaults(
func=handle_errors(
- lambda args: print_tools_list(list_tools(args), args) if list_tools(args) else None
+ lambda args: (lambda result: print_tools_list(result, args) if result else None)(
+ list_tools(args)
+ )
)
)
@@ -330,7 +345,11 @@ def main():
"list", help="List all tool configurations"
)
tool_configs_list_parser.set_defaults(
- func=handle_errors(lambda args: print_tool_configs_list(list_tool_configs(args), args))
+ func=handle_errors(
+ lambda args: (lambda result: print_tool_configs_list(result, args))(
+ list_tool_configs(args)
+ )
+ )
)
tool_configs_list_parser.add_argument(
@@ -350,9 +369,9 @@ def main():
tool_configs_show_parser.add_argument("id", type=int, help="Tool configuration ID")
tool_configs_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_tool_config_details(show_tool_config(args), args)
- if show_tool_config(args)
- else None
+ lambda args: (
+ lambda result: print_tool_config_details(result, args) if result else None
+ )(show_tool_config(args))
)
)
@@ -363,9 +382,9 @@ def main():
tool_configs_boutiques_parser.add_argument("id", type=int, help="Tool configuration ID")
tool_configs_boutiques_parser.set_defaults(
func=handle_errors(
- lambda args: print_boutiques_descriptor(tool_config_boutiques_descriptor(args), args)
- if tool_config_boutiques_descriptor(args)
- else None
+ lambda args: (
+ lambda result: print_boutiques_descriptor(result, args) if result else None
+ )(tool_config_boutiques_descriptor(args))
)
)
@@ -376,7 +395,9 @@ def main():
# tag list
tag_list_parser = tag_subparsers.add_parser("list", help="List tags")
tag_list_parser.set_defaults(
- func=handle_errors(lambda args: print_tags_list(list_tags(args), args))
+ func=handle_errors(
+ lambda args: (lambda result: print_tags_list(result, args))(list_tags(args))
+ )
)
tag_list_parser.add_argument("--page", type=int, default=1, help="Page number (default: 1)")
@@ -389,28 +410,26 @@ def main():
tag_show_parser.add_argument("id", type=int, help="Tag ID")
tag_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_tag_details(show_tag(args), args) if show_tag(args) else None
+ lambda args: (lambda result: print_tag_details(result, args) if result else None)(
+ show_tag(args)
+ )
)
)
# tag create
tag_create_parser = tag_subparsers.add_parser("create", help="Create a new tag")
- tag_create_parser.add_argument(
- "--name", type=str, help="Tag name (required for non-interactive mode)"
- )
- tag_create_parser.add_argument(
- "--user-id", type=int, help="User ID (required for non-interactive mode)"
- )
- tag_create_parser.add_argument(
- "--group-id", type=int, help="Group ID (required for non-interactive mode)"
- )
+ tag_create_parser.add_argument("--name", type=str, required=True, help="Tag name")
+ tag_create_parser.add_argument("--user-id", type=int, required=True, help="User ID")
+ tag_create_parser.add_argument("--group-id", type=int, required=True, help="Group ID")
tag_create_parser.set_defaults(
func=handle_errors(
- lambda args: print_tag_operation_result(
- "create", success=result[1], error_msg=result[2], response_status=result[3]
- )
- if (result := create_tag(args))
- else None
+ lambda args: (
+ lambda result: print_tag_operation_result(
+ "create", success=result[1], error_msg=result[2], response_status=result[3]
+ )
+ if result
+ else None
+ )(create_tag(args))
)
)
@@ -419,29 +438,24 @@ def main():
tag_update_parser.add_argument(
"tag_id",
type=int,
- nargs="?",
- help="Tag ID to update (optional if using -i flag)",
- )
- tag_update_parser.add_argument(
- "--name", type=str, help="Tag name (required for non-interactive mode)"
- )
- tag_update_parser.add_argument(
- "--user-id", type=int, help="User ID (required for non-interactive mode)"
- )
- tag_update_parser.add_argument(
- "--group-id", type=int, help="Group ID (required for non-interactive mode)"
+ help="Tag ID to update",
)
+ tag_update_parser.add_argument("--name", type=str, required=True, help="Tag name")
+ tag_update_parser.add_argument("--user-id", type=int, required=True, help="User ID")
+ tag_update_parser.add_argument("--group-id", type=int, required=True, help="Group ID")
tag_update_parser.set_defaults(
func=handle_errors(
- lambda args: print_tag_operation_result(
- "update",
- tag_id=args.tag_id,
- success=result[1],
- error_msg=result[2],
- response_status=result[3],
- )
- if (result := update_tag(args))
- else None
+ lambda args: (
+ lambda result: print_tag_operation_result(
+ "update",
+ tag_id=args.tag_id,
+ success=result[1],
+ error_msg=result[2],
+ response_status=result[3],
+ )
+ if result
+ else None
+ )(update_tag(args))
)
)
@@ -450,20 +464,21 @@ def main():
tag_delete_parser.add_argument(
"tag_id",
type=int,
- nargs="?",
- help="Tag ID to delete (optional if using -i flag)",
+ help="Tag ID to delete",
)
tag_delete_parser.set_defaults(
func=handle_errors(
- lambda args: print_tag_operation_result(
- "delete",
- tag_id=args.tag_id,
- success=result[0],
- error_msg=result[1],
- response_status=result[2],
- )
- if (result := delete_tag(args))
- else None
+ lambda args: (
+ lambda result: print_tag_operation_result(
+ "delete",
+ tag_id=args.tag_id,
+ success=result[0],
+ error_msg=result[1],
+ response_status=result[2],
+ )
+ if result
+ else None
+ )(delete_tag(args))
)
)
@@ -479,9 +494,9 @@ def main():
)
background_list_parser.set_defaults(
func=handle_errors(
- lambda args: print_activities_list(list_background_activities(args), args)
- if list_background_activities(args)
- else None
+ lambda args: (lambda result: print_activities_list(result, args) if result else None)(
+ list_background_activities(args)
+ )
)
)
@@ -492,9 +507,9 @@ def main():
background_show_parser.add_argument("id", type=int, help="Background activity ID")
background_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_activity_details(show_background_activity(args), args)
- if show_background_activity(args)
- else None
+ lambda args: (lambda result: print_activity_details(result, args) if result else None)(
+ show_background_activity(args)
+ )
)
)
@@ -518,7 +533,9 @@ def main():
help="Filter value (required if filter_type is specified)",
)
task_list_parser.set_defaults(
- func=handle_errors(lambda args: print_task_data(list_tasks(args), args))
+ func=handle_errors(
+ lambda args: (lambda result: print_task_data(result, args))(list_tasks(args))
+ )
)
# task show
@@ -526,7 +543,9 @@ def main():
task_show_parser.add_argument("task", type=int, help="Task ID")
task_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_task_details(show_task(args), args) if show_task(args) else None
+ lambda args: (lambda result: print_task_details(result, args) if result else None)(
+ show_task(args)
+ )
)
)
@@ -547,7 +566,11 @@ def main():
"list", help="List remote resources"
)
remote_resource_list_parser.set_defaults(
- func=handle_errors(lambda args: print_resources_list(list_remote_resources(args), args))
+ func=handle_errors(
+ lambda args: (lambda result: print_resources_list(result, args))(
+ list_remote_resources(args)
+ )
+ )
)
# remote-resource show
@@ -557,9 +580,9 @@ def main():
remote_resource_show_parser.add_argument("remote_resource", type=int, help="Remote resource ID")
remote_resource_show_parser.set_defaults(
func=handle_errors(
- lambda args: print_resource_details(show_remote_resource(args), args)
- if show_remote_resource(args)
- else None
+ lambda args: (lambda result: print_resource_details(result, args) if result else None)(
+ show_remote_resource(args)
+ )
)
)
diff --git a/cbrain_cli/sessions.py b/cbrain_cli/sessions.py
index 934dda9..4559f52 100644
--- a/cbrain_cli/sessions.py
+++ b/cbrain_cli/sessions.py
@@ -118,11 +118,19 @@ def logout_session(args):
)
# Make the request to logout from server.
- with urllib.request.urlopen(request) as response:
- if response.status == 200:
- print("Successfully logged out from CBRAIN server.")
+ try:
+ with urllib.request.urlopen(request) as response:
+ if response.status == 200:
+ print("Successfully logged out from CBRAIN server.")
+ else:
+ print("Logout failed")
+ except urllib.error.HTTPError as e:
+ if e.code == 401:
+ print("Session already expired on server.")
else:
- print("Logout failed")
+ print(f"Logout request failed: HTTP {e.code}")
+ except urllib.error.URLError as e:
+ print(f"Network error during logout: {e}")
# Always remove local credentials file.
CREDENTIALS_FILE.unlink()