diff --git a/sapientml_webapp/entrypoint.py b/sapientml_webapp/entrypoint.py
index 4a1af8b..38aeb6f 100644
--- a/sapientml_webapp/entrypoint.py
+++ b/sapientml_webapp/entrypoint.py
@@ -16,6 +16,17 @@ def main() -> None:
+
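+    # Start execute.py as a long-running background process; it watches
+    # ./outputs and executes generated notebooks as they appear.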
+    cmd1 = [sys.executable, "/app/execute.py"]
+    subprocess.Popen(
+        cmd1,
+        shell=False,
+        executable=None,
+        cwd=None,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+    )
+
     cmd2 = [sys.executable, "/app/delete.py"]
     subprocess.Popen(
         cmd2,
diff --git a/sapientml_webapp/execute.py b/sapientml_webapp/execute.py
new file mode 100644
index 0000000..9f7873f
--- /dev/null
+++ b/sapientml_webapp/execute.py
@@ -0,0 +1,52 @@
+import os
+import subprocess
+import sys
+import time
+from concurrent.futures import ThreadPoolExecutor
+
+dirpath = "./outputs"
+
+if not os.path.isdir(dirpath):
+    os.mkdir(dirpath)
+
+
+def run_nbconvert(dirname):
+    if os.path.isdir(f"{dirpath}/{dirname}"):
+        final_script_path = f"{dirpath}/{dirname}/final_script.ipynb"
+        script_path = f"{dirpath}/{dirname}/final_script.ipynb.out.ipynb"
+        state_path = f"{dirpath}/{dirname}/state"
+
+        # Run the steps below only if final_script.ipynb exists in dirname and
+        # neither the executed notebook nor the state file has been created yet.
+        if os.path.isfile(final_script_path) and not os.path.isfile(script_path) and not os.path.isfile(state_path):
+
+            with open(state_path, mode="w") as f:
+                f.write("doing")
+
+            env = os.environ.copy()
+
+            result = None
+            try:
+                result = subprocess.run(
+                    ["python", "execute_notebook.py", "--workdir", f"{dirpath}/{dirname}"],
+                    env=env,
+                    timeout=120,
+                    stderr=subprocess.PIPE,
+                )
+            except Exception as e:
+                print(f"Error: {e}", file=sys.stderr)
+
+            finally:
+                if result is not None:
+                    if result.returncode == 0:
+                        msg = "done"
+                    else:
+                        msg = f"error: {result.stderr.decode()}"
+                    with open(state_path, mode="w") as f:
+                        f.write(msg)
+
+
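+# Poll the outputs directory once per second and execute any pending notebook
+# in each session directory on a worker thread.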
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.training_data")) st.sidebar.markdown(f"[{t('header.training_data')}](#train_data)") @@ -306,87 +302,60 @@ def section_configure_params(): def section_generate_code(self, config: dict, estimator_name: str): (lang, _) = self.get_query_params() t = use_translation(lang, self.I18N_DICT) - uuid = st.session_state.uuid + output_dir = config["output_dir"] + config["output_dir"] = str(output_dir) + + train_data = config["training_dataframe"] + + uuid = st.session_state.uuid if "code_generation" not in st.session_state: st.session_state.code_generation = CodeGenerationState() state = st.session_state.code_generation - thread_generatecode = state.thread_generatecode - log_stream = state.log_stream - sml = state.sml + subproc_executor = state.subproc_executor if state.sml is None: - config.update({"debug": True}) - sml = state.sml = SapientML( - config["target_columns"], - model_type=estimator_name, - **({k: v for k, v in config.items() if k != "target_columns"}), - ) - - if (state.log_stream is not None) and isinstance(state.log_stream, io.StringIO): - state.log_stream.close() - log_stream = state.log_stream = io.StringIO() - log_handler = logging.StreamHandler(log_stream) - # ContextVars - ctx_uuid: ContextVar[UUID] = ContextVar("streamlit_uuid", default=None) - log_handler.addFilter(UUIDContextFilter(uuid, ctx_uuid)) - - log_handler.addFilter(ModifyLogMessageFilter(str(output_dir.resolve()))) - - logger = setup_logger() - logger.addHandler(log_handler) + os.makedirs(output_dir, exist_ok=True) + train_data_path = Path(output_dir / f"{uuid}_training.csv") + train_data.to_csv(train_data_path, index=False) - thread_generatecode = state.thread_generatecode = GenerateCodeThread( - sml, config, log_handler, ctx_uuid, uuid - ) - thread_generatecode.start() - - if state.log_stream is None: - st.stop() + if subproc_executor is None: + subproc_executor = state.subproc_executor = SubprocessExecutor(self.debug) + subproc_executor.start_automl(train_data_path, estimator_name, config) - # Execution - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.execution")) st.sidebar.markdown(f"[{t('header.execution')}](#execution)") + log_area = st.text("") - if thread_generatecode is not None: - log_area = st.text("") - with st.spinner(t("codegen.generating_code")): - while thread_generatecode.is_alive(): - if log_stream is not None: - log_area.text(re.sub(r"/tmp/.+/", "", log_stream.getvalue())) - time.sleep(1) - if log_stream is not None: - log = re.sub(r"/tmp/.+/", "", log_stream.getvalue()) - state.log_message = log - log_area.text(log) + with st.spinner(t("codegen.generating_code")): + while subproc_executor.thread.is_alive(): + if subproc_executor.log_text: + log_area.text(re.sub(r"/tmp/.+/", "", subproc_executor.log_text)) + time.sleep(1) - state.ex = thread_generatecode.get_exception() - state.sml = thread_generatecode.get_sml() + if not subproc_executor.thread.is_alive(): + log = re.sub(r"/tmp/.+/", "", subproc_executor.log_text) + # state.log_message = log + log_area.text(log) - if state.ex is not None: + if subproc_executor.proc.poll() is not None: + if subproc_executor.proc.returncode != 0: st.error(t("codegen.failed")) - import traceback - tb_str = traceback.format_exception(type(state.ex), state.ex, state.ex.__traceback__) - tb_str = "".join(tb_str) - st.error(tb_str) - st.stop() + if Path(output_dir / "sml.pkl").exists(): + with open(Path(output_dir / "sml.pkl"), "rb") as f: + state.sml = pickle.load(f) + if not self.debug: + Path(output_dir / "sml.pkl").unlink(missing_ok=True) + Path(train_data_path).unlink(missing_ok=True) - thread_generatecode = state.thread_generatecode = None + return state.sml - return state.sml - - elif state.sml is not None: - st.text(state.log_message) - return state.sml - - else: - st.stop() def section_make_result( self, @@ -437,7 +406,7 @@ def section_show_generated_code_result(self, files: dict, config: dict, tutorial t = use_translation(lang, self.I18N_DICT) # Result - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.result")) st.sidebar.markdown(f"[{t('header.result')}](#automl_result)") @@ -525,7 +494,7 @@ def section_show_model_detail(self, files: dict): ) # Model Detail - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.model_details")) st.sidebar.markdown(f"[{t('header.model_details')}](#model_detail)") @@ -568,7 +537,7 @@ def section_show_correlation(self, files: dict, config: dict): calculate_PI = True if "permutation_importance.csv" in files.keys() else False # Correlation between feature and target column - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.correlation_feature_and_target")) st.sidebar.markdown(f"[{t('header.correlation_feature_and_target')}](#target_features)") @@ -659,7 +628,7 @@ def section_upload_prediction_data( # Prediction - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.prediction")) st.sidebar.markdown(f"[{t('header.prediction')}](#prediction)") @@ -766,7 +735,7 @@ def part_show_metrics( adaptation_metric = config["adaptation_metric"] # Displays metrics if the predict data contains correct answers. - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.metrics")) st.sidebar.markdown(f"[{t('header.metrics')}](#metrics)") diff --git a/sapientml_webapp/lib/app_tabular.py b/sapientml_webapp/lib/app_tabular.py index 0eaf68f..9432867 100644 --- a/sapientml_webapp/lib/app_tabular.py +++ b/sapientml_webapp/lib/app_tabular.py @@ -367,7 +367,7 @@ def section_configure_params( state = st.session_state.conf previous_state = st.session_state.previous_conf # Configuration - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.configuration")) st.sidebar.markdown(f"[{t('header.configuration')}](#set_param)") @@ -461,7 +461,7 @@ def part_view_predicted( (lang, is_tutorial) = self.get_query_params() t = use_translation(lang, self.I18N_DICT) - st.markdown("", unsafe_allow_html=True) + st.markdown("
", unsafe_allow_html=True) st.write("") style.custom_h4(t("header.prediction_result")) st.sidebar.markdown(f"[{t('header.prediction_result')}](#prediction_result)") diff --git a/sapientml_webapp/lib/call_automl.py b/sapientml_webapp/lib/call_automl.py new file mode 100644 index 0000000..95a56a9 --- /dev/null +++ b/sapientml_webapp/lib/call_automl.py @@ -0,0 +1,117 @@ +import argparse +import json +import os +import pickle +import time +from pathlib import Path + +import numpy as np +import pandas as pd +from sapientml import SapientML +from sapientml.util.json_util import JSONEncoder +from sapientml.util.logging import setup_logger +from sapientml_core.explain.main import process as explain +from sapientml_core.generator import add_prefix + +logger = setup_logger() + + +def convert_int64(obj): + if isinstance(obj, dict): + return {key: convert_int64(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [convert_int64(item) for item in obj] + elif isinstance(obj, np.int64): + return int(obj) + else: + return obj + + +def main(args): + config = json.loads(args.config) + + train_data_path = Path(args.train_data_path) + train_data = pd.read_csv(train_data_path) + + target_columns = config["target_columns"] + n_models = config.get("n_models", 3) + + output_dir = Path(config["output_dir"]) + + # Streamlit 1.33とnbconvertの組み合わせはグラフ出力がなされないため、 + # EDA用のコードは、fit()では生成せず、別途生成し、実行は別プロセスで行う + + config["add_explanation"] = False + sml = SapientML( + target_columns, model_type=args.estimator_name, **({k: v for k, v in config.items() if k != "target_columns"}) + ) + + fit_args = ["save_datasets_format", "csv_encoding", "ignore_columns", "output_dir", "test_data"] + sml.fit(train_data, **({k: v for k, v in config.items() if k in fit_args})) + + explain( + visualization=True, + eda=True, + dataframe=sml.generator.dataset.training_dataframe, + script_path=(sml.generator.output_dir / add_prefix("final_script.py", sml.generator.config.project_name)) + .absolute() + .as_posix(), + target_columns=sml.generator.task.target_columns, + problem_type=sml.generator.task.task_type, + ignore_columns=sml.generator.dataset.ignore_columns, + skeleton=sml.generator._best_pipeline.labels, + explanation=sml.generator._best_pipeline.pipeline_json, + run_info=sml.generator.debug_info, + internal_execution=False, + timeout=sml.generator.config.timeout_for_test, + cancel=sml.generator.config.cancel, + ) + + # 別プロセスによるipynbの実行を待つ + # APIアクセストークンが必要なためフィアルに書き出す + logger.info("Running the explained notebook...") + start_time = time.time() + + while True: + if (output_dir / "final_script.ipynb.out.ipynb").exists(): + logger.info("Saved explained notebook") + break + else: + time.sleep(1) + + if time.time() - start_time > 120: + logger.info("Failed to execute notebook") + break + + with open(output_dir / "sml.pkl", "wb") as f: + pickle.dump(sml, f) + + if not Path(output_dir / "final_script_code_explainability.json").exists(): + script_code_explainability = sml.generator._best_pipeline.pipeline_json + with open(output_dir / "script_code_explainability.json", "w") as f: + json.dump(script_code_explainability, f, ensure_ascii=False, indent=2) + + candidates = sml.generator._candidate_scripts + elements = [t[0] for t in candidates] + + for i in range(n_models): + with open(output_dir / f"{i+1}_script_code_explainability.json", "w") as f: + json.dump(elements[i].pipeline_json, f, ensure_ascii=False, indent=2) + + if not Path(output_dir / ".skeleton.json").exists(): + skeleton = sml.generator._best_pipeline.labels + 
+    def filter(self, record):
+
+        patterns = [
+            (r"saved:.+/final_script.ipynb", "Saved notebook."),
+            (r"Saved explained notebook in:.+/final_script.ipynb.out.ipynb", "Saved explained notebook."),
+            (re.escape(self.full_path), ""),
+            (r": request url:.*", ""),
+            (r": Traceback.*", ""),
+        ]
+
+        for pattern, replacement in patterns:
+            match = re.search(pattern, record)
+            if match:
+                record = record.replace(match.group(0), replacement)
+
+        ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
+        record = ansi_escape.sub("", record)
+
+        return record
diff --git a/sapientml_webapp/lib/generate_code_thread.py b/sapientml_webapp/lib/generate_code_thread.py
deleted file mode 100644
index eda016b..0000000
--- a/sapientml_webapp/lib/generate_code_thread.py
+++ /dev/null
@@ -1,86 +0,0 @@
-# Copyright 2023-2024 The SapientML Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import json
-import logging
-import threading
-from contextvars import ContextVar
-from pathlib import Path
-from uuid import UUID
-
-from sapientml import SapientML
-
-from .utils import convert_int64
-
-
-class GenerateCodeThread(threading.Thread):
-    def __init__(
-        self,
-        sml: SapientML,
-        config: dict,
-        log_handler: logging.Handler,
-        ctx_uuid: ContextVar[UUID],
-        uuid: UUID,
-    ):
-        self.sml = sml
-        self.config = config
-        self.result = None
-        self.exception = None
-        self.log_handler = log_handler
-        self.ctx_uuid = ctx_uuid
-        self.uuid = uuid
-        threading.Thread.__init__(self)
-
-    def run(self):
-        try:
-            self.ctx_uuid.set(self.uuid)
-
-            fit_args = ["save_datasets_format", "csv_encoding", "ignore_columns", "output_dir", "test_data"]
-            self.sml.fit(self.config["training_dataframe"], **({k: v for k, v in self.config.items() if k in fit_args}))
-
-            output_dir = self.config["output_dir"]
-            if not Path(output_dir / "final_script_code_explainability.json").exists():
-                script_code_explainability = self.sml.generator._best_pipeline.pipeline_json
-
-                with open(output_dir / "script_code_explainability.json", "w") as f:
-                    json.dump(script_code_explainability, f, ensure_ascii=False, indent=2)
-
-            candidates = self.sml.generator._candidate_scripts
-            elements = [t[0] for t in candidates]
-
-            for i in range(3):
-                # explainability =
-                with open(output_dir / f"{i+1}_script_code_explainability.json", "w") as f:
-                    json.dump(elements[i].pipeline_json, f, ensure_ascii=False, indent=2)
-
-            if not Path(output_dir / ".skeleton.json").exists():
-                skeleton = self.sml.generator._best_pipeline.labels
-                with open(output_dir / ".skeleton.json", "w") as f:
-                    json.dump(convert_int64(skeleton), f, ensure_ascii=False, indent=2)
-
-        except Exception as e:
-            self.exception = e
-        finally:
-            pass
-
-    def get_result(self):
-        return self.result
-
-    def get_exception(self):
-        return self.exception
-
-    def get_sml(self):
-        return self.sml
-
-    def trigger_cancel(self):
-        self.cancel_token.isTriggered = True
diff --git a/sapientml_webapp/lib/session_state.py b/sapientml_webapp/lib/session_state.py
index cc565aa..7a2a831 100644
--- a/sapientml_webapp/lib/session_state.py
+++ b/sapientml_webapp/lib/session_state.py
@@ -17,7 +17,7 @@
 import pandas as pd
 from sapientml import SapientML
 
-from .generate_code_thread import GenerateCodeThread
+from .generate_code_subprocess import SubprocessExecutor
 from .make_result_thread import MakeResultThread
 
 
@@ -44,7 +44,7 @@ class ConfigurationsState:
 
 @dataclass
 class CodeGenerationState:
-    thread_generatecode: GenerateCodeThread = None
+    subproc_executor: SubprocessExecutor = None
     result = None
     ex: Exception = None
     log_stream: io.StringIO = None