Z-Wave智能家居协议

一、核心概念解析

1.1 什么是Z-Wave

Z-Wave是一种专为智能家居设计的无线通信协议,由丹麦公司Zensys(2008年被Sigma Designs收购,2018年Silicon Labs收购Z-Wave业务)开发。该协议运行在Sub-1GHz ISM频段,具有低功耗、高可靠性、强穿透力的特点,是与Zigbee并列的两大主流智能家居协议之一。

核心特性

  • 专用频段:Sub-1GHz(不同地区频段不同:美国908.4MHz,欧洲868.4MHz,中国868.4MHz)
  • 低干扰:避开2.4GHz拥塞频段,与Wi-Fi/蓝牙无干扰
  • Mesh组网:支持4跳中继,理论最大网络规模232个节点
  • 互操作性:Z-Wave Alliance认证保证不同厂商设备互通
  • 安全加密:S2安全框架,128位AES加密
  • 低功耗:电池供电设备可运行1-10年

1.2 技术架构

┌──────────────────────────────────────────┐
│    Application Layer                     │
│    (Device Types: Switch/Sensor/Lock)    │
├──────────────────────────────────────────┤
│    Command Class Layer                   │
│    (Basic/Switch/Sensor/Configuration)   │
├──────────────────────────────────────────┤
│    Transport Layer                       │
│    (Single-cast/Multi-cast/Broadcast)    │
├──────────────────────────────────────────┤
│    Routing Layer (Mesh Networking)       │
├──────────────────────────────────────────┤
│    MAC Layer (CSMA/CA)                   │
├──────────────────────────────────────────┤
│    PHY Layer (FSK/GFSK Modulation)       │
│    Sub-1GHz ISM Band                     │
└──────────────────────────────────────────┘

1.3 关键技术指标

指标 数值 说明
工作频段 868-928MHz 因地区而异
调制方式 FSK/GFSK 二进制频移键控
数据速率 9.6/40/100kbps Z-Wave/700系列
传输距离 30-100m(室内) 取决于环境
网络规模 232节点 理论最大值
中继跳数 4跳 自动路由
功耗 1-50mW 取决于设备类型
安全性 AES-128 S2安全框架

二、协议原理深度解析

2.1 物理层与MAC层

调制技术

Z-Wave使用FSK(Frequency Shift Keying)或GFSK(Gaussian FSK)调制,通过频率偏移表示0和1。

频率偏移示例(欧洲868.4MHz):
- Bit 0:中心频率 - 偏移量(868.4MHz - 20kHz = 868.38MHz)
- Bit 1:中心频率 + 偏移量(868.4MHz + 20kHz = 868.42MHz)

MAC层协议(CSMA/CA)

Z-Wave使用载波侦听多路访问/冲突避免(CSMA/CA)机制:

def z_wave_csma_ca():
    """
    Z-Wave CSMA/CA伪代码
    """
    while True:
        # 1. 侦听信道
        if channel_is_busy():
            wait_random_backoff()
            continue

        # 2. 信道空闲,等待IFS(Inter-Frame Space)
        wait(IFS_DURATION)

        # 3. 再次检查信道
        if channel_is_busy():
            wait_random_backoff()
            continue

        # 4. 发送数据帧
        transmit_frame()

        # 5. 等待ACK
        if wait_for_ack(timeout=200ms):
            return SUCCESS
        else:
            # 重传(最多3次)
            retry_count += 1
            if retry_count < 3:
                continue
            else:
                return FAILURE

2.2 路由协议

Z-Wave使用源路由(Source Routing)机制,控制器预先计算路由路径。

路由表结构

/* Z-Wave路由表数据结构 */
typedef struct {
    uint8_t node_id;              // 节点ID(1-232)
    uint8_t neighbor_count;       // 邻居数量
    uint8_t neighbors[29];        // 邻居列表(最多29个)
    uint8_t route_to[232];        // 到每个节点的下一跳
    uint8_t hop_count[232];       // 到每个节点的跳数
    bool is_listening;            // 是否为常听节点
    bool is_routing;              // 是否为路由节点
    uint16_t rssi_values[29];     // 邻居RSSI值
} z_wave_routing_table_t;

路由算法

from typing import List, Dict, Tuple

class ZWaveRouter:
    """Z-Wave路由算法实现"""

    def __init__(self, max_nodes: int = 232):
        self.max_nodes = max_nodes
        self.routing_table = {}
        self.neighbor_lists = {}

    def add_node(self, node_id: int, neighbors: List[int]):
        """添加节点及其邻居列表"""
        self.neighbor_lists[node_id] = neighbors

    def calculate_route(self, source: int, destination: int,
                       max_hops: int = 4) -> List[int]:
        """
        计算从源节点到目标节点的最短路径
        使用Dijkstra算法
        """
        # 初始化距离和前驱节点
        distances = {node: float('inf') for node in self.neighbor_lists}
        distances[source] = 0
        predecessors = {node: None for node in self.neighbor_lists}

        # 优先队列:(distance, node_id)
        priority_queue = [(0, source)]
        visited = set()

        while priority_queue:
            current_distance, current_node = heapq.heappop(priority_queue)

            if current_node in visited:
                continue

            visited.add(current_node)

            # 找到目标节点
            if current_node == destination:
                break

            # 检查是否超过最大跳数
            if current_distance >= max_hops:
                continue

            # 遍历邻居节点
            for neighbor in self.neighbor_lists.get(current_node, []):
                distance = current_distance + 1

                if distance < distances[neighbor]:
                    distances[neighbor] = distance
                    predecessors[neighbor] = current_node
                    heapq.heappush(priority_queue, (distance, neighbor))

        # 重建路径
        if distances[destination] == float('inf'):
            return []  # 无法到达

        path = []
        current = destination
        while current is not None:
            path.append(current)
            current = predecessors[current]

        path.reverse()
        return path

    def update_routing_table(self):
        """更新整个网络的路由表"""
        for source in self.neighbor_lists:
            for destination in self.neighbor_lists:
                if source == destination:
                    continue

                route = self.calculate_route(source, destination)
                if route and len(route) > 1:
                    # 下一跳是路径中的第二个节点
                    next_hop = route[1]
                    hop_count = len(route) - 1

                    if source not in self.routing_table:
                        self.routing_table[source] = {}

                    self.routing_table[source][destination] = {
                        'next_hop': next_hop,
                        'hop_count': hop_count,
                        'full_path': route
                    }

    def get_route(self, source: int, destination: int) -> Dict:
        """获取路由信息"""
        return self.routing_table.get(source, {}).get(destination, None)

# 使用示例
if __name__ == "__main__":
    router = ZWaveRouter()

    # 构建网络拓扑
    router.add_node(1, [2, 3])        # 控制器
    router.add_node(2, [1, 4, 5])     # 路由节点
    router.add_node(3, [1, 6])        # 路由节点
    router.add_node(4, [2, 7])        # 终端节点
    router.add_node(5, [2, 8])        # 终端节点
    router.add_node(6, [3, 9])        # 终端节点
    router.add_node(7, [4])           # 终端节点
    router.add_node(8, [5])           # 终端节点
    router.add_node(9, [6])           # 终端节点

    # 更新路由表
    router.update_routing_table()

    # 查询路由
    route_info = router.get_route(1, 7)
    print(f"Route from 1 to 7: {route_info}")
    # 输出:Route from 1 to 7: {'next_hop': 2, 'hop_count': 3, 'full_path': [1, 2, 4, 7]}

