目录

一、python连接操作hdfs

1 往hdfs上传文件

2 处理并存储到hdfs

3 读取hdfs上的txt文件


这里使用的是 pip 安装,很方便:

pip install hdfs

一、python连接操作hdfs

from hdfs.client import Client
client = Client("http://LocalHost:Port")

client.makedirs('/ml/zmingmingmng')#建立文件夹
client.delete('/ml/zmming')#删除文件夹
client.upload("/ml/zmingmingmng/zm.txt","E:/ttt/testhdfs.txt")#上传文件
client.download("/ml/zmingmingmng/zm.txt","E:/ming.txt")#下载文件

# -*- encoding=utf-8 -*-
from hdfs.client import Client


client = Client("http://XXX.XXX.XX.XX:50070")

# 创建目录
def mkdirs(client, hdfs_path):
    client.makedirs(hdfs_path)


# 删除hdfs文件
def delete_hdfs_file(client, hdfs_path):
    client.delete(hdfs_path)


# 上传文件到hdfs
def put_to_hdfs(client, local_path, hdfs_path):
    client.upload(hdfs_path, local_path, cleanup=True)


# 从hdfs获取文件到本地
def get_from_hdfs(client, hdfs_path, local_path):
    client.download(hdfs_path, local_path, overwrite=False)


# 追加数据到hdfs文件
def append_to_hdfs(client, hdfs_path, data):
    client.write(hdfs_path, data, overwrite=False, append=True)


# 覆盖数据写到hdfs文件
def write_to_hdfs(client, hdfs_path, data):
    client.write(hdfs_path, data, overwrite=True, append=False)


# 移动或者修改文件
def move_or_rename(client, hdfs_src_path, hdfs_dst_path):
    client.rename(hdfs_src_path, hdfs_dst_path)


# 返回目录下的文件
def list(client, hdfs_path):
    return client.list(hdfs_path, status=False)


if __name__ == '__main__':
    # 调用
    kk=list(client,"/user/admin/deploy/user_lable_dimension/")
    for each in kk:
        print(each)


 

1 往hdfs上传文件

from hdfs.client import Client

"""往hdfs上传文件"""

# TODO 往hdfs上传文件
client = Client("http://XXX.XXX.XX.XX:50070")

# 新建文件夹
hdfs_path ="【文件要存放的目录路径,eg:/a/b/c】"
client.makedirs(hdfs_path)

print("uploading data...")
client.upload(hdfs_path, "intersection.xlsx", overwrite=True)	# 资源中心上传的文件

 

2 处理并存储到hdfs

# TODO 先得到结果列表。eg:i_list

# TODO 把结果列表存储成文件上传到hdfs
print("===============================================")
i_df = pd.DataFrame(i_list)
client = Client("http://XXX.XXX.XX.XX:50070")

fout = "【文件要存放的路径,eg:/a/b/c.csv】"  # hdfs下的目录
with client.write(fout, encoding='utf-8') as writer:
    i_df.to_csv(writer)
print("存储成功")

 

3 读取hdfs上的txt文件

from hdfs.client import Client
import json
from kafka import KafkaConsumer
import time
import pyhdfs

def  GetEncodingSheme(_filename):
    """ 查看文本编码方式 """
    with open(_filename, 'rb') as file:
        buf = file.read()
    result = chardet.detect(buf)
    return result['encoding']


def read_hdfs_file(client, filename):
    """读取hdfs文件内容,将每行存入数组返回"""
    lines = []
    print("开始读取txt数据")
    with client.open(filename, delimiter='\n') as reader:
        for line in reader:
            lines.append(line.decode("GB2312").strip())
    return lines


def deleteHDFSfile(client, hdfs_path):
    """删除hdfs文件,删除文件夹时该文件夹必须为空"""
    client.delete(hdfs_path)



if __name__ == "__main__":
	print(GetEncodingSheme('intersection.xlsx'))    # GB2312
    
    # hdfs连接
    client = pyhdfs.HdfsClient(hosts="http://xxxxxx:50070,http://xxxxxx:50070", user_name="xxxxxx")

    # TODO 读取hdfs文件内容,将每行存入数组返回
    hdfs_path = "【文件路径,eg:/a/b/c.xlsx】"    # hdfs存储目录
    print("===============================================")
    print("开始读取hdfs上的txt文件")
    lines = read_hdfs_file(client, hdfs_path)
    print(lines)
    print("读取完成")
    print("===============================================")


    # TODO 删除hdfs存储目录下的文件
    hdfs_path = "【文件路径】"
    deleteHDFSfile(client, hdfs_path)

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