Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ install: bin/mongodb-consistent-backup
install -m 0644 LICENSE $(SHAREDIR)/$(NAME)/LICENSE
install -m 0644 README.rst $(SHAREDIR)/$(NAME)/README.rst

flake8:
# Ignore long-lines and space-aligned = and : for now
flake8 --ignore E221,E241,E501 $(PWD)/$(NAME)

uninstall:
rm -f $(BINDIR)/mongodb-consistent-backup
rm -rf $(SHAREDIR)/$(NAME)
Expand Down
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ Roadmap
- Documentation for running under Docker with persistent volumes
- Python unit tests

Submitting Code
~~~~~~~~~~~~~~~

- Submitted code must pass Python `'flake8' <https://pypi.python.org/pypi/flake8>`__ checks. Run *'make flake8'* to test.
- To make review easier, pull requests must address and solve one problem at a time.

Links
~~~~~

Expand Down
2 changes: 0 additions & 2 deletions mongodb_consistent_backup/Archive/Archive.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from mongodb_consistent_backup.Archive.Tar import Tar
from mongodb_consistent_backup.Archive.Zbackup import Zbackup
from mongodb_consistent_backup.Pipeline import Stage


Expand Down
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Archive/Tar/Tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from types import MethodType

from TarThread import TarThread
from mongodb_consistent_backup.Common import parse_method
from mongodb_consistent_backup.Errors import Error, OperationError
from mongodb_consistent_backup.Pipeline import Task

Expand All @@ -19,6 +18,7 @@ def _reduce_method(m):
else:
return getattr, (m.im_self, m.im_func.func_name)


pickle(MethodType, _reduce_method)


Expand Down
28 changes: 14 additions & 14 deletions mongodb_consistent_backup/Archive/Tar/TarThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ def run(self):
if os.path.isdir(self.backup_dir):
if not os.path.isfile(self.output_file):
try:
backup_base_dir = os.path.dirname(self.backup_dir)
backup_base_name = os.path.basename(self.backup_dir)
log_msg = "Archiving directory: %s" % self.backup_dir
cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name]
if self.do_gzip():
log_msg = "Archiving and compressing directory: %s" % self.backup_dir
cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name]
logging.info(log_msg)
self.running = True
self._command = LocalCommand(self.binary, cmd_flags, self.verbose)
self.exit_code = self._command.run()
backup_base_dir = os.path.dirname(self.backup_dir)
backup_base_name = os.path.basename(self.backup_dir)

log_msg = "Archiving directory: %s" % self.backup_dir
cmd_flags = ["-C", backup_base_dir, "-cf", self.output_file, "--remove-files", backup_base_name]

if self.do_gzip():
log_msg = "Archiving and compressing directory: %s" % self.backup_dir
cmd_flags = ["-C", backup_base_dir, "-czf", self.output_file, "--remove-files", backup_base_name]

logging.info(log_msg)
self.running = True
self._command = LocalCommand(self.binary, cmd_flags, self.verbose)
self.exit_code = self._command.run()
except Exception, e:
logging.fatal("Failed archiving file: %s! Error: %s" % (self.output_file, e))
finally:
Expand Down
4 changes: 2 additions & 2 deletions mongodb_consistent_backup/Archive/Tar/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from Tar import Tar
from Tar import Tar # NOQA


