mongoengine

予早 2026-04-30 23:53:28
Categories: Tags:

一般是返回queryset,get和first等直接返回model

但是queryset可以迭代取出model

批量更新

querying

update(upsert=False, multi=True, write_concern=None, read_concern=None, full_result=False, **update)

multi=True默认为多个更新

update_one(upsert=False, write_concern=None, full_result=False, **update)


全局更新
upsert_one(write_concern=None, read_concern=None, **update)

upsert up i
@app.route('/adds')
def add_datas():
    student=[Student(S_name='张三',S_password='12345',S_contacts=Contacts(contacts='小芳',phone=13700000000)),
             Student(S_name='李四',S_password='12345',S_contacts=Contacts(contacts='小芳',phone=13700000000)),
     Student(S_name='小明',S_password='12345',S_contacts=Contacts(contacts='小米',phone=13700000000))]     #创建数据列表
    Student.objects.insert(student)   #添加数据
    return '添加成功'

mongoengine批量插入

文档

document是静态文档,这种文档定义好了之后,是不能添加新的字段的

from mongoengine import Document, StringField
class People(Document):
    title = StringField(max_length=20)
    meta = {
        'db_alias': "yusu",  # 数据库的名字
        'collection': 'people'  # 集合的名字,如果你不定义就用class的小写
    }

但是dynamicdocument可以实现随意添加字段的功能,注意:动态文档中的字段不能以_开头

from mongoengine import *

class Page(DynamicDocument):
    title = StringField(max_length=200, required=True)

# Create a new page and add tags
>>> page = Page(title='Using MongoEngine')
>>> page.tags = ['mongodb', 'mongoengine']
>>> page.save()

>>> Page.objects(tags='mongoengine').count()
>>> 1

EmbeddedDocument是嵌入文档,MongoDB能够在文档中嵌入文档。嵌入文档模式并不是在数据库中真正形成一个集合,而是作为某个集合的字段出现,可以嵌入静态文档

class Comment(EmbeddedDocument):  # EmbeddedDocument嵌入文档模式并不是在数据库中真正形成一个集合,而是作为某个集合的字段出现
    content = StringField()
    name = StringField(max_length=120)


class Article(Document):
    title = StringField(max_length=120, required=True)
    author = ReferenceField(User,
                            reverse_delete_rule=CASCADE)  # 外键,reverse_delete_rule=CASCADE,传统数据库中的级联删除。MapFields and DictFields currently don’t support automatic handling of deleted references
    tags = ListField(StringField(max_length=30))  # 列表的类型,listfield内第一个参数就是限制列表中元素的类型
    comments = ListField(EmbeddedDocumentField(Comment))  # 存储一个评论列表,

    meta = {'allow_inheritance': True}  # allow_inheritance=True允许继承默认是不允许的,当以后需要添加新的字段时,直接继承这个类就行了
article2 = LinkPost(title='MongoEngine Documentation', author=ross)
article2.link_url = 'http://docs.mongoengine.com/'
article2.tags = ['mongoengine']
comment1 = Comment(name='小明', content="真的好棒")
comment2 = Comment(name='小红', content="太好了")
article2.comments = [comment1, comment2]  # 要这样添加
article2.save()
{
    "_id" : ObjectId("5d620a7c23641b07eb85bcd2"),
    "_cls" : "Article.LinkPost",
    "title" : "MongoEngine Documentation",
    "author" : ObjectId("5d620a7c23641b07eb85bccf"),
    "tags" : [ 
        "mongoengine"
    ],
    "comments" : [ 
        {
            "content" : "真的好棒",
            "name" : "小明"
        }, 
        {
            "content" : "太好了",
            "name" : "小红"
        }
    ],
    "link_url" : "http://docs.mongoengine.com/"
}

字段,Field

支持一个列表,它的第一个参数用来指定列表中元素的类型

class Page(Document):
    tags = ListField(StringField(max_length=50))

ReferenceField相当于传统数据库中的外键,用于关联其他集合

class Employee(Document):
    name = StringField()
    boss = ReferenceField('self')#如果要引用的类名没有被定义,则使用self
    profile_page = ReferenceField('ProfilePage') # 参数为要引用的类名

class ProfilePage(Document):
    content = StringField()

一对多需要用listfield,其实我感觉一对多,不好理解,不如改成一引多,把ReferenceField放在一的集合中

class User(Document):
    name = StringField()

class Page(Document):
    content = StringField()
    authors = ListField(ReferenceField(User)) 