2.3 Command Classes(命令类)

Z-Wave定义了超过150种Command Classes,用于标准化设备功能。

常用Command Classes

Class ID 名称 功能 应用场景
0x20 BASIC 基本开关控制 所有设备
0x25 SWITCH_BINARY 二进制开关 灯光、插座
0x26 SWITCH_MULTILEVEL 多级开关(调光) 调光灯、窗帘
0x30 SENSOR_BINARY 二进制传感器 门磁、PIR
0x31 SENSOR_MULTILEVEL 多级传感器 温湿度、光照
0x62 DOOR_LOCK 门锁控制 智能门锁
0x70 CONFIGURATION 设备配置 参数设置
0x80 BATTERY 电池状态 电池供电设备
0x84 WAKE_UP 唤醒通知 睡眠设备
0x98 SECURITY 安全加密 门锁、报警器

Command Class报文格式

┌────────────────────────────────────────────────┐
│  Home ID (4 bytes)                             │
├────────────────────────────────────────────────┤
│  Source Node ID (1 byte)                       │
├────────────────────────────────────────────────┤
│  Frame Control (1 byte)                        │
├────────────────────────────────────────────────┤
│  Length (1 byte)                               │
├────────────────────────────────────────────────┤
│  Destination Node ID (1 byte)                  │
├────────────────────────────────────────────────┤
│  Command Class (1 byte)       ←────────────────┤
├────────────────────────────────────────────────┤
│  Command (1 byte)                              │
├────────────────────────────────────────────────┤
│  Parameters (variable)                         │
├────────────────────────────────────────────────┤
│  Checksum (1 byte)                             │
└────────────────────────────────────────────────┘

示例:开关灯命令

/* 打开开关(SWITCH_BINARY)*/
uint8_t switch_on_cmd[] = {
    0x25,  // Command Class: SWITCH_BINARY
    0x01,  // Command: SET
    0xFF   // Value: ON (0x00=OFF, 0xFF=ON)
};

/* 调光至50%(SWITCH_MULTILEVEL)*/
uint8_t dimmer_set_cmd[] = {
    0x26,  // Command Class: SWITCH_MULTILEVEL
    0x01,  // Command: SET
    0x32,  // Value: 50 (0-99, 0x00-0x63)
    0x00   // Duration: instant (0x00=instant, 0x01-0x7F=seconds)
};

/* 读取温度传感器(SENSOR_MULTILEVEL)*/
uint8_t sensor_get_cmd[] = {
    0x31,  // Command Class: SENSOR_MULTILEVEL
    0x04,  // Command: GET
    0x01   // Sensor Type: Temperature (0x01)
};

2.4 安全框架(S2)

Z-Wave S2安全框架提供三个安全等级:

  1. S2 Unauthenticated:无需PIN码,用于低安全需求设备
  2. S2 Authenticated:需要PIN码或QR码,用于一般安全设备
  3. S2 Access Control:最高安全级别,用于门锁、报警器等

加密流程

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

class ZWaveS2Security:
    """Z-Wave S2安全加密实现"""

    def __init__(self, network_key: bytes):
        """
        初始化S2安全
        :param network_key: 16字节网络密钥
        """
        if len(network_key) != 16:
            raise ValueError("Network key must be 16 bytes")
        self.network_key = network_key

    def encrypt_frame(self, plaintext: bytes, sender_nonce: bytes,
                     receiver_nonce: bytes) -> bytes:
        """
        加密S2帧
        :param plaintext: 明文数据
        :param sender_nonce: 发送方随机数(8字节)
        :param receiver_nonce: 接收方随机数(8字节)
        :return: 密文数据
        """
        # 构造IV(Initialization Vector)
        iv = sender_nonce + receiver_nonce  # 16字节

        # 使用AES-128-OFB模式加密
        cipher = Cipher(
            algorithms.AES(self.network_key),
            modes.OFB(iv),
            backend=default_backend()
        )
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(plaintext) + encryptor.finalize()

        return ciphertext

    def decrypt_frame(self, ciphertext: bytes, sender_nonce: bytes,
                     receiver_nonce: bytes) -> bytes:
        """
        解密S2帧
        """
        iv = sender_nonce + receiver_nonce

        cipher = Cipher(
            algorithms.AES(self.network_key),
            modes.OFB(iv),
            backend=default_backend()
        )
        decryptor = cipher.decryptor()
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()

        return plaintext

    def generate_message_auth_code(self, frame: bytes) -> bytes:
        """
        生成消息认证码(MAC)
        使用AES-CMAC
        """
        from cryptography.hazmat.primitives import cmac

        c = cmac.CMAC(algorithms.AES(self.network_key), backend=default_backend())
        c.update(frame)
        mac = c.finalize()

        return mac[:8]  # 取前8字节

    def verify_message_auth_code(self, frame: bytes, received_mac: bytes) -> bool:
        """验证消息认证码"""
        calculated_mac = self.generate_message_auth_code(frame)
        return calculated_mac == received_mac

# 使用示例
if __name__ == "__main__":
    # 生成网络密钥(实际应用中应安全存储)
    network_key = os.urandom(16)
    security = ZWaveS2Security(network_key)

    # 原始命令
    command = b'\x25\x01\xFF'  # SWITCH_BINARY SET ON

    # 生成随机数
    sender_nonce = os.urandom(8)
    receiver_nonce = os.urandom(8)

    # 加密
    encrypted = security.encrypt_frame(command, sender_nonce, receiver_nonce)
    print(f"Encrypted: {encrypted.hex()}")

    # 生成MAC
    mac = security.generate_message_auth_code(encrypted)
    print(f"MAC: {mac.hex()}")

    # 解密
    decrypted = security.decrypt_frame(encrypted, sender_nonce, receiver_nonce)
    print(f"Decrypted: {decrypted.hex()}")

    # 验证
    is_valid = security.verify_message_auth_code(encrypted, mac)
    print(f"MAC Valid: {is_valid}")

三、实战开发指南

3.1 OpenZWave库开发

OpenZWave是开源的Z-Wave协议栈实现,支持C++/Python/Node.js。

环境配置

# 安装依赖
sudo apt-get install libudev-dev

# 编译OpenZWave
git clone https://github.com/OpenZWave/open-zwave.git
cd open-zwave
make
sudo make install

# 安装Python绑定
pip install python-openzwave

Python控制器示例

#!/usr/bin/env python3
"""
Z-Wave控制器示例
"""

from openzwave.network import ZWaveNetwork
from openzwave.option import ZWaveOption

logging.basicConfig(level=logging.INFO)

