Skip to content

Commit 406b483

Browse files
committed
multi: add and init sub-server manager
Define a sub-server manager along with the subServerCfg that each sub-server will need to provide in order to be managed by the manager.
1 parent 15a3920 commit 406b483

File tree

2 files changed

+406
-0
lines changed

2 files changed

+406
-0
lines changed

subserver_mgr.go

Lines changed: 399 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,399 @@
1+
package terminal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/lightninglabs/lndclient"
9+
"github.com/lightningnetwork/lnd/lncfg"
10+
"github.com/lightningnetwork/lnd/lnrpc"
11+
"github.com/lightningnetwork/lnd/macaroons"
12+
"google.golang.org/grpc"
13+
"gopkg.in/macaroon-bakery.v2/bakery"
14+
)
15+
16+
// subServerCfg defines the config values that must be set in order to properly
17+
// define a subServer.
18+
type subServerCfg struct {
19+
// name is the string identifier of the sub-server.
20+
name string
21+
22+
// remote must be set if the sub-server is running remotely and so must
23+
// not be started in integrated mode.
24+
remote bool
25+
26+
// startIntegrated defines how the sub-server should be started if it
27+
// is to be run in integrated mode.
28+
startIntegrated func(lnrpc.LightningClient, *lndclient.GrpcLndServices,
29+
bool) error
30+
31+
// stopIntegrated defines how the sub-server should be stopped if it is
32+
// running in integrated mode.
33+
stopIntegrated func() error
34+
35+
// registerGrpcService is a function that can be called in order to
36+
// register the sub-server's rpc with the given registrar.
37+
registerGrpcService func(grpc.ServiceRegistrar)
38+
39+
// macValidator will be used to verify any macaroons for requests that
40+
// are to be handled by the sub-server.
41+
macValidator macaroons.MacaroonValidator
42+
43+
// onStartError will be called if there is a startup error (in
44+
// integrated mode) or if the remote connection could not be made in
45+
// remote mode.
46+
onStartError func(err error)
47+
48+
// onStartSuccess will be called if either the integrated startup func
49+
// is successful or if the remote connection was successfully made.
50+
onStartSuccess func()
51+
52+
// onStop will be called if the subServer is stopped.
53+
onStop func()
54+
55+
// ownsURI should return true if the sub-server owns the given request
56+
// URI.
57+
ownsURI func(string) bool
58+
59+
// serverErrChan is an optional error channel that should be listened on
60+
// after starting the sub-server to listen for any runtime errors.
61+
serverErrChan chan error
62+
63+
// remoteCfg is the config that defines what is needed in order to make
64+
// the remote connection.
65+
remoteCfg *RemoteDaemonConfig
66+
67+
// macPath is the path to the sub-server's macaroon if it is not running
68+
// in remote mode.
69+
macPath string
70+
}
71+
72+
// subServer defines a LiT subServer that can be run in either integrated or
73+
// remote mode. A subServer is considered non-fatal to LiT meaning that if a
74+
// subServer fails to start, LiT can safetly continue with its operations and
75+
// other subServers can too.
76+
type subServer struct {
77+
cfg *subServerCfg
78+
79+
integratedStarted bool
80+
remoteConn *grpc.ClientConn
81+
82+
stopped sync.Once
83+
quit chan struct{}
84+
}
85+
86+
// stop the subServer by closing the connection to it if it is remote or by
87+
// stopping the integrated process.
88+
func (s *subServer) stop() error {
89+
// If the sub-server is running in integrated mode and has not yet
90+
// started, then we can exit early.
91+
if !s.cfg.remote && !s.integratedStarted {
92+
return nil
93+
}
94+
95+
var returnErr error
96+
s.stopped.Do(func() {
97+
close(s.quit)
98+
99+
s.cfg.onStop()
100+
101+
// If running in remote mode, close the connection.
102+
if s.cfg.remote && s.remoteConn != nil {
103+
err := s.remoteConn.Close()
104+
if err != nil {
105+
returnErr = fmt.Errorf("could not close "+
106+
"remote connection: %v", err)
107+
}
108+
return
109+
}
110+
111+
// Else, stop the integrated sub-server process.
112+
err := s.cfg.stopIntegrated()
113+
if err != nil {
114+
returnErr = fmt.Errorf("could not close "+
115+
"integrated connection: %v", err)
116+
return
117+
}
118+
119+
if s.cfg.serverErrChan == nil {
120+
return
121+
}
122+
123+
returnErr = <-s.cfg.serverErrChan
124+
})
125+
126+
return returnErr
127+
}
128+
129+
// startIntegrated starts the subServer in integrated mode.
130+
func (s *subServer) startIntegrated(lndClient lnrpc.LightningClient,
131+
lndGrpc *lndclient.GrpcLndServices, withMacaroonService bool) {
132+
133+
err := s.cfg.startIntegrated(lndClient, lndGrpc, withMacaroonService)
134+
if err != nil {
135+
s.cfg.onStartError(err)
136+
return
137+
}
138+
s.integratedStarted = true
139+
s.cfg.onStartSuccess()
140+
141+
if s.cfg.serverErrChan == nil {
142+
return
143+
}
144+
145+
go func() {
146+
select {
147+
case err := <-s.cfg.serverErrChan:
148+
// The sub server should shut itself down if an error
149+
// happens. We don't need to try to stop it again.
150+
s.integratedStarted = false
151+
152+
s.cfg.onStartError(fmt.Errorf("received "+
153+
"critical error from sub-server, "+
154+
"shutting down: %v", err),
155+
)
156+
157+
case <-s.quit:
158+
}
159+
}()
160+
}
161+
162+
// connectRemote attempts to make a connection to the remote sub-server.
163+
func (s *subServer) connectRemote() {
164+
var err error
165+
s.remoteConn, err = dialBackend(
166+
s.cfg.name, s.cfg.remoteCfg.RPCServer,
167+
lncfg.CleanAndExpandPath(s.cfg.remoteCfg.TLSCertPath),
168+
)
169+
if err != nil {
170+
err := fmt.Errorf("remote dial error: %v", err)
171+
s.cfg.onStartError(err)
172+
return
173+
}
174+
s.cfg.onStartSuccess()
175+
}
176+
177+
// newSubServer constructs a new subServer using the given config.
178+
func newSubServer(cfg *subServerCfg) *subServer {
179+
return &subServer{
180+
cfg: cfg,
181+
quit: make(chan struct{}),
182+
}
183+
}
184+
185+
// subServerMgr manages a set of subServer objects.
186+
type subServerMgr struct {
187+
servers []*subServer
188+
mu sync.RWMutex
189+
}
190+
191+
// newSubServerMgr constructs a new subServerMgr.
192+
func newSubServerMgr() *subServerMgr {
193+
return &subServerMgr{
194+
servers: []*subServer{},
195+
}
196+
}
197+
198+
// AddServer adds a new subServer to the manager's set.
199+
func (s *subServerMgr) AddServer(server *subServer) {
200+
s.mu.Lock()
201+
defer s.mu.Unlock()
202+
203+
s.servers = append(s.servers, server)
204+
}
205+
206+
// StartIntegratedServers starts all the manager's sub-servers that should be
207+
// started in integrated mode.
208+
func (s *subServerMgr) StartIntegratedServers(lndClient lnrpc.LightningClient,
209+
lndGrpc *lndclient.GrpcLndServices, withMacaroonService bool) {
210+
211+
s.mu.Lock()
212+
defer s.mu.Unlock()
213+
214+
for _, ss := range s.servers {
215+
if ss.cfg.remote {
216+
continue
217+
}
218+
219+
ss.startIntegrated(lndClient, lndGrpc, withMacaroonService)
220+
}
221+
}
222+
223+
// StartRemoteSubServers creates connections to all the manager's sub-servers
224+
// that are running remotely.
225+
func (s *subServerMgr) StartRemoteSubServers() {
226+
s.mu.Lock()
227+
defer s.mu.Unlock()
228+
229+
for _, ss := range s.servers {
230+
if !ss.cfg.remote {
231+
continue
232+
}
233+
234+
ss.connectRemote()
235+
}
236+
}
237+
238+
// RegisterRPCServices registers all the manager's sub-servers with the given
239+
// grpc registrar.
240+
func (s *subServerMgr) RegisterRPCServices(server grpc.ServiceRegistrar) {
241+
s.mu.RLock()
242+
defer s.mu.RUnlock()
243+
244+
for _, ss := range s.servers {
245+
// In remote mode the "director" of the RPC proxy will act as
246+
// a catch-all for any gRPC request that isn't known because we
247+
// didn't register any server for it. The director will then
248+
// forward the request to the remote service.
249+
if ss.cfg.remote {
250+
continue
251+
}
252+
253+
ss.cfg.registerGrpcService(server)
254+
}
255+
}
256+
257+
// GetRemoteConn checks if any of the manager's sub-servers owns the given uri
258+
// and if so, the remote connection to that sub-server is returned. The bool
259+
// return value indicates if the uri is managed by one of the sub-servers
260+
// running in remote mode.
261+
func (s *subServerMgr) GetRemoteConn(uri string) (bool, *grpc.ClientConn) {
262+
s.mu.RLock()
263+
defer s.mu.RUnlock()
264+
265+
for _, ss := range s.servers {
266+
if !ss.cfg.ownsURI(uri) {
267+
continue
268+
}
269+
270+
if !ss.cfg.remote {
271+
return false, nil
272+
}
273+
274+
return true, ss.remoteConn
275+
}
276+
277+
return false, nil
278+
}
279+
280+
// ValidateMacaroon checks if any of the manager's sub-servers owns the given
281+
// uri and if so, if it is running in remote mode, then true is returned since
282+
// the macaroon will be validated by the remote subserver itself when the
283+
// request arrives. Otherwise, the integrated sub-server's validator validates
284+
// the macaroon.
285+
func (s *subServerMgr) ValidateMacaroon(ctx context.Context,
286+
requiredPermissions []bakery.Op, uri string) (bool, error) {
287+
288+
s.mu.RLock()
289+
defer s.mu.RUnlock()
290+
291+
for _, ss := range s.servers {
292+
if !ss.cfg.ownsURI(uri) {
293+
continue
294+
}
295+
296+
if ss.cfg.remote {
297+
return true, nil
298+
}
299+
300+
if !ss.integratedStarted {
301+
return true, fmt.Errorf("%s is not yet ready for "+
302+
"requests, lnd possibly still starting or "+
303+
"syncing", ss.cfg.name)
304+
}
305+
306+
err := ss.cfg.macValidator.ValidateMacaroon(
307+
ctx, requiredPermissions, uri,
308+
)
309+
if err != nil {
310+
return true, &proxyErr{
311+
proxyContext: ss.cfg.name,
312+
wrapped: fmt.Errorf("invalid macaroon: %v",
313+
err),
314+
}
315+
}
316+
}
317+
318+
return false, nil
319+
}
320+
321+
// HandledBy returns true if one of its sub-servers owns the given URI.
322+
func (s *subServerMgr) HandledBy(uri string) (bool, string) {
323+
s.mu.RLock()
324+
defer s.mu.RUnlock()
325+
326+
for _, ss := range s.servers {
327+
if !ss.cfg.ownsURI(uri) {
328+
continue
329+
}
330+
331+
return true, ss.cfg.name
332+
}
333+
334+
return false, ""
335+
}
336+
337+
// MacaroonPath checks if any of the manager's sub-servers owns the given uri
338+
// and if so, the appropriate macaroon path is returned for that sub-server.
339+
func (s *subServerMgr) MacaroonPath(uri string) (bool, string) {
340+
s.mu.RLock()
341+
defer s.mu.RUnlock()
342+
343+
for _, ss := range s.servers {
344+
if !ss.cfg.ownsURI(uri) {
345+
continue
346+
}
347+
348+
if ss.cfg.remote {
349+
return true, ss.cfg.remoteCfg.MacaroonPath
350+
}
351+
return true, ss.cfg.macPath
352+
}
353+
354+
return false, ""
355+
}
356+
357+
// ReadRemoteMacaroon checks if any of the manager's sub-servers running in
358+
// remote mode owns the given uri and if so, the appropriate macaroon path is
359+
// returned for that sub-server.
360+
func (s *subServerMgr) ReadRemoteMacaroon(uri string) (bool, []byte, error) {
361+
s.mu.RLock()
362+
defer s.mu.RUnlock()
363+
364+
for _, ss := range s.servers {
365+
if !ss.cfg.ownsURI(uri) {
366+
continue
367+
}
368+
369+
if !ss.cfg.remote {
370+
return false, nil, nil
371+
}
372+
373+
macBytes, err := readMacaroon(lncfg.CleanAndExpandPath(
374+
ss.cfg.remoteCfg.MacaroonPath,
375+
))
376+
377+
return true, macBytes, err
378+
}
379+
380+
return false, nil, nil
381+
}
382+
383+
// Stop stops all the manager's sub-servers
384+
func (s *subServerMgr) Stop() error {
385+
var returnErr error
386+
387+
s.mu.RLock()
388+
defer s.mu.RUnlock()
389+
390+
for _, ss := range s.servers {
391+
err := ss.stop()
392+
if err != nil {
393+
log.Errorf("Error stopping %s: %v", ss.cfg.name, err)
394+
returnErr = err
395+
}
396+
}
397+
398+
return returnErr
399+
}

0 commit comments

Comments
 (0)