-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrelay_controller.py
More file actions
167 lines (130 loc) · 4.9 KB
/
relay_controller.py
File metadata and controls
167 lines (130 loc) · 4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
Módulo de control para relés Waveshare RPi Relay Board
Control de 4 relés mediante GPIO (activos en BAJO)
Instituto Tecnológico Metropolitano (ITM) - Medellín, Colombia
"""
import logging
from typing import Dict
try:
import RPi.GPIO as GPIO
GPIO_AVAILABLE = True
except ImportError:
GPIO_AVAILABLE = False
logging.warning("RPi.GPIO no disponible - Modo simulación activado")
logger = logging.getLogger(__name__)
class RelayController:
"""Controlador para módulo de 4 relés Waveshare (activos en BAJO)"""
def __init__(self, relay_pins: Dict[str, int]):
"""
Inicializa controlador de relés
Args:
relay_pins: Diccionario con nombres y pines GPIO (BCM)
"""
self.relay_pins = relay_pins
self.relay_states = {}
if GPIO_AVAILABLE:
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
for name, pin in self.relay_pins.items():
GPIO.setup(pin, GPIO.OUT, initial=GPIO.HIGH)
self.relay_states[name] = False
logger.info(f"Relé {name} (GPIO {pin}) inicializado: DESACTIVADO")
else:
for name in self.relay_pins.keys():
self.relay_states[name] = False
logger.info(f"Relé {name} inicializado en modo simulación")
def activate_relay(self, relay_name: str) -> bool:
"""
Activa un relé específico (GPIO LOW)
Args:
relay_name: Nombre del relé (ej: "RELAY_1")
Returns:
True si activación exitosa
"""
if relay_name not in self.relay_pins:
logger.error(f"Relé {relay_name} no existe")
return False
pin = self.relay_pins[relay_name]
try:
if GPIO_AVAILABLE:
GPIO.output(pin, GPIO.LOW)
self.relay_states[relay_name] = True
logger.info(f"Relé {relay_name} (GPIO {pin}) ACTIVADO")
return True
except Exception as e:
logger.error(f"Error activando relé {relay_name}: {e}")
return False
def deactivate_relay(self, relay_name: str) -> bool:
"""
Desactiva un relé específico (GPIO HIGH)
Args:
relay_name: Nombre del relé
Returns:
True si desactivación exitosa
"""
if relay_name not in self.relay_pins:
logger.error(f"Relé {relay_name} no existe")
return False
pin = self.relay_pins[relay_name]
try:
if GPIO_AVAILABLE:
GPIO.output(pin, GPIO.HIGH)
self.relay_states[relay_name] = False
logger.info(f"Relé {relay_name} (GPIO {pin}) DESACTIVADO")
return True
except Exception as e:
logger.error(f"Error desactivando relé {relay_name}: {e}")
return False
def toggle_relay(self, relay_name: str) -> bool:
"""
Cambia el estado de un relé (on->off, off->on)
Args:
relay_name: Nombre del relé
Returns:
True si cambio exitoso
"""
if relay_name not in self.relay_pins:
logger.error(f"Relé {relay_name} no existe")
return False
current_state = self.relay_states.get(relay_name, False)
if current_state:
return self.deactivate_relay(relay_name)
else:
return self.activate_relay(relay_name)
def get_relay_state(self, relay_name: str) -> bool:
"""
Consulta el estado de un relé
Args:
relay_name: Nombre del relé
Returns:
True si activo, False si inactivo
"""
return self.relay_states.get(relay_name, False)
def get_all_states(self) -> Dict[str, bool]:
"""
Obtiene estado de todos los relés
Returns:
Diccionario con nombre -> estado
"""
return self.relay_states.copy()
def deactivate_all(self) -> bool:
"""
Desactiva todos los relés (modo seguro)
Returns:
True si todas las desactivaciones fueron exitosas
"""
success = True
for relay_name in self.relay_pins.keys():
if not self.deactivate_relay(relay_name):
success = False
if success:
logger.info("Todos los relés desactivados")
else:
logger.warning("Algunos relés no se desactivaron correctamente")
return success
def cleanup(self):
"""Limpia recursos GPIO al finalizar"""
if GPIO_AVAILABLE:
self.deactivate_all()
GPIO.cleanup()
logger.info("GPIO limpiado")