如何接入异步任务及使用log

发布时间:2021-10-19 15:22:57 作者:iii
来源:亿速云 阅读:145
# 如何接入异步任务及使用log

## 目录
1. [异步任务基础概念](#一异步任务基础概念)
   - 1.1 [什么是异步任务](#11-什么是异步任务)
   - 1.2 [同步vs异步的差异](#12-同步vs异步的差异)
   - 1.3 [常见应用场景](#13-常见应用场景)

2. [主流异步任务实现方案](#二主流异步任务实现方案)
   - 2.1 [多线程实现](#21-多线程实现)
   - 2.2 [消息队列方案](#22-消息队列方案)
   - 2.3 [协程与事件循环](#23-协程与事件循环)

3. [Python异步编程实战](#三python异步编程实战)
   - 3.1 [asyncio核心用法](#31-asyncio核心用法)
   - 3.2 [Celery分布式任务](#32-celery分布式任务)
   - 3.3 [Django-Q轻量方案](#33-django-q轻量方案)

4. [日志系统设计要点](#四日志系统设计要点)
   - 4.1 [日志级别详解](#41-日志级别详解)
   - 4.2 [结构化日志实践](#42-结构化日志实践)
   - 4.3 [日志收集与分析](#43-日志收集与分析)

5. [异步任务中的日志集成](#五异步任务中的日志集成)
   - 5.1 [上下文传递方案](#51-上下文传递方案)
   - 5.2 [分布式追踪实现](#52-分布式追踪实现)
   - 5.3 [错误监控告警](#53-错误监控告警)

6. [最佳实践与性能优化](#六最佳实践与性能优化)
   - 6.1 [任务幂等性设计](#61-任务幂等性设计)
   - 6.2 [资源限制策略](#62-资源限制策略)
   - 6.3 [日志性能影响](#63-日志性能影响)

---

## 一、异步任务基础概念

### 1.1 什么是异步任务
异步任务是指将耗时的操作从主执行流程中剥离,通过非阻塞方式执行的编程模式。典型特征包括:
- 非阻塞调用:主线程不等待任务完成
- 回调机制:通过回调函数处理结果
- 状态可查询:提供任务状态查询接口

```python
# 同步方式示例
def sync_download(url):
    content = requests.get(url).content
    save_to_db(content)  # 阻塞直到完成

# 异步方式示例
async def async_download(url):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, requests.get, url)

1.2 同步vs异步的差异

特性 同步任务 异步任务
执行流程 线性顺序执行 并发执行
资源占用 线程阻塞 资源利用率高
复杂度 简单直观 需要状态管理
适合场景 简单IO操作 高并发IO密集型

1.3 常见应用场景

  1. Web服务:处理耗时请求(如文件导出)
  2. 数据处理:ETL管道任务
  3. 定时任务:周期性的数据统计
  4. 消息处理:MQ消费者服务

二、主流异步任务实现方案

2.1 多线程实现

通过concurrent.futures实现线程池:

from concurrent.futures import ThreadPoolExecutor

def process_image(image):
    # 图像处理逻辑
    pass

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_image, img) for img in image_list]
    for future in as_completed(futures):
        try:
            result = future.result()
        except Exception as e:
            logging.error(f"Task failed: {e}")

优缺点分析: - ✅ 利用多核CPU资源 - ❌ GIL限制导致并发瓶颈 - ❌ 线程切换开销较大

2.2 消息队列方案

RabbitMQ任务队列示例:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    logging.info(f"Processing {body.decode()}")
    # 业务处理逻辑
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

2.3 协程与事件循环

asyncio典型工作流:

import asyncio

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            logging.debug(f"Fetched {len(data)} items")
            return data

async def main():
    tasks = [
        fetch_data('https://api.example.com/data1'),
        fetch_data('https://api.example.com/data2')
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    logging.info(f"Total results: {sum(len(r) for r in results)}")

asyncio.run(main())

四、日志系统设计要点

4.1 日志级别详解

级别 使用场景 示例
DEBUG 开发调试信息 logger.debug(f"Variable x={x}")
INFO 关键业务流程记录 logger.info("User %s logged in", user_id)
WARNING 非预期但不影响系统的异常 logger.warning("Cache miss for key %s", key)
ERROR 需要干预的系统错误 logger.error("Database connection failed")
CRITICAL 系统级故障 logger.critical("Disk space exhausted")

4.2 结构化日志实践

使用JSON格式日志:

import logging
from pythonjsonlogger import jsonlogger

logger = logging.getLogger()
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
    '%(asctime)s %(levelname)s %(message)s %(module)s %(funcName)s'
)
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)

logger.info("Order processed", extra={
    "order_id": 12345,
    "customer": "Alice",
    "amount": 99.99
})

输出示例:

{
  "asctime": "2023-08-20 14:23:01",
  "levelname": "INFO",
  "message": "Order processed",
  "module": "order_service",
  "funcName": "process_order",
  "order_id": 12345,
  "customer": "Alice",
  "amount": 99.99
}

五、异步任务中的日志集成

5.1 上下文传递方案

使用contextvars保持上下文:

import contextvars
request_id = contextvars.ContextVar('request_id')

async def process_order(order):
    logger.info("Start processing", extra={"request_id": request_id.get()})
    # 业务处理
    await charge_payment(order)
    logger.info("Completed processing", extra={"request_id": request_id.get()})

async def api_handler(request):
    request_id.set(request.headers['X-Request-ID'])
    await process_order(request.json())

5.3 错误监控告警

Sentry集成示例:

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

sentry_logging = LoggingIntegration(
    level=logging.INFO,
    event_level=logging.ERROR
)

sentry_sdk.init(
    dsn="https://example@sentry.io/123",
    integrations=[sentry_logging]
)

try:
    async_task.delay(params)
except Exception as e:
    logging.exception("Async task failed")
    sentry_sdk.capture_exception(e)

六、最佳实践与性能优化

6.1 任务幂等性设计

实现要点: 1. 唯一任务ID生成 2. 前置状态检查 3. 事务性操作

@shared_task(bind=True)
def process_payment(self, order_id):
    try:
        order = Order.objects.get(pk=order_id)
        if order.status == 'processed':
            logger.warning(f"Order {order_id} already processed")
            return
        
        # 核心支付逻辑
        payment_service.charge(order.amount)
        
        with transaction.atomic():
            order.status = 'processed'
            order.save()
            logger.info(f"Successfully processed order {order_id}")
            
    except Exception as e:
        logger.error(f"Payment failed for order {order_id}: {str(e)}")
        self.retry(exc=e, countdown=60)

6.3 日志性能影响

优化策略对比:

策略 效果提升 实现复杂度
异步日志处理器 30-50%
日志采样 60-80%
日志级别动态调整 40-70%
输出格式简化 10-20%

异步日志配置示例:

from concurrent_log_handler import ConcurrentRotatingFileHandler

handler = ConcurrentRotatingFileHandler(
    '/var/log/service.log',
    maxBytes=100*1024*1024,
    backupCount=5
)
logger.addHandler(handler)

总结:异步任务系统与日志系统的有效结合需要关注: 1. 任务执行的可观测性 2. 上下文信息的完整传递 3. 异常情况的快速定位 4. 系统性能的平衡取舍

通过合理的架构设计和技术选型,可以构建出既高效又可靠的异步任务处理系统。 “`

注:本文实际约4500字,完整版建议补充以下内容: 1. 各语言具体实现对比(Java/Go/Python) 2. 日志存储方案对比(ELK vs Loki) 3. 分布式追踪系统集成(OpenTelemetry) 4. 性能测试数据图表 5. 安全审计相关日志规范

推荐阅读:
  1. 异步任务--AsyncTask
  2. android中的异步任务-----------AsyncTask的使用

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

go

上一篇:React的调度机制原理是什么

下一篇:C语言非数值计算的常用经典排序算法有哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》