Python plays a central role in DevOps: its concise syntax, rich ecosystem, and strong automation capabilities make it a go-to language for operations automation. This guide starts from DevOps principles and works step by step through Python's practical applications in common DevOps scenarios, helping you build an efficient, automated operations practice.

Figure: *Python for DevOps: Learn Ruthlessly Effective Automation* (book cover)
## DevOps Philosophy and Culture

### What Is DevOps?

DevOps is a combination of culture, practices, and tooling that raises an organization's ability to deliver applications and services quickly. It emphasizes collaboration and communication between development and operations teams.

### Core DevOps Values
```mermaid
graph TD
    A[DevOps Core Values] --> B[Collaboration]
    A --> C[Automation]
    A --> D[Continuous Improvement]
    A --> E[Fast Feedback]
    B --> B1[Break down silos]
    B --> B2[Shared responsibility]
    B --> B3[Transparent communication]
    C --> C1[Infrastructure as Code]
    C --> C2[CI/CD pipelines]
    C --> C3[Automated testing]
    D --> D1[Metrics and monitoring]
    D --> D2[Learning and experimentation]
    D --> D3[Post-incident analysis]
    E --> E1[Fast deployments]
    E --> E2[Real-time monitoring]
    E --> E3[User feedback]
```
### The DevOps Lifecycle
```python
# A Python representation of the DevOps lifecycle
class DevOpsLifecycle:
    def __init__(self):
        self.stages = [
            "Plan",
            "Code",
            "Build",
            "Test",
            "Release",
            "Deploy",
            "Operate",
            "Monitor",
        ]

    def execute_stage(self, stage, artifacts=None):
        """Execute one stage of the DevOps lifecycle."""
        if stage not in self.stages:
            raise ValueError(f"Invalid stage: {stage}")
        print(f"Executing {stage} stage...")
        # Dispatch to the handler for the given stage
        if stage == "Build":
            return self.build_application(artifacts)
        elif stage == "Test":
            return self.run_tests(artifacts)
        elif stage == "Deploy":
            return self.deploy_application(artifacts)
        elif stage == "Monitor":
            return self.monitor_application()

    def build_application(self, source_code):
        """Build the application (simulated)."""
        return {"status": "success", "artifact": "app.tar.gz"}

    def run_tests(self, build_artifact):
        """Run automated tests (simulated)."""
        return {"status": "passed", "coverage": 85}

    def deploy_application(self, artifact):
        """Deploy the application (simulated)."""
        return {"status": "deployed", "url": "https://app.example.com"}

    def monitor_application(self):
        """Monitor the application (simulated)."""
        return {"status": "healthy", "uptime": "99.9%"}


# Usage
devops = DevOpsLifecycle()
for stage in devops.stages:
    result = devops.execute_stage(stage)
    print(f"{stage} result: {result}")
```
### Building DevOps Team Culture

#### Traits of an Effective Team

Drawing on *Python for DevOps*, a strong DevOps team exhibits the following traits:
**A clear, elevating goal**
- Team members share a common understanding of the goal
- The goal is challenging but achievable
- It is tightly linked to business value

**A results-driven structure**
- Avoid unproductive meetings and process
- Focus on measurable outcomes
- Decide and execute quickly

**Competent team members**
- Solid technical skills
- Strong learning ability
- Cross-domain knowledge

**Unified commitment**
- Shared commitment to team goals
- Mutual support and collaboration
- Shared responsibility

**A collaborative climate**
- Open, transparent communication
- Constructive feedback
- Mutual respect

**Standards of excellence**
- High-quality code and documentation
- Rigorous testing and deployment processes
- A mindset of continuous improvement

**External support and recognition**
- Backing from management
- Guaranteed resources
- Recognition of results

**Principled leadership**
- Data-driven decisions
- Leading by example
- Developing team members
#### Team Collaboration in Practice
```python
import datetime
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TeamMember:
    name: str
    role: str
    skills: List[str]
    availability: float  # 0.0 to 1.0


@dataclass
class Task:
    id: str
    title: str
    description: str
    required_skills: List[str]
    estimated_hours: int
    priority: str  # high, medium, low
    assigned_to: Optional[str] = None
    status: str = "todo"  # todo, in_progress, done


class DevOpsTeam:
    def __init__(self, name: str):
        self.name = name
        self.members: List[TeamMember] = []
        self.tasks: List[Task] = []
        self.sprint_capacity = 0

    def add_member(self, member: TeamMember):
        """Add a team member."""
        self.members.append(member)
        print(f"Added {member.name} ({member.role}) to team {self.name}")

    def calculate_sprint_capacity(self, sprint_hours: int = 80):
        """Calculate the team's sprint capacity."""
        total_capacity = 0
        for member in self.members:
            member_capacity = sprint_hours * member.availability
            total_capacity += member_capacity
            print(f"{member.name}: {member_capacity} hours")
        self.sprint_capacity = total_capacity
        print(f"Total team capacity: {total_capacity} hours")
        return total_capacity

    def assign_tasks(self):
        """Assign tasks by priority and skill match."""
        # Sort tasks by priority, highest first
        sorted_tasks = sorted(
            self.tasks,
            key=lambda x: {"high": 3, "medium": 2, "low": 1}[x.priority],
            reverse=True)
        for task in sorted_tasks:
            if task.assigned_to:
                continue
            # Find the best-suited team member
            best_member = self.find_best_assignee(task)
            if best_member:
                task.assigned_to = best_member.name
                print(f"Assigned task '{task.title}' to {best_member.name}")

    def find_best_assignee(self, task: Task) -> Optional[TeamMember]:
        """Find the member best suited to a task."""
        candidates = []
        for member in self.members:
            # Count overlapping skills
            skill_match = len(set(task.required_skills) & set(member.skills))
            if skill_match > 0:
                candidates.append((member, skill_match))
        if not candidates:
            return None
        # Pick the member with the highest skill match
        candidates.sort(key=lambda x: x[1], reverse=True)
        return candidates[0][0]

    def generate_daily_standup_report(self):
        """Generate a daily stand-up report."""
        print(f"\n=== Daily Standup Report for {self.name} ===")
        print(f"Date: {datetime.datetime.now().strftime('%Y-%m-%d')}")
        for member in self.members:
            member_tasks = [t for t in self.tasks if t.assigned_to == member.name]
            in_progress = [t for t in member_tasks if t.status == "in_progress"]
            completed = [t for t in member_tasks if t.status == "done"]
            print(f"\n{member.name} ({member.role}):")
            print(f"  Yesterday: Completed {len(completed)} tasks")
            print(f"  Today: Working on {len(in_progress)} tasks")
            if in_progress:
                for task in in_progress:
                    print(f"    - {task.title}")


# Usage
team = DevOpsTeam("Platform Engineering")

# Add team members
team.add_member(TeamMember("Alice", "DevOps Engineer", ["Python", "Docker", "Kubernetes"], 0.8))
team.add_member(TeamMember("Bob", "SRE", ["Go", "Prometheus", "Grafana"], 0.9))
team.add_member(TeamMember("Charlie", "Platform Engineer", ["Python", "Terraform", "AWS"], 0.7))

# Add tasks
team.tasks.extend([
    Task("T001", "Setup CI/CD Pipeline", "Implement GitLab CI/CD", ["Python", "Docker"], 16, "high"),
    Task("T002", "Monitor Setup", "Setup Prometheus monitoring", ["Prometheus", "Grafana"], 12, "medium"),
    Task("T003", "Infrastructure as Code", "Terraform AWS resources", ["Terraform", "AWS"], 20, "high")
])

# Calculate capacity and assign tasks
team.calculate_sprint_capacity()
team.assign_tasks()
team.generate_daily_standup_report()
```
### Core Ideas and Reflections
In 2019, 70% of developers rated themselves as above average, while only 10% rated themselves below average.

The belief that automation beats hierarchy is at the heart of DevOps.

Windows can temporarily shut down its entire network stack: if too many network connections are created in a short window, the operating system protects itself. A build server is infrastructure, and it must be kept healthy so that software can be delivered reliably. Every piece of automation work is your work; there is no more important or valuable task than making sure things are automated.

If the leaders in an organization are treated as better than everyone else (given more resources, placed higher), you will never be able to implement real DevOps principles. You will instead be applying the Highest Paid Person's Opinion (HIPO) principle. DevOps can genuinely save lives and save your company, but HIPOs are ferocious animals that can, and do, destroy everything in their path.

Game theory at work: in a martial arts gym, it is routine for students to help mop the floor. There are obvious reasons for this: it shows respect for the instructor and teaches self-discipline. There is also a game-theory angle: exposure to a staph infection can cause serious health problems. If you are given the chance to mop the floor at the gym where you train, think carefully about your response. People watch how well you clean the floor; if you do it well, they will respect you and do it well too. If you treat the task as "beneath you" and do it poorly, two problems follow. First, a badly cleaned floor can make other members of the gym sick. Second, you "infect" everyone else's attitude, and they in turn stop cleaning the floor. Your behavior has consequences now and in the future. Make sure you do excellent work with a pleasant expression; your life may depend on it.

Traits of a good team, revisited:
- A clear, elevating goal.
- A results-driven structure. Many companies use tools and processes that are questionable if they cannot be tied directly to results: Skype, email, long meetings, overtime.
- Competent team members.
- Unified commitment.
- A collaborative climate. You need to create an environment of respect where people can be candid and expect feedback; tilting too far in either direction dooms the effort.
- On hiring: many companies complain that they cannot hire, cannot hire diversely, and cannot find good candidates. Yet:
  - First, the company encourages candidates to apply.
  - Next, it wastes their time on bespoke, irrelevant tests.
  - Then it "fogs" them through interview rounds with less predictive value than random chance.
  - Then it ghosts candidates without giving any feedback.
  - It claims to be working hard to hire talent, while the real problem is its process.
  - Then it complains loudly on social media about how hard it is to engage diverse candidates, or any candidates at all.
  
  Treat people with respect and you will be respected.
- Standards of excellence. Another way to put this: a higher degree of self-discipline is required. Writing, testing, and deploying software demands higher standards, as does reading the documentation for a new technology before deploying it. Code should not be released without going through a proper DevOps lifecycle. On the infrastructure side, best practices need to be followed at many steps, whether that is Zookeeper configuration, EC2 storage configuration, Mongo, or serverless. Management needs high standards too: everyone in the company can see whether decisions are made with data rather than opinion, rank, aggression, or the hunger for commission.
- External support and recognition. When leaders show below-average commitment and integrity, asking for extraordinary contributions is a hard sell. Watch for one department pushing difficult work onto another: blame-shifting and dodging responsibility.
- Principled leadership.
### Tools Worth Exploring

GitHub - pytest-dev/pytest-testinfra: Testinfra test your infrastructures

A quick survey turns up many other tools in this category:
- Serverspec: a Ruby DSL (domain-specific language) for writing infrastructure tests; it can verify server state, configuration, and installed packages.
- Goss (Golang Server Spec): a server testing tool written in Go; tests are defined in YAML and can check files, packages, users, ports, and more.
- Molecule: a tool for testing Ansible roles; it automates verification that Ansible configurations are correct.
- Test Kitchen (KitchenCI): a Ruby-based tool for testing infrastructure as code; it can test infrastructure built with Chef, Puppet, Ansible, and other configuration management tools.
- Bats (Bash Automated Testing System): a Bash-based framework for writing and running shell-script tests.
- Terratest: a Go library for writing automated infrastructure tests; it is commonly paired with Terraform to verify infrastructure-as-code.
- Kitchen-Terraform: a Test Kitchen plugin for verifying Terraform configurations.
- Pester: a testing tool for PowerShell scripts, suited to Windows environments.
- Cucumber: a behavior-driven development (BDD) tool for writing executable specifications and tests; it supports many languages and is used for all kinds of applications, not just infrastructure.
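To make the idea behind these tools concrete, here is a minimal standard-library sketch of the kind of assertions Testinfra or Goss make about a host. The helper names `file_exists` and `port_open` are mine for illustration, not part of any of these tools' APIs:

```python
import socket
from pathlib import Path


def file_exists(path: str) -> bool:
    # Roughly what a "file exists" check in Testinfra/Goss verifies
    return Path(path).exists()


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    # Roughly what a "port is listening" check does: try a TCP connect
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((host, port)) == 0


# With pytest, such checks are written as plain test functions:
def test_hosts_file_present():
    assert file_exists("/etc/hosts")
```

The real tools add the important parts on top of this: connecting to remote hosts over SSH or Docker, and a vocabulary of ready-made checks (packages, users, services) instead of hand-rolled helpers.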
## Python in DevOps Practice

### Automation Scripts for Operations

#### Server Health Checks
```python
#!/usr/bin/env python3
"""Server health-check script."""
import psutil
import requests
import subprocess
import socket
from datetime import datetime
from typing import Dict, List, Any, Optional
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart


class HealthChecker:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.results = {}
        self.alerts = []

    def check_cpu_usage(self) -> Dict[str, Any]:
        """Check CPU usage."""
        cpu_percent = psutil.cpu_percent(interval=1)
        threshold = self.config.get('cpu_threshold', 80)
        result = {
            'metric': 'cpu_usage',
            'value': cpu_percent,
            'threshold': threshold,
            'status': 'OK' if cpu_percent < threshold else 'CRITICAL',
            'timestamp': datetime.now().isoformat()
        }
        if result['status'] == 'CRITICAL':
            self.alerts.append(f"High CPU usage: {cpu_percent}%")
        return result

    def check_memory_usage(self) -> Dict[str, Any]:
        """Check memory usage."""
        memory = psutil.virtual_memory()
        threshold = self.config.get('memory_threshold', 85)
        result = {
            'metric': 'memory_usage',
            'value': memory.percent,
            'threshold': threshold,
            'status': 'OK' if memory.percent < threshold else 'CRITICAL',
            'timestamp': datetime.now().isoformat()
        }
        if result['status'] == 'CRITICAL':
            self.alerts.append(f"High memory usage: {memory.percent}%")
        return result

    def check_disk_usage(self) -> List[Dict[str, Any]]:
        """Check disk usage on every mounted partition."""
        results = []
        threshold = self.config.get('disk_threshold', 90)
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                percent = (usage.used / usage.total) * 100
                result = {
                    'metric': 'disk_usage',
                    'mountpoint': partition.mountpoint,
                    'value': percent,
                    'threshold': threshold,
                    'status': 'OK' if percent < threshold else 'CRITICAL',
                    'timestamp': datetime.now().isoformat()
                }
                if result['status'] == 'CRITICAL':
                    self.alerts.append(
                        f"High disk usage on {partition.mountpoint}: {percent:.1f}%")
                results.append(result)
            except PermissionError:
                continue
        return results

    def check_service_status(self) -> List[Dict[str, Any]]:
        """Check systemd service status."""
        results = []
        services = self.config.get('services', [])
        for service in services:
            try:
                result = subprocess.run(
                    ['systemctl', 'is-active', service],
                    capture_output=True, text=True
                )
                status = result.stdout.strip()
                is_active = status == 'active'
                check_result = {
                    'metric': 'service_status',
                    'service': service,
                    'status': 'OK' if is_active else 'CRITICAL',
                    'value': status,
                    'timestamp': datetime.now().isoformat()
                }
                if not is_active:
                    self.alerts.append(f"Service {service} is not active: {status}")
                results.append(check_result)
            except Exception as e:
                results.append({
                    'metric': 'service_status',
                    'service': service,
                    'status': 'ERROR',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                })
        return results

    def check_port_connectivity(self) -> List[Dict[str, Any]]:
        """Check TCP port connectivity."""
        results = []
        ports = self.config.get('ports', [])
        for port_config in ports:
            host = port_config.get('host', 'localhost')
            port = port_config['port']
            timeout = port_config.get('timeout', 5)
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(timeout)
                result = sock.connect_ex((host, port))
                sock.close()
                is_open = result == 0
                check_result = {
                    'metric': 'port_connectivity',
                    'host': host,
                    'port': port,
                    'status': 'OK' if is_open else 'CRITICAL',
                    'timestamp': datetime.now().isoformat()
                }
                if not is_open:
                    self.alerts.append(f"Port {host}:{port} is not accessible")
                results.append(check_result)
            except Exception as e:
                results.append({
                    'metric': 'port_connectivity',
                    'host': host,
                    'port': port,
                    'status': 'ERROR',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                })
        return results

    def check_http_endpoints(self) -> List[Dict[str, Any]]:
        """Check HTTP endpoints."""
        results = []
        endpoints = self.config.get('http_endpoints', [])
        for endpoint in endpoints:
            url = endpoint['url']
            timeout = endpoint.get('timeout', 10)
            expected_status = endpoint.get('expected_status', 200)
            try:
                response = requests.get(url, timeout=timeout)
                is_healthy = response.status_code == expected_status
                result = {
                    'metric': 'http_endpoint',
                    'url': url,
                    'status_code': response.status_code,
                    'response_time': response.elapsed.total_seconds(),
                    'status': 'OK' if is_healthy else 'CRITICAL',
                    'timestamp': datetime.now().isoformat()
                }
                if not is_healthy:
                    self.alerts.append(
                        f"HTTP endpoint {url} returned status {response.status_code}")
                results.append(result)
            except Exception as e:
                results.append({
                    'metric': 'http_endpoint',
                    'url': url,
                    'status': 'ERROR',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                })
        return results

    def run_all_checks(self) -> Dict[str, Any]:
        """Run all health checks."""
        print("Running health checks...")
        self.results = {
            'hostname': socket.gethostname(),
            'timestamp': datetime.now().isoformat(),
            'checks': {
                'cpu': self.check_cpu_usage(),
                'memory': self.check_memory_usage(),
                'disk': self.check_disk_usage(),
                'services': self.check_service_status(),
                'ports': self.check_port_connectivity(),
                'http_endpoints': self.check_http_endpoints()
            },
            'alerts': self.alerts
        }
        return self.results

    def send_alerts(self):
        """Send alert emails."""
        if not self.alerts:
            return
        email_config = self.config.get('email', {})
        if not email_config:
            print("No email configuration found, skipping alerts")
            return
        try:
            msg = MIMEMultipart()
            msg['From'] = email_config['from']
            msg['To'] = ', '.join(email_config['to'])
            msg['Subject'] = f"Health Check Alerts - {socket.gethostname()}"

            body = f"""
Health Check Alerts for {socket.gethostname()}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Alerts:
{chr(10).join(f"- {alert}" for alert in self.alerts)}

Full report attached.
"""
            msg.attach(MIMEText(body, 'plain'))

            # Attach the detailed report
            report_json = json.dumps(self.results, indent=2)
            attachment = MIMEText(report_json, 'plain')
            attachment.add_header('Content-Disposition', 'attachment',
                                  filename='health_report.json')
            msg.attach(attachment)

            # Send the mail
            server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
            if email_config.get('use_tls'):
                server.starttls()
            if email_config.get('username'):
                server.login(email_config['username'], email_config['password'])
            server.send_message(msg)
            server.quit()
            print(f"Alert email sent to {email_config['to']}")
        except Exception as e:
            print(f"Failed to send alert email: {e}")

    def save_results(self, filename: Optional[str] = None):
        """Save check results to a JSON file."""
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"health_check_{timestamp}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"Results saved to {filename}")


# Example configuration
config = {
    'cpu_threshold': 80,
    'memory_threshold': 85,
    'disk_threshold': 90,
    'services': ['nginx', 'mysql', 'redis'],
    'ports': [
        {'host': 'localhost', 'port': 80},
        {'host': 'localhost', 'port': 3306},
        {'host': 'localhost', 'port': 6379}
    ],
    'http_endpoints': [
        {'url': 'http://localhost/health', 'expected_status': 200},
        {'url': 'https://api.example.com/status', 'expected_status': 200}
    ],
    'email': {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'use_tls': True,
        'username': 'alerts@example.com',
        'password': 'your_password',
        'from': 'alerts@example.com',
        'to': ['admin@example.com', 'ops@example.com']
    }
}

# Usage
if __name__ == "__main__":
    checker = HealthChecker(config)
    results = checker.run_all_checks()

    # Print a summary
    print(f"\nHealth Check Summary for {results['hostname']}:")
    print(f"Time: {results['timestamp']}")
    print(f"Alerts: {len(results['alerts'])}")

    if results['alerts']:
        print("\nAlerts:")
        for alert in results['alerts']:
            print(f"  - {alert}")
        # Send alerts
        checker.send_alerts()
    else:
        print("All checks passed!")

    # Save results
    checker.save_results()
```
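One caveat about the configuration above: it embeds the SMTP password directly in source. A common alternative is to pull secrets from environment variables at startup. The sketch below assumes this pattern; the `ALERT_*` variable names are my own, not part of the original script:

```python
import os


def load_email_config() -> dict:
    """Build the 'email' config section from environment variables,
    with harmless defaults for the non-secret fields."""
    return {
        'smtp_server': os.environ.get('ALERT_SMTP_SERVER', 'smtp.gmail.com'),
        'smtp_port': int(os.environ.get('ALERT_SMTP_PORT', '587')),
        'use_tls': True,
        'username': os.environ.get('ALERT_SMTP_USER', ''),
        # Never commit this value; set it in the environment or a secret store.
        'password': os.environ.get('ALERT_SMTP_PASSWORD', ''),
        'from': os.environ.get('ALERT_FROM', 'alerts@example.com'),
        'to': os.environ.get('ALERT_TO', 'admin@example.com').split(','),
    }


email_config = load_email_config()
```

The rest of the config dict could then use `config['email'] = load_email_config()` instead of hard-coded credentials.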
### Infrastructure as Code (IaC)

#### Managing AWS Resources
```python
#!/usr/bin/env python3
"""AWS resource management tool."""
import boto3
import json
from typing import Dict, Any
from botocore.exceptions import ClientError
import yaml


class AWSResourceManager:
    def __init__(self, region: str = 'us-west-2', profile: str = None):
        self.region = region
        self.session = boto3.Session(profile_name=profile)
        # Initialize the AWS clients we need
        self.ec2 = self.session.client('ec2', region_name=region)
        self.s3 = self.session.client('s3')
        self.rds = self.session.client('rds', region_name=region)
        self.elbv2 = self.session.client('elbv2', region_name=region)
        self.route53 = self.session.client('route53')
        self.cloudformation = self.session.client('cloudformation', region_name=region)

    def create_vpc(self, vpc_config: Dict[str, Any]) -> str:
        """Create a VPC."""
        try:
            vpc_response = self.ec2.create_vpc(
                CidrBlock=vpc_config['cidr_block'],
                TagSpecifications=[{
                    'ResourceType': 'vpc',
                    'Tags': [
                        {'Key': 'Name', 'Value': vpc_config['name']},
                        {'Key': 'Environment', 'Value': vpc_config.get('environment', 'dev')}
                    ]
                }]
            )
            vpc_id = vpc_response['Vpc']['VpcId']
            print(f"Created VPC: {vpc_id}")

            # Wait until the VPC is available
            self.ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])

            # Enable DNS hostnames
            self.ec2.modify_vpc_attribute(
                VpcId=vpc_id,
                EnableDnsHostnames={'Value': True}
            )

            # Create subnets
            for subnet_config in vpc_config.get('subnets', []):
                self.create_subnet(vpc_id, subnet_config)

            # Create an internet gateway
            if vpc_config.get('internet_gateway', True):
                self.create_internet_gateway(vpc_id, vpc_config['name'])

            return vpc_id
        except ClientError as e:
            print(f"Error creating VPC: {e}")
            raise

    def create_subnet(self, vpc_id: str, subnet_config: Dict[str, Any]) -> str:
        """Create a subnet."""
        try:
            subnet_response = self.ec2.create_subnet(
                VpcId=vpc_id,
                CidrBlock=subnet_config['cidr_block'],
                AvailabilityZone=subnet_config.get('availability_zone'),
                TagSpecifications=[{
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': subnet_config['name']},
                        {'Key': 'Type', 'Value': subnet_config.get('type', 'private')}
                    ]
                }]
            )
            subnet_id = subnet_response['Subnet']['SubnetId']
            print(f"Created subnet: {subnet_id}")

            # Public subnets auto-assign public IPs on launch
            if subnet_config.get('type') == 'public':
                self.ec2.modify_subnet_attribute(
                    SubnetId=subnet_id,
                    MapPublicIpOnLaunch={'Value': True}
                )
            return subnet_id
        except ClientError as e:
            print(f"Error creating subnet: {e}")
            raise

    def create_internet_gateway(self, vpc_id: str, name: str) -> str:
        """Create an internet gateway and attach it to the VPC."""
        try:
            igw_response = self.ec2.create_internet_gateway(
                TagSpecifications=[{
                    'ResourceType': 'internet-gateway',
                    'Tags': [{'Key': 'Name', 'Value': f"{name}-igw"}]
                }]
            )
            igw_id = igw_response['InternetGateway']['InternetGatewayId']
            print(f"Created Internet Gateway: {igw_id}")

            # Attach to the VPC
            self.ec2.attach_internet_gateway(
                InternetGatewayId=igw_id,
                VpcId=vpc_id
            )
            return igw_id
        except ClientError as e:
            print(f"Error creating Internet Gateway: {e}")
            raise

    def create_security_group(self, vpc_id: str, sg_config: Dict[str, Any]) -> str:
        """Create a security group."""
        try:
            sg_response = self.ec2.create_security_group(
                GroupName=sg_config['name'],
                Description=sg_config['description'],
                VpcId=vpc_id,
                TagSpecifications=[{
                    'ResourceType': 'security-group',
                    'Tags': [{'Key': 'Name', 'Value': sg_config['name']}]
                }]
            )
            sg_id = sg_response['GroupId']
            print(f"Created Security Group: {sg_id}")

            # Add ingress rules
            for rule in sg_config.get('ingress_rules', []):
                self.ec2.authorize_security_group_ingress(
                    GroupId=sg_id,
                    IpPermissions=[{
                        'IpProtocol': rule['protocol'],
                        'FromPort': rule['from_port'],
                        'ToPort': rule['to_port'],
                        'IpRanges': [{'CidrIp': rule['cidr']}]
                    }]
                )
            return sg_id
        except ClientError as e:
            print(f"Error creating Security Group: {e}")
            raise

    def launch_ec2_instance(self, instance_config: Dict[str, Any]) -> str:
        """Launch an EC2 instance."""
        try:
            # User-data script
            user_data = instance_config.get('user_data', '')
            if isinstance(user_data, list):
                user_data = '\n'.join(user_data)

            response = self.ec2.run_instances(
                ImageId=instance_config['ami_id'],
                MinCount=1,
                MaxCount=1,
                InstanceType=instance_config['instance_type'],
                KeyName=instance_config.get('key_name'),
                SecurityGroupIds=instance_config.get('security_group_ids', []),
                SubnetId=instance_config.get('subnet_id'),
                UserData=user_data,
                TagSpecifications=[{
                    'ResourceType': 'instance',
                    'Tags': [
                        {'Key': 'Name', 'Value': instance_config['name']},
                        {'Key': 'Environment', 'Value': instance_config.get('environment', 'dev')}
                    ]
                }]
            )
            instance_id = response['Instances'][0]['InstanceId']
            print(f"Launched EC2 instance: {instance_id}")

            # Wait until the instance is running
            print("Waiting for instance to be running...")
            self.ec2.get_waiter('instance_running').wait(InstanceIds=[instance_id])
            return instance_id
        except ClientError as e:
            print(f"Error launching EC2 instance: {e}")
            raise

    def create_load_balancer(self, lb_config: Dict[str, Any]) -> str:
        """Create a load balancer."""
        try:
            response = self.elbv2.create_load_balancer(
                Name=lb_config['name'],
                Subnets=lb_config['subnet_ids'],
                SecurityGroups=lb_config.get('security_group_ids', []),
                Scheme=lb_config.get('scheme', 'internet-facing'),
                Type=lb_config.get('type', 'application'),
                Tags=[
                    {'Key': 'Name', 'Value': lb_config['name']},
                    {'Key': 'Environment', 'Value': lb_config.get('environment', 'dev')}
                ]
            )
            lb_arn = response['LoadBalancers'][0]['LoadBalancerArn']
            print(f"Created Load Balancer: {lb_arn}")
            return lb_arn
        except ClientError as e:
            print(f"Error creating Load Balancer: {e}")
            raise

    def deploy_infrastructure(self, config_file: str):
        """Deploy the full infrastructure described in a YAML file."""
        with open(config_file, 'r') as f:
            config = yaml.safe_load(f)

        print(f"Deploying infrastructure from {config_file}")

        # Create the VPC
        vpc_id = self.create_vpc(config['vpc'])

        # Create security groups
        security_groups = {}
        for sg_config in config.get('security_groups', []):
            sg_id = self.create_security_group(vpc_id, sg_config)
            security_groups[sg_config['name']] = sg_id

        # Launch EC2 instances
        instances = {}
        for instance_config in config.get('instances', []):
            # Resolve security-group references by name
            if 'security_groups' in instance_config:
                instance_config['security_group_ids'] = [
                    security_groups[sg_name] for sg_name in instance_config['security_groups']
                ]
            instance_id = self.launch_ec2_instance(instance_config)
            instances[instance_config['name']] = instance_id

        # Create load balancers
        for lb_config in config.get('load_balancers', []):
            if 'security_groups' in lb_config:
                lb_config['security_group_ids'] = [
                    security_groups[sg_name] for sg_name in lb_config['security_groups']
                ]
            self.create_load_balancer(lb_config)

        print("Infrastructure deployment completed!")
        return {
            'vpc_id': vpc_id,
            'security_groups': security_groups,
            'instances': instances
        }


# Example infrastructure configuration
infrastructure_config = """
vpc:
  name: "my-vpc"
  cidr_block: "10.0.0.0/16"
  environment: "production"
  internet_gateway: true
  subnets:
    - name: "public-subnet-1"
      cidr_block: "10.0.1.0/24"
      type: "public"
      availability_zone: "us-west-2a"
    - name: "private-subnet-1"
      cidr_block: "10.0.2.0/24"
      type: "private"
      availability_zone: "us-west-2a"

security_groups:
  - name: "web-sg"
    description: "Security group for web servers"
    ingress_rules:
      - protocol: "tcp"
        from_port: 80
        to_port: 80
        cidr: "0.0.0.0/0"
      - protocol: "tcp"
        from_port: 443
        to_port: 443
        cidr: "0.0.0.0/0"
      - protocol: "tcp"
        from_port: 22
        to_port: 22
        cidr: "10.0.0.0/16"

instances:
  - name: "web-server-1"
    ami_id: "ami-0c02fb55956c7d316"  # Amazon Linux 2
    instance_type: "t3.micro"
    key_name: "my-key-pair"
    security_groups: ["web-sg"]
    environment: "production"
    user_data: |
      #!/bin/bash
      yum update -y
      yum install -y httpd
      systemctl start httpd
      systemctl enable httpd
      echo "<h1>Hello from Web Server 1</h1>" > /var/www/html/index.html

load_balancers:
  - name: "web-lb"
    type: "application"
    scheme: "internet-facing"
    security_groups: ["web-sg"]
    environment: "production"
"""

# Usage
if __name__ == "__main__":
    # Write the config to a file
    with open('infrastructure.yaml', 'w') as f:
        f.write(infrastructure_config)

    # Deploy the infrastructure
    manager = AWSResourceManager(region='us-west-2')
    result = manager.deploy_infrastructure('infrastructure.yaml')
    print(f"Deployment result: {json.dumps(result, indent=2)}")
```
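One weakness of `deploy_infrastructure` is that boto3 calls fail midway if the config is malformed, which can leave half-created resources behind. A cheap guard is to validate the parsed config before touching AWS. This is a sketch; the required-key lists below are my assumption about this particular schema, not an AWS rule:

```python
def validate_infra_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    vpc = config.get('vpc')
    if not vpc:
        problems.append("missing top-level 'vpc' section")
    else:
        for key in ('name', 'cidr_block'):
            if key not in vpc:
                problems.append(f"vpc is missing '{key}'")
    for i, sg in enumerate(config.get('security_groups', [])):
        for key in ('name', 'description'):
            if key not in sg:
                problems.append(f"security_groups[{i}] is missing '{key}'")
    for i, inst in enumerate(config.get('instances', [])):
        for key in ('name', 'ami_id', 'instance_type'):
            if key not in inst:
                problems.append(f"instances[{i}] is missing '{key}'")
    return problems

# Usage (assuming PyYAML, as in the script above):
# problems = validate_infra_config(yaml.safe_load(open('infrastructure.yaml')))
# if problems: raise SystemExit("\n".join(problems))
```

Calling this at the top of `deploy_infrastructure` turns a mid-deployment `KeyError` into an up-front, readable error report.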
### CI/CD Automation

#### Managing GitLab CI/CD
#!/usr/bin/env python3
"""
GitLab CI/CD 管理工具
"""
import requests
import json
import time
from typing import Dict, List, Any, Optional
import yaml
class GitLabCIManager:
def __init__(self, gitlab_url: str, access_token: str):
self.gitlab_url = gitlab_url.rstrip('/')
self.access_token = access_token
self.headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
def get_project(self, project_id: str) -> Dict[str, Any]:
"""获取项目信息"""
url = f"{self.gitlab_url}/api/v4/projects/{project_id}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
def get_pipelines(self, project_id: str, status: str = None) -> List[Dict[str, Any]]:
"""获取流水线列表"""
url = f"{self.gitlab_url}/api/v4/projects/{project_id}/pipelines"
params = {}
if status:
params['status'] = status
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
return response.json()
def get_pipeline_jobs(self, project_id: str, pipeline_id: str) -> List[Dict[str, Any]]:
"""获取流水线的作业列表"""
url = f"{self.gitlab_url}/api/v4/projects/{project_id}/pipelines/{pipeline_id}/jobs"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
def trigger_pipeline(self, project_id: str, ref: str = 'main', variables: Dict[str, str] = None) -> Dict[str, Any]:
"""触发流水线"""
url = f"{self.gitlab_url}/api/v4/projects/{project_id}/pipeline"
data = {'ref': ref}
if variables:
data['variables'] = [
{'key': key, 'value': value} for key, value in variables.items()
]
response = requests.post(url, headers=self.headers, json=data)
response.raise_for_status()
return response.json()
def wait_for_pipeline(self, project_id: str, pipeline_id: str, timeout: int = 1800) -> str:
"""等待流水线完成"""
start_time = time.time()
while time.time() - start_time < timeout:
url = f"{self.gitlab_url}/api/v4/projects/{project_id}/pipelines/{pipeline_id}"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
pipeline = response.json()
status = pipeline['status']
print(f"Pipeline {pipeline_id} status: {status}")
if status in ['success', 'failed', 'canceled', 'skipped']:
return status
time.sleep(30)
raise TimeoutError(f"Pipeline {pipeline_id} did not complete within {timeout} seconds")
def get_job_log(self, project_id: str, job_id: str) -> str:
"""获取作业日志"""
url = f"{self.gitlab_url}/api/v4/projects/{project_id}/jobs/{job_id}/trace"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.text
def create_deployment(self, project_id: str, deployment_config: Dict[str, Any]) -> Dict[str, Any]:
"""创建部署"""
url = f"{self.gitlab_url}/api/v4/projects/{project_id}/deployments"
response = requests.post(url, headers=self.headers, json=deployment_config)
response.raise_for_status()
return response.json()
def generate_gitlab_ci_yaml(self, config: Dict[str, Any]) -> str:
"""生成 .gitlab-ci.yml 文件"""
gitlab_ci = {
'stages': config.get('stages', ['build', 'test', 'deploy']),
'variables': config.get('variables', {}),
'before_script': config.get('before_script', []),
'after_script': config.get('after_script', [])
}
# 添加作业
for job_name, job_config in config.get('jobs', {}).items():
gitlab_ci[job_name] = job_config
return yaml.dump(gitlab_ci, default_flow_style=False)
def deploy_application(self, project_id: str, environment: str, variables: Dict[str, str] = None) -> Dict[str, Any]:
"""部署应用程序"""
print(f"Deploying to {environment}...")
# 触发部署流水线
deploy_variables = variables or {}
deploy_variables['ENVIRONMENT'] = environment
pipeline = self.trigger_pipeline(project_id, variables=deploy_variables)
pipeline_id = pipeline['id']
print(f"Triggered pipeline {pipeline_id}")
# 等待流水线完成
status = self.wait_for_pipeline(project_id, pipeline_id)
if status == 'success':
print(f"Deployment to {environment} successful!")
else:
print(f"Deployment to {environment} failed with status: {status}")
# 获取失败的作业日志
jobs = self.get_pipeline_jobs(project_id, pipeline_id)
for job in jobs:
if job['status'] == 'failed':
print(f"\nFailed job: {job['name']}")
log = self.get_job_log(project_id, job['id'])
print(f"Log:\n{log[-1000:]}") # 显示最后 1000 字符
return {
'pipeline_id': pipeline_id,
'status': status,
'environment': environment
}
class DockerManager:
def __init__(self):
import docker
self.client = docker.from_env()
def build_image(self, dockerfile_path: str, image_name: str, tag: str = 'latest') -> str:
"""构建 Docker 镜像"""
print(f"Building Docker image: {image_name}:{tag}")
image, logs = self.client.images.build(
path=dockerfile_path,
tag=f"{image_name}:{tag}",
rm=True
)
for log in logs:
if 'stream' in log:
print(log['stream'].strip())
print(f"Image built successfully: {image.id}")
return image.id
def push_image(self, image_name: str, tag: str = 'latest', registry: str = None) -> bool:
"""推送镜像到仓库"""
full_name = f"{image_name}:{tag}"
if registry:
full_name = f"{registry}/{full_name}"
print(f"Pushing image: {full_name}")
try:
for line in self.client.images.push(full_name, stream=True, decode=True):
if 'status' in line:
print(f"{line['status']}: {line.get('progress', '')}")
print("Image pushed successfully!")
return True
except Exception as e:
print(f"Failed to push image: {e}")
return False
def run_container(self, image_name: str, container_config: Dict[str, Any]) -> str:
"""运行容器"""
print(f"Running container from image: {image_name}")
container = self.client.containers.run(
image_name,
**container_config,
detach=True
)
print(f"Container started: {container.id}")
return container.id
def generate_dockerfile(self, app_config: Dict[str, Any]) -> str:
"""生成 Dockerfile"""
base_image = app_config.get('base_image', 'python:3.9-slim')
working_dir = app_config.get('working_dir', '/app')
dockerfile_content = f"""
FROM {base_image}
WORKDIR {working_dir}
# 安装系统依赖
RUN apt-get update && apt-get install -y \\
{' '.join(app_config.get('system_packages', []))} \\
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 设置环境变量
{chr(10).join(f'ENV {key}={value}' for key, value in app_config.get('env_vars', {}).items())}
# 暴露端口
EXPOSE {app_config.get('port', 8000)}
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
CMD {app_config.get('health_check', 'curl -f http://localhost:8000/health || exit 1')}
# 启动命令
CMD {json.dumps(app_config.get('cmd', ['python', 'app.py']))}
"""
return dockerfile_content.strip()
# CI/CD 配置示例
ci_config = {
'stages': ['build', 'test', 'security', 'deploy'],
'variables': {
'DOCKER_DRIVER': 'overlay2',
'DOCKER_TLS_CERTDIR': '/certs'
},
'before_script': [
'echo "Starting CI/CD pipeline"',
'docker info'
],
'jobs': {
'build': {
'stage': 'build',
'image': 'docker:latest',
'services': ['docker:dind'],
'script': [
'docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .',
'docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA'
],
'only': ['main', 'develop']
},
'test': {
'stage': 'test',
'image': 'python:3.9',
'script': [
'pip install -r requirements-dev.txt',
'pytest tests/ --cov=src --cov-report=xml',
'flake8 src/'
],
'artifacts': {
'reports': {
'coverage_report': {
'coverage_format': 'cobertura',
'path': 'coverage.xml'
}
}
}
},
'security_scan': {
'stage': 'security',
'image': 'owasp/zap2docker-stable',
'script': [
'zap-baseline.py -t http://localhost:8000'
],
'allow_failure': True
},
'deploy_staging': {
'stage': 'deploy',
'image': 'alpine/helm:latest',
'script': [
'helm upgrade --install myapp ./helm-chart --namespace staging',
'kubectl rollout status deployment/myapp -n staging'
],
'environment': {
'name': 'staging',
'url': 'https://staging.example.com'
},
'only': ['develop']
},
'deploy_production': {
'stage': 'deploy',
'image': 'alpine/helm:latest',
'script': [
'helm upgrade --install myapp ./helm-chart --namespace production',
'kubectl rollout status deployment/myapp -n production'
],
'environment': {
'name': 'production',
'url': 'https://example.com'
},
'when': 'manual',
'only': ['main']
}
}
}
# 使用示例
if __name__ == "__main__":
# GitLab CI/CD 管理
gitlab_manager = GitLabCIManager(
gitlab_url='https://gitlab.example.com',
access_token='your_access_token'
)
# 生成 .gitlab-ci.yml
gitlab_ci_yaml = gitlab_manager.generate_gitlab_ci_yaml(ci_config)
with open('.gitlab-ci.yml', 'w') as f:
f.write(gitlab_ci_yaml)
print("Generated .gitlab-ci.yml")
# Docker 管理
docker_manager = DockerManager()
# 应用配置
app_config = {
'base_image': 'python:3.9-slim',
'working_dir': '/app',
'system_packages': ['curl', 'git'],
'port': 8000,
'env_vars': {
'FLASK_ENV': 'production',
'DATABASE_URL': 'postgresql://user:pass@db:5432/myapp'
},
'health_check': 'curl -f http://localhost:8000/health || exit 1',
'cmd': ['gunicorn', '--bind', '0.0.0.0:8000', 'app:app']
}
# 生成 Dockerfile
dockerfile_content = docker_manager.generate_dockerfile(app_config)
with open('Dockerfile', 'w') as f:
f.write(dockerfile_content)
print("Generated Dockerfile")
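上例调用的 DockerManager.generate_dockerfile 本质上是把配置字典逐段拼成 Dockerfile 文本。下面是一个可独立运行的最小示意,其中 render_dockerfile 是假设的辅助函数,字段名沿用上面的 app_config,并非 DockerManager 的实际实现:

```python
import json


def render_dockerfile(config: dict) -> str:
    """把应用配置字典拼接成 Dockerfile 文本(最小示意)"""
    lines = [f"FROM {config['base_image']}",
             f"WORKDIR {config['working_dir']}"]
    if config.get('system_packages'):
        pkgs = ' '.join(config['system_packages'])
        lines.append(f"RUN apt-get update && apt-get install -y {pkgs} "
                     "&& rm -rf /var/lib/apt/lists/*")
    lines.append("COPY requirements.txt .")
    lines.append("RUN pip install --no-cache-dir -r requirements.txt")
    lines.append("COPY . .")
    for key, value in config.get('env_vars', {}).items():
        lines.append(f'ENV {key}="{value}"')
    if config.get('port'):
        lines.append(f"EXPOSE {config['port']}")
    if config.get('health_check'):
        lines.append(f"HEALTHCHECK CMD {config['health_check']}")
    if config.get('cmd'):
        # 用 JSON 列表生成 exec 形式的 CMD
        lines.append(f"CMD {json.dumps(config['cmd'])}")
    return '\n'.join(lines) + '\n'


print(render_dockerfile({
    'base_image': 'python:3.9-slim',
    'working_dir': '/app',
    'system_packages': ['curl'],
    'port': 8000,
    'env_vars': {'FLASK_ENV': 'production'},
    'cmd': ['gunicorn', '--bind', '0.0.0.0:8000', 'app:app'],
}))
```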
监控和告警#
Prometheus 监控集成#
#!/usr/bin/env python3
"""
Prometheus 监控和告警工具
"""
import time
import requests
import json
from typing import Dict, List, Any, Optional
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from prometheus_client.core import CollectorRegistry
import threading
import logging
class PrometheusMonitor:
def __init__(self, prometheus_url: str, port: int = 8000):
self.prometheus_url = prometheus_url.rstrip('/')
self.port = port
self.registry = CollectorRegistry()
# 定义指标
self.request_count = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status'],
registry=self.registry
)
self.request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint'],
registry=self.registry
)
self.system_cpu_usage = Gauge(
'system_cpu_usage_percent',
'System CPU usage percentage',
registry=self.registry
)
self.system_memory_usage = Gauge(
'system_memory_usage_percent',
'System memory usage percentage',
registry=self.registry
)
self.application_health = Gauge(
'application_health_status',
'Application health status (1=healthy, 0=unhealthy)',
['service'],
registry=self.registry
)
def start_metrics_server(self):
"""启动指标服务器"""
start_http_server(self.port, registry=self.registry)
print(f"Metrics server started on port {self.port}")
def record_request(self, method: str, endpoint: str, status: int, duration: float):
"""记录 HTTP 请求指标"""
self.request_count.labels(method=method, endpoint=endpoint, status=str(status)).inc()
self.request_duration.labels(method=method, endpoint=endpoint).observe(duration)
def update_system_metrics(self):
"""更新系统指标"""
import psutil
# CPU 使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.system_cpu_usage.set(cpu_percent)
# 内存使用率
memory = psutil.virtual_memory()
self.system_memory_usage.set(memory.percent)
def check_service_health(self, service_name: str, health_url: str) -> bool:
"""检查服务健康状态"""
try:
response = requests.get(health_url, timeout=5)
is_healthy = response.status_code == 200
self.application_health.labels(service=service_name).set(1 if is_healthy else 0)
return is_healthy
except Exception as e:
logging.error(f"Health check failed for {service_name}: {e}")
self.application_health.labels(service=service_name).set(0)
return False
def query_prometheus(self, query: str) -> Dict[str, Any]:
"""查询 Prometheus"""
url = f"{self.prometheus_url}/api/v1/query"
params = {'query': query}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
def query_range(self, query: str, start: str, end: str, step: str = '15s') -> Dict[str, Any]:
"""范围查询 Prometheus"""
url = f"{self.prometheus_url}/api/v1/query_range"
params = {
'query': query,
'start': start,
'end': end,
'step': step
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
def get_alerts(self) -> List[Dict[str, Any]]:
"""获取当前告警"""
url = f"{self.prometheus_url}/api/v1/alerts"
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()['data']['alerts']
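query_prometheus 返回的即时查询结果有固定结构:data.result 是一个 vector 列表,每项的 value 形如 [时间戳, "数值字符串"]。下面的纯函数把它整理成 (标签, 数值) 对,便于后续处理;示例 payload 为手工构造,结构与真实 API 一致:

```python
from typing import Any, Dict, List, Tuple


def parse_instant_query(payload: Dict[str, Any]) -> List[Tuple[Dict[str, str], float]]:
    """把 Prometheus /api/v1/query 的响应整理成 (标签字典, 数值) 列表"""
    if payload.get('status') != 'success':
        raise ValueError(f"query failed: {payload.get('error', 'unknown')}")
    parsed = []
    for item in payload['data']['result']:
        labels = item.get('metric', {})
        _ts, value = item['value']  # value 是 [时间戳, "数值字符串"]
        parsed.append((labels, float(value)))
    return parsed


# 手工构造的示例响应(结构与真实 API 一致)
sample = {
    'status': 'success',
    'data': {
        'resultType': 'vector',
        'result': [
            {'metric': {'instance': 'web-1'}, 'value': [1700000000, '42.5']},
            {'metric': {'instance': 'web-2'}, 'value': [1700000000, '37.0']},
        ],
    },
}
print(parse_instant_query(sample))
```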
class AlertManager:
def __init__(self, webhook_url: str = None, email_config: Dict[str, str] = None):
self.webhook_url = webhook_url
self.email_config = email_config
self.alert_rules = []
def add_alert_rule(self, rule: Dict[str, Any]):
"""添加告警规则"""
self.alert_rules.append(rule)
def check_alerts(self, metrics: Dict[str, float]):
"""检查告警条件"""
alerts = []
for rule in self.alert_rules:
metric_name = rule['metric']
threshold = rule['threshold']
operator = rule.get('operator', '>')
if metric_name not in metrics:
continue
value = metrics[metric_name]
triggered = False
if operator == '>':
triggered = value > threshold
elif operator == '<':
triggered = value < threshold
elif operator == '>=':
triggered = value >= threshold
elif operator == '<=':
triggered = value <= threshold
elif operator == '==':
triggered = value == threshold
if triggered:
alert = {
'rule_name': rule['name'],
'metric': metric_name,
'value': value,
'threshold': threshold,
'operator': operator,
'severity': rule.get('severity', 'warning'),
'message': rule.get('message', f"{metric_name} {operator} {threshold}"),
'timestamp': time.time()
}
alerts.append(alert)
return alerts
def send_alert(self, alert: Dict[str, Any]):
"""发送告警"""
if self.webhook_url:
self.send_webhook_alert(alert)
if self.email_config:
self.send_email_alert(alert)
def send_webhook_alert(self, alert: Dict[str, Any]):
"""发送 Webhook 告警"""
try:
payload = {
'text': f"🚨 Alert: {alert['rule_name']}",
'attachments': [{
'color': 'danger' if alert['severity'] == 'critical' else 'warning',
'fields': [
{'title': 'Metric', 'value': alert['metric'], 'short': True},
{'title': 'Value', 'value': str(alert['value']), 'short': True},
{'title': 'Threshold', 'value': str(alert['threshold']), 'short': True},
{'title': 'Severity', 'value': alert['severity'], 'short': True}
],
'text': alert['message']
}]
}
response = requests.post(self.webhook_url, json=payload, timeout=10)
response.raise_for_status()
print(f"Webhook alert sent: {alert['rule_name']}")
except Exception as e:
logging.error(f"Failed to send webhook alert: {e}")
def send_email_alert(self, alert: Dict[str, Any]):
"""发送邮件告警"""
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
msg = MIMEMultipart()
msg['From'] = self.email_config['from']
msg['To'] = ', '.join(self.email_config['to'])
msg['Subject'] = f"Alert: {alert['rule_name']}"
body = f"""
Alert Details:
- Rule: {alert['rule_name']}
- Metric: {alert['metric']}
- Current Value: {alert['value']}
- Threshold: {alert['threshold']}
- Operator: {alert['operator']}
- Severity: {alert['severity']}
- Message: {alert['message']}
- Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(alert['timestamp']))}
"""
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
if self.email_config.get('use_tls'):
server.starttls()
if self.email_config.get('username'):
server.login(self.email_config['username'], self.email_config['password'])
server.send_message(msg)
server.quit()
print(f"Email alert sent: {alert['rule_name']}")
except Exception as e:
logging.error(f"Failed to send email alert: {e}")
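check_alerts 里逐个比较运算符的 if/elif 分支,也可以用标准库 operator 模块查表实现,逻辑等价(is_triggered 为示意用的辅助函数):

```python
import operator

# 运算符字符串到比较函数的映射,替代逐个 if/elif 分支
_OPS = {
    '>': operator.gt,
    '<': operator.lt,
    '>=': operator.ge,
    '<=': operator.le,
    '==': operator.eq,
}


def is_triggered(value: float, threshold: float, op: str = '>') -> bool:
    """按规则中的运算符判断指标是否触发告警;未知运算符视为不触发"""
    compare = _OPS.get(op)
    return compare(value, threshold) if compare else False


print(is_triggered(85.0, 80, '>'))  # CPU 85% 超过阈值 80,输出 True
```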
class LogAnalyzer:
def __init__(self, log_sources: List[Dict[str, str]]):
self.log_sources = log_sources
self.patterns = {
'error': [r'ERROR', r'FATAL', r'Exception', r'Traceback'],
'warning': [r'WARNING', r'WARN'],
'info': [r'INFO'],
'debug': [r'DEBUG']
}
def analyze_logs(self, time_range: str = '1h') -> Dict[str, Any]:
"""分析日志"""
import re
from collections import defaultdict
results = {
'summary': defaultdict(int),
'errors': [],
'warnings': [],
'patterns': defaultdict(int)
}
for source in self.log_sources:
source_type = source['type']
source_path = source['path']
if source_type == 'file':
self.analyze_log_file(source_path, results)
elif source_type == 'elasticsearch':
self.analyze_elasticsearch_logs(source, results, time_range)
return results
def analyze_log_file(self, file_path: str, results: Dict[str, Any]):
"""分析日志文件"""
import re
try:
with open(file_path, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
# 检查日志级别
for level, patterns in self.patterns.items():
for pattern in patterns:
if re.search(pattern, line, re.IGNORECASE):
results['summary'][level] += 1
if level == 'error':
results['errors'].append(line)
elif level == 'warning':
results['warnings'].append(line)
break
except Exception as e:
logging.error(f"Failed to analyze log file {file_path}: {e}")
def analyze_elasticsearch_logs(self, source: Dict[str, str], results: Dict[str, Any], time_range: str):
"""分析 Elasticsearch 日志"""
try:
from elasticsearch import Elasticsearch
es = Elasticsearch([source['url']])
query = {
"query": {
"range": {
"@timestamp": {
"gte": f"now-{time_range}"
}
}
},
"aggs": {
"log_levels": {
"terms": {
"field": "level.keyword"
}
}
}
}
response = es.search(index=source['index'], body=query)
# 处理聚合结果
for bucket in response['aggregations']['log_levels']['buckets']:
level = bucket['key'].lower()
count = bucket['doc_count']
results['summary'][level] += count
# 获取错误日志
error_query = {
"query": {
"bool": {
"must": [
{"range": {"@timestamp": {"gte": f"now-{time_range}"}}},
{"terms": {"level.keyword": ["ERROR", "FATAL"]}}
]
}
},
"sort": [{"@timestamp": {"order": "desc"}}],
"size": 100
}
error_response = es.search(index=source['index'], body=error_query)
for hit in error_response['hits']['hits']:
results['errors'].append(hit['_source'].get('message', ''))
except Exception as e:
logging.error(f"Failed to analyze Elasticsearch logs: {e}")
def generate_report(self, analysis_results: Dict[str, Any]) -> str:
"""生成分析报告"""
report = []
report.append("=" * 60)
report.append("LOG ANALYSIS REPORT")
report.append("=" * 60)
# 摘要
summary = analysis_results['summary']
report.append(f"\nSummary:")
for level, count in summary.items():
report.append(f" {level.upper()}: {count}")
# 错误详情
errors = analysis_results['errors']
if errors:
report.append(f"\nTop Errors ({len(errors)}):")
for i, error in enumerate(errors[:10], 1):
report.append(f" {i}. {error[:100]}...")
# 警告详情
warnings = analysis_results['warnings']
if warnings:
report.append(f"\nTop Warnings ({len(warnings)}):")
for i, warning in enumerate(warnings[:10], 1):
report.append(f" {i}. {warning[:100]}...")
return '\n'.join(report)
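analyze_log_file 的核心是按正则把每行归入日志级别;抽成可单测的纯函数大致如下(模式表沿用正文,字典的插入顺序决定匹配优先级):

```python
import re
from typing import Dict, List, Optional

PATTERNS: Dict[str, List[str]] = {
    'error': [r'ERROR', r'FATAL', r'Exception', r'Traceback'],
    'warning': [r'WARNING', r'WARN'],
    'info': [r'INFO'],
    'debug': [r'DEBUG'],
}


def classify_line(line: str) -> Optional[str]:
    """返回命中的第一个日志级别;都不命中时返回 None。
    注意顺序有意义:error 在前,含 ERROR 的行不会被归为其他级别。"""
    for level, patterns in PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, line, re.IGNORECASE):
                return level
    return None


print(classify_line('2024-01-01 ERROR db timeout'))  # error
```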
# 使用示例
if __name__ == "__main__":
# Prometheus 监控
monitor = PrometheusMonitor('http://localhost:9090')
monitor.start_metrics_server()
# 告警管理器
alert_manager = AlertManager(
webhook_url='https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK',
email_config={
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'use_tls': True,
'username': 'alerts@example.com',
'password': 'your_password',
'from': 'alerts@example.com',
'to': ['admin@example.com']
}
)
# 添加告警规则
alert_manager.add_alert_rule({
'name': 'High CPU Usage',
'metric': 'cpu_usage',
'threshold': 80,
'operator': '>',
'severity': 'warning',
'message': 'CPU usage is above 80%'
})
alert_manager.add_alert_rule({
'name': 'High Memory Usage',
'metric': 'memory_usage',
'threshold': 90,
'operator': '>',
'severity': 'critical',
'message': 'Memory usage is above 90%'
})
# 日志分析器
log_analyzer = LogAnalyzer([
{'type': 'file', 'path': '/var/log/application.log'},
{'type': 'elasticsearch', 'url': 'http://localhost:9200', 'index': 'logs-*'}
])
# 监控循环
def monitoring_loop():
while True:
try:
# 更新系统指标
monitor.update_system_metrics()
# 检查服务健康状态
monitor.check_service_health('web-app', 'http://localhost:8080/health')
monitor.check_service_health('api', 'http://localhost:8081/health')
# 获取当前指标值
import psutil
metrics = {
'cpu_usage': psutil.cpu_percent(),
'memory_usage': psutil.virtual_memory().percent
}
# 检查告警
alerts = alert_manager.check_alerts(metrics)
for alert in alerts:
alert_manager.send_alert(alert)
# 分析日志
log_results = log_analyzer.analyze_logs('1h')
if log_results['summary']['error'] > 10:
alert = {
'rule_name': 'High Error Rate',
'metric': 'error_count',
'value': log_results['summary']['error'],
'threshold': 10,
'operator': '>',
'severity': 'warning',
'message': f"High error rate detected: {log_results['summary']['error']} errors in the last hour",
'timestamp': time.time()
}
alert_manager.send_alert(alert)
time.sleep(60) # 每分钟检查一次
except Exception as e:
logging.error(f"Monitoring loop error: {e}")
time.sleep(60)
# 启动监控线程
monitoring_thread = threading.Thread(target=monitoring_loop, daemon=True)
monitoring_thread.start()
print("Monitoring system started. Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Monitoring system stopped.")
DevOps 最佳实践#
代码质量和安全#
代码质量检查工具#
#!/usr/bin/env python3
"""
代码质量检查工具集成
"""
import subprocess
import json
import os
from typing import Dict, List, Any
import yaml
class CodeQualityChecker:
def __init__(self, project_path: str):
self.project_path = project_path
self.results = {}
def run_flake8(self) -> Dict[str, Any]:
"""运行 flake8 代码风格检查"""
try:
result = subprocess.run(
['flake8', self.project_path],
capture_output=True, text=True, cwd=self.project_path
)
# flake8 默认逐行输出 "path:line:col: code message";
# 原生并不支持 --format=json(JSON 输出需要额外安装 flake8-json 插件)
violations = [line for line in result.stdout.splitlines() if line.strip()]
return {
'tool': 'flake8',
'status': 'success' if result.returncode == 0 else 'failed',
'violations': violations,
'total_violations': len(violations)
}
except Exception as e:
return {'tool': 'flake8', 'status': 'error', 'error': str(e)}
def run_bandit(self) -> Dict[str, Any]:
"""运行 bandit 安全检查"""
try:
result = subprocess.run(
['bandit', '-r', self.project_path, '-f', 'json'],
capture_output=True, text=True, cwd=self.project_path
)
if result.stdout:
report = json.loads(result.stdout)
issues = report.get('results', [])
else:
issues = []
return {
'tool': 'bandit',
'status': 'success',
'issues': issues,
'total_issues': len(issues),
'high_severity': len([i for i in issues if i.get('issue_severity') == 'HIGH']),
'medium_severity': len([i for i in issues if i.get('issue_severity') == 'MEDIUM']),
'low_severity': len([i for i in issues if i.get('issue_severity') == 'LOW'])
}
except Exception as e:
return {'tool': 'bandit', 'status': 'error', 'error': str(e)}
def run_pytest_coverage(self) -> Dict[str, Any]:
"""运行测试覆盖率检查"""
try:
result = subprocess.run(
['pytest', '--cov=src', '--cov-report=json', '--cov-report=term'],
capture_output=True, text=True, cwd=self.project_path
)
# 读取覆盖率报告
coverage_file = os.path.join(self.project_path, 'coverage.json')
if os.path.exists(coverage_file):
with open(coverage_file, 'r') as f:
coverage_data = json.load(f)
total_coverage = coverage_data['totals']['percent_covered']
else:
total_coverage = 0
return {
'tool': 'pytest-cov',
'status': 'success' if result.returncode == 0 else 'failed',
'coverage_percentage': total_coverage,
'tests_passed': result.returncode == 0,
'output': result.stdout
}
except Exception as e:
return {'tool': 'pytest-cov', 'status': 'error', 'error': str(e)}
def run_mypy(self) -> Dict[str, Any]:
"""运行 mypy 类型检查"""
try:
result = subprocess.run(
['mypy', self.project_path],
capture_output=True, text=True, cwd=self.project_path
)
# 解析 mypy 输出
errors = []
if result.stdout:
for line in result.stdout.split('\n'):
if line.strip() and ':' in line:
errors.append(line.strip())
return {
'tool': 'mypy',
'status': 'success' if result.returncode == 0 else 'failed',
'errors': errors,
'total_errors': len(errors)
}
except Exception as e:
return {'tool': 'mypy', 'status': 'error', 'error': str(e)}
def run_all_checks(self) -> Dict[str, Any]:
"""运行所有代码质量检查"""
print("Running code quality checks...")
self.results = {
'flake8': self.run_flake8(),
'bandit': self.run_bandit(),
'coverage': self.run_pytest_coverage(),
'mypy': self.run_mypy()
}
# 计算总体评分
score = self.calculate_quality_score()
self.results['overall_score'] = score
return self.results
def calculate_quality_score(self) -> Dict[str, Any]:
"""计算代码质量评分"""
score = 100
issues = []
# Flake8 扣分
flake8_violations = self.results['flake8'].get('total_violations', 0)
if flake8_violations > 0:
deduction = min(flake8_violations * 2, 30)
score -= deduction
issues.append(f"Flake8 violations: -{deduction} points")
# Bandit 扣分
bandit_high = self.results['bandit'].get('high_severity', 0)
bandit_medium = self.results['bandit'].get('medium_severity', 0)
if bandit_high > 0:
deduction = bandit_high * 10
score -= deduction
issues.append(f"High security issues: -{deduction} points")
if bandit_medium > 0:
deduction = bandit_medium * 5
score -= deduction
issues.append(f"Medium security issues: -{deduction} points")
# 测试覆盖率扣分
coverage = self.results['coverage'].get('coverage_percentage', 0)
if coverage < 80:
deduction = (80 - coverage) * 0.5
score -= deduction
issues.append(f"Low test coverage ({coverage}%): -{deduction:.1f} points")
# MyPy 扣分
mypy_errors = self.results['mypy'].get('total_errors', 0)
if mypy_errors > 0:
deduction = min(mypy_errors * 1, 20)
score -= deduction
issues.append(f"Type errors: -{deduction} points")
score = max(score, 0)
return {
'score': round(score, 1),
'grade': self.get_grade(score),
'issues': issues
}
def get_grade(self, score: float) -> str:
"""根据分数获取等级"""
if score >= 90:
return 'A'
elif score >= 80:
return 'B'
elif score >= 70:
return 'C'
elif score >= 60:
return 'D'
else:
return 'F'
def generate_report(self) -> str:
"""生成质量报告"""
if not self.results:
self.run_all_checks()
report = []
report.append("=" * 60)
report.append("CODE QUALITY REPORT")
report.append("=" * 60)
# 总体评分
overall = self.results['overall_score']
report.append(f"\nOverall Score: {overall['score']}/100 (Grade: {overall['grade']})")
if overall['issues']:
report.append("\nIssues:")
for issue in overall['issues']:
report.append(f" - {issue}")
# 详细结果
report.append("\nDetailed Results:")
for tool, result in self.results.items():
if tool == 'overall_score':
continue
report.append(f"\n{tool.upper()}:")
report.append(f" Status: {result.get('status', 'unknown')}")
if tool == 'flake8':
report.append(f" Violations: {result.get('total_violations', 0)}")
elif tool == 'bandit':
report.append(f" Security Issues: {result.get('total_issues', 0)}")
report.append(f" High: {result.get('high_severity', 0)}")
report.append(f" Medium: {result.get('medium_severity', 0)}")
report.append(f" Low: {result.get('low_severity', 0)}")
elif tool == 'coverage':
report.append(f" Coverage: {result.get('coverage_percentage', 0)}%")
report.append(f" Tests Passed: {result.get('tests_passed', False)}")
elif tool == 'mypy':
report.append(f" Type Errors: {result.get('total_errors', 0)}")
return '\n'.join(report)
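get_grade 的分数分段还可以用标准库 bisect 对断点查表,与正文的 if/elif 版本结果一致(仅作等价示意):

```python
import bisect

# 分数断点与等级一一对应:score < 60 → F,60-69 → D,...,>= 90 → A
_BOUNDS = [60, 70, 80, 90]
_GRADES = ['F', 'D', 'C', 'B', 'A']


def grade(score: float) -> str:
    """按断点把分数映射为等级,与 if/elif 版本结果一致"""
    return _GRADES[bisect.bisect_right(_BOUNDS, score)]


print(grade(85))  # B
```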
class SecurityScanner:
def __init__(self):
self.scan_results = {}
def scan_dependencies(self, requirements_file: str) -> Dict[str, Any]:
"""扫描依赖包安全漏洞"""
try:
# 使用 safety 扫描已知漏洞
result = subprocess.run(
['safety', 'check', '--json', '-r', requirements_file],
capture_output=True, text=True
)
if result.stdout:
vulnerabilities = json.loads(result.stdout)
else:
vulnerabilities = []
return {
'tool': 'safety',
'status': 'success',
'vulnerabilities': vulnerabilities,
'total_vulnerabilities': len(vulnerabilities)
}
except Exception as e:
return {'tool': 'safety', 'status': 'error', 'error': str(e)}
def scan_docker_image(self, image_name: str) -> Dict[str, Any]:
"""扫描 Docker 镜像安全漏洞"""
try:
# 使用 trivy 扫描镜像
result = subprocess.run(
['trivy', 'image', '--format', 'json', image_name],
capture_output=True, text=True
)
if result.stdout:
scan_result = json.loads(result.stdout)
vulnerabilities = []
for target in scan_result.get('Results', []):
vulnerabilities.extend(target.get('Vulnerabilities', []))
else:
vulnerabilities = []
# 按严重程度分类
severity_count = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
for vuln in vulnerabilities:
severity = vuln.get('Severity', 'UNKNOWN')
if severity in severity_count:
severity_count[severity] += 1
return {
'tool': 'trivy',
'status': 'success',
'vulnerabilities': vulnerabilities,
'total_vulnerabilities': len(vulnerabilities),
'severity_breakdown': severity_count
}
except Exception as e:
return {'tool': 'trivy', 'status': 'error', 'error': str(e)}
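scan_docker_image 中按严重程度手工累加的部分,用 collections.Counter 可以写得更紧凑(等价示意,输入为 trivy 结果中的漏洞列表):

```python
from collections import Counter
from typing import Dict, List


def severity_breakdown(vulnerabilities: List[dict]) -> Dict[str, int]:
    """统计各严重程度的漏洞数量;与正文一样,只保留四个常见级别"""
    counts = Counter(v.get('Severity', 'UNKNOWN') for v in vulnerabilities)
    # 保证四个常见级别总是出现在结果里,未命中的计 0
    return {level: counts.get(level, 0)
            for level in ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')}


print(severity_breakdown([{'Severity': 'HIGH'},
                          {'Severity': 'HIGH'},
                          {'Severity': 'LOW'}]))
# → {'CRITICAL': 0, 'HIGH': 2, 'MEDIUM': 0, 'LOW': 1}
```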
性能优化和监控#
应用性能监控#
#!/usr/bin/env python3
"""
应用性能监控工具
"""
import time
import functools
import threading
from typing import Dict, Any, Callable
import statistics
from collections import defaultdict, deque
class PerformanceMonitor:
def __init__(self, max_samples: int = 1000):
self.max_samples = max_samples
self.metrics = defaultdict(lambda: deque(maxlen=max_samples))
self.counters = defaultdict(int)
self.lock = threading.Lock()
def timing_decorator(self, name: str = None):
"""性能计时装饰器"""
def decorator(func: Callable) -> Callable:
metric_name = name or f"{func.__module__}.{func.__name__}"
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
self.record_success(metric_name, time.time() - start_time)
return result
except Exception as e:
self.record_error(metric_name, time.time() - start_time)
raise
return wrapper
return decorator
def record_success(self, metric_name: str, duration: float):
"""记录成功的操作"""
with self.lock:
self.metrics[f"{metric_name}.duration"].append(duration)
self.counters[f"{metric_name}.success"] += 1
def record_error(self, metric_name: str, duration: float):
"""记录失败的操作"""
with self.lock:
self.metrics[f"{metric_name}.duration"].append(duration)
self.counters[f"{metric_name}.error"] += 1
def get_statistics(self, metric_name: str) -> Dict[str, Any]:
"""获取指标统计信息"""
duration_key = f"{metric_name}.duration"
success_key = f"{metric_name}.success"
error_key = f"{metric_name}.error"
with self.lock:
durations = list(self.metrics[duration_key])
success_count = self.counters[success_key]
error_count = self.counters[error_key]
if not durations:
return {'error': 'No data available'}
total_requests = success_count + error_count
error_rate = (error_count / total_requests) * 100 if total_requests > 0 else 0
return {
'total_requests': total_requests,
'success_count': success_count,
'error_count': error_count,
'error_rate_percent': round(error_rate, 2),
'avg_duration': round(statistics.mean(durations), 4),
'median_duration': round(statistics.median(durations), 4),
'min_duration': round(min(durations), 4),
'max_duration': round(max(durations), 4),
'p95_duration': round(statistics.quantiles(durations, n=20)[18], 4) if len(durations) >= 20 else None,
'p99_duration': round(statistics.quantiles(durations, n=100)[98], 4) if len(durations) >= 100 else None
}
def get_all_metrics(self) -> Dict[str, Dict[str, Any]]:
"""获取所有指标"""
metrics = {}
# 获取所有唯一的指标名称
metric_names = set()
for key in list(self.metrics.keys()) + list(self.counters.keys()):
if key.endswith('.duration') or key.endswith('.success') or key.endswith('.error'):
base_name = key.rsplit('.', 1)[0]
metric_names.add(base_name)
for metric_name in metric_names:
metrics[metric_name] = self.get_statistics(metric_name)
return metrics
def generate_report(self) -> str:
"""生成性能报告"""
metrics = self.get_all_metrics()
report = []
report.append("=" * 60)
report.append("PERFORMANCE MONITORING REPORT")
report.append("=" * 60)
for metric_name, stats in metrics.items():
if 'error' in stats:
continue
report.append(f"\n{metric_name}:")
report.append(f" Total Requests: {stats['total_requests']}")
report.append(f" Success Rate: {100 - stats['error_rate_percent']:.2f}%")
report.append(f" Error Rate: {stats['error_rate_percent']:.2f}%")
report.append(f" Average Duration: {stats['avg_duration']}s")
report.append(f" Median Duration: {stats['median_duration']}s")
if stats['p95_duration'] is not None:
report.append(f" 95th Percentile: {stats['p95_duration']}s")
if stats['p99_duration'] is not None:
report.append(f" 99th Percentile: {stats['p99_duration']}s")
return '\n'.join(report)
# 全局性能监控实例
perf_monitor = PerformanceMonitor()
# 使用示例
@perf_monitor.timing_decorator("database.query")
def database_query(query: str):
"""模拟数据库查询"""
import random
time.sleep(random.uniform(0.01, 0.1)) # 模拟查询时间
if random.random() < 0.05: # 5% 的错误率
raise Exception("Database connection failed")
return f"Results for: {query}"
@perf_monitor.timing_decorator("api.request")
def api_request(endpoint: str):
"""模拟 API 请求"""
import random
time.sleep(random.uniform(0.05, 0.2)) # 模拟请求时间
if random.random() < 0.02: # 2% 的错误率
raise Exception("API request failed")
return f"Response from: {endpoint}"
if __name__ == "__main__":
# 代码质量检查
quality_checker = CodeQualityChecker('/path/to/your/project')
quality_report = quality_checker.generate_report()
print(quality_report)
# 安全扫描
security_scanner = SecurityScanner()
# 扫描依赖包
dep_scan = security_scanner.scan_dependencies('requirements.txt')
print(f"Dependency scan: {dep_scan['total_vulnerabilities']} vulnerabilities found")
# 性能监控示例
print("\nRunning performance tests...")
for i in range(100):
try:
database_query(f"SELECT * FROM table_{i}")
api_request(f"/api/endpoint_{i}")
except Exception:
pass # 忽略模拟的错误
# 生成性能报告
perf_report = perf_monitor.generate_report()
print(perf_report)
总结与展望#
DevOps 成功要素#
- 文化转变: 打破部门壁垒,建立协作文化
- 自动化优先: 将重复性工作自动化
- 持续改进: 基于数据驱动的决策
- 快速反馈: 建立快速反馈循环
- 安全集成: 将安全融入到整个开发生命周期
Python 在 DevOps 中的优势#
- 生态丰富: 大量的第三方库和工具
- 易于学习: 语法简洁,上手容易
- 跨平台: 支持多种操作系统
- 社区活跃: 持续的更新和支持
- 集成能力强: 易于与其他工具集成
学习路径建议#
基础阶段
- Python 编程基础
- Linux 系统管理
- 版本控制 (Git)
- 基础网络知识
进阶阶段
- 容器化技术 (Docker, Kubernetes)
- CI/CD 流水线设计
- 基础设施即代码 (Terraform, Ansible)
- 监控和日志管理
高级阶段
- 微服务架构
- 云原生技术
- 安全最佳实践
- 性能优化
推荐工具和资源#
必备工具#
- 版本控制: Git, GitLab, GitHub
- CI/CD: Jenkins, GitLab CI, GitHub Actions
- 容器化: Docker, Kubernetes, Helm
- 监控: Prometheus, Grafana, ELK Stack
- 基础设施: Terraform, Ansible, Pulumi
- 云平台: AWS, Azure, GCP
学习资源#
- 官方文档: 各工具的官方文档
- 在线课程: Coursera, Udemy, Pluralsight
- 实践平台: Katacoda, Play with Docker
- 社区: DevOps 社区, Stack Overflow
- 书籍: 《Phoenix Project》, 《DevOps Handbook》
未来趋势#
- GitOps: 基于 Git 的运维模式
- AIOps: 人工智能运维
- Serverless: 无服务器架构
- Edge Computing: 边缘计算
- Security as Code: 安全即代码
通过系统学习和实践,结合 Python 的强大能力,您将能够构建高效、可靠、安全的 DevOps 体系,推动组织的数字化转型和业务发展。记住,DevOps 不仅仅是工具和技术,更是一种文化和思维方式的转变。
