首先,确保您已安装websockets库,您可以使用以下命令安装它:
pip install websockets
接下来,创建一个WebSocket工具类的示例代码如下:
import asyncio
import websockets
class WebSocketClient:
def __init__(self, server_url):
self.server_url = server_url
self.websocket = None
async def connect(self):
try:
self.websocket = await websockets.connect(self.server_url)
print(f"Connected to {self.server_url}")
except Exception as e:
print(f"Connection failed: {e}")
async def send_message(self, message):
if self.websocket:
await self.websocket.send(message)
print(f"Sent message: {message}")
async def receive_message(self):
if self.websocket:
message = await self.websocket.recv()
print(f"Received message: {message}")
return message
async def close(self):
if self.websocket:
await self.websocket.close()
print("WebSocket connection closed")
# 使用示例
async def main():
server_url = "wss://example.com/websocket"
client = WebSocketClient(server_url)
await client.connect()
# 发送和接收消息
await client.send_message("Hello, WebSocket!")
response = await client.receive_message()
# 关闭连接
await client.close()
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
在上述示例中,我们创建了一个WebSocketClient
类,它包括连接、发送消息、接收消息和关闭连接的方法。通过实例化该类并调用这些方法,您可以在Python中创建WebSocket连接并与服务器进行通信。
请将示例中的server_url
替换为您要连接的WebSocket服务器的URL。这只是一个简单的示例,您可以根据您的具体需求扩展和优化这个工具类。