Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 44 additions & 104 deletions OMPython/ModelicaSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,112 +854,52 @@ def checkValidInputs(self, name):
else:
ModelicaSystemError('Error!!! Value must be in tuple format')

# To create csv file for inputs
def createCSVData(self):
sl = [] # Actual timestamps
skip = False

# check for NONE in input list and replace with proper data (e.g) [(startTime, 0.0), (stopTime, 0.0)]
tmpinputlist = {}
for key, value in self.inputlist.items():
if value is None:
tmpinputlist[key] = [(float(self.simulateOptions["startTime"]), 0.0),
(float(self.simulateOptions["stopTime"]), 0.0)]
def createCSVData(self) -> None:
start_time: float = float(self.simulateOptions["startTime"])
stop_time: float = float(self.simulateOptions["stopTime"])

# Replace None inputs with a default constant zero signal
inputs: dict[str, list[tuple[float, float]]] = {}
for input_name, input_signal in self.inputlist.items():
if input_signal is None:
inputs[input_name] = [(start_time, 0.0), (stop_time, 0.0)]
else:
tmpinputlist[key] = value

inp = list(tmpinputlist.values())

for i in inp:
cl = list()
el = list()
for t, x in i:
cl.append(t)
for i in cl:
if skip is True:
skip = False
continue
if i not in sl:
el.append(i)
else:
elem_no = cl.count(i)
sl_no = sl.count(i)
if elem_no == 2 and sl_no == 1:
el.append(i)
skip = True
sl = sl + el

sl.sort()
for t in sl:
for i in inp:
for ttt in [tt[0] for tt in i]:
if t not in [tt[0] for tt in i]:
i.append((t, '?'))
inpSortedList = list()
sortedList = list()
for i in inp:
sortedList = sorted(i, key=lambda x: x[0])
inpSortedList.append(sortedList)
for i in inpSortedList:
ind = 0
for t, x in i:
if x == '?':
t1 = i[ind - 1][0]
u1 = i[ind - 1][1]
t2 = i[ind + 1][0]
u2 = i[ind + 1][1]
nex = 2
while (u2 == '?'):
u2 = i[ind + nex][1]
t2 = i[ind + nex][0]
nex += 1
x = float(u1 + (u2 - u1) * (t - t1) / (t2 - t1))
i[ind] = (t, x)
ind += 1
slSet = list()
slSet = set(sl)
for i in inpSortedList:
tempTime = list()
for (t, x) in i:
tempTime.append(t)
inSl = None
inI = None
for s in slSet:
inSl = sl.count(s)
inI = tempTime.count(s)
if inSl != inI:
test = list()
test = [(x, y) for x, y in i if x == s]
i.append(test[0])
newInpList = list()
tempSorting = list()
for i in inpSortedList:
# i.sort() => just sorting might not work so need to sort according to 1st element of a tuple
tempSorting = sorted(i, key=lambda x: x[0])
newInpList.append(tempSorting)

interpolated_inputs_all = list()
for i in newInpList:
templist = list()
for (t, x) in i:
templist.append(x)
interpolated_inputs_all.append(templist)

name = ','.join(list(self.inputlist.keys()))
name = f'time,{name},end'

a = ''
l = []
l.append(name)
for i in range(0, len(sl)):
a = f'{float(sl[i])},{",".join(str(float(inppp[i])) for inppp in interpolated_inputs_all)},0'
l.append(a)

self.csvFile = (pathlib.Path(self.tempdir) / f'{self.modelName}.csv').as_posix()
inputs[input_name] = input_signal

# Collect all unique timestamps across all input signals
all_times = np.array(
sorted({t for signal in inputs.values() for t, _ in signal}),
dtype=float
)

# Interpolate missing values
interpolated_inputs: dict[str, np.ndarray] = {}
for signal_name, signal_values in inputs.items():
signal = np.array(signal_values)
interpolated_inputs[signal_name] = np.interp(
all_times,
signal[:, 0], # times
signal[:, 1] # values
)

# Write CSV file
input_names = list(interpolated_inputs.keys())
header = ['time'] + input_names + ['end']

csv_rows = [header]
for i, t in enumerate(all_times):
row = [
t, # time
*(interpolated_inputs[name][i] for name in input_names), # input values
0 # trailing 'end' column
]
csv_rows.append(row)

self.csvFile: str = (pathlib.Path(self.tempdir) / f'{self.modelName}.csv').as_posix()

with open(self.csvFile, "w", newline="") as f:
writer = csv.writer(f, delimiter='\n')
writer.writerow(l)
f.close()
writer = csv.writer(f)
writer.writerows(csv_rows)

# to convert Modelica model to FMU
def convertMo2Fmu(self, version="2.0", fmuType="me_cs", fileNamePrefix="<default>", includeResources=True): # 19
Expand Down
Loading