22
22
from abc import ABC , abstractmethod
23
23
from collections .abc import Iterator , MutableSequence , Sequence
24
24
from datetime import datetime , timedelta
25
- from functools import reduce
25
+ from functools import cached_property , reduce
26
26
from operator import mul , or_
27
27
from re import Pattern
28
28
from typing import TYPE_CHECKING , Any , Generic , TypeVar , Union
29
29
30
30
from beets import util
31
31
32
32
if TYPE_CHECKING :
33
- from beets .dbcore import Model
34
- from beets .dbcore .db import AnyModel
33
+ from beets .dbcore .db import AnyModel , Model
35
34
36
35
P = TypeVar ("P" , default = Any )
37
36
else :
@@ -283,13 +282,11 @@ class PathQuery(FieldQuery[bytes]):
283
282
and case-sensitive otherwise.
284
283
"""
285
284
286
- def __init__ (self , field , pattern , fast = True ):
285
+ def __init__ (self , field : str , pattern : bytes , fast : bool = True ) -> None :
287
286
"""Create a path query.
288
287
289
288
`pattern` must be a path, either to a file or a directory.
290
289
"""
291
- super ().__init__ (field , pattern , fast )
292
-
293
290
path = util .normpath (pattern )
294
291
295
292
# Case sensitivity depends on the filesystem that the query path is located on.
@@ -304,49 +301,56 @@ def __init__(self, field, pattern, fast=True):
304
301
# from `col_clause()` do the same thing.
305
302
path = path .lower ()
306
303
307
- # Match the path as a single file.
308
- self .file_path = path
309
- # As a directory (prefix).
310
- self .dir_path = os .path .join (path , b"" )
304
+ super ().__init__ (field , path , fast )
311
305
312
- @classmethod
313
- def is_path_query (cls , query_part ):
306
+ @cached_property
307
+ def dir_path (self ) -> bytes :
308
+ return os .path .join (self .pattern , b"" )
309
+
310
+ @staticmethod
311
+ def is_path_query (query_part : str ) -> bool :
314
312
"""Try to guess whether a unicode query part is a path query.
315
313
316
- Condition: separator precedes colon and the file exists.
314
+ The path query must
315
+ 1. precede the colon in the query, if a colon is present
316
+ 2. contain either ``os.sep`` or ``os.altsep`` (Windows)
317
+ 3. this path must exist on the filesystem.
317
318
"""
318
- colon = query_part .find (":" )
319
- if colon != - 1 :
320
- query_part = query_part [:colon ]
321
-
322
- # Test both `sep` and `altsep` (i.e., both slash and backslash on
323
- # Windows).
324
- if not (
325
- os .sep in query_part or (os .altsep and os .altsep in query_part )
326
- ):
327
- return False
319
+ query_part = query_part .split (":" )[0 ]
328
320
329
- return os .path .exists (util .syspath (util .normpath (query_part )))
321
+ return (
322
+ # make sure the query part contains a path separator
323
+ bool (set (query_part ) & {os .sep , os .altsep })
324
+ and os .path .exists (util .normpath (query_part ))
325
+ )
330
326
331
- def match (self , item ):
332
- path = item .path if self .case_sensitive else item .path .lower ()
333
- return (path == self .file_path ) or path .startswith (self .dir_path )
327
+ def match (self , obj : Model ) -> bool :
328
+ """Check whether a model object's path matches this query.
329
+
330
+ Performs either an exact match against the pattern or checks if the path
331
+ starts with the given directory path. Case sensitivity depends on the object's
332
+ filesystem as determined during initialization.
333
+ """
334
+ path = obj .path if self .case_sensitive else obj .path .lower ()
335
+ return (path == self .pattern ) or path .startswith (self .dir_path )
334
336
335
- def col_clause (self ):
336
- file_blob = BLOB_TYPE (self .file_path )
337
- dir_blob = BLOB_TYPE (self .dir_path )
337
+ def col_clause (self ) -> tuple [str , Sequence [SQLiteType ]]:
338
+ """Generate an SQL clause that implements path matching in the database.
338
339
340
+ Returns a tuple of SQL clause string and parameter values list that matches
341
+ paths either exactly or by directory prefix. Handles case sensitivity
342
+ appropriately using BYTELOWER for case-insensitive matches.
343
+ """
339
344
if self .case_sensitive :
340
- query_part = "({0} = ?) || (substr({0}, 1, ?) = ?) "
345
+ left , right = self . field , "? "
341
346
else :
342
- query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \
343
- (substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))"
347
+ left , right = f"BYTELOWER({ self .field } )" , "BYTELOWER(?)"
344
348
345
- return query_part . format ( self . field ), (
346
- file_blob ,
347
- len (dir_blob ),
349
+ return f"( { left } = { right } ) || (substr( { left } , 1, ?) = { right } )" , [
350
+ BLOB_TYPE ( self . pattern ) ,
351
+ len (dir_blob := BLOB_TYPE ( self . dir_path ) ),
348
352
dir_blob ,
349
- )
353
+ ]
350
354
351
355
def __repr__ (self ) -> str :
352
356
return (
0 commit comments