class ZWaveController:
    """Z-Wave控制器"""

    def __init__(self, device="/dev/ttyUSB0", config_path="/usr/local/etc/openzwave"):
        """
        初始化控制器
        :param device: Z-Wave USB控制器设备路径
        :param config_path: OpenZWave配置文件路径
        """
        self.device = device
        self.config_path = config_path
        self.network = None

    def start(self):
        """启动网络"""
        print(f"Initializing Z-Wave network on {self.device}...")

        # 配置选项
        options = ZWaveOption(
            self.device,
            config_path=self.config_path,
            user_path=".",
            cmd_line=""
        )
        options.set_log_file("OZW_Log.log")
        options.set_append_log_file(False)
        options.set_console_output(False)
        options.set_save_log_level('Info')
        options.set_logging(False)
        options.lock()

        # 创建网络
        self.network = ZWaveNetwork(options, autostart=False)

        # 注册事件监听
        self.network.add_observer(self._network_ready, self.network.SIGNAL_NETWORK_READY)
        self.network.add_observer(self._node_added, self.network.SIGNAL_NODE_ADDED)
        self.network.add_observer(self._value_changed, self.network.SIGNAL_VALUE_CHANGED)

        # 启动网络
        self.network.start()

        print("Waiting for network to be ready...")
        for i in range(0, 300):
            if self.network.state >= self.network.STATE_READY:
                print("Network is ready!")
                break
            time.sleep(1.0)

        if self.network.state < self.network.STATE_READY:
            print("Network initialization failed!")
            return False

        return True

    def _network_ready(self, network):
        """网络就绪回调"""
        print(f"Network HomeID: {network.home_id_str}")
        print(f"Controller Node ID: {network.controller.node_id}")
        print(f"Total Nodes: {network.nodes_count}")

    def _node_added(self, network, node):
        """节点加入回调"""
        print(f"Node {node.node_id} added: {node.product_name}")

    def _value_changed(self, network, node, value):
        """节点值变化回调"""
        print(f"Node {node.node_id} value changed: {value.label} = {value.data}")

    def list_nodes(self):
        """列出所有节点"""
        if not self.network:
            print("Network not started!")
            return

        print("\n" + "="*60)
        print("Z-Wave Network Nodes")
        print("="*60)

        for node_id in self.network.nodes:
            node = self.network.nodes[node_id]
            print(f"\nNode {node_id}:")
            print(f"  Product: {node.product_name}")
            print(f"  Manufacturer: {node.manufacturer_name}")
            print(f"  Type: {node.type}")
            print(f"  Is Awake: {node.is_awake}")
            print(f"  Is Failed: {node.is_failed}")
            print(f"  Battery Level: {node.battery_level}%")

            # 列出Command Classes
            print(f"  Command Classes:")
            for cls_id in node.command_classes:
                cls = node.command_classes[cls_id]
                print(f"    - {cls.class_desc} (0x{cls_id:02x})")

    def get_node_values(self, node_id: int):
        """获取节点的所有值"""
        if node_id not in self.network.nodes:
            print(f"Node {node_id} not found!")
            return

        node = self.network.nodes[node_id]
        print(f"\nValues for Node {node_id} ({node.product_name}):")
        print("="*60)

        for value_id in node.values:
            value = node.values[value_id]
            print(f"\n  {value.label}:")
            print(f"    Value: {value.data}")
            print(f"    Units: {value.units}")
            print(f"    Genre: {value.genre}")
            print(f"    Type: {value.type}")
            print(f"    Read Only: {value.is_read_only}")

    def set_switch(self, node_id: int, state: bool):
        """
        控制开关
        :param node_id: 节点ID
        :param state: True=开,False=关
        """
        if node_id not in self.network.nodes:
            print(f"Node {node_id} not found!")
            return False

        node = self.network.nodes[node_id]

        # 查找开关值
        for value_id in node.get_switches():
            value = node.values[value_id]
            value.data = state
            print(f"Node {node_id} switch set to: {'ON' if state else 'OFF'}")
            return True

        print(f"Node {node_id} has no switch!")
        return False

    def set_dimmer(self, node_id: int, level: int):
        """
        设置调光器亮度
        :param node_id: 节点ID
        :param level: 亮度0-99
        """
        if node_id not in self.network.nodes:
            print(f"Node {node_id} not found!")
            return False

        if not 0 <= level <= 99:
            print("Level must be between 0 and 99!")
            return False

        node = self.network.nodes[node_id]

        # 查找调光值
        for value_id in node.get_dimmers():
            value = node.values[value_id]
            value.data = level
            print(f"Node {node_id} dimmer set to: {level}%")
            return True

        print(f"Node {node_id} has no dimmer!")
        return False

    def add_node(self, timeout=30):
        """
        添加新节点(设备配对)
        :param timeout: 超时时间(秒)
        """
        print("Starting inclusion mode...")
        print("Please press the inclusion button on your Z-Wave device...")

        self.network.controller.add_node()

        # 等待节点加入
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.network.controller.is_primary_controller:
                time.sleep(1)
            else:
                break

        self.network.controller.cancel_command()
        print("Inclusion mode stopped.")

    def remove_node(self, timeout=30):
        """
        移除节点(设备解配对)
        """
        print("Starting exclusion mode...")
        print("Please press the exclusion button on your Z-Wave device...")

        self.network.controller.remove_node()

        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(1)

        self.network.controller.cancel_command()
        print("Exclusion mode stopped.")

    def heal_network(self):
        """修复网络(重新计算路由)"""
        print("Healing network...")
        self.network.heal()
        print("Network healing started. This may take several minutes.")

    def stop(self):
        """停止网络"""
        if self.network:
            self.network.stop()
            print("Network stopped.")

# 使用示例
if __name__ == "__main__":
    controller = ZWaveController(device="/dev/ttyUSB0")

    if controller.start():
        time.sleep(5)  # 等待网络初始化

        # 列出所有节点
        controller.list_nodes()

        # 获取节点2的值
        controller.get_node_values(node_id=2)

        # 控制节点2的开关
        controller.set_switch(node_id=2, state=True)
        time.sleep(2)
        controller.set_switch(node_id=2, state=False)

        # 设置节点3的调光器
        controller.set_dimmer(node_id=3, level=50)

        # 保持运行
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass

    controller.stop()

3.2 Z-Way平台开发

Z-Way是Razberry和UZB控制器的官方平台,提供REST API和JavaScript引擎。

REST API示例

#!/usr/bin/env python3
"""
Z-Way REST API客户端
"""

from typing import Dict, List