def config(parser):
parser.add_argument("--archive.tar.compression", dest="archive.tar.compression",
help="Tar archiver compression method (default: gzip)", default='gzip', choices=['gzip', 'none'])
parser.add_argument("--archive.tar.threads", dest="archive.tar.threads",
parser.add_argument("--archive.tar.threads", dest="archive.tar.threads",
help="Number of Tar archiver threads to use (default: 1-per-CPU)", default=0, type=int)
return parser
18 changes: 9 additions & 9 deletions mongodb_consistent_backup/Archive/Zbackup/Zbackup.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,20 @@ def version(self):
return None
except Exception, e:
raise OperationError("Could not gather ZBackup version: %s" % e)

def has_zbackup(self):
if self.version():
return True
return True
return False

def close(self, exit_code=None, frame=None):
del exit_code
del frame
if not self.stopped:
if self._zbackup and self._zbackup.poll() == None:
if self._zbackup and self._zbackup.poll() is None:
logging.debug("Stopping running ZBackup command")
self._zbackup.terminate()
if self._tar and self._tar.poll() == None:
if self._tar and self._tar.poll() is None:
logging.debug("Stopping running ZBackup tar command")
self._tar.terminate()
self.stopped = True
Expand All @@ -131,12 +131,12 @@ def wait(self):
self.poll()
if tar_done:
self._zbackup.communicate()
if self._zbackup.poll() != None:
if self._zbackup.poll() is not None:
logging.info("ZBackup completed successfully with exit code: %i" % self._zbackup.returncode)
if self._zbackup.returncode != 0:
raise OperationError("ZBackup exited with code: %i!" % self._zbackup.returncode)
break
elif self._tar.poll() != None:
elif self._tar.poll() is not None:
if self._tar.returncode == 0:
logging.debug("ZBackup tar command completed successfully with exit code: %i" % self._tar.returncode)
tar_done = True
Expand All @@ -160,9 +160,9 @@ def run(self):
lock = Lock(self.zbackup_lock)
lock.acquire()
try:
logging.info("Starting ZBackup version: %s (options: compression=%s, encryption=%s, threads=%i, cache_mb=%i)" %
(self.version(), self.compression(), self.encrypted, self.threads(), self.zbackup_cache_mb)
)
logging.info("Starting ZBackup version: %s (options: compression=%s, encryption=%s, threads=%i, cache_mb=%i)" % (
self.version(), self.compression(), self.encrypted, self.threads(), self.zbackup_cache_mb
))
self.running = True
try:
for sub_dir in os.listdir(self.backup_dir):
Expand Down
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Archive/Zbackup/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from Zbackup import Zbackup
from Zbackup import Zbackup # NOQA


def config(parser):
Expand Down
4 changes: 2 additions & 2 deletions mongodb_consistent_backup/Archive/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from Archive import Archive
from Archive import Archive # NOQA


def config(parser):
parser.add_argument("--archive.method", dest="archive.method", help="Archiver method (default: tar)", default='tar', choices=['tar','zbackup','none'])
parser.add_argument("--archive.method", dest="archive.method", help="Archiver method (default: tar)", default='tar', choices=['tar', 'zbackup', 'none'])
return parser
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Backup/Backup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from mongodb_consistent_backup.Backup.Mongodump import Mongodump
from mongodb_consistent_backup.Backup.Mongodump import Mongodump # NOQA
from mongodb_consistent_backup.Pipeline import Stage


Expand Down
9 changes: 3 additions & 6 deletions mongodb_consistent_backup/Backup/Mongodump/Mongodump.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import os, sys
import os
import logging
import signal
import sys

from math import floor
from multiprocessing import cpu_count
from subprocess import check_output
from time import sleep

from mongodb_consistent_backup.Common import MongoUri, config_to_string
from mongodb_consistent_backup.Errors import Error, OperationError
from mongodb_consistent_backup.Errors import OperationError
from mongodb_consistent_backup.Oplog import OplogState
from mongodb_consistent_backup.Pipeline import Task

Expand Down Expand Up @@ -82,8 +81,6 @@ def summary(self):
def get_summaries(self):
for shard in self.states:
state = self.states[shard]
host = state.get('host')
port = state.get('port')
self._summary[shard] = state.get().copy()

def wait(self):
Expand Down
8 changes: 4 additions & 4 deletions mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def parse_mongodump_line(self, line):
(date, line) = line.split("\t")
elif is_datetime(line):
return None
return "%s:\t%s" % (self.uri, line)
return "%s:\t%s" % (self.uri, line)
except:
return None

Expand Down Expand Up @@ -105,7 +105,7 @@ def wait(self):
break
else:
logging.info(line)
if self._process.poll() != None:
if self._process.poll() is not None:
break
except Exception, e:
logging.exception("Error reading mongodump output: %s" % e)
Expand All @@ -117,7 +117,7 @@ def mongodump_cmd(self):
mongodump_cmd = [self.binary]
mongodump_flags = ["--host", mongodump_uri.host, "--port", str(mongodump_uri.port), "--oplog", "--out", "%s/dump" % self.backup_dir]
if self.threads > 0:
mongodump_flags.extend(["--numParallelCollections="+str(self.threads)])
mongodump_flags.extend(["--numParallelCollections=" + str(self.threads)])
if self.dump_gzip:
mongodump_flags.extend(["--gzip"])
if tuple("3.4.0".split(".")) <= tuple(self.version.split(".")):
Expand All @@ -131,7 +131,7 @@ def mongodump_cmd(self):
mongodump_flags.extend(["-u", self.user, "-p", '""'])
self.do_stdin_passwd = True
else:
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this")
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this")
mongodump_flags.extend(["-u", self.user, "-p", self.password])
mongodump_cmd.extend(mongodump_flags)
return mongodump_cmd
Expand Down
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Backup/Mongodump/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from Mongodump import Mongodump
from Mongodump import Mongodump # NOQA


def config(parser):
Expand Down
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Backup/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from Backup import Backup
from Backup import Backup # NOQA


def config(parser):
Expand Down
6 changes: 3 additions & 3 deletions mongodb_consistent_backup/Common/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __call__(self, parser, namespace, values, option_string=None):

class ConfigParser(BaseConfiguration):
def makeParserLoadSubmodules(self, parser):
for _, modname, ispkg in walk_packages(path=mongodb_consistent_backup.__path__, prefix=mongodb_consistent_backup.__name__+'.'):
for _, modname, ispkg in walk_packages(path=mongodb_consistent_backup.__path__, prefix=mongodb_consistent_backup.__name__ + '.'):
if not ispkg:
continue
try:
Expand All @@ -41,7 +41,7 @@ def makeParserLoadSubmodules(self, parser):
for comp in components[1:]:
mod = getattr(mod, comp)
parser = mod.config(parser)
except AttributeError, e:
except AttributeError:
continue
return parser

Expand Down Expand Up @@ -104,7 +104,7 @@ def to_dict(self, data):
value = "******"
ret[key] = value
return ret
elif isinstance(data, (str, int, bool)): # or isinstance(data, int) or isinstance(data, bool):
elif isinstance(data, (str, int, bool)):
return data

def dump(self):
Expand Down
11 changes: 6 additions & 5 deletions mongodb_consistent_backup/Common/DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pymongo.errors import ConnectionFailure, OperationFailure, ServerSelectionTimeoutError
from time import sleep

from mongodb_consistent_backup.Errors import DBAuthenticationError, DBConnectionError, DBOperationError, Error, OperationError
from mongodb_consistent_backup.Errors import DBAuthenticationError, DBConnectionError, DBOperationError, Error


class DB:
Expand All @@ -31,8 +31,9 @@ def connect(self):
try:
if self.do_replset:
self.replset = self.uri.replset
logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s)" %
(self.uri, self.replset, self.read_pref))
logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s)" % (
self.uri, self.replset, self.read_pref
))
conn = MongoClient(
connect=self.do_connect,
host=self.uri.hosts(),
Expand All @@ -44,7 +45,7 @@ def connect(self):
w="majority"
)
if self.do_connect:
conn['admin'].command({"ping":1})
conn['admin'].command({"ping": 1})
except (ConnectionFailure, OperationFailure, ServerSelectionTimeoutError), e:
logging.error("Unable to connect to %s! Error: %s" % (self.uri, e))
raise DBConnectionError(e)
Expand Down Expand Up @@ -134,7 +135,7 @@ def get_oplog_cursor_since(self, caller, ts=None):
comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno)
if not ts:
ts = self.get_oplog_tail_ts()
query = {'ts':{'$gte':ts}}
query = {'ts': {'$gte': ts}}
logging.debug("Querying oplog on %s with query: %s" % (self.uri, query))
# http://api.mongodb.com/python/current/examples/tailable.html
return self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True).comment(comment)
Expand Down
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Common/LocalCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def run(self):
sleep(0.1)
except Exception, e:
raise Error(e)

