13
13
14
14
import kaptan
15
15
16
- from libvcs .projects .git import GitRemote
16
+ from libvcs ._internal .types import StrPath
17
+ from libvcs .sync .git import GitRemote
17
18
18
19
from . import exc
20
+ from .types import ConfigDict
19
21
from .util import get_config_dir , update_dict
20
22
21
23
log = logging .getLogger (__name__ )
@@ -45,7 +47,7 @@ def expand_dir(
45
47
return _dir
46
48
47
49
48
- def extract_repos (config : dict , cwd = pathlib .Path .cwd ()) -> list [dict ]:
50
+ def extract_repos (config : dict , cwd = pathlib .Path .cwd ()) -> list [ConfigDict ]:
49
51
"""Return expanded configuration.
50
52
51
53
end-user configuration permit inline configuration shortcuts, expand to
@@ -62,11 +64,11 @@ def extract_repos(config: dict, cwd=pathlib.Path.cwd()) -> list[dict]:
62
64
-------
63
65
list : List of normalized repository information
64
66
"""
65
- configs = []
67
+ configs : list [ ConfigDict ] = []
66
68
for directory , repos in config .items ():
67
69
for repo , repo_data in repos .items ():
68
70
69
- conf = {}
71
+ conf : ConfigDict = {}
70
72
71
73
"""
72
74
repo_name: http://myrepo.com/repo.git
@@ -94,18 +96,26 @@ def extract_repos(config: dict, cwd=pathlib.Path.cwd()) -> list[dict]:
94
96
if "parent_dir" not in conf :
95
97
conf ["parent_dir" ] = expand_dir (directory , cwd = cwd )
96
98
97
- # repo_dir -> dir in libvcs 0.12.0b25
98
- if "repo_dir" in conf and "dir" not in conf :
99
- conf ["dir" ] = conf .pop ("repo_dir" )
100
-
101
99
if "dir" not in conf :
102
- conf ["dir" ] = expand_dir (conf ["parent_dir" ] / conf ["name" ], cwd )
100
+ conf ["dir" ] = expand_dir (
101
+ pathlib .Path (conf ["parent_dir" ]) / conf ["name" ], cwd
102
+ )
103
103
104
104
if "remotes" in conf :
105
105
for remote_name , url in conf ["remotes" ].items ():
106
- conf ["remotes" ][remote_name ] = GitRemote (
107
- name = remote_name , fetch_url = url , push_url = url
108
- )
106
+ if isinstance (url , GitRemote ):
107
+ continue
108
+ if isinstance (url , str ):
109
+ conf ["remotes" ][remote_name ] = GitRemote (
110
+ name = remote_name , fetch_url = url , push_url = url
111
+ )
112
+ elif isinstance (url , dict ):
113
+ assert "push_url" in url
114
+ assert "fetch_url" in url
115
+ conf ["remotes" ][remote_name ] = GitRemote (
116
+ name = remote_name , ** url
117
+ )
118
+
109
119
configs .append (conf )
110
120
111
121
return configs
@@ -192,12 +202,12 @@ def find_config_files(
192
202
configs .extend (find_config_files (path , match , f ))
193
203
else :
194
204
match = f"{ match } .{ filetype } "
195
- configs = path .glob (match )
205
+ configs = list ( path .glob (match ) )
196
206
197
207
return configs
198
208
199
209
200
- def load_configs (files : list [Union [ str , pathlib . Path ] ], cwd = pathlib .Path .cwd ()):
210
+ def load_configs (files : list [StrPath ], cwd = pathlib .Path .cwd ()):
201
211
"""Return repos from a list of files.
202
212
203
213
Parameters
@@ -216,10 +226,11 @@ def load_configs(files: list[Union[str, pathlib.Path]], cwd=pathlib.Path.cwd()):
216
226
----
217
227
Validate scheme, check for duplicate destinations, VCS urls
218
228
"""
219
- repos = []
229
+ repos : list [ ConfigDict ] = []
220
230
for file in files :
221
231
if isinstance (file , str ):
222
232
file = pathlib .Path (file )
233
+ assert isinstance (file , pathlib .Path )
223
234
ext = file .suffix .lstrip ("." )
224
235
conf = kaptan .Kaptan (handler = ext ).import_config (str (file ))
225
236
newrepos = extract_repos (conf .export ("dict" ), cwd = cwd )
@@ -230,51 +241,49 @@ def load_configs(files: list[Union[str, pathlib.Path]], cwd=pathlib.Path.cwd()):
230
241
231
242
dupes = detect_duplicate_repos (repos , newrepos )
232
243
233
- if dupes :
244
+ if len ( dupes ) > 0 :
234
245
msg = ("repos with same path + different VCS detected!" , dupes )
235
246
raise exc .VCSPullException (msg )
236
247
repos .extend (newrepos )
237
248
238
249
return repos
239
250
240
251
241
- def detect_duplicate_repos (repos1 : list [dict ], repos2 : list [dict ]):
252
+ ConfigDictTuple = tuple [ConfigDict , ConfigDict ]
253
+
254
+
255
+ def detect_duplicate_repos (
256
+ config1 : list [ConfigDict ], config2 : list [ConfigDict ]
257
+ ) -> list [ConfigDictTuple ]:
242
258
"""Return duplicate repos dict if repo_dir same and vcs different.
243
259
244
260
Parameters
245
261
----------
246
- repos1 : dict
247
- list of repo expanded dicts
262
+ config1 : list[ConfigDict]
248
263
249
- repos2 : dict
250
- list of repo expanded dicts
264
+ config2 : list[ConfigDict]
251
265
252
266
Returns
253
267
-------
254
- list of dict, or None
255
- Duplicate repos
268
+ list[ConfigDictTuple]
269
+ List of duplicate tuples
256
270
"""
257
- dupes = []
258
- path_dupe_repos = []
259
-
260
- curpaths = [r ["dir" ] for r in repos1 ]
261
- newpaths = [r ["dir" ] for r in repos2 ]
262
- path_duplicates = list (set (curpaths ).intersection (newpaths ))
271
+ if not config1 :
272
+ return []
263
273
264
- if not path_duplicates :
265
- return None
274
+ dupes : list [ConfigDictTuple ] = []
266
275
267
- path_dupe_repos .extend (
268
- [r for r in repos2 if any (r ["dir" ] == p for p in path_duplicates )]
269
- )
276
+ repo_dirs = {
277
+ pathlib .Path (repo ["parent_dir" ]) / repo ["name" ]: repo for repo in config1
278
+ }
279
+ repo_dirs_2 = {
280
+ pathlib .Path (repo ["parent_dir" ]) / repo ["name" ]: repo for repo in config2
281
+ }
270
282
271
- if not path_dupe_repos :
272
- return None
283
+ for repo_dir , repo in repo_dirs .items ():
284
+ if repo_dir in repo_dirs_2 :
285
+ dupes .append ((repo , repo_dirs_2 [repo_dir ]))
273
286
274
- for n in path_dupe_repos :
275
- currepo = next ((r for r in repos1 if r ["dir" ] == n ["dir" ]), None )
276
- if n ["url" ] != currepo ["url" ]:
277
- dupes += (n , currepo )
278
287
return dupes
279
288
280
289
@@ -304,11 +313,11 @@ def in_dir(config_dir=None, extensions: list[str] = [".yml", ".yaml", ".json"]):
304
313
305
314
306
315
def filter_repos (
307
- config : dict ,
308
- dir : Union [pathlib .Path , None ] = None ,
316
+ config : list [ ConfigDict ] ,
317
+ dir : Union [pathlib .Path , Literal [ "*" ], None ] = None ,
309
318
vcs_url : Union [str , None ] = None ,
310
319
name : Union [str , None ] = None ,
311
- ):
320
+ ) -> list [ ConfigDict ] :
312
321
"""Return a :py:obj:`list` list of repos from (expanded) config file.
313
322
314
323
dir, vcs_url and name all support fnmatch.
@@ -329,23 +338,31 @@ def filter_repos(
329
338
list :
330
339
Repos
331
340
"""
332
- repo_list = []
341
+ repo_list : list [ ConfigDict ] = []
333
342
334
343
if dir :
335
- repo_list .extend ([r for r in config if fnmatch .fnmatch (r ["parent_dir" ], dir )])
344
+ repo_list .extend (
345
+ [r for r in config if fnmatch .fnmatch (str (r ["parent_dir" ]), str (dir ))]
346
+ )
336
347
337
348
if vcs_url :
338
349
repo_list .extend (
339
- r for r in config if fnmatch .fnmatch (r .get ("url" , r .get ("repo" )), vcs_url )
350
+ r
351
+ for r in config
352
+ if fnmatch .fnmatch (str (r .get ("url" , r .get ("repo" ))), vcs_url )
340
353
)
341
354
342
355
if name :
343
- repo_list .extend ([r for r in config if fnmatch .fnmatch (r .get ("name" ), name )])
356
+ repo_list .extend (
357
+ [r for r in config if fnmatch .fnmatch (str (r .get ("name" )), name )]
358
+ )
344
359
345
360
return repo_list
346
361
347
362
348
- def is_config_file (filename : str , extensions : list [str ] = [".yml" , ".yaml" , ".json" ]):
363
+ def is_config_file (
364
+ filename : str , extensions : Union [list [str ], str ] = [".yml" , ".yaml" , ".json" ]
365
+ ):
349
366
"""Return True if file has a valid config file type.
350
367
351
368
Parameters
0 commit comments