Skip to content
This repository was archived by the owner on Aug 26, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/sagemaker_containers/_intermediate_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def _watch(inotify, watchers, watch_flags, s3_uploader):
# for it which should cause any problems because when we copy files to temp dir
# we add a unique timestamp up to microseconds.
if flag is inotify_simple.flags.ISDIR and inotify_simple.flags.CREATE & event.mask:
for folder, dirs, files in os.walk(os.path.join(intermediate_path, event.name)):
path = os.path.join(intermediate_path, watchers[event.wd], event.name)
for folder, dirs, files in os.walk(path):
wd = inotify.add_watch(folder, watch_flags)
relative_path = os.path.relpath(folder, intermediate_path)
watchers[wd] = relative_path
Expand Down
41 changes: 41 additions & 0 deletions test/functional/test_intermediate_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,47 @@ def test_intermediate_upload():
assert content == content_to_assert


def test_nested_delayed_file():
os.environ['TRAINING_JOB_NAME'] = _timestamp()
p = _intermediate_output.start_sync(bucket_uri, region)

os.makedirs(os.path.join(intermediate_path, 'dir1'))
dir1 = os.path.join(intermediate_path, 'dir1')

time.sleep(3)

os.makedirs(os.path.join(dir1, 'dir2'))
dir2 = os.path.join(dir1, 'dir2')

time.sleep(3)

file1 = os.path.join(dir2, 'file1.txt')
write_file(file1, 'file1')

os.makedirs(os.path.join(intermediate_path, 'dir3'))
dir3 = os.path.join(intermediate_path, 'dir3')

time.sleep(3)

file2 = os.path.join(dir3, 'file2.txt')
write_file(file2, 'file2')

_files.write_success_file()
p.join()

# assert that all files that should be under intermediate are still there
assert os.path.exists(file1)
assert os.path.exists(file2)

# assert file exist in S3
key_prefix = os.path.join(os.environ.get('TRAINING_JOB_NAME'), 'output', 'intermediate')
client = boto3.client('s3', region)
assert _file_exists_in_s3(
client, os.path.join(key_prefix, os.path.relpath(file1, intermediate_path)))
assert _file_exists_in_s3(
client, os.path.join(key_prefix, os.path.relpath(file2, intermediate_path)))


def test_large_files():
os.environ['TRAINING_JOB_NAME'] = _timestamp()
p = _intermediate_output.start_sync(bucket_uri, region)
Expand Down
12 changes: 6 additions & 6 deletions test/unit/test_intermediate_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ def test_non_write_ignored(process_mock, upload_file, inotify_mock, copy2):
process = process_mock.return_value
inotify = inotify_mock.return_value

inotify.add_watch.return_value = 'wd'
inotify.add_watch.return_value = 1
mask = flags.CREATE
for flag in flags:
if flag is not flags.CLOSE_WRITE and flag is not flags.ISDIR:
mask = mask | flag
inotify.read.return_value = [Event('wd', mask, 'cookie', 'file_name')]
inotify.read.return_value = [Event(1, mask, 'cookie', 'file_name')]

def watch():
call = process_mock.call_args
Expand Down Expand Up @@ -84,8 +84,8 @@ def test_modification_triggers_upload(process_mock, upload_file, inotify_mock, c
process = process_mock.return_value
inotify = inotify_mock.return_value

inotify.add_watch.return_value = 'wd'
inotify.read.return_value = [Event('wd', flags.CLOSE_WRITE, 'cookie', 'file_name')]
inotify.add_watch.return_value = 1
inotify.read.return_value = [Event(1, flags.CLOSE_WRITE, 'cookie', 'file_name')]

def watch():
call = process_mock.call_args
Expand Down Expand Up @@ -115,8 +115,8 @@ def test_new_folders_are_watched(process_mock, upload_file, inotify_mock, copy2)

new_dir = 'new_dir'
new_dir_path = os.path.join(_env.output_intermediate_dir, new_dir)
inotify.add_watch.return_value = 'wd'
inotify.read.return_value = [Event('wd', flags.CREATE | flags.ISDIR, 'cookie', new_dir)]
inotify.add_watch.return_value = 1
inotify.read.return_value = [Event(1, flags.CREATE | flags.ISDIR, 'cookie', new_dir)]

def watch():
os.makedirs(new_dir_path)
Expand Down