First version, seems to work

This commit is contained in:
Benno Lang 2025-01-21 22:59:56 +10:30
commit 81ffe5a50f
8 changed files with 380 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
podcasts/*
__pycache__
config.json

72
lib/args.py Normal file
View file

@ -0,0 +1,72 @@
import re
import sys
# Matches an argument made up solely of ASCII digits; read() treats such an
# argument as the number of episodes to fetch.
isNumber = re.compile('^[0-9]+$')
class Options:
    """Settings for a ./process run; the class attributes are the defaults."""

    # Steps of the pipeline; each one can be switched off from the command line
    loadFeed = True
    downloadEpisodes = True
    generate = True

    # Lower-cased text a podcast name must contain, or None to process all
    filter = None

    # Which end of the feed to take episodes from ('new' or 'old'), and how many
    episodes = 'new'
    numEpisodes = 1
def outputHelp():
    """Print the usage information for ./process to standard output."""
    helpLines = (
        "Usage: ./process [args] [filter [age] [episodes]]",
        "",
        "args:",
        "--help",
        " Show this help information",
        "--no-feed, --skip-feed",
        " Skip the initial step of downloading the RSS feeds",
        "--no-download, --skip-download",
        " Skip downloading podcast episodes",
        "--no-gen, --skip-gen",
        " Skip generating transcripts and translations of podcast episodes",
        "",
        "filter:",
        " If specified, only configured podcasts which match the filter will be processed",
        " A filter of 'pieds sur' would match 'Les Pieds sur terre'",
        "",
        "age:",
        " 'new' (default), or 'old'",
        " Fetch the newest or oldest episode(s) for the matching podcast(s)",
        "",
        "episodes:",
        " 1 by default",
        " The number of episode(s) to fetch for the matching podcast(s)",
        "",
        "Examples:",
        " ./process --help",
        " ./process --no-feed --skip-download",
        " Only process previously downloaded episodes (useful e.g. if Whisper ran out of memory,",
        " or a podcast wasn't configured to translate episodes)",
        " ./process 'the few' old 3",
        " Download and process the first 3 episodes of 'The Few Who Do'",
    )
    # One write for the whole text; print() supplies the final newline
    print("\n".join(helpLines))
def read():
    """Build an Options object from the command-line arguments.

    Flags switch off individual steps, 'new'/'old' select which end of the
    feed to use, an all-digit argument sets the episode count, and the first
    remaining argument becomes the (lower-cased) podcast filter. Asking for
    help exits with status 0; an argument that fits nowhere prints the help
    and exits with status 1.
    """
    opts = Options()
    for arg in sys.argv[1:]:
        if arg in ('help', '--help'):
            outputHelp()
            sys.exit(0)

        if arg in ('--no-feed', '--skip-feed'):
            opts.loadFeed = False
        elif arg in ('--no-download', '--skip-download'):
            opts.downloadEpisodes = False
        elif arg in ('--no-gen', '--skip-gen'):
            opts.generate = False
        elif arg in ('new', 'old'):
            opts.episodes = arg
        elif isNumber.match(arg):
            opts.numEpisodes = int(arg)
        elif opts.filter is None:
            # Only the first free-form argument is accepted as the filter
            opts.filter = arg.lower()
        else:
            print(f"Unrecognised argument: {arg}")
            print("")
            outputHelp()
            sys.exit(1)
    return opts

19
lib/config.py Normal file
View file

@ -0,0 +1,19 @@
import json
def load():
    """Read config.json from the current directory.

    Returns the parsed configuration. When the file does not exist, an empty
    configuration (no translation languages, no podcasts) is returned. When
    the file exists but cannot be read or parsed, the same empty configuration
    is returned, but a warning is written to stderr so the problem is not
    hidden from the user.
    """
    import sys

    try:
        # The context manager closes the file even when parsing fails
        with open('config.json', 'r', encoding='utf-8') as configFile:
            return json.load(configFile)
    except FileNotFoundError:
        # First run: nothing has been configured yet
        pass
    except (OSError, ValueError) as err:
        # ValueError covers both invalid JSON and undecodable bytes
        sys.stderr.write(f"Warning: could not read config.json ({err}); using an empty configuration\n")
    return {
        'translate': [],
        'podcasts': [],
    }
def save(configData):
    """Write configData to config.json in the current directory.

    The data is stored as JSON indented by four spaces, followed by a
    trailing newline. Any existing file is overwritten.
    """
    # The context manager closes the file even if serialisation fails
    with open('config.json', 'w', encoding='utf-8') as configFile:
        json.dump(configData, configFile, indent=4)
        configFile.write('\n')

16
lib/fetcher.py Normal file
View file

@ -0,0 +1,16 @@
import urllib.request
# Simple wrapper to spoof User-Agent when fetching remote files
def download(remoteUrl, localFilename):
    """Fetch remoteUrl and store the response body in localFilename.

    The data is streamed into a temporary '<localFilename>.part' file which
    only replaces localFilename once the transfer has finished. A failed
    download therefore neither truncates a previously downloaded file nor
    leaves an empty or partial file behind. Errors raised while opening the
    URL or writing the file propagate to the caller.
    """
    import os
    import shutil

    opener = urllib.request.build_opener()
    opener.addheaders = [('User-agent', 'Podkastomat/0.1.0')]
    partFilename = localFilename + '.part'
    try:
        with opener.open(remoteUrl) as remoteFile, open(partFilename, 'wb') as localFile:
            # Copy in chunks rather than holding the whole file in memory
            shutil.copyfileobj(remoteFile, localFile)
        os.replace(partFilename, localFilename)
    except BaseException:
        # Remove the incomplete temporary file before re-raising
        if os.path.exists(partFilename):
            os.remove(partFilename)
        raise

117
lib/files.py Normal file
View file

@ -0,0 +1,117 @@
import glob
import re
import lib.fetcher as fetcher
import os
import os.path
from dateutil import parser as dateparser
# Anything that is neither a word character nor a hyphen is stripped from refs.
# Raw string: '\w' and '\-' are not valid escapes in a plain string literal.
charsToRemove = re.compile(r'[^\w\-]*')
# Runs of two or more underscores, which getRef collapses into a single one
multipleUnderscores = re.compile('__+')
# Episode number at the very start of a title, optionally preceded by '#'
startingNumerals = re.compile(r'^#?([0-9]+)')
def addRef(podcast):
    """Add the 'ref' and 'dir' entries to a podcast dict, in place.

    'ref' is derived from the podcast's name; 'dir' is the folder below
    podcasts/<lang>/ named after that ref.
    """
    ref = getRef(podcast['name'])
    podcast['ref'] = ref
    podcast['dir'] = f"podcasts/{podcast['lang']}/{ref}"
def getRef(name):
    """Turn a human-readable name into a lower-case, file-system friendly ref.

    Spaces become underscores, characters matched by charsToRemove are
    dropped, and runs of underscores are collapsed into one.
    """
    underscored = name.lower().replace(' ', '_')
    cleaned = charsToRemove.sub('', underscored)
    return multipleUnderscores.sub('_', cleaned)
def getExtension(fileName):
    """Return the file extension (without the dot) of a file name or URL.

    Any query string ('?...') or fragment ('#...') is ignored, and only the
    last path segment is examined, so dots in the host name or in directory
    names are never mistaken for the extension. Returns '' when the last
    segment has no extension.
    """
    path = fileName.partition('?')[0].partition('#')[0]
    lastSegment = path.rsplit('/', 1)[-1]
    _, dot, ext = lastSegment.rpartition('.')
    return ext if dot else ''
def replaceExtension(fileName, newExtension):
    """Return fileName with everything after its last dot swapped for newExtension.

    The part up to (but excluding) the last dot is kept and
    '.' + newExtension is appended to it.
    """
    stem = fileName.rpartition('.')[0]
    return f"{stem}.{newExtension}"
def saveEpisode(podcast, episode):
    """Download one episode into the podcast's folder unless it is already there.

    The file name starts with a sortable prefix taken from, in order of
    preference: the episode (and season) number given in the RSS, a number at
    the start of the title, or the publication date. If a file with that
    prefix already exists the download is skipped. When none of the three
    sources is available, a message is printed and nothing is downloaded.
    """
    titleNumber = startingNumerals.match(episode['title'])
    # Number of leading title characters to drop from the file name
    offset = 0

    if episode['episode']:
        # Episode (& possibly season) number encoded in the RSS
        epNum = episode['episode']
        openingRef = epNum.zfill(5)
        identifier = f"{epNum}"
        if episode['season']:
            seasonNum = episode['season']
            openingRef = 's' + seasonNum.zfill(3) + 'ep' + openingRef
            identifier += f" of season {seasonNum}"
    elif titleNumber:
        # Episode number deduced from its title
        epNum = titleNumber.group(1)
        openingRef = epNum.zfill(5)
        identifier = epNum
        # The number moves into the prefix, so cut it from the title part
        offset = len(titleNumber.group(0))
    elif "date" in episode:
        # Podcast (or part thereof) where episodes only have a name and date
        published = dateparser.parse(episode['date'])
        openingRef = published.strftime("%Y-%m-%d_%H%M")
        identifier = published.strftime("from %d %b %Y at %H:%M")
    else:
        print(f"Unable to determine identifier for episode:\n{episode}")
        return

    # Any file carrying this prefix counts as the episode already being present
    if glob.glob(f"{podcast['dir']}/{openingRef}_*"):
        print(f"Episode {identifier} of {podcast['name']} has been downloaded previously; skipping")
        return

    # Download new file
    ext = getExtension(episode['url'])
    episodeName = openingRef + '_' + episode['title'][offset:]
    if episode['subtitle']:
        episodeName += ' - ' + episode['subtitle']
    fileName = f"{getRef(episodeName)}.{ext}"
    print(f"Downloading episode {identifier} of {podcast['name']} as {fileName}")
    fetcher.download(episode['url'], f"{podcast['dir']}/{fileName}")
def findAudio():
    """Return the audio files found three levels below podcasts/.

    Paths are relative to the current working directory, in the order glob
    yields them.
    """
    audioPattern = 'podcasts/*/*/*.m[4p][a3]'
    return glob.glob(audioPattern)
def getLangCode(audioFile):
    """Return the second '/'-separated component of audioFile.

    For paths of the form podcasts/<lang>/<ref>/<file> this is the language.
    """
    return audioFile.split('/')[1]
def getTranscriptFilename(audioFile):
    """Return the path of the transcript belonging to audioFile."""
    transcriptExt = 'transcript.txt'
    return replaceExtension(audioFile, transcriptExt)
def hasTranscript(audioFile):
    """Return True when the transcript file for audioFile exists."""
    return os.path.isfile(getTranscriptFilename(audioFile))
def getTranslationFilename(audioFile):
    """Return the path of the translation belonging to audioFile."""
    translationExt = 'translation.txt'
    return replaceExtension(audioFile, translationExt)
def hasTranslation(audioFile):
    """Return True when the translation file for audioFile exists."""
    return os.path.isfile(getTranslationFilename(audioFile))
def generateFromAudio(audioFile, task):
    """Run Whisper on audioFile to produce a transcript or a translation.

    task is 'transcribe'; any other value is treated as 'translate'. Whisper
    is run inside the audio file's directory and its .vtt output is renamed
    to '<name>.transcript.txt' or '<name>.translation.txt' next to the audio
    file. The working directory of this process is left untouched.

    Raises FileNotFoundError (from os.rename) if no .vtt file was produced.
    """
    import subprocess

    if task == 'transcribe':
        newExt = 'transcript.txt'
    else:
        task = 'translate'
        newExt = 'translation.txt'
    langCode = getLangCode(audioFile)
    audioDir, _, fileName = audioFile.rpartition('/')
    # Argument list instead of a shell string: no quoting problems, no shell
    cmd = [
        'whisper', fileName,
        '--model', 'medium',
        '--language', langCode,
        '--task', task,
        '--output_format', 'vtt',
        '--fp16', 'False',
    ]
    # cwd= only affects the child process, so relative paths used by the
    # caller stay valid after this function returns
    subprocess.run(cmd, cwd=audioDir or None)
    # rename transcript/translation file generated
    generatedFile = os.path.join(audioDir, replaceExtension(fileName, 'vtt'))
    newFileName = os.path.join(audioDir, replaceExtension(fileName, newExt))
    os.rename(generatedFile, newFileName)

54
lib/rss.py Normal file
View file

@ -0,0 +1,54 @@
import lib.fetcher as fetcher
import os.path
from xml.dom import minidom
# NOTE(review): not referenced by any code visible in this module; the itunes:*
# elements are looked up by their prefixed tag names instead — confirm whether
# this mapping is still needed.
namespaces = {'itunes': 'itunes.com'}
def fetch(podcast, loadFeed):
    """Make sure the podcast's folder exists and optionally refresh its feed.

    Sets podcast['feed'] to '<dir>/rss.xml'. When loadFeed is true, the RSS at
    podcast['url'] is downloaded to that path.
    """
    podcastDir = podcast['dir']
    podcast['feed'] = podcastDir + "/rss.xml"
    os.makedirs(podcastDir, exist_ok=True)
    if not loadFeed:
        return
    print(f"Downloading RSS for podcast {podcast['name']}")
    fetcher.download(podcast['url'], podcast['feed'])
# Get the text value of a child node with a given name, if available
def extractField(node, childNodeName):
    """Return the text of the first descendant element named childNodeName.

    Returns '' when there is no such element or when it is empty (for
    example <itunes:subtitle/>). Text and CDATA children of the element are
    concatenated; other kinds of child nodes are ignored.
    """
    children = node.getElementsByTagName(childNodeName)
    if not children:
        return ''
    element = children[0]
    textTypes = (element.TEXT_NODE, element.CDATA_SECTION_NODE)
    return ''.join(
        child.data for child in element.childNodes if child.nodeType in textTypes
    )
def getEpisodes(podcast, episodesFrom, numEpisodes):
    """Read up to numEpisodes episodes from the podcast's downloaded RSS feed.

    Episodes are taken from the start of the feed, or from its end (walking
    backwards) when episodesFrom is 'old'. Items without an <enclosure> have
    nothing to download and are skipped. A feed without items yields [].

    Each episode is a dict with 'title', 'subtitle', 'url', 'season' and
    'episode', plus 'date' when the item has a pubDate. The subtitle is only
    kept when title and subtitle together are shorter than 150 characters.
    """
    print(f"Getting latest episode from {podcast['name']}")
    doc = minidom.parse(podcast['feed'])
    root = doc.getElementsByTagName('channel')[0]
    epNodes = list(root.getElementsByTagName('item'))
    if episodesFrom == 'old':
        epNodes.reverse()

    episodes = []
    for epNode in epNodes:
        if len(episodes) >= numEpisodes:
            break
        enclosures = epNode.getElementsByTagName('enclosure')
        if not enclosures:
            # No audio attached to this item
            continue
        episode = {
            'title': extractField(epNode, 'title'),
            'subtitle': '',
            'url': enclosures[0].getAttribute('url'),
            'season': extractField(epNode, 'itunes:season'),
            'episode': extractField(epNode, 'itunes:episode')
        }
        subtitle = extractField(epNode, 'itunes:subtitle')
        # Title and subtitle both end up in the file name; keep it reasonably short
        if len(episode['title']) + len(subtitle) < 150:
            episode['subtitle'] = subtitle
        pubDate = extractField(epNode, 'pubDate')
        if pubDate:
            episode['date'] = pubDate
        episodes.append(episode)
    return episodes

35
process Executable file
View file

@ -0,0 +1,35 @@
#!/usr/bin/python3
import inspect
import os
import sys
import lib.args as args
import lib.config as cfg
import lib.files as files
import lib.rss as rss


def main():
    """Fetch feeds, download episodes, then transcribe/translate the audio."""
    config = cfg.load()
    options = args.read()

    for podcast in config['podcasts']:
        if options.filter and options.filter not in podcast['name'].lower():
            continue
        files.addRef(podcast)
        rss.fetch(podcast, options.loadFeed)
        if options.downloadEpisodes:
            eps = rss.getEpisodes(podcast, options.episodes, options.numEpisodes)
            for ep in eps:
                print(ep)
                files.saveEpisode(podcast, ep)

    if not options.generate:
        sys.exit(0)

    # The audio paths are relative to the directory the script was started in.
    # generateFromAudio may change the working directory, so restore it after
    # every call to keep those relative paths valid.
    baseDir = os.getcwd()
    translateLangs = config.get('translate', [])
    for audioFile in files.findAudio():
        print(f"Audio: {audioFile}")
        language = files.getLangCode(audioFile)
        if not files.hasTranscript(audioFile):
            files.generateFromAudio(audioFile, 'transcribe')
            os.chdir(baseDir)
        if language in translateLangs and not files.hasTranslation(audioFile):
            files.generateFromAudio(audioFile, 'translate')
            os.chdir(baseDir)


if __name__ == '__main__':
    main()

64
update-config Executable file
View file

@ -0,0 +1,64 @@
#!/usr/bin/python3
#
# Commands:
#
# translate lang
#   Add a language to translate into
#   E.g.: translate ru
#
# add
#   Add a new podcast
#   You will be prompted for the name, language, and URL of the podcast
import lib.config as cfg
import json
import sys


def _prompt(label):
    """Show label without a trailing newline and return the stripped reply."""
    # end='' keeps the cursor on the prompt line; flush so the prompt is
    # visible before the read blocks
    print(label, end='', flush=True)
    return sys.stdin.readline().strip()


config = cfg.load()
cmd = sys.argv[1] if len(sys.argv) > 1 else ''

# Command to add a language to translate (in addition to generating a transcript)
if cmd == 'translate':
    if len(sys.argv) < 3:
        sys.stderr.write("Must specify the language\n")
        sys.exit(2)
    lang = sys.argv[2]
    languages = config.setdefault('translate', [])
    # Running the command twice must not list the language twice
    if lang not in languages:
        languages.append(lang)
    cfg.save(config)
    sys.exit(0)
# Command to add podcast (Name, Language, URL)
elif cmd == 'add':
    podcast = {}
    podcast['name'] = _prompt('Podcast name: ')
    podcast['lang'] = _prompt('Language: ')
    podcast['url'] = _prompt('RSS URL: ')
    config.setdefault('podcasts', []).append(podcast)
    # Same writer as the translate command, so both produce the same format
    cfg.save(config)
    sys.exit(0)
else:
    print("Unrecognised command: '" + cmd + "'. Usage:")
    print("")
    print("./update-config translate lang")
    print(" E.g. ./update-config translate de")
    print(" Translate podcasts in the specified language")
    print(" For the list of supported languages, check Whisper")
    print("")
    print("./update-config add")
    print(" Add a new podcast which is to be transcribed and possibly translated")
    print(" This is interactive and will ask for the name, language, and RSS URL for the podcast")
    sys.exit(1)