喜欢黑客说这个平台了,恨不得一天发十条哈哈
这次分享一个由于监测服务器CPU和MEM占用的Python脚本(其实这是一个完整的项目,但是我没动力继续做下去了,把已经做的部分分享出来吧)
依赖库:连接SSH用的paramiko
首先是自己写的ybar.py
,Python的CLI进度条库有不少,但没一个我喜欢的,所以我根据这个项目的需求自己写了一个,倒也不难,理论上这个进度条也能在其它项目里用。
需要注意的方法就一个: update,如提供参数则将进度条调整至对应进度,如不提供则默认+1
import time,os,math
class Ybar(object):
"""一个打印进度条的类"""
def __init__(self, total, first, theme="thin", width="adaptive", show_speed=False, colorful=False, unit="item"): # 初始化传入总数
# 分别指定两种不同样式
# tall样式利用unicode的block区块字符来输出,灵感来源于tqdm库,看起来会更高一点, thin样式利用横杠符号输出,灵感来源于新版本的pip工具,看起来更加纤细
if theme=="tall":
self.shape = ['▏', '▎', '▍', '▌', '▋', '▊', '▉', '█']
elif theme=="thin":
self.shape = ["━"]
elif type(theme)==list:
self.shape=theme
else:
self.shape = ["━"]
# row_num指定了进度条的宽度。对于大多数屏幕和终端而言,30-60基本上不会折行,默认情况下它为 adaptive, 即自适应终端宽度.
if type(width)==int:
self.row_num = width
elif width=="adaptive":
# 6为百分比宽度,7为空格及其它保留内容宽度
self.row_num = os.get_terminal_size().columns - self.length(first) - 6 - 7
if show_speed:
self.row_num -= 11
self.row_num -= len(unit)
else:
self.row_num = 50
self.shape_num = len(self.shape)
self.now = 0
self.total = total
self.first = first
self.width = width
self.theme = theme
self.start_time = time.time()
self.last_update = time.time()
self.last_progress = 0
self.show_speed = show_speed
self.unit = unit
self.colorful = colorful
def human_num(self, size):
#用于将某数转换为易读的格式
unit_list = ['', 'K', 'M', 'G', 'T', 'P']
if size == 0:
return '0'
if size >= 0:
if int(math.log(size, 1024)) < 0:
return str(size) + unit_list[0]
elif int(math.log(size, 1024)) > 5:
return str(round(size / pow(1024, 6), 3)) + unit_list[6]
else:
return str(round(size / pow(1024, int(math.log(size, 1024))), 1)) + unit_list[int(math.log(size, 1024))]
else:
return size
#输出速度
def get_speed(self,n,last_t):
return self.human_num(n/(time.time()-last_t+0.001))
#返回带汉字字符串实际长度
def length(slef,text):
if text is None:
return 0
lenText = len(text)
lenText_utf8 = len(text.encode('utf-8'))
# utf-8一个汉字占3个字符,减去原长度就是多出来的2/3,再除以2就是增量,加回去即可
# 好吧这段注释和代码我在俩月后看已经完全看不懂了,反正能跑就行
size = int((lenText_utf8 - lenText) / 2 + lenText)
return size
#print_next方法要求now参数,若不提供则默认将进度条数值加1,若提供则可以指定进度条显示的进度.
def update(self, now=-1):
if now == -1:
self.now += 1
else:
self.now = now
rate = math.ceil((self.now / self.total) *
(self.row_num * self.shape_num))
head = rate // self.shape_num
tail = rate % self.shape_num
info = self.shape[-1] * head
if tail != 0:
info += self.shape[tail-1]
if self.theme=="tall":
space = (self.row_num-len(info)) * " "
elif self.theme=="thin":
space = (self.row_num-len(info)) * ("━")
else:
space = (self.row_num-len(info)) * ("━")
percent = 100 * self.now / self.total
#指定多彩样式
if self.colorful==True:
#下面是利用转义字符来实现多种颜色的输出,当占用率在75%-90%时,进度条为黄色;当占用率大于90%时,进度条为红色
if 90 > percent > 75:
info = "\033[1;33m" + info + "\033[0m"
elif percent >= 90:
info = "\033[1;31m" + info + "\033[0m"
else:
info = "\033[1;32m" + info + "\033[0m"
info = "\033[1;37m" + info + "\033[0m"
if self.show_speed==True:
speed = str(self.get_speed(self.now-self.last_progress,self.last_update))+self.unit+"/s"
else:
speed = ""
#进行最终的输出,结尾的空格可以避免一些奇奇怪怪的bug
if self.theme=="tall":
full_info = '\33[?25l|%s%s| %.2f%% %s ' % (info, space, percent, speed)
elif self.theme=="thin":
full_info = '\33[?25l %s%s %.2f%% %s ' % (info, space, percent, speed)
print("\r", end='', flush=True)
print(self.first+full_info, end='', flush=True)
self.last_update = time.time()
self.last_progress = self.now
def finish(self):
self.update(self.total)
show_speed这项功能本来用于展示速度,但这个项目不需要,所以实现的不一定好,后面我有空继续做这个项目再更新。
接下来是主文件sever.py
# 获取远程Linux服务器的CPU,MEM占用(当然也可以自己编写代码添加更多信息)
# 制作者:王子文
# 时间:2022年7月8日
# 参考代码:
# 通过SSH连接服务器,并获取CPU和MEM占用率的方法(几乎没有修改)
# https://www.yisu.com/zixun/551980.html
# 进度条类的实现(修改较多,优化代码清晰度,添加注释并添加一些自定义功能,后期会完善以超过tqdm库的自定义程度)
# https://blog.csdn.net/MemoryD/article/details/95378917
# 在Windows终端上进行类清屏操作(实际上是移动光标位置,有小幅度修改,原文没有完整的代码,我对其进行函数包装)
# https://blog.csdn.net/qq_41037945/article/details/104363401
# 输出颜色调整(虽然原文题目是linux,但实测cmd和ps也能用)
# https://www.cnblogs.com/demonxian3/p/8963807.html
# 不用tqdm的原因
# tqdm虽然理论上也可以实现本程序所需要的进度条更新方法(即指定当前进度,也就是可以回退),但得绕个弯子
# tqdm结尾的那串东西(包括当前运行时间,剩余时间)没法去掉,它们对于本项目而言不需要且没有意义
import os
import paramiko
import time
from ybar import Ybar
sshClient = paramiko.SSHClient()
sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
server = "114.51.41.91" # 服务器地址
port = 22 # 端口
user = "user" # 用户名
password = "good_password" # 密码
sshClient.connect(server, port, user, password)
def getSSHOutput(command):
stdin, stdout, stderr = sshClient.exec_command(command)
outs = stdout.readlines()
return outs
def getCpuInfo(outs):
for line in outs:
line = line.lstrip()
counters = line.split()
if len(counters) < 5:
continue
if counters[0].startswith('cpu'):
break
total = 0
for i in range(1, len(counters)):
total = total + int(counters[i])
idle = int(counters[4])
return {'total': total, 'idle': idle}
def getMemInfo(outs):
res = {'total': 0, 'free': 0, 'buffers': 0, 'cached': 0}
index = 0
for line in outs:
if(index == 4):
break
line = line.lstrip()
memItem = line.lower().split()
if memItem[0] == 'memtotal:':
res['total'] = int(memItem[1])
index = index + 1
continue
elif memItem[0] == 'memfree:':
res['memfree'] = int(memItem[1])
index = index + 1
continue
elif memItem[0] == 'buffers:':
res['buffers'] = int(memItem[1])
index = index + 1
continue
elif memItem[0] == 'cached:':
res['cached'] = int(memItem[1])
index = index + 1
continue
return res
def calcMemUsage(mem_count):
used = mem_count['total'] - mem_count['free'] - \
mem_count['buffers'] - mem_count['cached']
total = mem_count['total']
return used*100/total
def calcCpuUsage(counters1, counters2):
idle = counters2['idle'] - counters1['idle']
total = counters2['total'] - counters1['total']
return 100 - (idle*100/total)
# 用于实现windows上的清屏
# 不用cls命令的原因:
# 本程序的主循环中有sleep代码,而cls是全屏幕清除,那么在sleep的过程中必然有一段空屏的时间,影响观感
# 移动光标实际上不会清除以前的内容,这样在sleep的过程中,之前的输出还在,整个刷新过程是连续的,不会影响观感
def clear_windows():
import ctypes
class COORD(ctypes.Structure):
_fields_ = [("X", ctypes.c_short), ("Y", ctypes.c_short)]
def __init__(self, x, y):
self.X = x
self.Y = y
# 把光标移动到左上角
INIT_POS = COORD(0, 0)
STD_OUTPUT_HANDLE = -11
hOut = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
ctypes.windll.kernel32.SetConsoleCursorPosition(hOut, INIT_POS)
pb = Ybar(100, "CPU ",colorful=True)
pb1 = Ybar(100, "MEM ",colorful=True)
os.system("cls")
while True:
try:
#在windows上的清屏,注意这里不能用os.system("cls"),因为那样会有一段时间空屏,这个函数是通过移动光标位置实现的.
clear_windows()
counters1 = getCpuInfo(getSSHOutput("cat /proc/stat"))
time.sleep(1)
counters2 = getCpuInfo(getSSHOutput("cat /proc/stat"))
pb.update(now=calcCpuUsage(counters1, counters2))
#隐藏光标,获得更舒适的显示效果(同时通过换行的方式将CPU信息和MEM信息分开)
print("")
counters3 = getMemInfo(getSSHOutput("cat /proc/meminfo"))
pb1.update(now=calcMemUsage(counters3))
except KeyboardInterrupt:
break
sshClient.close()
好了就这样