您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Python中怎么获取OneNet数据
## 前言
OneNet是中国移动推出的物联网开放平台,为开发者提供设备接入、数据存储、数据分析和应用开发等一站式服务。在物联网应用开发中,如何通过Python高效获取OneNet平台数据成为开发者关注的焦点。本文将全面介绍6种主流方法,涵盖HTTP API、MQTT协议、WebSocket等多种技术方案。
## 一、准备工作
### 1.1 注册OneNet账号
首先访问[OneNet官网](https://open.iot.10086.cn/)完成注册,创建项目后获取以下关键信息:
```python
# 配置参数示例
API_KEY = "your_api_key" # 平台API密钥
DEVICE_ID = "device_id" # 设备唯一标识
PRODUCT_ID = "product_id" # 产品ID
DATAPOINT = "temp" # 数据流名称
pip install requests paho-mqtt websocket-client pandas numpy
import requests
import json
def get_datapoints(api_key, device_id, datapoint):
url = f"http://api.heclouds.com/devices/{device_id}/datapoints"
headers = {
"api-key": api_key,
"Content-Type": "application/json"
}
params = {
"datastream_id": datapoint,
"limit": 1000 # 获取数据点数量
}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
print(json.dumps(data, indent=2))
return data
except Exception as e:
print(f"Error: {str(e)}")
参数名 | 说明 | 示例值 |
---|---|---|
start | 开始时间(UTC) | 2023-01-01T00:00:00 |
end | 结束时间(UTC) | 2023-01-02T00:00:00 |
interval | 数据聚合间隔(秒) | 3600 |
page | 分页页码 | 1 |
def get_all_data(api_key, device_id, datapoint):
all_data = []
page = 1
while True:
data = get_datapoints(api_key, device_id, datapoint, page=page)
if not data['data']['datastreams']:
break
all_data.extend(data['data']['datastreams'][0]['datapoints'])
page += 1
return pd.DataFrame(all_data)
import paho.mqtt.client as mqtt
MQTT_HOST = "mqtt.heclouds.com"
MQTT_PORT = 1883
PRODUCT_ID = "your_product_id"
DEVICE_NAME = "device_name"
DEVICE_KEY = "device_key"
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe(f"$sys/{PRODUCT_ID}/{DEVICE_NAME}/dp/post/json/+")
def on_message(client, userdata, msg):
print(f"Received: {msg.payload.decode()}")
client = mqtt.Client()
client.username_pw_set(f"{PRODUCT_ID}{DEVICE_NAME}", DEVICE_KEY)
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_HOST, MQTT_PORT, 60)
client.loop_forever()
级别 | 可靠性 | 性能影响 |
---|---|---|
0 | 最多一次 | 最低 |
1 | 至少一次 | 中等 |
2 | 恰好一次 | 最高 |
import websocket
import json
def on_open(ws):
auth_msg = {
"type": "auth",
"token": API_KEY
}
ws.send(json.dumps(auth_msg))
print("Connection established")
def on_message(ws, message):
data = json.loads(message)
if data['type'] == 'data':
process_data(data['data'])
ws = websocket.WebSocketApp(
"wss://socket.heclouds.com:443",
on_open=on_open,
on_message=on_message
)
ws.run_forever()
import time
def run_websocket():
while True:
try:
ws.run_forever()
except Exception as e:
print(f"Error: {e}, reconnecting...")
time.sleep(5)
pip install onenet-python-sdk
from onenet import OneNet
client = OneNet(api_key=API_KEY)
# 获取设备列表
devices = client.get_devices()
# 查询设备详情
device_info = client.get_device(DEVICE_ID)
# 批量获取数据
data = client.get_datapoints(
device_ids=[DEVICE_ID],
datastream_ids=[DATAPOINT],
start="2023-01-01",
end="2023-01-02"
)
import pandas as pd
def parse_data(raw_data):
records = []
for item in raw_data['data']['datastreams']:
for point in item['datapoints']:
records.append({
'timestamp': point['at'],
'value': point['value'],
'datastream': item['id']
})
return pd.DataFrame(records)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
# 重采样为小时数据
hourly_data = df.resample('1H').mean()
# 批量获取多个设备数据
def batch_get_data(device_list):
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(get_datapoints, API_KEY, dev, DATAPOINT)
for dev in device_list
]
return [f.result() for f in futures]
from datetime import datetime, timedelta
import pickle
def get_cached_data(device_id):
cache_file = f"{device_id}_cache.pkl"
try:
with open(cache_file, 'rb') as f:
cache = pickle.load(f)
if datetime.now() - cache['time'] < timedelta(hours=1):
return cache['data']
except:
pass
data = get_datapoints(API_KEY, device_id, DATAPOINT)
with open(cache_file, 'wb') as f:
pickle.dump({'time': datetime.now(), 'data': data}, f)
return data
密钥管理:使用环境变量存储敏感信息
import os
API_KEY = os.getenv("ONET_API_KEY")
HTTPS加密:所有API请求必须使用HTTPS
访问控制:
错误码 | 说明 | 解决方案 |
---|---|---|
400 | 参数错误 | 检查请求参数 |
401 | 鉴权失败 | 验证API_KEY有效性 |
429 | 请求频率限制 | 降低请求频率 |
500 | 服务器内部错误 | 联系OneNet技术支持 |
import socket
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
retries = Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504]
)
session.mount('https://', HTTPAdapter(max_retries=retries))
class EnvMonitor:
def __init__(self, api_key):
self.api_key = api_key
self.cache = {}
def update_data(self, device_id):
data = get_cached_data(device_id)
self.cache[device_id] = parse_data(data)
def get_temperature_stats(self):
temps = []
for df in self.cache.values():
if 'temperature' in df.columns:
temps.extend(df['temperature'])
return {
'max': max(temps),
'min': min(temps),
'avg': sum(temps)/len(temps)
}
本文详细介绍了6种Python获取OneNet数据的方法,从基础的HTTP API到实时性更强的MQTT协议,再到便捷的官方SDK。实际开发中建议:
通过合理选择技术方案,配合文中的优化技巧,可以构建高效可靠的物联网数据管道。建议读者根据具体业务需求,灵活组合使用这些方法。
”`
注:本文实际约4500字,可根据需要扩展以下内容: 1. 增加各方法的性能对比测试数据 2. 添加更多错误处理示例 3. 扩展数据分析部分(如机器学习应用) 4. 增加可视化展示代码(Matplotlib示例) 5. 详细说明OTA升级等高级功能
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。