提交 08730b09 作者: shenchao

增加脚本文件

上级 1b2c7650
...@@ -4,51 +4,215 @@ ...@@ -4,51 +4,215 @@
import subprocess import subprocess
import shlex import shlex
import time import time
import datetime
logfile = "/var/log/hb.log"
def runcmd(cmd, shell=False, encoding='utf-8'): class check_all():
args = cmd if shell else shlex.split(cmd) def __init__(self, disks:list, disk_num: int, ints: list, memsize: int):
r = subprocess.run(args, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding=encoding) self.disks = disks
return r.stdout, r.returncode, r.stderr self.disk_num = disk_num
self.ints = ints
self.memsize = memsize
def sata_speed(self):
"""
测试硬盘SATA接口速率
"""
for i in self.disks:
cmd = f"smartctl -a /dev/{i} | grep -c 'SATA 3.* 6.0 Gb/s'"
ret = runcmd(cmd, shell=True)
if ret[1] or not int(ret[0].replace("\n", "")):
logs(f"Error:硬盘{i} 速率检测失败,sata接口没有达到6Gb,将设备进行分类.")
return 0
logs(f"info:硬盘{i} 速率检测正常,sata接口速率为6Gb!")
return 1
def check_disk(): def intf(self):
disk = "fdisk -l|grep -Ew 'sda|sdb|sdc|sdd|sde|sdf|sdg|sdh'|wc -l" """
ret = runcmd(disk, shell=True) 测试网络接口
d = int(ret[0].replace("\n", "")) """
print(d) for i in self.ints:
if d in [4, 8]: cmd = f"ethtool {i} | egrep -c 'Speed: 1000Mb/s|Link detected: yes'"
with open('/opt/collect/check.log', 'w') as log_file: ret = runcmd(cmd, shell=True)
log_file.write("YES") if ret[1] or int(ret[0].replace("\n", "")) != 2:
logs(f"Error:网卡{i}没有连接网线或速率没有达到1Gb,将设备进行分类.")
return 0
logs(f"info:网卡{i}接口正常.")
return 1
return True def mem(self):
"""
测试内存大小
"""
cmd = "free -g | grep Mem | awk '{print $2}'"
ret = runcmd(cmd, shell=True)
if not ret[1] and int(ret[0].replace("\n", "")) >= self.memsize:
logs("info:内存检测正常.")
return 1
else: else:
with open('/opt/collect/check.log', 'w') as log_file: logs("Error:内存检测失败,将设备进行分类.")
log_file.write("no_disk") return 0
return False def hd_num(self):
"""
测试HDD硬盘数量
"""
n = len(disks())
if n == self.disk_num:
logs(f"info:硬盘数量为{n},符合标配.")
return 1
else:
logs(f"Error:硬盘数量为{n},与标配{self.disk_num}不符.")
return 0
def sync_time(self):
"""
同步时间
"""
cmd = "ntpdate ntp3.aliyun.com"
ret = runcmd(cmd)
if not ret[1]:
time.sleep(3)
runcmd("hwclock -w")
print("当前时间:", runcmd("hwclock -r"))
return 1
else:
logs("Error:时间同步失败,请检测网络.")
return 0
def check_memory(): def usb(self):
# memory = "free -m|awk 'NR==2{print $2}'" """
memory = "free -h|awk -F'Gi' 'NR==2{print $1}'|awk '{print $2}'" 测试usb接口
ret = runcmd(memory, shell=True) """
m = int(ret[0].replace("\n", "")) cmd = "udevadm info -q path -n /dev/sd[a-i]|grep usb|grep -ocE '[^/]+$'"
print(m) n = 0
if m == 15: while n < 3:
with open('/opt/collect/check.log', 'w') as log_file: ret = runcmd(cmd, shell=True)
log_file.write("YES") if ret[1] or not int(ret[0].replace("\n", "")):
return True print("Error: 没检测到U盘请认插入usb设备")
print("再次检测中...")
time.sleep(10)
n += 1
else: else:
with open('/opt/collect/check.log', 'w') as log_file: logs("info:USB设备检测正常.")
log_file.write("no_memory") return 1
return False logs("Error:USB设备检测失败,如果已接入usb设备,请将设备归类.")
return 0
def model():
"""
查看型号并返回标配内存硬盘数量
"""
modes={"A000": [7, 4, 8, "t"],
"A001": [7, 4, 16, "t"],
"A002": [7, 4, 32, "t"],
"A003": [7, 4, 64, "t"],
"A004": [15, 4, 8, "t"],
"A005": [15, 4, 16, "t"],
"A006": [15, 4, 32, "t"],
"A007": [15, 4, 64, "t"],
"A100": [7, 4, 16, "w"],
"A101": [7, 4, 32, "w"],
"A102": [7, 4, 64, "w"],
"A103": [15, 4, 16, "w"],
"A104": [15, 4, 32, "w"],
"A105": [15, 4, 64, "w"],
"A200": [15, 4, 32, "w"],
"A201": [15, 4, 64, "w"],
"B000": [15, 4, 32, "w"],
"B001": [15, 4, 64, "w"],
"B100": [15, 4, 32, "w"],
"B101": [15, 4, 64, "w"],
"B200": [15, 4, 64, "w"],
"B300": [15, 8, 15.36, "w"],
"B301": [15, 8, 30.72, "w"],
"B400": [15, 8, 30.72, "w"],
"B500": [15, 8, 30.72, "w"]}
cmd = "/opt/tool/saturn-device-id -r 14 | cut -c 11-"
ret = runcmd(cmd, shell=True)
if ret[1]:
return 0
else:
sn = ret[0].replace("\n", "")
return modes[sn], sn
def disks():
"""
HDD硬盘列表
"""
cmd = "lsblk -d -o NAME,RM,ROTA | egrep \"sd.*0.*[0-1]{1}\""
check_ds = []
ret = runcmd(cmd, shell=True)
if ret[1]:
return 0
else:
for l in str(ret[0]).split("\n"):
l1 = l.split()
if l1 and int(l1[1]) == 0:
check_ds.append(l1[0])
return check_ds
def logs(log: str, op="a"):
with open(logfile, op) as f:
t = str(datetime.datetime.now()).split(".")[0]
line = t + " " + log + "\n"
print(line, end="")
f.write(line)
def runcmd(cmd, shell=False, encoding='utf-8'):
args = cmd if shell else shlex.split(cmd)
r = subprocess.run(args, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding=encoding)
return r.stdout, r.returncode
def check_in():
sn = str(input("请扫码获取序列号:")).replace(" ", "")
if len(sn) == 31 and "-" in sn:
s = sn.split("-")
if len(s[0]) == 14 and len(s[1]) == 16:
return sn
elif len(sn) == 14:
return sn
def warn(n: int):
"""
信号灯控制
"""
cmd = f"/opt/tool/saturn-gpio-set.sh --sys {n}"
runcmd(cmd, shell=True)
if __name__ == '__main__': if __name__ == '__main__':
warn(2)
mode = model()
if mode[0]:
err = 0
ints = ["eth0", "eth1"] if mode[0][3] == "w" else ["end0", "end1"]
while True: while True:
if check_disk() and check_memory(): logs("开始检测...", op="w")
runcmd("/opt/tool/saturn-gpio-set.sh --sys 2") mynas = check_all(disks(), mode[0][1], ints, mode[0][0])
if not mynas.sata_speed():
err = 1
if not mynas.mem():
err = 1
if not mynas.hd_num():
err = 1
if err:
warn(1)
logs("Error:检测发现问题!")
else:
warn(2)
logs("检测通过!")
time.sleep(1)
else: else:
runcmd("/opt/tool/saturn-gpio-set.sh --sys 1") warn(1)
time.sleep(2) logs("型号未能识别,检查序列号是否正确。")
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论