激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數據庫技術|

服務器之家 - 數據庫 - MongoDB - 利用MongoDB中oplog機制實現準實時數據的操作監控

利用MongoDB中oplog機制實現準實時數據的操作監控

2020-05-12 17:44零の雜貨鋪 MongoDB

MongoDB 的Replication是通過一個日志來存儲寫操作的,這個日志就叫做oplog,而下面這篇文章主要給大家介紹了利用MongoDB中oplog機制實現準實時數據的操作監控的相關資料,需要的朋友可以參考借鑒,下面來一起看看吧。

前言

最近有一個需求是要實時獲取到新插入到MongoDB的數據,而插入程序本身已經有一套處理邏輯,所以不方便直接在插入程序里寫相關程序,傳統的數據庫大多自帶這種觸發器機制,但是Mongo沒有相關的函數可以用(也可能我了解的太少了,求糾正),當然還有一點是需要python實現,于是收集整理了一個相應的實現方法。

一、引子

首先可以想到,這種需求其實很像數據庫的主從備份機制,從數據庫之所以能夠同步主庫是因為存在某些指標來做控制,我們知道MongoDB雖然沒有現成觸發器,但是它能夠實現主從備份,所以我們就從它的主從備份機制入手。

二、OPLOG

首先,需要以master模式來打開mongod守護,命令行使用–master,或者配置文件增加master鍵為true。

此時,我們可以在Mongo的系統庫local里見到新增的collection——oplog,此時oplog.$main里就會存儲進oplog信息,如果此時還有充當從數據庫的Mongo存在,就會還有一些slaves的信息,由于我們這里并不是主從同步,所以不存在這些集合。

利用MongoDB中oplog機制實現準實時數據的操作監控

再來看看oplog結構:

?
1
2
3
4
5
6
7
"ts" : Timestamp(6417682881216249, 1), 時間戳
"h" : NumberLong(0), 長度
"v" : 2,
"op" : "n", 操作類型
"ns" : "", 操作的庫和集合
"o2" : "_id" update條件
"o" : {} 操作值,即document

這里需要知道op的幾種屬性:

?
1
2
3
4
5
insert,'i'
update, 'u'
remove(delete), 'd'
cmd, 'c'
noop, 'n' 空操作

從上面的信息可以看出,我們只要不斷讀取到ts來做對比,然后根據op即可判斷當前出現的是什么操作,相當于使用程序實現了一個從數據庫的接收端。

三、CODE

在Github上找到了別人的實現方式,不過它的函數庫太老舊,所以在他的基礎上進行修改。

Github地址:https://github.com/RedBeard0531/mongo-oplog-watcher

mongo_oplog_watcher.py如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/python
import pymongo
import re
import time
from pprint import pprint # pretty printer
from pymongo.errors import AutoReconnect
 
class OplogWatcher(object):
  def __init__(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True):
    if collection is not None:
      if db is None:
        raise ValueError('must specify db if you specify a collection')
      self._ns_filter = db + '.' + collection
    elif db is not None:
      self._ns_filter = re.compile(r'^%s\.' % db)
    else:
      self._ns_filter = None
 
    self.poll_time = poll_time
    self.connection = connection or pymongo.Connection()
 
    if start_now:
      self.start()
 
  @staticmethod
  def __get_id(op):
    id = None
    o2 = op.get('o2')
    if o2 is not None:
      id = o2.get('_id')
 
    if id is None:
      id = op['o'].get('_id')
 
    return id
 
  def start(self):
    oplog = self.connection.local['oplog.$main']
    ts = oplog.find().sort('$natural', -1)[0]['ts']
    while True:
      if self._ns_filter is None:
        filter = {}
      else:
        filter = {'ns': self._ns_filter}
      filter['ts'] = {'$gt': ts}
      try:
        cursor = oplog.find(filter, tailable=True)
        while True:
          for op in cursor:
            ts = op['ts']
            id = self.__get_id(op)
            self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op)
          time.sleep(self.poll_time)
          if not cursor.alive:
            break
      except AutoReconnect:
        time.sleep(self.poll_time)
 
  def all_with_noop(self, ns, ts, op, id, raw):
    if op == 'n':
      self.noop(ts=ts)
    else:
      self.all(ns=ns, ts=ts, op=op, id=id, raw=raw)
 
  def all(self, ns, ts, op, id, raw):
    if op == 'i':
      self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw)
    elif op == 'u':
      self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw)
    elif op == 'd':
      self.delete(ns=ns, ts=ts, id=id, raw=raw)
    elif op == 'c':
      self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw)
    elif op == 'db':
      self.db_declare(ns=ns, ts=ts, raw=raw)
 
  def noop(self, ts):
    pass
 
  def insert(self, ns, ts, id, obj, raw, **kw):
    pass
 
  def update(self, ns, ts, id, mod, raw, **kw):
    pass
 
  def delete(self, ns, ts, id, raw, **kw):
    pass
 
  def command(self, ns, ts, cmd, raw, **kw):
    pass
 
  def db_declare(self, ns, ts, **kw):
    pass
 
class OplogPrinter(OplogWatcher):
  def all(self, **kw):
    pprint (kw)
    print #newline
 
if __name__ == '__main__':
  OplogPrinter()

