From 7f95e24687304973cd8996ffaeb8f35f44cd1e77 Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Wed, 6 May 2020 20:39:29 +0300 Subject: [PATCH 1/9] seek method implementation --- .../apache/hadoop/fs/sftp/SFTPFileSystem.java | 5 +- .../hadoop/fs/sftp/SFTPInputStream.java | 117 +++++++++++++----- .../fs/contract/AbstractFSContract.java | 8 ++ .../contract/AbstractFSContractTestBase.java | 1 + .../hadoop/fs/contract/sftp/SFTPContract.java | 90 ++++++++++++++ .../contract/sftp/TestSFTPContractSeek.java | 13 ++ .../src/test/resources/contract/sftp.xml | 79 ++++++++++++ 7 files changed, 275 insertions(+), 38 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java create mode 100644 hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java index ed33357b51d2b..3fc774389a765 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java @@ -516,16 +516,13 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { disconnect(channel); throw new IOException(String.format(E_PATH_DIR, f)); } - InputStream is; try { // the path could be a symbolic link, so get the real path absolute = new Path("/", channel.realpath(absolute.toUri().getPath())); - - is = channel.get(absolute.toUri().getPath()); } catch (SftpException e) { throw new IOException(e); } - return new FSDataInputStream(new SFTPInputStream(is, statistics)){ + return new FSDataInputStream(new SFTPInputStream(channel, absolute, statistics)){ @Override public void close() throws IOException { super.close(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index 7af299bd113e1..4e3bdc383a74c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -17,49 +17,83 @@ */ package org.apache.hadoop.fs.sftp; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.SftpATTRS; +import com.jcraft.jsch.SftpException; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; /** SFTP FileSystem input stream. */ class SFTPInputStream extends FSInputStream { - public static final String E_SEEK_NOTSUPPORTED = "Seek not supported"; - public static final String E_NULL_INPUTSTREAM = "Null InputStream"; public static final String E_STREAM_CLOSED = "Stream closed"; - + private final ChannelSftp channel; + private final Path path; private InputStream wrappedStream; private FileSystem.Statistics stats; - private boolean closed; + private volatile boolean closed; private long pos; - - SFTPInputStream(InputStream stream, FileSystem.Statistics stats) { - - if (stream == null) { - throw new IllegalArgumentException(E_NULL_INPUTSTREAM); - } - this.wrappedStream = stream; + private long contentLength; + private long nextPos; + + SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats) { + this.channel = channel; + this.path = path; + this.wrappedStream = openStream(channel, path); + this.contentLength = getContentLength(channel, path); this.stats = stats; - - this.pos = 0; - this.closed = false; } @Override public void seek(long position) throws IOException { - throw new IOException(E_SEEK_NOTSUPPORTED); + checkNotClosed(); + if (position < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } + nextPos = position; + } + + @Override + public int available() throws IOException { + checkNotClosed(); + long remaining = contentLength - nextPos; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int) remaining; + } + + private void seekInternal() throws IOException { + if (pos == nextPos) { + return; + } + if (nextPos > pos) { + long skipped = wrappedStream.skip(nextPos - pos); + pos = pos + skipped; + } + if (nextPos < pos) { + IOUtils.closeStream(wrappedStream); + wrappedStream = openStream(channel, path); + pos = wrappedStream.skip(nextPos); + } } @Override public boolean seekToNewSource(long targetPos) throws IOException { - throw new IOException(E_SEEK_NOTSUPPORTED); + return false; } @Override public long getPos() throws IOException { - return pos; + return nextPos; } @Override @@ -67,10 +101,14 @@ public synchronized int read() throws IOException { if (closed) { throw new IOException(E_STREAM_CLOSED); } - + if (this.contentLength == 0 || (nextPos >= contentLength)) { + return -1; + } + seekInternal(); int byteRead = wrappedStream.read(); if (byteRead >= 0) { pos++; + nextPos++; } if (stats != null & byteRead >= 0) { stats.incrementBytesRead(1); @@ -78,29 +116,40 @@ public synchronized int read() throws IOException { return byteRead; } - public synchronized int read(byte[] buf, int off, int len) - throws IOException { + public synchronized void close() throws IOException { if (closed) { - throw new IOException(E_STREAM_CLOSED); + return; } + super.close(); + wrappedStream.close(); + closed = true; + } - int result = wrappedStream.read(buf, off, len); - if (result > 0) { - pos += result; - } - if (stats != null & result > 0) { - stats.incrementBytesRead(result); + /** + * Verify that the input stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. + * @throws IOException if the connection is closed. + */ + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException(path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } + } - return result; + private InputStream openStream(ChannelSftp channel, Path path) { + try { + return channel.get(path.toUri().getPath()); + } catch (SftpException e) { + throw new UncheckedIOException(new IOException(e)); + } } - public synchronized void close() throws IOException { - if (closed) { - return; + private long getContentLength(ChannelSftp channel, Path path) { + try { + SftpATTRS stat = channel.lstat(path.toString()); + return stat.getSize(); + } catch (SftpException e) { + throw new UncheckedIOException(new IOException(e)); } - super.close(); - wrappedStream.close(); - closed = true; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java index f09496a6082c8..035bcde312565 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java @@ -69,6 +69,14 @@ public void init() throws IOException { } + /** + * Any teardown logic can go here + * @throws IOException IO problems + */ + public void teardown() throws IOException { + + } + /** * Add a configuration resource to this instance's configuration * @param resource resource reference diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java index 60373f67992eb..edeb9e29d50a4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java @@ -214,6 +214,7 @@ public void teardown() throws Exception { LOG.debug("== Teardown =="); deleteTestDirInTeardown(); LOG.debug("== Teardown complete =="); + contract.teardown(); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java new file mode 100644 index 0000000000000..10521a8a3684a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java @@ -0,0 +1,90 @@ +package org.apache.hadoop.fs.contract.sftp; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.sftp.SFTPFileSystem; +import org.apache.sshd.common.NamedFactory; +import org.apache.sshd.server.Command; +import org.apache.sshd.server.SshServer; +import org.apache.sshd.server.auth.UserAuth; +import org.apache.sshd.server.auth.password.PasswordAuthenticator; +import org.apache.sshd.server.auth.password.UserAuthPasswordFactory; +import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.apache.sshd.server.session.ServerSession; +import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; + +public class SFTPContract extends AbstractFSContract { + + private String testDataDir = new FileSystemTestHelper().getTestRootDir(); + private Configuration conf; + public static final String CONTRACT_XML = "contract/sftp.xml"; + private SshServer sshd; + + public SFTPContract(Configuration conf) { + super(conf); + addConfResource(CONTRACT_XML); + this.conf = conf; + } + + @Override + public void init() throws IOException { + sshd = SshServer.setUpDefaultServer(); + // ask OS to assign a port + sshd.setPort(0); + sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); + + List> userAuthFactories = new ArrayList<>(); + userAuthFactories.add(new UserAuthPasswordFactory()); + + sshd.setUserAuthFactories(userAuthFactories); + sshd.setPasswordAuthenticator((username, password, session) -> + username.equals("user") && password.equals("password") + ); + + sshd.setSubsystemFactories( + Collections.singletonList(new SftpSubsystemFactory())); + + sshd.start(); + int port = sshd.getPort(); + + conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class); + conf.setInt("fs.sftp.host.port", port); + conf.setBoolean("fs.sftp.impl.disable.cache", true); + } + + @Override + public void teardown() throws IOException { + sshd.stop(); + } + + @Override + public FileSystem getTestFileSystem() throws IOException { + return FileSystem.get(URI.create("sftp://user:password@localhost"), conf); + } + + @Override + public String getScheme() { + return "sftp"; + } + + @Override + public Path getTestPath() { + try { + FileSystem fs = FileSystem.get(URI.create("sftp://user:password@localhost"), conf); + return fs.makeQualified(new Path(testDataDir)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java new file mode 100644 index 0000000000000..790b482691de0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java @@ -0,0 +1,13 @@ +package org.apache.hadoop.fs.contract.sftp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractSeekTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestSFTPContractSeek extends AbstractContractSeekTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new SFTPContract(conf); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml new file mode 100644 index 0000000000000..9fc66f96535e8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml @@ -0,0 +1,79 @@ + + + + + + + fs.contract.test.root-tests-enabled + false + + + + fs.contract.is-case-sensitive + true + + + + fs.contract.supports-append + false + + + + fs.contract.supports-atomic-directory-delete + true + + + + fs.contract.supports-atomic-rename + true + + + + fs.contract.supports-block-locality + false + + + + fs.contract.supports-concat + false + + + + fs.contract.supports-seek + true + + + + fs.contract.rejects-seek-past-eof + true + + + + fs.contract.supports-strict-exceptions + true + + + + fs.contract.supports-unix-permissions + false + + + From e0fa916714bc4421ba8b1cba7ad3d96272401bfc Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Wed, 6 May 2020 21:49:58 +0300 Subject: [PATCH 2/9] fixed code style issues --- .../apache/hadoop/fs/sftp/SFTPFileSystem.java | 4 +- .../hadoop/fs/sftp/SFTPInputStream.java | 71 ++++++++----------- .../hadoop/fs/contract/sftp/SFTPContract.java | 8 +-- 3 files changed, 35 insertions(+), 48 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java index 3fc774389a765..c685e81ff7f1c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java @@ -19,7 +19,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.net.URLDecoder; @@ -522,7 +521,8 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { } catch (SftpException e) { throw new IOException(e); } - return new FSDataInputStream(new SFTPInputStream(channel, absolute, statistics)){ + return new FSDataInputStream( + new SFTPInputStream(channel, absolute, statistics)){ @Override public void close() throws IOException { super.close(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index 4e3bdc383a74c..8ceede0c32fc4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -1,19 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.hadoop.fs.sftp; @@ -45,11 +40,16 @@ class SFTPInputStream extends FSInputStream { private long nextPos; SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats) { - this.channel = channel; - this.path = path; - this.wrappedStream = openStream(channel, path); - this.contentLength = getContentLength(channel, path); - this.stats = stats; + try { + this.channel = channel; + this.path = path; + this.stats = stats; + this.wrappedStream = channel.get(path.toUri().getPath()); + SftpATTRS stat = channel.lstat(path.toString()); + this.contentLength = stat.getSize(); + } catch (SftpException e) { + throw new UncheckedIOException(new IOException(e)); + } } @Override @@ -80,9 +80,13 @@ private void seekInternal() throws IOException { pos = pos + skipped; } if (nextPos < pos) { - IOUtils.closeStream(wrappedStream); - wrappedStream = openStream(channel, path); - pos = wrappedStream.skip(nextPos); + wrappedStream.close(); + try { + wrappedStream = channel.get(path.toUri().getPath()); + pos = wrappedStream.skip(nextPos); + } catch (SftpException e) { + throw new UncheckedIOException(new IOException(e)); + } } } @@ -132,24 +136,9 @@ public synchronized void close() throws IOException { */ private void checkNotClosed() throws IOException { if (closed) { - throw new IOException(path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); - } - } - - private InputStream openStream(ChannelSftp channel, Path path) { - try { - return channel.get(path.toUri().getPath()); - } catch (SftpException e) { - throw new UncheckedIOException(new IOException(e)); - } - } - - private long getContentLength(ChannelSftp channel, Path path) { - try { - SftpATTRS stat = channel.lstat(path.toString()); - return stat.getSize(); - } catch (SftpException e) { - throw new UncheckedIOException(new IOException(e)); + throw new IOException( + path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED + ); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java index 10521a8a3684a..0612bdc0802ed 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java @@ -4,7 +4,6 @@ import java.io.UncheckedIOException; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -15,13 +14,10 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.sftp.SFTPFileSystem; import org.apache.sshd.common.NamedFactory; -import org.apache.sshd.server.Command; import org.apache.sshd.server.SshServer; import org.apache.sshd.server.auth.UserAuth; -import org.apache.sshd.server.auth.password.PasswordAuthenticator; import org.apache.sshd.server.auth.password.UserAuthPasswordFactory; import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; -import org.apache.sshd.server.session.ServerSession; import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; public class SFTPContract extends AbstractFSContract { @@ -81,7 +77,9 @@ public String getScheme() { @Override public Path getTestPath() { try { - FileSystem fs = FileSystem.get(URI.create("sftp://user:password@localhost"), conf); + FileSystem fs = FileSystem.get( + URI.create("sftp://user:password@localhost"), conf + ); return fs.makeQualified(new Path(testDataDir)); } catch (IOException e) { throw new UncheckedIOException(e); From 9fa1307d554efc04ab20f190c541224937324d43 Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Wed, 6 May 2020 22:31:51 +0300 Subject: [PATCH 3/9] close ssh server only in case it is initialised --- .../hadoop/fs/sftp/SFTPInputStream.java | 27 +++++++++++-------- .../hadoop/fs/contract/sftp/SFTPContract.java | 24 +++++++++++++++-- .../contract/sftp/TestSFTPContractSeek.java | 18 +++++++++++++ 3 files changed, 56 insertions(+), 13 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index 8ceede0c32fc4..47bb2f91271e0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -1,15 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.hadoop.fs.sftp; import java.io.EOFException; @@ -24,7 +30,6 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; /** SFTP FileSystem input stream. */ class SFTPInputStream extends FSInputStream { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java index 0612bdc0802ed..e1316a3c574d8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.fs.contract.sftp; import java.io.IOException; @@ -61,7 +79,9 @@ public void init() throws IOException { @Override public void teardown() throws IOException { - sshd.stop(); + if (sshd != null) { + sshd.stop(); + } } @Override @@ -85,4 +105,4 @@ public Path getTestPath() { throw new UncheckedIOException(e); } } -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java index 790b482691de0..20f4116b98019 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.fs.contract.sftp; import org.apache.hadoop.conf.Configuration; From 9896efd6855b6552c1bce20cfac4e4641c31dd6d Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Thu, 7 May 2020 09:44:45 +0300 Subject: [PATCH 4/9] fixed broken thread safety --- .../apache/hadoop/fs/sftp/SFTPInputStream.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index 47bb2f91271e0..82248bf29d37f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -39,10 +39,10 @@ class SFTPInputStream extends FSInputStream { private final Path path; private InputStream wrappedStream; private FileSystem.Statistics stats; - private volatile boolean closed; + private boolean closed; private long pos; - private long contentLength; private long nextPos; + private long contentLength; SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats) { try { @@ -58,7 +58,7 @@ class SFTPInputStream extends FSInputStream { } @Override - public void seek(long position) throws IOException { + public synchronized void seek(long position) throws IOException { checkNotClosed(); if (position < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); @@ -67,7 +67,7 @@ public void seek(long position) throws IOException { } @Override - public int available() throws IOException { + public synchronized int available() throws IOException { checkNotClosed(); long remaining = contentLength - nextPos; if (remaining > Integer.MAX_VALUE) { @@ -101,7 +101,7 @@ public boolean seekToNewSource(long targetPos) throws IOException { } @Override - public long getPos() throws IOException { + public synchronized long getPos() throws IOException { return nextPos; } @@ -134,11 +134,6 @@ public synchronized void close() throws IOException { closed = true; } - /** - * Verify that the input stream is open. Non blocking; this gives - * the last state of the volatile {@link #closed} field. - * @throws IOException if the connection is closed. - */ private void checkNotClosed() throws IOException { if (closed) { throw new IOException( From 46a2d51ee5ec0c8162154bf3ecfc0f9bd151f057 Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Thu, 7 May 2020 14:27:58 +0300 Subject: [PATCH 5/9] Fixed broken tests --- .../java/org/apache/hadoop/fs/sftp/SFTPInputStream.java | 5 +---- .../hadoop/fs/contract/AbstractFSContractTestBase.java | 4 +++- .../java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java | 9 ++++++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index 82248bf29d37f..44e2a0c7b40d9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -34,7 +34,6 @@ /** SFTP FileSystem input stream. */ class SFTPInputStream extends FSInputStream { - public static final String E_STREAM_CLOSED = "Stream closed"; private final ChannelSftp channel; private final Path path; private InputStream wrappedStream; @@ -107,9 +106,7 @@ public synchronized long getPos() throws IOException { @Override public synchronized int read() throws IOException { - if (closed) { - throw new IOException(E_STREAM_CLOSED); - } + checkNotClosed(); if (this.contentLength == 0 || (nextPos >= contentLength)) { return -1; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java index edeb9e29d50a4..ac9de6d7bfe8c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java @@ -213,8 +213,10 @@ public void teardown() throws Exception { Thread.currentThread().setName("teardown"); LOG.debug("== Teardown =="); deleteTestDirInTeardown(); + if (contract != null) { + contract.teardown(); + } LOG.debug("== Teardown complete =="); - contract.teardown(); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java index 02d5a4852ba7c..d3750e64469b2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java @@ -55,13 +55,16 @@ public class TestFTPFileSystem { private FtpTestServer server; - + private java.nio.file.Path testDir; @Rule public Timeout testTimeout = new Timeout(180000); @Before public void setUp() throws Exception { - server = new FtpTestServer(GenericTestUtils.getTestDir().toPath()).start(); + testDir = Files.createTempDirectory( + GenericTestUtils.getTestDir().toPath(), getClass().getName() + ); + server = new FtpTestServer(testDir).start(); } @After @@ -69,7 +72,7 @@ public void setUp() throws Exception { public void tearDown() throws Exception { if (server != null) { server.stop(); - Files.walk(server.getFtpRoot()) + Files.walk(testDir) .sorted(Comparator.reverseOrder()) .map(java.nio.file.Path::toFile) .forEach(File::delete); From 4e823fdc32eeb209c7306eda32284653a3225b51 Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Tue, 19 May 2020 20:50:46 +0300 Subject: [PATCH 6/9] code review improvements --- .../java/org/apache/hadoop/fs/sftp/SFTPInputStream.java | 7 ++++--- .../org/apache/hadoop/fs/contract/sftp/SFTPContract.java | 9 +++++---- .../hadoop-common/src/test/resources/contract/sftp.xml | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index 44e2a0c7b40d9..b2920a7eed83a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -26,6 +26,7 @@ import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.SftpATTRS; import com.jcraft.jsch.SftpException; + import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; @@ -43,7 +44,7 @@ class SFTPInputStream extends FSInputStream { private long nextPos; private long contentLength; - SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats) { + SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats) throws IOException { try { this.channel = channel; this.path = path; @@ -52,7 +53,7 @@ class SFTPInputStream extends FSInputStream { SftpATTRS stat = channel.lstat(path.toString()); this.contentLength = stat.getSize(); } catch (SftpException e) { - throw new UncheckedIOException(new IOException(e)); + throw new IOException(e); } } @@ -89,7 +90,7 @@ private void seekInternal() throws IOException { wrappedStream = channel.get(path.toUri().getPath()); pos = wrappedStream.skip(nextPos); } catch (SftpException e) { - throw new UncheckedIOException(new IOException(e)); + throw new IOException(e); } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java index e1316a3c574d8..8b7021a55e7d2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java @@ -40,9 +40,10 @@ public class SFTPContract extends AbstractFSContract { - private String testDataDir = new FileSystemTestHelper().getTestRootDir(); - private Configuration conf; - public static final String CONTRACT_XML = "contract/sftp.xml"; + private static final String CONTRACT_XML = "contract/sftp.xml"; + private static final URI TEST_URI = URI.create("sftp://user:password@localhost"); + private final String testDataDir = new FileSystemTestHelper().getTestRootDir(); + private final Configuration conf; private SshServer sshd; public SFTPContract(Configuration conf) { @@ -86,7 +87,7 @@ public void teardown() throws IOException { @Override public FileSystem getTestFileSystem() throws IOException { - return FileSystem.get(URI.create("sftp://user:password@localhost"), conf); + return FileSystem.get(TEST_URI, conf); } @Override diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml index 9fc66f96535e8..20a24b7e54061 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml @@ -18,7 +18,7 @@ From fb49a212116103c81e62b50cda6ee1bce80b15d5 Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Tue, 19 May 2020 23:51:02 +0300 Subject: [PATCH 7/9] codestyle improvements --- .../java/org/apache/hadoop/fs/sftp/SFTPInputStream.java | 4 ++-- .../org/apache/hadoop/fs/contract/AbstractFSContract.java | 2 +- .../org/apache/hadoop/fs/contract/sftp/SFTPContract.java | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index b2920a7eed83a..d0f9a8d0887ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -21,7 +21,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.SftpATTRS; @@ -44,7 +43,8 @@ class SFTPInputStream extends FSInputStream { private long nextPos; private long contentLength; - SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats) throws IOException { + SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats) + throws IOException { try { this.channel = channel; this.path = path; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java index 035bcde312565..7d7f6b23673e7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java @@ -121,7 +121,7 @@ public FileSystem getFileSystem(URI uri) throws IOException { public abstract FileSystem getTestFileSystem() throws IOException; /** - * Get the scheme of this FS + * Get the scheme of this FS. * @return the scheme this FS supports */ public abstract String getScheme(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java index 8b7021a55e7d2..f72a2aec86242 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java @@ -41,8 +41,10 @@ public class SFTPContract extends AbstractFSContract { private static final String CONTRACT_XML = "contract/sftp.xml"; - private static final URI TEST_URI = URI.create("sftp://user:password@localhost"); - private final String testDataDir = new FileSystemTestHelper().getTestRootDir(); + private static final URI TEST_URI = + URI.create("sftp://user:password@localhost"); + private final String testDataDir = + new FileSystemTestHelper().getTestRootDir(); private final Configuration conf; private SshServer sshd; From b874cdf5b10d61184b6602147aa6a9ae1e6f9929 Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Wed, 20 May 2020 00:19:21 +0300 Subject: [PATCH 8/9] make sure an underlying channel gets closed --- .../java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java index c685e81ff7f1c..a91b50f2e9fa7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java @@ -525,8 +525,11 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { new SFTPInputStream(channel, absolute, statistics)){ @Override public void close() throws IOException { - super.close(); - disconnect(channel); + try { + super.close(); + } finally { + disconnect(channel); + } } }; } From 86bf896afe20c4d39af9a039dab8c695f98feb52 Mon Sep 17 00:00:00 2001 From: Mikhail Pryakhin Date: Tue, 2 Jun 2020 16:50:40 +0300 Subject: [PATCH 9/9] fixed missing persiod in javadoc --- .../java/org/apache/hadoop/fs/contract/AbstractFSContract.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java index 7d7f6b23673e7..76d3116c3abdc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java @@ -70,7 +70,7 @@ public void init() throws IOException { } /** - * Any teardown logic can go here + * Any teardown logic can go here. * @throws IOException IO problems */ public void teardown() throws IOException {