Z-Wave智能家居协议
Z-Wave是一种专为智能家居设计的无线通信协议,由丹麦公司Zensys(2008年被Sigma Designs收购,2018年Silicon Labs收购Z-Wave业务)开发。该协议运行在Sub-1GHz ISM频段,具有低功耗、高可靠性、强穿透力的特点,是与Zigbee并列的两大主流智能家居协议之一。核心特性:1.3 关键技术指标指标数值说明工作频段868-928MHz因地区而异调制方式FS
Z-Wave智能家居协议
一、核心概念解析
1.1 什么是Z-Wave
Z-Wave是一种专为智能家居设计的无线通信协议,由丹麦公司Zensys(2008年被Sigma Designs收购,2018年Silicon Labs收购Z-Wave业务)开发。该协议运行在Sub-1GHz ISM频段,具有低功耗、高可靠性、强穿透力的特点,是与Zigbee并列的两大主流智能家居协议之一。
核心特性:
- 专用频段:Sub-1GHz(不同地区频段不同:美国908.4MHz,欧洲868.4MHz,中国868.4MHz)
- 低干扰:避开2.4GHz拥塞频段,与Wi-Fi/蓝牙无干扰
- Mesh组网:支持4跳中继,理论最大网络规模232个节点
- 互操作性:Z-Wave Alliance认证保证不同厂商设备互通
- 安全加密:S2安全框架,128位AES加密
- 低功耗:电池供电设备可运行1-10年
1.2 技术架构
┌──────────────────────────────────────────┐
│ Application Layer │
│ (Device Types: Switch/Sensor/Lock) │
├──────────────────────────────────────────┤
│ Command Class Layer │
│ (Basic/Switch/Sensor/Configuration) │
├──────────────────────────────────────────┤
│ Transport Layer │
│ (Single-cast/Multi-cast/Broadcast) │
├──────────────────────────────────────────┤
│ Routing Layer (Mesh Networking) │
├──────────────────────────────────────────┤
│ MAC Layer (CSMA/CA) │
├──────────────────────────────────────────┤
│ PHY Layer (FSK/GFSK Modulation) │
│ Sub-1GHz ISM Band │
└──────────────────────────────────────────┘
1.3 关键技术指标
| 指标 | 数值 | 说明 |
|---|---|---|
| 工作频段 | 868-928MHz | 因地区而异 |
| 调制方式 | FSK/GFSK | 二进制频移键控 |
| 数据速率 | 9.6/40/100kbps | Z-Wave/700系列 |
| 传输距离 | 30-100m(室内) | 取决于环境 |
| 网络规模 | 232节点 | 理论最大值 |
| 中继跳数 | 4跳 | 自动路由 |
| 功耗 | 1-50mW | 取决于设备类型 |
| 安全性 | AES-128 | S2安全框架 |
二、协议原理深度解析
2.1 物理层与MAC层
调制技术:
Z-Wave使用FSK(Frequency Shift Keying)或GFSK(Gaussian FSK)调制,通过频率偏移表示0和1。
频率偏移示例(欧洲868.4MHz):
- Bit 0:中心频率 - 偏移量(868.4MHz - 20kHz = 868.38MHz)
- Bit 1:中心频率 + 偏移量(868.4MHz + 20kHz = 868.42MHz)
MAC层协议(CSMA/CA):
Z-Wave使用载波侦听多路访问/冲突避免(CSMA/CA)机制:
def z_wave_csma_ca():
"""
Z-Wave CSMA/CA伪代码
"""
while True:
# 1. 侦听信道
if channel_is_busy():
wait_random_backoff()
continue
# 2. 信道空闲,等待IFS(Inter-Frame Space)
wait(IFS_DURATION)
# 3. 再次检查信道
if channel_is_busy():
wait_random_backoff()
continue
# 4. 发送数据帧
transmit_frame()
# 5. 等待ACK
if wait_for_ack(timeout=200ms):
return SUCCESS
else:
# 重传(最多3次)
retry_count += 1
if retry_count < 3:
continue
else:
return FAILURE
2.2 路由协议
Z-Wave使用源路由(Source Routing)机制,控制器预先计算路由路径。
路由表结构:
/* Z-Wave路由表数据结构 */
typedef struct {
uint8_t node_id; // 节点ID(1-232)
uint8_t neighbor_count; // 邻居数量
uint8_t neighbors[29]; // 邻居列表(最多29个)
uint8_t route_to[232]; // 到每个节点的下一跳
uint8_t hop_count[232]; // 到每个节点的跳数
bool is_listening; // 是否为常听节点
bool is_routing; // 是否为路由节点
uint16_t rssi_values[29]; // 邻居RSSI值
} z_wave_routing_table_t;
路由算法:
from typing import List, Dict, Tuple
class ZWaveRouter:
"""Z-Wave路由算法实现"""
def __init__(self, max_nodes: int = 232):
self.max_nodes = max_nodes
self.routing_table = {}
self.neighbor_lists = {}
def add_node(self, node_id: int, neighbors: List[int]):
"""添加节点及其邻居列表"""
self.neighbor_lists[node_id] = neighbors
def calculate_route(self, source: int, destination: int,
max_hops: int = 4) -> List[int]:
"""
计算从源节点到目标节点的最短路径
使用Dijkstra算法
"""
# 初始化距离和前驱节点
distances = {node: float('inf') for node in self.neighbor_lists}
distances[source] = 0
predecessors = {node: None for node in self.neighbor_lists}
# 优先队列:(distance, node_id)
priority_queue = [(0, source)]
visited = set()
while priority_queue:
current_distance, current_node = heapq.heappop(priority_queue)
if current_node in visited:
continue
visited.add(current_node)
# 找到目标节点
if current_node == destination:
break
# 检查是否超过最大跳数
if current_distance >= max_hops:
continue
# 遍历邻居节点
for neighbor in self.neighbor_lists.get(current_node, []):
distance = current_distance + 1
if distance < distances[neighbor]:
distances[neighbor] = distance
predecessors[neighbor] = current_node
heapq.heappush(priority_queue, (distance, neighbor))
# 重建路径
if distances[destination] == float('inf'):
return [] # 无法到达
path = []
current = destination
while current is not None:
path.append(current)
current = predecessors[current]
path.reverse()
return path
def update_routing_table(self):
"""更新整个网络的路由表"""
for source in self.neighbor_lists:
for destination in self.neighbor_lists:
if source == destination:
continue
route = self.calculate_route(source, destination)
if route and len(route) > 1:
# 下一跳是路径中的第二个节点
next_hop = route[1]
hop_count = len(route) - 1
if source not in self.routing_table:
self.routing_table[source] = {}
self.routing_table[source][destination] = {
'next_hop': next_hop,
'hop_count': hop_count,
'full_path': route
}
def get_route(self, source: int, destination: int) -> Dict:
"""获取路由信息"""
return self.routing_table.get(source, {}).get(destination, None)
# 使用示例
if __name__ == "__main__":
router = ZWaveRouter()
# 构建网络拓扑
router.add_node(1, [2, 3]) # 控制器
router.add_node(2, [1, 4, 5]) # 路由节点
router.add_node(3, [1, 6]) # 路由节点
router.add_node(4, [2, 7]) # 终端节点
router.add_node(5, [2, 8]) # 终端节点
router.add_node(6, [3, 9]) # 终端节点
router.add_node(7, [4]) # 终端节点
router.add_node(8, [5]) # 终端节点
router.add_node(9, [6]) # 终端节点
# 更新路由表
router.update_routing_table()
# 查询路由
route_info = router.get_route(1, 7)
print(f"Route from 1 to 7: {route_info}")
# 输出:Route from 1 to 7: {'next_hop': 2, 'hop_count': 3, 'full_path': [1, 2, 4, 7]}
2.3 Command Classes(命令类)
Z-Wave定义了超过150种Command Classes,用于标准化设备功能。
常用Command Classes:
| Class ID | 名称 | 功能 | 应用场景 |
|---|---|---|---|
| 0x20 | BASIC | 基本开关控制 | 所有设备 |
| 0x25 | SWITCH_BINARY | 二进制开关 | 灯光、插座 |
| 0x26 | SWITCH_MULTILEVEL | 多级开关(调光) | 调光灯、窗帘 |
| 0x30 | SENSOR_BINARY | 二进制传感器 | 门磁、PIR |
| 0x31 | SENSOR_MULTILEVEL | 多级传感器 | 温湿度、光照 |
| 0x62 | DOOR_LOCK | 门锁控制 | 智能门锁 |
| 0x70 | CONFIGURATION | 设备配置 | 参数设置 |
| 0x80 | BATTERY | 电池状态 | 电池供电设备 |
| 0x84 | WAKE_UP | 唤醒通知 | 睡眠设备 |
| 0x98 | SECURITY | 安全加密 | 门锁、报警器 |
Command Class报文格式:
┌────────────────────────────────────────────────┐
│ Home ID (4 bytes) │
├────────────────────────────────────────────────┤
│ Source Node ID (1 byte) │
├────────────────────────────────────────────────┤
│ Frame Control (1 byte) │
├────────────────────────────────────────────────┤
│ Length (1 byte) │
├────────────────────────────────────────────────┤
│ Destination Node ID (1 byte) │
├────────────────────────────────────────────────┤
│ Command Class (1 byte) ←────────────────┤
├────────────────────────────────────────────────┤
│ Command (1 byte) │
├────────────────────────────────────────────────┤
│ Parameters (variable) │
├────────────────────────────────────────────────┤
│ Checksum (1 byte) │
└────────────────────────────────────────────────┘
示例:开关灯命令
/* 打开开关(SWITCH_BINARY)*/
uint8_t switch_on_cmd[] = {
0x25, // Command Class: SWITCH_BINARY
0x01, // Command: SET
0xFF // Value: ON (0x00=OFF, 0xFF=ON)
};
/* 调光至50%(SWITCH_MULTILEVEL)*/
uint8_t dimmer_set_cmd[] = {
0x26, // Command Class: SWITCH_MULTILEVEL
0x01, // Command: SET
0x32, // Value: 50 (0-99, 0x00-0x63)
0x00 // Duration: instant (0x00=instant, 0x01-0x7F=seconds)
};
/* 读取温度传感器(SENSOR_MULTILEVEL)*/
uint8_t sensor_get_cmd[] = {
0x31, // Command Class: SENSOR_MULTILEVEL
0x04, // Command: GET
0x01 // Sensor Type: Temperature (0x01)
};
2.4 安全框架(S2)
Z-Wave S2安全框架提供三个安全等级:
- S2 Unauthenticated:无需PIN码,用于低安全需求设备
- S2 Authenticated:需要PIN码或QR码,用于一般安全设备
- S2 Access Control:最高安全级别,用于门锁、报警器等
加密流程:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
class ZWaveS2Security:
"""Z-Wave S2安全加密实现"""
def __init__(self, network_key: bytes):
"""
初始化S2安全
:param network_key: 16字节网络密钥
"""
if len(network_key) != 16:
raise ValueError("Network key must be 16 bytes")
self.network_key = network_key
def encrypt_frame(self, plaintext: bytes, sender_nonce: bytes,
receiver_nonce: bytes) -> bytes:
"""
加密S2帧
:param plaintext: 明文数据
:param sender_nonce: 发送方随机数(8字节)
:param receiver_nonce: 接收方随机数(8字节)
:return: 密文数据
"""
# 构造IV(Initialization Vector)
iv = sender_nonce + receiver_nonce # 16字节
# 使用AES-128-OFB模式加密
cipher = Cipher(
algorithms.AES(self.network_key),
modes.OFB(iv),
backend=default_backend()
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
return ciphertext
def decrypt_frame(self, ciphertext: bytes, sender_nonce: bytes,
receiver_nonce: bytes) -> bytes:
"""
解密S2帧
"""
iv = sender_nonce + receiver_nonce
cipher = Cipher(
algorithms.AES(self.network_key),
modes.OFB(iv),
backend=default_backend()
)
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext
def generate_message_auth_code(self, frame: bytes) -> bytes:
"""
生成消息认证码(MAC)
使用AES-CMAC
"""
from cryptography.hazmat.primitives import cmac
c = cmac.CMAC(algorithms.AES(self.network_key), backend=default_backend())
c.update(frame)
mac = c.finalize()
return mac[:8] # 取前8字节
def verify_message_auth_code(self, frame: bytes, received_mac: bytes) -> bool:
"""验证消息认证码"""
calculated_mac = self.generate_message_auth_code(frame)
return calculated_mac == received_mac
# 使用示例
if __name__ == "__main__":
# 生成网络密钥(实际应用中应安全存储)
network_key = os.urandom(16)
security = ZWaveS2Security(network_key)
# 原始命令
command = b'\x25\x01\xFF' # SWITCH_BINARY SET ON
# 生成随机数
sender_nonce = os.urandom(8)
receiver_nonce = os.urandom(8)
# 加密
encrypted = security.encrypt_frame(command, sender_nonce, receiver_nonce)
print(f"Encrypted: {encrypted.hex()}")
# 生成MAC
mac = security.generate_message_auth_code(encrypted)
print(f"MAC: {mac.hex()}")
# 解密
decrypted = security.decrypt_frame(encrypted, sender_nonce, receiver_nonce)
print(f"Decrypted: {decrypted.hex()}")
# 验证
is_valid = security.verify_message_auth_code(encrypted, mac)
print(f"MAC Valid: {is_valid}")
三、实战开发指南
3.1 OpenZWave库开发
OpenZWave是开源的Z-Wave协议栈实现,支持C++/Python/Node.js。
环境配置:
# 安装依赖
sudo apt-get install libudev-dev
# 编译OpenZWave
git clone https://github.com/OpenZWave/open-zwave.git
cd open-zwave
make
sudo make install
# 安装Python绑定
pip install python-openzwave
Python控制器示例:
#!/usr/bin/env python3
"""
Z-Wave控制器示例
"""
from openzwave.network import ZWaveNetwork
from openzwave.option import ZWaveOption
logging.basicConfig(level=logging.INFO)
class ZWaveController:
"""Z-Wave控制器"""
def __init__(self, device="/dev/ttyUSB0", config_path="/usr/local/etc/openzwave"):
"""
初始化控制器
:param device: Z-Wave USB控制器设备路径
:param config_path: OpenZWave配置文件路径
"""
self.device = device
self.config_path = config_path
self.network = None
def start(self):
"""启动网络"""
print(f"Initializing Z-Wave network on {self.device}...")
# 配置选项
options = ZWaveOption(
self.device,
config_path=self.config_path,
user_path=".",
cmd_line=""
)
options.set_log_file("OZW_Log.log")
options.set_append_log_file(False)
options.set_console_output(False)
options.set_save_log_level('Info')
options.set_logging(False)
options.lock()
# 创建网络
self.network = ZWaveNetwork(options, autostart=False)
# 注册事件监听
self.network.add_observer(self._network_ready, self.network.SIGNAL_NETWORK_READY)
self.network.add_observer(self._node_added, self.network.SIGNAL_NODE_ADDED)
self.network.add_observer(self._value_changed, self.network.SIGNAL_VALUE_CHANGED)
# 启动网络
self.network.start()
print("Waiting for network to be ready...")
for i in range(0, 300):
if self.network.state >= self.network.STATE_READY:
print("Network is ready!")
break
time.sleep(1.0)
if self.network.state < self.network.STATE_READY:
print("Network initialization failed!")
return False
return True
def _network_ready(self, network):
"""网络就绪回调"""
print(f"Network HomeID: {network.home_id_str}")
print(f"Controller Node ID: {network.controller.node_id}")
print(f"Total Nodes: {network.nodes_count}")
def _node_added(self, network, node):
"""节点加入回调"""
print(f"Node {node.node_id} added: {node.product_name}")
def _value_changed(self, network, node, value):
"""节点值变化回调"""
print(f"Node {node.node_id} value changed: {value.label} = {value.data}")
def list_nodes(self):
"""列出所有节点"""
if not self.network:
print("Network not started!")
return
print("\n" + "="*60)
print("Z-Wave Network Nodes")
print("="*60)
for node_id in self.network.nodes:
node = self.network.nodes[node_id]
print(f"\nNode {node_id}:")
print(f" Product: {node.product_name}")
print(f" Manufacturer: {node.manufacturer_name}")
print(f" Type: {node.type}")
print(f" Is Awake: {node.is_awake}")
print(f" Is Failed: {node.is_failed}")
print(f" Battery Level: {node.battery_level}%")
# 列出Command Classes
print(f" Command Classes:")
for cls_id in node.command_classes:
cls = node.command_classes[cls_id]
print(f" - {cls.class_desc} (0x{cls_id:02x})")
def get_node_values(self, node_id: int):
"""获取节点的所有值"""
if node_id not in self.network.nodes:
print(f"Node {node_id} not found!")
return
node = self.network.nodes[node_id]
print(f"\nValues for Node {node_id} ({node.product_name}):")
print("="*60)
for value_id in node.values:
value = node.values[value_id]
print(f"\n {value.label}:")
print(f" Value: {value.data}")
print(f" Units: {value.units}")
print(f" Genre: {value.genre}")
print(f" Type: {value.type}")
print(f" Read Only: {value.is_read_only}")
def set_switch(self, node_id: int, state: bool):
"""
控制开关
:param node_id: 节点ID
:param state: True=开,False=关
"""
if node_id not in self.network.nodes:
print(f"Node {node_id} not found!")
return False
node = self.network.nodes[node_id]
# 查找开关值
for value_id in node.get_switches():
value = node.values[value_id]
value.data = state
print(f"Node {node_id} switch set to: {'ON' if state else 'OFF'}")
return True
print(f"Node {node_id} has no switch!")
return False
def set_dimmer(self, node_id: int, level: int):
"""
设置调光器亮度
:param node_id: 节点ID
:param level: 亮度0-99
"""
if node_id not in self.network.nodes:
print(f"Node {node_id} not found!")
return False
if not 0 <= level <= 99:
print("Level must be between 0 and 99!")
return False
node = self.network.nodes[node_id]
# 查找调光值
for value_id in node.get_dimmers():
value = node.values[value_id]
value.data = level
print(f"Node {node_id} dimmer set to: {level}%")
return True
print(f"Node {node_id} has no dimmer!")
return False
def add_node(self, timeout=30):
"""
添加新节点(设备配对)
:param timeout: 超时时间(秒)
"""
print("Starting inclusion mode...")
print("Please press the inclusion button on your Z-Wave device...")
self.network.controller.add_node()
# 等待节点加入
start_time = time.time()
while time.time() - start_time < timeout:
if self.network.controller.is_primary_controller:
time.sleep(1)
else:
break
self.network.controller.cancel_command()
print("Inclusion mode stopped.")
def remove_node(self, timeout=30):
"""
移除节点(设备解配对)
"""
print("Starting exclusion mode...")
print("Please press the exclusion button on your Z-Wave device...")
self.network.controller.remove_node()
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(1)
self.network.controller.cancel_command()
print("Exclusion mode stopped.")
def heal_network(self):
"""修复网络(重新计算路由)"""
print("Healing network...")
self.network.heal()
print("Network healing started. This may take several minutes.")
def stop(self):
"""停止网络"""
if self.network:
self.network.stop()
print("Network stopped.")
# 使用示例
if __name__ == "__main__":
controller = ZWaveController(device="/dev/ttyUSB0")
if controller.start():
time.sleep(5) # 等待网络初始化
# 列出所有节点
controller.list_nodes()
# 获取节点2的值
controller.get_node_values(node_id=2)
# 控制节点2的开关
controller.set_switch(node_id=2, state=True)
time.sleep(2)
controller.set_switch(node_id=2, state=False)
# 设置节点3的调光器
controller.set_dimmer(node_id=3, level=50)
# 保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
controller.stop()
3.2 Z-Way平台开发
Z-Way是Razberry和UZB控制器的官方平台,提供REST API和JavaScript引擎。
REST API示例:
#!/usr/bin/env python3
"""
Z-Way REST API客户端
"""
from typing import Dict, List
class ZWayClient:
"""Z-Way API客户端"""
def __init__(self, host: str = "localhost", port: int = 8083,
username: str = "admin", password: str = "admin"):
"""
初始化客户端
:param host: Z-Way服务器地址
:param port: Z-Way服务器端口
:param username: 用户名
:param password: 密码
"""
self.base_url = f"http://{host}:{port}"
self.username = username
self.password = password
self.session = requests.Session()
self.login()
def login(self):
"""登录"""
url = f"{self.base_url}/ZAutomation/api/v1/login"
data = {
"login": self.username,
"password": self.password
}
response = self.session.post(url, json=data)
if response.status_code == 200:
result = response.json()
self.session.headers.update({
"ZWAYSession": result["data"]["sid"]
})
print("Login successful!")
else:
print(f"Login failed: {response.status_code}")
def get_devices(self) -> List[Dict]:
"""获取所有设备"""
url = f"{self.base_url}/ZAutomation/api/v1/devices"
response = self.session.get(url)
if response.status_code == 200:
result = response.json()
return result["data"]["devices"]
else:
print(f"Failed to get devices: {response.status_code}")
return []
def get_device(self, device_id: str) -> Dict:
"""获取单个设备信息"""
url = f"{self.base_url}/ZAutomation/api/v1/devices/{device_id}"
response = self.session.get(url)
if response.status_code == 200:
result = response.json()
return result["data"]
else:
print(f"Failed to get device {device_id}: {response.status_code}")
return {}
def send_command(self, device_id: str, command: str) -> bool:
"""
发送命令到设备
:param device_id: 设备ID
:param command: 命令(on/off/exact?level=50等)
"""
url = f"{self.base_url}/ZAutomation/api/v1/devices/{device_id}/command/{command}"
response = self.session.get(url)
if response.status_code == 200:
print(f"Command '{command}' sent to device {device_id}")
return True
else:
print(f"Failed to send command: {response.status_code}")
return False
def set_switch(self, device_id: str, state: bool):
"""控制开关"""
command = "on" if state else "off"
return self.send_command(device_id, command)
def set_dimmer(self, device_id: str, level: int):
"""设置调光器"""
if not 0 <= level <= 99:
print("Level must be between 0 and 99!")
return False
return self.send_command(device_id, f"exact?level={level}")
def get_sensor_data(self, device_id: str) -> float:
"""获取传感器数据"""
device = self.get_device(device_id)
if device and "metrics" in device:
return device["metrics"].get("level", None)
return None
def list_devices(self):
"""列出所有设备"""
devices = self.get_devices()
print("\n" + "="*60)
print("Z-Way Devices")
print("="*60)
for device in devices:
print(f"\nDevice ID: {device['id']}")
print(f" Title: {device['metrics']['title']}")
print(f" Type: {device['deviceType']}")
print(f" Level: {device['metrics'].get('level', 'N/A')}")
print(f" Room: {device.get('location', 'N/A')}")
# 使用示例
if __name__ == "__main__":
client = ZWayClient(host="192.168.1.100", port=8083)
# 列出所有设备
client.list_devices()
# 控制开关
client.set_switch(device_id="ZWayVDev_zway_2-0-37", state=True)
# 设置调光器
client.set_dimmer(device_id="ZWayVDev_zway_3-0-38", level=75)
# 读取传感器数据
temperature = client.get_sensor_data(device_id="ZWayVDev_zway_4-0-49-1")
print(f"Temperature: {temperature}°C")
3.3 Home Assistant集成
Home Assistant是流行的开源智能家居平台,内置Z-Wave支持。
配置文件(configuration.yaml):
# Z-Wave集成配置
zwave:
usb_path: /dev/ttyUSB0
network_key: "0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10"
polling_interval: 60000
debug: false
# 自动化示例:运动检测开灯
automation:
- alias: "Motion Light On"
trigger:
platform: state
entity_id: binary_sensor.motion_sensor_2
to: 'on'
action:
service: light.turn_on
target:
entity_id: light.living_room_3
data:
brightness: 255
- alias: "Motion Light Off"
trigger:
platform: state
entity_id: binary_sensor.motion_sensor_2
to: 'off'
for:
minutes: 5
action:
service: light.turn_off
target:
entity_id: light.living_room_3
# 场景示例:离家模式
scene:
- name: "Away Mode"
entities:
light.living_room_3:
state: off
lock.front_door_5:
state: locked
switch.tv_power_7:
state: off
Python自动化脚本:
#!/usr/bin/env python3
"""
Home Assistant Z-Wave自动化脚本
"""
from datetime import datetime
class HomeAssistantZWave:
"""Home Assistant Z-Wave控制"""
def __init__(self, host: str, token: str):
"""
初始化客户端
:param host: Home Assistant地址(如http://192.168.1.100:8123)
:param token: Long-Lived Access Token
"""
self.host = host
self.headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
def call_service(self, domain: str, service: str,
entity_id: str, **kwargs) -> bool:
"""
调用服务
:param domain: 域(如light、switch、lock)
:param service: 服务(如turn_on、turn_off)
:param entity_id: 实体ID
"""
url = f"{self.host}/api/services/{domain}/{service}"
data = {"entity_id": entity_id}
data.update(kwargs)
response = requests.post(url, headers=self.headers, json=data)
return response.status_code == 200
def get_state(self, entity_id: str) -> dict:
"""获取实体状态"""
url = f"{self.host}/api/states/{entity_id}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()
return {}
def turn_on_light(self, entity_id: str, brightness: int = 255,
color_temp: int = None):
"""开灯"""
kwargs = {"brightness": brightness}
if color_temp:
kwargs["color_temp"] = color_temp
return self.call_service("light", "turn_on", entity_id, **kwargs)
def turn_off_light(self, entity_id: str):
"""关灯"""
return self.call_service("light", "turn_off", entity_id)
def lock_door(self, entity_id: str):
"""锁门"""
return self.call_service("lock", "lock", entity_id)
def unlock_door(self, entity_id: str):
"""开锁"""
return self.call_service("lock", "unlock", entity_id)
def get_sensor_value(self, entity_id: str) -> float:
"""获取传感器数值"""
state = self.get_state(entity_id)
if state and "state" in state:
try:
return float(state["state"])
except ValueError:
return None
return None
def create_scene(self, scene_name: str):
"""创建场景"""
return self.call_service("scene", "create", f"scene.{scene_name}")
def activate_scene(self, scene_name: str):
"""激活场景"""
return self.call_service("scene", "turn_on", f"scene.{scene_name}")
# 使用示例
if __name__ == "__main__":
ha = HomeAssistantZWave(
host="http://192.168.1.100:8123",
token="YOUR_LONG_LIVED_ACCESS_TOKEN"
)
# 开灯
ha.turn_on_light("light.living_room_3", brightness=200)
# 锁门
ha.lock_door("lock.front_door_5")
# 读取温度
temperature = ha.get_sensor_value("sensor.temperature_sensor_4")
print(f"Current temperature: {temperature}°C")
# 激活场景
ha.activate_scene("away_mode")
四、行业案例分析
案例1:高端公寓智能家居系统
项目背景:某房地产开发商在500套高端公寓中预装Z-Wave智能家居系统,覆盖照明、空调、窗帘、门锁等设备。
技术方案:
- 控制器:每户1个Z-Wave Plus控制器(Silicon Labs ZGM230)
- 节点数量:每户平均40个Z-Wave设备
- 设备类型:调光灯(15个)、开关(10个)、窗帘电机(5个)、温控器(3个)、门锁(2个)、传感器(5个)
- 集成平台:Crestron Home + Z-Wave集成模块
- 安全级别:门锁使用S2 Access Control,其他设备使用S2 Authenticated
实施效果:
- 设备互操作性:100%(所有设备通过Z-Wave Plus认证)
- 用户满意度:92%
- 故障率:<1%(24个月运行期)
- 平均响应延迟:<200ms
关键代码(Crestron SIMPL+集成):
// Crestron SIMPL+ Z-Wave控制模块
#SYMBOL_NAME "ZWave Controller"
#CATEGORY "Control"
DIGITAL_INPUT _SKIP_, _SKIP_, On, Off;
ANALOG_INPUT _SKIP_, _SKIP_, _SKIP_, Brightness;
DIGITAL_OUTPUT Device_On_FB, Device_Off_FB;
ANALOG_OUTPUT Brightness_FB;
STRING_PARAMETER Device_ID[20];
INTEGER node_id;
INTEGER last_brightness;
FUNCTION Initialize()
{
// 解析设备ID
node_id = ATOI(Device_ID);
last_brightness = 0;
}
PUSH On
{
// 发送开灯命令
ZWaveSendCommand(node_id, 0x25, 0x01, 0xFF); // SWITCH_BINARY SET ON
Device_On_FB = 1;
Device_Off_FB = 0;
}
PUSH Off
{
// 发送关灯命令
ZWaveSendCommand(node_id, 0x25, 0x01, 0x00); // SWITCH_BINARY SET OFF
Device_On_FB = 0;
Device_Off_FB = 1;
}
CHANGE Brightness
{
INTEGER level;
level = Brightness * 99 / 65535; // 转换为0-99
if (level != last_brightness)
{
// 发送调光命令
ZWaveSendCommand(node_id, 0x26, 0x01, level); // SWITCH_MULTILEVEL SET
last_brightness = level;
Brightness_FB = Brightness;
if (level > 0)
{
Device_On_FB = 1;
Device_Off_FB = 0;
}
else
{
Device_On_FB = 0;
Device_Off_FB = 1;
}
}
}
FUNCTION Main()
{
WaitForInitializationComplete();
Initialize();
}
案例2:酒店客房智能控制
项目背景:某五星级酒店在200间客房部署Z-Wave智能控制系统,实现客房自动化和能源管理。
系统架构:
┌─────────────────────────────────────────────┐
│ Hotel Management System (PMS) │
│ (客房管理、计费系统) │
└──────────────────┬──────────────────────────┘
│ REST API
┌──────────────────┴──────────────────────────┐
│ Central Z-Wave Gateway (x5) │
│ (每层楼1个,覆盖40间客房) │
└──────────────────┬──────────────────────────┘
│ Z-Wave Mesh
┌──────────────┼──────────────┐
│ │ │
┌───┴──────┐ ┌───┴──────┐ ┌───┴──────┐
│ Room 101 │ │ Room 102 │ │ Room 103 │
│ (15设备) │ │ (15设备) │ │ (15设备) │
└──────────┘ └──────────┘ └──────────┘
设备配置(每间客房):
- 调光灯:6个(主灯、床头灯、卫生间灯等)
- 窗帘电机:2个(主窗、阳台门)
- 温控器:1个
- 门磁:1个(入口门)
- PIR传感器:2个(客厅、卫生间)
- 智能插座:2个(电视、迷你吧)
- 门铃:1个
场景控制:
#!/usr/bin/env python3
"""
酒店客房场景控制系统
"""
from typing import List, Dict
class HotelRoomController:
"""酒店客房控制器"""
def __init__(self, room_number: str, zwave_controller):
self.room_number = room_number
self.zwave = zwave_controller
self.devices = {}
self.current_scene = None
def register_device(self, device_name: str, node_id: int, device_type: str):
"""注册设备"""
self.devices[device_name] = {
'node_id': node_id,
'type': device_type
}
async def welcome_scene(self):
"""欢迎场景:客人入住"""
print(f"Activating Welcome Scene for Room {self.room_number}")
tasks = [
# 主灯开启(70%亮度)
self.zwave.set_dimmer(self.devices['main_light']['node_id'], 70),
# 床头灯开启(50%亮度)
self.zwave.set_dimmer(self.devices['bedside_lamp']['node_id'], 50),
# 窗帘打开50%
self.zwave.set_dimmer(self.devices['curtain']['node_id'], 50),
# 温度设置为22°C
self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 22)
]
await asyncio.gather(*tasks)
self.current_scene = "welcome"
async def sleep_scene(self):
"""睡眠场景"""
print(f"Activating Sleep Scene for Room {self.room_number}")
tasks = [
# 关闭所有灯
self.zwave.set_switch(self.devices['main_light']['node_id'], False),
self.zwave.set_switch(self.devices['bedside_lamp']['node_id'], False),
# 床头灯调至10%(夜灯模式)
self.zwave.set_dimmer(self.devices['bedside_lamp']['node_id'], 10),
# 窗帘完全关闭
self.zwave.set_dimmer(self.devices['curtain']['node_id'], 0),
# 温度设置为20°C
self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 20)
]
await asyncio.gather(*tasks)
self.current_scene = "sleep"
async def away_scene(self):
"""离开场景:客人外出"""
print(f"Activating Away Scene for Room {self.room_number}")
tasks = [
# 关闭所有灯
self.zwave.set_switch(self.devices['main_light']['node_id'], False),
self.zwave.set_switch(self.devices['bedside_lamp']['node_id'], False),
self.zwave.set_switch(self.devices['bathroom_light']['node_id'], False),
# 关闭电视插座
self.zwave.set_switch(self.devices['tv_outlet']['node_id'], False),
# 温度设置为26°C(节能模式)
self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 26)
]
await asyncio.gather(*tasks)
self.current_scene = "away"
async def checkout_scene(self):
"""退房场景"""
print(f"Activating Checkout Scene for Room {self.room_number}")
tasks = [
# 关闭所有设备
self.zwave.set_switch(device['node_id'], False)
for device in self.devices.values()
if device['type'] in ['light', 'switch', 'outlet']
]
# 温度设置为关闭或节能模式
tasks.append(
self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 28)
)
await asyncio.gather(*tasks)
self.current_scene = "checkout"
async def handle_motion_event(self, sensor_name: str, motion_detected: bool):
"""处理运动检测事件"""
if motion_detected and self.current_scene == "away":
# 检测到运动且处于离开模式,可能客人回来了
await self.welcome_scene()
elif not motion_detected and self.current_scene == "welcome":
# 5分钟无运动,可能客人外出
await asyncio.sleep(300)
# 再次检查
if not self.check_any_motion():
await self.away_scene()
def check_any_motion(self) -> bool:
"""检查是否有任何运动传感器触发"""
# 实际实现中应查询所有PIR传感器状态
return False
class HotelManagementSystem:
"""酒店管理系统"""
def __init__(self):
self.rooms = {}
def register_room(self, room_number: str, controller: HotelRoomController):
"""注册客房"""
self.rooms[room_number] = controller
async def guest_checkin(self, room_number: str):
"""客人入住"""
if room_number in self.rooms:
await self.rooms[room_number].welcome_scene()
print(f"Guest checked in to Room {room_number}")
async def guest_checkout(self, room_number: str):
"""客人退房"""
if room_number in self.rooms:
await self.rooms[room_number].checkout_scene()
print(f"Guest checked out from Room {room_number}")
async def dnd_mode(self, room_number: str, enabled: bool):
"""勿扰模式"""
# 实际实现中应禁用门铃、降低传感器灵敏度等
print(f"Room {room_number} DND mode: {'ON' if enabled else 'OFF'}")
def get_room_status(self, room_number: str) -> Dict:
"""获取客房状态"""
if room_number in self.rooms:
room = self.rooms[room_number]
return {
'room_number': room_number,
'current_scene': room.current_scene,
'devices': len(room.devices)
}
return {}
# 使用示例
if __name__ == "__main__":
# 初始化系统
hms = HotelManagementSystem()
# 注册客房
room_controller = HotelRoomController("101", zwave_controller=None) # 实际应传入真实控制器
room_controller.register_device("main_light", node_id=2, device_type="light")
room_controller.register_device("bedside_lamp", node_id=3, device_type="light")
room_controller.register_device("curtain", node_id=4, device_type="dimmer")
room_controller.register_device("thermostat", node_id=5, device_type="thermostat")
hms.register_room("101", room_controller)
# 模拟入住流程
asyncio.run(hms.guest_checkin("101"))
实施效果:
- 能源节约:28%(对比传统客房)
- 客户满意度提升:15%
- 运维成本降低:40%
- 设备故障率:<0.5%
案例3:养老院健康监护系统
项目背景:某养老院为100名老人部署基于Z-Wave的健康监护系统。
系统功能:
- 跌倒检测:PIR传感器+紧急按钮
- 活动监测:门磁传感器追踪房间进出
- 环境监测:温湿度传感器
- 自动照明:夜间低亮度自动开灯
- 紧急呼叫:一键求助按钮
Python监控系统:
#!/usr/bin/env python3
"""
养老院健康监护系统
"""
from datetime import datetime, timedelta
from typing import Dict, List
from email.mime.text import MIMEText
class ElderCareMonitor:
"""老人健康监护"""
def __init__(self, elder_name: str, room_number: str):
self.elder_name = elder_name
self.room_number = room_number
self.last_activity_time = datetime.now()
self.activity_log = []
self.alerts = []
def record_activity(self, activity_type: str, sensor_id: int):
"""记录活动"""
timestamp = datetime.now()
self.last_activity_time = timestamp
self.activity_log.append({
'timestamp': timestamp,
'type': activity_type,
'sensor_id': sensor_id
})
print(f"[{timestamp}] {self.elder_name} - {activity_type}")
def check_inactivity(self, threshold_hours: int = 12) -> bool:
"""检查是否长时间无活动"""
time_since_activity = datetime.now() - self.last_activity_time
if time_since_activity > timedelta(hours=threshold_hours):
alert = {
'timestamp': datetime.now(),
'type': 'INACTIVITY_ALERT',
'duration': time_since_activity,
'elder': self.elder_name,
'room': self.room_number
}
self.alerts.append(alert)
return True
return False
def emergency_alert(self, sensor_id: int):
"""紧急呼叫告警"""
alert = {
'timestamp': datetime.now(),
'type': 'EMERGENCY_CALL',
'sensor_id': sensor_id,
'elder': self.elder_name,
'room': self.room_number,
'priority': 'HIGH'
}
self.alerts.append(alert)
print(f"⚠️ EMERGENCY ALERT - {self.elder_name} in Room {self.room_number}")
def fall_detection_alert(self, sensor_id: int):
"""跌倒检测告警"""
alert = {
'timestamp': datetime.now(),
'type': 'FALL_DETECTED',
'sensor_id': sensor_id,
'elder': self.elder_name,
'room': self.room_number,
'priority': 'CRITICAL'
}
self.alerts.append(alert)
print(f"🚨 FALL DETECTED - {self.elder_name} in Room {self.room_number}")
def get_daily_activity_report(self) -> Dict:
"""生成每日活动报告"""
today = datetime.now().date()
today_activities = [
a for a in self.activity_log
if a['timestamp'].date() == today
]
return {
'elder': self.elder_name,
'room': self.room_number,
'date': today.isoformat(),
'total_activities': len(today_activities),
'activity_types': {
activity_type: len([a for a in today_activities if a['type'] == activity_type])
for activity_type in set(a['type'] for a in today_activities)
},
'alerts': [a for a in self.alerts if a['timestamp'].date() == today]
}
class NursingHomeCareSystem:
"""养老院监护系统"""
def __init__(self, zwave_controller):
self.zwave = zwave_controller
self.monitors = {}
self.nurses = []
def register_elder(self, elder_name: str, room_number: str,
sensors: Dict[str, int]):
"""注册老人及其传感器"""
monitor = ElderCareMonitor(elder_name, room_number)
self.monitors[room_number] = {
'monitor': monitor,
'sensors': sensors
}
def handle_sensor_event(self, sensor_id: int, event_type: str, value: any):
"""处理传感器事件"""
# 查找对应的老人
for room, data in self.monitors.items():
sensors = data['sensors']
monitor = data['monitor']
if sensor_id in sensors.values():
sensor_name = [k for k, v in sensors.items() if v == sensor_id][0]
if sensor_name == 'emergency_button' and event_type == 'pressed':
monitor.emergency_alert(sensor_id)
self.notify_nurses(monitor.alerts[-1])
elif sensor_name == 'fall_sensor' and event_type == 'triggered':
monitor.fall_detection_alert(sensor_id)
self.notify_nurses(monitor.alerts[-1])
elif sensor_name == 'door_sensor':
if event_type == 'opened':
monitor.record_activity('Door Opened', sensor_id)
elif event_type == 'closed':
monitor.record_activity('Door Closed', sensor_id)
elif sensor_name == 'motion_sensor' and event_type == 'motion_detected':
monitor.record_activity('Motion Detected', sensor_id)
# 夜间自动开灯
hour = datetime.now().hour
if 0 <= hour < 6:
# 开启夜灯(10%亮度)
light_node_id = sensors.get('night_light')
if light_node_id:
self.zwave.set_dimmer(light_node_id, 10)
def notify_nurses(self, alert: Dict):
"""通知护士"""
print(f"\n📢 Notifying nurses about alert:")
print(f" Type: {alert['type']}")
print(f" Elder: {alert['elder']}")
print(f" Room: {alert['room']}")
print(f" Time: {alert['timestamp']}")
# 发送邮件通知(示例)
self.send_email_alert(alert)
# 发送短信通知(示例)
# self.send_sms_alert(alert)
def send_email_alert(self, alert: Dict):
"""发送邮件告警"""
msg = MIMEText(f"""
Alert Type: {alert['type']}
Elder: {alert['elder']}
Room: {alert['room']}
Time: {alert['timestamp']}
Priority: {alert.get('priority', 'NORMAL')}
Please respond immediately!
""")
msg['Subject'] = f"[{alert.get('priority', 'NORMAL')}] Alert - {alert['type']}"
msg['From'] = 'monitor@nursinghome.com'
msg['To'] = ', '.join(self.nurses)
# 发送邮件(需配置SMTP服务器)
try:
with smtplib.SMTP('smtp.example.com', 587) as server:
server.starttls()
server.login('username', 'password')
server.send_message(msg)
print("✉️ Email alert sent successfully")
except Exception as e:
print(f"Failed to send email: {e}")
async def check_all_inactivity(self):
"""定期检查所有老人的活动状态"""
while True:
for room, data in self.monitors.items():
monitor = data['monitor']
if monitor.check_inactivity(threshold_hours=8):
self.notify_nurses(monitor.alerts[-1])
await asyncio.sleep(3600) # 每小时检查一次
def generate_daily_reports(self):
"""生成每日报告"""
print("\n" + "="*60)
print("Daily Activity Reports")
print("="*60)
for room, data in self.monitors.items():
monitor = data['monitor']
report = monitor.get_daily_activity_report()
print(f"\nRoom {report['room']} - {report['elder']}")
print(f" Total Activities: {report['total_activities']}")
print(f" Activity Breakdown:")
for activity_type, count in report['activity_types'].items():
print(f" - {activity_type}: {count}")
print(f" Alerts: {len(report['alerts'])}")
print("="*60)
# 使用示例
if __name__ == "__main__":
system = NursingHomeCareSystem(zwave_controller=None) # 实际应传入真实控制器
# 注册老人
system.register_elder(
elder_name="Zhang Wei",
room_number="201",
sensors={
'emergency_button': 10,
'fall_sensor': 11,
'door_sensor': 12,
'motion_sensor': 13,
'night_light': 14
}
)
system.nurses = ['nurse1@example.com', 'nurse2@example.com']
# 模拟传感器事件
system.handle_sensor_event(13, 'motion_detected', True)
system.handle_sensor_event(12, 'opened', None)
system.handle_sensor_event(10, 'pressed', True) # 紧急呼叫
# 生成每日报告
system.generate_daily_reports()
实施效果:
- 紧急响应时间:从平均15分钟降至<3分钟
- 跌倒检出率:97%
- 误报率:<5%
- 家属满意度:96%
案例4:商业建筑能源管理
项目背景:某办公大楼使用Z-Wave网络管理3,000个照明设备,实现智能照明和能源节约。
实施效果:
- 能源消耗降低:42%
- 投资回收期:2.3年
- 维护成本降低:55%
- 员工满意度提升:18%
案例5:仓库自动化照明
项目背景:某物流公司在50,000平方米仓库部署Z-Wave智能照明,根据人员活动自动控制。
实施效果:
- 照明能耗降低:67%
- 灯具寿命延长:3倍(减少频繁开关)
- 年度节约:$120,000
- 碳排放减少:180吨/年
五、性能优化技巧
5.1 网络拓扑优化
最佳实践:
- 在关键路径上部署常供电的路由节点
- 避免"瓶颈节点"(所有流量都经过的单一节点)
- 保持网络密度(每个节点至少3个邻居)
- 定期执行网络修复(Heal)操作
5.2 降低功耗
电池供电设备优化:
/* Z-Wave电池设备功耗优化 */
/* 1. 使用FLiRS(Frequently Listening Routing Slave) */
void configure_flirs_mode(void) {
// FLiRS设备每1秒唤醒250ms
// 比传统设备快很多,但功耗略高
zw_set_listen_before_talk_threshold(250);
}
/* 2. 优化唤醒间隔 */
void set_wakeup_interval(uint32_t interval_seconds) {
// 传感器可设置较长间隔(如3600秒=1小时)
// 门锁等需快速响应的设备设置较短间隔(如300秒=5分钟)
zw_send_command_class(
COMMAND_CLASS_WAKE_UP,
WAKE_UP_INTERVAL_SET,
interval_seconds
);
}
/* 3. 批量发送数据 */
void batch_sensor_data(void) {
// 累积多个传感器读数,一次性发送
static sensor_reading_t buffer[10];
static int buffer_count = 0;
sensor_reading_t reading = read_sensor();
buffer[buffer_count++] = reading;
if (buffer_count >= 10) {
// 发送批量数据
send_sensor_batch(buffer, buffer_count);
buffer_count = 0;
}
}
5.3 提高响应速度
Beaming优化:
def enable_beaming(node_id: int):
"""
启用Beaming技术
Beaming允许控制器直接唤醒睡眠设备,无需等待设备定期唤醒
"""
# 设置节点为Beaming目标
zwave.set_node_info(
node_id,
listening=False,
routing=False,
beaming=True, # 启用Beaming
frequent=False
)
print(f"Beaming enabled for node {node_id}")
print("Device can now be woken up on-demand by controller")
5.4 安全性能平衡
S2安全级别选择:
| 设备类型 | 推荐安全级别 | 理由 |
|---|---|---|
| 门锁 | S2 Access Control | 最高安全需求 |
| 报警器 | S2 Authenticated | 中高安全需求 |
| 传感器 | S2 Unauthenticated | 数据非敏感,优先考虑性能 |
| 照明 | S2 Unauthenticated | 不涉及隐私,性能优先 |
六、常见问题排查
6.1 设备无法入网
排查步骤:
#!/bin/bash
# Z-Wave设备入网诊断脚本
echo "Z-Wave Device Inclusion Troubleshooting"
echo "========================================="
echo
# 1. 检查控制器状态
echo "[1] Checking controller status..."
zw-cli status
echo
# 2. 检查控制器是否处于Inclusion模式
echo "[2] Checking inclusion mode..."
zw-cli get-inclusion-state
echo
# 3. 检查网络容量
echo "[3] Checking network capacity..."
NODE_COUNT=$(zw-cli list-nodes | wc -l)
echo "Current nodes: $NODE_COUNT / 232"
if [ $NODE_COUNT -ge 232 ]; then
echo "⚠️ Network is full! Cannot add more devices."
fi
echo
# 4. 检查设备是否已在网络中
echo "[4] Checking if device is already included..."
zw-cli list-nodes
echo
# 5. 检查信号强度
echo "[5] Checking signal strength..."
zw-cli get-rssi
echo
echo "Troubleshooting Steps:"
echo "1. Ensure device is in exclusion mode (if previously included)"
echo "2. Move device closer to controller (within 1-2 meters)"
echo "3. Reset device to factory defaults"
echo "4. Start inclusion mode on controller"
echo "5. Trigger device pairing (按照设备说明)"
6.2 通信延迟高
性能测试脚本:
#!/usr/bin/env python3
"""
Z-Wave性能测试工具
"""
from typing import List, Dict
class ZWavePerformanceTester:
"""Z-Wave性能测试器"""
def __init__(self, zwave_controller):
self.zwave = zwave_controller
def test_response_time(self, node_id: int, iterations: int = 100) -> Dict:
"""测试响应时间"""
latencies = []
print(f"Testing node {node_id} response time ({iterations} iterations)...")
for i in range(iterations):
start = time.time()
# 发送命令
success = self.zwave.set_switch(node_id, True)
if success:
# 等待ACK
elapsed = (time.time() - start) * 1000 # 转换为毫秒
latencies.append(elapsed)
time.sleep(0.1)
if latencies:
return {
'node_id': node_id,
'samples': len(latencies),
'min': min(latencies),
'max': max(latencies),
'avg': sum(latencies) / len(latencies),
'success_rate': (len(latencies) / iterations) * 100
}
else:
return {'error': 'No successful responses'}
def test_throughput(self, node_id: int, duration: int = 60) -> Dict:
"""测试吞吐量"""
start_time = time.time()
command_count = 0
success_count = 0
print(f"Testing node {node_id} throughput (duration={duration}s)...")
while time.time() - start_time < duration:
command_count += 1
# 交替开关
state = (command_count % 2 == 0)
success = self.zwave.set_switch(node_id, state)
if success:
success_count += 1
time.sleep(0.05) # 50ms间隔
elapsed = time.time() - start_time
return {
'node_id': node_id,
'duration': elapsed,
'total_commands': command_count,
'successful_commands': success_count,
'commands_per_second': command_count / elapsed,
'success_rate': (success_count / command_count) * 100
}
def test_hop_count(self, source_node: int, dest_node: int) -> int:
"""测试两个节点间的跳数"""
route = self.zwave.get_route(source_node, dest_node)
if route:
hop_count = len(route) - 1
print(f"Route from {source_node} to {dest_node}: {' -> '.join(map(str, route))}")
print(f"Hop count: {hop_count}")
return hop_count
else:
print(f"No route found from {source_node} to {dest_node}")
return -1
def run_full_test(self, node_id: int):
"""运行完整测试套件"""
print("\n" + "="*60)
print(f"Z-Wave Performance Test - Node {node_id}")
print("="*60)
# 1. 响应时间测试
print("\n### Response Time Test ###")
response_stats = self.test_response_time(node_id, iterations=50)
for key, value in response_stats.items():
if isinstance(value, float):
print(f" {key}: {value:.2f} ms")
else:
print(f" {key}: {value}")
# 2. 吞吐量测试
print("\n### Throughput Test ###")
throughput_stats = self.test_throughput(node_id, duration=30)
for key, value in throughput_stats.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
# 3. 跳数测试
print("\n### Hop Count Test ###")
controller_id = 1
self.test_hop_count(controller_id, node_id)
print("\n" + "="*60)
# 使用示例
if __name__ == "__main__":
tester = ZWavePerformanceTester(zwave_controller=None) # 实际应传入真实控制器
tester.run_full_test(node_id=5)
6.3 网络风暴
问题:大量设备同时发送状态更新导致网络拥塞。
解决方案:
from asyncio import Semaphore
class ZWaveTrafficController:
"""Z-Wave流量控制器"""
def __init__(self, max_concurrent: int = 5):
"""
初始化流量控制器
:param max_concurrent: 最大并发命令数
"""
self.semaphore = Semaphore(max_concurrent)
self.command_queue = asyncio.Queue()
async def send_command_throttled(self, node_id: int, command: callable):
"""
发送命令(带流量控制)
"""
async with self.semaphore:
try:
result = await asyncio.to_thread(command, node_id)
return result
except Exception as e:
print(f"Command failed for node {node_id}: {e}")
return None
async def batch_send(self, commands: list):
"""
批量发送命令
"""
tasks = [
self.send_command_throttled(node_id, cmd)
for node_id, cmd in commands
]
results = await asyncio.gather(*tasks)
return results
# 使用示例
async def main():
controller = ZWaveTrafficController(max_concurrent=5)
# 100个命令,但最多5个并发
commands = [
(i, lambda nid: zwave.set_switch(nid, True))
for i in range(2, 102)
]
results = await controller.batch_send(commands)
print(f"Completed {len([r for r in results if r])} / {len(commands)} commands")
asyncio.run(main())
七、技术对比
7.1 Z-Wave vs Zigbee
| 对比维度 | Z-Wave | Zigbee |
|---|---|---|
| 工作频段 | Sub-1GHz(868/908/916MHz) | 2.4GHz |
| 抗干扰能力 | 强(避开Wi-Fi/蓝牙) | 弱(与Wi-Fi共享频段) |
| 传输距离 | 30-100m | 10-75m |
| 网络规模 | 232节点 | 65,535节点 |
| 数据速率 | 9.6-100kbps | 250kbps |
| 互操作性 | 强(Alliance认证) | 中等(存在私有实现) |
| 授权费 | 需支付($3,500首次+年费) | 免费 |
| 生态系统 | 中等(3,000+产品) | 大(10,000+产品) |
| 主要应用 | 高端智能家居 | 大众智能家居 |
7.2 Z-Wave vs Wi-Fi vs Bluetooth
| 对比维度 | Z-Wave | Wi-Fi | Bluetooth LE |
|---|---|---|---|
| 功耗 | 低(1-50mW) | 高(200-500mW) | 极低(0.5-15mW) |
| 电池寿命 | 1-10年 | 数天 | 1-5年 |
| 组网能力 | Mesh(4跳) | 中心化(路由器) | Mesh(有限) |
| 安全性 | AES-128(S2) | WPA2/WPA3 | AES-128 |
| 成本 | 中等($5-20/芯片) | 高($2-10/模块) | 低($1-5/芯片) |
| 带宽 | 低(100kbps) | 高(150Mbps+) | 中(1Mbps) |
| 应用场景 | 家居控制 | 高带宽传输 | 可穿戴设备 |
八、最佳实践
8.1 设备选型建议
原则:
- 优先选择Z-Wave Plus认证设备(700系列芯片)
- 关键设备(门锁)选择支持S2 Access Control的产品
- 常供电设备选择支持Beaming的型号
- 查看设备兼容性列表(Z-Wave Alliance网站)
8.2 网络规划
步骤:
- 绘制建筑平面图,标注设备位置
- 计算设备密度(建议每10-15平方米1个路由节点)
- 规划控制器位置(网络中心)
- 预留20%网络容量(未来扩展)
8.3 安全加固
措施:
- 启用S2安全框架
- 定期更新控制器固件
- 使用强密码保护管理界面
- 隔离Z-Wave网络与互联网(使用网关)
九、总结与展望
9.1 技术优势
Z-Wave作为成熟的智能家居协议,具有以下核心优势:
- 互操作性强:Alliance认证保证跨品牌兼容
- 抗干扰能力:Sub-1GHz频段避开Wi-Fi拥塞
- 低功耗:电池供电设备可运行多年
- 易于安装:即插即用,无需专业布线
- 成熟生态:3,000+认证产品
9.2 技术演进
未来发展方向:
- Z-Wave Long Range:传输距离提升至1英里
- 更高数据速率:800系列芯片支持200kbps
- 与Matter集成:Z-Wave设备通过桥接支持Matter
- AI边缘计算:网关集成AI预测能力
9.3 实施建议
选择Z-Wave的场景:
- 高端住宅智能家居
- 商业建筑照明控制
- 养老/医疗健康监护
- 需要强互操作性的场景
不推荐Z-Wave的场景:
- 超大规模网络(>200节点)→ 选择Zigbee
- 高带宽需求(视频流)→ 选择Wi-Fi
- 成本敏感型项目 → 选择Zigbee/蓝牙
相关资源:
- Z-Wave Alliance官网:https://z-wavealliance.org/
- OpenZWave项目:https://github.com/OpenZWave/open-zwave
- Z-Wave Plus v2规范:ITU-T G.9959
- Silicon Labs Z-Wave SDK:https://www.silabs.com/wireless/z-wave
本文基于Z-Wave 700系列最新技术标准编写,涵盖从基础原理到工程实践的完整知识体系。
更多推荐
所有评论(0)