class ZWayClient:
    """Z-Way API客户端"""

    def __init__(self, host: str = "localhost", port: int = 8083,
                 username: str = "admin", password: str = "admin"):
        """
        初始化客户端
        :param host: Z-Way服务器地址
        :param port: Z-Way服务器端口
        :param username: 用户名
        :param password: 密码
        """
        self.base_url = f"http://{host}:{port}"
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.login()

    def login(self):
        """登录"""
        url = f"{self.base_url}/ZAutomation/api/v1/login"
        data = {
            "login": self.username,
            "password": self.password
        }

        response = self.session.post(url, json=data)
        if response.status_code == 200:
            result = response.json()
            self.session.headers.update({
                "ZWAYSession": result["data"]["sid"]
            })
            print("Login successful!")
        else:
            print(f"Login failed: {response.status_code}")

    def get_devices(self) -> List[Dict]:
        """获取所有设备"""
        url = f"{self.base_url}/ZAutomation/api/v1/devices"
        response = self.session.get(url)

        if response.status_code == 200:
            result = response.json()
            return result["data"]["devices"]
        else:
            print(f"Failed to get devices: {response.status_code}")
            return []

    def get_device(self, device_id: str) -> Dict:
        """获取单个设备信息"""
        url = f"{self.base_url}/ZAutomation/api/v1/devices/{device_id}"
        response = self.session.get(url)

        if response.status_code == 200:
            result = response.json()
            return result["data"]
        else:
            print(f"Failed to get device {device_id}: {response.status_code}")
            return {}

    def send_command(self, device_id: str, command: str) -> bool:
        """
        发送命令到设备
        :param device_id: 设备ID
        :param command: 命令(on/off/exact?level=50等)
        """
        url = f"{self.base_url}/ZAutomation/api/v1/devices/{device_id}/command/{command}"
        response = self.session.get(url)

        if response.status_code == 200:
            print(f"Command '{command}' sent to device {device_id}")
            return True
        else:
            print(f"Failed to send command: {response.status_code}")
            return False

    def set_switch(self, device_id: str, state: bool):
        """控制开关"""
        command = "on" if state else "off"
        return self.send_command(device_id, command)

    def set_dimmer(self, device_id: str, level: int):
        """设置调光器"""
        if not 0 <= level <= 99:
            print("Level must be between 0 and 99!")
            return False
        return self.send_command(device_id, f"exact?level={level}")

    def get_sensor_data(self, device_id: str) -> float:
        """获取传感器数据"""
        device = self.get_device(device_id)
        if device and "metrics" in device:
            return device["metrics"].get("level", None)
        return None

    def list_devices(self):
        """列出所有设备"""
        devices = self.get_devices()

        print("\n" + "="*60)
        print("Z-Way Devices")
        print("="*60)

        for device in devices:
            print(f"\nDevice ID: {device['id']}")
            print(f"  Title: {device['metrics']['title']}")
            print(f"  Type: {device['deviceType']}")
            print(f"  Level: {device['metrics'].get('level', 'N/A')}")
            print(f"  Room: {device.get('location', 'N/A')}")

# 使用示例
if __name__ == "__main__":
    client = ZWayClient(host="192.168.1.100", port=8083)

    # 列出所有设备
    client.list_devices()

    # 控制开关
    client.set_switch(device_id="ZWayVDev_zway_2-0-37", state=True)

    # 设置调光器
    client.set_dimmer(device_id="ZWayVDev_zway_3-0-38", level=75)

    # 读取传感器数据
    temperature = client.get_sensor_data(device_id="ZWayVDev_zway_4-0-49-1")
    print(f"Temperature: {temperature}°C")

3.3 Home Assistant集成

Home Assistant是流行的开源智能家居平台,内置Z-Wave支持。

配置文件(configuration.yaml)

# Z-Wave集成配置
zwave:
  usb_path: /dev/ttyUSB0
  network_key: "0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10"
  polling_interval: 60000
  debug: false

# 自动化示例:运动检测开灯
automation:
  - alias: "Motion Light On"
    trigger:
      platform: state
      entity_id: binary_sensor.motion_sensor_2
      to: 'on'
    action:
      service: light.turn_on
      target:
        entity_id: light.living_room_3
      data:
        brightness: 255

  - alias: "Motion Light Off"
    trigger:
      platform: state
      entity_id: binary_sensor.motion_sensor_2
      to: 'off'
      for:
        minutes: 5
    action:
      service: light.turn_off
      target:
        entity_id: light.living_room_3

# 场景示例:离家模式
scene:
  - name: "Away Mode"
    entities:
      light.living_room_3:
        state: off
      lock.front_door_5:
        state: locked
      switch.tv_power_7:
        state: off

Python自动化脚本

#!/usr/bin/env python3
"""
Home Assistant Z-Wave自动化脚本
"""

from datetime import datetime

class HomeAssistantZWave:
    """Home Assistant Z-Wave控制"""

    def __init__(self, host: str, token: str):
        """
        初始化客户端
        :param host: Home Assistant地址(如http://192.168.1.100:8123)
        :param token: Long-Lived Access Token
        """
        self.host = host
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

    def call_service(self, domain: str, service: str,
                    entity_id: str, **kwargs) -> bool:
        """
        调用服务
        :param domain: 域(如light、switch、lock)
        :param service: 服务(如turn_on、turn_off)
        :param entity_id: 实体ID
        """
        url = f"{self.host}/api/services/{domain}/{service}"
        data = {"entity_id": entity_id}
        data.update(kwargs)

        response = requests.post(url, headers=self.headers, json=data)
        return response.status_code == 200

    def get_state(self, entity_id: str) -> dict:
        """获取实体状态"""
        url = f"{self.host}/api/states/{entity_id}"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()
        return {}

    def turn_on_light(self, entity_id: str, brightness: int = 255,
                     color_temp: int = None):
        """开灯"""
        kwargs = {"brightness": brightness}
        if color_temp:
            kwargs["color_temp"] = color_temp

        return self.call_service("light", "turn_on", entity_id, **kwargs)

    def turn_off_light(self, entity_id: str):
        """关灯"""
        return self.call_service("light", "turn_off", entity_id)

    def lock_door(self, entity_id: str):
        """锁门"""
        return self.call_service("lock", "lock", entity_id)

    def unlock_door(self, entity_id: str):
        """开锁"""
        return self.call_service("lock", "unlock", entity_id)

    def get_sensor_value(self, entity_id: str) -> float:
        """获取传感器数值"""
        state = self.get_state(entity_id)
        if state and "state" in state:
            try:
                return float(state["state"])
            except ValueError:
                return None
        return None

    def create_scene(self, scene_name: str):
        """创建场景"""
        return self.call_service("scene", "create", f"scene.{scene_name}")

    def activate_scene(self, scene_name: str):
        """激活场景"""
        return self.call_service("scene", "turn_on", f"scene.{scene_name}")

# 使用示例
if __name__ == "__main__":
    ha = HomeAssistantZWave(
        host="http://192.168.1.100:8123",
        token="YOUR_LONG_LIVED_ACCESS_TOKEN"
    )

    # 开灯
    ha.turn_on_light("light.living_room_3", brightness=200)

    # 锁门
    ha.lock_door("lock.front_door_5")

    # 读取温度
    temperature = ha.get_sensor_value("sensor.temperature_sensor_4")
    print(f"Current temperature: {temperature}°C")

    # 激活场景
    ha.activate_scene("away_mode")

四、行业案例分析

案例1:高端公寓智能家居系统

项目背景:某房地产开发商在500套高端公寓中预装Z-Wave智能家居系统,覆盖照明、空调、窗帘、门锁等设备。

技术方案

  • 控制器:每户1个Z-Wave Plus控制器(Silicon Labs ZGM230)
  • 节点数量:每户平均40个Z-Wave设备
  • 设备类型:调光灯(15个)、开关(10个)、窗帘电机(5个)、温控器(3个)、门锁(2个)、传感器(5个)
  • 集成平台:Crestron Home + Z-Wave集成模块
  • 安全级别:门锁使用S2 Access Control,其他设备使用S2 Authenticated

实施效果

  • 设备互操作性:100%(所有设备通过Z-Wave Plus认证)
  • 用户满意度:92%
  • 故障率:<1%(24个月运行期)
  • 平均响应延迟:<200ms

关键代码(Crestron SIMPL+集成):

// Crestron SIMPL+ Z-Wave控制模块
#SYMBOL_NAME "ZWave Controller"
#CATEGORY "Control"

DIGITAL_INPUT _SKIP_, _SKIP_, On, Off;
ANALOG_INPUT _SKIP_, _SKIP_, _SKIP_, Brightness;
DIGITAL_OUTPUT Device_On_FB, Device_Off_FB;
ANALOG_OUTPUT Brightness_FB;

