gutted audio and replcaed with beets call
This commit is contained in:
parent
24144bb550
commit
acff0c350c
@ -7,212 +7,31 @@
|
||||
# the Free Software Foundation, either version 3 of the License,
|
||||
# or (at your option) any later version.
|
||||
|
||||
"""Submodule for processing audio files and their metadata"""
|
||||
"""Defining the standard torrent class"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from collections import defaultdict
|
||||
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from mutagen.flac import FLAC
|
||||
from mutagen.id3._util import ID3NoHeaderError
|
||||
|
||||
from . import generic
|
||||
from .functions import char_wash
|
||||
import subprocess
|
||||
|
||||
|
||||
META_FRAMES = ['artist', 'album', 'title', 'tracknumber', 'discnumber',
|
||||
'date', 'genre']
|
||||
EXT_CLASS_DIC = {'.mp3': EasyID3, '.flac': FLAC}
|
||||
|
||||
class Torrent(generic.Torrent):
|
||||
"""Custom Torrent class for mp3 and flac files. Edits metadata and renames
|
||||
according to scheme."""
|
||||
|
||||
## 1: READING
|
||||
|
||||
def read_tag(self, TagClass, tag_dic):
|
||||
"""run once per file, extracts metadata from tag using mutagen's
|
||||
id3/flac common interface."""
|
||||
try:
|
||||
tag = TagClass(tag_dic['filepath'])
|
||||
tag_frames = list(set(META_FRAMES).intersection(list(tag.keys())))
|
||||
tag_metadata = {frame:tag[frame][0] for frame in tag_frames}
|
||||
except ID3NoHeaderError:
|
||||
tag_frames = []
|
||||
tag_metadata = {}
|
||||
# remove '/' from track and disc numbers
|
||||
fix_number = lambda x: int(str(x).split('/')[0])
|
||||
tag_numbers = {number: fix_number(tag_metadata[number]) for number
|
||||
in ['tracknumber', 'discnumber'] if number
|
||||
in tag_frames}
|
||||
# merge all the metadata back into tag_dic
|
||||
tag_metadata.update(tag_numbers)
|
||||
tag_dic.update(tag_metadata)
|
||||
tag_dic['frames'] = tag_frames
|
||||
return tag_dic
|
||||
|
||||
def variance(self, tag_name):
|
||||
"""Establish variance of tag values or missing values"""
|
||||
try:
|
||||
tag_set = set(tag_dic[tag_name] for tag_dic in self.tag_lst)
|
||||
except KeyError:
|
||||
tag_set = set()
|
||||
return tag_set
|
||||
|
||||
def read_tags(self):
|
||||
"""Creates a list of all music files with metadata information"""
|
||||
self.tag_lst = [self.read_tag(EXT_CLASS_DIC[file_dic['ext']], file_dic)
|
||||
for file_dic in self.stats.file_lst
|
||||
if file_dic['ext'] in EXT_CLASS_DIC]
|
||||
self.artist_no = len(self.variance('artist'))
|
||||
self.album_no = len(self.variance('album'))
|
||||
self.tracknumber_no = len(self.variance('tracknumber'))
|
||||
self.title_no = len(self.variance('title'))
|
||||
self.discnumber_no = len(self.variance('discnumber'))
|
||||
self.track_width = len(str(max(self.variance('tracknumber'))))
|
||||
|
||||
## 2: NAMING SCHEME
|
||||
|
||||
def rename_scheme(self, tag_dic):
|
||||
"""Work out the rename scheme based on number of artists and albums
|
||||
in the total file mass"""
|
||||
# level one
|
||||
if self.artist_no == 0:
|
||||
level_one = "z_missing_artist"
|
||||
elif self.album_no == 0:
|
||||
level_one = "z_missing_album"
|
||||
elif self.artist_no == 1:
|
||||
level_one = char_wash(tag_dic['artist'])
|
||||
elif self.artist_no > 1:
|
||||
level_one = "z_various artists"
|
||||
# level two
|
||||
if self.album_no == 0:
|
||||
level_two = char_wash(tag_dic['parentdir'])
|
||||
elif self.album_no > 0 and self.discnumber_no <= 1:
|
||||
level_two = char_wash(tag_dic['album'])
|
||||
elif self.album_no > 0 and self.discnumber_no > 1:
|
||||
level_two = char_wash(tag_dic['album']) + '_disc_' + \
|
||||
str(tag_dic['discnumber'])
|
||||
# file name
|
||||
if self.tracknumber_no == 0 or self.title_no == 0:
|
||||
filename = char_wash(tag_dic['basename']) + tag_dic['ext']
|
||||
elif self.album_no == 0:
|
||||
filename = char_wash(tag_dic['basename']) + tag_dic['ext']
|
||||
elif self.artist_no <= 1:
|
||||
filename = str(tag_dic['tracknumber']).zfill(self.track_width) + \
|
||||
'_' + char_wash(tag_dic['title']) + tag_dic['ext']
|
||||
elif self.artist_no > 1:
|
||||
filename = str(tag_dic['tracknumber']).zfill(self.track_width) + \
|
||||
'_' + char_wash(tag_dic['artist']) + '_-_' + \
|
||||
char_wash(tag_dic['title']) + tag_dic['ext']
|
||||
# putting it all together
|
||||
return os.path.join(self.type_path, level_one, level_two, filename)
|
||||
|
||||
def rename(self):
|
||||
'''Run rename scheme for each file'''
|
||||
for tag_dic in self.tag_lst:
|
||||
tag_dic['new_path'] = self.rename_scheme(tag_dic)
|
||||
|
||||
## 3: WRITING
|
||||
|
||||
def new_tags(self):
|
||||
"""Create temp files, rewrite their tags, prepare for move_files"""
|
||||
for tag_dic in self.tag_lst:
|
||||
tmp_file_path = os.path.join(self.tmp_folder.name,
|
||||
char_wash(tag_dic['filename']))
|
||||
shutil.copy(tag_dic['filepath'], tmp_file_path)
|
||||
# run appropriate tag function on file to access and delete it
|
||||
if tag_dic['frames']:
|
||||
tag = EXT_CLASS_DIC[tag_dic['ext']](tmp_file_path)
|
||||
tag.delete()
|
||||
elif tag_dic['ext'] == '.mp3':
|
||||
tag = EasyID3()
|
||||
tag.save(tmp_file_path)
|
||||
elif tag_dic['ext'] == '.flac':
|
||||
tag = FLAC(tmp_file_path)
|
||||
tag.save(tmp_file_path)
|
||||
# why are we turning ints to strs? is it a requirement of mutagen?
|
||||
for frame in tag_dic['frames']:
|
||||
if frame == 'discnumber' and self.discnumber_no <= 1:
|
||||
continue
|
||||
if frame == 'genre' and \
|
||||
not self.config['Audio']['retain_genre']:
|
||||
continue
|
||||
if type(tag_dic[frame]) == int:
|
||||
tag[frame] = [str(tag_dic[frame])]
|
||||
else:
|
||||
tag[frame] = [tag_dic[frame]]
|
||||
tag.save(tmp_file_path)
|
||||
if type(tag).__name__ == 'EasyID3' and \
|
||||
self.config['Audio']['retain_id3v1']:
|
||||
tag.save(tmp_file_path, v1=2)
|
||||
self.move_lst.append((tmp_file_path, tag_dic['new_path']))
|
||||
self.move_type_lst.append(tag_dic['filetype'])
|
||||
|
||||
## 4: COVERART
|
||||
|
||||
def coverart(self):
|
||||
"""For every destination folder, choose an image if any. Available
|
||||
images are scored for suitability by various factors, including
|
||||
name, placement and size."""
|
||||
if not self.stats.file_by_type_dic['image']:
|
||||
return
|
||||
# for every destination, list origins
|
||||
dest_dic = defaultdict(set)
|
||||
for tag_dic in self.tag_lst:
|
||||
ppath = os.path.dirname(tag_dic['new_path'])
|
||||
dest_dic[ppath].add(tag_dic['folderpath'])
|
||||
image_lst = self.stats.file_by_type_dic['image']
|
||||
max_image_size = max([image['bytesize'] for image in image_lst])
|
||||
# for every destination, list candidates
|
||||
for dest_dir in list(dest_dic.keys()):
|
||||
image_scores_dic = {}
|
||||
for image in image_lst:
|
||||
image_score = 0
|
||||
# image shares origin_folder with tags with this destination
|
||||
if image['folderpath'] in dest_dic[dest_dir]:
|
||||
if len(dest_dic[dest_dir]) == 1:
|
||||
image_score += 1000
|
||||
else:
|
||||
image_score += 500
|
||||
# basename is cover etc. or it's part of the basename
|
||||
candidate_basenames = ['cover', 'front', 'folder', 'art']
|
||||
if image['basename'].lower() in candidate_basenames:
|
||||
image_score += 200
|
||||
else:
|
||||
regex = False
|
||||
for basename in candidate_basenames:
|
||||
re_test = re.compile('.*%s.*'%basename)
|
||||
if re_test.match(image['basename'].lower()):
|
||||
regex = True
|
||||
if regex:
|
||||
image_score += 100
|
||||
# give a bonus to the largest image (in bytesize)
|
||||
size_score = float(image['bytesize']) / max_image_size * 99
|
||||
image_score += size_score
|
||||
# enter the image and the image score into the dictionary
|
||||
image_path = os.path.join(image['folderpath'],
|
||||
image['filename'])
|
||||
image_scores_dic[image_path] = image_score
|
||||
# find the image with the highest score for that destination
|
||||
highscorer = max(iter(image_scores_dic.keys()),
|
||||
key=(lambda key: image_scores_dic[key]))
|
||||
# Add winner to move_lst
|
||||
new_image_name = self.config['Audio']['rename_cover_image'] + \
|
||||
os.path.splitext(highscorer)[1]
|
||||
new_file_path = os.path.join(self.type_path, dest_dir,
|
||||
new_image_name)
|
||||
self.move_lst.append((highscorer, new_file_path))
|
||||
self.move_type_lst.append('image')
|
||||
class Torrent:
|
||||
"""Basic torrent class with all required values and functions. Exists as
|
||||
template for more customized content specific Torrent classes."""
|
||||
|
||||
def __init__(self, config, stats, type_path):
|
||||
"""standard generic torrent class"""
|
||||
self.config = config
|
||||
self.stats = stats
|
||||
self.type_path = type_path
|
||||
self.torrent_name = stats.torrent_name
|
||||
self.torrent_path = stats.torrent_path
|
||||
self.torrent_ppath = stats.torrent_ppath
|
||||
self.move_lst, self.move_type_lst = [], []
|
||||
self.tmp_folder = config['Postproc']['tmp_path']
|
||||
|
||||
def manipulate(self):
|
||||
"""Overall music manipulation function, encompassing read_tags,
|
||||
categorize, rename, new_tags, and coverart."""
|
||||
self.read_tags()
|
||||
self.rename()
|
||||
self.new_tags()
|
||||
if self.config['Audio']['retain_cover_image']:
|
||||
self.coverart()
|
||||
"""Replaced by a call to beets"""
|
||||
subprocess.call(['/home/mads/.local/bin/beet', 'import', '-ql', '/home/mads/.config/beets/import.log', self.torrent_path])
|
||||
|
||||
def move_files(self):
|
||||
"""vestigial class"""
|
||||
errors = False
|
||||
return errors
|
||||
|
Loading…
Reference in New Issue
Block a user