在 Linux 上用 Go 构建分布式系统的实践指南
一、架构与通信选型
二、最小落地示例:主从任务分发
// node.proto: wire contract between the master and worker nodes.
syntax = "proto3";
package core;
// Generated Go code is emitted with the Go package name "core".
option go_package = ".;core";
// Request carries a single string field, action.
message Request { string action = 1; }
// Response carries a single string field, data.
message Response { string data = 1; }
// NodeService exposes one unary RPC and one server-streaming RPC.
service NodeService {
// ReportStatus: unary call, one Request in and one Response out.
rpc ReportStatus(Request) returns (Response);
// AssignTask: server-streaming call, one Request in and a stream of Responses out.
rpc AssignTask(Request) returns (stream Response);
}
# google.golang.org/grpc is a library, not a command: add it to the module with `go get`.
go get google.golang.org/grpc
# Install the two protoc plugins. protoc-gen-go lives in the protobuf module,
# protoc-gen-go-grpc lives in the grpc module; both need an explicit version suffix.
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Generate the message and service code into ./core.
mkdir -p core
protoc --go_out=./core --go-grpc_out=./core node.proto
// core/node_service.go
package core
import (
"context"
"log"
"net"
"google.golang.org/grpc"
pb "your-module/core"
)
// NodeServiceGrpcServer is the worker-side implementation of the NodeService
// gRPC service. It embeds pb.UnimplementedNodeServiceServer and owns CmdCh,
// the channel whose values AssignTask streams back to the caller.
//
// NOTE(review): this file is declared `package core` but also imports
// "your-module/core" as pb. If both refer to the same package the compiler
// rejects the self-import; confirm the intended package layout.
type NodeServiceGrpcServer struct {
	pb.UnimplementedNodeServiceServer

	// CmdCh holds the command strings that AssignTask forwards to the stream.
	CmdCh chan string
}
// ReportStatus handles the unary status RPC. It logs the Action string of the
// incoming request and always answers with a Response whose Data is "ok" and a
// nil error. ctx is not consulted.
func (n *NodeServiceGrpcServer) ReportStatus(ctx context.Context, req *pb.Request) (*pb.Response, error) {
	log.Printf("worker status: %s", req.Action)

	ack := new(pb.Response)
	ack.Data = "ok"
	return ack, nil
}
// AssignTask handles the server-streaming RPC. Every string received from
// n.CmdCh is sent to the caller as a Response until one of the following
// happens:
//
//   - the stream's context is done (client went away, deadline hit, server
//     stopping): the context error is returned so the handler goroutine exits
//     instead of blocking on CmdCh forever;
//   - CmdCh is closed: the stream ends normally with a nil error, rather than
//     spinning on zero values;
//   - stream.Send fails: that error is returned.
//
// req is not read by this handler.
func (n *NodeServiceGrpcServer) AssignTask(req *pb.Request, stream pb.NodeService_AssignTaskServer) error {
	ctx := stream.Context()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case cmd, ok := <-n.CmdCh:
			if !ok {
				return nil
			}
			if err := stream.Send(&pb.Response{Data: cmd}); err != nil {
				return err
			}
		}
	}
}
// NewNodeServer returns a NodeServiceGrpcServer whose CmdCh is a buffered
// string channel with capacity 32.
func NewNodeServer() *NodeServiceGrpcServer {
	srv := new(NodeServiceGrpcServer)
	srv.CmdCh = make(chan string, 32)
	return srv
}
// cmd/worker/main.go
package main
import (
	"log"
	"net"
	"os"
	"os/signal"

	"google.golang.org/grpc"

	"your-module/core"
)
// main starts the worker: it listens on TCP :50052, registers the NodeService
// implementation on a new gRPC server, serves in a background goroutine, and
// stops the server when an interrupt signal arrives.
func main() {
	lis, err := net.Listen("tcp", ":50052")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	s := grpc.NewServer()
	// The generated registration function lives in the same package as
	// NewNodeServer, which this file imports as core.
	core.RegisterNodeServiceServer(s, core.NewNodeServer())

	go func() {
		// Serve returns nil once Stop has been called; only a real failure
		// should terminate the process with an error.
		if err := s.Serve(lis); err != nil {
			log.Fatalf("serve: %v", err)
		}
	}()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
	s.Stop()
}
// cmd/master/main.go
package main
import (
"context"
"log"
"net/http"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "your-module/core"
)
// workerAddr is the address the master dials to reach the worker's gRPC endpoint.
var workerAddr = "127.0.0.1:50052"

// main starts the master: it creates a gRPC client for workerAddr and serves an
// HTTP API on :8080. POST /tasks takes a JSON body {"action": "..."}, opens an
// AssignTask stream towards the worker and logs every response received on it.
func main() {
	conn, err := grpc.Dial(workerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
	client := pb.NewNodeServiceClient(conn)

	r := gin.Default()
	r.POST("/tasks", func(c *gin.Context) {
		var req struct {
			Action string `json:"action"`
		}
		if err := c.BindJSON(&req); err != nil {
			c.Status(http.StatusBadRequest)
			return
		}

		// Simplified for the demo: the RPC is issued directly against the single
		// worker. A production setup would go through worker management or a queue.
		// context.Background is used so the stream outlives this HTTP request.
		stream, err := client.AssignTask(context.Background(), &pb.Request{Action: req.Action})
		if err != nil {
			// Without this check a nil stream would reach Recv in the goroutine
			// below and panic the whole process.
			log.Printf("assign task: %v", err)
			c.JSON(http.StatusBadGateway, gin.H{"status": "worker unavailable"})
			return
		}

		// Drain the stream in the background; the goroutine ends on the first
		// Recv error, which includes the normal end of the stream.
		go func() {
			for {
				resp, err := stream.Recv()
				if err != nil {
					log.Printf("task stream closed: %v", err)
					return
				}
				log.Printf("task recv: %s", resp.Data)
			}
		}()

		c.JSON(http.StatusOK, gin.H{"status": "dispatched"})
	})

	log.Println("master api: :8080")
	log.Fatal(http.ListenAndServe(":8080", r))
}
# Terminal 1: Worker
go run cmd/worker/main.go
# Terminal 2: Master
go run cmd/master/main.go
# Terminal 3: dispatch a task
curl -X POST http://localhost:8080/tasks -d '{"action":"backup_db"}'
上述示例展示了 gRPC 的服务定义、代码生成、Worker 流式下发与 Master HTTP 触发,可作为进一步扩展(多 Worker、任务队列、注册中心、重试熔断等)的骨架。
三、生产级关键能力清单
四、部署与运维要点