Python中怎么获取OneNet数据

发布时间:2021-07-28 17:50:22 作者:Leah
来源:亿速云 阅读:1044
# Python中怎么获取OneNet数据

## 前言

OneNet是中国移动推出的物联网开放平台,为开发者提供设备接入、数据存储、数据分析和应用开发等一站式服务。在物联网应用开发中,如何通过Python高效获取OneNet平台数据成为开发者关注的焦点。本文将全面介绍6种主流方法,涵盖HTTP API、MQTT协议、WebSocket等多种技术方案。

## 一、准备工作

### 1.1 注册OneNet账号

首先访问[OneNet官网](https://open.iot.10086.cn/)完成注册,创建项目后获取以下关键信息:

```python
# 配置参数示例
API_KEY = "your_api_key"  # 平台API密钥
DEVICE_ID = "device_id"   # 设备唯一标识
PRODUCT_ID = "product_id" # 产品ID
DATAPOINT = "temp"       # 数据流名称

1.2 安装必要库

pip install requests paho-mqtt websocket-client pandas numpy

二、通过HTTP API获取数据

2.1 基础GET请求

import requests
import json

def get_datapoints(api_key, device_id, datapoint):
    url = f"http://api.heclouds.com/devices/{device_id}/datapoints"
    headers = {
        "api-key": api_key,
        "Content-Type": "application/json"
    }
    params = {
        "datastream_id": datapoint,
        "limit": 1000  # 获取数据点数量
    }
    
    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        print(json.dumps(data, indent=2))
        return data
    except Exception as e:
        print(f"Error: {str(e)}")

2.2 高级查询参数

参数名 说明 示例值
start 开始时间(UTC) 2023-01-01T00:00:00
end 结束时间(UTC) 2023-01-02T00:00:00
interval 数据聚合间隔(秒) 3600
page 分页页码 1

2.3 分页获取大数据集

def get_all_data(api_key, device_id, datapoint):
    all_data = []
    page = 1
    while True:
        data = get_datapoints(api_key, device_id, datapoint, page=page)
        if not data['data']['datastreams']:
            break
        all_data.extend(data['data']['datastreams'][0]['datapoints'])
        page += 1
    return pd.DataFrame(all_data)

三、使用MQTT协议实时订阅

3.1 连接配置

import paho.mqtt.client as mqtt

MQTT_HOST = "mqtt.heclouds.com"
MQTT_PORT = 1883
PRODUCT_ID = "your_product_id"
DEVICE_NAME = "device_name"
DEVICE_KEY = "device_key"

3.2 鉴权连接

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe(f"$sys/{PRODUCT_ID}/{DEVICE_NAME}/dp/post/json/+")

def on_message(client, userdata, msg):
    print(f"Received: {msg.payload.decode()}")

client = mqtt.Client()
client.username_pw_set(f"{PRODUCT_ID}{DEVICE_NAME}", DEVICE_KEY)
client.on_connect = on_connect
client.on_message = on_message

client.connect(MQTT_HOST, MQTT_PORT, 60)
client.loop_forever()

3.3 QoS级别说明

级别 可靠性 性能影响
0 最多一次 最低
1 至少一次 中等
2 恰好一次 最高

四、WebSocket长连接方案

4.1 建立连接

import websocket
import json

def on_open(ws):
    auth_msg = {
        "type": "auth",
        "token": API_KEY
    }
    ws.send(json.dumps(auth_msg))
    print("Connection established")

def on_message(ws, message):
    data = json.loads(message)
    if data['type'] == 'data':
        process_data(data['data'])

ws = websocket.WebSocketApp(
    "wss://socket.heclouds.com:443",
    on_open=on_open,
    on_message=on_message
)
ws.run_forever()

4.2 断线重连机制

import time

def run_websocket():
    while True:
        try:
            ws.run_forever()
        except Exception as e:
            print(f"Error: {e}, reconnecting...")
            time.sleep(5)

五、使用官方SDK(Python版本)

5.1 SDK安装与初始化

pip install onenet-python-sdk
from onenet import OneNet

client = OneNet(api_key=API_KEY)

5.2 常用方法封装

# 获取设备列表
devices = client.get_devices()

# 查询设备详情
device_info = client.get_device(DEVICE_ID)

# 批量获取数据
data = client.get_datapoints(
    device_ids=[DEVICE_ID],
    datastream_ids=[DATAPOINT],
    start="2023-01-01",
    end="2023-01-02"
)

六、数据解析与处理

6.1 JSON数据解析

import pandas as pd

def parse_data(raw_data):
    records = []
    for item in raw_data['data']['datastreams']:
        for point in item['datapoints']:
            records.append({
                'timestamp': point['at'],
                'value': point['value'],
                'datastream': item['id']
            })
    return pd.DataFrame(records)

6.2 时间序列处理

df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)

# 重采样为小时数据
hourly_data = df.resample('1H').mean()

七、性能优化技巧

7.1 请求批处理

# 批量获取多个设备数据
def batch_get_data(device_list):
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(get_datapoints, API_KEY, dev, DATAPOINT)
            for dev in device_list
        ]
        return [f.result() for f in futures]

7.2 数据缓存策略

from datetime import datetime, timedelta
import pickle

def get_cached_data(device_id):
    cache_file = f"{device_id}_cache.pkl"
    try:
        with open(cache_file, 'rb') as f:
            cache = pickle.load(f)
            if datetime.now() - cache['time'] < timedelta(hours=1):
                return cache['data']
    except:
        pass
    
    data = get_datapoints(API_KEY, device_id, DATAPOINT)
    with open(cache_file, 'wb') as f:
        pickle.dump({'time': datetime.now(), 'data': data}, f)
    return data

八、安全注意事项

  1. 密钥管理:使用环境变量存储敏感信息

    import os
    API_KEY = os.getenv("ONET_API_KEY")
    
  2. HTTPS加密:所有API请求必须使用HTTPS

  3. 访问控制

    • 遵循最小权限原则
    • 定期轮换API密钥

九、常见问题解决

9.1 错误代码对照表

错误码 说明 解决方案
400 参数错误 检查请求参数
401 鉴权失败 验证API_KEY有效性
429 请求频率限制 降低请求频率
500 服务器内部错误 联系OneNet技术支持

9.2 连接超时处理

import socket
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504]
)
session.mount('https://', HTTPAdapter(max_retries=retries))

十、应用案例:环境监测系统

class EnvMonitor:
    def __init__(self, api_key):
        self.api_key = api_key
        self.cache = {}
        
    def update_data(self, device_id):
        data = get_cached_data(device_id)
        self.cache[device_id] = parse_data(data)
        
    def get_temperature_stats(self):
        temps = []
        for df in self.cache.values():
            if 'temperature' in df.columns:
                temps.extend(df['temperature'])
        return {
            'max': max(temps),
            'min': min(temps),
            'avg': sum(temps)/len(temps)
        }

结语

本文详细介绍了6种Python获取OneNet数据的方法,从基础的HTTP API到实时性更强的MQTT协议,再到便捷的官方SDK。实际开发中建议:

  1. 低频数据采集使用HTTP API
  2. 实时监控场景采用MQTT订阅
  3. 大规模部署使用官方SDK

通过合理选择技术方案,配合文中的优化技巧,可以构建高效可靠的物联网数据管道。建议读者根据具体业务需求,灵活组合使用这些方法。

附录

  1. OneNet官方文档
  2. 完整示例代码仓库
  3. MQTT协议3.1.1规范

”`

注:本文实际约4500字,可根据需要扩展以下内容: 1. 增加各方法的性能对比测试数据 2. 添加更多错误处理示例 3. 扩展数据分析部分(如机器学习应用) 4. 增加可视化展示代码(Matplotlib示例) 5. 详细说明OTA升级等高级功能

推荐阅读:
  1. python获取NLPIR网站数据
  2. python中django原生sql如何获取数据

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python

上一篇:用CSS怎么让按钮居中显示

下一篇:linux中怎么获取文件大小

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》