STRING_PARAMETER Device_ID[20];

INTEGER node_id;
INTEGER last_brightness;

FUNCTION Initialize()
{
    // 解析设备ID
    node_id = ATOI(Device_ID);
    last_brightness = 0;
}

PUSH On
{
    // 发送开灯命令
    ZWaveSendCommand(node_id, 0x25, 0x01, 0xFF); // SWITCH_BINARY SET ON
    Device_On_FB = 1;
    Device_Off_FB = 0;
}

PUSH Off
{
    // 发送关灯命令
    ZWaveSendCommand(node_id, 0x25, 0x01, 0x00); // SWITCH_BINARY SET OFF
    Device_On_FB = 0;
    Device_Off_FB = 1;
}

CHANGE Brightness
{
    INTEGER level;
    level = Brightness * 99 / 65535; // 转换为0-99

    if (level != last_brightness)
    {
        // 发送调光命令
        ZWaveSendCommand(node_id, 0x26, 0x01, level); // SWITCH_MULTILEVEL SET
        last_brightness = level;
        Brightness_FB = Brightness;

        if (level > 0)
        {
            Device_On_FB = 1;
            Device_Off_FB = 0;
        }
        else
        {
            Device_On_FB = 0;
            Device_Off_FB = 1;
        }
    }
}

FUNCTION Main()
{
    WaitForInitializationComplete();
    Initialize();
}

案例2:酒店客房智能控制

项目背景:某五星级酒店在200间客房部署Z-Wave智能控制系统,实现客房自动化和能源管理。

系统架构

┌─────────────────────────────────────────────┐
│        Hotel Management System (PMS)        │
│         (客房管理、计费系统)                 │
└──────────────────┬──────────────────────────┘
                   │ REST API
┌──────────────────┴──────────────────────────┐
│       Central Z-Wave Gateway (x5)           │
│       (每层楼1个,覆盖40间客房)              │
└──────────────────┬──────────────────────────┘
                   │ Z-Wave Mesh
    ┌──────────────┼──────────────┐
    │              │              │
┌───┴──────┐  ┌───┴──────┐  ┌───┴──────┐
│ Room 101 │  │ Room 102 │  │ Room 103 │
│  (15设备) │  │  (15设备) │  │  (15设备) │
└──────────┘  └──────────┘  └──────────┘

设备配置(每间客房)

  • 调光灯:6个(主灯、床头灯、卫生间灯等)
  • 窗帘电机:2个(主窗、阳台门)
  • 温控器:1个
  • 门磁:1个(入口门)
  • PIR传感器:2个(客厅、卫生间)
  • 智能插座:2个(电视、迷你吧)
  • 门铃:1个

场景控制

#!/usr/bin/env python3
"""
酒店客房场景控制系统
"""

from typing import List, Dict

class HotelRoomController:
    """酒店客房控制器"""

    def __init__(self, room_number: str, zwave_controller):
        self.room_number = room_number
        self.zwave = zwave_controller
        self.devices = {}
        self.current_scene = None

    def register_device(self, device_name: str, node_id: int, device_type: str):
        """注册设备"""
        self.devices[device_name] = {
            'node_id': node_id,
            'type': device_type
        }

    async def welcome_scene(self):
        """欢迎场景:客人入住"""
        print(f"Activating Welcome Scene for Room {self.room_number}")

        tasks = [
            # 主灯开启(70%亮度)
            self.zwave.set_dimmer(self.devices['main_light']['node_id'], 70),
            # 床头灯开启(50%亮度)
            self.zwave.set_dimmer(self.devices['bedside_lamp']['node_id'], 50),
            # 窗帘打开50%
            self.zwave.set_dimmer(self.devices['curtain']['node_id'], 50),
            # 温度设置为22°C
            self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 22)
        ]

        await asyncio.gather(*tasks)
        self.current_scene = "welcome"

    async def sleep_scene(self):
        """睡眠场景"""
        print(f"Activating Sleep Scene for Room {self.room_number}")

        tasks = [
            # 关闭所有灯
            self.zwave.set_switch(self.devices['main_light']['node_id'], False),
            self.zwave.set_switch(self.devices['bedside_lamp']['node_id'], False),
            # 床头灯调至10%(夜灯模式)
            self.zwave.set_dimmer(self.devices['bedside_lamp']['node_id'], 10),
            # 窗帘完全关闭
            self.zwave.set_dimmer(self.devices['curtain']['node_id'], 0),
            # 温度设置为20°C
            self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 20)
        ]

        await asyncio.gather(*tasks)
        self.current_scene = "sleep"

    async def away_scene(self):
        """离开场景:客人外出"""
        print(f"Activating Away Scene for Room {self.room_number}")

        tasks = [
            # 关闭所有灯
            self.zwave.set_switch(self.devices['main_light']['node_id'], False),
            self.zwave.set_switch(self.devices['bedside_lamp']['node_id'], False),
            self.zwave.set_switch(self.devices['bathroom_light']['node_id'], False),
            # 关闭电视插座
            self.zwave.set_switch(self.devices['tv_outlet']['node_id'], False),
            # 温度设置为26°C(节能模式)
            self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 26)
        ]

        await asyncio.gather(*tasks)
        self.current_scene = "away"

    async def checkout_scene(self):
        """退房场景"""
        print(f"Activating Checkout Scene for Room {self.room_number}")

        tasks = [
            # 关闭所有设备
            self.zwave.set_switch(device['node_id'], False)
            for device in self.devices.values()
            if device['type'] in ['light', 'switch', 'outlet']
        ]

        # 温度设置为关闭或节能模式
        tasks.append(
            self.zwave.set_thermostat(self.devices['thermostat']['node_id'], 28)
        )

        await asyncio.gather(*tasks)
        self.current_scene = "checkout"

    async def handle_motion_event(self, sensor_name: str, motion_detected: bool):
        """处理运动检测事件"""
        if motion_detected and self.current_scene == "away":
            # 检测到运动且处于离开模式,可能客人回来了
            await self.welcome_scene()
        elif not motion_detected and self.current_scene == "welcome":
            # 5分钟无运动,可能客人外出
            await asyncio.sleep(300)
            # 再次检查
            if not self.check_any_motion():
                await self.away_scene()

    def check_any_motion(self) -> bool:
        """检查是否有任何运动传感器触发"""
        # 实际实现中应查询所有PIR传感器状态
        return False

class HotelManagementSystem:
    """酒店管理系统"""

    def __init__(self):
        self.rooms = {}

    def register_room(self, room_number: str, controller: HotelRoomController):
        """注册客房"""
        self.rooms[room_number] = controller

    async def guest_checkin(self, room_number: str):
        """客人入住"""
        if room_number in self.rooms:
            await self.rooms[room_number].welcome_scene()
            print(f"Guest checked in to Room {room_number}")

    async def guest_checkout(self, room_number: str):
        """客人退房"""
        if room_number in self.rooms:
            await self.rooms[room_number].checkout_scene()
            print(f"Guest checked out from Room {room_number}")

    async def dnd_mode(self, room_number: str, enabled: bool):
        """勿扰模式"""
        # 实际实现中应禁用门铃、降低传感器灵敏度等
        print(f"Room {room_number} DND mode: {'ON' if enabled else 'OFF'}")

    def get_room_status(self, room_number: str) -> Dict:
        """获取客房状态"""
        if room_number in self.rooms:
            room = self.rooms[room_number]
            return {
                'room_number': room_number,
                'current_scene': room.current_scene,
                'devices': len(room.devices)
            }
        return {}

