90 lines
2.3 KiB
Python
Executable file
90 lines
2.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import subprocess
|
|
import time
|
|
import eq3crypto
|
|
import os
|
|
import config
|
|
import sys
|
|
import socket
|
|
|
|
lsock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
|
|
def send_lsock(msg):
|
|
lsock.sendto(msg, ("127.0.0.1", 3667))
|
|
|
|
pstart = time.time()
|
|
writeHandle = 0x411
|
|
readHandle = 0x421
|
|
|
|
def tsLog(m = ""):
|
|
global pstart
|
|
print("{:.3f}".format(time.time() - pstart) + " " + m)
|
|
|
|
def getBuffer(p):
|
|
val = p.stdout.peek()
|
|
val = p.stdout.read(len(val))
|
|
#print(val)
|
|
return val
|
|
|
|
def expect(p, s, t=0):
|
|
b = ""
|
|
while True:
|
|
if p.poll() is not None:
|
|
raise Exception("Can't read data: Process Terminated")
|
|
b = b + getBuffer(p).decode()
|
|
if s in b:
|
|
return b
|
|
|
|
def getNotif(p):
|
|
expect(p, "Notification")
|
|
|
|
|
|
def sendCmd(p, s):
|
|
print("<< " + s)
|
|
p.stdin.write(s.encode())
|
|
p.stdin.write(b"\n")
|
|
p.stdin.flush()
|
|
|
|
def getNotifVal(s):
|
|
r = s.split("value: ")[-1].split("\n")[0]
|
|
print(">> " + r)
|
|
return r
|
|
|
|
actionStr = os.getenv("HOME")
|
|
if "door" in actionStr:
|
|
print("Door user")
|
|
actionStr = sys.argv[-1]
|
|
action = 0
|
|
tsLog("Start")
|
|
if "open" in actionStr:
|
|
action = 2
|
|
if "unlock" in actionStr:
|
|
action = 1
|
|
try:
|
|
send_lsock(bytes([2,0,action]))
|
|
p = subprocess.Popen(["gatttool", "-I"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
|
|
expect(p, "[LE]")
|
|
tsLog("INIT DONE")
|
|
sendCmd(p, "connect " + config.device_mac)
|
|
p.stdin.flush()
|
|
expect(p, "Connection successful")
|
|
tsLog("CONNECT DONE")
|
|
sendCmd(p, "char-write-req 0x0411 8002014DC7e11BAE13FECB0000000000")
|
|
notifNonce = bytes.fromhex(getNotifVal(expect(p, "Notification")))
|
|
if notifNonce[0:2] == b'\x80\x03':
|
|
tsLog("Nonce received")
|
|
remoteNonce = notifNonce[3:(3+8)]
|
|
msg = eq3crypto.getMsg(135, bytes([action,0,0,0,0,0,0,0]), remoteNonce, bytes.fromhex(config.user_key))
|
|
sendCmd(p, "char-write-req 0x0411 " + msg.hex())
|
|
getNotifVal(expect(p, "Notification"))
|
|
tsLog("MOVING")
|
|
getNotifVal(expect(p, "Notification"))
|
|
tsLog("DONE")
|
|
finally:
|
|
print("FINALLY")
|
|
send_lsock(bytes([2,1,action]))
|
|
try:
|
|
p.communicate(input=b"exit\n",timeout=2)
|
|
print("NORMAL EXIT")
|
|
except:
|
|
print("KILL")
|
|
p.kill()
|