linux

Golang如何在Linux上实现分布式系统

小樊
35
2025-12-26 03:59:00
栏目: 编程语言

在 Linux 上用 Go 构建分布式系统的实践指南

一、架构与通信选型

二、最小落地示例 主从任务分发

// node.proto declares the messages and the gRPC service used between master and worker.
syntax = "proto3";
package core;
// Generated Go code is emitted into the output directory itself, as Go package "core".
option go_package = ".;core";

// Request carries a single action string.
message Request  { string action = 1; }
// Response carries a single data string.
message Response { string data   = 1; }

service NodeService {
  // ReportStatus is a unary RPC: one Request in, one Response out.
  rpc ReportStatus(Request) returns (Response);
  // AssignTask is a server-streaming RPC: one Request in, a stream of Responses out.
  rpc AssignTask(Request) returns (stream Response);
}
# google.golang.org/grpc is a library, not a command: add it to the module with `go get`.
go get google.golang.org/grpc
# Code generators are commands; `go install` needs an explicit version suffix.
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
# protoc-gen-go-grpc lives in the grpc module, not in the protobuf module.
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Generate node.pb.go and node_grpc.pb.go into ./core (protoc finds the plugins via $PATH).
mkdir -p core
protoc --go_out=./core --go-grpc_out=./core node.proto
// core/node_service.go
package core

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "your-module/core"
)

// NodeServiceGrpcServer is the worker-side implementation of the NodeService
// gRPC service.
type NodeServiceGrpcServer struct {
	// Embeds the generated "unimplemented" server type from the pb package.
	pb.UnimplementedNodeServiceServer
	// CmdCh carries the command strings that AssignTask forwards onto the
	// response stream.
	CmdCh chan string
}

// ReportStatus logs the action carried by the request and acknowledges it
// with a Response whose Data is "ok". It never returns an error.
func (n *NodeServiceGrpcServer) ReportStatus(ctx context.Context, req *pb.Request) (*pb.Response, error) {
	status := req.Action
	log.Printf("worker status: %s", status)

	ack := new(pb.Response)
	ack.Data = "ok"
	return ack, nil
}

// AssignTask forwards every command received on n.CmdCh to the client as a
// streamed Response.
//
// It returns:
//   - the stream context's error once the client cancels or disconnects, so
//     the handler goroutine does not stay blocked on CmdCh forever;
//   - nil if CmdCh is closed (no further commands will arrive);
//   - the error from stream.Send if a message cannot be delivered.
//
// NOTE(review): req is not consulted here; the commands come solely from
// CmdCh. Confirm whether req.Action is meant to be executed or enqueued.
func (n *NodeServiceGrpcServer) AssignTask(req *pb.Request, stream pb.NodeService_AssignTaskServer) error {
	ctx := stream.Context()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case cmd, ok := <-n.CmdCh:
			if !ok {
				// A closed channel would otherwise yield "" in a tight loop.
				return nil
			}
			if err := stream.Send(&pb.Response{Data: cmd}); err != nil {
				return err
			}
		}
	}
}

// NewNodeServer returns a NodeServiceGrpcServer whose CmdCh is a buffered
// channel with room for 32 pending commands.
func NewNodeServer() *NodeServiceGrpcServer {
	srv := new(NodeServiceGrpcServer)
	srv.CmdCh = make(chan string, 32)
	return srv
}

// cmd/worker/main.go
package main

import (
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
	"your-module/core"
)

// main starts the worker's gRPC server on :50052 and blocks until the process
// receives SIGINT or SIGTERM, then stops the server.
func main() {
	lis, err := net.Listen("tcp", ":50052")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	s := grpc.NewServer()
	// The generated registration function lives in the package imported as core.
	core.RegisterNodeServiceServer(s, core.NewNodeServer())

	go func() {
		// Serve returns nil after Stop; only a real failure should abort the
		// process with a non-zero exit status.
		if err := s.Serve(lis); err != nil {
			log.Fatalf("serve: %v", err)
		}
	}()

	ch := make(chan os.Signal, 1)
	// SIGTERM is what service managers and container runtimes on Linux send.
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	<-ch
	// Stop (rather than GracefulStop) because AssignTask streams are
	// long-lived and would keep a graceful shutdown waiting.
	s.Stop()
}
// cmd/master/main.go
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/gin-gonic/gin"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "your-module/core"
)

// workerAddr is the target handed to grpc.Dial to reach the worker's gRPC endpoint.
var workerAddr = "127.0.0.1:50052"

// main connects to the worker over gRPC and exposes an HTTP API on :8080.
// POST /tasks accepts a JSON body {"action": "..."} and opens an AssignTask
// stream to the worker; messages received on that stream are logged.
func main() {
	conn, err := grpc.Dial(workerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
	client := pb.NewNodeServiceClient(conn)

	r := gin.Default()
	r.POST("/tasks", func(c *gin.Context) {
		var req struct {
			Action string `json:"action"`
		}
		if err := c.BindJSON(&req); err != nil {
			c.Status(http.StatusBadRequest)
			return
		}

		// Simple demo: the task is sent straight to the worker through a
		// streaming RPC. Production code would put worker management and a
		// queue in between.
		//
		// The stream outlives this HTTP request, so it gets its own
		// cancellable context instead of the request's context.
		ctx, cancel := context.WithCancel(context.Background())
		stream, err := client.AssignTask(ctx, &pb.Request{Action: req.Action})
		if err != nil {
			// Without this check a failed call would leave stream nil and
			// the Recv below would panic.
			cancel()
			log.Printf("assign task: %v", err)
			c.JSON(http.StatusBadGateway, gin.H{"status": "failed", "error": err.Error()})
			return
		}

		// This goroutine runs until the stream ends (worker closes it, the
		// connection drops, or an error occurs) and then releases the
		// stream's resources via cancel.
		go func() {
			defer cancel()
			for {
				resp, err := stream.Recv()
				if err != nil {
					log.Printf("task stream closed: %v", err)
					return
				}
				log.Printf("task recv: %s", resp.Data)
			}
		}()
		c.JSON(http.StatusOK, gin.H{"status": "dispatched"})
	})

	log.Println("master api: :8080")
	log.Fatal(http.ListenAndServe(":8080", r))
}
# Terminal 1: Worker
go run cmd/worker/main.go

# Terminal 2: Master
go run cmd/master/main.go

# Terminal 3: dispatch a task
curl -X POST http://localhost:8080/tasks -d '{"action":"backup_db"}'

上述示例展示了 gRPC 的服务定义、代码生成、Worker 流式下发与 Master HTTP 触发,可作为进一步扩展(多 Worker、任务队列、注册中心、重试熔断等)的骨架。

三、生产级关键能力清单

四、部署与运维要点

0
看了该问题的人还看了