Featured Event-Driven Access to My Home After a Run

Published on February 12th, 2023 📆 | 5885 Views ⚑

0

Event-Driven Access to My Home After a Run


iSpeech.org

I always run with my phone on me, so accessing my home afterward has been convenient since installing MyQ W-Fi connected garage doors several years ago. No need to remember to carry a separate house key anymore and getting into my house is (usually) an app launch away.

It has worked great, until one day when I returned from a particularly cold run ā€” though it was fine playing music the entire 8 miles, as soon as I opened my phoneā€™s home screen, its battery charge rapidly faded and died. This was certainly not ideal. After getting back inside, I was determined to fully automate home access going forward.

Solution

Using my home Wi-Fi network was the obvious choice in solving this problem. I simply needed to determine two states:

  1. When I left for a run
  2. When I returned

Knowing I had left for a run was key to knowing when I had returned from a run, rather than a trip in the car for example. Therefore, I would at least need to start a process that would understand my intent to leave, detect my departure has occurred, and then detect my return to open the door.

Departure

Using ARP pings (who-has), I can determine if my device is currently connected to my network by filtering the replies using my MAC address. Though in practice it doesnā€™t necessarily respond to every ARP, it responds often enough. This meant I just needed to detect my phone had been disconnected for some period of time. Hereā€™s a quick and dirty implementation using scapy:

def is_connected():
    ans, unans = arping(LOCAL_SUBNET, verbose=0)
    for s, r in ans:        
        mac_address = str(r[Ether].src)
        ip_address = s[ARP].pdst
        if mac_address == PHONE_MAC_ADDRESS:
            logging.debug(f"PHONE FOUND, IP: {ip_address}")
            return True
    return False


def wait_for_departure(seconds_until_gone):
    time_last_seen = time.time()
    while True:
        if is_connected():
            time_last_seen = time.time()
        else:
            time_gone = (time.time() - time_last_seen)
            logging.debug(f"Missing PHONE for {time_gone}")
            if time_gone > seconds_until_gone:
                logging.info("PHONE is gone")
                return

As you can see, we start by assuming the device is initially connected and keep track of the elapsed time since it was last seen (replied to an ARP). Iā€™ve set this timeout to five minutes, or 300 seconds, and thatā€™s seemed to work well enough to consider the device as having left the network.

Admittedly, this is not the ā€œevent-drivenā€ solution promised in the title (since weā€™re polling)ā€¦ but weā€™re getting there!

Arrival

After the wait_for_departure function ends, it is time to kick off the wait for return:

def handle_dhcp_packet(packet):
    if DHCP in packet and packet[DHCP].options[0][1] == 3:
        mac_address = packet[Ether].src
        if mac_address == PHONE_MAC_ADDRESS:
            logging.debug("PHONE IS BACK!")
            raise ReturnedPacketException
    return


def wait_for_return():
    logging.info("Waiting for return...")
    try:
        sniff(filter="udp and (port 67 or 68)", prn=handle_dhcp_packet)
    except ReturnedPacketException:
        logging.info("Returned")

Here we use scapy to start sniffing DHCP packets on the network. In the above handler function, we are looking for any ā€œDHCP Requestā€ packets and further filtering them using my deviceā€™s MAC address. Once such a packet is seen, indicating Iā€™m back in Wi-Fi range, we raise an Exception and complete the blocking wait_for_return function.





Open Sesame

After receiving the DHCP packet, itā€™s time to open the garage door. This was made easy with pymyq:

def open_door():
    async def main() -> None:
        """Create the aiohttp session and run."""
        async with ClientSession() as websession:
            myq = await pymyq.login(MYQ_USERNAME, MYQ_PASSWORD, websession)
            devices = myq.devices
            await devices[GARAGE_DEVICE_ID].open()
    asyncio.get_event_loop().run_until_complete(main())

Again, quick and dirty.

Final Result

Putting it all together:

def start_process():
    seconds_until_gone = (MINUTES_UNTIL_GONE * 60)
    
    wait_for_departure(seconds_until_gone)
    wait_for_return()
    logging.info("Opening the garage door...")
    open_door()
    exit()

I wrapped everything up in a small Flask app to serve a simple HTML form on a Raspberry Pi. Clicking Submit launches the start_process function as a background process:

@app.get("/")
def get():
    csrf_token = generate_csrf()
    return f"""
     
""" @app.post("/") def post(): p = multiprocessing.Process(target=start_process) p.start() return "

Started background job...

"

Just a note, you may need to use setcap to add ā€˜CAP_NET_RAW+eip CAP_NET_ADMIN+eipā€™ permissions in order to listen to raw packets.

So far, this has been working well. Now when I get home from a cold run, the garage door opens as a I stop at the driveway ā€” completely hands free!

Share this: Facebooktwitterlinkedin



Source link

Tagged with: ā€¢ ā€¢ ā€¢ ā€¢ ā€¢ ā€¢



Comments are closed.