OpenAI API 使用
1、文档
openai:
官方文档:https://platform.openai.com/docs/quickstart
官方python文档:https://github.com/openai/openai-python
batch批量请求:
- https://platform.openai.com/docs/guides/batch/overview
- https://github.com/SpellcraftAI/oaib
2、单个请求
- client.chat.completions
import os

from openai import OpenAI

# Placeholder credential: replace 'you_key' with your own OpenAI API key.
os.environ['OPENAI_API_KEY'] = 'you_key'
# Confirm the variable is set without echoing the secret itself to stdout.
print('OPENAI_API_KEY set:', bool(os.getenv('OPENAI_API_KEY')))

# OpenAI() is created with no arguments; per the openai-python README it reads
# the key from the OPENAI_API_KEY environment variable set above.
client = OpenAI()

# Toggle between a streamed reply (printed chunk by chunk) and a single reply.
stream = True

completion = client.chat.completions.create(
    model="gpt-4",
    max_tokens=200,
    temperature=0.5,
    messages=[
        # Alternative English prompt:
        # {"role": "system", "content": "You are a helpful assistant."},
        # {"role": "user", "content": "Write a haiku about recursion in programming."},
        {"role": "system", "content": "你是一位作家."},
        {"role": "user", "content": "写一篇小说,关于猫的."},
    ],
    stream=stream,
)

if stream:
    pieces = []
    for chunk in completion:
        # A chunk may carry no choices at all; skip it instead of indexing [0].
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        print(content or "", end="")
        if content:  # delta.content can be None
            pieces.append(content)
    output_text = ''.join(pieces)
    print('\n', '*' * 100)
    print('接收到内容:\n', output_text)
else:
    print(completion)
    print(completion.choices[0].message)
- requests 直接请求 URL 的方式:参考官方的 curl 示例,可先在 Postman 上调试
import requests
import json
url = "https://api.openai.com/v1/chat/completions"

# One switch drives both the API payload and how the HTTP body is consumed,
# so the request and the reading code can never disagree about streaming.
stream = False

payload = json.dumps({
    "model": "gpt-4",
    "max_tokens": 200,
    "temperature": 0.5,
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "讲个故事,200字."},
    ],
    "stream": stream,
    # "response_format":"json"
})
headers = {
    'Authorization': 'Bearer sk-you_key',  # your OpenAI key, prefixed with "Bearer "
    'Content-Type': 'application/json',
    # 'Cookie': '__'  # generated by Postman; can be omitted
}

# Without a timeout, requests waits indefinitely on a stalled connection.
response = requests.request(
    "POST", url, headers=headers, data=payload, stream=stream, timeout=300
)
print('类型:', type(response))

if not stream:
    print(response.text)
else:
    # Streamed replies arrive as "data: {...}" lines; iter_lines yields bytes.
    for line in response.iter_lines():
        if not line:
            continue
        line_decoded = line.decode('utf-8').strip()
        if not line_decoded.startswith('data:'):
            continue
        # Drop the "data:" prefix before parsing.
        data = line_decoded[5:].strip()
        if data == '[DONE]':  # end-of-stream sentinel, not JSON
            break
        try:
            json_data = json.loads(data)
        except ValueError:
            print(f"Invalid JSON: {data}")
            continue
        choices = json_data.get('choices') or []
        if choices:
            # A delta may lack 'content' (e.g. the closing chunk), so use .get().
            print(choices[0].get('delta', {}).get('content') or "", end="")
3、batch批量请求
- 使用 oaib 库:https://github.com/SpellcraftAI/oaib
from oaib import Auto
import asyncio
import os

# Placeholder credential: replace 'sk-you_key' with your own OpenAI API key.
os.environ['OPENAI_API_KEY'] = 'sk-you_key'
# Confirm the variable is set without echoing the secret itself to stdout.
print('OPENAI_API_KEY set:', bool(os.getenv('OPENAI_API_KEY')))


async def get_batch():
    """Queue five identical chat completions with oaib and print the results."""
    # Per the oaib README, Auto sets rate limits automatically from OpenAI's
    # response headers.
    batch = Auto(workers=8)

    # Queue 5 requests (raise the range to send more).
    for _ in range(5):
        await batch.add(
            "chat.completions.create",
            model="gpt-4",
            max_tokens=200,
            temperature=0.5,
            messages=[{"role": "user", "content": "讲个故事"}],
        )

    resp = await batch.run()
    print('类型:', type(resp))
    print('返回:', resp)
    # NOTE(review): assumes resp['result'] is a column-like object exposing
    # .tolist() (oaib returns a DataFrame per its README) - confirm.
    for i in resp['result'].tolist():
        print(type(i), i)


if __name__ == "__main__":
    asyncio.run(get_batch())
