Coverage for apps/automower.py: 100%

115 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-03-12 12:23 +0000

1# The MIT License (MIT) 

2# 

3# Copyright © 2023 Xavier Berger 

4# 

5# Permission is hereby granted, free of charge, to any person obtaining a copy of this software 

6# and associated documentation files (the “Software”), to deal in the Software without restriction, 

7# including without limitation the rights to use, copy, modify, merge, publish, distribute, 

8# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is 

9# furnished to do so, subject to the following conditions: 

10# 

11# The above copyright notice and this permission notice shall be included in all copies or 

12# substantial portions of the Software. 

13# 

14# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT 

15# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 

16# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 

17# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 

19from datetime import datetime 

20 

21import appdaemon.plugins.hass.hassapi as hass 

22import pytz 

23 

24# 

25# Automower App 

26# 

27# Args: 

28# message_park_because_of_rain: "It starts raining, park until rain stops and lawn dries." 

29# message_end_of_session_soon: "End session is in less than 1 hour, stay parked." 

30# message_lawn_is_dry: "No rain during last 6h. Lawn should be dry now." 

31# message_activated: "Advanced automation is activated." 

32# message_deactivated: "Advanced automation is deactivated." 

33 

34# 

35# End session management: 

36# If remaining duration of mowing after next start is less than 1 hour, stay parked 

37# 

38# Rain management: 

39# When it starts raining, go to dock or stay parcked for maximum possible duration (42 days) 

40# When sun is at its top, if no rain occurs during the 6 previous hours, restart mowing 

41# If no rain occurs during the 6 previous hours and sun is setting, restart mowing 

42# 

43# Automation management: 

44# This automation is active only automower is not "parked until further notice" 

45# This verification is designed to be sure that manual order will never be overwritten 

46# 

47# Notification: 

48# Notification are sent to Telegram. This allow to have an history of what happen and when it happens. 

49 

50 

51class Automower(hass.Hass): 

52 def initialize(self): 

53 """ 

54 Initialize the Automower Automation. 

55 """ 

56 self.log("Starting Automower Automation") 

57 

58 # Handles to register / unregister callbacks 

59 self.state_handles = [] 

60 

61 # Max duration time is used to park automover when it's raining (max = 42 days) 

62 self.park_max_duration = self.get_state("number.nono_park_for", attribute="max") 

63 self.log(f"\tpark max duration : {self.park_max_duration}") 

64 

65 # This sensor tells if 'sheddule' or 'park until further notice' has been activated 

66 self.listen_state(self.callback_automower_automation, "sensor.nono_problem_sensor", immediate=True) 

67 

68 #################################################################################################################### 

69 # UTILITIES 

70 #################################################################################################################### 

71 

72 def log_parked_because_of_rain(self): 

73 """ 

74 Log the status of the 'parked_because_of_rain' binary sensor. 

75 """ 

76 self.log(f"\tbinary_sensor.parked_because_of_rain: {self.get_state('binary_sensor.parked_because_of_rain')}") 

77 

78 def send_notification(self, **kwargs): 

79 """ 

80 Send a notification. 

81 """ 

82 self.log("Send notification") 

83 self.log(f"\tMessage: {kwargs['message']}") 

84 self.call_service(service="telegram_bot/send_message", title="🏡 Nono", **kwargs) 

85 

86 def service(self, message, command, **kwargs): 

87 """ 

88 Call a service and send a notification. 

89 """ 

90 self.log("Call service") 

91 self.log(f"\t{command} ({kwargs})") 

92 self.call_service(command, **kwargs) 

93 self.send_notification(message=message, disable_notification=True) 

94 

95 def force_park(self, message, duration): 

96 """ 

97 Force the Automower to park for a specific duration. 

98 """ 

99 self.service( 

100 message=message, 

101 command="number/set_value", 

102 entity_id="number.nono_park_for", 

103 value=duration, 

104 ) 

105 

106 def restart_after_rain(self): 

107 """ 

108 Restart the Automower after a period of rain. 

109 """ 

110 self.service( 

111 message=self.args["message_lawn_is_dry"], 

112 command="vacuum/start", 

113 entity_id="vacuum.nono", 

114 ) 

115 self.set_state("binary_sensor.parked_because_of_rain", state="off") 

116 

117 #################################################################################################################### 

118 # APPLICATION MANAGEMENT 

119 #################################################################################################################### 

120 

121 def callback_automower_automation(self, entity, attribute, old, new, kwargs): 

122 """ 

123 Callback for automower automation activation. 

124 """ 

125 # self.log(f"new={new}") 

126 if new == "parked_until_further_notice": 

127 # Deregister callbacks 

128 while len(self.state_handles) >= 1: 

129 handle = self.state_handles.pop() 

130 self.cancel_listen_state(handle) 

131 message = self.args["message_deactivated"] 

132 elif new in ["week_schedule", "charging"]: 

133 if len(self.state_handles) != 0: 

134 # callback are already registred. No need to register again 

135 return 

136 

137 # register callbacks 

138 # Listen for rain sensors 

139 self.state_handles.append(self.listen_state(self.callback_rain_changed, "sensor.rain_last_6h")) 

140 

141 # Listen for sun start to decrease 

142 self.state_handles.append( 

143 self.listen_state(self.callback_sun_is_at_top, "sun.sun", attribute="rising", new=False) 

144 ) 

145 

146 # Listen next start 

147 self.state_handles.append( 

148 self.listen_state( 

149 self.callback_next_start_changed, 

150 "sensor.nono_next_start", 

151 immediate=True, 

152 ) 

153 ) 

