diff --git a/pyproject.toml b/pyproject.toml
index 26bbe10a..194647df 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "sampo"
-version = "0.1.2"
+version = "0.1.2.0.6"
 description = "Open-source framework for adaptive manufacturing processes scheduling"
 authors = ["iAirLab "]
 license = "BSD-3-Clause"
diff --git a/sampo/schemas/graph.py b/sampo/schemas/graph.py
index 771a474e..c0552d6f 100644
--- a/sampo/schemas/graph.py
+++ b/sampo/schemas/graph.py
@@ -362,9 +362,8 @@ def to_frame(self, save_req=False) -> pd.DataFrame:
         # Define the format of the output DataFrame
         graph_df_structure = {'activity_id': [],
                               'activity_name': [],
-                              'granular_name': [],
+                              'model_name': [],
                               'volume': [],
-                              'measurement': [],
                               'priority': [],
                               'predecessor_ids': [],
                               'connection_types': [],
@@ -385,9 +384,9 @@ def to_frame(self, save_req=False) -> pd.DataFrame:
             # Get information about tasks from graph nodes (work units)
             graph_df_structure['activity_id'].append(node_info_dict['work_unit']['id'])
             graph_df_structure['activity_name'].append(node_info_dict['work_unit']['display_name'])
-            graph_df_structure['granular_name'].append(node_info_dict['work_unit']['name'])
+            # model_name must be string!
+            graph_df_structure['model_name'].append(str(node_info_dict['work_unit']['model_name']))
             graph_df_structure['volume'].append(node_info_dict['work_unit']['volume'])
-            graph_df_structure['measurement'].append(node_info_dict['work_unit']['volume_type'])
             graph_df_structure['priority'].append(node_info_dict['work_unit']['priority'])
 
             if save_req:
diff --git a/sampo/schemas/schedule.py b/sampo/schemas/schedule.py
index de53fbfe..efddc984 100644
--- a/sampo/schemas/schedule.py
+++ b/sampo/schemas/schedule.py
@@ -29,7 +29,7 @@ class Schedule(JSONSerializable['Schedule']):
 
     _data_columns: list[str] = ['idx', 'task_id', 'task_name', 'contractor', 'cost', 'volume', 'start',
-                                'finish', 'duration', 'workers']
+                                'finish', 'duration', 'workers', 'model_name']
     _scheduled_work_column: str = 'scheduled_work_object'
 
     _columns: list[str] = _data_columns + [_scheduled_work_column]
@@ -53,7 +53,7 @@ def pure_schedule_df(self) -> DataFrame:
         return self._schedule[~self._schedule.apply(
             lambda row: row[self._scheduled_work_column].is_service_unit,
             axis=1
-        )][self._data_columns + _get_granular_name_columns(self._schedule[self._scheduled_work_column])]
+        )][self._data_columns]
 
     @property
     def works(self) -> Iterable[ScheduledWork]:
@@ -124,7 +124,7 @@ def unite_stages(self) -> 'Schedule':
         def f(row):
             swork: ScheduledWork = deepcopy(row[self._scheduled_work_column])
             row[self._scheduled_work_column] = swork
-            swork.model_name['granular_name'] = row['granular_name']
+            swork.model_name = row['model_name']
             swork.display_name = row['task_name']
             swork.volume = float(row['volume'])
             swork.start_end_time = Time(int(row['start'])), Time(int(row['finish']))
@@ -157,11 +157,6 @@ def sed(time1, time2) -> tuple:
             start, end = tuple(sorted((time1, time2)))
             return start, end, end - start
 
-        model_name_columns = _get_granular_name_columns(works)
-
-        def make_model_name_columns(swork: ScheduledWork) -> list[Any]:
-            return list(map(itemgetter(1), sorted(swork.model_name.items(), key=itemgetter(0))))
-
         data_frame = [(i,  # idx
                        w.id,  # task_id
                        w.display_name,  # task_name
@@ -170,11 +165,11 @@ def make_model_name_columns(swork: ScheduledWork) -> list[Any]:
                        w.volume,  # work volume
                        *sed(*(t.value for t in w.start_end_time)),  # start, end, duration
                        repr(dict((i.name, i.count) for i in w.workers)),  # workers
+                       w.model_name,  # model_name columns
                        w,  # full ScheduledWork info
-                       *make_model_name_columns(w),  # model_name columns
                        ) for i, w in enumerate(works)]
 
-        data_frame = DataFrame.from_records(data_frame, columns=Schedule._columns + model_name_columns)
+        data_frame = DataFrame.from_records(data_frame, columns=Schedule._columns)
         data_frame = data_frame.set_index('idx', drop=False)
@@ -184,7 +179,7 @@ def make_model_name_columns(swork: ScheduledWork) -> list[Any]:
         data_frame = data_frame.sort_values(['task_id'])
         data_frame.task_id = data_frame.task_id.astype(str)
 
-        data_frame = data_frame.reindex(columns=Schedule._columns + model_name_columns)
+        data_frame = data_frame.reindex(columns=Schedule._columns)
         data_frame = data_frame.reset_index(drop=True)
 
         return Schedule(data_frame)
diff --git a/sampo/userinput/parser/csv_parser.py b/sampo/userinput/parser/csv_parser.py
index aae71e49..ae79b92f 100644
--- a/sampo/userinput/parser/csv_parser.py
+++ b/sampo/userinput/parser/csv_parser.py
@@ -94,7 +94,6 @@ def read_graph_info(project_info: str | pd.DataFrame,
 
     @staticmethod
     def work_graph(works_info: pd.DataFrame,
-                   name_mapper: NameMapper | None = None,
                    work_resource_estimator: WorkTimeEstimator = DefaultWorkEstimator()) -> WorkGraph:
         """
         Gets a info about WorkGraph and Contractors from file .csv.
@@ -120,8 +119,6 @@ def work_graph(works_info: pd.DataFrame,
         """
         works_info['activity_name_original'] = works_info.activity_name
 
-        if name_mapper:
-            works_info.activity_name = works_info.activity_name.apply(lambda name: name_mapper[name])
 
         resources = [dict((worker_req.kind, int(worker_req.volume))
                           for worker_req in work_resource_estimator.find_work_resources(model_name=w[0],
@@ -191,7 +188,7 @@ def work_graph_and_contractors(works_info: pd.DataFrame,
                                            equipments=dict())
                 )
 
-        work_graph = CSVParser.work_graph(works_info, name_mapper, work_resource_estimator)
+        work_graph = CSVParser.work_graph(works_info, work_resource_estimator)
 
         # if we have no info about contractors or the user send an empty .csv file
        if len(contractors) == 0:
diff --git a/sampo/userinput/parser/general_build.py b/sampo/userinput/parser/general_build.py
index 23fb7bc1..2cf4f3cf 100644
--- a/sampo/userinput/parser/general_build.py
+++ b/sampo/userinput/parser/general_build.py
@@ -173,7 +173,9 @@ def normalize_if_number(s):
 
     def map_activity(row):
         model_name_dict = eval(row['model_name'])
         if 'granular_name' not in model_name_dict:
-            model_name_dict['granular_name'] = name_mapper[row['activity_name']]
+            model_name_dict['granular_name'] = row['activity_name']
+        if name_mapper:
+            model_name_dict = name_mapper[model_name_dict]
         return str(model_name_dict)
 
     frame['model_name'] = frame[['activity_name', 'model_name']].apply(map_activity, axis=1)
@@ -264,7 +266,7 @@ def build_work_graph(frame: pd.DataFrame, resource_names: list[str], work_estima
         group = row['group'] if 'group' in frame.columns else 'main project'
         priority = row['priority'] if 'priority' in frame.columns else 1
 
-        work_unit = WorkUnit(row['activity_id'], row['model_name'], reqs, group=group,
+        work_unit = WorkUnit(row['activity_id'], eval(row['model_name']), reqs, group=group,
                              description=description, volume=row['volume'],
                              is_service_unit=is_service_unit, display_name=row['activity_name_original'],
                              zone_reqs=zone_reqs, priority=priority)
diff --git a/sampo/userinput/parser/history.py b/sampo/userinput/parser/history.py
index c1c8cecd..7f08ae7f 100644
--- a/sampo/userinput/parser/history.py
+++ b/sampo/userinput/parser/history.py
@@ -14,7 +14,7 @@ def get_all_connections(graph_df: pd.DataFrame,
                         mapper: NameMapper | None = None) \
         -> Tuple[dict[str, list], dict[str, list]]:
-    task_name_column = 'granular_name'
+    task_name_column = 'model_name'
 
     num_tasks = len(graph_df)
     # Get the upper triangular indices to avoid duplicate pairs
@@ -103,18 +103,26 @@ def gather_links_types_statistics(s1: str, f1: str, s2: str, f2: str) \
         ffs21, ffs21_lags, ffs21_percent_lags
 
 
+def add_granular_name_if_absent(row) -> str:
+    model_name = row['model_name']
+    if 'granular_name' not in model_name:
+        model_name['granular_name'] = row['work_name']
+    return model_name
+
+
 def get_all_seq_statistic(history_data: pd.DataFrame, graph_df: pd.DataFrame, use_model_name: bool = False,
                           mapper: NameMapper | None = None):
-    df_grouped = history_data.copy()
+    if 'model_name' not in history_data.columns:
+        history_data['model_name'] = [{} for _ in range(len(history_data))]
 
-    if use_model_name:
-        column_name = 'model_name'
-    else:
-        if 'granular_name' not in history_data.columns:
-            history_data['granular_name'] = [activity_name for activity_name in history_data['work_name']]
-        column_name = 'granular_name'
+    if len(history_data) > 0:
+        history_data['model_name'] = history_data.apply(add_granular_name_if_absent, axis=1)
+        # [{'granular_name': activity_name} for activity_name in history_data['work_name']]
+    column_name = 'model_name'
+
+    df_grouped = history_data.copy()
 
     df_grouped = df_grouped.groupby('upper_works')[column_name].apply(list).reset_index(name="Works")
     works1, works2 = get_all_connections(graph_df, use_model_name, mapper)
diff --git a/sampo/utilities/schedule.py b/sampo/utilities/schedule.py
index 05322bec..5a15fce5 100644
--- a/sampo/utilities/schedule.py
+++ b/sampo/utilities/schedule.py
@@ -60,19 +60,20 @@ def merge_split_stages(task_df: pd.DataFrame) -> pd.Series:
     :return: pd.Series with the full information about the task
     """
 
-    def get_stage_num(name: str):
-        split_name = name.split(STAGE_SEP)
+    def get_stage_num(model_name: dict):
+        split_name = model_name['granular_name'].split(STAGE_SEP)
         return int(split_name[-1]) if len(split_name) > 1 else -1
 
     if len(task_df) > 1:
         df = task_df.copy()
-        df['stage_num'] = df['granular_name'].apply(get_stage_num)
+        df['stage_num'] = df['model_name'].apply(get_stage_num)
         df = df.sort_values(by='stage_num')
         df = df.reset_index(drop=True)
         df = df.iloc[-1:].reset_index(drop=True)
 
-        for column in ['task_id', 'task_name', 'granular_name']:
+        for column in ['task_id', 'task_name']:
             df.loc[0, column] = df.loc[0, column].split(STAGE_SEP)[0]  # fix task id and name
+        df.loc[0, 'model_name'] = df.loc[0, 'model_name']['granular_name'].split(STAGE_SEP)[0]  # fix task model name
 
         # sum up volumes through all stages
         df.loc[0, 'volume'] = sum(task_df.loc[:, 'volume'])
diff --git a/tests/parser/csv_parser_test.py b/tests/parser/csv_parser_test.py
index 3261b3b7..6705d693 100644
--- a/tests/parser/csv_parser_test.py
+++ b/tests/parser/csv_parser_test.py
@@ -1,24 +1,43 @@
 import os
 import sys
+from operator import attrgetter
 
 import pandas as pd
 
+from sampo.pipeline import SchedulingPipeline
+from sampo.scheduler import HEFTScheduler
 from sampo.userinput.parser.csv_parser import CSVParser
 from sampo.userinput.parser.exception import WorkGraphBuildingException
 
 
-def test_work_graph_csv_parser():
-    try:
-        history = pd.DataFrame(columns=['marker_for_glue', 'work_name', 'first_day', 'last_day',
-                                        'upper_works', 'work_name_clear_old', 'smr_name',
-                                        'work_name_clear', 'granular_smr_name'])
-        works_info = CSVParser.read_graph_info(project_info=os.path.join(sys.path[0], 'tests/parser/test_wg.csv'),
-                                               history_data=history,
-                                               all_connections=True)
-        works_info.to_csv(os.path.join(sys.path[0], 'tests/parser/repaired.csv'), sep=';')
-        wg, contractors = CSVParser.work_graph_and_contractors(works_info)
-        print(f'\n\nWork graph has works: {wg.nodes}, and the number of contractors is {len(contractors)}\n\n')
-    except Exception as e:
-        raise WorkGraphBuildingException(f'There is no way to build work graph, {e}')
-
-    os.remove(os.path.join(sys.path[0], 'tests/parser/repaired.csv'))
+def test_work_graph_csv_parser_without_history():
+    history = pd.DataFrame(columns=['marker_for_glue', 'model_name', 'first_day', 'last_day',
+                                    'upper_works', 'work_name_clear_old', 'smr_name',
+                                    'work_name_clear', 'granular_smr_name'])
+    works_info = CSVParser.read_graph_info(project_info=os.path.join(sys.path[0], 'tests/parser/test_wg.csv'),
+                                           history_data=history,
+                                           all_connections=True)
+    wg, contractors = CSVParser.work_graph_and_contractors(works_info)
+    print(f'\n\nWork graph has works: {wg.nodes}, and the number of contractors is {len(contractors)}\n\n')
+
+
+def test_work_graph_csv_parser_with_history():
+    works_info = CSVParser.read_graph_info(project_info=os.path.join(sys.path[0], 'tests/parser/test_wg.csv'),
+                                           history_data=os.path.join(sys.path[0], 'tests/parser/test_history_data.csv'),
+                                           all_connections=False,
+                                           change_connections_info=True)
+    wg, contractors = CSVParser.work_graph_and_contractors(works_info)
+    print(f'\n\nWork graph has works: {wg.nodes}, and the number of contractors is {len(contractors)}\n\n')
+
+
+def test_work_graph_frame_serialization(setup_wg):
+    frame = setup_wg.to_frame()
+
+    rebuilt_wg = CSVParser.work_graph(frame)
+
+    project = SchedulingPipeline.create().wg(rebuilt_wg).schedule(HEFTScheduler()).finish()[0]
+
+    origin_ids = set([node.id for node in setup_wg.nodes if not node.work_unit.is_service_unit])
+    rebuilt_ids = set([node.id for node in rebuilt_wg.nodes if not node.work_unit.is_service_unit])
+
+    assert origin_ids == rebuilt_ids
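
For reviewers: the new test_work_graph_frame_serialization above boils down to the round-trip sketched below. This is a minimal sketch, not part of the diff; it assumes any pre-built WorkGraph (the test obtains one from its setup_wg pytest fixture), and the helper name round_trip_work_graph is hypothetical. Everything it calls appears in the changed code or the new test.

from sampo.pipeline import SchedulingPipeline
from sampo.scheduler import HEFTScheduler
from sampo.schemas.graph import WorkGraph
from sampo.userinput.parser.csv_parser import CSVParser


def round_trip_work_graph(wg: WorkGraph) -> WorkGraph:
    # WorkGraph -> DataFrame: after this PR the frame carries a single
    # 'model_name' column (a stringified dict) instead of the old
    # 'granular_name' and 'measurement' columns.
    frame = wg.to_frame()

    # DataFrame -> WorkGraph: note that the name_mapper parameter was
    # removed from CSVParser.work_graph in this PR.
    rebuilt_wg = CSVParser.work_graph(frame)

    # The rebuilt graph should still be schedulable end to end.
    SchedulingPipeline.create().wg(rebuilt_wg).schedule(HEFTScheduler()).finish()

    # Non-service nodes must survive the round trip with their ids intact.
    origin_ids = {node.id for node in wg.nodes if not node.work_unit.is_service_unit}
    rebuilt_ids = {node.id for node in rebuilt_wg.nodes if not node.work_unit.is_service_unit}
    assert origin_ids == rebuilt_ids

    return rebuilt_wg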