bob = User(name="Bob Jones").save()
john = User(name="John Smith").save()

Page(content="Test Page", authors=[bob, john]).save()
Page(content="Another Page", authors=[john]).save()

需要级联删除:reverse_delete_rule =Cascade

举个例子:

举个例子:

class ProfilePage(Document):
    ...
    employee = ReferenceField('Employee', reverse_delete_rule=mongoengine.CASCADE)
本例中的声明意味着,当删除雇员对象时,引用该雇员的ProfilePage配置文件页也将被删除。如果删除了整批员工,则链接的所有配置文件页也将被删除。

默认删除是这个选项:mongoengine.DO_NOTHING。不做任何操作。

GenericReferenceField

该字段的作用是设置某个集合用来引用任何的集合。随便引用

class Link(Document):
    url = StringField()

class Post(Document):
    title = StringField()

class Bookmark(Document):
    bookmark_object = GenericReferenceField()

link = Link(url='http://hmarr.com/mongoengine/')
link.save()

post = Post(title='Using MongoEngine')
post.save()

Bookmark(bookmark_object=link).save()
Bookmark(bookmark_object=post).save()
/* 1 */
{
    "_id" : ObjectId("5d6213aa23641b08c1a29c64"),
    "bookmark_object" : {
        "_cls" : "Link",#要引用的类
        "_ref" : {
            "$ref" : "link",
            "$id" : ObjectId("5d6213aa23641b08c1a29c62")
        }
    }
}

/* 2 */
{
    "_id" : ObjectId("5d6213ab23641b08c1a29c65"),
    "bookmark_object" : {
        "_cls" : "Post",
        "_ref" : {
            "$ref" : "post",
            "$id" : ObjectId("5d6213aa23641b08c1a29c63")
        }
    }
}

注意:使用genericreferencefields的效率比标准referencefields稍低,因此,如果您只引用一种文档类型,则首选标准referencefield。

Unique

class User(Document):
    username = StringField(unique=True) #唯一
    first_name = StringField()
    last_name = StringField(unique_with='first_name') #联合唯一

Skipping Document validation on save

我们知道MongoDBengine中的字段默认有检查的功能,也就是valid,如果你要保存数据的时候不需要他检验可以这样设置

class Recipient(Document):
    name = StringField()
    email = EmailField()

recipient = Recipient(name='admin', email='root@localhost')
recipient.save()               # will raise a ValidationError while
recipient.save(validate=False) # won't

改变数据库的名字:

默认集合的名字就是当前类名的小写,如果你要更改集合的名字的话这样操作

class Page(Document):
    title = StringField(max_length=200, required=True)
    meta = {'collection': 'cmsPage'}

对集合中的文档的个数和大小限制可以这样做,默认为10M,以字节显示

class Log(Document):
    ip_address = StringField()
    meta = {'max_documents': 1000, 'max_size': 2000000}

LazyReferenceField

这个字段是懒惰关联字段,相当于懒加载,和迭代器的作用差不多,什么时候调用fetch方法我什么时候产生数据,它不想referenceField字段那样直接得出数据

# models.py
from mongoengine import LazyReferenceField, DynamicDocument, StringField, connect

connect(host="mongodb://127.0.0.1:27017/test1011")


class Address(DynamicDocument):
    add_name = StringField(max_length=22)


class People(DynamicDocument):
    name = StringField(max_length=22)
    adds = LazyReferenceField(Address)

数据库信息为:

{
    "_id" : ObjectId("5da048b823641b655f9049eb"),
    "name" : "小红",
    "adds" : ObjectId("5da0485e23641b6513319660")
}

我们要查询小红的地址

p1 = People.objects.filter(name="小红").first()
print('>>>', p1.adds)  # 这样是获取不到地址对象的
print("使用了fetch", p1.adds.fetch())  # 我们要通过fetch方法才能获取到小红对应的地址对象
print("获取小红的地址为", p1.adds.fetch().add_name) # 获取地址

结果:

>>> <LazyReference(<class 'MongoDBengine_text.model1011.Address'>, ObjectId('5da0485e23641b6513319660'))>
使用了fetch Address object
获取小红的地址为 上海

注意使用了lazyReference字段后,获取引用的对象时要用fetch()方法来获取,但是可以直接获取它的主键值:

p1 = People.objects.filter(name="小红").first()
print('>>>', p1.adds.id)  


p2 = People.objects.filter(name="小红").first()
print('>>>', p2.adds.pk)  

#结果都是
5da0485e23641b6513319660

