新聞中心
環(huán)境:
- 生產(chǎn):4000+物理服務(wù)器,近 3000 臺(tái)虛擬機(jī)。
- 開(kāi)發(fā)環(huán)境:python3.6、redhat7.9,除了paramiko為第三方模塊需要自己安裝,其他的直接import即可。
主要應(yīng)用方向:
- 配置變更:例如服務(wù)器上線時(shí)需要批量校正系統(tǒng)分區(qū)容量、及掛載數(shù)據(jù)盤(pán)。
- 配置信息查詢過(guò)濾:例如過(guò)濾防火墻規(guī)則、過(guò)濾網(wǎng)卡配置。
- 存活檢測(cè):設(shè)置定時(shí)任務(wù),定時(shí)輪詢服務(wù)器的 ssh 狀態(tài)是否正常。
- 文件傳輸:多個(gè) ip 同時(shí)傳輸目錄/文件。
基本原則:
批量執(zhí)行操作是一把雙刃劍。批量執(zhí)行操作可以提升工作效率,但是隨之而來(lái)的風(fēng)險(xiǎn)不可忽略。

站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到翠屏網(wǎng)站設(shè)計(jì)與翠屏網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類(lèi)型包括:做網(wǎng)站、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、國(guó)際域名空間、網(wǎng)站空間、企業(yè)郵箱。業(yè)務(wù)覆蓋翠屏地區(qū)。
風(fēng)險(xiǎn)案例如下:
掛載很多數(shù)據(jù)盤(pán),通常先格式化硬盤(pán),再掛載數(shù)據(jù)盤(pán),最后再寫(xiě)入將開(kāi)機(jī)掛載信息寫(xiě)入/etc/fstab文件。在批量lsblk檢查硬盤(pán)信息的時(shí)候發(fā)現(xiàn)有的系統(tǒng)盤(pán)在/sda有的在/sdm,如果不事先檢查機(jī)器相關(guān)配置是否一致直接按照工作經(jīng)驗(yàn)去執(zhí)行批量操作,會(huì)很容易造成個(gè)人難以承受的災(zāi)難。
在執(zhí)行批量操作時(shí)按照慣例:格式化硬盤(pán)->掛載->開(kāi)機(jī)掛載的順序去執(zhí)行,假設(shè)有的機(jī)器因?yàn)槟承┕收蠈?dǎo)致格式化硬盤(pán)沒(méi)法正確執(zhí)行。在處理這類(lèi)問(wèn)題的時(shí)候通常會(huì)先提取出失敗的ip,并再按照慣例執(zhí)行操作。運(yùn)維人員會(huì)很容易忽略開(kāi)機(jī)掛載的信息已經(jīng)寫(xiě)過(guò)了,導(dǎo)致復(fù)寫(xiě)(這都是血和淚的教訓(xùn))。
所以,為了避免故障,提升工作效率,我認(rèn)為應(yīng)當(dāng)建立團(tuán)隊(duì)在工作上的共識(shí),應(yīng)當(dāng)遵守以下原則:
- 批量操作前應(yīng)當(dāng)準(zhǔn)備回退方案。
- 批量操作前作前先確定檢查目標(biāo)服務(wù)器相關(guān)的配置的一致性。
- 批量操作時(shí)應(yīng)當(dāng)把重復(fù)執(zhí)行會(huì)影響系統(tǒng)的操作和不影響的分開(kāi)執(zhí)行。
- 批量操作后應(yīng)當(dāng)再次驗(yàn)證操作結(jié)果是否符合預(yù)期。
當(dāng)然,代碼的規(guī)范也應(yīng)當(dāng)重視起來(lái),不僅是為了便于審計(jì),同時(shí)也需要便于溯源。我認(rèn)為應(yīng)當(dāng)注意以下幾點(diǎn):
- 關(guān)鍵方法一定要記錄傳入的參數(shù)以及執(zhí)行后的結(jié)果。
- 為了避免方法返回值不符合預(yù)期,該拋異常的地方一定要拋。
- 優(yōu)化代碼,刪去不必要的邏輯分支和盡量不要寫(xiě)重復(fù)的代碼,使代碼看起來(lái)整潔。
- 程序的執(zhí)行情況、結(jié)果一定要保留日志。
技術(shù)難點(diǎn)
1、ssh no existing session,sftp超時(shí)時(shí)間設(shè)置:
在代碼無(wú)錯(cuò)的情況下大量ip出現(xiàn)No existing session,排查后定位在代碼的寫(xiě)法上,下面是一個(gè)正確的示例。由于最開(kāi)始沒(méi)考慮到ssh連接的幾種情況導(dǎo)致了重寫(xiě)好幾遍。另外sftp的實(shí)例貌似不能直接設(shè)置連接超時(shí)時(shí)間,所以我采用了先建立ssh連接再打開(kāi)sftp的方法。
import paramiko
username = 'root'
port = 22
pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') # 導(dǎo)入公鑰
timeout=10
def ssh_client( ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient() # 實(shí)例化ssh
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 配置ssh互信
if auth == 'id_rsa': # 公鑰認(rèn)證
client.connect(ip, port, username, pkey=pkey, banner_timeout=60, timeout=timeout)
elif auth == 'noAuth': # 用戶名密碼認(rèn)證
if user is not None and passwd is not None:
client.connect(ip, port, user, passwd, banner_timeout=60, timeout=timeout)
else:
raise ValueError('傳入的用戶名密碼不能為空')
else:
raise NameError('不存在此%s認(rèn)證方式' % auth)
return client
def sftp_client(ip, user=None, passwd=None, auth='id_rsa'):
ssh = ssh_client(ip, user, passwd, auth)
sftp = ssh.open_sftp()
return sftp
2、sftp中的get()和put()方法僅能傳文件,不支持直接傳目錄:
不能直接傳目錄,那換個(gè)思路,遍歷路徑中的目錄和文件,先創(chuàng)建目錄再傳文件就能達(dá)到一樣的效果了。在paramiko的sftp中sftp.listdir_attr()方法可以獲取遠(yuǎn)程路徑中的文件、目錄信息。那么我們可以寫(xiě)一個(gè)遞歸來(lái)遍歷遠(yuǎn)程路徑中的所有文件和目錄(傳入一個(gè)列表是為了接收遞歸返回的值)。
def check_remote_folders(sftp, remote_path, remote_folders: list):
# 遞歸遍歷遠(yuǎn)程路徑中的目錄,并添加到remote_folders中
for f in sftp.listdir_attr(remote_path):
# 檢查路徑狀態(tài)是否為目錄
if stat.S_ISDIR(f.st_mode):
# 遞歸調(diào)用自己
check_remote_folders(sftp, remote_path + '/' + f.filename, remote_folders)
# 添加遍歷到的目錄信息到列表中
remote_folders.append(remote_path + '/' + f.filename)
python自帶的os模塊中的os.walk()方法可以遍歷到本地路徑中的目錄和文件。
local_files_path = []
local_directory = []
for root, dirs, files in os.walk(local_path):
local_directory.append(root)
for file in files:if root[-1] != '/':
local_files_path.append(root + '/' + file)
elif root[-1] == '/':
local_files_path.append(root + file)
3、多線程多個(gè)ip使用sftp.get()方法時(shí)無(wú)法并發(fā)。
改成多進(jìn)程即可。
def batch_sftp_get(ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
pool = multiprocessing.Pool(5)for i in ip:
pool.apply_async(sftp_get, (i, remote_path, local_path, user, passwd, auth,))
pool.close()
pool.join()
4、多個(gè)ip需要執(zhí)行相同命令或不同的命令。
由于是日常使用的場(chǎng)景不會(huì)很復(fù)雜,所以借鑒了ansible的playbook,讀取提前準(zhǔn)備好的配置文件即可,然后再整合到之前定義的ssh函數(shù)中。
# 配置文件大概是這樣
192.168.0.100:df -Th | grep xfs; lsblk | grep disk | wc -l
192.168.0.101:ip a | grep team | wc -l
192.168.0.102:route -n
...
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, json
def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
pool = ThreadPoolExecutor(self.workers)
task = []
if cmd_type == 'one':
task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]
elif cmd_type == 'many':
if isinstance(ip, list):
for i in ip:
separator = ''if ':' in i:
separator = ':'elif ',' in i:
separator = ','if separator != '':
data = i.split(separator)
task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))
else:
return '請(qǐng)檢查ip和命令間的分隔符'else:
return 'ip的類(lèi)型為%s, 請(qǐng)傳入一個(gè)正確的類(lèi)型' % type(ip)
else:
return 'cmd_type不存在%s值, 請(qǐng)傳入一個(gè)正確的參數(shù)' % cmd_type
self.logger.debug('檢查變量task:%s' % task)
results = {}
for future in as_completed(task):
res = future.result().split(':')
results[res[1]] = {res[0]: res[2]}
if 'success' in future.result():
print('\033[32;1m%s\033[0m' % future.result().replace('success:', ''))
elif 'failed' in future.result():
print('\033[31;1m%s\033[0m' % future.result().replace('failed:', ''))
pool.shutdown()
json_results = {
'task_name': task_name,
'task_sn': self.task_sn,
'start_time': self.now_time,
'cost_time': '%.2fs' % (time.perf_counter() - self.s),
'results': results
}
self.logger.info('json_results:%s' % json_results)
with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a') as f:
f.write(json.dumps(json_results))
return json_results
同時(shí),我們還衍生出一個(gè)需求,既然都要讀取配置,那同樣也可以提前把ip地址準(zhǔn)備在文件里。正好也能讀取我們返回的執(zhí)行程序的結(jié)果。
import os
import json
def get_info(self, path):
self.logger.debug('接收參數(shù)path:%s'.encode('utf8') % path.encode('utf8'))
if os.path.exists(path):
info_list = [i.replace('\n', '') for i in open(path, 'r', encoding='utf8').readlines()]
return info_list
else:
self.logger.warning('%s不存在,請(qǐng)傳入一個(gè)正確的目錄' % path)
raise ValueError('%s不存在,請(qǐng)傳入一個(gè)正確的目錄' % path)
def log_analysis(filename):if os.path.exists(filename):
try:
data = json.load(open(filename, 'r', encoding='utf8'))
return data
except Exception as e:
print('%s無(wú)法解析該類(lèi)型文件' % filename + ' ' + str(e))
raise TypeError('%s無(wú)法解析該類(lèi)型文件' % filename + ' ' + str(e))
else:
raise ValueError('該%s文件路徑不存在,請(qǐng)傳入一個(gè)正確的文件路徑' % filename)
def show_log(self, filename, mode=None):
data: dict = self.log_analysis(filename)
if isinstance(data, dict):
for key, value in data["results"].items():
if 'success' in value.keys():
if mode == 'success':
print(key)
elif mode is None:
print('%s:%s' % (key, value['success'].replace('\r\n', '')))
elif 'failed' in value.keys():
if mode == 'failed':
print(key)
elif mode is None:
print('%s:%s' % (key, value['failed'].replace('\r\n', '')))
完整代碼展示:
from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
import os
import re
import time
import stat
import json
import random
import logging
import asyncio
import argparse
import paramiko
class TaskManager:def __init__(self, timeout=10, workers=15, system='linux'):
self.username = 'root'
self.port = 22
self.datetime = time.strftime("%Y-%m-%d", time.localtime())
self.timeout = timeout
self.workers = workers
self.now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
self.s = time.perf_counter()
self.task_sn = self.sn_random_generator()
if system == 'linux':
self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
self.log_path = '/tmp/TaskManager/log/'
self.log_debug_path = '/tmp/TaskManager/debug/'elif system == 'windows':
self.pkey = paramiko.RSAKey.from_private_key_file(r'C:\Users\001\.ssh\id_rsa')
self.log_path = r'D:\tmp\TaskManager\log\\'
self.log_debug_path = r'D:\tmp\TaskManager\debug\\'
if os.path.exists(self.log_path) is False:
os.makedirs(self.log_path, exist_ok=True)
if os.path.exists(self.log_debug_path) is False:
os.makedirs(self.log_debug_path, exist_ok=True)
self.logger = logging.getLogger(__name__)
self.logger.setLevel(level=logging.DEBUG)
self.handler = logging.FileHandler(self.log_debug_path + '%s_%s.log' % (self.datetime, self.task_sn))
self.formatter = logging.Formatter("%(asctime)s[%(levelname)s][%(funcName)s]%(message)s ")
self.handler.setFormatter(self.formatter)
self.logger.addHandler(self.handler)
self.logger.info('初始化完成'.encode(encoding='utf8'))
def ssh_client(self, ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if auth == 'id_rsa':
self.logger.info('正在 SSH 連接%s'.encode('utf8') % str(ip).encode('utf8'))
client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=60, timeout=self.timeout)
elif auth == 'noAuth':
if user is not None and passwd is not None:
client.connect(ip, self.port, user, passwd, banner_timeout=60, timeout=self.timeout)
# allow_agent=False, look_for_keys=False# No existing session 解決辦法 else:raise ValueError('傳入的用戶名密碼不能為空')else:raise NameError('不存在此%s 認(rèn)證方式' % auth)return client
def ssh_exec(self, ip, cmd, user=None, passwd=None, auth='id_rsa'):try:
ssh = self.ssh_client(ip, user, passwd, auth)
stdin, stdout, stderr = ssh.exec_command(command=cmd, get_pty=True)
self.logger.debug('%s:stdin 輸入:%s' % (ip, stdin))
self.logger.debug('%s:stderr 錯(cuò)誤:%s' % (ip, stderr))
self.logger.debug('%s:stdout 輸出:%s' % (ip, stdout))
result = stdout.read().decode('utf-8')
ssh.close()
return 'success:' + ip + ':' + str(result)
except Exception as e:
return 'failed:' + ip + ':' + str(e)
def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
self.logger.debug('接收參數(shù) ip:%s, cmd:%s, user:%s passwd:%s cmd_type:%s, task_name:%s' %
(ip, cmd, user, passwd, cmd_type, task_name))
print('\033[35;1m-------------------Task is set up! Time:%s ----------------------\033[0m' % self.now_time)
pool = http://m.fisionsoft.com.cn/article/dpogish.html


咨詢
建站咨詢
