Skip to content

Commit d0b5014

Browse files
1.0.4: Support handling of TailThread cursor/connection failures (#174)
Support better handling and logging of TailThread cursor/connection failures. Fsync the tailed oplog to disk when either a time threshold or a document-write threshold is reached. Pass a 'backup_stop' event to threads so that a failure in one child thread is signalled to the others. Allow oplog tailing to be disabled. Add a script to simulate failure of a cursor/query.
1 parent b881943 commit d0b5014

File tree

17 files changed, with 312 additions (+312) and 157 deletions (-157).

17 files changed

+312
-157
lines changed

conf/mongodb-consistent-backup.example.conf

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,8 +24,12 @@ production:
2424
# ping_secs: [1+] (default: 3)
2525
#oplog:
2626
# compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
27+
# flush:
28+
# max_docs: 100
29+
# max_secs: 1
2730
# resolver_threads: [1+] (default: 2 per CPU)
2831
# tailer:
32+
# enabled: true
2933
# status_interval: 30
3034
archive:
3135
method: tar

mongodb_consistent_backup/Backup/Backup.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33

44

55
class Backup(Stage):
6-
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
7-
super(Backup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, replsets=replsets, sharding=sharding)
6+
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
7+
super(Backup, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, replsets=replsets, backup_stop=backup_stop, sharding=sharding)
88
self.task = self.config.backup.method
99
self.init()

mongodb_consistent_backup/Backup/Mongodump/Mongodump.py

Lines changed: 10 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -16,14 +16,15 @@
1616

1717

1818
class Mongodump(Task):
19-
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, sharding=None):
19+
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
2020
super(Mongodump, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
2121
self.compression_method = self.config.backup.mongodump.compression
2222
self.binary = self.config.backup.mongodump.binary
2323
self.user = self.config.username
2424
self.password = self.config.password
2525
self.authdb = self.config.authdb
2626
self.replsets = replsets
27+
self.backup_stop = backup_stop
2728
self.sharding = sharding
2829

2930
self.compression_supported = ['auto', 'none', 'gzip']
@@ -90,6 +91,9 @@ def wait(self):
9091
start_threads = len(self.dump_threads)
9192
# wait for all threads to finish
9293
while len(self.dump_threads) > 0:
94+
if self.backup_stop and self.backup_stop.is_set():
95+
logging.error("Received backup stop event due to error(s), stopping backup!")
96+
raise OperationError("Received backup stop event due to error(s)")
9397
for thread in self.dump_threads:
9498
if not thread.is_alive():
9599
if thread.exitcode == 0:
@@ -134,15 +138,11 @@ def run(self):
134138
self.states[shard],
135139
mongo_uri,
136140
self.timer,
137-
self.user,
138-
self.password,
139-
self.authdb,
141+
self.config,
140142
self.backup_dir,
141-
self.binary,
142143
self.version,
143144
self.threads(),
144-
self.do_gzip(),
145-
self.verbose
145+
self.do_gzip()
146146
)
147147
self.dump_threads.append(thread)
148148

@@ -171,20 +171,17 @@ def run(self):
171171
self.states['configsvr'],
172172
mongo_uri,
173173
self.timer,
174-
self.user,
175-
self.password,
176-
self.authdb,
174+
self.config,
177175
self.backup_dir,
178-
self.binary,
179176
self.version,
180177
self.threads(),
181-
self.do_gzip(),
182-
self.verbose
178+
self.do_gzip()
183179
)]
184180
self.dump_threads[0].start()
185181
self.dump_threads[0].join()
186182

187183
self.completed = True
184+
self.stopped = True
188185
return self._summary
189186

190187
def close(self):

mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -14,21 +14,21 @@
1414