- 使用 aiohttp 和 asyncio 实现异步批量
import aiohttp
import asyncio
# Endpoint that receives every request in the batch.
url = "https://api.openai.com/v1/chat/completions"
headers = {
    'Authorization': 'Bearer sk-you_key',
    'Content-Type': 'application/json',
    # 'Cookie': '__cf_bm=fGp5'
}


def _build_payload(prompt):
    """Return one chat-completion request body whose user message is *prompt*."""
    system_msg = {"role": "system", "content": "You are a helpful assistant."}
    user_msg = {"role": "user", "content": prompt}
    return {
        "model": "gpt-4",
        "max_tokens": 200,
        "temperature": 0.5,
        "messages": [system_msg, user_msg],
        "stream": False,
        # "response_format":"json"
    }


# User prompts; each becomes one request body in data_list.
data = ['讲个故事', '讲个笑话', '分析股市']
data_list = list(map(_build_payload, data))
print(data_list)
async def fetch_data(session, data):
    """POST one request body to ``url`` and return the decoded JSON reply.

    Args:
        session: The aiohttp client session used to send the request.
        data: JSON-serialisable request body.

    Returns:
        The parsed JSON response, or None when the request failed or timed out.
    """
    try:
        async with session.post(url, json=data, headers=headers) as response:
            response.raise_for_status()  # non-2xx status -> ClientResponseError
            return await response.json()
    except aiohttp.ClientError as e:
        print(f"请求失败: {e}")
        return None
    except asyncio.TimeoutError as e:
        # A timeout is not an aiohttp.ClientError; left uncaught it would
        # propagate out of asyncio.gather and discard every other result.
        # repr() is used because str() of a timeout error is empty.
        print(f"请求失败: {e!r}")
        return None
async def main():
    """Send every payload in ``data_list`` concurrently and print the replies."""
    # NOTE(review): ssl=False is kept from the original (its comment only says
    # "ssl issue"); it presumably disables certificate checks - confirm this
    # is acceptable for a request that carries an API key.
    connector = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
        replies = await asyncio.gather(
            *(fetch_data(session, payload) for payload in data_list)
        )
        # Failed requests come back as None and are skipped.
        for reply in filter(None, replies):
            print(reply)


# Script entry point.
if __name__ == '__main__':
    asyncio.run(main())
4、aiohttp和 asyncio实现异步批量
1、client
import aiohttp
import asyncio
# Target endpoint of the test server.
# NOTE(review): the server example in this file listens on port 8888 while
# this URL uses 8057 - confirm the port mapping.
url = 'http://172.31.208.3:8057/testp'

# Three sample request bodies, e.g. {"name": "urm1", "data": "post请求数据1"}.
data_list = [
    {"name": f"urm{n}", "data": f"post请求数据{n}"}
    for n in range(1, 4)
]

# Custom request headers; an alternative set is kept for reference:
# headers = {
#     'User-Agent': 'my-app',
#     'Authorization': 'Bearer YOUR_TOKEN'
# }
headers = {'Content-Type': 'application/json'}
async def fetch_data(session, data):
    """POST one request body to ``url`` and return the decoded JSON reply.

    Args:
        session: The aiohttp client session used to send the request.
        data: JSON-serialisable request body.

    Returns:
        The parsed JSON response, or None when the request failed or timed out.
    """
    try:
        async with session.post(url, json=data, headers=headers) as response:
            response.raise_for_status()  # non-2xx status -> ClientResponseError
            return await response.json()
    except aiohttp.ClientError as e:
        print(f"请求失败: {e}")
        return None
    except asyncio.TimeoutError as e:
        # A timeout is not an aiohttp.ClientError; left uncaught it would
        # propagate out of asyncio.gather and discard every other result.
        # repr() is used because str() of a timeout error is empty.
        print(f"请求失败: {e!r}")
        return None
async def main():
    """Post every item of ``data_list`` concurrently and print the replies."""
    async with aiohttp.ClientSession() as session:
        pending = (fetch_data(session, item) for item in data_list)
        outcomes = await asyncio.gather(*pending)
        for outcome in outcomes:
            # fetch_data returns None for a failed request; skip those.
            if not outcome:
                continue
            print(outcome)


# Script entry point.
if __name__ == '__main__':
    asyncio.run(main())
2、server
from typing import Union
from fastapi import FastAPI
import uvicorn
from pydantic import BaseModel,Field
from typing import Union,Optional,Literal
app = FastAPI()


# 1. GET requests
@app.get("/")
def read_root():
    """Answer GET / with a fixed greeting payload."""
    greeting = {"Hello": "World"}
    return greeting
