Skip to content
145 changes: 145 additions & 0 deletions flexmeasures/api/common/utils/api_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from copy import deepcopy
import json
from timely_beliefs.beliefs.classes import BeliefsDataFrame
from typing import Sequence
from datetime import timedelta
Expand All @@ -14,6 +16,8 @@

from flexmeasures.data import db
from flexmeasures.data.models.user import Account
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.utils import save_to_db
from flexmeasures.auth.policy import check_access
from flexmeasures.api.common.responses import (
Expand All @@ -22,6 +26,7 @@
request_processed,
already_received_and_successfully_processed,
)
from flexmeasures.data.schemas.generic_assets import GenericAssetSchema as AssetSchema
from flexmeasures.utils.error_utils import error_handling_router
from flexmeasures.utils.flexmeasures_inflection import capitalize

Expand Down Expand Up @@ -182,3 +187,143 @@ def get_accessible_accounts() -> list[Account]:
pass

return accounts


def convert_asset_json_fields(asset_kwargs):
"""
Convert string fields in asset_kwargs to JSON where needed.
"""
if "attributes" in asset_kwargs and isinstance(asset_kwargs["attributes"], str):
asset_kwargs["attributes"] = json.loads(asset_kwargs["attributes"])
if "sensors_to_show" in asset_kwargs and isinstance(
asset_kwargs["sensors_to_show"], str
):
asset_kwargs["sensors_to_show"] = json.loads(asset_kwargs["sensors_to_show"])
if "flex_context" in asset_kwargs and isinstance(asset_kwargs["flex_context"], str):
asset_kwargs["flex_context"] = json.loads(asset_kwargs["flex_context"])
if "flex_model" in asset_kwargs and isinstance(asset_kwargs["flex_model"], str):
asset_kwargs["flex_model"] = json.loads(asset_kwargs["flex_model"])
if "sensors_to_show_as_kpis" in asset_kwargs and isinstance(
asset_kwargs["sensors_to_show_as_kpis"], str
):
asset_kwargs["sensors_to_show_as_kpis"] = json.loads(
asset_kwargs["sensors_to_show_as_kpis"]
)
return asset_kwargs


def _copy_direct_sensors(
source_asset: GenericAsset, copied_asset: GenericAsset
) -> None:
"""Copy sensors directly attached to one asset."""
source_sensors = db.session.scalars(
select(Sensor).filter(Sensor.generic_asset_id == source_asset.id)
).all()
for source_sensor in source_sensors:
sensor_kwargs = {}
for column in source_sensor.__table__.columns:
if column.name in [
"id",
"generic_asset_id",
"knowledge_horizon_fnc",
"knowledge_horizon_par",
]:
continue
sensor_kwargs[column.name] = deepcopy(getattr(source_sensor, column.name))

sensor_kwargs["generic_asset_id"] = copied_asset.id

db.session.add(Sensor(**sensor_kwargs))


def _copy_asset_subtree(
source_asset: GenericAsset,
destination_account_id: int,
destination_parent_asset_id: int | None,
asset_schema: AssetSchema,
) -> GenericAsset:
"""Recursively copy one asset and all descendants."""
asset_kwargs = asset_schema.dump(source_asset)

for key in ["id", "owner", "generic_asset_type", "child_assets", "sensors"]:
asset_kwargs.pop(key, None)

asset_kwargs["name"] = f"{asset_kwargs['name']} (Copy)"
asset_kwargs["account_id"] = destination_account_id
asset_kwargs["parent_asset_id"] = destination_parent_asset_id
asset_kwargs = convert_asset_json_fields(asset_kwargs)

copied_asset = GenericAsset(**asset_kwargs)
db.session.add(copied_asset)
db.session.flush()

_copy_direct_sensors(source_asset, copied_asset)

source_children = db.session.scalars(
select(GenericAsset)
.filter(GenericAsset.parent_asset_id == source_asset.id)
.order_by(GenericAsset.id)
).all()
for source_child in source_children:
_copy_asset_subtree(
source_asset=source_child,
destination_account_id=destination_account_id,
destination_parent_asset_id=copied_asset.id,
asset_schema=asset_schema,
)

return copied_asset


def copy_asset(
asset: GenericAsset,
account=None,
parent_asset=None,
) -> GenericAsset:
"""
Copy an asset subtree to a target account and/or under a target parent asset.

The copied subtree includes:
- the selected asset
- all descendant child assets (recursively)
- all sensors directly attached to each copied asset

Resolution rules:

- If neither ``account`` nor ``parent_asset`` is given, the copy is placed in
the same account and under the same parent as the original (i.e. a sibling).
- If ``account`` is given but ``parent_asset`` is not, the copy becomes a
top-level asset (no parent) in the given account.
- If ``parent_asset`` is given but ``account`` is not, the copy is placed under
the given parent and inherits that parent's account.
- If both are given, the copy belongs to the given account and is placed under
the given parent. This allows creating a copy that belongs to a different
account than its parent.
"""
try:
asset_schema = AssetSchema()

if account is None and parent_asset is None:
target_account_id = int(asset.account_id)
target_parent_asset_id = asset.parent_asset_id
elif account is not None and parent_asset is None:
target_account_id = int(account.id)
target_parent_asset_id = None
elif account is None and parent_asset is not None:
target_account_id = int(parent_asset.account_id)
target_parent_asset_id = int(parent_asset.id)
else:
target_account_id = int(account.id)
target_parent_asset_id = int(parent_asset.id)

