Skip to content

Commit f611e8e

Browse files
Merge pull request #116 from contentstack/enh/dx-3451
TOTP support added.
2 parents 5b8ff96 + ffe9b0a commit f611e8e

File tree

9 files changed

+355
-28
lines changed

9 files changed

+355
-28
lines changed

.talismanrc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,9 +394,13 @@ fileignoreconfig:
394394
- filename: tests/unit/contentstack/test_contentstack.py
395395
checksum: 98503cbd96cb546a19aed037a6ca28ef54fcea312efcd9bac1171e43760f6e86
396396
- filename: contentstack_management/contentstack.py
397-
checksum: 520f6fa236569a05579011fa67cb29381f187616d96526ecdfad5ec8255231a5
397+
checksum: 591978d70ecbe5fc3e6587544e9c112a6cd85fd8da2051b48ff87ab6a2e9eb57
398398
- filename: tests/unit/test_oauth_handler.py
399399
checksum: 8b6853ba64c3de4f9097ca506719c5e33c7468ae5985b8adcda3eb6461d76be5
400400
- filename: contentstack_management/oauth/oauth_handler.py
401401
checksum: e33cfd32d90c0553c4959c0d266fef1247cd0e0fe7bbe85cae98bb205e62c70e
402+
- filename: tests/unit/user_session/test_user_session_totp.py
403+
checksum: 0db30c5a306783b10d345d73cff3c61490d7cbc47273623df47e6849c3e97002
404+
- filename: tests/unit/contentstack/test_totp_login.py
405+
checksum: cefad0ddc1a2db1bf59d6e04501c4381acc8b44fad1e5e2e24c06e33d827c859
402406
version: "1.0"