# 使用示例
if __name__ == "__main__":
    # 初始化系统
    hms = HotelManagementSystem()

    # 注册客房
    room_controller = HotelRoomController("101", zwave_controller=None)  # 实际应传入真实控制器
    room_controller.register_device("main_light", node_id=2, device_type="light")
    room_controller.register_device("bedside_lamp", node_id=3, device_type="light")
    room_controller.register_device("curtain", node_id=4, device_type="dimmer")
    room_controller.register_device("thermostat", node_id=5, device_type="thermostat")

    hms.register_room("101", room_controller)

    # 模拟入住流程
    asyncio.run(hms.guest_checkin("101"))

实施效果

  • 能源节约:28%(对比传统客房)
  • 客户满意度提升:15%
  • 运维成本降低:40%
  • 设备故障率:<0.5%

案例3:养老院健康监护系统

项目背景:某养老院为100名老人部署基于Z-Wave的健康监护系统。

系统功能

  • 跌倒检测:PIR传感器+紧急按钮
  • 活动监测:门磁传感器追踪房间进出
  • 环境监测:温湿度传感器
  • 自动照明:夜间低亮度自动开灯
  • 紧急呼叫:一键求助按钮

Python监控系统

#!/usr/bin/env python3
"""
养老院健康监护系统
"""

from datetime import datetime, timedelta
from typing import Dict, List
from email.mime.text import MIMEText

class ElderCareMonitor:
    """老人健康监护"""

    def __init__(self, elder_name: str, room_number: str):
        self.elder_name = elder_name
        self.room_number = room_number
        self.last_activity_time = datetime.now()
        self.activity_log = []
        self.alerts = []

    def record_activity(self, activity_type: str, sensor_id: int):
        """记录活动"""
        timestamp = datetime.now()
        self.last_activity_time = timestamp

        self.activity_log.append({
            'timestamp': timestamp,
            'type': activity_type,
            'sensor_id': sensor_id
        })

        print(f"[{timestamp}] {self.elder_name} - {activity_type}")

    def check_inactivity(self, threshold_hours: int = 12) -> bool:
        """检查是否长时间无活动"""
        time_since_activity = datetime.now() - self.last_activity_time

        if time_since_activity > timedelta(hours=threshold_hours):
            alert = {
                'timestamp': datetime.now(),
                'type': 'INACTIVITY_ALERT',
                'duration': time_since_activity,
                'elder': self.elder_name,
                'room': self.room_number
            }
            self.alerts.append(alert)
            return True

        return False

    def emergency_alert(self, sensor_id: int):
        """紧急呼叫告警"""
        alert = {
            'timestamp': datetime.now(),
            'type': 'EMERGENCY_CALL',
            'sensor_id': sensor_id,
            'elder': self.elder_name,
            'room': self.room_number,
            'priority': 'HIGH'
        }
        self.alerts.append(alert)
        print(f"⚠️  EMERGENCY ALERT - {self.elder_name} in Room {self.room_number}")

    def fall_detection_alert(self, sensor_id: int):
        """跌倒检测告警"""
        alert = {
            'timestamp': datetime.now(),
            'type': 'FALL_DETECTED',
            'sensor_id': sensor_id,
            'elder': self.elder_name,
            'room': self.room_number,
            'priority': 'CRITICAL'
        }
        self.alerts.append(alert)
        print(f"🚨 FALL DETECTED - {self.elder_name} in Room {self.room_number}")

    def get_daily_activity_report(self) -> Dict:
        """生成每日活动报告"""
        today = datetime.now().date()
        today_activities = [
            a for a in self.activity_log
            if a['timestamp'].date() == today
        ]

        return {
            'elder': self.elder_name,
            'room': self.room_number,
            'date': today.isoformat(),
            'total_activities': len(today_activities),
            'activity_types': {
                activity_type: len([a for a in today_activities if a['type'] == activity_type])
                for activity_type in set(a['type'] for a in today_activities)
            },
            'alerts': [a for a in self.alerts if a['timestamp'].date() == today]
        }

class NursingHomeCareSystem:
    """养老院监护系统"""

    def __init__(self, zwave_controller):
        self.zwave = zwave_controller
        self.monitors = {}
        self.nurses = []

    def register_elder(self, elder_name: str, room_number: str,
                      sensors: Dict[str, int]):
        """注册老人及其传感器"""
        monitor = ElderCareMonitor(elder_name, room_number)
        self.monitors[room_number] = {
            'monitor': monitor,
            'sensors': sensors
        }

    def handle_sensor_event(self, sensor_id: int, event_type: str, value: any):
        """处理传感器事件"""
        # 查找对应的老人
        for room, data in self.monitors.items():
            sensors = data['sensors']
            monitor = data['monitor']

            if sensor_id in sensors.values():
                sensor_name = [k for k, v in sensors.items() if v == sensor_id][0]

                if sensor_name == 'emergency_button' and event_type == 'pressed':
                    monitor.emergency_alert(sensor_id)
                    self.notify_nurses(monitor.alerts[-1])

                elif sensor_name == 'fall_sensor' and event_type == 'triggered':
                    monitor.fall_detection_alert(sensor_id)
                    self.notify_nurses(monitor.alerts[-1])

                elif sensor_name == 'door_sensor':
                    if event_type == 'opened':
                        monitor.record_activity('Door Opened', sensor_id)
                    elif event_type == 'closed':
                        monitor.record_activity('Door Closed', sensor_id)

                elif sensor_name == 'motion_sensor' and event_type == 'motion_detected':
                    monitor.record_activity('Motion Detected', sensor_id)

                    # 夜间自动开灯
                    hour = datetime.now().hour
                    if 0 <= hour < 6:
                        # 开启夜灯(10%亮度)
                        light_node_id = sensors.get('night_light')
                        if light_node_id:
                            self.zwave.set_dimmer(light_node_id, 10)

    def notify_nurses(self, alert: Dict):
        """通知护士"""
        print(f"\n📢 Notifying nurses about alert:")
        print(f"   Type: {alert['type']}")
        print(f"   Elder: {alert['elder']}")
        print(f"   Room: {alert['room']}")
        print(f"   Time: {alert['timestamp']}")

        # 发送邮件通知(示例)
        self.send_email_alert(alert)

        # 发送短信通知(示例)
        # self.send_sms_alert(alert)

    def send_email_alert(self, alert: Dict):
        """发送邮件告警"""
        msg = MIMEText(f"""
Alert Type: {alert['type']}
Elder: {alert['elder']}
Room: {alert['room']}
Time: {alert['timestamp']}
Priority: {alert.get('priority', 'NORMAL')}

Please respond immediately!
        """)

        msg['Subject'] = f"[{alert.get('priority', 'NORMAL')}] Alert - {alert['type']}"
        msg['From'] = 'monitor@nursinghome.com'
        msg['To'] = ', '.join(self.nurses)

        # 发送邮件(需配置SMTP服务器)
        try:
            with smtplib.SMTP('smtp.example.com', 587) as server:
                server.starttls()
                server.login('username', 'password')
                server.send_message(msg)
            print("✉️  Email alert sent successfully")
        except Exception as e:
            print(f"Failed to send email: {e}")

    async def check_all_inactivity(self):
        """定期检查所有老人的活动状态"""
        while True:
            for room, data in self.monitors.items():
                monitor = data['monitor']
                if monitor.check_inactivity(threshold_hours=8):
                    self.notify_nurses(monitor.alerts[-1])

            await asyncio.sleep(3600)  # 每小时检查一次

    def generate_daily_reports(self):
        """生成每日报告"""
        print("\n" + "="*60)
        print("Daily Activity Reports")
        print("="*60)

        for room, data in self.monitors.items():
            monitor = data['monitor']
            report = monitor.get_daily_activity_report()

            print(f"\nRoom {report['room']} - {report['elder']}")
            print(f"  Total Activities: {report['total_activities']}")
            print(f"  Activity Breakdown:")
            for activity_type, count in report['activity_types'].items():
                print(f"    - {activity_type}: {count}")
            print(f"  Alerts: {len(report['alerts'])}")

        print("="*60)