1515
# noinspection PyStringFormat
1616
class MongodumpThread(Process):
17-
def __init__(self, state, uri, timer, user, password, authdb, base_dir, binary, version,
18-
threads=0, dump_gzip=False, verbose=False):
17+
def __init__(self, state, uri, timer, config, base_dir, version, threads=0, dump_gzip=False):
1918
Process.__init__(self)
2019
self.state = state
2120
self.uri = uri
2221
self.timer = timer
23-
self.user = user
24-
self.password = password
25-
self.authdb = authdb
22+
self.config = config
2623
self.base_dir = base_dir
27-
self.binary = binary
2824
self.version = version
2925
self.threads = threads
3026
self.dump_gzip = dump_gzip
31-
self.verbose = verbose
27+
28+
self.user = self.config.username
29+
self.password = self.config.password
30+
self.authdb = self.config.authdb
31+
self.binary = self.config.backup.mongodump.binary
3232

3333
self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset)
3434
self.exit_code = 1
@@ -112,11 +112,11 @@ def mongodump_cmd(self):
112112
mongodump_flags.extend(["--authenticationDatabase", self.authdb])
113113
if self.user and self.password:
114114
# >= 3.0.2 supports password input via stdin to mask from ps
115-
if tuple("3.0.2".split(".")) <= tuple(self.version.split(".")):
115+
if tuple(self.version.split(".")) >= tuple("3.0.2".split(".")):
116116
mongodump_flags.extend(["-u", self.user, "-p", '""'])
117117
self.do_stdin_passwd = True
118118
else:
119-
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.2.0 to resolve this")
119+
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this")
120120
mongodump_flags.extend(["-u", self.user, "-p", self.password])
121121
mongodump_cmd.extend(mongodump_flags)
122122
return mongodump_cmd

mongodb_consistent_backup/Common/DB.py

Lines changed: 23 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
import logging
22

3-
from pymongo import MongoClient
3+
from bson.codec_options import CodecOptions
4+
from inspect import currentframe, getframeinfo
5+
from pymongo import DESCENDING, CursorType, MongoClient
46
from pymongo.errors import ConnectionFailure, OperationFailure, ServerSelectionTimeoutError
57
from time import sleep
68

@@ -117,6 +119,26 @@ def replset(self):
117119
return isMaster['setName']
118120
return None
119121

122+
def get_oplog_rs(self):
123+
if not self._conn:
124+
self.connect()
125+
db = self._conn['local']
126+
return db.oplog.rs.with_options(codec_options=CodecOptions(unicode_decode_error_handler="ignore"))
127+
128+
def get_oplog_tail_ts(self):
129+
logging.debug("Gathering oldest 'ts' in %s oplog" % self.uri)
130+
return self.get_oplog_rs().find_one(sort=[('$natural', DESCENDING)])['ts']
131+
132+
def get_oplog_cursor_since(self, caller, ts=None):
133+
frame = getframeinfo(currentframe().f_back)
134+
comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno)
135+
if not ts:
136+
ts = self.get_oplog_tail_ts()
137+
query = {'ts':{'$gte':ts}}
138+
logging.debug("Querying oplog on %s with query: %s" % (self.uri, query))
139+
# http://api.mongodb.com/python/current/examples/tailable.html
140+
return self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True).comment(comment)
141+
120142
def close(self):
121143
if self._conn:
122144
logging.debug("Closing connection to: %s" % self.uri)

mongodb_consistent_backup/Main.py

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44
import sys
55

66
from datetime import datetime
7-
from multiprocessing import current_process, Manager
7+
from multiprocessing import current_process, Event, Manager
88

99
from Archive import Archive
1010
from Backup import Backup
@@ -35,6 +35,7 @@ def __init__(self, prog_name="mongodb-consistent-backup"):
3535
self.backup_time = None
3636
self.backup_directory = None
3737
self.backup_root_subdirectory = None
38+
self.backup_stop = Event()
3839
self.uri = None
3940
self.db = None
4041
self.is_sharded = False
@@ -345,7 +346,8 @@ def run(self):
345346
self.timer,
346347
self.backup_root_subdirectory,
347348
self.backup_directory,
348-
self.replsets
349+
self.replsets,
350+
self.backup_stop
349351
)
350352
except Exception, e:
351353
self.exception("Problem initializing oplog tailer! Error: %s" % e, e)
@@ -359,6 +361,7 @@ def run(self):
359361
self.backup_root_subdirectory,
360362
self.backup_directory,
361363
self.replsets,
364+
self.backup_stop,
362365
self.sharding
363366
)
364367
if self.backup.is_compressed():
@@ -465,7 +468,7 @@ def run(self):
465468

