From eca1b1e440d18101044ad4d13ffbc227ee0e27fe Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 19 May 2016 13:31:32 -0400 Subject: [PATCH 1/8] Rewrite ls to use prefixes Designed to get around the case where some parts of a bucket are not accessible --- s3fs/core.py | 161 +++++++++++++++++++++++----------------- s3fs/tests/test_s3fs.py | 5 +- 2 files changed, 95 insertions(+), 71 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index d76d537c..ab66615b 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -218,6 +218,47 @@ def open(self, path, mode='rb', block_size=5 * 1024 ** 2): " and manage bytes" % (mode[0] + 'b')) return S3File(self, path, mode, block_size=block_size) + def _lsdir(self, path, refresh=False): + if path.startswith('s3://'): + path = path[len('s3://'):] + path = path.rstrip('/') + bucket, key = split_path(path) + key = key + '/' if key else "" + if path not in self.dirs or refresh: + try: + pag = self.s3.get_paginator('list_objects') + it = pag.paginate(Bucket=bucket, Prefix=key, Delimiter='/') + files = [] + dirs = None + for i in it: + dirs = dirs or i.get('CommonPrefixes', None) + files.extend(i.get('Contents', [])) + if dirs: + files.extend([{'Key': l['Prefix'][:-1], 'Size': 0, + 'StorageClass': "DIRECTORY"} for l in dirs]) + files = [f for f in files if len(f['Key']) > len(key)] + for f in files: + f['Key'] = '/'.join([bucket, f['Key']]) + except ClientError: + # path not accessible + files = [] + self.dirs[path] = files + return self.dirs[path] + + def _lsbuckets(self, refresh=False): + if '' not in self.dirs or refresh: + if self.anon: + # cannot list buckets if not logged in + return [] + files = self.s3.list_buckets()['Buckets'] + for f in files: + f['Key'] = f['Name'] + f['Size'] = 0 + f['StorageClass'] = 'BUCKET' + del f['Name'] + self.dirs[''] = files + return self.dirs[''] + def _ls(self, path, refresh=False): """ List files in given bucket, or list of buckets. 
@@ -238,34 +279,10 @@ def _ls(self, path, refresh=False): """ if path.startswith('s3://'): path = path[len('s3://'):] - path = path.rstrip('/') - bucket, key = split_path(path) - if bucket not in self.dirs or refresh: - if bucket == '': - # list of buckets - if self.anon: - # cannot list buckets if not logged in - return [] - files = self.s3.list_buckets()['Buckets'] - for f in files: - f['Key'] = f['Name'] - f['Size'] = 0 - del f['Name'] - else: - try: - pag = self.s3.get_paginator('list_objects') - it = pag.paginate(Bucket=bucket) - files = [] - for i in it: - files.extend(i.get('Contents', [])) - except ClientError: - # bucket not accessible - raise FileNotFoundError(bucket) - for f in files: - f['Key'] = "/".join([bucket, f['Key']]) - self.dirs[bucket] = list(sorted(files, key=lambda x: x['Key'])) - files = self.dirs[bucket] - return files + if path in ['', '/']: + return self._lsbuckets(refresh) + else: + return self._lsdir(path, refresh) def ls(self, path, detail=False, refresh=False): """ List single "directory" with or without details """ @@ -273,14 +290,8 @@ def ls(self, path, detail=False, refresh=False): path = path[len('s3://'):] path = path.rstrip('/') files = self._ls(path, refresh=refresh) - if path: - pattern = re.compile(path + '/[^/]*.$') - files = [f for f in files if pattern.match(f['Key']) is not None] - if not files: - try: - files = [self.info(path)] - except (OSError, IOError, ClientError): - files = [] + if not files and path not in ['', '/']: + files = [self.info(path)] if detail: return files else: @@ -289,26 +300,38 @@ def ls(self, path, detail=False, refresh=False): def info(self, path, refresh=False): """ Detail on the specific file pointed to by path. - NB: path has trailing '/' stripped to work as `ls` does, so key - names that genuinely end in '/' will fail. + Gets details only for a specific key, directories/buckets cannot be + used with info. """ - if path.startswith('s3://'): - path = path[len('s3://'):] - path = path.rstrip('/') - files = self._ls(path, refresh=refresh) - files = [f for f in files if f['Key'].rstrip('/') == path] + parent = path.rsplit('/', 1)[0] + files = self._lsdir(parent, refresh=refresh) + files = [f for f in files if f['Key'] == path and f['StorageClass'] not + in ['DIRECTORY', 'BUCKET']] if len(files) == 1: return files[0] else: - raise IOError("File not found: %s" % path) + try: + bucket, key = split_path(path) + out = self.s3.head_object(Bucket=bucket, Key=key) + return out['Contents'] + except (ClientError, ParamValidationError): + raise FileNotFoundError(path) - def walk(self, path, refresh=False): - """ Return all entries below path """ + def _walk(self, path, refresh=False): if path.startswith('s3://'): path = path[len('s3://'):] - filenames = self._ls(path, refresh=refresh) - return [f['Key'] for f in filenames if f['Key'].rstrip('/' - ).startswith(path.rstrip('/') + '/')] + if path in ['', '/']: + raise ValueError('Cannot walk all of S3') + filenames = self._ls(path, refresh=refresh)[:] + for f in filenames[:]: + if f['StorageClass'] == 'DIRECTORY': + filenames.extend(self._walk(f['Key'], refresh)) + return [f for f in filenames if f['StorageClass'] not in + ['BUCKET', 'DIRECTORY']] + + def walk(self, path, refresh=False): + """ Return all real keys below path """ + return [f['Key'] for f in self._walk(path, refresh)] def glob(self, path): """ @@ -354,15 +377,12 @@ def du(self, path, total=False, deep=False): return {p['Key']: p['Size'] for p in files} def exists(self, path): - """ Does such a file exist? 
""" - if path.startswith('s3://'): - path = path[len('s3://'):] - path = path.rstrip('/') - if split_path(path)[1]: - return bool(self.ls(path)) + """ Does such a file/directory exist? """ + bucket, key = split_path(path) + if key: + return not raises(FileNotFoundError, lambda: self.ls(path)) else: - return (path in self.ls('') or - not raises(FileNotFoundError, lambda: self.ls(path))) + return bucket in self.ls('') def cat(self, path): """ Returns contents of file """ @@ -441,7 +461,7 @@ def merge(self, path, filelist): part_info = {'Parts': parts} self.s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu['UploadId'], MultipartUpload=part_info) - self.invalidate_cache(bucket) + self.invalidate_cache(path) def copy(self, path1, path2): """ Copy file between locations on S3 """ @@ -452,7 +472,7 @@ def copy(self, path1, path2): CopySource='/'.join([buc1, key1])) except (ClientError, ParamValidationError): raise IOError('Copy failed', (path1, path2)) - self.invalidate_cache(buc2) + self.invalidate_cache(path2) def bulk_delete(self, pathlist): """ @@ -478,7 +498,8 @@ def bulk_delete(self, pathlist): in pathlist]} try: self.s3.delete_objects(Bucket=bucket, Delete=delete_keys) - self.invalidate_cache(bucket) + for path in pathlist: + self.invalidate_cache(path) except ClientError: raise IOError('Bulk delete failed') @@ -506,24 +527,27 @@ def rm(self, path, recursive=False): self.s3.delete_object(Bucket=bucket, Key=key) except ClientError: raise IOError('Delete key failed', (bucket, key)) - self.invalidate_cache(bucket) + self.invalidate_cache(path) else: if not self.s3.list_objects(Bucket=bucket).get('Contents'): try: self.s3.delete_bucket(Bucket=bucket) except ClientError: raise IOError('Delete bucket failed', bucket) - self.dirs.pop(bucket, None) self.invalidate_cache(bucket) self.invalidate_cache('') else: raise IOError('Not empty', path) - def invalidate_cache(self, bucket=None): - if bucket is None: + def invalidate_cache(self, path=None): + if path is None: self.dirs.clear() - elif bucket in self.dirs: - del self.dirs[bucket] + else: + print(self.dirs.keys()) + self.dirs.pop(path, None) + parent = path.rsplit('/', 1)[0] + self.dirs.pop(parent, None) + print(self.dirs.keys()) def touch(self, path): """ @@ -534,11 +558,12 @@ def touch(self, path): bucket, key = split_path(path) if key: self.s3.put_object(Bucket=bucket, Key=key) - self.invalidate_cache(bucket) + self.invalidate_cache(path) else: try: self.s3.create_bucket(Bucket=bucket) self.invalidate_cache('') + self.invalidate_cache(bucket) except (ClientError, ParamValidationError): raise IOError('Bucket create failed', path) @@ -875,7 +900,7 @@ def close(self): Body=self.buffer.read()) except (ClientError, ParamValidationError) as e: raise IOError('Write failed: %s' % self.path, e) - self.s3.invalidate_cache(self.bucket) + self.s3.invalidate_cache(self.path) self.closed = True def readable(self): diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py index 777a3d79..116d5bc4 100644 --- a/s3fs/tests/test_s3fs.py +++ b/s3fs/tests/test_s3fs.py @@ -137,7 +137,7 @@ def test_pickle(s3): def test_ls_touch(s3): - assert not s3.ls(test_bucket_name+'/tmp/test') + assert not s3.exists(test_bucket_name+'/tmp/test') s3.touch(a) s3.touch(b) L = s3.ls(test_bucket_name+'/tmp/test', True) @@ -161,8 +161,7 @@ def test_rm(s3): #whole bucket s3.rm(test_bucket_name, recursive=True) - with pytest.raises((IOError, OSError)): - s3.exists(test_bucket_name+'/2014-01-01.csv') + assert not s3.exists(test_bucket_name+'/2014-01-01.csv') assert 
not s3.exists(test_bucket_name) From 27becb0c9b4aecf22f47ff47a818e544edb26bd2 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 20 May 2016 11:44:00 -0400 Subject: [PATCH 2/8] File info() should do the same as filesystem info() --- s3fs/core.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index ab66615b..e2d05673 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -687,9 +687,7 @@ def __init__(self, s3, path, mode='rb', block_size=5 * 2 ** 20): def info(self): """ File information about this path """ - info = self.s3.s3.head_object(Bucket=self.bucket, Key=self.key) - info['Size'] = info.get('ContentLength') - return info + return self.s3.info(self.path) def tell(self): """ Current file location """ From b4fbc3b38c5dafd2469e2e0449ee3d1c6c07d152 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 20 May 2016 13:09:11 -0400 Subject: [PATCH 3/8] Fix a couple of tests --- s3fs/core.py | 5 ++++- s3fs/tests/test_mapping.py | 7 +++++++ s3fs/tests/test_s3fs.py | 10 ++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/s3fs/core.py b/s3fs/core.py index ab66615b..d90a3287 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -313,7 +313,10 @@ def info(self, path, refresh=False): try: bucket, key = split_path(path) out = self.s3.head_object(Bucket=bucket, Key=key) - return out['Contents'] + out = {'ETag': out['ETag'], 'Key': '/'.join([bucket, key]), + 'LastModified': out['LastModified'], + 'Size': out['ContentLength'], 'StorageClass': "STANDARD"} + return out except (ClientError, ParamValidationError): raise FileNotFoundError(path) diff --git a/s3fs/tests/test_mapping.py b/s3fs/tests/test_mapping.py index 5b406f2a..d750c957 100644 --- a/s3fs/tests/test_mapping.py +++ b/s3fs/tests/test_mapping.py @@ -1,3 +1,4 @@ +import pytest from s3fs.tests.test_s3fs import s3, test_bucket_name from s3fs import S3Map, S3FileSystem @@ -19,6 +20,12 @@ def test_default_s3filesystem(s3): assert d.s3 is s3 +def test_errors(s3): + d = S3Map(root, s3) + with pytest.raises(KeyError): + d['nonexistent'] + + def test_with_data(s3): d = S3Map(root, s3) d['x'] = b'123' diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py index 116d5bc4..772ea104 100644 --- a/s3fs/tests/test_s3fs.py +++ b/s3fs/tests/test_s3fs.py @@ -107,6 +107,16 @@ def test_multiple_objects(s3): assert s3.ls('test') == s32.ls('test') +def test_info(s3): + s3.touch(a) + s3.touch(b) + assert s3.info(a) == s3.ls(a, detail=True)[0] + parent = a.rsplit('/', 1)[0] + s3.dirs[parent].pop(0) # disappear our file! 
+    assert a not in s3.ls(parent)
+    assert s3.info(a)  # now uses head_object
+
+
 @pytest.mark.xfail()
 def test_delegate(s3):
     out = s3.get_delegated_s3pars()

From 41a2487857bf6be284a3a0cad4ce65e66f826915 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 20 May 2016 13:27:57 -0400
Subject: [PATCH 4/8] More tests

---
 s3fs/core.py | 2 +-
 s3fs/tests/test_s3fs.py | 5 +++++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 76607cc1..9fb932e1 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -290,7 +290,7 @@ def ls(self, path, detail=False, refresh=False):
             path = path[len('s3://'):]
         path = path.rstrip('/')
         files = self._ls(path, refresh=refresh)
-        if not files and path not in ['', '/']:
+        if not files and split_path(path)[1]:
             files = [self.info(path)]
         if detail:
             return files
diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py
index 772ea104..74e7e35e 100644
--- a/s3fs/tests/test_s3fs.py
+++ b/s3fs/tests/test_s3fs.py
@@ -366,6 +366,11 @@ def test_errors(s3):
     with pytest.raises((IOError, OSError)):
         s3.mkdir('/')
 
+    with pytest.raises(ValueError):
+        s3.walk('')
+
+    with pytest.raises(ValueError):
+        s3.walk('s3://')
 
 def test_read_small(s3):
     fn = test_bucket_name+'/2014-01-01.csv'

From 47c97b369ee7c1426f5856dfc09b4e690d1517c8 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 20 May 2016 13:35:19 -0400
Subject: [PATCH 5/8] Fix failure

ls('') should list buckets, but return [] if there are no buckets or the
connection is anonymous. ls('nonexistent') should raise if it cannot be
accessed (whether or not it actually exists)
---
 s3fs/core.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 9fb932e1..58e8ae06 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -290,8 +290,11 @@ def ls(self, path, detail=False, refresh=False):
             path = path[len('s3://'):]
         path = path.rstrip('/')
         files = self._ls(path, refresh=refresh)
-        if not files and split_path(path)[1]:
-            files = [self.info(path)]
+        if not files:
+            if split_path(path)[1]:
+                files = [self.info(path)]
+            elif path:
+                raise FileNotFoundError(path)
         if detail:
             return files
         else:

From 50aeb62d19c21e158ba4e5f8d874ac322036a235 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 20 May 2016 16:21:29 -0400
Subject: [PATCH 6/8] Remove debugging print statements

---
 s3fs/core.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 58e8ae06..6f1e9e54 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -497,7 +497,6 @@ def bulk_delete(self, pathlist):
         bucket = buckets.pop()
         if len(pathlist) > 1000:
             for i in range((len(pathlist) // 1000) + 1):
-                print(i)
                 self.bulk_delete(pathlist[i*1000:(i+1)*1000])
             return
         delete_keys = {'Objects': [{'Key' : split_path(path)[1]} for path
@@ -549,11 +548,9 @@ def invalidate_cache(self, path=None):
         if path is None:
             self.dirs.clear()
         else:
-            print(self.dirs.keys())
             self.dirs.pop(path, None)
             parent = path.rsplit('/', 1)[0]
             self.dirs.pop(parent, None)
-            print(self.dirs.keys())

From 95e87c866fef84baba9a102dec969a345fea7dc1 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Fri, 20 May 2016 17:10:59 -0400
Subject: [PATCH 7/8] Rename key->prefix

In the list_objects call, these are not really keys
---
 s3fs/core.py | 8 ++++----
 s3fs/tests/test_s3fs.py | 2 --
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 6f1e9e54..70ecd227 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -222,12 +222,12 @@ def _lsdir(self, path, refresh=False):
         if path.startswith('s3://'):
             path = 
path[len('s3://'):] path = path.rstrip('/') - bucket, key = split_path(path) - key = key + '/' if key else "" + bucket, prefix = split_path(path) + prefix = prefix + '/' if prefix else "" if path not in self.dirs or refresh: try: pag = self.s3.get_paginator('list_objects') - it = pag.paginate(Bucket=bucket, Prefix=key, Delimiter='/') + it = pag.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/') files = [] dirs = None for i in it: @@ -236,7 +236,7 @@ def _lsdir(self, path, refresh=False): if dirs: files.extend([{'Key': l['Prefix'][:-1], 'Size': 0, 'StorageClass': "DIRECTORY"} for l in dirs]) - files = [f for f in files if len(f['Key']) > len(key)] + files = [f for f in files if len(f['Key']) > len(prefix)] for f in files: f['Key'] = '/'.join([bucket, f['Key']]) except ClientError: diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py index 74e7e35e..7b68da91 100644 --- a/s3fs/tests/test_s3fs.py +++ b/s3fs/tests/test_s3fs.py @@ -136,8 +136,6 @@ def test_ls(s3): s3.ls('nonexistent') fn = test_bucket_name+'/test/accounts.1.json' assert fn in s3.ls(test_bucket_name+'/test') - # assert fn in s3.ls(test_bucket_name) - # assert [fn] == s3.ls(fn) def test_pickle(s3): From 0ee13f9679b0073ffd45c3105b55ee0565b96127 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sat, 21 May 2016 12:02:42 -0400 Subject: [PATCH 8/8] Make credential check via delegate token --- s3fs/core.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index 70ecd227..4f8a755a 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -115,7 +115,7 @@ def __init__(self, anon=None, key=None, secret=None, token=None, try: self.anon = False self.s3 = self.connect() - self.ls('') + self.get_delegated_s3pars() return except: logger.debug('Accredited connection failed, trying anonymous') @@ -157,13 +157,15 @@ def connect(self, refresh=False): conf = Config(connect_timeout=self.connect_timeout, read_timeout=self.read_timeout, signature_version=UNSIGNED) - s3 = boto3.Session(**self.kwargs).client('s3', config=conf, + self.session = boto3.Session(**self.kwargs) + s3 = self.session.client('s3', config=conf, use_ssl=ssl) else: conf = Config(connect_timeout=self.connect_timeout, read_timeout=self.read_timeout) - s3 = boto3.Session(self.key, self.secret, self.token, - **self.kwargs).client('s3', config=conf, + self.session = boto3.Session(self.key, self.secret, self.token, + **self.kwargs) + s3 = self.session.client('s3', config=conf, use_ssl=ssl) self._conn[tok] = s3 return self._conn[tok] @@ -185,8 +187,7 @@ def get_delegated_s3pars(self, exp=3600): if self.token: # already has temporary cred return {'key': self.key, 'secret': self.secret, 'token': self.token, 'anon': False} - sts = boto3.Session(self.key, self.secret, self.token, - **self.kwargs).client('sts') + sts = self.session.client('sts') cred = sts.get_session_token(DurationSeconds=3600)['Credentials'] return {'key': cred['AccessKeyId'], 'secret': cred['SecretAccessKey'], 'token': cred['SessionToken'], 'anon': False}
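
Usage sketch (illustrative only, not part of the patches): how the prefix-based listing behaves after this series. The bucket name 'mybucket' and the idea that credentials are picked up from the environment are assumptions for illustration; the calls themselves are the ones touched by the patches.

    from s3fs import S3FileSystem

    # Assumption: boto3 finds credentials in the environment, and
    # 'mybucket' is a hypothetical bucket we are allowed to write to.
    s3 = S3FileSystem(anon=False)

    # ls() now lists one "directory" at a time via list_objects with
    # Prefix/Delimiter, so keys under an inaccessible prefix no longer
    # break listing of the parts we can reach.
    s3.touch('mybucket/tmp/test/a')
    s3.ls('mybucket/tmp/test')          # e.g. ['mybucket/tmp/test/a']
    s3.ls('mybucket/tmp', detail=True)  # placeholder with StorageClass 'DIRECTORY'

    # walk() returns only real keys below the path; walking all of S3 raises.
    s3.walk('mybucket/tmp')             # e.g. ['mybucket/tmp/test/a']
    # s3.walk('')                       # ValueError: Cannot walk all of S3

    # info() falls back to head_object when the key is not in the cached
    # listing, and rm()/touch() invalidate the cache per path, not per bucket.
    s3.info('mybucket/tmp/test/a')['Size']   # 0 for an empty, touched key
    s3.rm('mybucket/tmp/test/a')
    s3.exists('mybucket/tmp/test/a')         # False

The per-prefix listing is what lets a caller with access to only part of a bucket still list the prefixes it can reach, instead of failing on a bucket-wide listing, which is the case the first patch in the series is designed to get around.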