您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# PHP如何创建简单RPC服务
## 1. RPC基础概念
### 1.1 什么是RPC
RPC(Remote Procedure Call,远程过程调用)是一种计算机通信协议,允许程序像调用本地服务一样调用远程服务。它抽象了网络通信细节,使开发者能够专注于业务逻辑而非底层通信机制。
### 1.2 RPC的核心组成
- **客户端(Client)**:服务调用方
- **服务端(Server)**:服务提供方
- **存根(Stub)**:客户端和服务端的代理
- **序列化协议**:数据编码/解码方式
- **传输协议**:网络通信协议
### 1.3 常见RPC框架对比
| 框架 | 语言 | 特点 |
|------------|---------|--------------------------|
| gRPC | 多语言 | HTTP/2, Protocol Buffers |
| Thrift | 多语言 | Facebook开发, 跨语言 |
| JSON-RPC | 多语言 | 简单, JSON格式 |
| XML-RPC | 多语言 | 早期标准, XML格式 |
## 2. PHP实现RPC的技术选型
### 2.1 原生PHP实现方案
```php
// 基础示例:使用stream_socket_server创建服务
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
if (!$socket) {
die("$errstr ($errno)");
}
json_encode()/json_decode()
msgpack_pack()/msgpack_unpack()
serialize()/unserialize()
// server.php
$methods = [
'add' => function($a, $b) {
return $a + $b;
},
'getUser' => function($id) {
return ['id' => $id, 'name' => 'User'.$id];
}
];
$request = json_decode(file_get_contents('php://input'), true);
$response = [
'jsonrpc' => '2.0',
'id' => $request['id'] ?? null
];
try {
if (!isset($methods[$request['method']])) {
throw new Exception('Method not found');
}
$result = $methods[$request['method']](...$request['params']);
$response['result'] = $result;
} catch (Exception $e) {
$response['error'] = [
'code' => -32601,
'message' => $e->getMessage()
];
}
header('Content-Type: application/json');
echo json_encode($response);
// client.php
class JsonRpcClient {
private $url;
public function __construct($url) {
$this->url = $url;
}
public function __call($method, $params) {
$request = [
'jsonrpc' => '2.0',
'method' => $method,
'params' => $params,
'id' => uniqid()
];
$context = stream_context_create([
'http' => [
'method' => 'POST',
'header' => 'Content-Type: application/json',
'content' => json_encode($request)
]
]);
$response = file_get_contents($this->url, false, $context);
$data = json_decode($response, true);
if (isset($data['error'])) {
throw new Exception($data['error']['message'], $data['error']['code']);
}
return $data['result'] ?? null;
}
}
// 使用示例
$client = new JsonRpcClient('http://localhost/server.php');
echo $client->add(2, 3); // 输出5
// tcp_server.php
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
if (!$socket) {
die("$errstr ($errno)");
}
$methods = [
'add' => function($a, $b) { return $a + $b; },
'multiply' => function($a, $b) { return $a * $b; }
];
while ($conn = stream_socket_accept($socket, -1)) {
$request = fread($conn, 1024);
$data = unserialize($request);
$response = [
'id' => $data['id'],
'result' => null,
'error' => null
];
try {
if (!isset($methods[$data['method']])) {
throw new Exception('Method not found');
}
$response['result'] = $methods[$data['method']](...$data['params']);
} catch (Exception $e) {
$response['error'] = $e->getMessage();
}
fwrite($conn, serialize($response));
fclose($conn);
}
fclose($socket);
// tcp_client.php
class TcpRpcClient {
private $host;
private $port;
public function __construct($host, $port) {
$this->host = $host;
$this->port = $port;
}
public function __call($method, $params) {
$socket = stream_socket_client("tcp://{$this->host}:{$this->port}", $errno, $errstr, 30);
if (!$socket) {
throw new Exception("$errstr ($errno)");
}
$request = [
'id' => uniqid(),
'method' => $method,
'params' => $params
];
fwrite($socket, serialize($request));
$response = unserialize(fread($socket, 1024));
fclose($socket);
if ($response['error']) {
throw new Exception($response['error']);
}
return $response['result'];
}
}
// 使用示例
$client = new TcpRpcClient('localhost', 8000);
echo $client->add(5, 3); // 输出8
echo $client->multiply(4, 6); // 输出24
// swoole_server.php
$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
$methods = [
'add' => function($a, $b) { return $a + $b; },
'getUserInfo' => function($id) {
return ['id' => $id, 'name' => 'User'.$id, 'score' => rand(60, 100)];
}
];
$server->on('receive', function ($server, $fd, $reactor_id, $data) use ($methods) {
$data = json_decode($data, true);
$response = [
'id' => $data['id'],
'result' => null,
'error' => null
];
try {
if (!isset($methods[$data['method']])) {
throw new Exception('Method not found');
}
$response['result'] = $methods[$data['method']](...$data['params']);
} catch (Exception $e) {
$response['error'] = $e->getMessage();
}
$server->send($fd, json_encode($response));
});
$server->start();
// swoole_client.php
class SwooleRpcClient {
private $client;
public function __construct($host, $port) {
$this->client = new Swoole\Client(SWOOLE_SOCK_TCP);
if (!$this->client->connect($host, $port, 0.5)) {
throw new Exception("connect failed. Error: {$this->client->errCode}");
}
}
public function __call($method, $params) {
$request = [
'id' => uniqid(),
'method' => $method,
'params' => $params
];
if (!$this->client->send(json_encode($request))) {
throw new Exception("send failed. Error: {$this->client->errCode}");
}
$response = $this->client->recv();
$data = json_decode($response, true);
if ($data['error']) {
throw new Exception($data['error']);
}
return $data['result'];
}
public function __destruct() {
$this->client->close();
}
}
// 使用示例
$client = new SwooleRpcClient('127.0.0.1', 9501);
echo $client->add(10, 20); // 输出30
print_r($client->getUserInfo(1001));
// 服务注册中心简单实现
class ServiceRegistry {
private static $services = [];
public static function register($name, $host, $port) {
self::$services[$name] = [
'host' => $host,
'port' => $port,
'last_heartbeat' => time()
];
}
public static function discover($name) {
if (!isset(self::$services[$name])) {
throw new Exception("Service {$name} not found");
}
return self::$services[$name];
}
public static function heartbeat($name) {
if (isset(self::$services[$name])) {
self::$services[$name]['last_heartbeat'] = time();
}
}
}
interface LoadBalancer {
public function select(array $services);
}
class RandomLoadBalancer implements LoadBalancer {
public function select(array $services) {
return $services[array_rand($services)];
}
}
class RoundRobinLoadBalancer implements LoadBalancer {
private $index = 0;
public function select(array $services) {
$service = $services[$this->index % count($services)];
$this->index++;
return $service;
}
}
class CircuitBreaker {
private $failureThreshold;
private $recoveryTimeout;
private $failureCount = 0;
private $lastFailureTime = 0;
private $state = 'CLOSED';
public function __construct($failureThreshold = 3, $recoveryTimeout = 30) {
$this->failureThreshold = $failureThreshold;
$this->recoveryTimeout = $recoveryTimeout;
}
public function execute(callable $operation) {
if ($this->state === 'OPEN') {
if (time() - $this->lastFailureTime > $this->recoveryTimeout) {
$this->state = 'HALF_OPEN';
} else {
throw new Exception("Circuit breaker is OPEN");
}
}
try {
$result = $operation();
$this->reset();
return $result;
} catch (Exception $e) {
$this->recordFailure();
throw $e;
}
}
private function recordFailure() {
$this->failureCount++;
$this->lastFailureTime = time();
if ($this->failureCount >= $this->failureThreshold) {
$this->state = 'OPEN';
}
}
private function reset() {
$this->failureCount = 0;
$this->state = 'CLOSED';
}
}
class ConnectionPool {
private $pool;
private $config;
private $maxSize;
private $currentSize = 0;
public function __construct($config, $maxSize = 10) {
$this->config = $config;
$this->maxSize = $maxSize;
$this->pool = new SplQueue();
}
public function getConnection() {
if (!$this->pool->isEmpty()) {
return $this->pool->dequeue();
}
if ($this->currentSize < $this->maxSize) {
$this->currentSize++;
return $this->createConnection();
}
throw new Exception("Connection pool exhausted");
}
public function releaseConnection($connection) {
$this->pool->enqueue($connection);
}
private function createConnection() {
$client = new Swoole\Client(SWOOLE_SOCK_TCP);
if (!$client->connect($this->config['host'], $this->config['port'], 0.5)) {
throw new Exception("connect failed. Error: {$client->errCode}");
}
return $client;
}
}
class AsyncRpcClient {
private $host;
private $port;
public function __construct($host, $port) {
$this->host = $host;
$this->port = $port;
}
public function callAsync($method, $params, callable $callback) {
$client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$client->on("connect", function($cli) use ($method, $params) {
$request = [
'id' => uniqid(),
'method' => $method,
'params' => $params
];
$cli->send(json_encode($request));
});
$client->on("receive", function($cli, $data) use ($callback) {
$response = json_decode($data, true);
$callback($response['result'], $response['error'] ?? null);
$cli->close();
});
$client->on("error", function($cli) use ($callback) {
$callback(null, "Connection failed");
});
$client->connect($this->host, $this->port, 0.5);
}
}
// 使用示例
$client = new AsyncRpcClient('127.0.0.1', 9501);
$client->callAsync('add', [5, 7], function($result, $error) {
if ($error) {
echo "Error: $error\n";
} else {
echo "Result: $result\n";
}
});
认证与授权
数据验证
// 参数验证示例
$validator = new Validator();
$validator->validate($params, [
'username' => 'required|string|min:3|max:20',
'password' => 'required|string|min:8'
]);
传输安全
// 日志记录中间件
class LoggingMiddleware {
public function handle($request, $next) {
$start = microtime(true);
$logData = [
'time' => date('Y-m-d H:i:s'),
'method' => $request['method'],
'params' => $request['params'],
'ip' => $_SERVER['REMOTE_ADDR'] ?? ''
];
try {
$response = $next($request);
$logData['duration'] = microtime(true) - $start;
$logData['status'] = 'success';
$this->writeLog($logData);
return $response;
} catch (Exception $e) {
$logData['duration'] = microtime(true) - $start;
$logData['status'] = 'error';
$logData['error'] = $e->getMessage();
$this->writeLog($logData);
throw $e;
}
}
private function writeLog($data) {
file_put_contents(
'rpc.log',
json_encode($data) . PHP_EOL,
FILE_APPEND
);
}
}
class RpcServiceTest extends PHPUnit\Framework\TestCase {
public function testAddMethod() {
$client = new JsonRpcClient('http://localhost/server.php');
$this->assertEquals(5, $client->add(2, 3));
$this->assertEquals(0, $client->add(-1, 1));
}
public function testNonexistentMethod() {
$this->expectException(Exception::class);
$client = new JsonRpcClient('http://localhost/server.php');
$client->nonexistentMethod();
}
}
”`php // 压力测试脚本 \(start = microtime(true); \)concurrency = 50; $requests = 1000;
$client = new SwooleRpcClient(‘127.0.0.1’, 9501);
\(success = 0; \)failures = 0;
for (\(i = 0; \)i
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。