diff --git a/.gitignore b/.gitignore
index 828e9003..c5b85cb9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,5 +47,4 @@ coverage.xml
 .cci
 .sfdx
 /src.orig
-/src
 myvenv
\ No newline at end of file
diff --git a/snowfakery/api.py b/snowfakery/api.py
index 7b76ac82..a2946f9d 100644
--- a/snowfakery/api.py
+++ b/snowfakery/api.py
@@ -32,6 +32,8 @@
     "jpg": "snowfakery.output_streams.ImageOutputStream",
     "ps": "snowfakery.output_streams.ImageOutputStream",
     "dot": "snowfakery.output_streams.GraphvizOutputStream",
+    "datapack": "snowfakery.experimental.DataPack",
+    "apex": "snowfakery.experimental.DataPack.ApexDataPack",
     "json": "snowfakery.output_streams.JSONOutputStream",
     "txt": "snowfakery.output_streams.DebugOutputStream",
     "csv": "snowfakery.output_streams.CSVOutputStream",
@@ -251,6 +253,10 @@ def _get_output_streams(dburls, output_files, output_format, output_folder):
     if output_stream_cls.uses_folder:
         output_streams.append(output_stream_cls(output_folder))
+    elif output_folder and str(output_folder) != "." and not output_files:
+        raise exc.DataGenError(
+            "--output-folder can only be used with --output-file= or --output-format=csv"
+        )

     if output_files:
         for f in output_files:
diff --git a/snowfakery/cli.py b/snowfakery/cli.py
index 75b1e67c..7a8326bd 100755
--- a/snowfakery/cli.py
+++ b/snowfakery/cli.py
@@ -276,14 +276,6 @@ def validate_options(
             "Sorry, you need to pick --dburl or --output-file "
             "because they are mutually exclusive."
         )
-    if (
-        output_folder
-        and str(output_folder) != "."
-        and not (output_files or output_format == "csv")
-    ):
-        raise click.ClickException(
-            "--output-folder can only be used with --output-file= or --output-format=csv"
-        )

     if target_number and reps:
         raise click.ClickException(
diff --git a/snowfakery/data_generator_runtime.py b/snowfakery/data_generator_runtime.py
index cad83c88..ed988aa2 100644
--- a/snowfakery/data_generator_runtime.py
+++ b/snowfakery/data_generator_runtime.py
@@ -421,6 +421,8 @@ def loop_over_templates_until_finished(self, continuing):
             self.iteration_count += 1
             continuing = True
             self.globals.reset_slots()
+            # let the output stream know that the recipe was finished
+            self.output_stream.complete_recipe()
             self.row_history.reset_locals()

     def loop_over_templates_once(self, statement_list, continuing: bool):
diff --git a/snowfakery/experimental/DataPack.py b/snowfakery/experimental/DataPack.py
new file mode 100644
index 00000000..8bd9f4c6
--- /dev/null
+++ b/snowfakery/experimental/DataPack.py
@@ -0,0 +1,210 @@
+# Use this experimental OutputStream like this:
+#
+# snowfakery --output-format snowfakery.experimental.DataPack recipe.yml > composite.json
+#
+# Once you have the file you can make it accessible to Salesforce by uploading it
+# to some form of server, e.g. a GitHub gist, Heroku, etc.
+#
+# Then you can use Anon Apex like the example in `LoadCompositeAPIData.apex` to load it
+# into any org, e.g.:
+#
+# sfdx force:apex:execute -f ./examples/salesforce/LoadCompositeAPIData.apex -u Snowfakery__qa
+# or
+# cci task run execute_anon --path examples/salesforce/LoadCompositeAPIData.apex --org qa
+#
+# Note that Salesforce will complain if the dataset has more than 500 rows.
+
+# TODO: Add tests
+
+import json
+from logging import warning
+from io import StringIO
+import typing as T
+import datetime
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+from snowfakery.output_streams import FileOutputStream, OutputStream
+
+MAX_BATCH_SIZE = 500  # https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/resources_composite_graph_limits.htm
+
+
+class SalesforceCompositeAPIOutput(FileOutputStream):
+    """Output stream that generates records for Salesforce's Composite API"""
+
+    encoders: T.Mapping[type, T.Callable] = {
+        **FileOutputStream.encoders,
+        datetime.date: str,
+        datetime.datetime: str,
+        bool: bool,
+    }
+    is_text = True
+
+    def __init__(self, file, **kwargs):
+        assert file
+        super().__init__(file, **kwargs)
+        self.rows = []
+
+    def write_single_row(self, tablename: str, row: T.Dict) -> None:
+        row_without_id = row.copy()
+        del row_without_id["id"]
+        _sf_update_key = row_without_id.pop("_sf_update_key", None)
+        if _sf_update_key:
+            method = "PATCH"
+            url = f"/services/data/v50.0/sobjects/{tablename}/{_sf_update_key}/{row_without_id[_sf_update_key]}"
+        else:
+            method = "POST"
+            url = f"/services/data/v50.0/sobjects/{tablename}/"
+
+        values = {
+            "method": method,
+            "referenceId": f"{tablename}_{row['id']}",
+            "url": url,
+            "body": row_without_id,
+        }
+        self.rows.append(values)
+
+    def flatten(
+        self,
+        sourcetable: str,
+        fieldname: str,
+        source_row_dict,
+        target_object_row,
+    ) -> T.Union[str, int]:
+        target_reference = f"{target_object_row._tablename}_{target_object_row.id}"
+        return "@{%s.id}" % target_reference
+
+    def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
+        # NOTE: Could improve loading performance by breaking graphs up
+        # to allow server-side parallelization, but I'd risk locking issues
+        assert self.rows
+        data = {"graphs": [{"graphId": "graph", "compositeRequest": self.rows}]}
+        self.write(json.dumps(data, indent=2))
+        return super().close()
+
+
+class Folder(OutputStream):
+    uses_folder = True
+
+    def __init__(self, output_folder, **kwargs):
+        super().__init__(None, **kwargs)
+        self.target_path = Path(output_folder)
+        if not Path.exists(self.target_path):
+            Path.mkdir(self.target_path, exist_ok=True)  # pragma: no cover
+        self.recipe_sets = [[]]
+        self.current_batch = []
+        self.filenum = 0
+        self.filenames = []
+
+    def write_row(
+        self, tablename: str, row_with_references: T.Dict, *args, **kwargs
+    ) -> None:
+        self.recipe_sets[-1].append((tablename, row_with_references))
+
+    def write_single_row(self, tablename: str, row: T.Dict, *args, **kwargs) -> None:
+        raise NotImplementedError(
+            "Shouldn't be called. write_row should be called instead"
+        )
+
+    def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
+        self.flush_sets()
+        self.flush_batch()
+        table_metadata = [{"url": str(filename)} for filename in self.filenames]
+        metadata = {
+            "@context": "http://www.w3.org/ns/csvw",
+            "tables": table_metadata,
+        }
+        metadata_filename = self.target_path / "csvw_metadata.json"
+        with open(metadata_filename, "w") as f:
+            json.dump(metadata, f, indent=2)
+        return [f"Created {self.target_path}"]
+
+    def complete_recipe(self, *args):
+        self.flush_sets()
+        self.recipe_sets.append([])
+
+    def flush_sets(self):
+        while self.recipe_sets:
+            next_set = self.recipe_sets.pop(0)
+            assert len(next_set) <= MAX_BATCH_SIZE
+            if len(self.current_batch) + len(next_set) > MAX_BATCH_SIZE:
+                self.flush_batch()
+            self.current_batch.extend(next_set)
+
+    def flush_batch(self):
+        self.filenum += 1
+        filename = Path(self.target_path) / f"{self.filenum}.composite.json"
+
+        with open(filename, "w") as open_file, SalesforceCompositeAPIOutput(
+            open_file
+        ) as out:
+            self.filenames.append(filename)
+            assert self.current_batch
+            for tablename, row in self.current_batch:
+                out.write_row(tablename, row)
+
+        self.current_batch = []
+
+
+class DataPack(FileOutputStream):
+    def __init__(self, file, **kwargs):
+        super().__init__(file, **kwargs)
+        warning("DataPack is an experimental data format")
+        self.tempdir = TemporaryDirectory()
+        self.folder_os = Folder(self.tempdir.name)
+
+    def write_row(
+        self, tablename: str, row_with_references: T.Dict, *args, **kwargs
+    ) -> None:
+        self.folder_os.write_row(tablename, row_with_references)
+
+    def write_single_row(self, tablename: str, row: T.Dict, *args, **kwargs) -> None:
+        raise NotImplementedError(
+            "Shouldn't be called. write_row should be called instead"
+        )
+
+    def complete_recipe(self, *args):
+        self.folder_os.complete_recipe()
+
+    def close(self):
+        self.folder_os.close()
+        data = self.organize_bundle()
+        self.write(json.dumps(data, indent=2))
+        self.tempdir.cleanup()
+        return super().close()
+
+    def organize_bundle(self):
+        files = Path(self.tempdir.name).glob("*.composite.json")
+        data = [file.read_text() for file in files]
+        assert data
+        return {"datapack_format": 1, "data": data}
+
+
+class ApexDataPack(FileOutputStream):
+    """Wrap in Anon Apex but note that the amount of data you can load
+    this way is very limited due to limitations of the REST API (used by CCI)
+    and SOAP API (used by sfdx)"""
+
+    def __init__(self, file, **kwargs):
+        super().__init__(file, **kwargs)
+        self.data = StringIO()
+        self.datapack = DataPack(self.data)
+
+    def write_row(
+        self, tablename: str, row_with_references: T.Dict, *args, **kwargs
+    ) -> None:
+        self.datapack.write_row(tablename, row_with_references)
+
+    def write_single_row(self, tablename: str, row: T.Dict, *args, **kwargs) -> None:
+        raise NotImplementedError(
+            "Shouldn't be called. write_row should be called instead"
+        )
+
+    def complete_recipe(self, *args):
+        self.datapack.complete_recipe()
+
+    def close(self):
+        self.datapack.close()
+        quoted_data = repr(self.data.getvalue())
+        self.write(f"String json_data = {quoted_data};\n")
+        self.write("LoadCompositeAPIData.loadBundledJsonSet(json_data);\n")
diff --git a/snowfakery/output_streams.py b/snowfakery/output_streams.py
index 5a719c70..1097a1ee 100644
--- a/snowfakery/output_streams.py
+++ b/snowfakery/output_streams.py
@@ -136,6 +136,11 @@ def __enter__(self, *args):
     def __exit__(self, *args):
         self.close()

+    def complete_recipe(self, *args):
+        """Let the output stream know that a complete recipe
+        set was generated."""
+        pass
+

 class SmartStream:
     """Common code for managing stream/file opening/closing
diff --git a/tests/test_DataPack.py b/tests/test_DataPack.py
new file mode 100644
index 00000000..a306a720
--- /dev/null
+++ b/tests/test_DataPack.py
@@ -0,0 +1,69 @@
+from io import StringIO
+from unittest.mock import patch
+
+from snowfakery.data_generator import generate
+from snowfakery.data_generator_runtime import StoppingCriteria
+from snowfakery.experimental.DataPack import (
+    DataPack,
+    ApexDataPack,
+    SalesforceCompositeAPIOutput,
+)
+import json
+
+## Fill this out when it isn't experimental anymore
+
+
+class TestSalesforceCompositeAPIOutput:
+    @patch("snowfakery.experimental.DataPack.MAX_BATCH_SIZE", 10)
+    def test_composite(self):
+        out = StringIO()
+        output_stream = DataPack(out)
+        with open("examples/basic-salesforce.recipe.yml") as f:
+            generate(
+                f, {}, output_stream, stopping_criteria=StoppingCriteria("Account", 15)
+            )
+        output_stream.close()
+        data = json.loads(out.getvalue())
+        assert data["datapack_format"] == 1
+        assert len(data["data"]) == 8
+        single_payload = json.loads(data["data"][0])
+        print(single_payload)
+        assert single_payload["graphs"][0]["compositeRequest"][0]["method"] == "POST"
+
+    def test_reference(self):
+        out = StringIO()
+        output_stream = SalesforceCompositeAPIOutput(out)
+        with open("examples/basic-salesforce.recipe.yml") as f:
+            generate(f, {}, output_stream)
+        output_stream.close()
+        print(out.getvalue())
+        data = json.loads(out.getvalue())
+        assert (
+            data["graphs"][0]["compositeRequest"][-1]["body"]["AccountId"]
+            == "@{Account_2.id}"
+        )
+
+    @patch("snowfakery.experimental.DataPack.MAX_BATCH_SIZE", 50)
+    def test_composite_upsert(self):
+        out = StringIO()
+        output_stream = DataPack(out)
+        with open("tests/upsert-2.yml") as f:
+            generate(
+                f, {}, output_stream, stopping_criteria=StoppingCriteria("Account", 50)
+            )
+        output_stream.close()
+        data = json.loads(out.getvalue())
+        assert data["datapack_format"] == 1
+        single_payload = json.loads(data["data"][1])
+        assert single_payload["graphs"][0]["compositeRequest"][-1]["method"] == "PATCH"
+
+    def test_apex(self):
+        out = StringIO()
+        output_stream = ApexDataPack(out)
+        with open("examples/basic-salesforce.recipe.yml") as f:
+            generate(
+                f, {}, output_stream, stopping_criteria=StoppingCriteria("Account", 50)
+            )
+        output_stream.close()
+        out = out.getvalue()
+        assert out.startswith("String json_data")
diff --git a/tests/upsert-2.yml b/tests/upsert-2.yml
index bca7a6a8..c48d2adc 100644
--- a/tests/upsert-2.yml
+++ b/tests/upsert-2.yml
@@ -1,6 +1,145 @@
+# Accounts
+
+- object: Account
+  nickname: Bluth Company
+  fields:
+    Name: Bluth Company
+- object: Account
+  nickname: Michael B Company
+  fields:
+    Name: Michael B. Company
+- object: Account
+  nickname: Austero Bluth Company
+  fields:
+    Name: Austero Bluth Company
+- object: Account
+  nickname: Sitwell Enterprises
+  fields:
+    Name: Sitwell Enterprises
+
+# Contacts
+
+- object: Contact
+  update_key: email
+  nickname: GOB
+  fields:
+    firstname: GOB
+    lastname: Bluth
+    email: George.Oscar@bluth.com
+    AccountId:
+      reference: Bluth Company
 - object: Contact
   update_key: email
+  nickname: Michael
+  fields:
+    firstname: Michael
+    lastname: Bluth
+    email: michael@bluth.com
+    AccountId:
+      reference: Michael B Company
+- object: Contact
+  update_key: email
+  nickname: Lucille-2
+  fields:
+    firstname: Lucille
+    lastname: Austero
+    email: Lucille.Austero@bluth.com
+    AccountId:
+      reference: Austero Bluth Company
+- object: Contact
+  update_key: email
+  nickname: Lindsay
+  fields:
+    firstname: Lindsay
+    lastname: Bluth
+    email: lindsay@bluth.com
+    AccountId:
+      reference: Sitwell Enterprises
+- object: Contact
+  update_key: email
+  nickname: anyong
   fields:
     firstname: Hel-Loh
     lastname: Bluth
     email: anyong@bluth.com
+- object: Contact
+  update_key: email
+  nickname: george
+  fields:
+    firstname: George
+    lastname: Bluth
+    email: boss@bluth.com
+- object: Contact
+  update_key: email
+  nickname: buster
+  fields:
+    firstname: Buster
+    lastname: Bluth
+    email: buster@bluth.com
+
+# Opportunities
+
+- object: Opportunity
+  update_key: Name
+  fields:
+    Name: Bluth Company Opp
+    AccountId:
+      reference: Bluth Company
+    # ContactId:
+    #   reference: GOB
+    CloseDate:
+      date_between:
+        start_date: -1y
+        end_date: today
+    StageName:
+      random_choice:
+        - Prospecting
+        - Pledged
+- object: Opportunity
+  update_key: Name
+  fields:
+    Name: Michael B Company Opp
+    AccountId:
+      reference: Michael B Company
+    # ContactId:
+    #   reference: Michael
+    CloseDate:
+      date_between:
+        start_date: -1y
+        end_date: today
+    StageName:
+      random_choice:
+        - Prospecting
+        - Pledged
+- object: Opportunity
+  update_key: Name
+  fields:
+    Name: Austero Bluth Company Opp
+    AccountId:
+      reference: Austero Bluth Company
+    # ContactId:
+    #   reference: Lucille-2
+    CloseDate:
+      date_between:
+        start_date: -1y
+        end_date: today
+    StageName:
+      random_choice:
+        - Prospecting
+        - Pledged
+- object: Opportunity
+  update_key: Name
+  fields:
+    Name: Sitwell Enterprises Opp
+    AccountId:
+      reference: Sitwell Enterprises
+    # ContactId:
+    #   reference: Lindsay
+    CloseDate:
+      date_between:
+        start_date: -1y
+        end_date: today
+    StageName:
+      random_choice:
+        - Prospecting
+        - Pledged
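
For reviewers who want to sanity-check a bundle without an org handy, here is a minimal sketch (not part of the patch) that reads a file produced by the new `datapack` format and verifies the batch-size invariant that `Folder.flush_sets` enforces. The script name and argument handling are illustrative assumptions; the structure it parses (`datapack_format`, `data`, `graphs`, `compositeRequest`) comes from `DataPack.organize_bundle` and `SalesforceCompositeAPIOutput.close` above.

```python
# inspect_datapack.py -- hypothetical helper, not part of this PR
import json
import sys

MAX_BATCH_SIZE = 500  # mirrors snowfakery/experimental/DataPack.py


def inspect_bundle(path: str) -> None:
    with open(path) as f:
        bundle = json.load(f)
    assert bundle["datapack_format"] == 1
    for num, payload in enumerate(bundle["data"], start=1):
        # each element of "data" is itself a JSON string (see organize_bundle)
        graph = json.loads(payload)["graphs"][0]
        requests = graph["compositeRequest"]
        methods = sorted({req["method"] for req in requests})
        print(f"batch {num}: {len(requests)} requests, methods={methods}")
        assert len(requests) <= MAX_BATCH_SIZE, "batch exceeds composite graph limit"


if __name__ == "__main__":
    inspect_bundle(sys.argv[1] if len(sys.argv) > 1 else "composite.json")
```

A bundle to feed it can be generated with, e.g., `snowfakery --output-format datapack recipe.yml > composite.json`, using the `datapack` shortcut registered in `snowfakery/api.py` above.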