Download parquet data from HDFS and read it with pandas.

import pandas as pd

# hdfs_path points at the parquet file (or directory of part files) to read
df = pd.read_parquet(hdfs_path)
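The read above assumes the parquet data is already accessible from where pandas runs; the simplest flow is usually to pull the part files down from HDFS with the hadoop CLI and read them locally. Below is a minimal sketch of that flow. The HDFS directory hdfs:///tmp/demo_features and the local directory ./parquet_features are placeholders, and the sketch assumes the hadoop client is configured on the machine; with the pyarrow engine, read_parquet can usually read a whole directory of part files in one call.

import os
import subprocess
import pandas as pd

# placeholder locations -- adjust to your environment
hdfs_dir = "hdfs:///tmp/demo_features"   # HDFS directory holding part-*.parquet files
local_dir = "./parquet_features"

os.makedirs(local_dir, exist_ok=True)
# pull the part files to the local filesystem with the hadoop CLI
subprocess.run("hadoop fs -get {}/part-* {}".format(hdfs_dir, local_dir),
               shell=True, check=True)

# with the pyarrow engine, a directory of parquet files can be read in one call
df = pd.read_parquet(local_dir, engine="pyarrow")
print(df.shape)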

If the packages needed to read parquet are not installed, you will get an error like this:

ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
pyarrow or fastparquet is required for parquet support

In that case, just install the relevant packages.

$ conda install -c conda-forge pyarrow
$ conda install -c conda-forge fastparquet

Or, in a Jupyter notebook:

!pip install pyarrow
!pip install fastparquet
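After installing, a quick sanity check is to import the engines directly; if either import succeeds, pd.read_parquet will be able to use it:

import pyarrow
import fastparquet

# either engine alone is enough for pd.read_parquet
print("pyarrow:", pyarrow.__version__)
print("fastparquet:", fastparquet.__version__)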

If the download is too slow, you can switch to a different mirror.

For example: pip install fastparquet -i https://mirrors.cloud.tencent.com/pypi/simple
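To avoid passing -i on every install, the mirror can also be made the default index via pip's config subcommand (available in pip 10 and later):

$ pip config set global.index-url https://mirrors.cloud.tencent.com/pypi/simple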

Note: installing fastparquet may fail with an error saying the system has no gcc command. In that case, follow the prompt and run yum install gcc.

Appendix: PyPI mirror addresses in mainland China:

Tsinghua: https://pypi.tuna.tsinghua.edu.cn/simple

Aliyun: http://mirrors.aliyun.com/pypi/simple/

Tencent Cloud: https://mirrors.cloud.tencent.com/pypi/simple

Douban: http://pypi.douban.com/simple/

University of Science and Technology of China: https://pypi.mirrors.ustc.edu.cn/simple/

Huazhong University of Science and Technology: http://pypi.hustunique.com/

Shandong University of Technology: http://pypi.sdutlinux.org/

Once the parquet data can be read successfully, you can move on to loading the data and converting its format. The function below pulls the part files from HDFS, concatenates them into one DataFrame, flattens the vector columns into plain feature rows, and splits the result into training, validation, and test sets.

import os
import pandas as pd
from sklearn.model_selection import train_test_split


def preprocess_parquet_features(feature_hdfs_path):
    # download the part files from HDFS into a local working directory
    feature_local_path = os.path.join(os.getcwd(), "parquet_features")
    os.makedirs(feature_local_path, exist_ok=True)
    os.system("hadoop fs -get {}/part-* {}".format(feature_hdfs_path, feature_local_path))

    # read every part-*.parquet file and concatenate into a single DataFrame
    frames = []
    for parent, dirnames, filenames in os.walk(feature_local_path):
        for filename in filenames:
            if filename.startswith('part-') and filename.endswith('.parquet'):
                frames.append(pd.read_parquet(os.path.join(parent, filename)))
    data = pd.concat(frames, ignore_index=True)

    # each vector column holds a struct with a 'values' array (a Spark ML vector);
    # concatenate the three vectors row by row into one flat feature list
    features = []
    for num in range(len(data)):
        features.append(data.order_vector[num]['values'].tolist()
                        + data.view_vector[num]['values'].tolist()
                        + data.add_cart_vector[num]['values'].tolist())

    features_df = pd.DataFrame(features)

    # hold out 20% as the test set, then carve a validation set out of the rest
    X_train_full, X_test, y_train_full, y_test = train_test_split(
        features_df, data.label, test_size=0.2, random_state=42)
    X_train, X_valid, y_train, y_valid = train_test_split(
        X_train_full, y_train_full, test_size=0.2, random_state=42)

    return X_train_full, X_test, y_train_full, y_test, X_train, X_valid, y_train, y_valid
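A hypothetical call for illustration (the HDFS path below is a placeholder, not a real directory):

# placeholder HDFS directory -- replace with your actual feature path
splits = preprocess_parquet_features("hdfs:///tmp/demo_features")
(X_train_full, X_test, y_train_full, y_test,
 X_train, X_valid, y_train, y_valid) = splits

print(X_train.shape, X_valid.shape, X_test.shape)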
