diff --git a/README.md b/README.md index cd1460f..6285ce4 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ pip install -e . To run the tests, use the following command: ```bash -python -m unittest discover -s tests/ndi/unittest -t . +python -m unittest discover -s tests -t . ``` ### Building Documentation diff --git a/tests/nditests/__init__.py b/tests/nditests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/__init__.py b/tests/nditests/unittest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/cloud/__init__.py b/tests/nditests/unittest/cloud/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/cloud/api/__init__.py b/tests/nditests/unittest/cloud/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/cloud/api/documents/__init__.py b/tests/nditests/unittest/cloud/api/documents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/cloud/api/test_api.py b/tests/nditests/unittest/cloud/api/test_api.py new file mode 100644 index 0000000..7578a07 --- /dev/null +++ b/tests/nditests/unittest/cloud/api/test_api.py @@ -0,0 +1,348 @@ +import unittest +from unittest.mock import patch, Mock +import os +from ndi.cloud.api import url +from ndi.cloud.api.auth.login import login +from ndi.cloud.api.auth.logout import logout +from ndi.cloud.api.auth.change_password import change_password +from ndi.cloud.api.auth.resend_confirmation import resend_confirmation +from ndi.cloud.api.auth.reset_password import reset_password +from ndi.cloud.api.auth.verify_user import verify_user +from ndi.cloud.api.datasets.get_published import get_published +from ndi.cloud.api.datasets.get_unpublished import get_unpublished +from ndi.cloud.api.datasets.list_datasets import list_datasets + +class TestCloudApi(unittest.TestCase): + + def test_get_url_prod(self): + """ + Tests the get_url function in the production environment. + """ + os.environ['CLOUD_API_ENVIRONMENT'] = 'prod' + test_url = url.get_url('login') + self.assertEqual(test_url, "https://api.ndi-cloud.com/v1/auth/login") + + def test_get_url_dev(self): + """ + Tests the get_url function in the development environment. + """ + os.environ['CLOUD_API_ENVIRONMENT'] = 'dev' + test_url = url.get_url('login') + self.assertEqual(test_url, "https://dev-api.ndi-cloud.com/v1/auth/login") + + def test_get_url_with_params(self): + """ + Tests the get_url function with path parameters. + """ + os.environ['CLOUD_API_ENVIRONMENT'] = 'prod' + test_url = url.get_url('get_user', user_id='123') + self.assertEqual(test_url, "https://api.ndi-cloud.com/v1/users/123") + + def test_get_url_missing_param(self): + """ + Tests that get_url raises a ValueError when a required parameter is missing. + """ + with self.assertRaises(ValueError): + url.get_url('get_user') + + @patch('requests.post') + def test_login_success(self, mock_post): + """ + Tests the login function on a successful API call. + """ + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'token': 'fake_token'} + mock_post.return_value = mock_response + + success, answer, _, _ = login('test@example.com', 'password') + + self.assertTrue(success) + self.assertEqual(answer['token'], 'fake_token') + + @patch('requests.post') + def test_login_failure(self, mock_post): + """ + Tests the login function on a failed API call. + """ + mock_response = Mock() + mock_response.status_code = 401 + mock_response.json.return_value = {'error': 'Invalid credentials'} + mock_post.return_value = mock_response + + success, answer, _, _ = login('test@example.com', 'password') + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Invalid credentials') + + @patch('ndi.cloud.api.implementation.auth.logout.authenticate') + @patch('requests.post') + def test_logout_success(self, mock_post, mock_authenticate): + """ + Tests the logout function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'message': 'Logout successful'} + mock_post.return_value = mock_response + + success, answer, _, _ = logout() + + self.assertTrue(success) + self.assertEqual(answer['message'], 'Logout successful') + + @patch('ndi.cloud.api.implementation.auth.logout.authenticate') + @patch('requests.post') + def test_logout_failure(self, mock_post, mock_authenticate): + """ + Tests the logout function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 401 + mock_response.json.return_value = {'error': 'Invalid token'} + mock_post.return_value = mock_response + + success, answer, _, _ = logout() + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Invalid token') + + @patch('ndi.cloud.api.implementation.auth.change_password.authenticate') + @patch('requests.post') + def test_change_password_success(self, mock_post, mock_authenticate): + """ + Tests the change_password function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'message': 'Password changed successfully'} + mock_post.return_value = mock_response + + success, answer, _, _ = change_password('old_password', 'new_password') + + self.assertTrue(success) + self.assertEqual(answer['message'], 'Password changed successfully') + + @patch('ndi.cloud.api.implementation.auth.change_password.authenticate') + @patch('requests.post') + def test_change_password_failure(self, mock_post, mock_authenticate): + """ + Tests the change_password function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 400 + mock_response.json.return_value = {'error': 'Invalid old password'} + mock_post.return_value = mock_response + + success, answer, _, _ = change_password('old_password', 'new_password') + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Invalid old password') + + @patch('requests.post') + def test_resend_confirmation_success(self, mock_post): + """ + Tests the resend_confirmation function on a successful API call. + """ + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'message': 'Confirmation email sent'} + mock_post.return_value = mock_response + + success, answer, _, _ = resend_confirmation('test@example.com') + + self.assertTrue(success) + self.assertEqual(answer['message'], 'Confirmation email sent') + + @patch('requests.post') + def test_resend_confirmation_failure(self, mock_post): + """ + Tests the resend_confirmation function on a failed API call. + """ + mock_response = Mock() + mock_response.status_code = 400 + mock_response.json.return_value = {'error': 'Invalid email'} + mock_post.return_value = mock_response + + success, answer, _, _ = resend_confirmation('test@example.com') + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Invalid email') + + @patch('ndi.cloud.api.implementation.auth.reset_password.authenticate') + @patch('requests.post') + def test_reset_password_success(self, mock_post, mock_authenticate): + """ + Tests the reset_password function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'message': 'Password reset email sent'} + mock_post.return_value = mock_response + + success, answer, _, _ = reset_password('test@example.com') + + self.assertTrue(success) + self.assertEqual(answer['message'], 'Password reset email sent') + + @patch('ndi.cloud.api.implementation.auth.reset_password.authenticate') + @patch('requests.post') + def test_reset_password_failure(self, mock_post, mock_authenticate): + """ + Tests the reset_password function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 400 + mock_response.json.return_value = {'error': 'Invalid email'} + mock_post.return_value = mock_response + + success, answer, _, _ = reset_password('test@example.com') + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Invalid email') + + @patch('ndi.cloud.api.implementation.auth.verify_user.authenticate') + @patch('requests.post') + def test_verify_user_success(self, mock_post, mock_authenticate): + """ + Tests the verify_user function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'message': 'User verified'} + mock_post.return_value = mock_response + + success, answer, _, _ = verify_user('test@example.com', '123456') + + self.assertTrue(success) + self.assertEqual(answer['message'], 'User verified') + + @patch('ndi.cloud.api.implementation.auth.verify_user.authenticate') + @patch('requests.post') + def test_verify_user_failure(self, mock_post, mock_authenticate): + """ + Tests the verify_user function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 400 + mock_response.json.return_value = {'error': 'Invalid confirmation code'} + mock_post.return_value = mock_response + + success, answer, _, _ = verify_user('test@example.com', '123456') + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Invalid confirmation code') + + @patch('ndi.cloud.api.implementation.datasets.get_published.authenticate') + @patch('requests.get') + def test_get_published_success(self, mock_get, mock_authenticate): + """ + Tests the get_published function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = [{'name': 'dataset1'}, {'name': 'dataset2'}] + mock_get.return_value = mock_response + + success, answer, _, _ = get_published() + + self.assertTrue(success) + self.assertEqual(len(answer), 2) + + @patch('ndi.cloud.api.implementation.datasets.get_published.authenticate') + @patch('requests.get') + def test_get_published_failure(self, mock_get, mock_authenticate): + """ + Tests the get_published function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 401 + mock_response.json.return_value = {'error': 'Unauthorized'} + mock_get.return_value = mock_response + + success, answer, _, _ = get_published() + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Unauthorized') + + @patch('ndi.cloud.api.implementation.datasets.get_unpublished.authenticate') + @patch('requests.get') + def test_get_unpublished_success(self, mock_get, mock_authenticate): + """ + Tests the get_unpublished function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = [{'name': 'dataset3'}, {'name': 'dataset4'}] + mock_get.return_value = mock_response + + success, answer, _, _ = get_unpublished() + + self.assertTrue(success) + self.assertEqual(len(answer), 2) + + @patch('ndi.cloud.api.implementation.datasets.get_unpublished.authenticate') + @patch('requests.get') + def test_get_unpublished_failure(self, mock_get, mock_authenticate): + """ + Tests the get_unpublished function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 401 + mock_response.json.return_value = {'error': 'Unauthorized'} + mock_get.return_value = mock_response + + success, answer, _, _ = get_unpublished() + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Unauthorized') + + @patch('ndi.cloud.api.implementation.datasets.list_datasets.authenticate') + @patch('requests.get') + def test_list_datasets_success(self, mock_get, mock_authenticate): + """ + Tests the list_datasets function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'datasets': [{'name': 'dataset5'}, {'name': 'dataset6'}]} + mock_get.return_value = mock_response + + success, answer, _, _ = list_datasets(cloud_organization_id='org-123') + + self.assertTrue(success) + self.assertEqual(len(answer), 2) + + @patch('ndi.cloud.api.implementation.datasets.list_datasets.authenticate') + @patch('requests.get') + def test_list_datasets_failure(self, mock_get, mock_authenticate): + """ + Tests the list_datasets function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 401 + mock_response.json.return_value = {'error': 'Unauthorized'} + mock_get.return_value = mock_response + + success, answer, _, _ = list_datasets(cloud_organization_id='org-123') + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Unauthorized') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/cloud/api/test_documents.py b/tests/nditests/unittest/cloud/api/test_documents.py new file mode 100644 index 0000000..126faa2 --- /dev/null +++ b/tests/nditests/unittest/cloud/api/test_documents.py @@ -0,0 +1,97 @@ +import unittest +from unittest.mock import patch, Mock +from ndi.cloud.api.documents import add_document, get_document, update_document, delete_document, list_dataset_documents, list_dataset_documents_all + +class TestCloudDocuments(unittest.TestCase): + + @patch('ndi.cloud.api.implementation.documents.add_document.authenticate') + @patch('requests.post') + def test_add_document(self, mock_post, mock_authenticate): + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'id': 'doc1'} + mock_post.return_value = mock_response + + success, answer, _, _ = add_document('ds1', {'name': 'doc'}) + self.assertTrue(success) + self.assertEqual(answer['id'], 'doc1') + + @patch('ndi.cloud.api.implementation.documents.get_document.authenticate') + @patch('requests.get') + def test_get_document(self, mock_get, mock_authenticate): + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'id': 'doc1'} + mock_get.return_value = mock_response + + success, answer, _, _ = get_document('ds1', 'doc1') + self.assertTrue(success) + self.assertEqual(answer['id'], 'doc1') + + @patch('ndi.cloud.api.implementation.documents.update_document.authenticate') + @patch('requests.put') + def test_update_document(self, mock_put, mock_authenticate): + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'id': 'doc1'} + mock_put.return_value = mock_response + + success, answer, _, _ = update_document('ds1', 'doc1', {'name': 'new'}) + self.assertTrue(success) + + @patch('ndi.cloud.api.implementation.documents.delete_document.authenticate') + @patch('requests.delete') + def test_delete_document(self, mock_delete, mock_authenticate): + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'status': 'deleted'} + mock_delete.return_value = mock_response + + success, answer, _, _ = delete_document('ds1', 'doc1') + self.assertTrue(success) + + @patch('ndi.cloud.api.implementation.documents.list_dataset_documents.authenticate') + @patch('requests.get') + def test_list_dataset_documents(self, mock_get, mock_authenticate): + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = [{'id': 'doc1'}] + mock_get.return_value = mock_response + + success, answer, _, _ = list_dataset_documents('ds1') + self.assertTrue(success) + self.assertEqual(len(answer), 1) + + @patch('ndi.cloud.api.implementation.documents.list_dataset_documents.authenticate') + @patch('requests.get') + def test_list_dataset_documents_all(self, mock_get, mock_authenticate): + mock_authenticate.return_value = 'fake_token' + + # Page 1 response + resp1 = Mock() + resp1.status_code = 200 + resp1.json.return_value = [{'id': 'doc1'}, {'id': 'doc2'}] + + # Page 2 response (partial page) + resp2 = Mock() + resp2.status_code = 200 + resp2.json.return_value = [{'id': 'doc3'}] + + mock_get.side_effect = [resp1, resp2] + + # page_size=2 to force 2 pages + success, answer, _, _ = list_dataset_documents_all('ds1', page_size=2) + + self.assertTrue(success) + self.assertEqual(len(answer), 3) + self.assertEqual(answer[0]['id'], 'doc1') + self.assertEqual(answer[2]['id'], 'doc3') + self.assertEqual(mock_get.call_count, 2) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/cloud/sync/__init__.py b/tests/nditests/unittest/cloud/sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/cloud/sync/test_download_new.py b/tests/nditests/unittest/cloud/sync/test_download_new.py new file mode 100644 index 0000000..ed1cbc1 --- /dev/null +++ b/tests/nditests/unittest/cloud/sync/test_download_new.py @@ -0,0 +1,98 @@ +import unittest +from unittest.mock import MagicMock, patch +import sys + +# We need to ensure dependencies are handled. +# If 'did' is not installed, we can mock it safely using patch.dict +# But 'download_new_impl' imports it at module level (via get_cloud_dataset_id... -> query -> did.query) +# So we need to mock it *before* importing download_new_impl if it's missing. + +needs_did_mock = False +try: + import did +except ImportError: + needs_did_mock = True + +class TestCloudSyncDownloadNew(unittest.TestCase): + + def setUp(self): + self.mock_dataset = MagicMock() + self.mock_dataset.path = '/tmp/fake_dataset' + + if needs_did_mock: + self.did_patcher = patch.dict('sys.modules', { + 'did': MagicMock(), + 'did.query': MagicMock(), + 'did.document': MagicMock() + }) + self.did_patcher.start() + + # Re-import to ensure mocks are used if patched + # Or import inside test methods + + def tearDown(self): + if needs_did_mock: + self.did_patcher.stop() + + @patch('ndi.cloud.sync.download_new_impl.get_cloud_dataset_id_for_local_dataset') + @patch('ndi.cloud.sync.download_new_impl.read_sync_index') + @patch('ndi.cloud.sync.download_new_impl.list_remote_document_ids') + @patch('ndi.cloud.sync.download_new_impl.download_ndi_documents') + @patch('ndi.cloud.sync.download_new_impl.list_local_documents') + @patch('ndi.cloud.sync.download_new_impl.update_sync_index') + def test_download_new_success(self, mock_update_index, mock_list_local, mock_download_docs, mock_list_remote, mock_read_index, mock_get_cloud_id): + # Delayed import to allow setUp to patch sys.modules if needed + from ndi.cloud.sync.download_new_impl import download_new + from ndi.cloud.sync.sync_options import SyncOptions + + # Setup mocks + mock_get_cloud_id.return_value = ('cloud-id-123', []) + mock_read_index.return_value = {'remoteDocumentIdsLastSync': ['doc1']} + mock_list_remote.return_value = {'ndiId': ['doc1', 'doc2'], 'apiId': ['api1', 'api2']} + + mock_doc = MagicMock() + mock_doc.document_properties = {'base': {'id': 'doc2'}} + mock_download_docs.return_value = [mock_doc] + + mock_list_local.return_value = ([], ['doc1', 'doc2']) + + sync_options = SyncOptions(Verbose=False) + + # Execute + success, msg, report = download_new(self.mock_dataset, sync_options) + + # Verify + self.assertTrue(success) + self.assertEqual(report['downloaded_document_ids'], ['doc2']) + + mock_download_docs.assert_called_once() + args, _ = mock_download_docs.call_args + self.assertEqual(args[0], 'cloud-id-123') + self.assertEqual(args[1], ['api2']) # doc2 is new, corresponds to api2 + + mock_update_index.assert_called_once() + + @patch('ndi.cloud.sync.download_new_impl.get_cloud_dataset_id_for_local_dataset') + @patch('ndi.cloud.sync.download_new_impl.read_sync_index') + @patch('ndi.cloud.sync.download_new_impl.list_remote_document_ids') + @patch('ndi.cloud.sync.download_new_impl.download_ndi_documents') + @patch('ndi.cloud.sync.download_new_impl.update_sync_index') + def test_download_new_no_changes(self, mock_update_index, mock_download_docs, mock_list_remote, mock_read_index, mock_get_cloud_id): + # Delayed import + from ndi.cloud.sync.download_new_impl import download_new + from ndi.cloud.sync.sync_options import SyncOptions + + mock_get_cloud_id.return_value = ('cloud-id-123', []) + mock_read_index.return_value = {'remoteDocumentIdsLastSync': ['doc1']} + mock_list_remote.return_value = {'ndiId': ['doc1'], 'apiId': ['api1']} + + sync_options = SyncOptions(Verbose=False) + success, msg, report = download_new(self.mock_dataset, sync_options) + + self.assertTrue(success) + self.assertEqual(report['downloaded_document_ids'], []) + mock_download_docs.assert_not_called() + mock_update_index.assert_called_once() + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/cloud/test_create_dataset.py b/tests/nditests/unittest/cloud/test_create_dataset.py new file mode 100644 index 0000000..5b3cc2d --- /dev/null +++ b/tests/nditests/unittest/cloud/test_create_dataset.py @@ -0,0 +1,51 @@ +import unittest +from unittest.mock import patch, Mock +import os +from ndi.cloud.api.datasets.create_dataset import create_dataset + +class TestCreateDataset(unittest.TestCase): + + @patch('ndi.cloud.api.implementation.datasets.create_dataset.authenticate') + @patch('requests.post') + def test_create_dataset_success(self, mock_post, mock_authenticate): + """ + Tests the create_dataset function on a successful API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 201 + mock_response.json.return_value = {'id': 'new_dataset_id'} + mock_post.return_value = mock_response + + dataset_info = {'name': 'Test Dataset'} + success, answer, _, _ = create_dataset(dataset_info, organization_id='org-123') + + self.assertTrue(success) + self.assertEqual(answer['id'], 'new_dataset_id') + + # Verify call arguments + # We need to verify that requests.post was called with correct URL and headers + # But URL depends on implementation details (url.get_url). + # We can check if mock_post was called. + mock_post.assert_called() + + @patch('ndi.cloud.api.implementation.datasets.create_dataset.authenticate') + @patch('requests.post') + def test_create_dataset_failure(self, mock_post, mock_authenticate): + """ + Tests the create_dataset function on a failed API call. + """ + mock_authenticate.return_value = 'fake_token' + mock_response = Mock() + mock_response.status_code = 400 + mock_response.json.return_value = {'error': 'Bad Request'} + mock_post.return_value = mock_response + + dataset_info = {'name': 'Test Dataset'} + success, answer, _, _ = create_dataset(dataset_info, organization_id='org-123') + + self.assertFalse(success) + self.assertEqual(answer['error'], 'Bad Request') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/daq/__init__.py b/tests/nditests/unittest/daq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/daq/system/__init__.py b/tests/nditests/unittest/daq/system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/daq/system/test_mfdaq.py b/tests/nditests/unittest/daq/system/test_mfdaq.py new file mode 100644 index 0000000..3a1c498 --- /dev/null +++ b/tests/nditests/unittest/daq/system/test_mfdaq.py @@ -0,0 +1,35 @@ +import unittest +from unittest.mock import Mock +from ndi.daq.system.mfdaq import Mfdaq +from ndi.daq.reader.mfdaq import Mfdaq as MfdaqReader + +class MockMfdaqReader(MfdaqReader): + def getchannelsepoch(self, epochfiles): + return [{'name': 'ai1', 'type': 'analog_in'}] + def readchannels_epochsamples(self, channeltype, channel, epochfiles, s0, s1): + return [1, 2, 3] + def samplerate(self, epochfiles, channeltype, channel): + return 1000 + +class TestDaqSystem(unittest.TestCase): + + def test_mfdaq_creation(self): + mock_filenavigator = Mock() + mock_daqreader = MockMfdaqReader() + + mfdaq = Mfdaq('my_device', mock_filenavigator, mock_daqreader) + self.assertIsInstance(mfdaq, Mfdaq) + + def test_mfdaq_getchannels(self): + mock_filenavigator = Mock() + mock_filenavigator.numepochs.return_value = 1 + mock_filenavigator.getepochfiles.return_value = ['/fake/path/file.ext'] + mock_daqreader = MockMfdaqReader() + + mfdaq = Mfdaq('my_device', mock_filenavigator, mock_daqreader) + channels = mfdaq.getchannels() + self.assertEqual(len(channels), 1) + self.assertEqual(channels[0]['name'], 'ai1') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/daq/test_daqsystemstring.py b/tests/nditests/unittest/daq/test_daqsystemstring.py new file mode 100644 index 0000000..4c1cb65 --- /dev/null +++ b/tests/nditests/unittest/daq/test_daqsystemstring.py @@ -0,0 +1,23 @@ +import unittest +from ndi.daq.daqsystemstring import DaqSystemString + +class TestDaqSystemString(unittest.TestCase): + + def test_daqsystemstring_creation_from_string(self): + dss = DaqSystemString('mydevice:ai1-5,7,23') + self.assertEqual(dss.devicename, 'mydevice') + self.assertEqual(dss.channeltype, ['ai', 'ai', 'ai', 'ai', 'ai', 'ai', 'ai']) + self.assertEqual(dss.channellist, [1, 2, 3, 4, 5, 7, 23]) + + def test_daqsystemstring_creation_from_parts(self): + dss = DaqSystemString('mydevice', ['ai', 'ai', 'ai'], [1, 2, 3]) + self.assertEqual(dss.devicename, 'mydevice') + self.assertEqual(dss.channeltype, ['ai', 'ai', 'ai']) + self.assertEqual(dss.channellist, [1, 2, 3]) + + def test_daqsystemstring_to_string(self): + dss = DaqSystemString('mydevice', ['ai', 'ai', 'ai'], [1, 2, 3]) + self.assertEqual(str(dss), 'mydevice:ai1-3') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/database/__init__.py b/tests/nditests/unittest/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/database/test_database.py b/tests/nditests/unittest/database/test_database.py new file mode 100644 index 0000000..8f4bc20 --- /dev/null +++ b/tests/nditests/unittest/database/test_database.py @@ -0,0 +1,43 @@ +import unittest +from unittest.mock import Mock +from ndi.database import Database +from ndi.database.document import Document +from ndi.database.binarydoc import BinaryDoc + +class MockDatabase(Database): + def do_add(self, ndi_document_obj, add_parameters): pass + def do_read(self, ndi_document_id): pass + def do_remove(self, ndi_document_id): pass + def do_search(self, searchoptions, searchparams): pass + def do_openbinarydoc(self, ndi_document_id): pass + def check_exist_binarydoc(self, ndi_document_id): pass + def do_closebinarydoc(self, ndi_binarydoc_obj): pass + def do_open_database(self): pass + +class MockBinaryDoc(BinaryDoc): + def __init__(self): pass + def fopen(self): pass + def fseek(self, location, reference): pass + def ftell(self): pass + def feof(self): pass + def fwrite(self, data, precision, skip): pass + def fread(self, count, precision, skip): pass + def fclose(self): pass + +class TestDatabase(unittest.TestCase): + + def test_database_creation(self): + db = MockDatabase('/fake/path', 'ref1') + self.assertIsInstance(db, Database) + + def test_document_creation(self): + doc = Document('base') + self.assertIsInstance(doc, Document) + self.assertIsNotNone(doc.id()) + + def test_binarydoc_creation(self): + bin_doc = MockBinaryDoc() + self.assertIsInstance(bin_doc, BinaryDoc) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/dataset/__init__.py b/tests/nditests/unittest/dataset/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/dataset/test_dataset.py b/tests/nditests/unittest/dataset/test_dataset.py new file mode 100644 index 0000000..f2597a6 --- /dev/null +++ b/tests/nditests/unittest/dataset/test_dataset.py @@ -0,0 +1,38 @@ +import unittest +from unittest.mock import Mock, patch +from ndi.dataset import Dataset +from ndi.dataset.dir import Dir as DatasetDir + +class TestDataset(unittest.TestCase): + + def test_create_dataset(self): + """ + Tests the creation of a Dataset object. + """ + dataset = Dataset('my_dataset') + self.assertIsInstance(dataset, Dataset) + self.assertEqual(dataset.reference(), 'my_dataset') + + @patch('ndi.session.dir.Dir') + def test_create_dataset_dir(self, mock_session_dir): + """ + Tests the creation of a DatasetDir object. + """ + # Mock the session.dir object + mock_session_instance = Mock() + mock_session_instance.path = '/fake/path' + mock_session_dir.return_value = mock_session_instance + + # Test creating a dataset with a path + dataset_dir = DatasetDir('/fake/path') + self.assertIsInstance(dataset_dir, DatasetDir) + self.assertEqual(dataset_dir.path, '/fake/path') + + # Test creating a dataset with a reference and path + dataset_dir_with_ref = DatasetDir('my_dataset', '/another/fake/path') + self.assertIsInstance(dataset_dir_with_ref, DatasetDir) + self.assertEqual(dataset_dir_with_ref.path, '/another/fake/path') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/element/__init__.py b/tests/nditests/unittest/element/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/element/test_element.py b/tests/nditests/unittest/element/test_element.py new file mode 100644 index 0000000..047bffa --- /dev/null +++ b/tests/nditests/unittest/element/test_element.py @@ -0,0 +1,33 @@ +import unittest +from unittest.mock import Mock +from ndi.element.timeseries import Timeseries + +class TestElement(unittest.TestCase): + + def test_create_timeseries_element(self): + """ + Tests the creation of a Timeseries element. + """ + mock_session = Mock() + mock_underlying_element = Mock() + + element = Timeseries( + session=mock_session, + name='my_element', + reference='ref1', + element_subtype='subtype', + underlying_element=mock_underlying_element, + direct=True, + author='test@example.com' + ) + + self.assertIsInstance(element, Timeseries) + self.assertEqual(element.name, 'my_element') + self.assertEqual(element.reference, 'ref1') + self.assertEqual(element.element_subtype, 'subtype') + self.assertIs(element.underlying_element, mock_underlying_element) + self.assertTrue(element.direct) + self.assertEqual(element.author, 'test@example.com') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/epoch/__init__.py b/tests/nditests/unittest/epoch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/epoch/test_epochset.py b/tests/nditests/unittest/epoch/test_epochset.py new file mode 100644 index 0000000..9983c18 --- /dev/null +++ b/tests/nditests/unittest/epoch/test_epochset.py @@ -0,0 +1,19 @@ +import unittest +from ndi.epoch.epochset import Param + +class MockParam(Param): + def buildepochtable(self): + return [] + +class TestEpochSet(unittest.TestCase): + + def test_epochset_creation(self): + es = MockParam() + self.assertIsInstance(es, Param) + + def test_num_epochs(self): + es = MockParam() + self.assertEqual(es.numepochs(), 0) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/file/__init__.py b/tests/nditests/unittest/file/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/file/test_navigator.py b/tests/nditests/unittest/file/test_navigator.py new file mode 100644 index 0000000..25baeb1 --- /dev/null +++ b/tests/nditests/unittest/file/test_navigator.py @@ -0,0 +1,80 @@ +import unittest +import os +import shutil +import tempfile +from ndi.session.dir import Dir as SessionDir +from ndi.file import Navigator + +class TestFileNavigator(unittest.TestCase): + + def setUp(self): + """ + Set up the test environment. + """ + self.temp_dir = tempfile.mkdtemp() + self.create_folder_structure(3) + self.create_folder_structure_with_files(3, 2, 'dummy', ['.ext']) + self.create_folder_structure_with_files(3, 2, 'myfile', ['.ext1', '.ext2']) + + self.session = SessionDir('mysession', self.temp_dir) + self.file_navigator = Navigator(self.session, fileparameters={'filematch': ['myfile_#.ext1', 'myfile_#.ext2']}) + + def tearDown(self): + """ + Clean up the test environment. + """ + shutil.rmtree(self.temp_dir) + + def create_folder_structure(self, num_subdirs): + """ + Creates a folder structure for testing. + """ + for i in range(1, num_subdirs + 1): + subdir_name = os.path.join(self.temp_dir, f'mysubdir{i}') + if not os.path.exists(subdir_name): + os.makedirs(subdir_name) + + def create_folder_structure_with_files(self, num_subdirs, num_files, file_base_name, file_extensions): + """ + Creates a folder structure with dummy files for testing. + """ + for i in range(1, num_subdirs + 1): + subdir_name = os.path.join(self.temp_dir, f'mysubdir{i}') + for j in range(1, num_files + 1): + for ext in file_extensions: + file_name = f'{file_base_name}_{j}{ext}' + file_path = os.path.join(subdir_name, file_name) + with open(file_path, 'w') as f: + pass + + @unittest.skip("Not implemented") + def test_number_of_epochs(self): + """ + Tests that the number of epochs is correct. + """ + self.assertEqual(self.file_navigator.numepochs(), 6) + + @unittest.skip("Not implemented") + def test_epoch_files(self): + """ + Tests that epoch files can be retrieved correctly. + """ + files = self.file_navigator.getepochfiles(2) + self.assertEqual(len(files), 2) + self.assertTrue(files[0].endswith('myfile_2.ext1')) + self.assertTrue(files[1].endswith('myfile_2.ext2')) + + @unittest.skip("Not implemented") + def test_epoch_table_entries(self): + """ + Tests that the epoch table is built correctly. + """ + et = self.file_navigator.epochtable() + self.assertEqual(len(et), 6) + self.assertEqual(et[0]['epoch_number'], 1) + self.assertEqual(et[0]['epoch_id'], 'epoch_1') + self.assertEqual(len(et[0]['underlying_epochs']['underlying']), 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/fun/__init__.py b/tests/nditests/unittest/fun/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/fun/test_doc.py b/tests/nditests/unittest/fun/test_doc.py new file mode 100644 index 0000000..7cb3fb7 --- /dev/null +++ b/tests/nditests/unittest/fun/test_doc.py @@ -0,0 +1,59 @@ +import unittest +from unittest.mock import MagicMock +from ndi.fun.doc import diff, get_doc_types, all_types + +class TestDoc(unittest.TestCase): + def test_diff(self): + doc1 = MagicMock() + doc1.document_properties = { + 'base': {'session_id': 's1', 'id': 'd1'}, + 'param': 1, + 'depends_on': [{'name': 'dep1', 'value': 'v1'}] + } + + doc2 = MagicMock() + doc2.document_properties = { + 'base': {'session_id': 's2', 'id': 'd1'}, # Same ID + 'param': 1, + 'depends_on': [{'name': 'dep1', 'value': 'v1'}] + } + + # Should be equal (session_id ignored by default, order independent deps) + eq, report = diff(doc1, doc2) + self.assertTrue(eq, f"Report: {report}") + self.assertFalse(report['mismatch']) + + # Unequal param + doc2.document_properties['param'] = 2 + eq, report = diff(doc1, doc2) + self.assertFalse(eq) + self.assertTrue(report['mismatch']) + + # Unequal ID (if not ignored) + doc2.document_properties['param'] = 1 # Reset param + doc2.document_properties['base']['id'] = 'd2' + eq, report = diff(doc1, doc2) + self.assertFalse(eq) + + def test_get_doc_types(self): + session = MagicMock() + doc1 = MagicMock() + doc1.doc_class.return_value = 'TypeA' + doc2 = MagicMock() + doc2.doc_class.return_value = 'TypeB' + doc3 = MagicMock() + doc3.doc_class.return_value = 'TypeA' + + session.database_search.return_value = [doc1, doc2, doc3] + + types, counts = get_doc_types(session) + self.assertEqual(types, ['TypeA', 'TypeB']) + self.assertEqual(counts, [2, 1]) + + def test_all_types(self): + # Smoke test as it accesses file system + types = all_types() + self.assertIsInstance(types, list) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/fun/test_epoch.py b/tests/nditests/unittest/fun/test_epoch.py new file mode 100644 index 0000000..994fe86 --- /dev/null +++ b/tests/nditests/unittest/fun/test_epoch.py @@ -0,0 +1,68 @@ +import unittest +from unittest.mock import MagicMock +from ndi.fun.epoch import epoch_id_to_element, filename_to_epoch_id + +class TestEpoch(unittest.TestCase): + def test_epoch_id_to_element(self): + session = MagicMock() + elem1 = MagicMock() + # Mock objects create attributes on access, so we must be careful. + # We can use spec or configure explicitly. + elem1.epoch_table = None # Ensure this is None if we want to test fallback, or just set it. + elem1.epochtable = [{'epoch_id': 'ep1'}, {'epoch_id': 'ep2'}] + elem1.name = 'element1' + + elem2 = MagicMock() + elem2.epoch_table = None + elem2.epochtable = [{'epoch_id': 'ep3'}] + elem2.name = 'element2' + + session.get_elements.return_value = [elem1, elem2] + session.getelements.return_value = [elem1, elem2] + + # Test finding one epoch + res = epoch_id_to_element(session, 'ep2') + self.assertEqual(len(res), 1) + self.assertEqual(res[0], elem1) + + # Test finding multiple + res = epoch_id_to_element(session, ['ep3', 'ep1']) + self.assertEqual(len(res), 2) + self.assertEqual(res[0], elem2) + self.assertEqual(res[1], elem1) + + # Test not found + res = epoch_id_to_element(session, 'ep99') + self.assertEqual(len(res), 1) + self.assertIsNone(res[0]) + + def test_filename_to_epoch_id(self): + session = MagicMock() + dev = MagicMock() + # Explicitly set epoch_table to use it, avoiding mock magic issues + dev.epoch_table = [ + { + 'epoch_id': 'ep1', + 'underlying_epochs': {'underlying': ['fileA.dat', 'fileB.dat']} + }, + { + 'epoch_id': 'ep2', + 'underlying_epochs': {'underlying': ['fileC.dat']} + } + ] + + session.daq_system_load.return_value = [dev] + session.daqsystem_load.return_value = [dev] + + # Find by filename + res = filename_to_epoch_id(session, 'fileB.dat') + self.assertEqual(res[0], 'ep1') + + res = filename_to_epoch_id(session, 'fileC.dat') + self.assertEqual(res[0], 'ep2') + + res = filename_to_epoch_id(session, 'fileZ.dat') + self.assertIsNone(res[0]) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/fun/test_fun_basics.py b/tests/nditests/unittest/fun/test_fun_basics.py new file mode 100644 index 0000000..574effa --- /dev/null +++ b/tests/nditests/unittest/fun/test_fun_basics.py @@ -0,0 +1,67 @@ +import unittest +import os +import shutil +import tempfile +import numpy as np +import datetime +from ndi.fun import pseudorandomint, stimulus_temporal_frequency, channel_name_to_prefix_number, name_to_variable_name, timestamp, find_calc_directories +from ndi.fun.file import md5, date_created, date_updated +from ndi.fun.data import read_ngrid, write_ngrid +from ndi.fun.doc import all_types +from ndi.common.path_constants import PathConstants + +class TestNdiFun(unittest.TestCase): + def test_pseudorandomint(self): + val = pseudorandomint() + self.assertIsInstance(val, int) + + def test_channel_name_to_prefix_number(self): + prefix, number = channel_name_to_prefix_number("Channel 1") + self.assertEqual(prefix, "Channel") + self.assertEqual(number, 1) + + def test_name_to_variable_name(self): + self.assertEqual(name_to_variable_name("My Variable"), "myVariable") + + def test_timestamp(self): + ts = timestamp() + self.assertIsInstance(ts, str) + + def test_md5(self): + with tempfile.NamedTemporaryFile(delete=False) as f: + f.write(b"hello world") + fname = f.name + try: + checksum = md5(fname) + self.assertEqual(checksum, "5eb63bbbe01eeed093cb22bb8f5acdc3") + + dc = date_created(fname) + self.assertIsInstance(dc, datetime.datetime) + + du = date_updated(fname) + self.assertIsInstance(du, datetime.datetime) + finally: + os.remove(fname) + + def test_ngrid(self): + with tempfile.NamedTemporaryFile(delete=False) as f: + fname = f.name + try: + data = np.random.rand(5, 5) + write_ngrid(data, fname) + read_data = read_ngrid(fname, [5, 5]) + np.testing.assert_array_almost_equal(data, read_data) + finally: + os.remove(fname) + + def test_all_types(self): + # Just check it runs + types = all_types() + self.assertIsInstance(types, list) + + def test_find_calc_directories(self): + dirs = find_calc_directories() + self.assertIsInstance(dirs, list) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/fun/test_stimulus.py b/tests/nditests/unittest/fun/test_stimulus.py new file mode 100644 index 0000000..0442393 --- /dev/null +++ b/tests/nditests/unittest/fun/test_stimulus.py @@ -0,0 +1,33 @@ +import unittest +from ndi.fun.stimulus_temporal_frequency import stimulus_temporal_frequency + +class TestStimulus(unittest.TestCase): + def test_stimulus_temporal_frequency(self): + # Case 1: Direct frequency + params1 = {'tFrequency': 8, 'spatialFreq': 0.1} + tf1, name1 = stimulus_temporal_frequency(params1) + self.assertEqual(tf1, 8) + self.assertEqual(name1, 'tFrequency') + + # Case 2: Another name + params2 = {'temporalFrequency': 10} + tf2, name2 = stimulus_temporal_frequency(params2) + self.assertEqual(tf2, 10) + self.assertEqual(name2, 'temporalFrequency') + + # Case 3: Period with multiplier + # t_period -> 1/t_period * refreshRate + # if t_period=10, refreshRate=60, tf = (1/10)*60 = 6 + params3 = {'t_period': 10, 'refreshRate': 60} + tf3, name3 = stimulus_temporal_frequency(params3) + self.assertEqual(tf3, 6.0) + self.assertEqual(name3, 't_period') + + # Case 4: No match + params4 = {'contrast': 1.0} + tf4, name4 = stimulus_temporal_frequency(params4) + self.assertIsNone(tf4) + self.assertEqual(name4, '') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/fun/test_table.py b/tests/nditests/unittest/fun/test_table.py new file mode 100644 index 0000000..66276db --- /dev/null +++ b/tests/nditests/unittest/fun/test_table.py @@ -0,0 +1,34 @@ +import unittest +import pandas as pd +import numpy as np +from ndi.fun.table import vstack + +class TestTable(unittest.TestCase): + def test_vstack(self): + # Example 1: Basic concatenation + df1 = pd.DataFrame({'ID': [1, 2], 'Data': ['a', 'b']}) + df2 = pd.DataFrame({'ID': [3, 4], 'Value': [10.5, 20.6]}) + + # vstack should align columns + stacked = vstack([df1, df2]) + + self.assertEqual(len(stacked), 4) + self.assertIn('ID', stacked.columns) + self.assertIn('Data', stacked.columns) + self.assertIn('Value', stacked.columns) + + # Check values + self.assertEqual(stacked.iloc[0]['Data'], 'a') + self.assertTrue(np.isnan(stacked.iloc[2]['Data']) or stacked.iloc[2]['Data'] is None) # Data is missing for 2nd table + self.assertTrue(np.isnan(stacked.iloc[0]['Value'])) # Value is missing for 1st table + + def test_vstack_empty(self): + df1 = pd.DataFrame({'A': [1]}) + res = vstack([df1]) + self.assertEqual(len(res), 1) + + res = vstack([]) + self.assertTrue(res.empty) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/ndimatlabport/README.md b/tests/nditests/unittest/ndimatlabport/README.md new file mode 100644 index 0000000..092426e --- /dev/null +++ b/tests/nditests/unittest/ndimatlabport/README.md @@ -0,0 +1,16 @@ +# ndimatlabport: Ported MATLAB Unit Tests + +This package contains Python ports of specific unit tests from the VH-Lab/NDI-matlab repository. The purpose is to run identical tests between the MATLAB and Python versions to identify points of divergence and ensure parity. + +## Structure + +The Python packages within this directory should mimic the MATLAB namespaces. For example, a test located at `+ndi/+unittest/+dataset/testDatasetBuild.m` in MATLAB should be ported to `tests/nditests/unittest/ndimatlabport/dataset/test_dataset_build.py`. + +## Comparison Table + +The following table tracks the status of MATLAB unit tests being ported to Python. + +| MatlabTests | Converted_Yet | +| :--- | :--- | +| `ndi.unittest.dataset.testDatasetBuild` | Yes | +| `ndi.unittest.session.testSessionBuild` | Yes | diff --git a/tests/nditests/unittest/ndimatlabport/__init__.py b/tests/nditests/unittest/ndimatlabport/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/ndimatlabport/dataset/__init__.py b/tests/nditests/unittest/ndimatlabport/dataset/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/ndimatlabport/dataset/test_dataset_build.py b/tests/nditests/unittest/ndimatlabport/dataset/test_dataset_build.py new file mode 100644 index 0000000..8a66d9a --- /dev/null +++ b/tests/nditests/unittest/ndimatlabport/dataset/test_dataset_build.py @@ -0,0 +1,32 @@ +import unittest +from ndi.dataset import Dataset +from ndi.session.dir import Dir as SessionDir +import tempfile +import shutil + +class TestDatasetBuild(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.session_name = 'mysession' + self.session = SessionDir(self.session_name, self.temp_dir) + self.dataset_id = 'test_dataset_id' + self.dataset_name = 'test_dataset' + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_dataset_instantiation(self): + """ + Tests the basic instantiation of a Dataset object, mimicking ndi.unittest.dataset.testDatasetBuild. + """ + # The Python implementation of Dataset currently only takes a reference argument. + ds = Dataset(self.dataset_name) + + self.assertIsInstance(ds, Dataset) + # Verify the reference is set correctly via the session + self.assertEqual(ds.reference(), self.dataset_name) + # Verify it has an ID (which is generated by IDO in the Session) + self.assertIsNotNone(ds.id()) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/ndimatlabport/session/__init__.py b/tests/nditests/unittest/ndimatlabport/session/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/ndimatlabport/session/test_session_build.py b/tests/nditests/unittest/ndimatlabport/session/test_session_build.py new file mode 100644 index 0000000..83cdc80 --- /dev/null +++ b/tests/nditests/unittest/ndimatlabport/session/test_session_build.py @@ -0,0 +1,33 @@ +import unittest +from ndi.session.dir import Dir as SessionDir +import tempfile +import shutil +import os + +class TestSessionBuild(unittest.TestCase): + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.session_name = 'mysession' + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_session_instantiation(self): + """ + Tests the basic instantiation of a Session object, mimicking ndi.unittest.session.testSessionBuild. + """ + session_path = os.path.join(self.temp_dir, self.session_name) + os.makedirs(session_path) + + # Test creation with valid path + session = SessionDir(self.session_name, session_path) + self.assertIsInstance(session, SessionDir) + + # SessionDir stores the session path + self.assertEqual(session.path, session_path) + + # Session ID is generated by IDO, so we just check it exists + self.assertIsNotNone(session.id()) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/probe/__init__.py b/tests/nditests/unittest/probe/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/probe/fun/__init__.py b/tests/nditests/unittest/probe/fun/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/probe/fun/test_fun.py b/tests/nditests/unittest/probe/fun/test_fun.py new file mode 100644 index 0000000..6dbc749 --- /dev/null +++ b/tests/nditests/unittest/probe/fun/test_fun.py @@ -0,0 +1,32 @@ +import unittest +from ndi.probe import fun as probe_fun + +class TestProbeFun(unittest.TestCase): + + def test_init_probe_type_map(self): + """ + Tests the initialization of the probe type map. + """ + probe_type_map = probe_fun.init_probe_type_map() + self.assertIsInstance(probe_type_map, dict) + self.assertIn('n-trode', probe_type_map) + self.assertEqual(probe_type_map['n-trode'], 'ndi.probe.timeseries.mfdaq') + + def test_get_probe_type_map(self): + """ + Tests the retrieval of the probe type map. + """ + # Ensure the global cache is initially empty + probe_fun._cached_probe_type_map = None + + # First call should initialize and cache the map + probe_type_map1 = probe_fun.get_probe_type_map() + self.assertIsInstance(probe_type_map1, dict) + self.assertIsNotNone(probe_fun._cached_probe_type_map) + + # Second call should return the cached map + probe_type_map2 = probe_fun.get_probe_type_map() + self.assertIs(probe_type_map1, probe_type_map2) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/probe/test_probe.py b/tests/nditests/unittest/probe/test_probe.py new file mode 100644 index 0000000..fc60f09 --- /dev/null +++ b/tests/nditests/unittest/probe/test_probe.py @@ -0,0 +1,38 @@ +import unittest +from unittest.mock import Mock +from ndi.probe.timeseries.mfdaq import Mfdaq +from ndi.daq.system.mfdaq import Mfdaq as MfdaqSystem +from ndi.daq.reader.mfdaq import Mfdaq as MfdaqReader + +class MockMfdaqReader(MfdaqReader): + def getchannelsepoch(self, epochfiles): + return [{'name': 'ai1', 'type': 'analog_in'}] + def readchannels_epochsamples(self, channeltype, channel, epochfiles, s0, s1): + return [1, 2, 3] + def samplerate(self, epochfiles, channeltype, channel): + return 1000 + +class TestProbe(unittest.TestCase): + + def test_mfdaq_creation(self): + mock_session = Mock() + probe = Mfdaq(mock_session, 'my_probe', 1, 'mfdaq', 'subj1') + self.assertIsInstance(probe, Mfdaq) + + @unittest.skip("Not implemented") + def test_mfdaq_readtimeseriesepoch(self): + mock_session = Mock() + mock_filenavigator = Mock() + mock_filenavigator.getepochfiles.return_value = ['/fake/path/file.ext'] + mock_daqreader = MockMfdaqReader() + mock_daqsystem = MfdaqSystem('my_device', mock_filenavigator, mock_daqreader) + + probe = Mfdaq(mock_session, 'my_probe', 1, 'mfdaq', 'subj1') + + # This is a placeholder test. The full implementation will require + # the getchanneldevinfo() method to be implemented. + with self.assertRaises(AttributeError): + data, t, timeref = probe.readtimeseriesepoch(1, 0, 1) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/session/__init__.py b/tests/nditests/unittest/session/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/session/test_session.py b/tests/nditests/unittest/session/test_session.py new file mode 100644 index 0000000..5e0c869 --- /dev/null +++ b/tests/nditests/unittest/session/test_session.py @@ -0,0 +1,39 @@ +import unittest +import os +import shutil +from ndi.session import Session +from ndi.session.dir import Dir as SessionDir +from ndi.session.mock import Mock as MockSession + +class TestSession(unittest.TestCase): + + def test_create_session(self): + """ + Tests the creation of a Session object. + """ + session = Session('my_session') + self.assertIsInstance(session, Session) + self.assertEqual(session.reference, 'my_session') + self.assertIsNotNone(session.id()) + + def test_create_session_dir(self): + """ + Tests the creation of a SessionDir object. + """ + session_dir = SessionDir('my_session', '/fake/path') + self.assertIsInstance(session_dir, SessionDir) + self.assertEqual(session_dir.reference, 'my_session') + self.assertEqual(session_dir.getpath(), '/fake/path') + + def test_create_mock_session(self): + """ + Tests the creation of a MockSession object. + """ + mock_session = MockSession() + self.assertIsInstance(mock_session, MockSession) + self.assertTrue(os.path.exists(mock_session.getpath())) + # Clean up the temporary directory + shutil.rmtree(mock_session.getpath()) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/test_cache.py b/tests/nditests/unittest/test_cache.py new file mode 100644 index 0000000..ef142f2 --- /dev/null +++ b/tests/nditests/unittest/test_cache.py @@ -0,0 +1,108 @@ +import unittest +import numpy as np +from ndi.cache import Cache +import time + +class TestCache(unittest.TestCase): + + def test_cache_creation(self): + c = Cache() + self.assertIsInstance(c, Cache) + self.assertEqual(c.maxMemory, 10e9) + self.assertEqual(c.replacement_rule, 'fifo') + + c2 = Cache(maxMemory=5e6, replacement_rule='lifo') + self.assertEqual(c2.maxMemory, 5e6) + self.assertEqual(c2.replacement_rule, 'lifo') + + def test_add_and_lookup(self): + c = Cache(maxMemory=1e6) + test_data = np.random.rand(100, 100) + c.add('mykey', 'mytype', test_data) + retrieved = c.lookup('mykey', 'mytype') + self.assertTrue(np.array_equal(retrieved['data'], test_data)) + + def test_remove(self): + c = Cache(maxMemory=1e6) + test_data = np.random.rand(100, 100) + c.add('mykey', 'mytype', test_data) + c.remove('mykey', 'mytype') + retrieved = c.lookup('mykey', 'mytype') + self.assertIsNone(retrieved) + + def test_clear(self): + c = Cache(maxMemory=1e6) + c.add('mykey1', 'mytype', np.random.rand(10, 10)) + c.add('mykey2', 'mytype', np.random.rand(10, 10)) + c.clear() + self.assertEqual(c.bytes(), 0) + + def test_fifo_replacement(self): + c = Cache(maxMemory=900000, replacement_rule='fifo') + c.add('key1', 'type1', np.random.rand(1, 100000)) + c.add('key2', 'type2', np.random.rand(1, 100000)) + retrieved1 = c.lookup('key1', 'type1') + retrieved2 = c.lookup('key2', 'type2') + self.assertIsNone(retrieved1) + self.assertIsNotNone(retrieved2) + + def test_lifo_replacement(self): + c = Cache(maxMemory=900000, replacement_rule='lifo') + c.add('key1', 'type1', np.random.rand(1, 100000)) + time.sleep(0.1) + c.add('key2', 'type2', np.random.rand(1, 100000)) + retrieved1 = c.lookup('key1', 'type1') + retrieved2 = c.lookup('key2', 'type2') + self.assertIsNotNone(retrieved1) + self.assertIsNone(retrieved2) + + def test_error_replacement(self): + c = Cache(maxMemory=800000, replacement_rule='error') + c.add('key1', 'type1', np.random.rand(1, 100000)) + with self.assertRaises(Exception): + c.add('key2', 'type2', np.random.rand(1, 1)) + + def test_priority_eviction(self): + # Test that high priority items are preserved + c = Cache(maxMemory=800000, replacement_rule='fifo') + c.add('low_priority_old', 'type', np.random.rand(1, 50000), 0) # 400KB + time.sleep(0.01) + c.add('high_priority', 'type', np.random.rand(1, 50000), 10) # 400KB + time.sleep(0.01) + c.add('low_priority_new', 'type', np.random.rand(1, 50000), 0) # 400KB + + # low_priority_old should be gone, high_priority should be preserved + self.assertIsNone(c.lookup('low_priority_old','type')) + self.assertIsNotNone(c.lookup('high_priority','type')) + self.assertIsNotNone(c.lookup('low_priority_new','type')) + + def test_adding_large_item(self): + # Test adding an item that is larger than the cache + c = Cache(maxMemory=1e6) + c.add('small_item', 'type', np.random.rand(1,100)) + + # This should fail with an error + with self.assertRaises(Exception): + c.add('large_item', 'type', np.random.rand(1, 200000)) + + # And the cache should be unchanged + self.assertIsNotNone(c.lookup('small_item','type')) + + def test_complex_lifo_eviction(self): + # Test LIFO eviction with multiple small items + c = Cache(maxMemory=1e6, replacement_rule='lifo') + for i in range(1, 11): + c.add(f'small{i}', 'type', np.random.rand(1, 10000), i) # 80KB each + time.sleep(0.01) + # Cache is now at 800KB + + # Add a large item that will be rejected because it has the lowest priority + c.add('large_item', 'type', np.random.rand(1, 50000), 0) # 400KB + + # The cache should be unchanged because the new item was not safe to add + for i in range(1, 11): + self.assertIsNotNone(c.lookup(f'small{i}','type')) + self.assertIsNone(c.lookup('large_item','type')) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/test_document.py b/tests/nditests/unittest/test_document.py new file mode 100644 index 0000000..5d059d8 --- /dev/null +++ b/tests/nditests/unittest/test_document.py @@ -0,0 +1,45 @@ +import unittest +from unittest.mock import patch +from ndi.document import Document + +class TestDocument(unittest.TestCase): + + @patch('ndi.document.Document.read_blank_definition') + def test_document_creation(self, mock_read_blank_definition): + mock_read_blank_definition.return_value = { + 'base': { + 'id': '', + 'datestamp': '', + 'session_id': '' + }, + 'document_class': { + 'class_name': 'test_document', + 'superclasses': [] + } + } + + doc = Document('test_document', **{'base.name': 'my_doc'}) + self.assertIsInstance(doc, Document) + self.assertEqual(doc.document_properties['base']['name'], 'my_doc') + self.assertIsNotNone(doc.document_properties['base']['id']) + + @patch('ndi.document.Document.read_blank_definition') + def test_set_session_id(self, mock_read_blank_definition): + mock_read_blank_definition.return_value = { + 'base': { + 'id': '', + 'datestamp': '', + 'session_id': '' + }, + 'document_class': { + 'class_name': 'test_document', + 'superclasses': [] + } + } + + doc = Document('test_document') + doc.set_session_id('test_session') + self.assertEqual(doc.document_properties['base']['session_id'], 'test_session') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/test_documentservice.py b/tests/nditests/unittest/test_documentservice.py new file mode 100644 index 0000000..885800d --- /dev/null +++ b/tests/nditests/unittest/test_documentservice.py @@ -0,0 +1,17 @@ +import unittest +from ndi.documentservice import DocumentService +from ndi.document import Document + +class MockDocumentService(DocumentService): + def search_query(self): + pass + +class TestDocumentService(unittest.TestCase): + + def test_new_document(self): + ds = MockDocumentService() + doc = ds.new_document('base') + self.assertIsInstance(doc, Document) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/test_ido.py b/tests/nditests/unittest/test_ido.py new file mode 100644 index 0000000..7c759e3 --- /dev/null +++ b/tests/nditests/unittest/test_ido.py @@ -0,0 +1,17 @@ +import unittest +from ndi.ido import Ido + +class TestIdo(unittest.TestCase): + + def test_ido_creation(self): + ido = Ido() + self.assertIsInstance(ido, Ido) + self.assertIsNotNone(ido.id()) + + def test_ido_uniqueness(self): + ido1 = Ido() + ido2 = Ido() + self.assertNotEqual(ido1.id(), ido2.id()) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/test_query.py b/tests/nditests/unittest/test_query.py new file mode 100644 index 0000000..0993f5c --- /dev/null +++ b/tests/nditests/unittest/test_query.py @@ -0,0 +1,13 @@ +import unittest +from ndi.query import Query + +class TestQuery(unittest.TestCase): + + def test_query_creation(self): + q = Query('test.field', 'exact_string', 'test_value', '') + self.assertIsInstance(q, Query) + # did.query.Query stores search_structure as a list of dicts + self.assertEqual(q.search_structure[0]['field'], 'test.field') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/test_subject.py b/tests/nditests/unittest/test_subject.py new file mode 100644 index 0000000..157d44f --- /dev/null +++ b/tests/nditests/unittest/test_subject.py @@ -0,0 +1,29 @@ +import unittest +from ndi.subject import Subject +from unittest.mock import patch + +class TestSubject(unittest.TestCase): + + def test_subject_creation(self): + s = Subject('test@test.com', 'a test subject') + self.assertIsInstance(s, Subject) + self.assertEqual(s.local_identifier, 'test@test.com') + + @patch('ndi.subject.Document') + @patch('ndi.subject.Session') + def test_new_document(self, mock_session, mock_document): + mock_session.empty_id.return_value = '00000000-0000-0000-0000-000000000000' + s = Subject('test@test.com', 'a test subject') + doc = s.new_document() + mock_document.assert_called_with('subject', + **{ + 'subject.local_identifier': 'test@test.com', + 'subject.description': 'a test subject', + 'base.id': s.id(), + 'base.name': 'test@test.com', + 'base.session_id': '00000000-0000-0000-0000-000000000000' + } + ) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/time/__init__.py b/tests/nditests/unittest/time/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/nditests/unittest/time/test_time.py b/tests/nditests/unittest/time/test_time.py new file mode 100644 index 0000000..527bc60 --- /dev/null +++ b/tests/nditests/unittest/time/test_time.py @@ -0,0 +1,32 @@ +import unittest +from unittest.mock import Mock +from ndi.time.syncgraph import SyncGraph +from ndi.time.syncrule import SyncRule + +class MockSyncRule(SyncRule): + def apply(self, epochnode_a, epochnode_b): + return 1.0, None + def search_query(self): + pass + +class TestTime(unittest.TestCase): + + def test_syncgraph_creation(self): + mock_session = Mock() + sg = SyncGraph(mock_session) + self.assertIsInstance(sg, SyncGraph) + self.assertEqual(sg.session, mock_session) + + def test_syncgraph_add_rule(self): + mock_session = Mock() + sg = SyncGraph(mock_session) + rule = MockSyncRule() + sg.addrule(rule) + self.assertIn(rule, sg.rules) + + def test_syncrule_creation(self): + rule = MockSyncRule() + self.assertIsInstance(rule, SyncRule) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/nditests/unittest/time/test_timemapping.py b/tests/nditests/unittest/time/test_timemapping.py new file mode 100644 index 0000000..2a21cd5 --- /dev/null +++ b/tests/nditests/unittest/time/test_timemapping.py @@ -0,0 +1,20 @@ +import unittest +import numpy as np +from ndi.time.timemapping import TimeMapping + +class TestTimeMapping(unittest.TestCase): + + def test_timemapping_creation(self): + tm = TimeMapping() + self.assertIsInstance(tm, TimeMapping) + self.assertTrue(np.array_equal(tm.mapping, [1, 0])) + + tm2 = TimeMapping([2, 5]) + self.assertTrue(np.array_equal(tm2.mapping, [2, 5])) + + def test_map(self): + tm = TimeMapping([2, 5]) + self.assertEqual(tm.map(10), 25) + +if __name__ == '__main__': + unittest.main()