运维系列之FastAPI接口服务怎么用

发布时间:2021-12-30 10:58:06 作者:小新
来源:亿速云 阅读:170

小编给大家分享一下运维系列之FastAPI接口服务怎么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

第一部分:FastAPI接口部分

 from fastapi import FastAPI, Path 

from com.fy.fastapi.monitor.shell.fabric.FabricLinux import FabricLinux

app = FastAPI()



 #执行shell命令;

#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/cmd/{cmd}")

def runCommand(cmd, host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand(cmd)

    fabric.closeClient()

    return {"res": result  }



#上传文件;

#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/upload/{srcFile}")

def upload(srcFile, targetDir, host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.upload(srcFile, targetDir)

    fabric.closeClient()

    return {"res": result  }



#下载文件;

#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/download/{srcFile}")

def download(srcFile, targetDir, host, userName, password):

    #fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")

    fabric = FabricLinux(host , userName , password)

    result = fabric.download(srcFile, targetDir)

    fabric.closeClient()

    return {"res": result  }



#删除文件

#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/delete/{targetPath}")

def delete(targetPath:"必须是全路径", host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand("rm -rf " + targetPath)

    fabric.closeClient()

    return {"res": result  }



#解压.tar.gz文件

#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/delete/{targetPath}")

def decomTarGz(srcPath, tarPath, host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.decomTarGz(srcPath, tarPath)

    fabric.closeClient()

    return {"res": result  }



#根据PID关闭相关的服务;

#调用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/kill/{pid}")

def stop(pid: int=Path(..., gt=0, le=32767), host, userName, password):

    ''' gt: 大于; le: 小于等于 '''

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand("kill -9 " + pid)

    fabric.closeClient()

    return {"res": result  }



#根据Python命令符,以及启动文件路径,启动相应的服务;

#调用示例:http://127.0.0.1:8000/start/python?startFile=%E6%B5%8B%E8%AF%95

@app.get("/start/{pycmd}")

def start(pycmd  , startFile , host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand("nohup " + pycmd + " " + startFile + " &")

    fabric.closeClient()

    return {"res": result  }

第二部分:fabric接口部分,主要用来执行命令行

运维系列之FastAPI接口服务怎么用

from fabric import Connection

import traceback, os

class FabricLinux:

    def __init__(self, host:"服务器IP", userName:"用户名", password:"密码"):

        self.host = host

        self.userName = userName

        self.password = password

        print(self.userName + "@" + self.host, {"password": self.password})

        self.initClient()#初始化链接

    

    #初始化ssh链接对象;

    def initClient(self):

        #如果服务器配置了SSH免密码登录,就不需要 connect_kwargs 来指定密码了。

        self.con = Connection(self.userName + "@" + self.host, connect_kwargs={"password": self.password})

    

    #关闭fabric操作对象;

    def closeClient(self):

        self.con.close()

        

    #执行shell命令;

    def runCommand(self, sshCommand:"Linux命令行语句"):

        #top命令尚未测试通过;

        #如果命令行中包含路径,最好使用绝对路径;

        try:

            #语法:run('远程命令')

            result = self.con.run(sshCommand, hide=True)

            if result.return_code == 0:# 返回码,0表示正确执行,1表示错误

                return True, result.stdout                         

            return result.failed, result.stdout

        except:

            exp = traceback.format_exc()

            if "mkdir" in exp and 'File exists' in exp:

                print("目录【", sshCommand, "】已存在")

            else:

                print(exp)

            return False, exp


    #在特定目录下,执行shell命令;

    def runDir(self, sshCommand:"Linux命令行语句", dir):

        if not os.path.isdir(dir):

            return "目标路径错误,必须是目录"

        with self.cd(dir):#with表示,with块的代码是在dir目录下执行;

            return self.runCommand(sshCommand)

    

    #解压.tar.gz文件;

    def decomTarGz(self, srcPath, tarPath):

        if os.path.isdir(srcPath):

            return "带解压的路径,必须是文件类型"

        if not os.path.isdir(tarPath):

            return "目标路径错误,必须是目录"

        return fabricu.runCommand("tar -zxvf " + srcPath + " -C " + tarPath)

    

    #切换到某个目录下(上下文不连贯)

    def cd(self, dir):

        #语法:cd('远程目录')

        self.con.cd(dir)

    

    #上传本地文件到远程主机

    def upload(self, src:"待上传的文件全路径。路径中最好不要有空格等", target:"保存到服务器上的目录"):

        if not os.path.isdir(target):

            return "待上传的文件全路径"

        elif " " in src:

            return "路径中不允许有空格、(、)等特殊字符"

        #语法:put('本地文件', '远程目录')

        else:return self.con.put(src, target)

    

    #从远程主机下载文件到本地

    def download(self , src:"远程文件全路径", target:"本地文件路径(必须包含文件名称的全路径)"):

        #语法:get('远程文件', '本地文件路径')

        #示例:fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")

        if not os.path.isdir(target):

            return self.con.get(src, target)

        else:return "目标路径错误,必须是包含文件名称的全路径"

看完了这篇文章,相信你对“运维系列之FastAPI接口服务怎么用”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

推荐阅读:
  1. 运维蓝图
  2. Hadoop运维记录系列(二十七)

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

fastapi

上一篇:ntp的小错误是什么

下一篇:怎么进行QOS的流量监管

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》