如何创建一个WebSocket工具类来连接到WebSocket服务器并发送和接收消息

内容纲要

首先,确保您已安装websockets库,您可以使用以下命令安装它:

pip install websockets
接下来,创建一个WebSocket工具类的示例代码如下:

import asyncio
import websockets

class WebSocketClient:
def __init__(self, server_url):
self.server_url = server_url
self.websocket = None

async def connect(self):
try:
self.websocket = await websockets.connect(self.server_url)
print(f"Connected to {self.server_url}")
except Exception as e:
print(f"Connection failed: {e}")

async def send_message(self, message):
if self.websocket:
await self.websocket.send(message)
print(f"Sent message: {message}")

async def receive_message(self):
if self.websocket:
message = await self.websocket.recv()
print(f"Received message: {message}")
return message

async def close(self):
if self.websocket:
await self.websocket.close()
print("WebSocket connection closed")

# 使用示例
async def main():
server_url = "wss://example.com/websocket"
client = WebSocketClient(server_url)
await client.connect()

# 发送和接收消息
await client.send_message("Hello, WebSocket!")
response = await client.receive_message()

# 关闭连接
await client.close()

if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())

在上述示例中,我们创建了一个WebSocketClient类,它包括连接、发送消息、接收消息和关闭连接的方法。通过实例化该类并调用这些方法,您可以在Python中创建WebSocket连接并与服务器进行通信。

请将示例中的server_url替换为您要连接的WebSocket服务器的URL。这只是一个简单的示例,您可以根据您的具体需求扩展和优化这个工具类。

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward