安装依赖

pip install findpeaks

使用

import numpy as np

from findpeaks import findpeaks

if __name__ == '__main__':
    # 初始化
    fp = findpeaks(
        method='peakdetect',  # 检测方式:一维数组【】二维数据【】
        whitelist=['valley'],  # 检测目标【峰peak,谷valley,峰谷['peak','valley']】
        lookahead=1,  # 前瞻性优化算法【数据量越少,此数字越小,比如50个数据,最好选择1或者2】
        interpolate=10,  # 插值,放大横坐标【数字越高,作图的边缘越不锋利】
    )
    
    # 数值
    data = np.asarray([97, 11, 27, 69, 39, 52, 84, 81, 92, 84, 83, 95, 10, 87, 72, 84, 36, 15, 85, 68, 60, 59, 61, 3,
                       13, 12, 4, 80, 28, 53, 24, 32, 10, 2, 9, 57, 15, 66, 99, 26, 40, 63, 97, 22, 27, 98, 15, 84,
                       76, 34])
    results = fp.fit(
        X=data,  # 数据
        x=None  # 作图的x轴坐标(默认0,1,2,3...)
    )
    # 打印结果
    print(results)
    # 作图
    fp.plot()

使用案例:

from datetime import datetime, timedelta

from findpeaks import findpeaks
from pandas import DataFrame

from db.config import session
from db.db_models import ReportCapacityEnergy
from utils.influxdb_util import InfluxdbUtil, InfluxdbStorage


class PeakValleyDto:
    def __init__(self, value, index, time, flag):
        """
        峰谷值DTO
        :param value: 值
        :param index: 所在数组的索引
        :param time: 时间戳
        :param flag: 自定义标识
        """
        self.value = value
        self.index = index
        self.time = time
        self.flag = flag


class CapacityEnergyService:
    """
    产能与能耗
    """

    def __init__(self):
        """
        初始化:
        """
        # influxdb工具类
        self.influxdb_util = InfluxdbUtil(url="http://xxxxxxxxx.com/",
                                          token="xxxxxxxx",
                                          org="xxxx")

    @staticmethod
    def check_out_peak_valley(capacity_data) -> list[PeakValleyDto]:
        """
        检出的峰谷值
        :return:
        """
        result: list[PeakValleyDto] = []

        m = map(lambda x: x.value, capacity_data)
        fp = findpeaks(method='peakdetect', whitelist=['peak', 'valley'], lookahead=1, interpolate=1)
        # 获取峰谷值
        fit: dict = fp.fit(X=list(m))
        # 原始数据
        df_: DataFrame = fit['df']
        # 平滑数据
        df_interp_: DataFrame = fit['df_interp']
        # 波谷
        df__valley_true_: DataFrame = df_[df_['valley'] == True]
        for valley in df__valley_true_.iterrows():
            row = valley[1]
            value = row['y']  # 值
            index = row['x']  # 索引
            time = capacity_data[row['x']].time  # 时间戳
            flag = 'valley'  # 标记
            result.append(PeakValleyDto(value, index, time, flag))
        # 波峰
        df__peak_true_: DataFrame = df_[df_['peak'] == True]
        for peak in df__peak_true_.iterrows():
            row = peak[1]
            value = row['y']  # 值
            index = row['x']  # 索引
            time = capacity_data[row['x']].time  # 时间戳
            flag = 'peak'  # 标记
            result.append(PeakValleyDto(value, index, time, flag))
        # 作图
        fp.plot()
        if len(result) > 0:
            sor = sorted(result, key=lambda x: x.time)
            # 起始点和结束点不是峰谷
            # sor[0].flag = 'first'
            # sor[-1].flag = 'last'
            sor[0] = PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first')
            sor[-1] = PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,
                                    capacity_data[-1].time, 'last')
            return sor
        else:
            if len(capacity_data) == 0:
                return [PeakValleyDto(0, 0, None, 'first'),
                        PeakValleyDto(0, 0, None, 'last')]
            return [PeakValleyDto(capacity_data[0].value, 0, capacity_data[0].time, 'first'),
                    PeakValleyDto(capacity_data[-1].value, len(capacity_data) - 1,
                                  capacity_data[-1].time, 'last')]

    def calculate_capacity(self, start: datetime, end: datetime):
        """
        计算产能
        :return:
        """
        capacity = 0

        capacity_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="JLKZQ",
                                                                        measurement="N_PLTS",
                                                                        every="1m",
                                                                        start=start,
                                                                        stop=end,
                                                                        createEmpty=False)
        # 获取峰谷值
        peak_valleys = self.check_out_peak_valley(capacity_data)

        # 拆成两个一组
        group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]
        for group in group_every_two:
            print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))
            capacity += float(group[1].value) - float(group[0].value)
        print("---------------------------")
        return capacity

    def calculate_energy(self, start: datetime, end: datetime):
        """
        计算能耗
        :return:
        """
        energy = 0
        energy_data: list[InfluxdbStorage] = self.influxdb_util.query(bucket="TRQ",
                                                                      measurement="HZ_TRQ1_BKJLLTJ_GET_M3",
                                                                      every="1m",
                                                                      start=start,
                                                                      stop=end,
                                                                      createEmpty=False)
        # # 获取峰谷值
        # peak_valleys = self.check_out_peak_valley(energy_data)
        # # 拆成两个一组
        # group_every_two = [peak_valleys[i:i + 2] for i in range(0, len(peak_valleys), 2)]
        # for group in group_every_two:
        #     print(float(group[1].value), "-", float(group[0].value), "=", float(group[1].value) - float(group[0].value))
        #     energy += float(group[1].value) - float(group[0].value)
        # print("---------------------------")
        # return energy
        # 不需要峰谷计算,直接尾头相减即可11656.67857142858
        return float(energy_data[-1].value) - float(energy_data[0].value)

    def start(self, start: datetime, end: datetime) -> tuple:
        """
        实时查询产能和能耗
        :param start:
        :param end:
        :return:
        """
        capacity_num = self.calculate_capacity(start, end)
        energy_num = self.calculate_energy(start, end)
        print("计算范围:", start, " 到 ", end)
        print("产能:", capacity_num)
        print("能耗:", energy_num)
        return capacity_num, energy_num

    def capacity_energy_db(self, start_date: datetime, end_date: datetime):
        """
        产能能耗-按天计算-数据入库
        :return:
        """
        # 这里应该是根据作业指导书来分别查询一天中不同的作业指导书时段,计算出各个桶数和相应的重量,他们之和作为产能 TODO

        start_date = start_date.replace(hour=0, minute=0, second=0)
        end_date = end_date.replace(hour=0, minute=0, second=0)

        for i in range(end_date.__sub__(start_date).days):
            start = start_date + timedelta(days=i)
            end = start_date + + timedelta(days=i + 1)
            capacity_energy_service = CapacityEnergyService()
            capacity_num, energy_num = capacity_energy_service.start(start=start, end=end)
            report_capacity_energy = ReportCapacityEnergy(
                capacity_bucket='JLKZQ',
                energy_bucket='TRQ',
                capacity_measurement='N_PLTS',
                energy_measurement='HZ_TRQ1_BKJLLTJ_GET_M3',
                barrel_num=capacity_num,
                gas_num=energy_num,
                start_time=start,
                end_time=end,
                week=start.isocalendar().week,
                instruction_ids="",
                type=1,
                remarks='',
            )
            count = session.query(ReportCapacityEnergy) \
                .filter(ReportCapacityEnergy.start_time == start) \
                .filter(ReportCapacityEnergy.end_time == end) \
                .count()
            if count == 1:
                session.query(ReportCapacityEnergy) \
                    .filter(ReportCapacityEnergy.start_time == start) \
                    .filter(ReportCapacityEnergy.end_time == end) \
                    .update({ReportCapacityEnergy.barrel_num: capacity_num, ReportCapacityEnergy.gas_num: energy_num})
            else:
                session.add(report_capacity_energy)

        session.commit()

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