在 Linux 上使用 Go 构建分布式系统的落地路径
一 架构与组件选型
二 环境准备与网络配置
三 关键实现步骤
四 最小可行示例 主从任务分发
// Wire contract for the NodeService RPCs and the messages they exchange.
syntax = "proto3";

package core;

option go_package = ".;core";

// Task is a unit of work identified by id and carrying a string payload.
message Task {
  string id = 1;
  string payload = 2;
}

// Result reports the outcome of the Task whose id equals task_id.
message Result {
  string task_id = 1;
  string output = 2;
  bool success = 3;
}

// NodeService exposes a status probe and a task-assignment RPC.
service NodeService {
  // ReportStatus is a unary call that returns the callee's status string.
  rpc ReportStatus(StatusReq) returns (StatusResp);

  // AssignTask takes a single Task and answers with a stream of Results.
  rpc AssignTask(Task) returns (stream Result);
}

// StatusReq is intentionally empty.
message StatusReq {}

// StatusResp carries a free-form status string.
message StatusResp {
  string status = 1;
}
protoc --go_out=. --go-grpc_out=. node.proto
package main
import (
	"context"
	"log"
	"math/rand"
	"net"
	"time"

	"google.golang.org/grpc"
	pb "your-module/core"
)
// worker is the worker node's NodeService implementation. It has no fields of
// its own and only embeds pb.UnimplementedNodeServiceServer.
type worker struct {
	pb.UnimplementedNodeServiceServer
}
// ReportStatus answers a status probe. It ignores both ctx and req and always
// returns a StatusResp whose Status is "alive" together with a nil error.
func (w *worker) ReportStatus(ctx context.Context, req *pb.StatusReq) (*pb.StatusResp, error) {
	resp := new(pb.StatusResp)
	resp.Status = "alive"
	return resp, nil
}
// AssignTask simulates processing of task and streams back exactly one Result.
//
// The simulated work lasts a random 100–599 ms. If the stream's context is
// done before the delay elapses (caller cancelled or deadline hit), the
// handler stops waiting and returns the context's error without sending a
// Result. Otherwise it returns whatever stream.Send returns.
func (w *worker) AssignTask(task *pb.Task, stream pb.NodeService_AssignTaskServer) error {
	log.Printf("worker received task %s", task.Id)

	// Simulate processing, but do not keep working for a caller that is gone.
	delay := time.Duration(rand.Intn(500)+100) * time.Millisecond
	timer := time.NewTimer(delay)
	defer timer.Stop()

	ctx := stream.Context()
	select {
	case <-timer.C:
	case <-ctx.Done():
		return ctx.Err()
	}

	return stream.Send(&pb.Result{
		TaskId:  task.Id,
		Output:  "processed: " + task.Payload,
		Success: true,
	})
}
// main runs the worker node.
//
// It listens on TCP :50052 for incoming NodeService calls, opens a client
// connection to the master at master:50051, sends a ReportStatus heartbeat
// to the master every five seconds from a background goroutine, and then
// serves gRPC on the listener until Serve fails.
func main() {
	// Listener for tasks pushed to this worker. Serve requires a
	// net.Listener, so this must be net.Listen rather than a dial.
	lis, err := net.Listen("tcp", ":50052")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	// Client connection to the master, used for heartbeats.
	conn, err := grpc.DialContext(context.Background(), "master:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial master: %v", err)
	}
	defer conn.Close()
	client := pb.NewNodeServiceClient(conn)

	// Heartbeat loop; it lives for the whole process lifetime.
	go func() {
		for {
			// Bound each call so an unreachable master cannot stall the loop.
			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
			if _, err := client.ReportStatus(ctx, &pb.StatusReq{}); err != nil {
				log.Printf("heartbeat: %v", err)
			}
			cancel()
			time.Sleep(5 * time.Second)
		}
	}()

	s := grpc.NewServer()
	pb.RegisterNodeServiceServer(s, &worker{})
	log.Println("worker listening on :50052")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
package main
import (
"context"
"log"
"net"
"net/http"
"time"
pb "your-module/core"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
)
// master is the master node's NodeService implementation. It has no fields of
// its own and only embeds pb.UnimplementedNodeServiceServer.
type master struct {
	pb.UnimplementedNodeServiceServer
}
// ReportStatus answers a status probe. It ignores both ctx and req and always
// returns a StatusResp whose Status is "master" together with a nil error.
func (m *master) ReportStatus(ctx context.Context, req *pb.StatusReq) (*pb.StatusResp, error) {
	resp := new(pb.StatusResp)
	resp.Status = "master"
	return resp, nil
}
// AssignTask logs the task id and sends one Result on the stream with Output
// "dispatched" and Success true. It returns the error from stream.Send.
func (m *master) AssignTask(task *pb.Task, stream pb.NodeService_AssignTaskServer) error {
	id := task.Id
	log.Printf("dispatching task %s", id)

	ack := &pb.Result{TaskId: id, Output: "dispatched", Success: true}
	return stream.Send(ack)
}
// main runs the master node.
//
// It serves NodeService over gRPC on :50051 in a background goroutine and an
// HTTP API on :8080 on the main goroutine. POST /tasks accepts a JSON Task
// and replies with an "accepted" Result, or with status 400 when the body
// cannot be bound. The process exits if either server stops with an error.
func main() {
	// gRPC
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	grpcS := grpc.NewServer()
	pb.RegisterNodeServiceServer(grpcS, &master{})
	go func() {
		// Surface a serve failure instead of silently running HTTP-only.
		if err := grpcS.Serve(lis); err != nil {
			log.Fatalf("grpc serve: %v", err)
		}
	}()

	// HTTP API
	r := gin.Default()
	r.POST("/tasks", func(c *gin.Context) {
		var t pb.Task
		if err := c.ShouldBindJSON(&t); err != nil {
			c.Status(http.StatusBadRequest)
			return
		}
		// Simplified: echo a sample result directly; production code should
		// push the task onto a worker stream.
		// Pass a pointer so the generated message struct is not copied.
		c.JSON(http.StatusOK, &pb.Result{TaskId: t.Id, Output: "accepted", Success: true})
	})

	// Serve on the main goroutine so a bind failure ends the process
	// instead of leaving it parked forever with no HTTP endpoint.
	srv := &http.Server{
		Addr:              ":8080",
		Handler:           r,
		ReadHeaderTimeout: 5 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		log.Fatalf("http serve: %v", err)
	}
}
# Terminal 1
go run master.go
# Terminal 2
go run worker.go
# 提交任务
curl -X POST http://localhost:8080/tasks -d '{"id":"t1","payload":"hello"}'
五 生产级注意事项