154 message = self.args["message_activated"] 

155 else: 

156 # Robot is mowing or having an error 

157 return 

158 

159 self.log("Automower automation activation triggered") 

160 self.log(f"\t{message}") 

161 self.send_notification(message=message) 

162 

163 #################################################################################################################### 

164 # RAIN MANAGEMENT 

165 #################################################################################################################### 

166 

167 def callback_rain_changed(self, entity, attribute, old, new, kwargs): 

168 """ 

169 Callback for handling rain sensor changes. 

170 """ 

171 self.log("Rain event triggered") 

172 self.log_parked_because_of_rain() 

173 

174 try: 

175 old_value = float(old) 

176 except Exception: 

177 # at startup: old is None 

178 # if old is unavailable : 

179 # let's considere that no rain occured 

180 old_value = 0.0 

181 

182 try: 

183 new_value = float(new) 

184 except Exception: 

185 # if new is unavailable, we can't do anything 

186 return 

187 

188 if (old_value == 0.0) and (new_value > 0.0): 

189 # Rain is starting 

190 self.set_state("binary_sensor.parked_because_of_rain", state="on") 

191 self.force_park( 

192 message=self.args["message_park_because_of_rain"], 

193 duration=60480, 

194 ) 

195 elif new_value == 0.0: 

196 # No rain occurs during last 6 hours and sun is setting 

197 if self.get_state("sun.sun", attribute="rising"): 

198 message = "No rain during last 6h, waiting for noon to restart." 

199 self.log(message) 

200 self.send_notification(message=message, disable_notification=True) 

201 elif self.get_state("sun.sun") == "below_horizon": 

202 message = "No rain during last 6h, sun is below horizon, waiting for tomorow noon to restart." 

203 self.log(f"\t{message}") 

204 self.send_notification(message=message, disable_notification=True) 

205 else: 

206 self.restart_after_rain() 

207 else: 

208 # It is still raining or rain has stopped recently 

209 self.log("\tRain occured during last 6h, lawn shouldn't be dry yet.") 

210 self.log_parked_because_of_rain() 

211 

212 def callback_sun_is_at_top(self, entity, attribute, old, new, kwargs): 

213 """ 

214 Callback for handling sun position changes. 

215 """ 

216 self.log("Sun event triggered") 

217 self.log_parked_because_of_rain() 

218 if self.get_state("binary_sensor.parked_because_of_rain") == "on": 

219 if self.get_state("sensor.rain_last_6h") == 0.0: 

220 self.restart_after_rain() 

221 else: 

222 message = "Lawn shouldn't be dry yet. Staying parked." 

223 self.log(f"\t{message}") 

224 self.send_notification(message=message, disable_notification=True) 

225 else: 

226 message = "Not park because of rain. Nothing to do." 

227 self.log(f"\t{message}") 

228 self.log_parked_because_of_rain() 

229 

230 #################################################################################################################### 

231 # SESSION MANAGEMENT 

232 #################################################################################################################### 

233 

234 def callback_next_start_changed(self, entity, attribute, old, new, kwargs): 

235 """ 

236 Callback for handling changes in the next start time. 

237 """ 

238 self.log("Next start event triggered") 

239 if self.get_state("binary_sensor.parked_because_of_rain") == "on": 

240 message = "Robot is parked because of rain. Nothing to check." 

241 self.log(f"\t{message}") 

242 self.send_notification(message=message, disable_notification=True) 

243 return 

244 

245 self.log(f"\told={old}") 

246 self.log(f"\tnew={new}") 

247 

248 # If robot is currently mowing, we don't have next start 

249 if new == "unknown": 

250 message = "Robot is currently mowing, let it come back to base before checking." 

251 self.log(f"\t{message}") 

252 self.send_notification(message=message, disable_notification=True) 

253 return 

254 

255 # Get next end of session 

256 mowing_session_end = datetime.strptime( 

257 self.get_state("calendar.nono", attribute="end_time"), "%Y-%m-%d %H:%M:%S" 

258 ) 

259 

260 print(f"self.get_timezone() = {self.get_timezone()}") 

261 local = pytz.timezone(self.get_timezone()) 

262 mowing_session_end_utc = local.localize(mowing_session_end, is_dst=None).astimezone(pytz.utc) 

263 

264 self.log(f"\tMowing session will end at {mowing_session_end} => {mowing_session_end_utc} UTC") 

265 

266 # Get next start 

267 next_start_utc = datetime.strptime(new, "%Y-%m-%dT%H:%M:%S+00:00").replace(tzinfo=pytz.utc) 

268 next_start = next_start_utc.astimezone(local) 

269 

270 # Check delta and decide action to perform 

271 delta = (mowing_session_end_utc - next_start_utc).total_seconds() / 3600 

272 self.log(f"\tNext start is planned at {next_start} => {next_start_utc} UTC") 

273 self.log(f"\tThe number of hour before mowing session end is {delta}") 

274 if delta < 0: 

275 message = f"Session completed. Lets restart tomorrow at {next_start}" 

276 self.log(f"\t{message}") 

277 self.send_notification(message=message, disable_notification=True) 

278 elif delta < 1: 

279 self.log(f"\t{self.args['message_end_of_session_soon']}") 

280 self.force_park( 

281 message=self.args["message_end_of_session_soon"], 

282 duration=180, 

283 ) 

284 else: # delta >= 1 

285 message = f"Duration between next start ({next_start}) and end of session is greater than 1 hour." 

286 self.log(f"\t{message}") 

287 self.send_notification(message=f"Next start planned at {next_start}", disable_notification=True)