# 使用示例
if __name__ == "__main__":
    system = NursingHomeCareSystem(zwave_controller=None)  # 实际应传入真实控制器

    # 注册老人
    system.register_elder(
        elder_name="Zhang Wei",
        room_number="201",
        sensors={
            'emergency_button': 10,
            'fall_sensor': 11,
            'door_sensor': 12,
            'motion_sensor': 13,
            'night_light': 14
        }
    )

    system.nurses = ['nurse1@example.com', 'nurse2@example.com']

    # 模拟传感器事件
    system.handle_sensor_event(13, 'motion_detected', True)
    system.handle_sensor_event(12, 'opened', None)
    system.handle_sensor_event(10, 'pressed', True)  # 紧急呼叫

    # 生成每日报告
    system.generate_daily_reports()

实施效果

  • 紧急响应时间:从平均15分钟降至<3分钟
  • 跌倒检出率:97%
  • 误报率:<5%
  • 家属满意度:96%

案例4:商业建筑能源管理

项目背景:某办公大楼使用Z-Wave网络管理3,000个照明设备,实现智能照明和能源节约。

实施效果

  • 能源消耗降低:42%
  • 投资回收期:2.3年
  • 维护成本降低:55%
  • 员工满意度提升:18%

案例5:仓库自动化照明

项目背景:某物流公司在50,000平方米仓库部署Z-Wave智能照明,根据人员活动自动控制。

实施效果

  • 照明能耗降低:67%
  • 灯具寿命延长:3倍(减少频繁开关)
  • 年度节约:$120,000
  • 碳排放减少:180吨/年

五、性能优化技巧

5.1 网络拓扑优化

最佳实践

  1. 在关键路径上部署常供电的路由节点
  2. 避免"瓶颈节点"(所有流量都经过的单一节点)
  3. 保持网络密度(每个节点至少3个邻居)
  4. 定期执行网络修复(Heal)操作

5.2 降低功耗

电池供电设备优化

/* Z-Wave电池设备功耗优化 */

/* 1. 使用FLiRS(Frequently Listening Routing Slave) */
void configure_flirs_mode(void) {
    // FLiRS设备每1秒唤醒250ms
    // 比传统设备快很多,但功耗略高
    zw_set_listen_before_talk_threshold(250);
}

/* 2. 优化唤醒间隔 */
void set_wakeup_interval(uint32_t interval_seconds) {
    // 传感器可设置较长间隔(如3600秒=1小时)
    // 门锁等需快速响应的设备设置较短间隔(如300秒=5分钟)
    zw_send_command_class(
        COMMAND_CLASS_WAKE_UP,
        WAKE_UP_INTERVAL_SET,
        interval_seconds
    );
}

/* 3. 批量发送数据 */
void batch_sensor_data(void) {
    // 累积多个传感器读数,一次性发送
    static sensor_reading_t buffer[10];
    static int buffer_count = 0;

    sensor_reading_t reading = read_sensor();
    buffer[buffer_count++] = reading;

    if (buffer_count >= 10) {
        // 发送批量数据
        send_sensor_batch(buffer, buffer_count);
        buffer_count = 0;
    }
}

5.3 提高响应速度

Beaming优化

def enable_beaming(node_id: int):
    """
    启用Beaming技术
    Beaming允许控制器直接唤醒睡眠设备,无需等待设备定期唤醒
    """
    # 设置节点为Beaming目标
    zwave.set_node_info(
        node_id,
        listening=False,
        routing=False,
        beaming=True,  # 启用Beaming
        frequent=False
    )

    print(f"Beaming enabled for node {node_id}")
    print("Device can now be woken up on-demand by controller")

5.4 安全性能平衡

S2安全级别选择

设备类型 推荐安全级别 理由
门锁 S2 Access Control 最高安全需求
报警器 S2 Authenticated 中高安全需求
传感器 S2 Unauthenticated 数据非敏感,优先考虑性能
照明 S2 Unauthenticated 不涉及隐私,性能优先

六、常见问题排查

6.1 设备无法入网

排查步骤

#!/bin/bash
# Z-Wave设备入网诊断脚本

echo "Z-Wave Device Inclusion Troubleshooting"
echo "========================================="
echo

# 1. 检查控制器状态
echo "[1] Checking controller status..."
zw-cli status
echo

# 2. 检查控制器是否处于Inclusion模式
echo "[2] Checking inclusion mode..."
zw-cli get-inclusion-state
echo

# 3. 检查网络容量
echo "[3] Checking network capacity..."
NODE_COUNT=$(zw-cli list-nodes | wc -l)
echo "Current nodes: $NODE_COUNT / 232"
if [ $NODE_COUNT -ge 232 ]; then
    echo "⚠️  Network is full! Cannot add more devices."
fi
echo

# 4. 检查设备是否已在网络中
echo "[4] Checking if device is already included..."
zw-cli list-nodes
echo

# 5. 检查信号强度
echo "[5] Checking signal strength..."
zw-cli get-rssi
echo

echo "Troubleshooting Steps:"
echo "1. Ensure device is in exclusion mode (if previously included)"
echo "2. Move device closer to controller (within 1-2 meters)"
echo "3. Reset device to factory defaults"
echo "4. Start inclusion mode on controller"
echo "5. Trigger device pairing (按照设备说明)"

6.2 通信延迟高

性能测试脚本

#!/usr/bin/env python3
"""
Z-Wave性能测试工具
"""

from typing import List, Dict

