From a776e913a8ee54b356325e69db17885d34126d52 Mon Sep 17 00:00:00 2001 From: Micke Nordin Date: Tue, 28 Dec 2021 17:30:36 +0100 Subject: [PATCH] Reintroduce ChannelProvider This time a ChannelProvider has multiple Channels and the gui class deals with them instead of Channels directly --- Channel/SVT/__init__.py | 150 ++++++++++++++++++++---------------- Channel/YouTube/__init__.py | 48 +++++++++--- Channel/__init__.py | 26 ++----- ChannelProvider/__init__.py | 35 +++++++++ Items/__init__.py | 3 +- Utils/__init__.py | 64 +++++++++++++++ main.py | 53 +++++++++---- 7 files changed, 268 insertions(+), 111 deletions(-) create mode 100644 ChannelProvider/__init__.py create mode 100644 Utils/__init__.py diff --git a/Channel/SVT/__init__.py b/Channel/SVT/__init__.py index 2a84ee2..fa2bef2 100644 --- a/Channel/SVT/__init__.py +++ b/Channel/SVT/__init__.py @@ -6,97 +6,113 @@ import json import os import pickle import threading +from datetime import datetime -from Channel import Channel import feedparser import requests import wx from bs4 import BeautifulSoup + +from Channel import Channel from Items import Item +from Utils import make_bitmap_from_url, resolve_svt_channel + +default_rss_url = 'http://www.svtplay.se/rss.xml' class SVT(Channel): m_cache: dict = dict() m_cachefile = '/tmp/svt_cache' - def __init__(self) -> None: - rss_url = 'http://www.svtplay.se/rss.xml' - logo_url = 'https://upload.wikimedia.org/wikipedia/commons/' - logo_url += 'thumb/4/4b/Logotyp_SVT_Play.png/480px-Logotyp_SVT_Play.png' - super().__init__('SVT', rss_url, logo_url) + def __init__(self, svt_id: str) -> None: + chan_dict = resolve_svt_channel(svt_id) + super().__init__('SVT', default_rss_url, chan_dict['thumbnail_url']) if os.path.exists(self.m_cachefile): with open(self.m_cachefile, 'rb') as cachehandle: self.m_cache = pickle.load(cachehandle) self.m_thr = threading.Thread(target=self.parse_feed, - args=(), + args=[svt_id], kwargs={}) self.m_thr.start() def wait(self) -> bool: return self.m_thr.is_alive() - def parse_feed(self) -> None: + def parse_feed(self, *args, **kwargs) -> None: + svt_id = args[0] feed = feedparser.parse(self.get_feed()) entries = feed['entries'] self.m_items: list[Item] = list() - for entry in entries: - key = hashlib.sha256(entry['link'].encode('utf-8')).hexdigest() - - if key in self.m_cache.keys(): - thumbnail_link = self.m_cache[key]['thumbnail_link'] - content = self.m_cache[key]['content'] - resolved_link = self.m_cache[key]['resolved_link'] - description = self.m_cache[key]['description'] - published_parsed = self.m_cache[key]['published_parsed'] - title = self.m_cache[key]['title'] - else: - svt_id = '' - - for link in entry['links']: - if str(link['type']).startswith('image/'): - thumbnail_link = str(link['href']) - - break - page = requests.get(str(entry['link'])) - soup = BeautifulSoup(page.text, 'html.parser') - - for element in soup.find_all('a'): - href = element.get('href') - datart = element.get('data-rt') - - if datart == 'top-area-play-button': - svt_id = href.split('=')[1].split('&')[0] - - api = json.loads( - requests.get( - 'https://api.svt.se/video/{}'.format(svt_id)).text) - resolved_link = '' - - for reference in api['videoReferences']: - if reference['format'] == "dashhbbtv": - resolved_link = reference['url'] - print(resolved_link) - description = str(entry['description']) - published_parsed = entry['published_parsed'] - title = str(entry['title']) - res = requests.get(thumbnail_link) - content = res.content - content_bytes = io.BytesIO(content) - self.m_cache[key] = {'thumbnail_link': thumbnail_link} - self.m_cache[key]['content'] = content - self.m_cache[key]['resolved_link'] = resolved_link - self.m_cache[key]['description'] = description - self.m_cache[key]['published_parsed'] = published_parsed - self.m_cache[key]['title'] = title - - image = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1) - thumbnail = wx.Bitmap(image) - item = Item(description, resolved_link, self.m_provider_name, - published_parsed, thumbnail, title) - self.m_items.append(item) - - # write to cache file - with open(self.m_cachefile, 'wb') as cachehandle: - pickle.dump(self.m_cache, cachehandle) + if svt_id == 'feed': + for entry in entries: + key = hashlib.sha256(entry['link'].encode('utf-8')).hexdigest() + + if key in self.m_cache.keys(): + thumbnail_link = self.m_cache[key]['thumbnail_link'] + resolved_link = self.m_cache[key]['resolved_link'] + description = self.m_cache[key]['description'] + published_parsed = self.m_cache[key]['published_parsed'] + title = self.m_cache[key]['title'] + else: + + for link in entry['links']: + if str(link['type']).startswith('image/'): + thumbnail_link = str(link['href']) + + break + page = requests.get(str(entry['link'])) + soup = BeautifulSoup(page.text, 'html.parser') + + for element in soup.find_all('a'): + href = element.get('href') + datart = element.get('data-rt') + + if datart == 'top-area-play-button': + svt_id = href.split('=')[1].split('&')[0] + + resolved_link = resolve_link(svt_id) + description = str(entry['description']) + published_parsed = entry['published_parsed'] + title = str(entry['title']) + self.m_cache[key] = {'thumbnail_link': thumbnail_link} + self.m_cache[key]['resolved_link'] = resolved_link + self.m_cache[key]['description'] = description + self.m_cache[key]['published_parsed'] = published_parsed + self.m_cache[key]['title'] = title + thumbnail = make_bitmap_from_url(thumbnail_link) + if resolved_link: + item = Item(description, resolved_link, + self.m_provider_name, published_parsed, + thumbnail, title) + self.m_items.append(item) + + # write to cache file + with open(self.m_cachefile, 'wb') as cachehandle: + pickle.dump(self.m_cache, cachehandle) + else: + chan_dict = resolve_svt_channel(svt_id) + resolved_link = resolve_link(svt_id) + title = chan_dict['name'] + published_parsed = datetime.now() + description = "Live channel stream" + thumbnail = make_bitmap_from_url(chan_dict['thumbnail_url']) + if resolved_link: + item = Item(description, resolved_link, self.m_provider_name, + published_parsed, thumbnail, title) + self.m_items.append(item) + + +def resolve_link(svt_id: str) -> str: + api = json.loads( + requests.get('https://api.svt.se/video/{}'.format(svt_id)).text) + resolved_link = '' + + try: + for reference in api['videoReferences']: + if reference['format'] == "dashhbbtv": + resolved_link = reference['url'] + except KeyError: + pass + return resolved_link diff --git a/Channel/YouTube/__init__.py b/Channel/YouTube/__init__.py index 1aaa256..a2ede30 100644 --- a/Channel/YouTube/__init__.py +++ b/Channel/YouTube/__init__.py @@ -8,9 +8,12 @@ from typing import Union import feedparser import requests import wx +from bs4 import BeautifulSoup +from youtube_dl import YoutubeDL as yt + from Channel import Channel from Items import Item -from youtube_dl import YoutubeDL as yt +from Utils import get_default_log_url, make_bitmap_from_url class YouTube(Channel): @@ -18,11 +21,12 @@ class YouTube(Channel): m_cachefile = '/tmp/yt_cache' def __init__(self, channel_id) -> None: + self.m_channel_id = channel_id rss_url = 'https://www.youtube.com/feeds/videos.xml?channel_id={}'.format( channel_id) - logo_url = 'https://upload.wikimedia.org/wikipedia/commons/' - logo_url += 'thumb/0/09/YouTube_full-color_icon_(2017).svg/480px-YouTube_full-color_icon_(2017).svg.png' + logo_url = get_default_log_url('YouTube') super().__init__(channel_id, rss_url, logo_url) + self.set_avatar() self.m_items: Union[list[Item], None] = None if os.path.exists(self.m_cachefile): @@ -33,6 +37,25 @@ class YouTube(Channel): kwargs={}) self.m_thr.start() + def set_avatar(self) -> str: + info = get_info(self.m_channel_id) + bmap = self.get_logo_as_bitmap() + title = info['title'] + dc = wx.MemoryDC(bmap) + cblack = wx.Colour(0, 0, 0) + cwhite = wx.Colour(255, 255, 255) + dc.SetTextForeground(cwhite) + dc.SetTextBackground(cblack) + dc.SetFont(wx.Font().Bold()) + dc.SetBackgroundMode(wx.BRUSHSTYLE_SOLID) + w, h = dc.GetSize() + tw, th = dc.GetTextExtent(title) + dc.DrawText(title, (w - tw) / 2, (h - th) / 2) #display text in center + del dc + self.m_logo = bmap + + return "" + def wait(self) -> bool: return self.m_thr.is_alive() @@ -52,7 +75,6 @@ class YouTube(Channel): if key in self.m_cache.keys(): thumbnail_link = self.m_cache[key]['thumbnail_link'] - content = self.m_cache[key]['content'] resolved_link = self.m_cache[key]['resolved_link'] description = self.m_cache[key]['description'] published_parsed = self.m_cache[key]['published_parsed'] @@ -64,8 +86,7 @@ class YouTube(Channel): description = str(entry['description']) link = '' with yt(ydl_opts) as ydl: - video = yt(ydl_opts).extract_info(entry['link'], - download=False) + video = ydl.extract_info(entry['link'], download=False) for form in video['formats']: if form['height']: @@ -76,23 +97,26 @@ class YouTube(Channel): resolved_link = link published_parsed = entry['published_parsed'] - res = requests.get(thumbnail_link) - content = res.content if not resolved_link: continue - content_bytes = io.BytesIO(content) self.m_cache[key] = {'thumbnail_link': thumbnail_link} - self.m_cache[key]['content'] = content self.m_cache[key]['resolved_link'] = resolved_link self.m_cache[key]['description'] = description self.m_cache[key]['published_parsed'] = published_parsed self.m_cache[key]['title'] = title - image = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1) - thumbnail = wx.Bitmap(image) + thumbnail = make_bitmap_from_url(thumbnail_link) item = Item(description, resolved_link, self.m_provider_name, published_parsed, thumbnail, title) self.m_items.append(item) # write to cache file with open(self.m_cachefile, 'wb') as cachehandle: pickle.dump(self.m_cache, cachehandle) + + +def get_info(channel_id: str) -> str: + link = 'https://www.youtube.com/channel/{}'.format(channel_id) + ydl_opts = {'extract_flat': True, 'skip_download': True} + with yt(ydl_opts) as ydl: + info = ydl.extract_info(link, download=False) + return info diff --git a/Channel/__init__.py b/Channel/__init__.py index 6e42ea7..a9f95ad 100644 --- a/Channel/__init__.py +++ b/Channel/__init__.py @@ -1,40 +1,30 @@ #!/usr/bin/env python3 +import io from typing import Union import requests import wx -import io from Items import Item - -default_logo = "https://upload.wikimedia.org/wikipedia/commons/" -default_logo += "thumb/f/fd/Cartoon_Hand_Playing_Multiple_Online_Videos.svg/" -default_logo += "480px-Cartoon_Hand_Playing_Multiple_Online_Videos.svg.png" +from Utils import get_default_log_url, make_bitmap_from_url class Channel: def __init__(self, provider_name: str, feed: str, - logo_url: str = default_logo) -> None: + logo_url: str = '') -> None: + if not logo_url: + logo_url = get_default_logo_url() + self.m_logo_url = logo_url + self.m_logo = make_bitmap_from_url(self.m_logo_url) self.m_provider_name = provider_name self.m_feed = feed self.m_items: Union[list[Item], None] = None - res = requests.get(logo_url) - content = res.content - content_bytes = io.BytesIO(content) - self.m_logo = wx.Image(content_bytes, - type=wx.BITMAP_TYPE_ANY, - index=-1) def get_logo_as_bitmap(self) -> wx.Bitmap: - """ - [TODO:description] - - :rtype wx.Image: [TODO:description] - """ - return wx.Bitmap(self.m_logo) + return self.m_logo def get_feed(self) -> str: return self.m_feed diff --git a/ChannelProvider/__init__.py b/ChannelProvider/__init__.py new file mode 100644 index 0000000..57365ab --- /dev/null +++ b/ChannelProvider/__init__.py @@ -0,0 +1,35 @@ +#/usr/bin/env python3 +import io + +import requests +import wx + +from Channel import Channel +from Utils import get_default_log_url, make_bitmap_from_url + + +class ChannelProvider: + def __init__(self, providerid: str, channels=list()): + self.m_id = providerid + self.m_logo_url = get_default_log_url(providerid) + self.m_logo: wx.Bitmap = make_bitmap_from_url(self.m_logo_url) + self.m_channels: list[Channel] = channels + + def append_channel(self, channel: Channel) -> int: + self.m_channels.append(channel) + return len(self.m_channels) + + def get_channels(self) -> list[Channel]: + return self.m_channels + + def get_channel_by_index(self, channel_index: int) -> Channel: + return self.m_channels[channel_index] + + def get_logo_as_bitmap(self) -> wx.Bitmap: + return self.m_logo + + def get_name(self) -> str: + return self.m_id_ + + def get_logo_url(self) -> str: + links: dict = {'ch-svt1'} diff --git a/Items/__init__.py b/Items/__init__.py index d4dba62..c8f9afc 100644 --- a/Items/__init__.py +++ b/Items/__init__.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 -import wx from datetime import datetime +import wx + class Item(dict): def __init__(self, diff --git a/Utils/__init__.py b/Utils/__init__.py new file mode 100644 index 0000000..dd44837 --- /dev/null +++ b/Utils/__init__.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +import io + +import requests +import wx + + +def get_default_log_url(providerid: str = 'default') -> str: + if providerid == 'SVT': + return 'https://upload.wikimedia.org/wikipedia/commons/thumb/4/4b/Logotyp_SVT_Play.png/480px-Logotyp_SVT_Play.jpg' + if providerid == 'YouTube': + return 'https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/Youtube.png/480px-Youtube.jpg' + + else: + return 'https://upload.wikimedia.org/wikipedia/commons/thumb/f/fd/Cartoon_Hand_Playing_Multiple_Online_Videos.svg/480px-Cartoon_Hand_Playing_Multiple_Online_Videos.svg.jpg' + + +def make_bitmap_from_url(logo_url: str = get_default_log_url()) -> wx.Bitmap: + res = requests.get(logo_url) + content = res.content + content_bytes = io.BytesIO(content) + logo = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1) + return wx.Bitmap(logo) + + +def resolve_svt_channel(svt_id: str) -> dict: + channels = { + "ch-barnkanalen": { + "name": + "Barnkanalen", + "thumbnail_url": + "https://upload.wikimedia.org/wikipedia/commons/thumb/5/52/SVT_Barnkanalen_logo_2008%E2%80%932012.svg/480px-SVT_Barnkanalen_logo_2008%E2%80%932012.svg.jpg" + }, + "ch-svt1": { + "name": + "SVT 1", + "thumbnail_url": + "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c2/SVT1_logo_2012.svg/480px-SVT1_logo_2012.svg.jpg" + }, + "ch-svt2": { + "name": + "SVT 2", + "thumbnail_url": + "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/SVT2_logo_2012.svg/480px-SVT2_logo_2012.svg.jpg" + }, + "ch-svt24": { + "name": + "SVT 24", + "thumbnail_url": + "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8f/SVT24_logo.svg/480px-SVT24_logo.svg.jpg" + }, + "kunskapskanalen": { + "name": + "Kunskapskanalen", + "thumbnail_url": + "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b9/Kunskapskanalen_logo.png/480px-Kunskapskanalen_logo.jpg" + }, + "feed": { + "name": "Senaste program", + "thumbnail_url": get_default_log_url('SVT') + }, + } + + return channels[svt_id] diff --git a/main.py b/main.py index f7ca11c..561193f 100644 --- a/main.py +++ b/main.py @@ -9,8 +9,7 @@ import wx.lib.scrolledpanel as scrolled import wx.media from Channel import SVT, Channel, YouTube - -Channel = NewType('Channel', Channel) +from ChannelProvider import ChannelProvider class Cast(wx.Frame): @@ -33,9 +32,17 @@ class Cast(wx.Frame): self.m_control = None self.m_panel.SetupScrolling() self.m_panel.SetSizer(self.m_sizer) - self.m_providers: list[Channel] = [ - YouTube.YouTube('UCu6mSoMNzHQiBIOCkHUa2Aw'), - SVT.SVT() + self.m_providers: list[ChannelProvider] = [ + ChannelProvider('SVT', + channels=[ + SVT.SVT('feed'), + SVT.SVT('ch-svt1'), + SVT.SVT('ch-svt2'), + SVT.SVT('ch-svt24') + ]), + ChannelProvider( + 'YouTube', + channels=[YouTube.YouTube('UCu6mSoMNzHQiBIOCkHUa2Aw')]) ] self.m_selected_channel = None self.show_splash(None) @@ -51,14 +58,34 @@ class Cast(wx.Frame): def show_splash(self, _) -> None: self.m_sizer.Clear(delete_windows=True) self.m_sizer = wx.BoxSizer(wx.VERTICAL) - channel_index = 0 + provider_index = 0 for provider in self.m_providers: bitmap = provider.get_logo_as_bitmap() + btn = wx.BitmapButton(self.m_panel, + id=provider_index, + bitmap=bitmap) + btn.Bind(wx.EVT_BUTTON, + lambda event, index=provider_index: self. + show_channel_list(event, index)) + self.m_sizer.Add(btn) + provider_index += 1 + + self.m_panel.SetSizer(self.m_sizer) + self.m_sizer.Fit(self) + self.m_panel.Layout() + + def show_channel_list(self, _, provider_index) -> None: + self.m_sizer.Clear(delete_windows=True) + self.m_sizer = wx.BoxSizer(wx.VERTICAL) + self.m_selected_provider = self.m_providers[provider_index] + channel_index = 0 + for channel in self.m_selected_provider.get_channels(): + bitmap = channel.get_logo_as_bitmap() btn = wx.BitmapButton(self.m_panel, id=channel_index, bitmap=bitmap) btn.Bind(wx.EVT_BUTTON, - lambda event, index=channel_index: self.show_list( + lambda event, index=channel_index: self.show_video_list( event, index)) self.m_sizer.Add(btn) channel_index += 1 @@ -67,7 +94,7 @@ class Cast(wx.Frame): self.m_sizer.Fit(self) self.m_panel.Layout() - def show_list(self, _, index=0) -> None: + def show_video_list(self, _, index=0) -> None: """ Shows a list of videos @@ -77,7 +104,7 @@ class Cast(wx.Frame): self.m_sizer.Clear(delete_windows=True) self.m_sizer = wx.BoxSizer(wx.VERTICAL) - channel = self.m_providers[index] + channel = self.m_selected_provider.get_channel_by_index(index) if channel.wait(): with wx.BusyInfo("Please wait, working..."): index = 0 @@ -129,9 +156,9 @@ class Cast(wx.Frame): pause_button = wx.Button(self.m_panel, -1, "Pause") back_button = wx.Button(self.m_panel, -1, "Back") - back_button.Bind( - wx.EVT_BUTTON, - lambda event, index=provider_index: self.show_list(event, index)) + back_button.Bind(wx.EVT_BUTTON, + lambda event, index=provider_index: self. + show_video_list(event, index)) self.m_sizer.Add(self.m_control, (0, 0)) self.m_sizer.SetItemSpan(0, (0, 6)) @@ -157,7 +184,7 @@ class Cast(wx.Frame): play_button.Bind(wx.EVT_BUTTON, self.play) pause_button.Bind(wx.EVT_BUTTON, self.pause) - self.Bind(wx.media.EVT_MEDIA_FINISHED, self.show_list) + self.Bind(wx.media.EVT_MEDIA_FINISHED, self.show_video_list) self.load_uri(uri) self.m_panel.SetSizer(self.m_sizer) self.m_sizer.Fit(self)