Python服务器简单资源监控+自制CLI进度条

喜欢黑客说这个平台了,恨不得一天发十条哈哈 stuck_out_tongue_closed_eyes

这次分享一个由于监测服务器CPU和MEM占用的Python脚本(其实这是一个完整的项目,但是我没动力继续做下去了,把已经做的部分分享出来吧)

依赖库:连接SSH用的paramiko

首先是自己写的ybar.py,Python的CLI进度条库有不少,但没一个我喜欢的,所以我根据这个项目的需求自己写了一个,倒也不难,理论上这个进度条也能在其它项目里用。

需要注意的方法就一个: update,如提供参数则将进度条调整至对应进度,如不提供则默认+1

import time,os,math
class Ybar(object):
    """一个打印进度条的类"""
    def __init__(self, total, first, theme="thin", width="adaptive", show_speed=False, colorful=False, unit="item"):  # 初始化传入总数
        # 分别指定两种不同样式
        # tall样式利用unicode的block区块字符来输出,灵感来源于tqdm库,看起来会更高一点, thin样式利用横杠符号输出,灵感来源于新版本的pip工具,看起来更加纤细
        if theme=="tall":
            self.shape = ['▏', '▎', '▍', '▌', '▋', '▊', '▉', '█']
        elif theme=="thin":
            self.shape = ["━"]
        elif type(theme)==list:
            self.shape=theme
        else:
            self.shape = ["━"]
        # row_num指定了进度条的宽度。对于大多数屏幕和终端而言,30-60基本上不会折行,默认情况下它为 adaptive, 即自适应终端宽度.
        if type(width)==int:
            self.row_num = width
        elif width=="adaptive":
            # 6为百分比宽度,7为空格及其它保留内容宽度
            self.row_num = os.get_terminal_size().columns - self.length(first) - 6 - 7 
            if show_speed:
                self.row_num -= 11
                self.row_num -= len(unit)
        else:
            self.row_num = 50
        self.shape_num = len(self.shape)
        self.now = 0
        self.total = total
        self.first = first
        self.width = width
        self.theme = theme
        self.start_time = time.time()
        self.last_update = time.time()
        self.last_progress = 0
        self.show_speed = show_speed
        self.unit = unit
        self.colorful = colorful

    def human_num(self, size):
        #用于将某数转换为易读的格式
        unit_list = ['', 'K', 'M', 'G', 'T', 'P']
        if size == 0:
            return '0'
        if size >= 0:
            if int(math.log(size, 1024)) < 0:
                return str(size) + unit_list[0]
            elif int(math.log(size, 1024)) > 5:
                return str(round(size / pow(1024, 6), 3)) + unit_list[6]
            else:
                return str(round(size / pow(1024, int(math.log(size, 1024))), 1)) + unit_list[int(math.log(size, 1024))]
        else:
            return size

    #输出速度
    def get_speed(self,n,last_t):
        return self.human_num(n/(time.time()-last_t+0.001))

    #返回带汉字字符串实际长度
    def length(slef,text):
        if text is None:
            return 0
        lenText = len(text)
        lenText_utf8 = len(text.encode('utf-8'))
        # utf-8一个汉字占3个字符,减去原长度就是多出来的2/3,再除以2就是增量,加回去即可
        # 好吧这段注释和代码我在俩月后看已经完全看不懂了,反正能跑就行
        size = int((lenText_utf8 - lenText) / 2 + lenText)
        return size

    #print_next方法要求now参数,若不提供则默认将进度条数值加1,若提供则可以指定进度条显示的进度.
    def update(self, now=-1):
        if now == -1:
            self.now += 1
        else:
            self.now = now

        rate = math.ceil((self.now / self.total) *
                         (self.row_num * self.shape_num))
        head = rate // self.shape_num
        tail = rate % self.shape_num
        info = self.shape[-1] * head
        if tail != 0:
            info += self.shape[tail-1]


        if self.theme=="tall":
            space = (self.row_num-len(info)) * " "
        elif self.theme=="thin":
            space = (self.row_num-len(info)) * ("━")
        else:
            space = (self.row_num-len(info)) * ("━")

        percent = 100 * self.now / self.total
        #指定多彩样式
        if self.colorful==True:
            #下面是利用转义字符来实现多种颜色的输出,当占用率在75%-90%时,进度条为黄色;当占用率大于90%时,进度条为红色
            if 90 > percent > 75:
                info = "\033[1;33m" + info + "\033[0m"
            elif percent >= 90:
                info = "\033[1;31m" + info + "\033[0m"
            else:
                info = "\033[1;32m" + info + "\033[0m"

        info = "\033[1;37m" + info + "\033[0m"

        if self.show_speed==True:
            speed = str(self.get_speed(self.now-self.last_progress,self.last_update))+self.unit+"/s"
        else:
            speed = ""
        
        #进行最终的输出,结尾的空格可以避免一些奇奇怪怪的bug
        if self.theme=="tall":
            full_info = '\33[?25l|%s%s| %.2f%% %s  ' % (info, space, percent, speed)
        elif self.theme=="thin":
            full_info = '\33[?25l %s%s  %.2f%% %s  ' % (info, space, percent, speed)

        print("\r", end='', flush=True)
        print(self.first+full_info, end='', flush=True)
        self.last_update = time.time()
        self.last_progress = self.now
    def finish(self):
        self.update(self.total)