copied_root = _copy_asset_subtree(
source_asset=asset,
destination_account_id=target_account_id,
destination_parent_asset_id=target_parent_asset_id,
asset_schema=asset_schema,
)
db.session.commit()
return copied_root
except Exception as e:
db.session.rollback()
raise e
119 changes: 111 additions & 8 deletions flexmeasures/api/v3_0/assets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import annotations

from typing import Any
import json
from datetime import datetime, timedelta
from http import HTTPStatus
Expand All @@ -11,7 +13,7 @@
from flask_json import as_json
from flask_sqlalchemy.pagination import SelectPagination

from marshmallow import fields, ValidationError, Schema, validate
from marshmallow import fields, post_load, ValidationError, Schema, validate

from webargs.flaskparser import use_kwargs, use_args
from sqlalchemy import select, func, or_
Expand Down Expand Up @@ -56,7 +58,10 @@
create_sequential_scheduling_job,
create_simultaneous_scheduling_job,
)
from flexmeasures.api.common.utils.api_utils import get_accessible_accounts
from flexmeasures.api.common.utils.api_utils import (
get_accessible_accounts,
copy_asset,
)
from flexmeasures.api.common.responses import (
unprocessable_entity,
request_processed,
Expand Down Expand Up @@ -159,6 +164,59 @@ class KPIKwargsSchema(Schema):
event_ends_before = AwareDateTimeField(format="iso", required=False)


class CopyAssetSchema(Schema):
account = AccountIdField(
data_key="account_id",
required=False,
metadata=dict(
description="Target account to copy the asset to.",
example=67,
),
)
parent_asset = AssetIdField(
data_key="parent_id",
required=False,
metadata=dict(
description="Target parent asset to copy the asset under.",
example=482,
),
)

@post_load
def resolve_account_and_parent(self, data, **kwargs):
"""
Resolve the account/parent relationship after loading:

- If ``account`` is explicitly given but ``parent_asset`` is not, the copy
becomes a top-level asset (no parent) in the given account.
- If ``parent_asset`` is explicitly given but ``account`` is not, the copy
inherits the account of the parent asset.
- If both are explicitly given, the copy can belong to a different account
than its parent, which is a valid cross-account parent relationship.
- If neither is given, the original asset's account and parent are preserved.
"""
account_given = "account" in data
parent_given = "parent_asset" in data

if account_given and not parent_given:
data["parent_asset"] = None
elif parent_given and not account_given:
data["account"] = data["parent_asset"].owner

# Resolve effective targets for permission checks and fallback behavior.
# If neither target is given, use the source asset's account/parent.
source_asset: GenericAsset | None = self.context.get("asset")
if source_asset is not None:
if data.get("account") is None:
data["resolved_account"] = source_asset.owner
data["resolved_parent"] = source_asset.parent_asset
else:
data["resolved_account"] = data["account"]
data["resolved_parent"] = data.get("parent_asset")

return data


class AssetTypesAPI(FlaskView):
"""
This API view exposes generic asset types.
Expand Down Expand Up @@ -301,13 +359,14 @@ def index(
check_access(account, "read")
account_ids = [account.id]
else:
use_all_accounts = all_accessible or root_asset
include_public = all_accessible or include_public or root_asset
account_ids = (
[a.id for a in get_accessible_accounts()]
if use_all_accounts
else [current_user.account.id]
use_all_accounts = all_accessible or (root_asset is not None)
include_public = (
all_accessible or include_public or (root_asset is not None)
)
if use_all_accounts:
account_ids = [a.id for a in get_accessible_accounts()]
else:
account_ids = [current_user.account.id]
filter_statement = GenericAsset.account_id.in_(account_ids)
if include_public:
filter_statement = filter_statement | GenericAsset.account_id.is_(None)
Expand Down Expand Up @@ -441,6 +500,9 @@ def asset_sensors(
tags:
- Assets
"""
if asset is None:
return unprocessable_entity("No asset found for the given id.")

query_statement = Sensor.generic_asset_id == asset.id

query = select(Sensor).filter(query_statement)
Expand Down Expand Up @@ -1542,3 +1604,44 @@ def get_kpis(self, id: int, asset: GenericAsset, start, end):
}
kpis.append(kpi_dict)
return dict(data=kpis), 200

@route("/<id>/copy", methods=["POST"])
@use_kwargs(
{
"asset": AssetIdField(
data_key="id", status_if_not_found=HTTPStatus.NOT_FOUND
)
},
location="path",
)
@as_json
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a decorator here to check for read access (or whatever we named that) on the asset itself.

def copy_assets(self, id, asset: GenericAsset):
"""
.. :quickref: Assets; Copy an asset to a target account and/or parent.
"""
copy_asset_schema: Any = CopyAssetSchema()
copy_asset_schema.context["asset"] = asset

try:
copy_data = copy_asset_schema.load(request.args)
except ValidationError as e:
return unprocessable_entity(str(e.messages))

account = copy_data.get("account")
parent_asset = copy_data.get("parent_asset")
resolved_account = copy_data["resolved_account"]
resolved_parent = copy_data["resolved_parent"]

# Check create-children permission on the target account.
check_access(resolved_account, "create-children")

# Also check create-children permission on the target parent (if any).
if resolved_parent is not None:
check_access(resolved_parent, "create-children")

new_asset = copy_asset(asset, account=account, parent_asset=parent_asset)

return {
"message": f"Successfully copied asset {asset.id} to account {new_asset.account_id}.",
"asset": new_asset.id,
}, 201
Loading
Loading