class ZWavePerformanceTester:
    """Z-Wave性能测试器"""

    def __init__(self, zwave_controller):
        self.zwave = zwave_controller

    def test_response_time(self, node_id: int, iterations: int = 100) -> Dict:
        """测试响应时间"""
        latencies = []

        print(f"Testing node {node_id} response time ({iterations} iterations)...")

        for i in range(iterations):
            start = time.time()

            # 发送命令
            success = self.zwave.set_switch(node_id, True)

            if success:
                # 等待ACK
                elapsed = (time.time() - start) * 1000  # 转换为毫秒
                latencies.append(elapsed)

            time.sleep(0.1)

        if latencies:
            return {
                'node_id': node_id,
                'samples': len(latencies),
                'min': min(latencies),
                'max': max(latencies),
                'avg': sum(latencies) / len(latencies),
                'success_rate': (len(latencies) / iterations) * 100
            }
        else:
            return {'error': 'No successful responses'}

    def test_throughput(self, node_id: int, duration: int = 60) -> Dict:
        """测试吞吐量"""
        start_time = time.time()
        command_count = 0
        success_count = 0

        print(f"Testing node {node_id} throughput (duration={duration}s)...")

        while time.time() - start_time < duration:
            command_count += 1

            # 交替开关
            state = (command_count % 2 == 0)
            success = self.zwave.set_switch(node_id, state)

            if success:
                success_count += 1

            time.sleep(0.05)  # 50ms间隔

        elapsed = time.time() - start_time

        return {
            'node_id': node_id,
            'duration': elapsed,
            'total_commands': command_count,
            'successful_commands': success_count,
            'commands_per_second': command_count / elapsed,
            'success_rate': (success_count / command_count) * 100
        }

    def test_hop_count(self, source_node: int, dest_node: int) -> int:
        """测试两个节点间的跳数"""
        route = self.zwave.get_route(source_node, dest_node)

        if route:
            hop_count = len(route) - 1
            print(f"Route from {source_node} to {dest_node}: {' -> '.join(map(str, route))}")
            print(f"Hop count: {hop_count}")
            return hop_count
        else:
            print(f"No route found from {source_node} to {dest_node}")
            return -1

    def run_full_test(self, node_id: int):
        """运行完整测试套件"""
        print("\n" + "="*60)
        print(f"Z-Wave Performance Test - Node {node_id}")
        print("="*60)

        # 1. 响应时间测试
        print("\n### Response Time Test ###")
        response_stats = self.test_response_time(node_id, iterations=50)
        for key, value in response_stats.items():
            if isinstance(value, float):
                print(f"  {key}: {value:.2f} ms")
            else:
                print(f"  {key}: {value}")

        # 2. 吞吐量测试
        print("\n### Throughput Test ###")
        throughput_stats = self.test_throughput(node_id, duration=30)
        for key, value in throughput_stats.items():
            if isinstance(value, float):
                print(f"  {key}: {value:.2f}")
            else:
                print(f"  {key}: {value}")

        # 3. 跳数测试
        print("\n### Hop Count Test ###")
        controller_id = 1
        self.test_hop_count(controller_id, node_id)

        print("\n" + "="*60)

# 使用示例
if __name__ == "__main__":
    tester = ZWavePerformanceTester(zwave_controller=None)  # 实际应传入真实控制器
    tester.run_full_test(node_id=5)

6.3 网络风暴

问题:大量设备同时发送状态更新导致网络拥塞。

解决方案

from asyncio import Semaphore

class ZWaveTrafficController:
    """Z-Wave流量控制器"""

    def __init__(self, max_concurrent: int = 5):
        """
        初始化流量控制器
        :param max_concurrent: 最大并发命令数
        """
        self.semaphore = Semaphore(max_concurrent)
        self.command_queue = asyncio.Queue()

    async def send_command_throttled(self, node_id: int, command: callable):
        """
        发送命令(带流量控制)
        """
        async with self.semaphore:
            try:
                result = await asyncio.to_thread(command, node_id)
                return result
            except Exception as e:
                print(f"Command failed for node {node_id}: {e}")
                return None

    async def batch_send(self, commands: list):
        """
        批量发送命令
        """
        tasks = [
            self.send_command_throttled(node_id, cmd)
            for node_id, cmd in commands
        ]

        results = await asyncio.gather(*tasks)
        return results

# 使用示例
async def main():
    controller = ZWaveTrafficController(max_concurrent=5)

    # 100个命令,但最多5个并发
    commands = [
        (i, lambda nid: zwave.set_switch(nid, True))
        for i in range(2, 102)
    ]

    results = await controller.batch_send(commands)
    print(f"Completed {len([r for r in results if r])} / {len(commands)} commands")

asyncio.run(main())

七、技术对比

7.1 Z-Wave vs Zigbee

对比维度 Z-Wave Zigbee
工作频段 Sub-1GHz(868/908/916MHz) 2.4GHz
抗干扰能力 强(避开Wi-Fi/蓝牙) 弱(与Wi-Fi共享频段)
传输距离 30-100m 10-75m
网络规模 232节点 65,535节点
数据速率 9.6-100kbps 250kbps
互操作性 强(Alliance认证) 中等(存在私有实现)
授权费 需支付($3,500首次+年费) 免费
生态系统 中等(3,000+产品) 大(10,000+产品)
主要应用 高端智能家居 大众智能家居

7.2 Z-Wave vs Wi-Fi vs Bluetooth

对比维度 Z-Wave Wi-Fi Bluetooth LE
功耗 低(1-50mW) 高(200-500mW) 极低(0.5-15mW)
电池寿命 1-10年 数天 1-5年
组网能力 Mesh(4跳) 中心化(路由器) Mesh(有限)
安全性 AES-128(S2) WPA2/WPA3 AES-128
成本 中等($5-20/芯片) 高($2-10/模块) 低($1-5/芯片)
带宽 低(100kbps) 高(150Mbps+) 中(1Mbps)
应用场景 家居控制 高带宽传输 可穿戴设备

八、最佳实践

8.1 设备选型建议

原则

  1. 优先选择Z-Wave Plus认证设备(700系列芯片)
  2. 关键设备(门锁)选择支持S2 Access Control的产品
  3. 常供电设备选择支持Beaming的型号
  4. 查看设备兼容性列表(Z-Wave Alliance网站)

8.2 网络规划

步骤

  1. 绘制建筑平面图,标注设备位置
  2. 计算设备密度(建议每10-15平方米1个路由节点)
  3. 规划控制器位置(网络中心)
  4. 预留20%网络容量(未来扩展)

8.3 安全加固

措施

  1. 启用S2安全框架
  2. 定期更新控制器固件
  3. 使用强密码保护管理界面
  4. 隔离Z-Wave网络与互联网(使用网关)

九、总结与展望

9.1 技术优势

Z-Wave作为成熟的智能家居协议,具有以下核心优势:

  1. 互操作性强:Alliance认证保证跨品牌兼容
  2. 抗干扰能力:Sub-1GHz频段避开Wi-Fi拥塞
  3. 低功耗:电池供电设备可运行多年
  4. 易于安装:即插即用,无需专业布线
  5. 成熟生态:3,000+认证产品

9.2 技术演进

未来发展方向

  1. Z-Wave Long Range:传输距离提升至1英里
  2. 更高数据速率:800系列芯片支持200kbps
  3. 与Matter集成:Z-Wave设备通过桥接支持Matter
  4. AI边缘计算:网关集成AI预测能力

9.3 实施建议

选择Z-Wave的场景

  • 高端住宅智能家居
  • 商业建筑照明控制
  • 养老/医疗健康监护
  • 需要强互操作性的场景

不推荐Z-Wave的场景

  • 超大规模网络(>200节点)→ 选择Zigbee
  • 高带宽需求(视频流)→ 选择Wi-Fi
  • 成本敏感型项目 → 选择Zigbee/蓝牙

相关资源

  • Z-Wave Alliance官网:https://z-wavealliance.org/
  • OpenZWave项目:https://github.com/OpenZWave/open-zwave
  • Z-Wave Plus v2规范:ITU-T G.9959
  • Silicon Labs Z-Wave SDK:https://www.silabs.com/wireless/z-wave

本文基于Z-Wave 700系列最新技术标准编写,涵盖从基础原理到工程实践的完整知识体系。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