show_speed这项功能本来用于展示速度,但这个项目不需要,所以实现的不一定好,后面我有空继续做这个项目再更新。

接下来是主文件sever.py

# 获取远程Linux服务器的CPU,MEM占用(当然也可以自己编写代码添加更多信息)
# 制作者:王子文
# 时间:2022年7月8日

# 参考代码:
# 通过SSH连接服务器,并获取CPU和MEM占用率的方法(几乎没有修改)
# https://www.yisu.com/zixun/551980.html

# 进度条类的实现(修改较多,优化代码清晰度,添加注释并添加一些自定义功能,后期会完善以超过tqdm库的自定义程度)
# https://blog.csdn.net/MemoryD/article/details/95378917

# 在Windows终端上进行类清屏操作(实际上是移动光标位置,有小幅度修改,原文没有完整的代码,我对其进行函数包装)
# https://blog.csdn.net/qq_41037945/article/details/104363401

# 输出颜色调整(虽然原文题目是linux,但实测cmd和ps也能用)
# https://www.cnblogs.com/demonxian3/p/8963807.html

# 不用tqdm的原因
# tqdm虽然理论上也可以实现本程序所需要的进度条更新方法(即指定当前进度,也就是可以回退),但得绕个弯子
# tqdm结尾的那串东西(包括当前运行时间,剩余时间)没法去掉,它们对于本项目而言不需要且没有意义

import os
import paramiko
import time
from ybar import Ybar


sshClient = paramiko.SSHClient()
sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
server = "114.51.41.91" # 服务器地址
port = 22     # 端口
user = "user"  # 用户名
password = "good_password" # 密码
sshClient.connect(server, port, user, password)


def getSSHOutput(command):
    stdin, stdout, stderr = sshClient.exec_command(command)
    outs = stdout.readlines()
    return outs


def getCpuInfo(outs):
    for line in outs:
        line = line.lstrip()
        counters = line.split()
        if len(counters) < 5:
            continue
        if counters[0].startswith('cpu'):
            break
    total = 0
    for i in range(1, len(counters)):
        total = total + int(counters[i])
    idle = int(counters[4])
    return {'total': total, 'idle': idle}


def getMemInfo(outs):
    res = {'total': 0, 'free': 0, 'buffers': 0, 'cached': 0}
    index = 0
    for line in outs:
        if(index == 4):
            break
        line = line.lstrip()
        memItem = line.lower().split()
        if memItem[0] == 'memtotal:':
            res['total'] = int(memItem[1])
            index = index + 1
            continue
        elif memItem[0] == 'memfree:':
            res['memfree'] = int(memItem[1])
            index = index + 1
            continue
        elif memItem[0] == 'buffers:':
            res['buffers'] = int(memItem[1])
            index = index + 1
            continue
        elif memItem[0] == 'cached:':
            res['cached'] = int(memItem[1])
            index = index + 1
            continue
    return res


def calcMemUsage(mem_count):
    used = mem_count['total'] - mem_count['free'] - \
        mem_count['buffers'] - mem_count['cached']
    total = mem_count['total']
    return used*100/total


def calcCpuUsage(counters1, counters2):
    idle = counters2['idle'] - counters1['idle']
    total = counters2['total'] - counters1['total']
    return 100 - (idle*100/total)

# 用于实现windows上的清屏
# 不用cls命令的原因:
# 本程序的主循环中有sleep代码,而cls是全屏幕清除,那么在sleep的过程中必然有一段空屏的时间,影响观感
# 移动光标实际上不会清除以前的内容,这样在sleep的过程中,之前的输出还在,整个刷新过程是连续的,不会影响观感
def clear_windows():
    import ctypes
    class COORD(ctypes.Structure):
        _fields_ = [("X", ctypes.c_short), ("Y", ctypes.c_short)]

        def __init__(self, x, y):
            self.X = x
            self.Y = y
    # 把光标移动到左上角
    INIT_POS = COORD(0, 0)
    STD_OUTPUT_HANDLE = -11
    hOut = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
    ctypes.windll.kernel32.SetConsoleCursorPosition(hOut, INIT_POS)
    

pb = Ybar(100, "CPU ",colorful=True)
pb1 = Ybar(100, "MEM ",colorful=True)
os.system("cls")
while True:
    try:
        #在windows上的清屏,注意这里不能用os.system("cls"),因为那样会有一段时间空屏,这个函数是通过移动光标位置实现的.
        clear_windows()
        counters1 = getCpuInfo(getSSHOutput("cat /proc/stat"))
        time.sleep(1)
        counters2 = getCpuInfo(getSSHOutput("cat /proc/stat"))
        pb.update(now=calcCpuUsage(counters1, counters2))
        #隐藏光标,获得更舒适的显示效果(同时通过换行的方式将CPU信息和MEM信息分开)
        print("")
        counters3 = getMemInfo(getSSHOutput("cat /proc/meminfo"))
        pb1.update(now=calcMemUsage(counters3))
    except KeyboardInterrupt:
        break
sshClient.close()

好了就这样

ssh·server·python
161 views
Comments
登录后评论
Sign In