ayaka.driver.ayakabot_driver.websocket
FastAPI 驱动适配
1"""[FastAPI](https://fastapi.tiangolo.com/) 驱动适配 2""" 3 4from fastapi import status, WebSocket 5 6 7class FastAPIWebSocket: 8 """FastAPI WebSocket Wrapper""" 9 10 def __init__(self, websocket: WebSocket): 11 self.websocket = websocket 12 13 async def accept(self) -> None: 14 await self.websocket.accept() 15 16 async def close( 17 self, code: int = status.WS_1000_NORMAL_CLOSURE, reason: str = "" 18 ) -> None: 19 await self.websocket.close(code) 20 21 async def receive(self) -> str: 22 return await self.websocket.receive_text() 23 24 async def send(self, data: str) -> None: 25 await self.websocket.send({"type": "websocket.send", "text": data})
class
FastAPIWebSocket:
9class FastAPIWebSocket: 10 """FastAPI WebSocket Wrapper""" 11 12 def __init__(self, websocket: WebSocket): 13 self.websocket = websocket 14 15 async def accept(self) -> None: 16 await self.websocket.accept() 17 18 async def close( 19 self, code: int = status.WS_1000_NORMAL_CLOSURE, reason: str = "" 20 ) -> None: 21 await self.websocket.close(code) 22 23 async def receive(self) -> str: 24 return await self.websocket.receive_text() 25 26 async def send(self, data: str) -> None: 27 await self.websocket.send({"type": "websocket.send", "text": data})
FastAPI WebSocket Wrapper