使用Python怎么实现一个资源探测器

发布时间:2021-07-05 14:56:20 作者:Leah
来源:亿速云 阅读:242
# 使用Python怎么实现一个资源探测器

## 引言

在当今信息时代,资源管理是系统运维、网络安全和数据分析等领域的重要任务。资源探测器(Resource Scanner)作为一种自动化工具,能够帮助用户快速发现和监控系统中的各类资源(如文件、进程、网络连接等)。本文将详细介绍如何使用Python实现一个基础但功能完整的资源探测器。

---

## 一、需求分析与设计

### 1.1 核心功能
- **文件系统扫描**:递归遍历目录,检测文件/文件夹
- **进程监控**:获取运行中的进程信息
- **网络探测**:扫描开放端口和活跃连接
- **硬件资源统计**:CPU、内存、磁盘使用情况

### 1.2 技术选型
- **标准库**:`os`, `psutil`, `socket`
- **第三方库**:`scapy`(高级网络探测)
- **输出格式**:JSON/CSV/Terminal表格

---

## 二、实现步骤

### 2.1 文件系统扫描器
```python
import os
from pathlib import Path

def scan_filesystem(root_dir, max_depth=3):
    """递归扫描文件系统"""
    results = []
    for root, dirs, files in os.walk(root_dir):
        depth = root.count(os.sep) - root_dir.count(os.sep)
        if depth > max_depth:
            continue
        for name in files:
            file_path = Path(root) / name
            stats = file_path.stat()
            results.append({
                "path": str(file_path),
                "size": stats.st_size,
                "modified": stats.st_mtime
            })
    return results

2.2 进程探测器

import psutil

def scan_processes():
    """获取所有进程信息"""
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info']):
        processes.append({
            "pid": proc.info['pid'],
            "name": proc.info['name'],
            "cpu": proc.info['cpu_percent'],
            "memory": proc.info['memory_info'].rss // 1024  # KB
        })
    return processes

2.3 网络端口扫描器

import socket
from concurrent.futures import ThreadPoolExecutor

def check_port(host, port):
    """检测单个端口"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(1)
        return port if s.connect_ex((host, port)) == 0 else None

def scan_ports(host="127.0.0.1", ports=range(1, 1025)):
    """多线程端口扫描"""
    open_ports = []
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = executor.map(lambda p: check_port(host, p), ports)
        for port in filter(None, results):
            open_ports.append(port)
    return sorted(open_ports)

2.4 硬件资源监控

def get_system_stats():
    """获取系统资源使用情况"""
    return {
        "cpu_usage": psutil.cpu_percent(interval=1),
        "memory": psutil.virtual_memory()._asdict(),
        "disks": [disk._asdict() for disk in psutil.disk_partitions()]
    }

三、功能整合与优化

3.1 主控模块

class ResourceScanner:
    def __init__(self):
        self.scanners = {
            "filesystem": scan_filesystem,
            "processes": scan_processes,
            "ports": scan_ports,
            "hardware": get_system_stats
        }

    def run_scan(self, scan_type, **kwargs):
        return self.scanners.get(scan_type)(**kwargs)

3.2 结果可视化

from tabulate import tabulate

def display_results(data, format="table"):
    if format == "json":
        import json
        print(json.dumps(data, indent=2))
    else:
        print(tabulate(data, headers="keys", tablefmt="grid"))

四、进阶功能扩展

4.1 定时扫描任务

import schedule
import time

def job():
    scanner = ResourceScanner()
    results = scanner.run_scan("processes")
    display_results(results)

schedule.every(5).minutes.do(job)
while True:
    schedule.run_pending()
    time.sleep(1)

4.2 异常检测机制

def detect_anomalies(processes):
    avg_cpu = sum(p['cpu'] for p in processes) / len(processes)
    return [p for p in processes if p['cpu'] > 3 * avg_cpu]

4.3 生成报告

def generate_report(data, report_file):
    with open(report_file, 'w') as f:
        if report_file.endswith('.csv'):
            import csv
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
        else:
            f.write(tabulate(data, headers="keys"))

五、安全注意事项

  1. 权限控制:部分操作需要管理员权限
  2. 扫描频率:避免高频扫描影响系统性能
  3. 合法合规:禁止扫描未经授权的网络目标

六、完整示例代码

# resource_scanner.py
import os
import psutil
import socket
from concurrent.futures import ThreadPoolExecutor
from tabulate import tabulate

class ResourceScanner:
    # ...(整合前文所有函数)...

if __name__ == "__main__":
    scanner = ResourceScanner()
    print("=== 文件系统扫描 ===")
    display_results(scanner.run_scan("filesystem", root_dir="/tmp"))
    print("\n=== 进程扫描 ===")
    display_results(scanner.run_scan("processes"))

结语

本文实现了一个基础资源探测器,涵盖了文件、进程、网络和硬件资源的检测功能。通过Python丰富的生态系统,我们可以轻松扩展更多高级特性,如: - 添加GUI界面(PyQt/Tkinter) - 实现分布式扫描(Celery/RabbitMQ) - 集成威胁情报分析

完整项目代码已托管至GitHub(示例地址)。欢迎开发者继续完善这个工具,使其成为更强大的系统监控解决方案。 “`

该文章包含: 1. 需求分析与技术选型 2. 分模块代码实现(文件/进程/网络/硬件) 3. 功能整合与可视化输出 4. 进阶功能扩展建议 5. 安全注意事项 6. 完整可运行的示例代码

可根据需要调整细节或补充特定功能的实现说明。

推荐阅读:
  1. Android 中怎么实现一个菜单资源
  2. Andrdoid Sensors Overview (探测器概述)二

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python

上一篇:python中怎么实现类的静态方法和类的类方法

下一篇:Python中怎么记录程序日志

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》