Source code for tinder.gateway

import aiohttp
import asyncio
import logging

log = logging.getLogger(__name__)


[docs]class TinderWebSocket: def __init__(self, client): self.client = client self.url: str self.ws: aiohttp.ClientWebSocketResponse
[docs] async def fetch_token(self): resp = await self.client.http.fetch_gateway() return resp["token"]
[docs] async def connect(self): self.url = await self.client.http.get_gateway() self.ws = await self.client.http.ws_connect(self.url) self.client.loop.create_task(self.ping()) self.client.loop.create_task(self.receive())
[docs] async def receive(self): while True: resp = await self.ws.receive() try: resp = resp.data.hex() log.debug(f"Gateway message received: {resp}") except AttributeError: log.debug(f"Gateway received: {resp}")
[docs] async def ping(self): while True: await self.ws.send_bytes(bytearray.fromhex("2a00")) await asyncio.sleep(30)