|
|
|
@ -1,13 +1,14 @@
|
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
import os
|
|
|
|
|
import time
|
|
|
|
|
from typing import Union
|
|
|
|
|
|
|
|
|
|
import simplejson
|
|
|
|
|
import toml
|
|
|
|
|
from upnpy import UPnP
|
|
|
|
|
|
|
|
|
|
from .HueBridge import HueBridge
|
|
|
|
|
from .HueUtils import connect, is_valid_config
|
|
|
|
|
from .HueUtils import connect, is_valid_config, make_request
|
|
|
|
|
from .UserOrError import UserOrError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -23,8 +24,10 @@ class Tinge:
|
|
|
|
|
self.m_config = os.path.join(os.environ['HOME'], ".config/tinge/config")
|
|
|
|
|
self.create_confdir()
|
|
|
|
|
self.read_bridges_from_file()
|
|
|
|
|
self.discover_new_bridges()
|
|
|
|
|
self.write_all_bridges_to_conf()
|
|
|
|
|
|
|
|
|
|
def append_bridge(self, bridge: HueBridge):
|
|
|
|
|
self.m_bridges.append(bridge)
|
|
|
|
|
self.m_discovered.append(bridge.get_ipaddress())
|
|
|
|
|
|
|
|
|
|
def create_confdir(self):
|
|
|
|
|
"""Create the config dir if it does not allready exist
|
|
|
|
@ -32,29 +35,43 @@ class Tinge:
|
|
|
|
|
if not os.path.exists(os.path.dirname(self.m_config)):
|
|
|
|
|
os.makedirs(os.path.dirname(self.m_config))
|
|
|
|
|
|
|
|
|
|
def discover_new_bridges(self):
|
|
|
|
|
def discover_new_bridges(self) -> Union[None, list[dict]]:
|
|
|
|
|
"""Use UPnP to discover bridges on the current network
|
|
|
|
|
"""
|
|
|
|
|
upnp: UPnP = UPnP()
|
|
|
|
|
discovered_devices = upnp.discover()
|
|
|
|
|
discovered_bridges: list[dict] = list()
|
|
|
|
|
seen_ips: list[str] = list()
|
|
|
|
|
if not discovered_devices:
|
|
|
|
|
print("No devices discovered at this time")
|
|
|
|
|
return
|
|
|
|
|
return None
|
|
|
|
|
for device in discovered_devices:
|
|
|
|
|
if device.get_friendly_name().startswith("Philips hue") and device.host not in self.m_discovered:
|
|
|
|
|
user_or_error: UserOrError = connect(device.host)
|
|
|
|
|
print("Is error: {}".format(str(user_or_error.is_error())))
|
|
|
|
|
while user_or_error.is_error():
|
|
|
|
|
print("Is error: {}".format(str(user_or_error.get_error_code())))
|
|
|
|
|
if user_or_error.get_error_code() == 101:
|
|
|
|
|
print("Please press the button on your Hue Bridge")
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
user_or_error = connect(device.host)
|
|
|
|
|
bridge: HueBridge = HueBridge(device.host, user_or_error.get_user())
|
|
|
|
|
discovered: bool = False
|
|
|
|
|
if (device.host not in self.m_discovered) and (device.host not in seen_ips):
|
|
|
|
|
seen_ips.append(device.host)
|
|
|
|
|
# Let's check if the device has the default name, if so we assume it's a hue bridge
|
|
|
|
|
if device.get_friendly_name().startswith("Philips hue"):
|
|
|
|
|
discovered = True
|
|
|
|
|
# If not we try to do a request against the api and see if we get an answer we can understand
|
|
|
|
|
else:
|
|
|
|
|
try:
|
|
|
|
|
response = make_request(device.host, "1234/lights")
|
|
|
|
|
if response:
|
|
|
|
|
resp = response.json()[0]
|
|
|
|
|
if 'error' in resp.keys():
|
|
|
|
|
m_keys = resp['error'].keys()
|
|
|
|
|
if 'description' in m_keys and 'address' in m_keys and 'type' in m_keys:
|
|
|
|
|
if resp['error']['description'] == "unauthorized user" and resp['error'][
|
|
|
|
|
'address'] == "/lights" and resp['error']['type'] == 1:
|
|
|
|
|
discovered = True
|
|
|
|
|
except simplejson.errors.JSONDecodeError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
self.m_bridges.append(bridge)
|
|
|
|
|
self.m_discovered.append(device.host)
|
|
|
|
|
return
|
|
|
|
|
if discovered:
|
|
|
|
|
bridge = {'ipaddress': device.host, 'name': device.get_friendly_name()}
|
|
|
|
|
if bridge not in discovered_bridges:
|
|
|
|
|
discovered_bridges.append(bridge)
|
|
|
|
|
return discovered_bridges
|
|
|
|
|
|
|
|
|
|
def get_bridges(self) -> list[HueBridge]:
|
|
|
|
|
"""Get the bridges
|
|
|
|
|