纯干货!Python在运维中的应用:批量ssh/sftp

环境:

  • 生产:4000+物理服务器,近 3000 台虚拟机。
  • 开发环境:python3.6、redhat7.9,除了paramiko为第三方模块需要自己安装,其他的直接import即可。

主要应用方向:

  1. 配置变更:例如服务器上线时需要批量校正系统分区容量、及挂载数据盘。
  2. 配置信息查询过滤:例如过滤防火墙规则、过滤网卡配置。
  3. 存活检测:设置定时任务,定时轮询服务器的 ssh 状态是否正常。
  4. 文件传输:多个 ip 同时传输目录/文件。

基本原则:

批量执行操作是一把双刃剑。批量执行操作可以提升工作效率,但是随之而来的风险不可忽略。

站在用户的角度思考问题,与客户深入沟通,找到翠屏网站设计与翠屏网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:做网站、成都网站建设、企业官网、英文网站、手机端网站、网站推广、国际域名空间、网站空间、企业邮箱。业务覆盖翠屏地区。

风险案例如下:

挂载很多数据盘,通常先格式化硬盘,再挂载数据盘,最后再写入将开机挂载信息写入/etc/fstab文件。在批量lsblk检查硬盘信息的时候发现有的系统盘在/sda有的在/sdm,如果不事先检查机器相关配置是否一致直接按照工作经验去执行批量操作,会很容易造成个人难以承受的灾难。

在执行批量操作时按照惯例:格式化硬盘->挂载->开机挂载的顺序去执行,假设有的机器因为某些故障导致格式化硬盘没法正确执行。在处理这类问题的时候通常会先提取出失败的ip,并再按照惯例执行操作。运维人员会很容易忽略开机挂载的信息已经写过了,导致复写(这都是血和泪的教训)。

所以,为了避免故障,提升工作效率,我认为应当建立团队在工作上的共识,应当遵守以下原则:

  1. 批量操作前应当准备回退方案。
  2. 批量操作前作前先确定检查目标服务器相关的配置的一致性。
  3. 批量操作时应当把重复执行会影响系统的操作和不影响的分开执行。
  4. 批量操作后应当再次验证操作结果是否符合预期。

当然,代码的规范也应当重视起来,不仅是为了便于审计,同时也需要便于溯源。我认为应当注意以下几点:

  1. 关键方法一定要记录传入的参数以及执行后的结果。
  2. 为了避免方法返回值不符合预期,该抛异常的地方一定要抛。
  3. 优化代码,删去不必要的逻辑分支和尽量不要写重复的代码,使代码看起来整洁。
  4. 程序的执行情况、结果一定要保留日志。

技术难点

1、ssh no existing session,sftp超时时间设置:

在代码无错的情况下大量ip出现No existing session,排查后定位在代码的写法上,下面是一个正确的示例。由于最开始没考虑到ssh连接的几种情况导致了重写好几遍。另外sftp的实例貌似不能直接设置连接超时时间,所以我采用了先建立ssh连接再打开sftp的方法。

import paramiko

username = 'root'
port = 22
pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') # 导入公钥
timeout=10


