Coverage for apps/automower.py: 100%
115 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 12:23 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-03-12 12:23 +0000
1# The MIT License (MIT)
2#
3# Copyright © 2023 Xavier Berger
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy of this software
6# and associated documentation files (the “Software”), to deal in the Software without restriction,
7# including without limitation the rights to use, copy, modify, merge, publish, distribute,
8# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
9# furnished to do so, subject to the following conditions:
10#
11# The above copyright notice and this permission notice shall be included in all copies or
12# substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
15# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
17# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
19from datetime import datetime
21import appdaemon.plugins.hass.hassapi as hass
22import pytz
#
# Automower App
#
# Args:
#   message_park_because_of_rain: "It starts raining, park until rain stops and lawn dries."
#   message_end_of_session_soon: "End session is in less than 1 hour, stay parked."
#   message_lawn_is_dry: "No rain during last 6h. Lawn should be dry now."
#   message_activated: "Advanced automation is activated."
#   message_deactivated: "Advanced automation is deactivated."
#
# End session management:
#   If the remaining mowing duration after the next start is less than 1 hour, stay parked.
#
# Rain management:
#   When it starts raining, go to the dock or stay parked for the maximum possible duration (42 days).
#   When the sun is at its top, if no rain occurred during the 6 previous hours, restart mowing.
#   If no rain occurred during the 6 previous hours and the sun is setting, restart mowing.
#
# Automation management:
#   This automation is active only when the automower is not "parked until further notice".
#   This verification ensures that a manual order is never overwritten.
#
# Notification:
#   Notifications are sent to Telegram. This keeps a history of what happened and when it happened.
51class Automower(hass.Hass):
52 def initialize(self):
53 """
54 Initialize the Automower Automation.
55 """
56 self.log("Starting Automower Automation")
58 # Handles to register / unregister callbacks
59 self.state_handles = []
61 # Max duration time is used to park automover when it's raining (max = 42 days)
62 self.park_max_duration = self.get_state("number.nono_park_for", attribute="max")
63 self.log(f"\tpark max duration : {self.park_max_duration}")
65 # This sensor tells if 'sheddule' or 'park until further notice' has been activated
66 self.listen_state(self.callback_automower_automation, "sensor.nono_problem_sensor", immediate=True)
68 ####################################################################################################################
69 # UTILITIES
70 ####################################################################################################################
72 def log_parked_because_of_rain(self):
73 """
74 Log the status of the 'parked_because_of_rain' binary sensor.
75 """
76 self.log(f"\tbinary_sensor.parked_because_of_rain: {self.get_state('binary_sensor.parked_because_of_rain')}")
78 def send_notification(self, **kwargs):
79 """
80 Send a notification.
81 """
82 self.log("Send notification")
83 self.log(f"\tMessage: {kwargs['message']}")
84 self.call_service(service="telegram_bot/send_message", title="🏡 Nono", **kwargs)
86 def service(self, message, command, **kwargs):
87 """
88 Call a service and send a notification.
89 """
90 self.log("Call service")
91 self.log(f"\t{command} ({kwargs})")
92 self.call_service(command, **kwargs)
93 self.send_notification(message=message, disable_notification=True)
95 def force_park(self, message, duration):
96 """
97 Force the Automower to park for a specific duration.
98 """
99 self.service(
100 message=message,
101 command="number/set_value",
102 entity_id="number.nono_park_for",
103 value=duration,
104 )
106 def restart_after_rain(self):
107 """
108 Restart the Automower after a period of rain.
109 """
110 self.service(
111 message=self.args["message_lawn_is_dry"],
112 command="vacuum/start",
113 entity_id="vacuum.nono",
114 )
115 self.set_state("binary_sensor.parked_because_of_rain", state="off")
117 ####################################################################################################################
118 # APPLICATION MANAGEMENT
119 ####################################################################################################################
121 def callback_automower_automation(self, entity, attribute, old, new, kwargs):
122 """
123 Callback for automower automation activation.
124 """
125 # self.log(f"new={new}")
126 if new == "parked_until_further_notice":
127 # Deregister callbacks
128 while len(self.state_handles) >= 1:
129 handle = self.state_handles.pop()
130 self.cancel_listen_state(handle)
131 message = self.args["message_deactivated"]
132 elif new in ["week_schedule", "charging"]:
133 if len(self.state_handles) != 0:
134 # callback are already registred. No need to register again
135 return
137 # register callbacks
138 # Listen for rain sensors
139 self.state_handles.append(self.listen_state(self.callback_rain_changed, "sensor.rain_last_6h"))
141 # Listen for sun start to decrease
142 self.state_handles.append(
143 self.listen_state(self.callback_sun_is_at_top, "sun.sun", attribute="rising", new=False)
144 )
146 # Listen next start
147 self.state_handles.append(
148 self.listen_state(
149 self.callback_next_start_changed,
150 "sensor.nono_next_start",
151 immediate=True,
152 )
153 )
154 message = self.args["message_activated"]
155 else:
156 # Robot is mowing or having an error
157 return
159 self.log("Automower automation activation triggered")
160 self.log(f"\t{message}")
161 self.send_notification(message=message)
163 ####################################################################################################################
164 # RAIN MANAGEMENT
165 ####################################################################################################################
167 def callback_rain_changed(self, entity, attribute, old, new, kwargs):
168 """
169 Callback for handling rain sensor changes.
170 """
171 self.log("Rain event triggered")
172 self.log_parked_because_of_rain()
174 try:
175 old_value = float(old)
176 except Exception:
177 # at startup: old is None
178 # if old is unavailable :
179 # let's considere that no rain occured
180 old_value = 0.0
182 try:
183 new_value = float(new)
184 except Exception:
185 # if new is unavailable, we can't do anything
186 return
188 if (old_value == 0.0) and (new_value > 0.0):
189 # Rain is starting
190 self.set_state("binary_sensor.parked_because_of_rain", state="on")
191 self.force_park(
192 message=self.args["message_park_because_of_rain"],
193 duration=60480,
194 )
195 elif new_value == 0.0:
196 # No rain occurs during last 6 hours and sun is setting
197 if self.get_state("sun.sun", attribute="rising"):
198 message = "No rain during last 6h, waiting for noon to restart."
199 self.log(message)
200 self.send_notification(message=message, disable_notification=True)
201 elif self.get_state("sun.sun") == "below_horizon":
202 message = "No rain during last 6h, sun is below horizon, waiting for tomorow noon to restart."
203 self.log(f"\t{message}")
204 self.send_notification(message=message, disable_notification=True)
205 else:
206 self.restart_after_rain()
207 else:
208 # It is still raining or rain has stopped recently
209 self.log("\tRain occured during last 6h, lawn shouldn't be dry yet.")
210 self.log_parked_because_of_rain()
212 def callback_sun_is_at_top(self, entity, attribute, old, new, kwargs):
213 """
214 Callback for handling sun position changes.
215 """
216 self.log("Sun event triggered")
217 self.log_parked_because_of_rain()
218 if self.get_state("binary_sensor.parked_because_of_rain") == "on":
219 if self.get_state("sensor.rain_last_6h") == 0.0:
220 self.restart_after_rain()
221 else:
222 message = "Lawn shouldn't be dry yet. Staying parked."
223 self.log(f"\t{message}")
224 self.send_notification(message=message, disable_notification=True)
225 else:
226 message = "Not park because of rain. Nothing to do."
227 self.log(f"\t{message}")
228 self.log_parked_because_of_rain()
230 ####################################################################################################################
231 # SESSION MANAGEMENT
232 ####################################################################################################################
234 def callback_next_start_changed(self, entity, attribute, old, new, kwargs):
235 """
236 Callback for handling changes in the next start time.
237 """
238 self.log("Next start event triggered")
239 if self.get_state("binary_sensor.parked_because_of_rain") == "on":
240 message = "Robot is parked because of rain. Nothing to check."
241 self.log(f"\t{message}")
242 self.send_notification(message=message, disable_notification=True)
243 return
245 self.log(f"\told={old}")
246 self.log(f"\tnew={new}")
248 # If robot is currently mowing, we don't have next start
249 if new == "unknown":
250 message = "Robot is currently mowing, let it come back to base before checking."
251 self.log(f"\t{message}")
252 self.send_notification(message=message, disable_notification=True)
253 return
255 # Get next end of session
256 mowing_session_end = datetime.strptime(
257 self.get_state("calendar.nono", attribute="end_time"), "%Y-%m-%d %H:%M:%S"
258 )
260 print(f"self.get_timezone() = {self.get_timezone()}")
261 local = pytz.timezone(self.get_timezone())
262 mowing_session_end_utc = local.localize(mowing_session_end, is_dst=None).astimezone(pytz.utc)
264 self.log(f"\tMowing session will end at {mowing_session_end} => {mowing_session_end_utc} UTC")
266 # Get next start
267 next_start_utc = datetime.strptime(new, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=pytz.utc)
268 next_start = next_start_utc.astimezone(local)
270 # Check delta and decide action to perform
271 delta = (mowing_session_end_utc - next_start_utc).total_seconds() / 3600
272 self.log(f"\tNext start is planned at {next_start} => {next_start_utc} UTC")
273 self.log(f"\tThe number of hour before mowing session end is {delta}")
274 if delta < 0:
275 message = f"Session completed. Lets restart tomorrow at {next_start}"
276 self.log(f"\t{message}")
277 self.send_notification(message=message, disable_notification=True)
278 elif delta < 1:
279 self.log(f"\t{self.args['message_end_of_session_soon']}")
280 self.force_park(
281 message=self.args["message_end_of_session_soon"],
282 duration=180,
283 )
284 else: # delta >= 1
285 message = f"Duration between next start ({next_start}) and end of session is greater than 1 hour."
286 self.log(f"\t{message}")
287 self.send_notification(message=f"Next start planned at {next_start}", disable_notification=True)