安装Python
CentOS 7默认安装Python 2.7,建议升级至Python 3:
sudo yum install python3 # 安装Python 3
python3 --version # 验证版本
安装依赖库
sudo yum groupinstall "Development Tools"
sudo yum install openssl-devel
pip3
安装第三方库(如requests
):pip3 install requests
服务器端
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('0.0.0.0', 12345)) # 绑定IP和端口
server_socket.listen(5) # 监听连接
print("Server listening on port 12345...")
while True:
client_socket, client_address = server_socket.accept()
print(f"Connected by {client_address}")
data = client_socket.recv(1024).decode('utf-8')
print(f"Received: {data}")
client_socket.sendall("Hello from server!".encode('utf-8'))
client_socket.close()
客户端
import socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('服务器IP', 12345)) # 替换为实际IP
client_socket.sendall("Hello from client!".encode('utf-8'))
data = client_socket.recv(1024).decode('utf-8')
print(f"Received: {data}")
client_socket.close()
GET请求
import requests
response = requests.get('http://httpbin.org/get')
print(response.text)
POST请求
import requests
response = requests.post('http://httpbin.org/post', data={'key': 'value'})
print(response.text)
import asyncio
async def handle_client(reader, writer):
data = await reader.read(100)
if not data:
return
message = data.decode()
print(f"Received: {message}")
writer.write("Hello from server!".encode())
await writer.drain()
writer.close()
async def main():
server = await asyncio.start_server(handle_client, '0.0.0.0', 12345)
async with server:
await server.serve_forever()
asyncio.run(main())
import socket
import threading
def handle_client(client_socket, address):
print(f"Connected by {address}")
data = client_socket.recv(1024).decode('utf-8')
print(f"Received: {data}")
client_socket.sendall("Hello from server!".encode('utf-8'))
client_socket.close()
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('0.0.0.0', 12345))
server_socket.listen(5)
print("Multi-threaded server started...")
while True:
client_socket, client_address = server_socket.accept()
client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
client_thread.start()
nc
(netcat)测试TCP服务nc localhost 12345 # 连接服务器并发送消息
netstat -tulnp | grep 12345 # 检查端口是否被监听