pymongo功能整理与基础操作类
以下是 Python 与 PyMongo 的完整功能整理,涵盖基础操作、高级功能、性能优化及常见应用场景:
1. 安装与连接
(1) 安装 PyMongo
pip install pymongo
(2) 连接 MongoDB
from pymongo import MongoClient# 基础连接(默认本地,端口27017)
client = MongoClient('mongodb://localhost:27017/')# 带认证的连接
client = MongoClient('mongodb://username:password@host:27017/dbname?authSource=admin'
)# 连接副本集或分片集群
client = MongoClient('mongodb://node1:27017,node2:27017/?replicaSet=rs0')
2. 数据库与集合操作
(1) 选择数据库和集合
db = client['mydatabase'] # 选择数据库(惰性创建)
collection = db['users'] # 选择集合(惰性创建)
(2) 列出数据库和集合
# 列出所有数据库(需权限)
print(client.list_database_names())# 列出数据库中的集合
print(db.list_collection_names())
(3) 删除集合或数据库
db.drop_collection('users') # 删除集合
client.drop_database('mydatabase') # 删除数据库
3. 文档操作(CRUD)
(1) 插入文档
# 插入单条文档
user = {'name': 'Alice', 'age': 30, 'tags': ['python', 'dev']}
result = collection.insert_one(user)
print(result.inserted_id) # 输出插入的 ObjectId# 批量插入
users = [{'name': 'Bob'}, {'name': 'Charlie'}]
result = collection.insert_many(users)
print(result.inserted_ids) # 输出所有插入的 ObjectId 列表
(2) 查询文档
# 查询单条文档
user = collection.find_one({'name': 'Alice'})
print(user) # 返回字典或 None# 查询多条文档(带条件、投影和排序)
cursor = collection.find({'age': {'$gt': 25}}, # 条件:年龄大于25{'_id': 0, 'name': 1}, # 投影:仅返回 name 字段
).sort('age', pymongo.ASCENDING) # 按年龄升序排序for doc in cursor:print(doc)# 统计数量
count = collection.count_documents({'age': {'$gt': 25}})
(3) 更新文档
# 更新单条文档
result = collection.update_one({'name': 'Alice'},{'$set': {'age': 31}, '$addToSet': {'tags': 'database'}} # 更新操作符
)
print(result.modified_count) # 输出影响的文档数# 批量更新
result = collection.update_many({'age': {'$lt': 30}},{'$inc': {'age': 1}} # 年龄加1
)
(4) 删除文档
# 删除单条文档
result = collection.delete_one({'name': 'Alice'})# 批量删除
result = collection.delete_many({'age': {'$gt': 40}})
4. 高级查询与聚合
(1) 查询操作符
# 比较:$gt, $gte, $lt, $lte, $ne
collection.find({'age': {'$gt': 20}})# 逻辑:$and, $or, $not
collection.find({'$or': [{'age': 30}, {'name': 'Bob'}]})# 数组:$in, $nin, $all, $elemMatch
collection.find({'tags': {'$in': ['python', 'java']}})
(2) 聚合管道
pipeline = [{'$match': {'age': {'$gt': 25}}}, # 筛选条件{'$group': {'_id': '$city', 'count': {'$sum': 1}}}, # 按城市分组计数{'$sort': {'count': -1}}, # 按计数降序排序{'$limit': 5} # 取前5条
]result = collection.aggregate(pipeline)
for doc in result:print(doc)
(3) 索引管理
# 创建索引
collection.create_index([('name', pymongo.ASCENDING)], unique=True)# 查看索引
print(collection.index_information())# 删除索引
collection.drop_index('name_1')
5. 事务与原子性操作
(1) 多文档事务(MongoDB 4.0+)
with client.start_session() as session:session.start_transaction()try:collection.update_one({'name': 'Alice'},{'$inc': {'balance': -100}},session=session)collection.update_one({'name': 'Bob'},{'$inc': {'balance': 100}},session=session)session.commit_transaction()except Exception as e:session.abort_transaction()print("事务回滚:", e)
(2) 原子操作符
# 原子更新
collection.find_one_and_update({'name': 'Alice'},{'$inc': {'counter': 1}},return_document=pymongo.ReturnDocument.AFTER # 返回更新后的文档
)
6. 性能优化与最佳实践
(1) 查询优化
• 使用投影减少返回字段:
collection.find({}, {'_id': 0, 'name': 1})
• 覆盖查询(Covered Query):确保查询字段和投影字段在索引中。
(2) 批量操作
# 批量插入(减少网络开销)
bulk_ops = [pymongo.InsertOne({'name': f'User_{i}'}) for i in range(1000)]
collection.bulk_write(bulk_ops)
(3) 连接池管理
client = MongoClient('mongodb://localhost:27017/',maxPoolSize=100, # 最大连接数minPoolSize=10, # 最小空闲连接socketTimeoutMS=3000 # 超时时间
)
7. 数据建模与模式设计
(1) 内嵌文档与引用
• 内嵌文档:适合频繁访问的子数据。
user = {'name': 'Alice','address': {'city': 'New York', 'zip': '10001'} # 内嵌文档
}
• 引用关系:适合独立实体。
# 用户引用订单
order = {'user_id': user['_id'], 'product': 'Laptop'}
(2) 分片策略
• 选择分片键:高频查询字段(如 user_id
)。
• 分片命令(需在 mongos
执行):
sh.shardCollection("mydb.orders", {"user_id": 1})
8. 安全与运维
(1) 认证与权限
# 创建用户
db.command('createUser', 'admin',pwd='secret',roles=[{'role': 'readWrite', 'db': 'mydb'}]
)
(2) 备份与恢复
• mongodump
备份:
mongodump --uri="mongodb://user:pass@host:27017/mydb" --out=/backup
• mongorestore
恢复:
mongorestore --uri="mongodb://host:27017" /backup/mydb
(3) 监控与日志
• 查看数据库状态:
server_status = db.command('serverStatus')
print(server_status['connections']['available'])
• 启用慢查询日志:
mongod --setParameter slowMS=100 --profileLevel 2
9. 常见应用场景
(1) 日志存储与分析
# 插入日志
log_entry = {'timestamp': datetime.now(),'level': 'INFO','message': 'User login success'
}
collection.insert_one(log_entry)# 分析错误日志数量
pipeline = [{'$match': {'level': 'ERROR'}},{'$group': {'_id': '$service', 'count': {'$sum': 1}}}
]
(2) 实时排行榜
# 更新分数
collection.update_one({'user_id': 1001},{'$inc': {'score': 10}},upsert=True
)# 获取前10名
top_players = collection.find().sort('score', -1).limit(10)
10. 扩展工具与库
(1) 使用 Motor 实现异步操作
from motor.motor_asyncio import AsyncIOMotorClientasync def query_data():client = AsyncIOMotorClient('mongodb://localhost:27017')collection = client.mydb.userscursor = collection.find({'age': {'$gt': 20}})async for doc in cursor:print(doc)
(2) 使用 MongoEngine(ORM)
from mongoengine import Document, StringField, IntFieldclass User(Document):name = StringField(required=True)age = IntField()# 查询数据
users = User.objects(age__gt=25)
总结
功能 | PyMongo 方法/操作 | 典型场景 |
---|---|---|
基础 CRUD | insert_one , find , update_many | 数据增删改查 |
聚合分析 | aggregate + 管道操作 | 复杂统计、日志分析 |
事务管理 | start_session + 事务块 | 转账、订单处理 |
性能优化 | 索引、批量操作、连接池 | 高并发读写、大数据处理 |
数据建模 | 内嵌文档、引用关系、分片 | 电商、社交网络、IoT 数据存储 |
通过合理使用 PyMongo,可以高效操作 MongoDB 应对多样化的数据存储需求,结合 Redis 实现缓存加速,构建高性能应用。
以下是一个基于 pymongo
封装的 MongoDB 基础操作类,支持连接管理、CRUD、索引操作、聚合查询、分页等常用功能:
from typing import Any, Dict, List, Optional, Union
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import PyMongoError
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, DeleteResult
from pymongo.collection import Collection
from bson import ObjectIdclass MongoDBClient:"""MongoDB 基础操作类"""def __init__(self, uri: str = "mongodb://localhost:27017/",db_name: str = "mydatabase", collection_name: str = "default_collection"):"""初始化 MongoDB 客户端:param uri: MongoDB 连接 URI:param db_name: 数据库名称:param collection_name: 集合名称"""self.client = MongoClient(uri)self.db = self.client[db_name]self.collection: Collection = self.db[collection_name]# ------------------------- 基础 CRUD 操作 -------------------------def insert_one(self, document: Dict) -> Optional[ObjectId]:"""插入单条文档"""try:result: InsertOneResult = self.collection.insert_one(document)return result.inserted_idexcept PyMongoError as e:print(f"Insert one error: {e}")return Nonedef insert_many(self, documents: List[Dict]) -> Optional[List[ObjectId]]:"""批量插入文档"""try:result: InsertManyResult = self.collection.insert_many(documents)return result.inserted_idsexcept PyMongoError as e:print(f"Insert many error: {e}")return Nonedef find_one(self, query: Dict, projection: Optional[Dict] = None) -> Optional[Dict]:"""查询单条文档"""try:return self.collection.find_one(query, projection)except PyMongoError as e:print(f"Find one error: {e}")return Nonedef find(self, query: Dict, projection: Optional[Dict] = None,sort: Optional[List[tuple]] = None,limit: int = 0) -> List[Dict]:"""查询多条文档"""try:cursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)if limit > 0:cursor = cursor.limit(limit)return list(cursor)except PyMongoError as e:print(f"Find error: {e}")return []def update_one(self, query: Dict, update_data: Dict, upsert: bool = False) -> Optional[UpdateResult]:"""更新单条文档"""try:result: UpdateResult = self.collection.update_one(query, {'$set': update_data}, upsert=upsert)return resultexcept PyMongoError as e:print(f"Update one error: {e}")return Nonedef delete_one(self, query: Dict) -> Optional[DeleteResult]:"""删除单条文档"""try:result: DeleteResult = self.collection.delete_one(query)return resultexcept PyMongoError as e:print(f"Delete one error: {e}")return None# ------------------------- 高级操作 -------------------------def create_index(self, keys: List[tuple], unique: bool = False, background: bool = True) -> Optional[str]:"""创建索引"""try:index_name = self.collection.create_index(keys, unique=unique, background=background)return index_nameexcept PyMongoError as e:print(f"Create index error: {e}")return Nonedef count_documents(self, query: Dict) -> int:"""统计文档数量"""try:return self.collection.count_documents(query)except PyMongoError as e:print(f"Count documents error: {e}")return 0def aggregate(self, pipeline: List[Dict]) -> List[Dict]:"""聚合查询"""try:return list(self.collection.aggregate(pipeline))except PyMongoError as e:print(f"Aggregate error: {e}")return []def paginate(self, query: Dict, page: int = 1, per_page: int = 10,sort: Optional[List[tuple]] = None,projection: Optional[Dict] = None) -> Dict:"""分页查询"""try:total = self.count_documents(query)skip = (page - 1) * per_pagecursor = self.collection.find(query, projection)if sort:cursor = cursor.sort(sort)documents = cursor.skip(skip).limit(per_page)return {"total": total,"page": page,"per_page": per_page,"data": list(documents)}except PyMongoError as e:print(f"Paginate error: {e}")return {"total": 0, "page": page, "per_page": per_page, "data": []}# ------------------------- 事务支持 -------------------------def execute_transaction(self, operations: callable) -> bool:"""执行事务操作"""session = self.client.start_session()try:with session.start_transaction():operations(self.collection, session)return Trueexcept PyMongoError as e:session.abort_transaction()print(f"Transaction aborted: {e}")return False# ------------------------- 工具方法 -------------------------@staticmethoddef to_objectid(_id: Union[str, ObjectId]) -> ObjectId:"""将字符串转换为 ObjectId"""return _id if isinstance(_id, ObjectId) else ObjectId(_id)def close(self):"""关闭连接"""self.client.close()# ------------------------- 使用示例 -------------------------
if __name__ == "__main__":# 初始化客户端mongo_client = MongoDBClient(uri="mongodb://user:pass@localhost:27017/",db_name="test_db",collection_name="users")# 插入数据user_id = mongo_client.insert_one({"name": "Alice","age": 30,"email": "alice@example.com"})print(f"Inserted ID: {user_id}")# 查询数据user = mongo_client.find_one({"name": "Alice"})print(f"Found user: {user}")# 分页查询pagination = mongo_client.paginate(query={"age": {"$gt": 20}},page=1,per_page=10,sort=[("age", ASCENDING)])print(f"Page 1 data: {pagination['data']}")# 关闭连接mongo_client.close()
核心功能说明
功能 | 方法 | 说明 |
---|---|---|
连接管理 | __init__ , close | 支持自定义 URI、数据库和集合名称 |
CRUD 操作 | insert_one , find 等 | 提供单条/批量操作,支持投影和排序 |
索引管理 | create_index | 可创建唯一索引、后台索引 |
聚合查询 | aggregate | 支持完整的聚合管道操作 |
分页查询 | paginate | 返回分页数据和总记录数 |
事务支持 | execute_transaction | 封装多文档事务操作 |
类型转换 | to_objectid | 字符串与 ObjectId 互转 |
使用场景
- 快速开发:直接继承或实例化类,无需重复编写 CRUD 代码。
- Web 后端:集成到 FastAPI/Django 服务中,处理用户数据。
- 数据分析:通过聚合方法实现复杂统计。
- 定时任务:封装数据清洗、日志处理等操作。
优化建议
- 连接池配置:在初始化时添加
maxPoolSize
、minPoolSize
参数。 - 日志记录:将
print
替换为 logging 模块记录错误信息。 - 异步支持:使用
motor
库实现异步版本(适合 FastAPI 等异步框架)。 - 数据校验:集成
pydantic
对输入数据进行模式验证。 - 缓存集成:结合 Redis 实现高频查询缓存。