继承

继承后的类并不会在数据库中创建新的集合,而是作为基类的属性存在

from mongoengine import *
from datetime import datetime

connect('app')


class Page(Document):
    title = StringField(max_length=200, required=True)

    meta = {'allow_inheritance': True}


# Also stored in the collection named 'page'
class DatedPage(Page):
    date = DateTimeField()


DatedPage(title='你好',date=datetime.now()).save()

结果:

{
    "_id" : ObjectId("5d621c8e23641b092a2107e2"),
    "_cls" : "Page.DatedPage",
    "title" : "你好",
    "date" : ISODate("2019-08-25T13:28:46.794Z")
}

对于继承的集合我们要注意:查询的时候既可以用基类查又可以用子类查,但是添加的时候只能用子类添加,不能用基类添加

DatedPage(title='你好', date=datetime.datetime.now()).save()  #子类添加
p = Page.objects.filter(title='你好').first()  # 基类查询
p = DatedPage.objects.filter(title='你好').first() #子类查询
print(p.date)

StringField 字符串

URLField Url

EmailField 邮箱地址字段

Intfield 32位整数

LongField 64位整数

FloatField 浮点数字段

DecimalField 定点十进制

BooleanField 布尔

DateTimeField 时间

ComplexDateTimeField 精确毫秒级时间

EmbeddedDocumentField 嵌入式文档,有声明的document_type

GenericEmbeddedDocumentField 通用嵌入式文档

DynamicField 动态字段类型

ListField 列表字段

EmbeddedDocumentListField 嵌入式有文件的List字段

SortedListField 排序的列表字段,确保始终检索为已排序的列表

DictField 字典

MapField 名称映射到指定字段

ReferenceField 文档引用

# 使用reverse_delete_rule可以处理删除字段引用的文档时应该发生的情况。
DO_NOTHING(0) - 不做任何事情(默认)。
NULLIFY(1) - 更新对null的引用。
CASCADE(2) - 删除与参考相关的文档。
DENY(3) - 防止删除参考对象。
PULL(4) - 从ListField参考文献中拉出参考

初始化参考字段。

参数:	
dbref - 将引用存储DBRef 为ObjectId.id 或.id。
reverse_delete_rule - 确定删除引用对象时要执行的操作
123456789101112
LazyReferenceField
GenericReferenceField

BinaryField 二进制数据字段

FileField GirdFS存储字段

ImageField 图像文件存储字段

SequenceField
ObjectIdField
UUIDField
GridFSProxy
ImageGridFsProxy
ImproperlyConfigured

十分不错的参考文档

https://www.cnblogs.com/hailin2018/p/15080288.html

scalar

allow user disk,参数

类型转换

SON Object

对于mongodb的document转为dict

  1. document.to_mongo().to_dict(),然后强制转换其特殊类型,如ObjectId
  2. 对document自定义to_dict方法,主要是强制转换
  3. document.to_json(),然后将json加载为dict

避免死锁

  1. 避免多次锁定。 尽量避免同一个线程对多个 Lock 进行锁定。 例如上面的死锁程序,主线程要对 A、B 两个对象的 Lock 进行锁定,副线程也要对 A、B 两个对象的 Lock 进行锁定,这就埋下了导致死锁的隐患。
  2. 具有相同的加锁顺序。 如果多个线程需要对多个 Lock 进行锁定,则应该保证它们以相同的顺序请求加锁。 …
  3. 使用定时锁。 程序在调用 acquire () 方法加锁时可指定 timeout 参数,该参数指定超过 timeout 秒后会自动释放对 Lock 的锁定,这样就可以解开死锁了。
  4. 死锁检测。 死锁检测是一种依靠算法机制来实现的死锁预防机制,它主要是针对那些不可能实现按序加锁,也不能使用定时锁的场景的。

mongodb副本集读偏好更改

​ Read Preference描述MongoDB客户端如何路由读操作到复制集成员。

​ 默认情况下,客户端直接将它的读操作发送到primary成员上,但同时客户端可以定义一个读操作的读取顺序,例如优先读secondary成员

Read Preference这个选项由read preference mode、tag set以及maxStalenessSeconds参数组成(后两者可选)。其中:

maxStalenessSeconds :代表最大延时时间。单词stale代表旧数据。也就是非旧数据的最大延时时间,最少设置为90s

意思是如果从服务器的预期延时时间超过了maxStalenessSeconds时间,则不会从它上面读取。

注意:

