@@ -550,13 +550,13 @@ async def test_superuser_can_create_transfer(
550550 "message": (
551551 "Input tag 'some unknown transformation type' found using 'type' "
552552 "does not match any of the expected tags: 'dataframe_rows_filter', "
553- "'dataframe_columns_filter', 'file_metadata_filter'"
553+ "'dataframe_columns_filter', 'file_metadata_filter', 'sql' "
554554 ),
555555 "code": "union_tag_invalid",
556556 "context": {
557557 "discriminator": "'type'",
558558 "expected_tags": (
559- "'dataframe_rows_filter', 'dataframe_columns_filter', 'file_metadata_filter'"
559+ "'dataframe_rows_filter', 'dataframe_columns_filter', 'file_metadata_filter', 'sql' "
560560 ),
561561 "tag": "some unknown transformation type",
562562 },
@@ -809,6 +809,72 @@ async def test_superuser_can_create_transfer(
809809 },
810810 id="invalid_name_regexp",
811811 ),
812+ pytest.param(
813+ {
814+ "transformations": [
815+ {
816+ "type": "sql",
817+ "query": "INSERT INTO table1 VALUES (1, 2)",
818+ "dialect": "spark",
819+ },
820+ ],
821+ },
822+ {
823+ "error": {
824+ "code": "invalid_request",
825+ "message": "Invalid request",
826+ "details": [
827+ {
828+ "location": [
829+ "body",
830+ "transformations",
831+ 0,
832+ "sql",
833+ "query",
834+ ],
835+ "message": "Value error, Query must be a SELECT statement, got 'INSERT INTO table1 VALUES (1, 2)'",
836+ "code": "value_error",
837+ "context": {},
838+ "input": "INSERT INTO table1 VALUES (1, 2)",
839+ },
840+ ],
841+ },
842+ },
843+ id="sql_non_select_query",
844+ ),
845+ pytest.param(
846+ {
847+ "transformations": [
848+ {
849+ "type": "sql",
850+ "query": None,
851+ "dialect": "spark",
852+ },
853+ ],
854+ },
855+ {
856+ "error": {
857+ "code": "invalid_request",
858+ "message": "Invalid request",
859+ "details": [
860+ {
861+ "location": [
862+ "body",
863+ "transformations",
864+ 0,
865+ "sql",
866+ "query",
867+ ],
868+ "message": "Input should be a valid string",
869+ "code": "string_type",
870+ "context": {},
871+ "input": None,
872+ },
873+ ],
874+ },
875+ },
876+ id="sql_non_string_query",
877+ ),
812878 pytest.param(
813879 {
814880 "resources": {
@@ -1290,3 +1356,106 @@ async def test_superuser_cannot_create_transfer_with_unknown_queue_error(
12901356 "details": None,
12911357 },
12921358 }
1359+
1360+
1361+ async def test_developer_plus_can_create_transfer_with_sql_transformation(
1362+ client: AsyncClient,
1363+ two_group_connections: tuple[MockConnection, MockConnection],
1364+ session: AsyncSession,
1365+ role_developer_plus: UserTestRoles,
1366+ group_queue: Queue,
1367+ mock_group: MockGroup,
1368+ ):
1369+ first_connection, second_connection = two_group_connections
1370+ user = mock_group.get_member_of_role(role_developer_plus)
1371+
1372+ response = await client.post(
1373+ "v1/transfers",
1374+ headers={"Authorization": f"Bearer {user.token}"},
1375+ json={
1376+ "group_id": mock_group.group.id,
1377+ "name": "new test transfer with sql",
1378+ "source_connection_id": first_connection.id,
1379+ "target_connection_id": second_connection.id,
1380+ "source_params": {"type": "postgres", "table_name": "schema.source_table"},
1381+ "target_params": {"type": "postgres", "table_name": "schema.target_table"},
1382+ "transformations": [
1383+ {
1384+ "type": "sql",
1385+ "query": "SELECT * FROM table",
1386+ "dialect": "spark",
1387+ },
1388+ ],
1389+ "queue_id": group_queue.id,
1390+ },
1391+ )
1392+ assert response.status_code == 200, response.text
1393+
1394+ transfer = (
1395+ await session.scalars(
1396+ select(Transfer).filter_by(
1397+ name="new test transfer with sql",
1398+ group_id=mock_group.group.id,
1399+ ),
1400+ )
1401+ ).one()
1402+
1403+ assert response.json() == {
1404+ "id": transfer.id,
1405+ "group_id": transfer.group_id,
1406+ "name": transfer.name,
1407+ "description": transfer.description,
1408+ "schedule": transfer.schedule,
1409+ "is_scheduled": transfer.is_scheduled,
1410+ "source_connection_id": transfer.source_connection_id,
1411+ "target_connection_id": transfer.target_connection_id,
1412+ "source_params": transfer.source_params,
1413+ "target_params": transfer.target_params,
1414+ "strategy_params": transfer.strategy_params,
1415+ "transformations": transfer.transformations,
1416+ "resources": transfer.resources,
1417+ "queue_id": transfer.queue_id,
1418+ }
1419+
1420+
1421+ @pytest.mark.parametrize(
1422+ "query",
1423+ [
1424+ "SELECT col1, col2 FROM table1 WHERE col1 > 100;",
1425+ "select col1 from table1",
1426+ " SELECT col1 FROM table1 ",
1427+ " WITH some AS (SELECT col1 FROM table1) SELECT * FROM some;",
1428+ ],
1429+ )
1430+ async def test_sql_transformation_with_valid_select_queries(
1431+ client: AsyncClient,
1432+ two_group_connections: tuple[MockConnection, MockConnection],
1433+ session: AsyncSession,
1434+ superuser: MockUser,
1435+ group_queue: Queue,
1436+ mock_group: MockGroup,
1437+ query: str,
1438+ ):
1439+ first_conn, second_conn = two_group_connections
1440+
1441+ response = await client.post(
1442+ "v1/transfers",
1443+ headers={"Authorization": f"Bearer {superuser.token}"},
1444+ json={
1445+ "group_id": mock_group.id,
1446+ "name": f"test transfer sql {query}",
1447+ "source_connection_id": first_conn.id,
1448+ "target_connection_id": second_conn.id,
1449+ "source_params": {"type": "postgres", "table_name": "schema.source_table"},
1450+ "target_params": {"type": "postgres", "table_name": "schema.target_table"},
1451+ "transformations": [
1452+ {
1453+ "type": "sql",
1454+ "query": query,
1455+ "dialect": "spark",
1456+ },
1457+ ],
1458+ "queue_id": group_queue.id,
1459+ },
1460+ )
1461+ assert response.status_code == 200, response.text
0 commit comments