466469
StateDoneStamp(self.backup_directory, self.config).write()
467470
self.update_symlinks()
468-
logging.info("Completed %s in %.2f sec" % (self.program_name, self.timer.duration(self.timer_name)))
469471

470472
self.logger.rotate()
473+
logging.info("Completed %s in %.2f sec" % (self.program_name, self.timer.duration(self.timer_name)))
471474
self.release_lock()

mongodb_consistent_backup/Oplog/Oplog.py

Lines changed: 36 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,21 +4,27 @@
44
from gzip import GzipFile
55
from bson import BSON, decode_file_iter
66
from bson.codec_options import CodecOptions
7+
from time import time
78

89
from mongodb_consistent_backup.Errors import OperationError
910

1011

1112
class Oplog:
12-
def __init__(self, oplog_file, do_gzip=False, file_mode="r"):
13+
def __init__(self, oplog_file, do_gzip=False, file_mode="r", flush_docs=100, flush_secs=1):
1314
self.oplog_file = oplog_file
1415
self.do_gzip = do_gzip
1516
self.file_mode = file_mode
17+
self.flush_docs = flush_docs
18+
self.flush_secs = flush_secs
1619

1720
self._count = 0
1821
self._first_ts = None
1922
self._last_ts = None
2023
self._oplog = None
2124

25+
self._last_flush_time = time()
26+
self._writes_unflushed = 0
27+
2228
self.open()
2329

2430
def handle(self):
@@ -56,23 +62,50 @@ def load(self):
5662
logging.fatal("Error reading oplog file %s! Error: %s" % (self.oplog_file, e))
5763
raise OperationError(e)
5864

59-
def add(self, doc):
65+
def add(self, doc, autoflush=True):
6066
try:
6167
self._oplog.write(BSON.encode(doc))
62-
self._count += 1
68+
self._writes_unflushed += 1
69+
self._count += 1
6370
if not self._first_ts:
6471
self._first_ts = doc['ts']
6572
self._last_ts = doc['ts']
73+
if autoflush:
74+
self.autoflush()
6675
except Exception, e:
6776
logging.fatal("Cannot write to oplog file %s! Error: %s" % (self.oplog_file, e))
6877
raise OperationError(e)
6978

79+
def secs_since_flush(self):
80+
return time() - self._last_flush_time
81+
82+
def do_flush(self):
83+
if self._writes_unflushed > self.flush_docs:
84+
return True
85+
elif self.secs_since_flush() > self.flush_secs:
86+
return True
87+
return False
88+
7089
def flush(self):
7190
if self._oplog:
7291
return self._oplog.flush()
7392

93+
def fsync(self):
94+
if self._oplog:
95+
# https://docs.python.org/2/library/os.html#os.fsync
96+
self._oplog.flush()
97+
self._last_flush_time = time()
98+
self._writes_unflushed = 0
99+
return os.fsync(self._oplog.fileno())
100+
101+
def autoflush(self):
102+
if self._oplog and self.do_flush():
103+
logging.debug("Fsyncing %s (secs_since=%.2f, changes=%i, ts=%s)" % (self.oplog_file, self.secs_since_flush(), self._writes_unflushed, self.last_ts()))
104+
return self.fsync()
105+
74106
def close(self):
75107
if self._oplog:
108+
self.fsync()
76109
return self._oplog.close()
77110

78111
def count(self):

mongodb_consistent_backup/Oplog/OplogState.py

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,12 +34,20 @@ def get(self, key=None):
3434
else:
3535
return None
3636
return state
37+
except IOError, e:
38+
return None
3739
except Exception, e:
3840
raise OperationError(e)
3941

40-
def set(self, key, value):
42+
def set(self, key, value, merge=False):
4143
try:
42-
self._state[key] = value
44+
if merge and isinstance(value, dict):
45+
for key in value:
46+
self._state[key] = value[key]
47+
else:
48+
self._state[key] = value
49+
except IOError, e:
50+
pass
4351
except Exception, e:
4452
raise OperationError(e)
4553

0 commit comments

Comments
 (0)