2 changes: 1 addition & 1 deletion README.rst
@@ -31,6 +31,7 @@ Features
   archiving method (*optional*)
 - `AWS S3 <https://aws.amazon.com/s3/>`__ Secure Multipart backup uploads (*optional*)
 - `Google Cloud Storage <https://cloud.google.com/storage/>`__ Secure backup uploads (*optional*)
+- Rsync (over SSH) secure backup uploads (*optional*)
 - `Nagios NSCA <https://sourceforge.net/p/nagios/nsca>`__ push
   notification support (*optional*)
 - Modular backup, archiving, upload and notification components
@@ -219,7 +220,6 @@ Roadmap
 - Upload compatibility for ZBackup archive phase *(upload unsupported today)*
 - Backup retention/rotation *(eg: delete old backups)*
 - Support more notification methods *(Prometheus, PagerDuty, etc)*
-- Support more upload methods *(Rsync, etc)*
 - Support SSL MongoDB connections
 - Documentation for running under Docker with persistent volumes
 - Python unit tests
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-1.1.0
+1.2.0
25 changes: 17 additions & 8 deletions conf/mongodb-consistent-backup.example.conf
@@ -11,8 +11,8 @@ production:
     location: /var/lib/mongodb-consistent-backup
     # mongodump:
     #   binary: [path] (default: /usr/bin/mongodump)
-    #   compression: [auto|none|gzip] (default: auto - enable gzip if supported)
-    #   threads: [1-16] (default: auto-generated - shards/cpu)
+    #   compression: [auto|none|gzip] (default: auto = enable gzip if supported)
+    #   threads: [1-16] (default: auto-generated, shards/cpu)
   #replication:
   #  max_lag_secs: [1+] (default: 10)
   #  min_priority: [0-999] (default: 0)
@@ -23,18 +23,19 @@ production:
   #    wait_secs: [1+] (default: 300)
   #    ping_secs: [1+] (default: 3)
   #oplog:
-  #  compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
+  #  compression: [none|gzip] (default: gzip, if used by backup stage)
   #  flush:
-  #    max_docs: 100
-  #    max_secs: 1
-  #  resolver_threads: [1+] (default: 2 per CPU)
+  #    max_docs: [1+] (default: 100)
+  #    max_secs: [1+] (default: 1)
+  #  resolver:
+  #    threads: [1+] (default: 2 per CPU)
   #  tailer:
   #    enabled: true
   #    status_interval: 30
   archive:
     method: tar
     # tar:
-    #   compression: [none|gzip] (default: gzip - none if backup is already compressed)
+    #   compression: [none|gzip] (default: gzip, none if backup already compressed)
     #   threads: [1+] (default: 1 per CPU)
     # zbackup:
     #   binary: [path] (default: /usr/bin/zbackup)
@@ -58,7 +59,15 @@ production:
     #   secret_key: [Google Cloud Storage Secret Key]
     #   bucket_name: [Google Cloud Storage Bucket Name]
     #   bucket_prefix: [prefix] (default: /)
-    #   threads: [1+] (default: 1 per CPU)
+    #   threads: [1+] (default: 4)
+    # rsync:
+    #   path: [Rsync Destination Path]
+    #   user: [SSH Username]
+    #   host: [SSH Hostname/IP]
+    #   port: [SSH Port Number] (default: 22)
+    #   delete: [true|false] (default: false)
+    #   threads: [1+] (default: 4)
+    #   retries: [1+] (default: 5)
     # s3:
     #   region: [AWS S3 Region] (default: us-east-1)
     #   access_key: [AWS S3 Access Key]
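For reference, a filled-in rsync upload section based on the options above might look like the following, assuming the uploader is selected via `upload.method` as with the s3/gs uploaders. Host, user, and path values are illustrative placeholders; only the commented defaults above come from the project:

```yaml
production:
  upload:
    method: rsync
    rsync:
      path: /backups/mongodb        # destination directory on the remote host
      user: backupuser              # SSH user to connect as
      host: backup01.example.com    # SSH host receiving the backup
      port: 22                      # SSH port (default: 22)
      delete: false                 # do not delete remote files missing locally (default: false)
      threads: 4                    # parallel upload workers (default: 4)
      retries: 5                    # per-file retry attempts (default: 5)
```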
21 changes: 21 additions & 0 deletions mongodb_consistent_backup/Common/Util.py
@@ -1,6 +1,7 @@
 import socket
 
 from dateutil import parser
+from select import select
 
 from mongodb_consistent_backup.Errors import OperationError
 
@@ -31,3 +32,23 @@ def validate_hostname(hostname):
         socket.getaddrinfo(hostname, None)
     except socket.error, e:
         raise OperationError("Could not resolve host '%s', error: %s" % (hostname, e))
+
+
+def wait_popen(process, stderr_callback, stdout_callback):
+    try:
+        while not process.returncode:
+            # wait for readable output on the child's stderr/stdout:
+            poll = select([process.stderr.fileno(), process.stdout.fileno()], [], [])
+            if len(poll[0]) >= 1:
+                for fd in poll[0]:
+                    if process.stderr and fd == process.stderr.fileno():
+                        stderr_callback(process.stderr.readline().rstrip())
+                    if process.stdout and fd == process.stdout.fileno():
+                        stdout_callback(process.stdout.readline().rstrip())
+            if process.poll() is not None:
+                break
+        stdout, stderr = process.communicate()
+        stderr_callback(stderr.rstrip())
+        stdout_callback(stdout.rstrip())
+    except Exception, e:
+        raise e
+    return True
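The new wait_popen() helper select()s on a child process's stderr/stdout and streams each line to a callback until the process exits. A minimal usage sketch (the command and callbacks are illustrative, not from this codebase):

```python
import logging
from subprocess import Popen, PIPE

from mongodb_consistent_backup.Common import wait_popen

logging.basicConfig(level=logging.INFO)

# Stream a child's output as it is produced:
# stderr lines -> logging.error, stdout lines -> logging.info
process = Popen(["rsync", "--version"], stdout=PIPE, stderr=PIPE)
wait_popen(process, logging.error, logging.info)
```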
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Common/__init__.py
@@ -4,4 +4,4 @@
 from Lock import Lock  # NOQA
 from MongoUri import MongoUri  # NOQA
 from Timer import Timer  # NOQA
-from Util import config_to_string, is_datetime, parse_method, validate_hostname  # NOQA
+from Util import config_to_string, is_datetime, parse_method, validate_hostname, wait_popen  # NOQA
6 changes: 4 additions & 2 deletions mongodb_consistent_backup/Upload/Gs/Gs.py
@@ -27,12 +27,13 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
         super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
         self.backup_location = self.config.backup.location
         self.remove_uploaded = self.config.upload.remove_uploaded
+        self.retries = self.config.upload.retries
         self.project_id = self.config.upload.gs.project_id
         self.access_key = self.config.upload.gs.access_key
         self.secret_key = self.config.upload.gs.secret_key
         self.bucket = self.config.upload.gs.bucket
 
-        self.threads(self.config.upload.gs.threads)
+        self.threads(self.config.upload.threads)
         self._pool = Pool(processes=self.threads())
 
     def close(self):
@@ -69,7 +70,8 @@ def run(self):
                 self.project_id,
                 self.access_key,
                 self.secret_key,
-                self.remove_uploaded
+                self.remove_uploaded,
+                self.retries
             ).run)
         self._pool.close()
         self._pool.join()
22 changes: 17 additions & 5 deletions mongodb_consistent_backup/Upload/Gs/GsUploadThread.py
@@ -7,7 +7,7 @@


 class GsUploadThread:
-    def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False):
+    def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False, retries=5):
         self.backup_dir = backup_dir
         self.file_path = file_path
         self.gs_path = gs_path
@@ -16,6 +16,7 @@ def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_ke
         self.access_key = access_key
         self.secret_key = secret_key
         self.remove_uploaded = remove_uploaded
+        self.retries = retries
 
         self.path = "%s/%s" % (self.bucket, self.gs_path)
         self.meta_data_dir = "mongodb_consistent_backup-META"
@@ -76,10 +77,21 @@ def run(self):
             logging.debug("Path %s does not exist, uploading" % self.path)
 
         try:
-            f = open(self.file_path, 'r')
-            uri = self.get_uri()
-            logging.info("Uploading %s to Google Cloud Storage" % self.path)
-            uri.new_key().set_contents_from_file(f)
+            f = open(self.file_path, 'r')
+            uri = self.get_uri()
+            retry = 0
+            error = None
+            while retry < self.retries:
+                try:
+                    logging.info("Uploading %s to Google Cloud Storage (attempt %i/%i)" % (self.path, retry + 1, self.retries))
+                    f.seek(0)  # rewind in case a failed attempt consumed the file
+                    uri.new_key().set_contents_from_file(f)
+                    break
+                except Exception, e:
+                    logging.error("Received error for Google Cloud Storage upload of %s: %s" % (self.path, e))
+                    error = e
+                    retry += 1
+            if retry >= self.retries and error:
+                raise error
         finally:
             if f:
                 f.close()
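The retry logic added to run() is a plain bounded-retry loop: attempt, log, count failures, and re-raise the last error once all attempts are used. The same pattern as a standalone helper, for illustration only (this function is hypothetical, not part of the codebase):

```python
import logging


def with_retries(func, retries=5):
    # Call func() up to `retries` times; return its result on the first
    # success, or re-raise the last error once every attempt has failed.
    error = None
    for attempt in range(1, retries + 1):
        try:
            return func()
        except Exception as e:
            logging.error("Attempt %i/%i failed: %s" % (attempt, retries, e))
            error = e
    raise error
```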
1 change: 0 additions & 1 deletion mongodb_consistent_backup/Upload/Gs/__init__.py
@@ -7,5 +7,4 @@ def config(parser):
     parser.add_argument("--upload.gs.secret_key", dest="upload.gs.secret_key", help="Google Cloud Storage Uploader Secret Key (required for GS upload)", type=str)
     parser.add_argument("--upload.gs.bucket_name", dest="upload.gs.bucket_name", help="Google Cloud Storage Uploader destination bucket name", type=str)
     parser.add_argument("--upload.gs.bucket_prefix", dest="upload.gs.bucket_prefix", help="Google Cloud Storage Uploader destination bucket path prefix", type=str)
-    parser.add_argument("--upload.gs.threads", dest="upload.gs.threads", help="Google Cloud Storage Uploader worker threads (default: 4)", default=4, type=int)
     return parser
143 changes: 143 additions & 0 deletions mongodb_consistent_backup/Upload/Rsync/Rsync.py
@@ -0,0 +1,143 @@
+import os
+import logging
+import re
+
+from copy_reg import pickle
+from multiprocessing import Pool
+from subprocess import check_output
+from types import MethodType
+
+from RsyncUploadThread import RsyncUploadThread
+
+from mongodb_consistent_backup.Common import config_to_string
+from mongodb_consistent_backup.Errors import OperationError
+from mongodb_consistent_backup.Pipeline import Task
+
+
+# Allows pooled .apply_async()s to work on Class-methods:
+def _reduce_method(m):
+    if m.im_self is None:
+        return getattr, (m.im_class, m.im_func.func_name)
+    else:
+        return getattr, (m.im_self, m.im_func.func_name)
+
+
+pickle(MethodType, _reduce_method)
+
+
+class Rsync(Task):
+    def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
+        super(Rsync, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
+        self.backup_location = self.config.backup.location
+        self.backup_name = self.config.backup.name
+        self.remove_uploaded = self.config.upload.remove_uploaded
+        self.rsync_path = self.config.upload.rsync.path
+        self.rsync_user = self.config.upload.rsync.user
+        self.rsync_host = self.config.upload.rsync.host
+        self.rsync_port = self.config.upload.rsync.port
+        self.rsync_ssh_key = self.config.upload.rsync.ssh_key
+        self.retries = self.config.upload.rsync.retries
+        self.thread_count = self.config.upload.rsync.threads
+        self.rsync_binary = "rsync"
+
+        self.rsync_flags = ["--archive", "--compress"]
+        self.rsync_version = None
+        self._rsync_info = None
+
+        self._pool = Pool(processes=self.threads())
+
+    def init(self):
+        if not self.host_has_rsync():
+            raise OperationError("Cannot find rsync binary on this host!")
+        if not os.path.isdir(self.backup_dir):
+            logging.error("The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir)
+            raise OperationError("The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir)
+
+    def rsync_info(self):
+        if not self._rsync_info:
+            output = check_output([self.rsync_binary, "--version"])
+            search = re.search(r"^rsync\s+version\s([0-9.-]+)\s+protocol\sversion\s(\d+)", output)
+            self.rsync_version = search.group(1)
+            self._rsync_info = {"version": self.rsync_version, "protocol_version": int(search.group(2))}
+        return self._rsync_info
+
+    def host_has_rsync(self):
+        if self.rsync_info():
+            return True
+        return False
+
+    def prepare_dest_dir(self):
+        # mkdir -p the rsync dest path via ssh
+        ssh_mkdir_cmd = ["ssh"]
+        if self.rsync_ssh_key:
+            ssh_mkdir_cmd.extend(["-i", self.rsync_ssh_key])
+        ssh_mkdir_cmd.extend([
+            "%s@%s" % (self.rsync_user, self.rsync_host),
+            "mkdir", "-p", self.base_dir
+        ])
+
+        # run the mkdir via ssh
+        try:
+            check_output(ssh_mkdir_cmd)
+        except Exception, e:
+            logging.error("Creating rsync dest path with ssh failed for %s: %s" % (
+                self.rsync_host,
+                e
+            ))
+            raise e
+
+        return True
+
+    def done(self, data):
+        logging.info(data)
+
+    def run(self):
+        try:
+            self.init()
+            self.timer.start(self.timer_name)
+
+            logging.info("Preparing destination path on %s" % self.rsync_host)
+            self.prepare_dest_dir()
+
+            rsync_config = {
+                "dest": "%s@%s:%s" % (self.rsync_user, self.rsync_host, self.rsync_path),
+                "threads": self.threads(),
+                "retries": self.retries
+            }
+            rsync_config.update(self.rsync_info())
+            logging.info("Starting upload using rsync version %s (%s)" % (
+                self.rsync_info()['version'],
+                config_to_string(rsync_config)
+            ))
+            for child in os.listdir(self.backup_dir):
+                self._pool.apply_async(RsyncUploadThread(
+                    os.path.join(self.backup_dir, child),
+                    self.base_dir,
+                    self.rsync_flags,
+                    self.rsync_path,
+                    self.rsync_user,
+                    self.rsync_host,
+                    self.rsync_port,
+                    self.rsync_ssh_key,
+                    self.remove_uploaded,
+                    self.retries
+                ).run, callback=self.done)
+            self.wait()
+        except Exception, e:
+            logging.error("Rsync upload failed! Error: %s" % e)
+            raise OperationError(e)
+        finally:
+            self.timer.stop(self.timer_name)
+            self.completed = True
+
+    def wait(self):
+        if self._pool:
+            logging.info("Waiting for Rsync upload threads to stop")
+            self._pool.close()
+            self._pool.join()
+
+    def close(self):
+        if self._pool:
+            logging.error("Stopping Rsync upload threads")
+            self._pool.terminate()
+            self._pool.join()
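RsyncUploadThread itself is not included in this diff, so the exact rsync invocation is not shown. Going only by the flags and config options defined above, a per-file upload command would plausibly be assembled along these lines (a hypothetical sketch, not the project's actual implementation):

```python
def build_rsync_cmd(src, dest, user, host, port=22, ssh_key=None,
                    flags=("--archive", "--compress")):
    # Build an "rsync over ssh" command for one backup file or directory.
    cmd = ["rsync"]
    cmd.extend(flags)
    ssh = "ssh -p %i" % port                     # transport: ssh with port
    if ssh_key:
        ssh += " -i %s" % ssh_key                # ...and optional identity file
    cmd.extend(["-e", ssh])
    cmd.append(src)                              # local source path
    cmd.append("%s@%s:%s" % (user, host, dest))  # remote destination
    return cmd


# ['rsync', '--archive', '--compress', '-e', 'ssh -p 22',
#  '/var/lib/mongodb-consistent-backup/default', 'backupuser@backup01:/backups/mongodb']
print(build_rsync_cmd("/var/lib/mongodb-consistent-backup/default",
                      "/backups/mongodb", "backupuser", "backup01"))
```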