# NOTE(review): this function reuses the name of the GET "/" handler defined
# just above it. Both routes are registered by their decorators, but the
# module-level name read_root ends up bound to this function only; consider
# giving it a distinct name.
@app.get("/testg")
def read_root(name: str, content: str):
    """Log the ``name`` and ``content`` parameters and echo them back."""
    print('get测试:', name, '和', content)
    echoed = {"name": name, "include": content}
    return echoed
class Item(BaseModel):
    """Request body accepted by POST /testp: two string fields."""

    data: str
    name: str


@app.post("/testp")
async def update_item(item: Item):
    """Log the posted item and return its fields together with the item itself."""
    print('post测试:', item.name, '和', item.data)
    return {"name": item.name, "data": item.data, "item": item}
if __name__ == "__main__":
    # uvicorn.run() replaces the previous uvicorn.Server(uvicorn.Config(...)).run()
    # pair: the auto-reload supervisor is started by uvicorn.run(), so
    # reload=True had no effect when the Server object was run directly.
    #
    # The app is given as the import string 'getpost:app' (required for
    # reload/workers), so this file must be importable as module `getpost`;
    # adjust the string if the file has another name (e.g. "main:app").
    uvicorn.run('getpost:app', host='0.0.0.0', port=8888, reload=True, log_level="info")
5、应用:构造对话,并存到 Excel 表
import aiohttp
import asyncio
import os
import json
import re
def is_valid_json(json_str):
    """Return True when *json_str* parses as JSON, False on a decode error."""
    try:
        json.loads(json_str)
    except json.JSONDecodeError:
        verdict = False
    else:
        verdict = True
    return verdict
def _parses_as_json(candidate):
    """Return True when *candidate* can be decoded by json.loads."""
    try:
        json.loads(candidate)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return False
    return True


def fix_json(json_string):
    """Best-effort repair of a JSON object embedded in model output.

    Repairs are tried from least to most invasive and the first candidate that
    parses is returned:

    1. keep only the text from the first ``{`` to the last ``}`` (drops prose
       or code fences around the object); if the closing brace is missing,
       close the object with ``}`` or, for a cut-off string value, ``"}``;
    2. remove trailing commas before ``}`` / ``]``;
    3. replace single quotes with double quotes.

    Args:
        json_string: Raw text that should contain one JSON object.

    Returns:
        The repaired text. When no candidate parses, the most heavily repaired
        candidate is returned so the caller can report the failure.
    """
    text = json_string.strip()
    start = text.find('{')
    if start == -1:
        # No opening brace at all: assume the whole text is the object body.
        text = '{' + text
        start = 0
    end = text.rfind('}')

    if end > start:
        core = text[start:end + 1]
        closers = ('',)
    else:
        # Output was cut off before the closing brace.
        core = text[start:].rstrip()
        closers = ('}', '"}')
    print('移除外部的多余内容:', core)

    candidates = []
    for closer in closers:
        closed = core + closer
        no_trailing_comma = re.sub(r',\s*([}\]])', r'\1', closed)
        # Quote replacement comes last: it would corrupt apostrophes inside
        # otherwise valid JSON strings.
        candidates.extend(
            [closed, no_trailing_comma, no_trailing_comma.replace("'", '"')]
        )

    for candidate in candidates:
        if _parses_as_json(candidate):
            return candidate
    return candidates[-1]
def check_and_fix_json(json_str):
    """Return *json_str* if it is valid JSON, otherwise its repaired form.

    Raises:
        ValueError: If the text is still not valid JSON after fix_json().
    """
    # Already valid: hand the text back untouched.
    if is_valid_json(json_str):
        return json_str

    fixed_str = fix_json(json_str)
    print('修复后', fixed_str)
    if not is_valid_json(fixed_str):
        raise ValueError("无法修正为有效的 JSON 格式")
    return fixed_str
# Endpoint that receives every request in the batch.
url = "https://api.openai.com/v1/chat/completions"
headers = {
    'Authorization': 'Bearer sk-xxx',  # OpenAI key
    'Content-Type': 'application/json',
    # 'Cookie': '__cf_bm=fGp5y4Qn_Qg8mF3bZFuXVofslJjF'
}

# One prompt per emotion; only the emotion word differs between them. The
# doubled braces are str.format escapes for the literal JSON example.
_PROMPT_TEMPLATE = (
    "构造一条{emotion}情感相关语音对话,一问一答,字数不大于100。"
    "并以 JSON 格式提供,其中包含以下键:questions 、 answers。"
    '例如:{{"questions":"xxx","answers":"xxx"}}'
)
data = [
    _PROMPT_TEMPLATE.format(emotion=emotion)
    for emotion in ("快乐", "悲伤", "恐惧")
]


def _build_payload(prompt):
    """Return one chat-completion request body whose user message is *prompt*."""
    return {
        "model": "gpt-4",
        "max_tokens": 200,
        "temperature": 0.5,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ],
        "stream": False,
        # "response_format":"json"
    }