def ssh_client( ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient() # 实例化ssh
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 配置ssh互信
if auth == 'id_rsa': # 公钥认证
client.connect(ip, port, username, pkey=pkey, banner_timeout=60, timeout=timeout)
elif auth == 'noAuth': # 用户名密码认证
if user is not None and passwd is not None:
client.connect(ip, port, user, passwd, banner_timeout=60, timeout=timeout)
else:
raise ValueError('传入的用户名密码不能为空')
else:
raise NameError('不存在此%s认证方式' % auth)
return client


def sftp_client(ip, user=None, passwd=None, auth='id_rsa'):
ssh = ssh_client(ip, user, passwd, auth)
sftp = ssh.open_sftp()
return sftp

2、sftp中的get()和put()方法仅能传文件,不支持直接传目录:

不能直接传目录,那换个思路,遍历路径中的目录和文件,先创建目录再传文件就能达到一样的效果了。在paramiko的sftp中sftp.listdir_attr()方法可以获取远程路径中的文件、目录信息。那么我们可以写一个递归来遍历远程路径中的所有文件和目录(传入一个列表是为了接收递归返回的值)。

def check_remote_folders(sftp, remote_path, remote_folders: list):
# 递归遍历远程路径中的目录,并添加到remote_folders中
for f in sftp.listdir_attr(remote_path):
# 检查路径状态是否为目录
if stat.S_ISDIR(f.st_mode):
# 递归调用自己
check_remote_folders(sftp, remote_path + '/' + f.filename, remote_folders)
# 添加遍历到的目录信息到列表中
remote_folders.append(remote_path + '/' + f.filename)

python自带的os模块中的os.walk()方法可以遍历到本地路径中的目录和文件。

  local_files_path = []
local_directory = []
for root, dirs, files in os.walk(local_path):
local_directory.append(root)
for file in files:if root[-1] != '/':
local_files_path.append(root + '/' + file)
elif root[-1] == '/':
local_files_path.append(root + file)

3、多线程多个ip使用sftp.get()方法时无法并发。

改成多进程即可。

 def batch_sftp_get(ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
pool = multiprocessing.Pool(5)for i in ip:
pool.apply_async(sftp_get, (i, remote_path, local_path, user, passwd, auth,))
pool.close()
pool.join()

4、多个ip需要执行相同命令或不同的命令。

由于是日常使用的场景不会很复杂,所以借鉴了ansible的playbook,读取提前准备好的配置文件即可,然后再整合到之前定义的ssh函数中。

# 配置文件大概是这样
192.168.0.100:df -Th | grep xfs; lsblk | grep disk | wc -l
192.168.0.101:ip a | grep team | wc -l
192.168.0.102:route -n
...
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, json


def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
pool = ThreadPoolExecutor(self.workers)
task = []
if cmd_type == 'one':
task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]
elif cmd_type == 'many':
if isinstance(ip, list):
for i in ip:
separator = ''if ':' in i:
separator = ':'elif ',' in i:
separator = ','if separator != '':
data = i.split(separator)
task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))
else:
return '请检查ip和命令间的分隔符'else:
return 'ip的类型为%s, 请传入一个正确的类型' % type(ip)
else:
return 'cmd_type不存在%s值, 请传入一个正确的参数' % cmd_type
self.logger.debug('检查变量task:%s' % task)
results = {}
for future in as_completed(task):
res = future.result().split(':')
results[res[1]] = {res[0]: res[2]}
if 'success' in future.result():
print('\033[32;1m%s\033[0m' % future.result().replace('success:', ''))
elif 'failed' in future.result():
print('\033[31;1m%s\033[0m' % future.result().replace('failed:', ''))
pool.shutdown()
json_results = {
'task_name': task_name,
'task_sn': self.task_sn,
'start_time': self.now_time,
'cost_time': '%.2fs' % (time.perf_counter() - self.s),
'results': results
}
self.logger.info('json_results:%s' % json_results)
with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a') as f:
f.write(json.dumps(json_results))
return json_results

同时,我们还衍生出一个需求,既然都要读取配置,那同样也可以提前把ip地址准备在文件里。正好也能读取我们返回的执行程序的结果。

import os
import json


def get_info(self, path):
self.logger.debug('接收参数path:%s'.encode('utf8') % path.encode('utf8'))
if os.path.exists(path):
info_list = [i.replace('\n', '') for i in open(path, 'r', encoding='utf8').readlines()]
return info_list
else:
self.logger.warning('%s不存在,请传入一个正确的目录' % path)
raise ValueError('%s不存在,请传入一个正确的目录' % path)


def log_analysis(filename):if os.path.exists(filename):
try:
data = json.load(open(filename, 'r', encoding='utf8'))
return data
except Exception as e:
print('%s无法解析该类型文件' % filename + ' ' + str(e))
raise TypeError('%s无法解析该类型文件' % filename + ' ' + str(e))
else:
raise ValueError('该%s文件路径不存在,请传入一个正确的文件路径' % filename)


