-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdynamic_entities_objects.py
More file actions
89 lines (71 loc) · 2.96 KB
/
dynamic_entities_objects.py
File metadata and controls
89 lines (71 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import requests
from obp_client import token, obp_host
from dynamic_entities import (
PREFIX,
ENTITY_PROJECT,
ENTITY_PARCEL,
ENTITY_PARCEL_OWNERSHIP_VERIFICATION,
ENTITY_PROJECT_PARCEL_VERIFICATION,
ENTITY_PROJECT_VERIFICATION,
ENTITY_PARCEL_MONITORING_PERIOD_VERIFICATION,
ENTITY_PROJECT_MONITORING_PERIOD_VERIFICATION
)
# Configuration
# Both values are taken from the obp_client module imported above; change
# them there rather than editing this file.

# Root URL of the OBP instance; every request URL built in this module is
# prefixed with it.
BASE_URL = obp_host
# DirectLogin token exported by obp_client.
# NOTE(review): nothing in the visible part of this module reads this
# constant -- the create_* functions only authenticate when a token is passed
# to them explicitly. Confirm whether it is meant to be a default.
DIRECTLOGIN_TOKEN = token
def create_entity_object(entity_name, payload, token=None, timeout=30):
    """Create one object of a dynamic entity through the OBP REST API.

    Sends ``payload`` as a JSON body in a POST request to
    ``{BASE_URL}/obp/dynamic-entity/{entity_name}``.

    Args:
        entity_name: Name of the dynamic entity; used as the last URL segment.
        payload: JSON-serialisable dict holding the fields of the new object.
        token: Optional DirectLogin token. When truthy it is sent in the
            ``Authorization`` header; otherwise no ``Authorization`` header
            is added.
        timeout: Seconds to wait for the server, forwarded to
            ``requests.post``. Pass ``None`` to wait indefinitely (the
            behaviour before this parameter existed).

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the server answers with a 4xx/5xx status. The error (and
            the response body, when there is one) is printed before the
            exception is re-raised unchanged.
    """
    url = f"{BASE_URL}/obp/dynamic-entity/{entity_name}"
    headers = {
        "Content-Type": "application/json"
    }

    # Add authentication if token is provided
    if token:
        headers["Authorization"] = f"DirectLogin token={token}"

    try:
        # Without a timeout requests waits forever on an unresponsive server.
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error creating system dynamic entity object for {entity_name}: {e}")
        # Connection errors and timeouts carry no response object, so only
        # print a body when the server actually answered.
        error_response = getattr(e, "response", None)
        if error_response is not None:
            print(f"Response: {error_response.text}")
        raise
def create_project(project_owner="", token=None):
    """Create an ``ENTITY_PROJECT`` object.

    Args:
        project_owner: Value sent under the ``project_owner`` key.
        token: Optional DirectLogin token, forwarded to
            ``create_entity_object``.

    Returns:
        Whatever ``create_entity_object`` returns for the request.
    """
    payload = {"project_owner": project_owner}
    return create_entity_object(ENTITY_PROJECT, payload, token)
def create_parcel(project_id="", parcel_owner="", geo_data="", token=None):
    """Create an ``ENTITY_PARCEL`` object.

    Args:
        project_id: Value sent under the ``project_id`` key.
        parcel_owner: Value sent under the ``parcel_owner`` key.
        geo_data: Value sent under the ``geo_data`` key.
        token: Optional DirectLogin token, forwarded to
            ``create_entity_object``.

    Returns:
        Whatever ``create_entity_object`` returns for the request.
    """
    payload = {
        "project_id": project_id,
        "parcel_owner": parcel_owner,
        "geo_data": geo_data,
    }
    return create_entity_object(ENTITY_PARCEL, payload, token)
def create_parcel_ownership_verification(parcel_id="", status_code="", status_message="", authority="", token=None):
    """Create an ``ENTITY_PARCEL_OWNERSHIP_VERIFICATION`` object.

    Args:
        parcel_id: Value sent under the ``parcel_id`` key.
        status_code: Value sent under the ``status_code`` key.
        status_message: Value sent under the ``status_message`` key.
        authority: Value sent under the ``authority`` key.
        token: Optional DirectLogin token, forwarded to
            ``create_entity_object``.

    Returns:
        Whatever ``create_entity_object`` returns for the request.
    """
    payload = {
        "parcel_id": parcel_id,
        "status_code": status_code,
        "status_message": status_message,
        "authority": authority,
    }
    return create_entity_object(ENTITY_PARCEL_OWNERSHIP_VERIFICATION, payload, token)
def create_parcel_verification(parcel_id="", project_id_="", status_code="", status_message="", amount="", token=None):
    """Create an ``ENTITY_PROJECT_PARCEL_VERIFICATION`` object.

    Args:
        parcel_id: Value sent under the ``parcel_id`` key.
        project_id_: Value sent under the ``project_id_`` key.
        status_code: Value sent under the ``status_code`` key.
        status_message: Value sent under the ``status_message`` key.
        amount: Value sent under the ``amount`` key.
        token: Optional DirectLogin token, forwarded to
            ``create_entity_object``.

    Returns:
        Whatever ``create_entity_object`` returns for the request.
    """
    # NOTE(review): the "project_id_" key (trailing underscore) is sent
    # exactly as spelled here -- confirm it matches the entity definition.
    payload = {
        "parcel_id": parcel_id,
        "project_id_": project_id_,
        "status_code": status_code,
        "status_message": status_message,
        "amount": amount,
    }
    return create_entity_object(ENTITY_PROJECT_PARCEL_VERIFICATION, payload, token)
def create_project_verification(project_id="", status_code=None, status_message=None, token=None):
    """Create an ``ENTITY_PROJECT_VERIFICATION`` object.

    Args:
        project_id: Value sent under the ``project_id`` key.
        status_code: Value sent under the ``status_code`` key; defaults to
            ``None``.
        status_message: Value sent under the ``status_message`` key;
            defaults to ``None``.
        token: Optional DirectLogin token, forwarded to
            ``create_entity_object``.

    Returns:
        Whatever ``create_entity_object`` returns for the request.
    """
    payload = {
        "project_id": project_id,
        "status_code": status_code,
        "status_message": status_message,
    }
    return create_entity_object(ENTITY_PROJECT_VERIFICATION, payload, token)
def create_parcel_monitoring_period_verification(parcel_id="", project_id="", status_code="", status_message="", amount="", token=None):
    """Create an ``ENTITY_PARCEL_MONITORING_PERIOD_VERIFICATION`` object.

    Args:
        parcel_id: Value sent under the ``parcel_id`` key.
        project_id: Value sent under the ``project_id`` key.
        status_code: Value sent under the ``status_code`` key.
        status_message: Value sent under the ``status_message`` key.
        amount: Value sent under the ``amount`` key.
        token: Optional DirectLogin token, forwarded to
            ``create_entity_object``.

    Returns:
        Whatever ``create_entity_object`` returns for the request.
    """
    payload = {
        "parcel_id": parcel_id,
        "project_id": project_id,
        "status_code": status_code,
        "status_message": status_message,
        "amount": amount,
    }
    return create_entity_object(ENTITY_PARCEL_MONITORING_PERIOD_VERIFICATION, payload, token)
def create_project_monitoring_period_verification(project_id="", status_code="", status_message="", token=None):
    """Create an ``ENTITY_PROJECT_MONITORING_PERIOD_VERIFICATION`` object.

    Args:
        project_id: Value sent under the ``project_id`` key.
        status_code: Value sent under the ``status_code`` key.
        status_message: Value sent under the ``status_message`` key.
        token: Optional DirectLogin token, forwarded to
            ``create_entity_object``.

    Returns:
        Whatever ``create_entity_object`` returns for the request.
    """
    payload = {
        "project_id": project_id,
        "status_code": status_code,
        "status_message": status_message,
    }
    return create_entity_object(ENTITY_PROJECT_MONITORING_PERIOD_VERIFICATION, payload, token)