首先是實現一個數據庫的初始化,設定一個延遲時間(準實時):

?
1
2
self.poll_time = poll_time
self.connection = connection or pymongo.MongoClient()

主要的函數是start() ,實現一個時間的比對并進行相應字段的處理:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def start(self):
 oplog = self.connection.local['oplog.$main']
 #讀取之前提到的庫
 ts = oplog.find().sort('$natural', -1)[0]['ts']
 #獲取一個時間邊際
 while True:
 if self._ns_filter is None:
  filter = {}
 else:
  filter = {'ns': self._ns_filter}
 filter['ts'] = {'$gt': ts}
 try:
  cursor = oplog.find(filter)
  #對此時間之后的進行處理
  while True:
  for op in cursor:
   ts = op['ts']
   id = self.__get_id(op)
   self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op)
   #可以指定處理插入監控,更新監控或者刪除監控等
  time.sleep(self.poll_time)
  if not cursor.alive:
   break
 except AutoReconnect:
  time.sleep(self.poll_time)

循環這個start函數,在all_with_noop這里就可以編寫相應的監控處理邏輯。

這樣就可以實現一個簡易的準實時Mongo數據庫操作監控器,下一步就可以配合其他操作來對新入庫的程序進行相應處理。

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。

原文鏈接:http://phantom0301.cc/2017/05/08/MongoOplog/

延伸 · 閱讀

精彩推薦
  • MongoDBMongoDB 內存使用情況分析

    MongoDB 內存使用情況分析

    都說 MongoDB 是個內存大戶,但是怎么知道它到底用了多少內存呢...

    MongoDB教程網10002020-09-29
  • MongoDBMongoDB安裝圖文教程

    MongoDB安裝圖文教程

    這篇文章主要為大家詳細介紹了MongoDB安裝圖文教程,分為兩大部分為大家介紹下載MongoDB和安裝MongoDB的方法,感興趣的小伙伴們可以參考一下 ...

    Yangyi.He6132020-05-07
  • MongoDBMongoDB中javascript腳本編程簡介和入門實例

    MongoDB中javascript腳本編程簡介和入門實例

    作為一個數據庫,MongoDB有一個很大的優勢——它使用js管理數據庫,所以也能夠使用js腳本進行復雜的管理——這種方法非常靈活 ...

    MongoDB教程網6982020-04-24
  • MongoDBMongoDB憑什么躋身數據庫排行前五

    MongoDB憑什么躋身數據庫排行前五

    MongoDB以比去年同期超出65.96分的成績繼續雄踞榜單前五,這個增幅在全榜僅次于PostgreSQL的77.99,而其相對于4月份的6.10分的增長也是僅次于微軟SQL Server排名...

    孫浩峰3892020-05-22
  • MongoDB分布式文檔存儲數據庫之MongoDB分片集群的問題

    分布式文檔存儲數據庫之MongoDB分片集群的問題

    這篇文章主要介紹了分布式文檔存儲數據庫之MongoDB分片集群的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋...

    Linux-18743072020-12-20
  • MongoDBMongodb實現定時備份與恢復的方法教程

    Mongodb實現定時備份與恢復的方法教程

    這篇文章主要給大家介紹了Mongodb實現定時備份與恢復的方法教程,文中通過示例代碼介紹的非常詳細,對大家具有一定的參考學習價值,需要的朋友們下面...

    chenjsh364522020-05-13
  • MongoDB遷移sqlserver數據到MongoDb的方法

    遷移sqlserver數據到MongoDb的方法

    這篇文章主要介紹了遷移sqlserver數據到MongoDb的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下...

    聽楓xl9682021-01-03
  • MongoDBmongodb基本命令實例小結

    mongodb基本命令實例小結

    這篇文章主要介紹了mongodb基本命令,結合實例形式總結分析了MongoDB數據庫切換、查看、刪除、查詢等基本命令用法與操作注意事項,需要的朋友可以參考下...

    dawn-liu3652020-05-26
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25
主站蜘蛛池模板: 日韩视频一区二区三区在线观看 | 成年免费视频黄网站在线观看 | 91久久国产露脸精品国产护士 | 免费国产人成网站 | 成人一级毛片 | 毛片小网站 | 国产一精品久久99无吗一高潮 | 美国av免费看| 中国av一级片 | 欧美日韩亚洲另类 | 伊人99在线 | 国产精品免费视频观看 | 九一免费国产 | 成人福利在线播放 | 亚洲 综合 欧美 动漫 丝袜图 | 国产精品二区高清在线 | 亚洲视屏在线 | 视频一区二区国产 | 黄网站免费观看视频 | 国产一级桃视频播放 | 久久久综 | 久久99亚洲精品久久99果 | 成人在线观看免费高清 | 国产一级αv片免费观看 | 日美av在线 | 萌白酱福利视频在线网站 | 国产日产精品一区四区介绍 | 国产免费永久在线观看 | 日本精品一二区 | 成人性视频在线 | 欧洲成人综合网 | 国产va在线观看 | 久久免费精品视频 | 免费黄色av | 日本一区二区在线 | 久久久久国产成人精品亚洲午夜 | 九九热视频在线免费观看 | 黄色av.com | 国产羞羞视频免费在线观看 | 特级无码毛片免费视频尤物 | 国产一区免费观看 |