提示词工程快速上手
智谱AI开放平台
这是一个表现姣好的提示词工程平台
在图片所示位置复制你的API key
# pip install zhipuai 请先在终端进行安装
import json
from zhipuai import ZhipuAI
import pandas as pd
from tqdm import tqdm
import pandas as pd
from tqdm import tqdm
import json
import concurrent.futures
import threadingclient = ZhipuAI(api_key="0f56bcd3ce3faeebfe.nvc4AlZb8rw1WhFG")def get_zhipuAI(prompt):response = client.chat.completions.create(model="glm-4-flashx",messages=[{"role": "system","content": "你是一个乐于解答各种问题的助手,你的任务是为用户提供专业、准确、有见地的建议。"},{"role": "user","content": prompt}],top_p=0.1,temperature=0.1,max_tokens=1024,stream=True)result = ""for trunk in response:result += trunk.choices[0].delta.contentreturn result# 线程安全的写入锁
write_lock = threading.Lock()
# 全局食材集合
global_set = set()
# 进度条计数器
progress_counter = {"value": 0}def process_batch(batch_data, batch_index):global global_set, progress_countertext = "。".join(batch_data)prompt = f"""假设你是一个资深的美食博主,你的父亲是高级厨师,你对食物非常熟悉,你需要提取以下文本内容中的食材词汇,要求如下:
1. 去掉量词
2. 去掉品牌名和地名
3. 只保留能食用的词汇
4. 只保留中文词汇
5. 按照python的list格式输出
6. 无需输出其他内容
样例如下:
输入:500g主料:猪肋骨。250g配料:红薯。25g香菜。100g调料:五香米粉
输出:["猪肋骨","红薯","香菜","五香米粉"]输入:广州西红柿切块。美食锅放入油先把豆腐煎一下,接着把沛县西红柿放入锅中炒一下。在加入500毫升水烧开。接着加入广西姜,美洲蒜,干辣椒和番茄块
输出:["西红柿","豆腐","姜","蒜","干辣椒","番茄"] 输入:{text}
输出:"""try:ans = get_zhipuAI(prompt)ans_list = json.loads(ans)with write_lock: # 对共享资源加锁# 计算新增食材new_items = list(set(ans_list) - global_set)if new_items:global_set.update(new_items)# 写入文件with open("food_words.txt", "a+", encoding="utf-8") as f:f.write("\n".join(new_items) + "\n")print(f"批次{batch_index}, 新增{len(new_items)}词: {new_items}")except Exception as e:pass # 处理异常# 更新进度条with write_lock:progress_counter["value"] += 1def main():all_data = pd.read_csv("new_food.csv")foods_text = all_data["食材"].tolist()batch_size = 3batches = [foods_text[i:i + batch_size]for i in range(0, len(foods_text), batch_size)]# 初始化进度条pbar = tqdm(total=len(batches))with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:# 提交所有任务futures = [executor.submit(process_batch, batch, idx)for idx, batch in enumerate(batches)]# 异步更新进度条while not all(f.done() for f in futures):pbar.n = progress_counter["value"]pbar.refresh()pbar.close()if __name__ == "__main__":main()
关键代码拆解
futures = [executor.submit(process_batch, batch, idx) for idx, batch in enumerate(batches)]
-
ThreadPoolExecutor(max_workers=8)
-
创建一个包含 8个线程 的线程池,可同时处理8个任务。
-
-
executor.submit(func, *args)
-
向线程池提交一个任务:
-
func
: 要执行的函数(这里是process_batch
) -
*args
: 函数的参数(当前批次的batch
数据和批次编号idx
)
-
-
-
列表推导式
-
通过
enumerate(batches)
遍历所有数据批次,为每个批次生成一个任务。 -
最终得到一个 Future对象列表(
futures
),每个对象代表一个异步任务。
-
类比理解
想象一个餐厅厨房:
-
线程池 = 8个厨师(8个线程)
-
batches
= 待处理的食材篮子列表 -
submit()
= 给每个厨师分配一个篮子(batch
)并告知编号(idx
) -
futures
= 厨师们的任务收据,凭此查看进度
关键作用
-
并行处理
max_workers=8 # 同时最多8个批次并发处理
-
避免串行处理(1个线程)的低效问题。
-
-
任务跟踪
futures = [...] # 保存所有任务的Future对象
-
后续可通过
futures
检查任务状态或获取结果(此代码未使用该特性)。
-
-
资源管理
-
with
语句确保线程池在使用后自动关闭。
-
工作流程
sequenceDiagramparticipant Mainparticipant ThreadPoolparticipant Worker1..8Main->>ThreadPool: 提交batch1..N的任务ThreadPool->>Worker1: 分配batch1ThreadPool->>Worker2: 分配batch2Note over Worker1,Worker8: 最多8个并行Worker1-->>ThreadPool: 完成batch1ThreadPool->>Worker1: 分配新batch...