def show_log(self, filename, mode=None):
data: dict = self.log_analysis(filename)
if isinstance(data, dict):
for key, value in data["results"].items():
if 'success' in value.keys():
if mode == 'success':
print(key)
elif mode is None:
print('%s:%s' % (key, value['success'].replace('\r\n', '')))
elif 'failed' in value.keys():
if mode == 'failed':
print(key)
elif mode is None:
print('%s:%s' % (key, value['failed'].replace('\r\n', '')))

完整代码展示:

from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
import os
import re
import time
import stat
import json
import random
import logging
import asyncio
import argparse
import paramiko


class TaskManager:def __init__(self, timeout=10, workers=15, system='linux'):
self.username = 'root'
self.port = 22
self.datetime = time.strftime("%Y-%m-%d", time.localtime())
self.timeout = timeout
self.workers = workers
self.now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
self.s = time.perf_counter()
self.task_sn = self.sn_random_generator()

if system == 'linux':
self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
self.log_path = '/tmp/TaskManager/log/'
self.log_debug_path = '/tmp/TaskManager/debug/'elif system == 'windows':
self.pkey = paramiko.RSAKey.from_private_key_file(r'C:\Users\001\.ssh\id_rsa')
self.log_path = r'D:\tmp\TaskManager\log\\'
self.log_debug_path = r'D:\tmp\TaskManager\debug\\'

if os.path.exists(self.log_path) is False:
os.makedirs(self.log_path, exist_ok=True)
if os.path.exists(self.log_debug_path) is False:
os.makedirs(self.log_debug_path, exist_ok=True)

self.logger = logging.getLogger(__name__)
self.logger.setLevel(level=logging.DEBUG)
self.handler = logging.FileHandler(self.log_debug_path + '%s_%s.log' % (self.datetime, self.task_sn))
self.formatter = logging.Formatter("%(asctime)s[%(levelname)s][%(funcName)s]%(message)s ")
self.handler.setFormatter(self.formatter)
self.logger.addHandler(self.handler)
self.logger.info('初始化完成'.encode(encoding='utf8'))

def ssh_client(self, ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if auth == 'id_rsa':
self.logger.info('正在 SSH 连接%s'.encode('utf8') % str(ip).encode('utf8'))
client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=60, timeout=self.timeout)
elif auth == 'noAuth':
if user is not None and passwd is not None:
client.connect(ip, self.port, user, passwd, banner_timeout=60, timeout=self.timeout)
# allow_agent=False, look_for_keys=False# No existing session 解决办法 else:raise ValueError('传入的用户名密码不能为空')else:raise NameError('不存在此%s 认证方式' % auth)return client

def ssh_exec(self, ip, cmd, user=None, passwd=None, auth='id_rsa'):try:
ssh = self.ssh_client(ip, user, passwd, auth)
stdin, stdout, stderr = ssh.exec_command(command=cmd, get_pty=True)
self.logger.debug('%s:stdin 输入:%s' % (ip, stdin))
self.logger.debug('%s:stderr 错误:%s' % (ip, stderr))
self.logger.debug('%s:stdout 输出:%s' % (ip, stdout))
result = stdout.read().decode('utf-8')
ssh.close()
return 'success:' + ip + ':' + str(result)
except Exception as e:
return 'failed:' + ip + ':' + str(e)

def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
self.logger.debug('接收参数 ip:%s, cmd:%s, user:%s passwd:%s cmd_type:%s, task_name:%s' %
(ip, cmd, user, passwd, cmd_type, task_name))
print('\033[35;1m-------------------Task is set up! Time:%s ----------------------\033[0m' % self.now_time)
pool = 纯干货!Python在运维中的应用:批量ssh/sftp
新闻来源:http://www.shufengxianlan.com/qtweb/news25/257475.html

成都网站建设公司_创新互联,为您提供网页设计公司企业网站制作外贸建站定制网站电子商务网站建设

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联