1、选定除Primary之外的Read Preference可能返回旧的数据,因为复制是异步进行的。

2、Secondary上面的数据可能不能反映最近的写操作

3、Read Preference不影响数据是否可见,客户机可以在写入结果得到确认或已传播到大多数副本集成员之前看到它们。

02

选项

Read Preference常见的模式:

1、primary

​ 默认模式,当前的读操作都从primary上面读。

2、primaryPreferred

​ 多数情况下,读操作从primary读,特殊情况从secondary读

3、secondary

​ 所有操作从secondary上读

4、secondaryPreferred

​ 多数情况下从secondary上读,特殊情况从primary读

5、nearest

​ 从网络延时最低的那个节点读,不管是primary还是secondary

​ 上面说过了,Read Preference这个选项由read preference mode、tag set以及maxStalenessSeconds参数组成,如果一个复制集中的成员member有tag标签,可以通过下面的办法来让read操作定位到带有某个标签的成员上。(当然,Primary Mode不支持标签,因为它只有一个固定的节点。)

db.collection.find({}).readPref( "secondary", [ { "region": "South" } ] )

update可以使用__raw__方法写原语句

也可以使用model直接save,操作底层数据结构即可

2、MongoEngine所支持的部分操作符

操作符的表示形式为:加在关键字后面使用”__+操作符”(此处是两个” _ “),例如:publish_data__gt

3、检测字符串的部分操作符

4、可以对字段值进行修改的操作符

不同版本支持的程度不一样

get查询容错

 # 查询某个用户时,get方法有则返回queryset,无则报错User.DoesNotExist
 user = User.objects.get(name="xx")
 # 为防止报错, 有则返回queryset,无则返回None
 user = User.objects.filter(name="xx")
 if user:
     user = user[0]
 # 或者
 user = User.objects.filter(name="xx").first()
 # 进一步优化
 user = User.objects(name="xx").first()
 

queryset转dict

 user = User.objects.get(name="xxx")
 # 若将此功能作为结果集的serializer使用,不应该包含外键关联字段
 # 用fields方法过滤指定字段也不起作用
 user_dict = user.to_mongo().to_dict()

in_bulk

通过索引列表获取queryset

 # 不使用in_bulk
 # 通常情况,前端发送id列表
 ids = data.json["ids"]
 result = [Role.objects.with_id(i) for i in ids]
 或
 result = Role.objects(pk__in=ids)
 
 # 使用in_bulk
 ids = data.json("ids")
 ids = [ObjectId(i) for i in ids]
 documents = Role.objects.in_bulk(ids)
 results = [documents.get(obj_id) for obj_id in ids]
 

分表,动态写collection,switch_collection

文件

fieldfield

注册连接

 mongoengine.register_connection(alias, **attrs)
 bson.objectid.ObjectId.is_valid(x) # 验证x是否为合法objectid

flask.request.args,可重复key字典

ImmutableMultiDict([(‘tag’, ‘1’), (‘tag’, ‘2’)])

 mongoengine使用__raw__是如果需要匹配id,需要转换为ObjectId,pymongo不会自动转换类型

布尔值

 v = d.get("key") in vvv
 Collection.objects.aggregate([
     {"$match": {"product_code": {"$in": ["x", "xx", "xxx"]}, "status": 1}},
     {"$order": {"xx": 1}},
     {"project": list(set(["product_code", *args]))},  # 去重
     {"$group": {"_id": "$product_code", "data": {"$first": "$$ROOT"}}}
 ])

管理接口版本

 %Y-%m-%d %H:%M:%S.%f
 """
 >>> print("xxx")  # pycharm特殊处理
 """
 decimal.Decimal("0.3") 
 decimal.Decimal(0.3)

all 是针对模块公开接口的一种约定,以提供了”白名单“的形式暴露接口。如果定义了all,其他文件中使用from xxx import *导入该文件时,只会导入 all 列出的成员,可以其他成员都被排除在外。

改动接口最好使用关键字参数,且显式传入,否则容易被位置参数和参数默认值匹配混乱造成不兼容

importlib.import_module(module)

 // exists true包括值为null的 
 db.getCollection("cal_real_rr_rank").find({"rr_last.rr_last_7_year": {"$exists": true}})
 
 // eq null 包括不存在的 ne null 不包括不存在的
 db.getCollection("cal_real_rr_rank").find({"rr_last.rr_last_7_year": {"$ne": null}})
 
 // 一般用ne null合适
 
 专门查null可能需要
 db.getCollection("cal_real_rr_rank").find({"rr_last.rr_last_7_year": {"$exists": true, "$eq": null}})