if self._process.returncode != 0:
raise OperationError("%s command failed with exit code %i! Stderr output:\n%s" % (
self.command,
Expand Down
6 changes: 3 additions & 3 deletions mongodb_consistent_backup/Common/Lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
class Lock:
def __init__(self, lock_file, acquire=True):
self.lock_file = lock_file

self._lock = None
if acquire:
self.acquire()

def acquire(self):
try:
self._lock = open(self.lock_file, "w")
Expand All @@ -24,7 +24,7 @@ def acquire(self):
if self._lock:
self._lock.close()
raise OperationError("Could not acquire lock on file: %s!" % self.lock_file)

def release(self):
if self._lock:
logging.debug("Releasing exclusive lock on file: %s" % self.lock_file)
Expand Down
5 changes: 3 additions & 2 deletions mongodb_consistent_backup/Common/MongoUri.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def str(self):
def __str__(self):
return self.str()


class MongoUri:
def __init__(self, url, default_port=27017, replset=None):
self.url = url
Expand Down Expand Up @@ -49,8 +50,8 @@ def parse(self):
addr = MongoAddr()
addr.replset = self.replset
if ":" in url:
addr.host, addr.port = url.split(":")
addr.port = int(addr.port)
addr.host, addr.port = url.split(":")
addr.port = int(addr.port)
else:
addr.host = url
if not addr.port:
Expand Down
4 changes: 2 additions & 2 deletions mongodb_consistent_backup/Common/Timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def __init__(self, manager):
self.timers = manager.dict()

def start(self, timer_name):
self.timers[timer_name] = { 'start': time(), 'started': True }
self.timers[timer_name] = {'start': time(), 'started': True}

def stop(self, timer_name):
try:
Expand All @@ -22,7 +22,7 @@ def stop(self, timer_name):
else:
raise OperationError("No started timer named %s to stop!" % timer_name)
except IOError:
pass
pass

def duration(self, timer_name):
try:
Expand Down
3 changes: 3 additions & 0 deletions mongodb_consistent_backup/Common/Util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ def config_to_string(config):
config_pairs.append("%s=%s" % (key, config[key]))
return ", ".join(config_pairs)


def is_datetime(string):
try:
parser.parse(string)
return True
except:
return False


def parse_method(method):
return method.rstrip().lower()


def validate_hostname(hostname):
try:
if ":" in hostname:
Expand Down
Loading