Skip to content

Commit f59d881

Browse files
author
Vikram Agrawal
committed
Rocksdb state storage implementation
1 parent ec032ce commit f59d881

File tree

6 files changed

+1944
-0
lines changed

6 files changed

+1944
-0
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package org.apache.spark.io;
2+
3+
import org.apache.commons.compress.archivers.ArchiveException;
4+
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
5+
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
6+
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
7+
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
8+
import org.apache.commons.compress.utils.IOUtils;
9+
10+
import java.io.*;
11+
12+
public class FileUtility {
13+
14+
/**
15+
* Untar an input file into an output file.
16+
*
17+
* The output file is created in the output folder, having the same name as
18+
* the input file, minus the '.tar' extension.
19+
*
20+
* @param inputFile the input .tar file
21+
* @throws IOException
22+
*
23+
* @throws ArchiveException
24+
*/
25+
public static void unTar(final File inputFile)
26+
throws IOException, ArchiveException {
27+
28+
String outputDir = inputFile.getAbsolutePath().split(".tar")[0];
29+
File outputTarDir = new File(outputDir);
30+
outputTarDir.mkdir();
31+
final InputStream is = new FileInputStream(inputFile);
32+
final TarArchiveInputStream debInputStream = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream(
33+
"tar", is);
34+
TarArchiveEntry entry = null;
35+
while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
36+
final File outputFile = new File(outputDir, entry.getName());
37+
if (entry.isDirectory()) {
38+
if (!outputFile.exists()) {
39+
if (!outputFile.mkdirs()) {
40+
throw new IllegalStateException(String.format(
41+
"Couldn't create directory %s.", outputFile.getAbsolutePath()));
42+
}
43+
}
44+
} else {
45+
final OutputStream outputFileStream = new FileOutputStream(outputFile);
46+
IOUtils.copy(debInputStream, outputFileStream);
47+
outputFileStream.close();
48+
}
49+
}
50+
debInputStream.close();
51+
}
52+
53+
public static void createTarFile(String source, String destFileName) throws Exception {
54+
TarArchiveOutputStream tarOs = null;
55+
File f = new File(destFileName);
56+
if (f.exists()) {
57+
f.delete();
58+
}
59+
try {
60+
FileOutputStream fos = new FileOutputStream(destFileName);
61+
tarOs = (TarArchiveOutputStream) new ArchiveStreamFactory().createArchiveOutputStream("tar", fos);
62+
tarOs = new TarArchiveOutputStream(fos);
63+
File folder = new File(source);
64+
File[] fileNames = folder.listFiles();
65+
for(File file : fileNames){
66+
TarArchiveEntry tar_file = new TarArchiveEntry(file.getName());
67+
tar_file.setSize(file.length());
68+
tarOs.putArchiveEntry(tar_file);
69+
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file));
70+
IOUtils.copy(bis, tarOs);
71+
bis.close();
72+
tarOs.closeArchiveEntry();
73+
}
74+
} catch (IOException e) {
75+
throw new IllegalStateException(String.format(
76+
"createTarFile failed with exception %s.", e.getMessage()));
77+
} finally {
78+
try {
79+
tarOs.finish();
80+
tarOs.close();
81+
} catch (IOException e) {
82+
e.printStackTrace();
83+
}
84+
}
85+
}
86+
87+
88+
}

sql/core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@
147147
<artifactId>mockito-core</artifactId>
148148
<scope>test</scope>
149149
</dependency>
150+
<!-- RocksDB dependency for Structured Streaming State Store -->
151+
<dependency>
152+
<groupId>org.rocksdb</groupId>
153+
<artifactId>rocksdbjni</artifactId>
154+
<version>6.0.1</version>
155+
</dependency>
150156
</dependencies>
151157
<build>
152158
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

0 commit comments

Comments
 (0)