Skip to content

Commit e8afc6f

Browse files
authored
Merge pull request #52 from sinbad/resume-upload-tus
Resumable uploads via tusd
2 parents 3968aac + 8fe0429 commit e8afc6f

File tree

5 files changed

+247
-10
lines changed

5 files changed

+247
-10
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ There are few things that can be configured via environment variables:
4949
LFS_CERT # Certificate file for tls
5050
LFS_KEY # tls key
5151
LFS_SCHEME # set to 'https' to override default http
52+
LFS_USETUS # set to 'true' to enable tusd (tus.io) resumable upload server; tusd must be on PATH, installed separately
53+
LFS_TUSHOST # The host used to start the tusd upload server, default "localhost:1080"
5254

5355
If the `LFS_ADMINUSER` and `LFS_ADMINPASS` variables are set, a
5456
rudimentary admin interface can be accessed via

config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type Configuration struct {
2121
Key string `config:""`
2222
Scheme string `config:"http"`
2323
Public string `config:"public"`
24+
UseTus string `config:"false"`
25+
TusHost string `config:"localhost:1080"`
2426
}
2527

2628
func (c *Configuration) IsHTTPS() bool {
@@ -35,6 +37,14 @@ func (c *Configuration) IsPublic() bool {
3537
return false
3638
}
3739

40+
func (c *Configuration) IsUsingTus() bool {
41+
switch Config.UseTus {
42+
case "1", "true", "TRUE":
43+
return true
44+
}
45+
return false
46+
}
47+
3848
// Config is the global app configuration
3949
var Config = &Configuration{}
4050

main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@ func main() {
107107
logger.Log(kv{"fn": "main", "msg": "listening", "pid": os.Getpid(), "addr": Config.Listen, "version": version})
108108

109109
app := NewApp(contentStore, metaStore)
110+
if Config.IsUsingTus() {
111+
tusServer.Start()
112+
}
110113
app.Serve(listener)
111114
tl.WaitForChildren()
115+
if Config.IsUsingTus() {
116+
tusServer.Stop()
117+
}
112118
}

server.go

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,20 @@ type ObjectError struct {
5858
Message string `json:"message"`
5959
}
6060

61-
// ObjectLink builds a URL linking to the object.
62-
func (v *RequestVars) ObjectLink() string {
61+
// DownloadLink builds a URL to download the object.
62+
func (v *RequestVars) DownloadLink() string {
63+
return v.internalLink("objects")
64+
}
65+
66+
// UploadLink builds a URL to upload the object.
67+
func (v *RequestVars) UploadLink(useTus bool) string {
68+
if useTus {
69+
return v.tusLink()
70+
}
71+
return v.internalLink("objects")
72+
}
73+
74+
func (v *RequestVars) internalLink(subpath string) string {
6375
path := ""
6476

6577
if len(v.User) > 0 {
@@ -70,7 +82,25 @@ func (v *RequestVars) ObjectLink() string {
7082
path += fmt.Sprintf("/%s", v.Repo)
7183
}
7284

73-
path += fmt.Sprintf("/objects/%s", v.Oid)
85+
path += fmt.Sprintf("/%s/%s", subpath, v.Oid)
86+
87+
if Config.IsHTTPS() {
88+
return fmt.Sprintf("%s://%s%s", Config.Scheme, Config.Host, path)
89+
}
90+
91+
return fmt.Sprintf("http://%s%s", Config.Host, path)
92+
}
93+
94+
func (v *RequestVars) tusLink() string {
95+
link, err := tusServer.Create(v.Oid, v.Size)
96+
if err != nil {
97+
logger.Fatal(kv{"fn": fmt.Sprintf("Unable to create tus link for %s: %v", v.Oid, err)})
98+
}
99+
return link
100+
}
101+
102+
func (v *RequestVars) VerifyLink() string {
103+
path := fmt.Sprintf("/verify/%s", v.Oid)
74104

75105
if Config.IsHTTPS() {
76106
return fmt.Sprintf("%s://%s%s", Config.Scheme, Config.Host, path)
@@ -115,6 +145,8 @@ func NewApp(content *ContentStore, meta *MetaStore) *App {
115145

116146
r.HandleFunc("/objects", app.PostHandler).Methods("POST").MatcherFunc(MetaMatcher)
117147

148+
r.HandleFunc("/verify/{oid}", app.VerifyHandler).Methods("POST")
149+
118150
app.addMgmt(r)
119151

120152
app.router = r
@@ -191,7 +223,7 @@ func (a *App) GetMetaHandler(w http.ResponseWriter, r *http.Request) {
191223

192224
if r.Method == "GET" {
193225
enc := json.NewEncoder(w)
194-
enc.Encode(a.Represent(rv, meta, true, false))
226+
enc.Encode(a.Represent(rv, meta, true, false, false))
195227
}
196228

197229
logRequest(r, 200)
@@ -219,7 +251,7 @@ func (a *App) PostHandler(w http.ResponseWriter, r *http.Request) {
219251
w.WriteHeader(sentStatus)
220252

221253
enc := json.NewEncoder(w)
222-
enc.Encode(a.Represent(rv, meta, meta.Existing, true))
254+
enc.Encode(a.Represent(rv, meta, meta.Existing, true, false))
223255
logRequest(r, sentStatus)
224256
}
225257

@@ -229,11 +261,21 @@ func (a *App) BatchHandler(w http.ResponseWriter, r *http.Request) {
229261

230262
var responseObjects []*Representation
231263

264+
var useTus bool
265+
if bv.Operation == "upload" && Config.IsUsingTus() {
266+
for _, t := range bv.Transfers {
267+
if t == "tus" {
268+
useTus = true
269+
break
270+
}
271+
}
272+
}
273+
232274
// Create a response object
233275
for _, object := range bv.Objects {
234276
meta, err := a.metaStore.Get(object)
235277
if err == nil && a.contentStore.Exists(meta) { // Object is found and exists
236-
responseObjects = append(responseObjects, a.Represent(object, meta, true, false))
278+
responseObjects = append(responseObjects, a.Represent(object, meta, true, false, false))
237279
continue
238280
}
239281

@@ -245,13 +287,17 @@ func (a *App) BatchHandler(w http.ResponseWriter, r *http.Request) {
245287
// Object is not found
246288
meta, err = a.metaStore.Put(object)
247289
if err == nil {
248-
responseObjects = append(responseObjects, a.Represent(object, meta, meta.Existing, true))
290+
responseObjects = append(responseObjects, a.Represent(object, meta, meta.Existing, true, useTus))
249291
}
250292
}
251293

252294
w.Header().Set("Content-Type", metaMediaType)
253295

254296
respobj := &BatchResponse{Objects: responseObjects}
297+
// Respond with TUS support if advertised
298+
if useTus {
299+
respobj.Transfer = "tus"
300+
}
255301

256302
enc := json.NewEncoder(w)
257303
enc.Encode(respobj)
@@ -281,9 +327,21 @@ func (a *App) PutHandler(w http.ResponseWriter, r *http.Request) {
281327
logRequest(r, 200)
282328
}
283329

330+
func (a *App) VerifyHandler(w http.ResponseWriter, r *http.Request) {
331+
vars := mux.Vars(r)
332+
oid := vars["oid"]
333+
err := tusServer.Finish(oid, a.contentStore)
334+
335+
if err != nil {
336+
logger.Fatal(kv{"fn": "VerifyHandler", "err": fmt.Sprintf("Failed to verify %s: %v", oid, err)})
337+
}
338+
339+
logRequest(r, 200)
340+
}
341+
284342
// Represent takes a RequestVars and Meta and turns it into a Representation suitable
285343
// for json encoding
286-
func (a *App) Represent(rv *RequestVars, meta *MetaObject, download, upload bool) *Representation {
344+
func (a *App) Represent(rv *RequestVars, meta *MetaObject, download, upload, useTus bool) *Representation {
287345
rep := &Representation{
288346
Oid: meta.Oid,
289347
Size: meta.Size,
@@ -296,11 +354,14 @@ func (a *App) Represent(rv *RequestVars, meta *MetaObject, download, upload bool
296354
header["Authorization"] = rv.Authorization
297355
}
298356
if download {
299-
rep.Actions["download"] = &link{Href: rv.ObjectLink(), Header: header}
357+
rep.Actions["download"] = &link{Href: rv.DownloadLink(), Header: header}
300358
}
301359

302360
if upload {
303-
rep.Actions["upload"] = &link{Href: rv.ObjectLink(), Header: header}
361+
rep.Actions["upload"] = &link{Href: rv.UploadLink(useTus), Header: header}
362+
if useTus {
363+
rep.Actions["verify"] = &link{Href: rv.VerifyLink(), Header: header}
364+
}
304365
}
305366
return rep
306367
}

tus.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"net/http"
7+
"os"
8+
"os/exec"
9+
"path/filepath"
10+
"strings"
11+
"sync"
12+
"time"
13+
)
14+
15+
type TusServer struct {
16+
serverMutex sync.Mutex
17+
tusProcess *exec.Cmd
18+
dataPath string
19+
tusBaseUrl string
20+
httpClient *http.Client
21+
oidToTusUrl map[string]string
22+
}
23+
24+
var (
25+
tusServer *TusServer = &TusServer{}
26+
)
27+
28+
// Start launches the tus server & stores uploads in the given contentPath
29+
func (t *TusServer) Start() {
30+
t.serverMutex.Lock()
31+
defer t.serverMutex.Unlock()
32+
33+
if t.tusProcess != nil {
34+
return
35+
}
36+
37+
t.dataPath = filepath.Join(os.TempDir(), "lfs_tusserver")
38+
hostparts := strings.Split(Config.TusHost, ":")
39+
host := "localhost"
40+
port := "1080"
41+
if len(hostparts) > 0 {
42+
host = hostparts[0]
43+
}
44+
if len(hostparts) > 1 {
45+
port = hostparts[1]
46+
}
47+
t.tusProcess = exec.Command("tusd",
48+
"-dir", t.dataPath,
49+
"-host", host,
50+
"-port", port)
51+
// Make sure tus server is started before continuing
52+
var procWait sync.WaitGroup
53+
procWait.Add(1)
54+
go func(p *exec.Cmd) {
55+
56+
stdout, err := p.StdoutPipe()
57+
if err != nil {
58+
panic(fmt.Sprintf("Error getting tus server stdout: %v", err))
59+
}
60+
stderr, err := p.StderrPipe()
61+
if err != nil {
62+
panic(fmt.Sprintf("Error getting tus server stderr: %v", err))
63+
}
64+
err = p.Start()
65+
if err != nil {
66+
panic(fmt.Sprintf("Error starting tus server: %v", err))
67+
}
68+
go func() {
69+
scanner := bufio.NewScanner(stdout)
70+
for scanner.Scan() {
71+
logger.Log(kv{"fn": "tusout", "msg": scanner.Text()})
72+
}
73+
}()
74+
go func() {
75+
scanner := bufio.NewScanner(stderr)
76+
for scanner.Scan() {
77+
logger.Log(kv{"fn": "tuserr", "msg": scanner.Text()})
78+
}
79+
}()
80+
time.Sleep(2)
81+
procWait.Done()
82+
defer p.Wait()
83+
84+
}(t.tusProcess)
85+
procWait.Wait()
86+
logger.Log(kv{"fn": "Start", "msg": "Tus server started"})
87+
t.tusBaseUrl = fmt.Sprintf("http://%s:%s/files/", host, port)
88+
t.httpClient = &http.Client{}
89+
t.oidToTusUrl = make(map[string]string)
90+
}
91+
92+
func (t *TusServer) Stop() {
93+
t.serverMutex.Lock()
94+
defer t.serverMutex.Unlock()
95+
if t.tusProcess != nil {
96+
t.tusProcess.Process.Kill()
97+
t.tusProcess = nil
98+
}
99+
logger.Log(kv{"fn": "Stop", "msg": "Tus server stopped"})
100+
}
101+
102+
// Create a new upload URL for the given object
103+
// Required to call CREATE on the tus API before uploading but not part of LFS API
104+
func (t *TusServer) Create(oid string, size int64) (string, error) {
105+
t.serverMutex.Lock()
106+
defer t.serverMutex.Unlock()
107+
req, err := http.NewRequest("POST", t.tusBaseUrl, nil)
108+
if err != nil {
109+
return "", err
110+
}
111+
req.Header.Set("Tus-Resumable", "1.0.0")
112+
req.Header.Set("Upload-Length", fmt.Sprintf("%d", size))
113+
req.Header.Set("Upload-Metadata", fmt.Sprintf("oid %s", oid))
114+
115+
res, err := t.httpClient.Do(req)
116+
if err != nil {
117+
return "", err
118+
}
119+
if res.StatusCode != 201 {
120+
return "", fmt.Errorf("Expected tus status code 201, got %d", res.StatusCode)
121+
}
122+
loc := res.Header.Get("Location")
123+
if len(loc) == 0 {
124+
return "", fmt.Errorf("Missing Location header in tus response")
125+
}
126+
t.oidToTusUrl[oid] = loc
127+
return loc, nil
128+
}
129+
130+
// Move the finished uploaded data from TUS to the content store (called by verify)
131+
func (t *TusServer) Finish(oid string, store *ContentStore) error {
132+
t.serverMutex.Lock()
133+
defer t.serverMutex.Unlock()
134+
135+
loc, ok := t.oidToTusUrl[oid]
136+
if !ok {
137+
return fmt.Errorf("Unable to find upload for %s", oid)
138+
}
139+
parts := strings.Split(loc, "/")
140+
filename := filepath.Join(t.dataPath, fmt.Sprintf("%s.bin", parts[len(parts)-1]))
141+
stat, err := os.Stat(filename)
142+
if err != nil {
143+
return err
144+
}
145+
meta := &MetaObject{Oid: oid, Size: stat.Size(), Existing: false}
146+
f, err := os.Open(filename)
147+
if err != nil {
148+
return err
149+
}
150+
defer f.Close()
151+
err = store.Put(meta, f)
152+
if err == nil {
153+
os.Remove(filename)
154+
// tus also stores a .info file, remove that
155+
os.Remove(filepath.Join(t.dataPath, fmt.Sprintf("%s.info", parts[len(parts)-1])))
156+
}
157+
return err
158+
}

0 commit comments

Comments
 (0)