import threading import feedparser import wx from youtube_dl import YoutubeDL as yt from youtube_dl.utils import DownloadError, ExtractorError from Channel import Channel from Items import Item from Utils import (add_video, get_default_logo, hash_string, make_bitmap_from_url, video_exists) class YouTube(Channel): def __init__(self, channel_id: str, name: str) -> None: self.m_channel_id = channel_id self.m_name = name rss_url = 'https://www.youtube.com/feeds/videos.xml?channel_id={}'.format( channel_id) self.m_logo = get_default_logo('YouTube') super().__init__(channel_id, 'YouTube', rss_url, self.m_logo, name) self.m_thr = threading.Thread(target=self.parse_feed, args=(), kwargs={}) def refresh(self) -> None: self.m_thr.start() def wait(self) -> bool: return self.m_thr.is_alive() def parse_feed(self) -> None: feed = feedparser.parse(self.get_feed()) ydl_opts = { 'format': 'worstvideo[ext=mp4]+worstaudio[ext=m4a]/worstvideo+worstaudio', 'container': 'webm_dash', } entries = feed['entries'] self.m_items: list[Item] = list() for entry in entries: video_id = hash_string(entry['id']) if video_exists(video_id, self.m_channel_id): pass title = str(entry['title']) thumbnail_link = str(entry['media_thumbnail'][0]['url']) description = str(entry['description']) link = '' with yt(ydl_opts) as ydl: try: video = ydl.extract_info(entry['link'], download=False) for form in video['formats']: # type: ignore if form['height']: if form['height'] < 480 and form[ 'acodec'] != 'none': link = form['url'] except ExtractorError or DownloadError: pass resolved_link = link published_parsed = entry['published_parsed'] if not resolved_link: continue thumbnail = make_bitmap_from_url(thumbnail_link, wx.Size(self.m_screen_width, 150)) item = Item(description, resolved_link, self.m_provider_name, published_parsed, thumbnail, title) self.m_items.append(item) add_video(video_id, self.m_channel_id, self.m_provider_name, description, resolved_link, published_parsed, thumbnail, title, 0)