Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions refactron/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,20 @@ def to_dict(self) -> Dict[str, Any]:
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "WorkspaceMapping":
"""Create from dictionary."""
# Robustly handle missing repo_name by extracting it from repo_full_name if needed
repo_full_name = data.get("repo_full_name", "")
repo_name = data.get("repo_name")
if not repo_name and "/" in repo_full_name:
repo_name = repo_full_name.split("/")[-1]
elif not repo_name:
repo_name = repo_full_name or "unknown"

return cls(
repo_name=data["repo_name"],
repo_full_name=data["repo_full_name"],
local_path=data["local_path"],
connected_at=data["connected_at"],
repo_id=data.get("repo_id"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

repo_id can silently become None despite int annotation.

At Line 49, data.get("repo_id") returns None when missing, which breaks the WorkspaceMapping.repo_id: int contract and can propagate invalid state.

Proposed fix
+        raw_repo_id = data.get("repo_id", 0)
+        try:
+            repo_id = int(raw_repo_id)
+        except (TypeError, ValueError):
+            repo_id = 0
+
         return cls(
-            repo_id=data.get("repo_id"),
+            repo_id=repo_id,
             repo_name=repo_name,
             repo_full_name=repo_full_name,
             local_path=data.get("local_path", ""),
             connected_at=data.get("connected_at", ""),
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@refactron/core/workspace.py` at line 49, The current use of
data.get("repo_id") can yield None and violate WorkspaceMapping.repo_id: int;
change the assignment to validate and coerce the value (e.g., read via
data["repo_id"] and cast to int) or explicitly check and raise a clear error if
missing/invalid. Update the code that constructs WorkspaceMapping (replace
data.get("repo_id") with int(data["repo_id"]) wrapped in a try/except or an
explicit if not present check) so repo_id is always an int or a descriptive
exception is raised.

repo_name=repo_name,
repo_full_name=repo_full_name,
local_path=data.get("local_path", ""),
connected_at=data.get("connected_at", ""),
)


Expand Down
146 changes: 132 additions & 14 deletions tests/test_cli_repo.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,143 @@
"""Tests for the refactron repo CLI commands."""
"""Tests for refactron.cli.repo module."""

import sys
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest
from click.testing import CliRunner

from refactron.cli.repo import repo
from refactron.core.workspace import WorkspaceManager
# Mock torch BEFORE any refactron imports to prevent DLL crash on Windows
mock_torch = MagicMock()
mock_torch.__spec__ = MagicMock()
sys.modules["torch"] = mock_torch

mock_st = MagicMock()
mock_st.__spec__ = MagicMock()
sys.modules["sentence_transformers"] = mock_st


from refactron.core.repositories import Repository # noqa: E402
from refactron.core.workspace import WorkspaceManager # noqa: E402
from refactron.core.workspace import WorkspaceMapping # noqa: E402


@pytest.fixture
def runner():
"""Provides a Click CLI runner for testing."""
return CliRunner()


@pytest.fixture
def mock_repo():
return Repository(
id=1,
name="test-repo",
full_name="user/test-repo",
description="A test repository",
private=False,
html_url="https://github.com/user/test-repo",
clone_url="https://github.com/user/test-repo.git",
ssh_url="git@github.com:user/test-repo.git",
default_branch="main",
language="Python",
updated_at="2023-01-01T00:00:00Z",
)


def _get_repo_group():
from refactron.cli.repo import repo

return repo


@patch("refactron.cli.repo._auth_banner")
def test_repo_list_no_repos(mock_banner, runner):
"""Test 'repo list' when no repositories are returned."""
with patch("refactron.cli.repo.list_repositories", return_value=[]):
result = runner.invoke(_get_repo_group(), ["list"])
assert result.exit_code == 0
assert "No Repositories" in result.output or "No repositories" in result.output


@patch("refactron.cli.repo._auth_banner")
def test_repo_list_with_repos(mock_banner, runner, mock_repo):
"""Test 'repo list' with some repositories."""
with patch("refactron.cli.repo.list_repositories", return_value=[mock_repo]):
with patch("refactron.cli.repo.WorkspaceManager.get_workspace", return_value=None):
result = runner.invoke(_get_repo_group(), ["list"])
assert result.exit_code == 0
assert "test-repo" in result.output


@patch("refactron.cli.repo._auth_banner")
def test_repo_list_error(mock_banner, runner):
"""Test 'repo list' when the API raises a RuntimeError."""
with patch(
"refactron.cli.repo.list_repositories", side_effect=RuntimeError("Not authenticated")
):
result = runner.invoke(_get_repo_group(), ["list"])
assert result.exit_code != 0
assert "Error" in result.output or "Not authenticated" in result.output


@patch("refactron.cli.repo._auth_banner")
def test_repo_connect_with_path(mock_banner, runner, mock_repo, tmp_path):
"""Test 'repo connect' with existing path."""
with patch("refactron.cli.repo.list_repositories", return_value=[mock_repo]):
with patch("refactron.cli.repo.WorkspaceManager.add_workspace") as mock_add:
with patch("refactron.cli.repo.subprocess.run"):
result = runner.invoke(
_get_repo_group(),
["connect", "test-repo", "--path", str(tmp_path)],
)
assert result.exit_code == 0
assert "Successfully connected" in result.output
mock_add.assert_called_once()
mapping = mock_add.call_args[0][0]
assert mapping.repo_name == "test-repo"
expected_path = Path.home() / ".refactron" / "workspaces" / "test-repo"
assert Path(mapping.local_path).resolve() == expected_path.resolve()


@patch("refactron.cli.repo._auth_banner")
def test_repo_disconnect_not_connected(mock_banner, runner):
"""Test 'repo disconnect' when repo is not connected."""
with patch("refactron.cli.repo.WorkspaceManager.get_workspace", return_value=None):
with patch("refactron.cli.repo.WorkspaceManager.get_workspace_by_path", return_value=None):
result = runner.invoke(_get_repo_group(), ["disconnect", "unknown-repo"])
assert result.exit_code != 0
assert "is not connected" in result.output


@patch("refactron.cli.repo._auth_banner")
def test_repo_disconnect_success(mock_banner, runner, tmp_path):
"""Test 'repo disconnect' success."""
mapping = WorkspaceMapping(
repo_id=1,
repo_name="test-repo",
repo_full_name="user/test-repo",
local_path=str(tmp_path),
connected_at="2023-01-01T00:00:00Z",
)
with patch("refactron.cli.repo.WorkspaceManager.get_workspace", return_value=mapping):
with patch("refactron.cli.repo.WorkspaceManager.remove_workspace") as mock_remove:
result = runner.invoke(_get_repo_group(), ["disconnect", "test-repo"])
assert result.exit_code == 0
assert "Removed workspace mapping" in result.output
mock_remove.assert_called_once_with("test-repo")


def temp_workspace(tmp_path):
"""Provides an isolated workspace manager for tests."""
config_path = tmp_path / "workspaces.json"
mgr = WorkspaceManager(config_path=config_path)
return mgr


@patch("refactron.cli.repo._auth_banner")
@patch("refactron.cli.repo.WorkspaceManager")
@patch("refactron.cli.repo._spawn_background_indexer")
def test_repo_connect_local_offline(mock_spawn, mock_wsm_cls, runner, tmp_path):
def test_repo_connect_local_offline(mock_spawn, mock_wsm_cls, mock_banner, runner, tmp_path):
"""Scenario 1 & 2: Inside existing local repo, connects offline instantly."""
# Setup mock manager
mock_mgr = MagicMock()
Expand All @@ -35,7 +147,7 @@ def test_repo_connect_local_offline(mock_spawn, mock_wsm_cls, runner, tmp_path):
mock_mgr.detect_repository.return_value = "user/my-offline-repo"

with runner.isolated_filesystem(temp_dir=tmp_path):
result = runner.invoke(repo, ["connect"])
result = runner.invoke(_get_repo_group(), ["connect"])

# Verify it succeeds offline without hitting the API
assert result.exit_code == 0
Expand All @@ -54,12 +166,13 @@ def test_repo_connect_local_offline(mock_spawn, mock_wsm_cls, runner, tmp_path):
mock_spawn.assert_called_once()


@patch("refactron.cli.repo._auth_banner")
@patch("refactron.cli.repo.WorkspaceManager")
@patch("refactron.cli.repo.list_repositories")
@patch("refactron.cli.repo.subprocess.run")
@patch("refactron.cli.repo._spawn_background_indexer")
def test_repo_connect_api_fallback(
mock_spawn, mock_subp_run, mock_list_repos, mock_wsm_cls, runner, tmp_path
mock_spawn, mock_subp_run, mock_list_repos, mock_wsm_cls, mock_banner, runner, tmp_path
):
"""Scenario 3: Outside git repo, repo name provided -> clones via API."""
mock_mgr = MagicMock()
Expand All @@ -78,7 +191,7 @@ def test_repo_connect_api_fallback(

with runner.isolated_filesystem(temp_dir=tmp_path):
# Pass repo explicit name
result = runner.invoke(repo, ["connect", "user/my-online-repo"])
result = runner.invoke(_get_repo_group(), ["connect", "user/my-online-repo"])

assert result.exit_code == 0
assert "Connected (API)" in result.output
Expand All @@ -99,24 +212,28 @@ def test_repo_connect_api_fallback(
assert mapping.repo_name == "my-online-repo"


@patch("refactron.cli.repo._auth_banner")
@patch("refactron.cli.repo.WorkspaceManager")
def test_repo_connect_outside_git_no_args(mock_wsm_cls, runner, tmp_path):
def test_repo_connect_outside_git_no_args(mock_wsm_cls, mock_banner, runner, tmp_path):
"""If outside git repo and no args provided, it should fail nicely."""
mock_mgr = MagicMock()
mock_wsm_cls.return_value = mock_mgr
mock_mgr.detect_repository.return_value = None

with runner.isolated_filesystem(temp_dir=tmp_path):
result = runner.invoke(repo, ["connect"])
result = runner.invoke(_get_repo_group(), ["connect"])

assert result.exit_code == 1
assert "Not a git repository" in result.output
assert "Usage" in result.output


@patch("refactron.cli.repo._auth_banner")
@patch("refactron.cli.repo.WorkspaceManager")
@patch("refactron.cli.repo.list_repositories")
def test_repo_connect_api_error_fallback(mock_list_repos, mock_wsm_cls, runner, tmp_path):
def test_repo_connect_api_error_fallback(
mock_list_repos, mock_wsm_cls, mock_banner, runner, tmp_path
):
"""Scenario 4: CI runner (no token, no git context) -> clean error message."""
mock_mgr = MagicMock()
mock_wsm_cls.return_value = mock_mgr
Expand All @@ -126,19 +243,20 @@ def test_repo_connect_api_error_fallback(mock_list_repos, mock_wsm_cls, runner,
mock_list_repos.side_effect = RuntimeError("Invalid credentials")

with runner.isolated_filesystem(temp_dir=tmp_path):
result = runner.invoke(repo, ["connect", "some-repo"])
result = runner.invoke(_get_repo_group(), ["connect", "some-repo"])

assert result.exit_code == 1
assert "Authentication required for cloning" in result.output
assert "connect offline." in result.output.replace("\n", "")


@patch("refactron.cli.repo._auth_banner")
@patch("refactron.cli.repo.WorkspaceManager")
@patch("refactron.cli.repo.list_repositories")
@patch("refactron.cli.repo.subprocess.run")
@patch("refactron.cli.repo._spawn_background_indexer")
def test_repo_connect_api_fallback_ssh(
mock_spawn, mock_subp_run, mock_list_repos, mock_wsm_cls, runner, tmp_path
mock_spawn, mock_subp_run, mock_list_repos, mock_wsm_cls, mock_banner, runner, tmp_path
):
"""Scenario 6: Clone using SSH flag."""
mock_mgr = MagicMock()
Expand All @@ -154,7 +272,7 @@ def test_repo_connect_api_fallback_ssh(
mock_list_repos.return_value = [mock_repo]

with runner.isolated_filesystem(temp_dir=tmp_path):
result = runner.invoke(repo, ["connect", "--ssh", "user/my-ssh-repo"])
result = runner.invoke(_get_repo_group(), ["connect", "--ssh", "user/my-ssh-repo"])

assert result.exit_code == 0
assert "Connected (API)" in result.output
Expand Down
116 changes: 116 additions & 0 deletions tests/test_device_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Tests for device-code authentication helpers."""

import json
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove unused import json.

The pipeline reports this import is unused. Remove it to fix the CI failure.

Proposed fix
-import json
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import json
🧰 Tools
🪛 GitHub Actions: Pre-commit

[error] 3-3: flake8 F401: 'json' imported but unused

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_device_auth.py` at line 3, Remove the unused top-level import
statement "import json" from the test module; simply delete the line containing
"import json" so the file no longer imports json unnecessarily and the CI
unused-import error is resolved.

from unittest.mock import MagicMock, patch
from urllib.error import HTTPError

import pytest

from refactron.core.device_auth import (
DeviceAuthorization,
TokenResponse,
_normalize_base_url,
_post_json,
poll_for_token,
start_device_authorization,
)


def test_normalize_base_url():
"""Test URL normalization."""
assert _normalize_base_url("https://api.test.com/") == "https://api.test.com"
assert _normalize_base_url("https://api.test.com") == "https://api.test.com"
assert _normalize_base_url(" https://api.test.com/ ") == "https://api.test.com"
assert _normalize_base_url(None) == ""


@patch("refactron.core.device_auth.urlopen")
def test_post_json_success(mock_urlopen):
"""Test successful JSON POST."""
mock_response = MagicMock()
mock_response.read.return_value = b'{"status": "ok"}'
mock_response.__enter__.return_value = mock_response
mock_urlopen.return_value = mock_response

result = _post_json("https://api.test.com", {"data": "test"})
assert result == {"status": "ok"}


@patch("refactron.core.device_auth.urlopen")
def test_post_json_http_error(mock_urlopen):
"""Test HTTP error handling in _post_json."""
error_response = MagicMock()
error_response.read.return_value = b'{"error": "forbidden"}'
mock_error = HTTPError("url", 403, "Forbidden", {}, error_response)
mock_urlopen.side_effect = mock_error

result = _post_json("https://api.test.com", {"data": "test"})
assert result == {"error": "forbidden"}


@patch("refactron.core.device_auth._post_json")
def test_start_device_authorization(mock_post):
"""Test starting device authorization."""
mock_post.return_value = {
"device_code": "dev_123",
"user_code": "USER-123",
"verification_uri": "https://refactron.dev/verify",
"expires_in": 900,
"interval": 5,
}

auth = start_device_authorization()
assert isinstance(auth, DeviceAuthorization)
assert auth.device_code == "dev_123"
assert auth.user_code == "USER-123"


@patch("refactron.core.device_auth._post_json")
def test_start_device_authorization_invalid_response(mock_post):
"""Test handling of invalid authorization response."""
mock_post.return_value = {"error": "invalid_client"}
with pytest.raises(RuntimeError, match="Invalid /oauth/device response"):
start_device_authorization()


@patch("refactron.core.device_auth._post_json")
def test_poll_for_token_success(mock_post):
"""Test successful token polling."""
# First response is pending, second is success
mock_post.side_effect = [
{"error": "authorization_pending"},
{
"access_token": "token_123",
"token_type": "Bearer",
"expires_in": 3600,
"user": {"email": "test@example.com", "plan": "pro"},
},
]

# Mock sleep to avoid waiting
with patch("time.sleep"):
token = poll_for_token("dev_123")
assert isinstance(token, TokenResponse)
assert token.access_token == "token_123"
assert token.email == "test@example.com"
assert token.plan == "pro"
Comment on lines +90 to +96
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how poll_for_token handles sleep - look for the function signature and sleep usage
ast-grep --pattern 'def poll_for_token($$$) {
  $$$
}'

# Also search for how sleep_fn is defined/used
rg -n "sleep_fn|time\.sleep" --type py -C3

Repository: Refactron-ai/Refactron_lib

Length of output: 8930


🏁 Script executed:

#!/bin/bash
# Check test file structure and imports
head -50 tests/test_device_auth.py

# Check if there's a conftest.py with shared fixtures
find . -name "conftest.py" -type f

# Look for the specific test function
grep -n "def test_poll_for_token_success" tests/test_device_auth.py -A 30

Repository: Refactron-ai/Refactron_lib

Length of output: 2852


🏁 Script executed:

#!/bin/bash
# Check for conftest.py
cat conftest.py 2>/dev/null || echo "No conftest.py at root"

# Check in tests directory
ls -la tests/conftest.py 2>/dev/null || echo "No conftest.py in tests/"

# Look for any module reload or import mechanics
rg -n "reload|importlib" tests/test_device_auth.py

Repository: Refactron-ai/Refactron_lib

Length of output: 117


Patch target time.sleep won't intercept the default bound reference.

The function signature has sleep_fn: Callable[[float], None] = time.sleep, which binds time.sleep at module import time. Patching the global time.sleep afterward won't affect this bound default. Either patch at the use site with patch("refactron.core.device_auth.time.sleep") or pass a mock sleep_fn parameter: poll_for_token("dev_123", sleep_fn=mock_sleep).

🧰 Tools
🪛 Ruff (0.15.7)

[error] 94-94: Possible hardcoded password assigned to: "access_token"

(S105)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_device_auth.py` around lines 90 - 96, The test patches time.sleep
but poll_for_token binds time.sleep as the default argument sleep_fn at import
time, so the patch doesn't take effect; update the test to either patch the
function at the use site (patch "refactron.core.device_auth.time.sleep") or call
poll_for_token with an explicit mock sleep_fn (e.g., poll_for_token("dev_123",
sleep_fn=mock_sleep)) so the sleep call is intercepted; target the
poll_for_token function and its sleep_fn default to ensure the mock is used.



@patch("refactron.core.device_auth._post_json")
def test_poll_for_token_timeout(mock_post):
"""Test token polling timeout."""
mock_post.return_value = {"error": "authorization_pending"}

with patch("time.monotonic") as mock_time:
# Simulate time passing quickly
mock_time.side_effect = [0, 1000]
with pytest.raises(RuntimeError, match="Login timed out"):
poll_for_token("dev_123", expires_in_seconds=10)


@patch("refactron.core.device_auth._post_json")
def test_poll_for_token_expired(mock_post):
"""Test token polling with expired code."""
mock_post.return_value = {"error": "expired_token"}
with pytest.raises(RuntimeError, match="Device code expired"):
poll_for_token("dev_123")
Loading
Loading