data_list = list(map(_build_payload, data))
print(data_list)
# Asynchronous batch request
async def fetch_data(session, data):
    """POST one request body to ``url`` and pair the reply with its request.

    Args:
        session: The aiohttp client session used to send the request.
        data: JSON-serialisable request body.

    Returns:
        A ``(reply, data)`` tuple where ``reply`` is the parsed JSON response,
        or None when the request failed or timed out. ``data`` is returned
        unchanged so the caller can match replies to requests.
    """
    try:
        async with session.post(
            url,
            json=data,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=300),
        ) as response:
            response.raise_for_status()  # non-2xx status -> ClientResponseError
            return await response.json(), data
    except aiohttp.ClientError as e:
        print(f"请求失败: {e}")
        return None, data
    except asyncio.TimeoutError as e:
        # Exceeding the 300 s total timeout is not an aiohttp.ClientError;
        # left uncaught it would propagate out of asyncio.gather and discard
        # every other result. repr() is used because str() of it is empty.
        print(f"请求失败: {e!r}")
        return None, data
def _build_row(result, data):
    """Validate one API reply and return ``(is_ok, row)`` for the report.

    ``row`` is ``[mark, error_message, output_text, input_messages, request]``.
    """
    prompt = data['messages']
    if not result:
        print(f"请求失败")
        return False, ['×', '请求失败', None, prompt, data]

    print(result)
    # Bound before the try so the error row can always report what was
    # received, even when extraction itself fails.
    output_text = None
    try:
        output_text = result['choices'][0]['message']['content']
        print(output_text)
        valid_json_str = check_and_fix_json(output_text)
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # Malformed reply, or text that could not be repaired into JSON.
        print(e)
        return False, ['×', '校验json格式出异常', output_text, prompt, data]

    print("有效的 JSON:", valid_json_str)
    json_obj = json.loads(valid_json_str)
    # The reply must be an object holding every required key.
    keys = ['questions', 'answers']
    if not isinstance(json_obj, dict) or any(key not in json_obj for key in keys):
        print('不包含所有键')
        return False, ['×', '不包含所有键', valid_json_str, prompt, data]
    return True, ['√', None, valid_json_str, prompt, data]


async def main():
    """Send every payload in ``data_list`` and classify the replies.

    Returns:
        ``(column_all, column_right, column_error)``: report rows for all
        requests, for the valid replies, and for the failed or invalid ones.
        Each request appears exactly once in ``column_all`` and exactly once
        in one of the other two lists.
    """
    # NOTE(review): ssl=False is kept from the original (its comment only says
    # "ssl issue"); it presumably disables certificate checks - confirm this
    # is acceptable for a request that carries an API key.
    con = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=con, trust_env=True) as session:
        tasks = [fetch_data(session, data) for data in data_list]
        results = await asyncio.gather(*tasks)

    column_all = []
    column_right = []
    column_error = []
    for result, data in results:
        print('数据:', data)
        is_ok, row = _build_row(result, data)
        (column_right if is_ok else column_error).append(row)
        # Separate list object, as before, so the lists do not alias rows.
        column_all.append(list(row))
    return column_all, column_right, column_error
import pandas as pd
import openpyxl
import shutil
import datetime
# Script entry point: run the batch, then save the report as an Excel workbook.
if __name__ == '__main__':
    column_all, column_right, column_error = asyncio.run(main())
    print('所有数据:', column_all)
    print('正常数据:', column_right)
    print('异常数据:', column_error)
    print('*' * 100)

    # All three sheets share the same column layout.
    header = ['yes_or_no', 'message_err', 'out_text', 'input_text', 'promt_data']
    frames = {
        'sheet_all': pd.DataFrame(data=column_all, columns=header),
        'sheet_right': pd.DataFrame(data=column_right, columns=header),
        'sheet_error': pd.DataFrame(data=column_error, columns=header),
    }

    # Create the output directory when it does not exist yet.
    out_dir = './text_out/Gpt4_out1'
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    # File name carries the current time as YYYYMMDD_HHMMSS.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"GPT_{timestamp}.xlsx"
    workbook_path = f'{out_dir}/{filename}'

    # One sheet per result list, written into a single workbook.
    with pd.ExcelWriter(workbook_path) as writer:
        for sheet_name, frame in frames.items():
            frame.to_excel(writer, sheet_name=sheet_name, index=False)

    # Read every sheet back (sheet_name=None) to show what was written.
    df_sheet_all = pd.read_excel(workbook_path, sheet_name=None, index_col=False)
    print('Excel_sheet_all数据:\n', df_sheet_all)
    print('*' * 100)
更多推荐
所有评论(0)