contentstack_management/contentstack.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from enum import Enum
2+
import os
3+
import pyotp
24
from ._api_client import _APIClient
35
from contentstack_management.organizations import organization
46
from contentstack_management.stack import stack
@@ -36,12 +38,14 @@ def __init__(self, host: str = 'api.contentstack.io', scheme: str = 'https://',
3638
region: Region = Region.US.value, version='v3', timeout=2, max_retries: int = 18, early_access: list = None,
3739
oauth_config: dict = None, **kwargs):
3840
self.endpoint = 'https://api.contentstack.io/v3/'
39-
if region is not None and host is not None and region is not Region.US.value:
40-
self.endpoint = f'{scheme}{region}-{host}/{version}/'
41-
if region is not None and host is None and region is not Region.US.value:
42-
host = 'api.contentstack.com'
43-
self.endpoint = f'{scheme}{region}-{host}/{version}/'
44-
if host is not None and region is None:
41+
42+
if region is not None and region is not Region.US.value:
43+
if host is not None and host != 'api.contentstack.io':
44+
self.endpoint = f'{scheme}{region}-api.{host}/{version}/'
45+
else:
46+
host = 'api.contentstack.com'
47+
self.endpoint = f'{scheme}{region}-{host}/{version}/'
48+
elif host is not None and host != 'api.contentstack.io':
4549
self.endpoint = f'{scheme}{host}/{version}/'
4650
if headers is None:
4751
headers = {}
@@ -91,9 +95,36 @@ def __init__(self, host: str = 'api.contentstack.io', scheme: str = 'https://',
9195
-------------------------------
9296
"""
9397

94-
def login(self, email: str, password: str, tfa_token: str = None):
95-
return user_session.UserSession(self.client).login(email, password, tfa_token)
96-
pass
98+
def login(self, email: str, password: str, tfa_token: str = None, mfa_secret: str = None):
99+
"""
100+
Login to Contentstack with optional TOTP support.
101+
102+
:param email: User's email address
103+
:param password: User's password
104+
:param tfa_token: Optional two-factor authentication token
105+
:param mfa_secret: Optional MFA secret for automatic TOTP generation.
106+
If not provided, will check MFA_SECRET environment variable
107+
:return: Response object from the login request
108+
"""
109+
final_tfa_token = tfa_token
110+
111+
if not mfa_secret:
112+
mfa_secret = os.getenv('MFA_SECRET')
113+
114+
if mfa_secret and not tfa_token:
115+
final_tfa_token = self._generate_totp(mfa_secret)
116+
117+
return user_session.UserSession(self.client).login(email, password, final_tfa_token)
118+
119+
def _generate_totp(self, secret: str) -> str:
120+
"""
121+
Generate a Time-Based One-Time Password (TOTP) from the provided secret.
122+
123+
:param secret: The MFA secret key for TOTP generation
124+
:return: The current TOTP code as a string
125+
"""
126+
totp = pyotp.TOTP(secret)
127+
return totp.now()
97128

98129
def logout(self):
99130
return user_session.UserSession(client=self.client).logout()

contentstack_management/user_session/user_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def login(self, email=None, password=None, tfa_token=None):
6363
}
6464

6565
if tfa_token is not None:
66-
data["user"]["tf_token"] = tfa_token
66+
data["user"]["tfa_token"] = tfa_token
6767

6868
data = json.dumps(data)
6969
response = self.client.post(_path, headers=self.client.headers, data=data, json_data=None)

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ requests>=2.32.0,<3.0.0
44
pylint>=2.0.0
55
bson>=0.5.9,<1.0.0
66
requests-toolbelt>=1.0.0,<2.0.0
7+
pyotp==2.9.0

tests/unit/contentstack/test_contentstack.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,35 @@ class ContentstackRegionUnitTests(unittest.TestCase):
88
def test_au_region(self):
99
"""Test that au region creates the correct endpoint URL"""
1010
client = contentstack_management.Client(authtoken='your_authtoken', region='au')
11-
expected_endpoint = 'https://au-api.contentstack.io/v3/'
11+
expected_endpoint = 'https://au-api.contentstack.com/v3/'
1212
self.assertEqual(client.endpoint, expected_endpoint)
1313

1414
def test_gcp_eu_region(self):
1515
"""Test that gcp-eu region creates the correct endpoint URL"""
1616
client = contentstack_management.Client(authtoken='your_authtoken', region='gcp-eu')
17-
expected_endpoint = 'https://gcp-eu-api.contentstack.io/v3/'
17+
expected_endpoint = 'https://gcp-eu-api.contentstack.com/v3/'
1818
self.assertEqual(client.endpoint, expected_endpoint)
1919

2020
def test_azure_eu_region(self):
2121
"""Test that azure-eu region creates the correct endpoint URL"""
2222
client = contentstack_management.Client(authtoken='your_authtoken', region='azure-eu')
23-
expected_endpoint = 'https://azure-eu-api.contentstack.io/v3/'
23+
expected_endpoint = 'https://azure-eu-api.contentstack.com/v3/'
2424
self.assertEqual(client.endpoint, expected_endpoint)
2525

2626
def test_azure_na_region(self):
2727
"""Test that azure-na region creates the correct endpoint URL"""
2828
client = contentstack_management.Client(authtoken='your_authtoken', region='azure-na')
29-
expected_endpoint = 'https://azure-na-api.contentstack.io/v3/'
29+
expected_endpoint = 'https://azure-na-api.contentstack.com/v3/'
3030
self.assertEqual(client.endpoint, expected_endpoint)
3131

3232
def test_au_region_with_custom_host(self):
3333
"""Test that au region with custom host creates the correct endpoint URL"""
3434
client = contentstack_management.Client(
3535
authtoken='your_authtoken',
3636
region='au',
37-
host='custom.contentstack.io'
37+
host='example.com'
3838
)
39-
expected_endpoint = 'https://au-custom.contentstack.io/v3/'
39+
expected_endpoint = 'https://au-api.example.com/v3/'
4040
self.assertEqual(client.endpoint, expected_endpoint)
4141

4242
def test_gcp_eu_region_with_custom_host(self):
@@ -46,19 +46,19 @@ def test_gcp_eu_region_with_custom_host(self):
4646
region='gcp-eu',
4747
host='custom.contentstack.io'
4848
)
49-
expected_endpoint = 'https://gcp-eu-custom.contentstack.io/v3/'
49+
expected_endpoint = 'https://gcp-eu-api.custom.contentstack.io/v3/'
5050
self.assertEqual(client.endpoint, expected_endpoint)
5151

5252
def test_au_region_enum_value(self):
5353
"""Test that au region using enum value creates the correct endpoint URL"""
5454
client = contentstack_management.Client(authtoken='your_authtoken', region=Region.AU.value)
55-
expected_endpoint = 'https://au-api.contentstack.io/v3/'
55+
expected_endpoint = 'https://au-api.contentstack.com/v3/'
5656
self.assertEqual(client.endpoint, expected_endpoint)
5757

5858
def test_gcp_eu_region_enum_value(self):
5959
"""Test that gcp-eu region using enum value creates the correct endpoint URL"""
6060
client = contentstack_management.Client(authtoken='your_authtoken', region=Region.GCP_EU.value)
61-
expected_endpoint = 'https://gcp-eu-api.contentstack.io/v3/'
61+
expected_endpoint = 'https://gcp-eu-api.contentstack.com/v3/'
6262
self.assertEqual(client.endpoint, expected_endpoint)
6363

6464
def test_au_region_with_custom_scheme(self):
@@ -68,7 +68,7 @@ def test_au_region_with_custom_scheme(self):
6868
region='au',
6969
scheme='http://'
7070
)
71-
expected_endpoint = 'http://au-api.contentstack.io/v3/'
71+
expected_endpoint = 'http://au-api.contentstack.com/v3/'
7272
self.assertEqual(client.endpoint, expected_endpoint)
7373

7474
def test_gcp_eu_region_with_custom_scheme(self):
@@ -78,7 +78,7 @@ def test_gcp_eu_region_with_custom_scheme(self):
7878
region='gcp-eu',
7979
scheme='http://'
8080
)
81-
expected_endpoint = 'http://gcp-eu-api.contentstack.io/v3/'
81+
expected_endpoint = 'http://gcp-eu-api.contentstack.com/v3/'
8282
self.assertEqual(client.endpoint, expected_endpoint)
8383

8484
def test_au_region_with_custom_version(self):
@@ -88,7 +88,7 @@ def test_au_region_with_custom_version(self):
8888
region='au',
8989
version='v2'
9090
)
91-
expected_endpoint = 'https://au-api.contentstack.io/v2/'
91+
expected_endpoint = 'https://au-api.contentstack.com/v2/'
9292
self.assertEqual(client.endpoint, expected_endpoint)
9393

9494
def test_gcp_eu_region_with_custom_version(self):
@@ -98,7 +98,7 @@ def test_gcp_eu_region_with_custom_version(self):
9898
region='gcp-eu',
9999
version='v2'
100100
)
101-
expected_endpoint = 'https://gcp-eu-api.contentstack.io/v2/'
101+
expected_endpoint = 'https://gcp-eu-api.contentstack.com/v2/'
102102
self.assertEqual(client.endpoint, expected_endpoint)
103103

104104
def test_au_region_headers(self):
@@ -222,13 +222,13 @@ def test_us_region_default_behavior(self):
222222
def test_eu_region(self):
223223
"""Test that eu region creates the correct endpoint URL"""
224224
client = contentstack_management.Client(authtoken='your_authtoken', region='eu')
225-
expected_endpoint = 'https://eu-api.contentstack.io/v3/'
225+
expected_endpoint = 'https://eu-api.contentstack.com/v3/'
226226
self.assertEqual(client.endpoint, expected_endpoint)
227227

228228
def test_gcp_na_region(self):
229229
"""Test that gcp-na region creates the correct endpoint URL"""
230230
client = contentstack_management.Client(authtoken='your_authtoken', region='gcp-na')
231-
expected_endpoint = 'https://gcp-na-api.contentstack.io/v3/'
231+
expected_endpoint = 'https://gcp-na-api.contentstack.com/v3/'
232232
self.assertEqual(client.endpoint, expected_endpoint)
233233

234234
def test_region_with_none_host(self):

tests/unit/contentstack/test_contentstack_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def test_region_endpoint_construction_logic(self):
115115

116116
# Test non-US region with default host
117117
client = contentstack_management.Client(region='eu')
118-
self.assertEqual(client.endpoint, 'https://eu-api.contentstack.io/v3/')
118+
self.assertEqual(client.endpoint, 'https://eu-api.contentstack.com/v3/')
119119

120120
# Skip custom host tests due to implementation issues
121121
# Test custom host without region
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import unittest
2+
import os
3+
import sys
4+
from unittest.mock import patch, MagicMock
5+
6+
# Add the contentstack_management module to the path
7+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
8+
9+
import contentstack_management
10+
from contentstack_management.contentstack import Client
11+
12+
13+
class TOTPLoginTests(unittest.TestCase):
14+
"""Unit tests for TOTP login functionality in Contentstack Management Python SDK"""
15+
16+
def setUp(self):
17+
"""Set up test fixtures before each test method"""
18+
self.client = Client()
19+
self.test_email = "test@example.com"
20+
self.test_password = "test_password"
21+
self.test_secret = "JBSWY3DPEHPK3PXP" # Standard test secret for TOTP
22+
self.test_tfa_token = "123456"
23+
24+
def tearDown(self):
25+
"""Clean up after each test method"""
26+
# Clean up environment variables
27+
if 'MFA_SECRET' in os.environ:
28+
del os.environ['MFA_SECRET']
29+
30+
def test_login_method_signature_with_totp(self):
31+
"""Test that login method accepts TOTP parameters"""
32+
client = contentstack_management.Client()
33+
# Test that the method exists and can be called with the expected parameters
34+
self.assertTrue(hasattr(client, 'login'))
35+
self.assertTrue(callable(client.login))
36+
37+
# Test that the method accepts TOTP parameters without error
38+
try:
39+
client.login(self.test_email, self.test_password, tfa_token=self.test_tfa_token)
40+
client.login(self.test_email, self.test_password, mfa_secret=self.test_secret)
41+
client.login(self.test_email, self.test_password, tfa_token=self.test_tfa_token, mfa_secret=self.test_secret)
42+
except Exception as e:
43+
self.fail(f"Login method should accept TOTP parameters without error: {e}")
44+
45+
def test_generate_totp_method(self):
46+
"""Test the _generate_totp method generates correct TOTP codes"""
47+
# Test with a known secret and verify the TOTP generation
48+
totp_code = self.client._generate_totp(self.test_secret)
49+
50+
# Verify the TOTP code is a 6-digit string
51+
self.assertIsInstance(totp_code, str)
52+
self.assertEqual(len(totp_code), 6)
53+
self.assertTrue(totp_code.isdigit())
54+
55+
def test_login_with_mfa_secret_generates_totp(self):
56+
"""Test that login with mfa_secret generates TOTP automatically"""
57+
with patch.object(self.client, 'client') as mock_client:
58+
mock_response = MagicMock()
59+
mock_response.status_code = 200
60+
mock_response.json.return_value = {'user': {'authtoken': 'test_token'}}
61+
mock_client.post.return_value = mock_response
62+
63+
# Mock the UserSession class
64+
with patch('contentstack_management.user_session.user_session.UserSession') as mock_user_session:
65+
mock_session_instance = MagicMock()
66+
mock_session_instance.login.return_value = mock_response
67+
mock_user_session.return_value = mock_session_instance
68+
69+
# Mock the _generate_totp method to return a predictable value
70+
with patch.object(self.client, '_generate_totp', return_value='654321') as mock_generate_totp:
71+
result = self.client.login(
72+
self.test_email,
73+
self.test_password,
74+
mfa_secret=self.test_secret
75+
)
76+
77+
# Verify _generate_totp was called with the secret
78+
mock_generate_totp.assert_called_once_with(self.test_secret)
79+
80+
# Verify UserSession was called with generated TOTP
81+
mock_session_instance.login.assert_called_once_with(
82+
self.test_email,
83+
self.test_password,
84+
'654321'
85+
)
86+
self.assertEqual(result, mock_response)
87+
88+
def test_login_with_environment_variable(self):
89+
"""Test that login uses MFA_SECRET environment variable when mfa_secret is not provided"""
90+
# Set environment variable
91+
os.environ['MFA_SECRET'] = self.test_secret
92+
93+
with patch.object(self.client, 'client') as mock_client:
94+
mock_response = MagicMock()
95+
mock_response.status_code = 200
96+
mock_response.json.return_value = {'user': {'authtoken': 'test_token'}}
97+
mock_client.post.return_value = mock_response
98+
99+
# Mock the UserSession class
100+
with patch('contentstack_management.user_session.user_session.UserSession') as mock_user_session:
101+
mock_session_instance = MagicMock()
102+
mock_session_instance.login.return_value = mock_response
103+
mock_user_session.return_value = mock_session_instance
104+
105+
# Mock the _generate_totp method
106+
with patch.object(self.client, '_generate_totp', return_value='789012') as mock_generate_totp:
107+
result = self.client.login(self.test_email, self.test_password)
108+
109+
# Verify _generate_totp was called with the environment secret
110+
mock_generate_totp.assert_called_once_with(self.test_secret)
111+
112+
# Verify UserSession was called with generated TOTP
113+
mock_session_instance.login.assert_called_once_with(
114+
self.test_email,
115+
self.test_password,
116+
'789012'
117+
)
118+
self.assertEqual(result, mock_response)
119+
120+
def test_backward_compatibility(self):
121+
"""Test that existing login patterns continue to work (backward compatibility)"""
122+
with patch.object(self.client, 'client') as mock_client:
123+
mock_response = MagicMock()
124+
mock_response.status_code = 200
125+
mock_response.json.return_value = {'user': {'authtoken': 'test_token'}}
126+
mock_client.post.return_value = mock_response
127+
128+
# Mock the UserSession class
129+
with patch('contentstack_management.user_session.user_session.UserSession') as mock_user_session:
130+
mock_session_instance = MagicMock()
131+
mock_session_instance.login.return_value = mock_response
132+
mock_user_session.return_value = mock_session_instance
133+
134+
# Test old pattern: client.login(email, password)
135+
result1 = self.client.login(self.test_email, self.test_password)
136+
137+
# Test old pattern: client.login(email, password, tfa_token)
138+
result2 = self.client.login(self.test_email, self.test_password, self.test_tfa_token)
139+
140+
# Both should work without errors
141+
self.assertEqual(result1, mock_response)
142+
self.assertEqual(result2, mock_response)
143+
144+
145+
if __name__ == '__main__':
146+
unittest.main()

0 commit comments

Comments
 (0)