MongoEngine 的QuerySet 对象具有exec_js() 方法,允许在MongoDB 服务器上执行Javascript 函数

注意:$exists 无法利用到索引, 但 $ne 和 $in 可以用上索引, 所以处于性能的考虑尽可能用 $ne:null,当然前提是你的字段上有索引

pymongo.errors.CursorNotFound: Cursor not found


demos = db['demo'].find({},{"_id": 0})
for cursor in demos:
         do_something()
         
         
但是当do_something函数耗时过长,在cursor上长时间没有进行操作,引发cursor在mongodb服务端超时,报错:pymongo.errors.CursorNotFound: Cursor not found


设置no_cursor_timeout = True,永不超时,游标连接不会主动关闭,需要手动关闭
demos = db['demo'].find({},{"_id": 0},no_cursor_timeout = True)
for cursor in demos:
        do_something()
demo.close() # 关闭游标


mongoengine对queryset对象使用timeout(False)


分析原因:你在用 db.collection.find() 的时候,它返回的不是所有的数据,而实际上是一个“cursor”。它的默认行为是:第一次向数据库查询 101 个文档,或 1 MB 的文档,取决于哪个条件先满足;之后每次 cursor 中的文档用尽后,查询 4 MB 的文档。另外,find() 的默认行为是返回一个 10 分钟无操作后超时的 cursor。如果我一个 batch 的文档十分钟内没处理完,过后再处理完了,再用同一个 cursor id 向服务器取下一个 batch,这时候 cursor id 当然已经过期了,这也就能解释为啥我得到 cursor id 无效的错误了。

 

思路总结:默认 mongo server维护连接的时间窗口是十分钟;默认 单次从 server获取数据是101条或者 大于1M小于16M的数据,所以默认情况下,如果10分钟内未能处理完数据,则抛出该异常。

 修改每批次获取数据量的条数,即batch size:

  collection.find(condition).batch_size(10)

  批量数需 估算十分钟内能处理的数据量
  
  
总结,两种方案,一是不设置游标超时时间,二是不把每次游标获取的数据量调小为十分钟可以处理完的量

meta

from mongoengine import Document, StringField, DateTimeField, QuerySet


class AwesomeQuerySet(QuerySet):

    def get_awesome(self):
        return self.filter(awesome=True)


class User(Document):
    meta = {
        'allow_inheritance': True,  # 允许继承,这里要设置为True,默认是False
        # 'abstract': True,  # 抽象文档

        # 自定义集合名
        'verbose_name': '员工',
        # 自定义在数据库存储的集合名
        'collection': 'staff',
        # 最大文档数
        'max_documents': 1000,
        # 最大存储 2M
        'max_size': 2000000,

        # 排序方式
        'ordering': ['-published_date'],

        # 自定义查询集,扩展查询的正确方式
        'queryset_class': AwesomeQuerySet,

        # 索引相关
        'index_opts': {},  # https://www.mongodb.com/docs/manual/reference/method/db.collection.createIndex/#db.collection.createIndex
        'index_background': True,
        'index_cls': False,
        'auto_create_index': True, # 自动创建索引
        'index_drop_dups': True,
        'indexes': [
            'title',
            '$title',  # 文本索引
            '#title',  # hashed index
            ('title', '-rating'),
            ('category', '_cls'),
            {
                'fields': ['created'],
                'expireAfterSeconds': 3600
            }
        ]
    }

    name = StringField()
    published_date = DateTimeField()


User.objects.get_awesome()
# 快速查询时间范围
aggregate = NetValueCalculated.objects.aggregate(
    *[{'$match': {'product_code': product_code, 'status': 1,
                  'trading_date': {'$gte': start_trade_date,
                                   '$lte': end_trade_date}}},
      {'$project': {'trading_date': 1}},
      {"$group": {"_id": None,
                  "start_trade_date": {"$min": "$trading_date"},
                  "end_trade_date": {"$max": "$trading_date"}}},
      {"$project": {"_id": 0}}])
    netvalue_aggregate = NetValueCalculated.objects.aggregate(*[
        {"$match": {"product_code": {"$in": l_pcode}, "status": 1}, },
        {"$sort": {"trading_date": 1, }},
        {"$group": {"_id": {"product_code": "$product_code"},
                    "trading_date": {"$push": "$trading_date"},
                    "rehabilitation_nv": {"$push": "$rehabilitation_nv"}, }
         },
    ])