Reintroduce ChannelProvider

This time a ChannelProvider has multiple Channels and the gui class
deals with them instead of Channels directly
main
Micke Nordin 2 years ago
parent 54193eb8df
commit a776e913a8
Signed by: micke
GPG Key ID: 014B273D614BE877

@ -6,97 +6,113 @@ import json
import os
import pickle
import threading
from datetime import datetime
from Channel import Channel
import feedparser
import requests
import wx
from bs4 import BeautifulSoup
from Channel import Channel
from Items import Item
from Utils import make_bitmap_from_url, resolve_svt_channel
default_rss_url = 'http://www.svtplay.se/rss.xml'
class SVT(Channel):
m_cache: dict = dict()
m_cachefile = '/tmp/svt_cache'
def __init__(self) -> None:
rss_url = 'http://www.svtplay.se/rss.xml'
logo_url = 'https://upload.wikimedia.org/wikipedia/commons/'
logo_url += 'thumb/4/4b/Logotyp_SVT_Play.png/480px-Logotyp_SVT_Play.png'
super().__init__('SVT', rss_url, logo_url)
def __init__(self, svt_id: str) -> None:
chan_dict = resolve_svt_channel(svt_id)
super().__init__('SVT', default_rss_url, chan_dict['thumbnail_url'])
if os.path.exists(self.m_cachefile):
with open(self.m_cachefile, 'rb') as cachehandle:
self.m_cache = pickle.load(cachehandle)
self.m_thr = threading.Thread(target=self.parse_feed,
args=(),
args=[svt_id],
kwargs={})
self.m_thr.start()
def wait(self) -> bool:
return self.m_thr.is_alive()
def parse_feed(self) -> None:
def parse_feed(self, *args, **kwargs) -> None:
svt_id = args[0]
feed = feedparser.parse(self.get_feed())
entries = feed['entries']
self.m_items: list[Item] = list()
for entry in entries:
key = hashlib.sha256(entry['link'].encode('utf-8')).hexdigest()
if key in self.m_cache.keys():
thumbnail_link = self.m_cache[key]['thumbnail_link']
content = self.m_cache[key]['content']
resolved_link = self.m_cache[key]['resolved_link']
description = self.m_cache[key]['description']
published_parsed = self.m_cache[key]['published_parsed']
title = self.m_cache[key]['title']
else:
svt_id = ''
for link in entry['links']:
if str(link['type']).startswith('image/'):
thumbnail_link = str(link['href'])
break
page = requests.get(str(entry['link']))
soup = BeautifulSoup(page.text, 'html.parser')
for element in soup.find_all('a'):
href = element.get('href')
datart = element.get('data-rt')
if datart == 'top-area-play-button':
svt_id = href.split('=')[1].split('&')[0]
api = json.loads(
requests.get(
'https://api.svt.se/video/{}'.format(svt_id)).text)
resolved_link = ''
for reference in api['videoReferences']:
if reference['format'] == "dashhbbtv":
resolved_link = reference['url']
print(resolved_link)
description = str(entry['description'])
published_parsed = entry['published_parsed']
title = str(entry['title'])
res = requests.get(thumbnail_link)
content = res.content
content_bytes = io.BytesIO(content)
self.m_cache[key] = {'thumbnail_link': thumbnail_link}
self.m_cache[key]['content'] = content
self.m_cache[key]['resolved_link'] = resolved_link
self.m_cache[key]['description'] = description
self.m_cache[key]['published_parsed'] = published_parsed
self.m_cache[key]['title'] = title
image = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1)
thumbnail = wx.Bitmap(image)
item = Item(description, resolved_link, self.m_provider_name,
published_parsed, thumbnail, title)
self.m_items.append(item)
# write to cache file
with open(self.m_cachefile, 'wb') as cachehandle:
pickle.dump(self.m_cache, cachehandle)
if svt_id == 'feed':
for entry in entries:
key = hashlib.sha256(entry['link'].encode('utf-8')).hexdigest()
if key in self.m_cache.keys():
thumbnail_link = self.m_cache[key]['thumbnail_link']
resolved_link = self.m_cache[key]['resolved_link']
description = self.m_cache[key]['description']
published_parsed = self.m_cache[key]['published_parsed']
title = self.m_cache[key]['title']
else:
for link in entry['links']:
if str(link['type']).startswith('image/'):
thumbnail_link = str(link['href'])
break
page = requests.get(str(entry['link']))
soup = BeautifulSoup(page.text, 'html.parser')
for element in soup.find_all('a'):
href = element.get('href')
datart = element.get('data-rt')
if datart == 'top-area-play-button':
svt_id = href.split('=')[1].split('&')[0]
resolved_link = resolve_link(svt_id)
description = str(entry['description'])
published_parsed = entry['published_parsed']
title = str(entry['title'])
self.m_cache[key] = {'thumbnail_link': thumbnail_link}
self.m_cache[key]['resolved_link'] = resolved_link
self.m_cache[key]['description'] = description
self.m_cache[key]['published_parsed'] = published_parsed
self.m_cache[key]['title'] = title
thumbnail = make_bitmap_from_url(thumbnail_link)
if resolved_link:
item = Item(description, resolved_link,
self.m_provider_name, published_parsed,
thumbnail, title)
self.m_items.append(item)
# write to cache file
with open(self.m_cachefile, 'wb') as cachehandle:
pickle.dump(self.m_cache, cachehandle)
else:
chan_dict = resolve_svt_channel(svt_id)
resolved_link = resolve_link(svt_id)
title = chan_dict['name']
published_parsed = datetime.now()
description = "Live channel stream"
thumbnail = make_bitmap_from_url(chan_dict['thumbnail_url'])
if resolved_link:
item = Item(description, resolved_link, self.m_provider_name,
published_parsed, thumbnail, title)
self.m_items.append(item)
def resolve_link(svt_id: str) -> str:
api = json.loads(
requests.get('https://api.svt.se/video/{}'.format(svt_id)).text)
resolved_link = ''
try:
for reference in api['videoReferences']:
if reference['format'] == "dashhbbtv":
resolved_link = reference['url']
except KeyError:
pass
return resolved_link

@ -8,9 +8,12 @@ from typing import Union
import feedparser
import requests
import wx
from bs4 import BeautifulSoup
from youtube_dl import YoutubeDL as yt
from Channel import Channel
from Items import Item
from youtube_dl import YoutubeDL as yt
from Utils import get_default_log_url, make_bitmap_from_url
class YouTube(Channel):
@ -18,11 +21,12 @@ class YouTube(Channel):
m_cachefile = '/tmp/yt_cache'
def __init__(self, channel_id) -> None:
self.m_channel_id = channel_id
rss_url = 'https://www.youtube.com/feeds/videos.xml?channel_id={}'.format(
channel_id)
logo_url = 'https://upload.wikimedia.org/wikipedia/commons/'
logo_url += 'thumb/0/09/YouTube_full-color_icon_(2017).svg/480px-YouTube_full-color_icon_(2017).svg.png'
logo_url = get_default_log_url('YouTube')
super().__init__(channel_id, rss_url, logo_url)
self.set_avatar()
self.m_items: Union[list[Item], None] = None
if os.path.exists(self.m_cachefile):
@ -33,6 +37,25 @@ class YouTube(Channel):
kwargs={})
self.m_thr.start()
def set_avatar(self) -> str:
info = get_info(self.m_channel_id)
bmap = self.get_logo_as_bitmap()
title = info['title']
dc = wx.MemoryDC(bmap)
cblack = wx.Colour(0, 0, 0)
cwhite = wx.Colour(255, 255, 255)
dc.SetTextForeground(cwhite)
dc.SetTextBackground(cblack)
dc.SetFont(wx.Font().Bold())
dc.SetBackgroundMode(wx.BRUSHSTYLE_SOLID)
w, h = dc.GetSize()
tw, th = dc.GetTextExtent(title)
dc.DrawText(title, (w - tw) / 2, (h - th) / 2) #display text in center
del dc
self.m_logo = bmap
return ""
def wait(self) -> bool:
return self.m_thr.is_alive()
@ -52,7 +75,6 @@ class YouTube(Channel):
if key in self.m_cache.keys():
thumbnail_link = self.m_cache[key]['thumbnail_link']
content = self.m_cache[key]['content']
resolved_link = self.m_cache[key]['resolved_link']
description = self.m_cache[key]['description']
published_parsed = self.m_cache[key]['published_parsed']
@ -64,8 +86,7 @@ class YouTube(Channel):
description = str(entry['description'])
link = ''
with yt(ydl_opts) as ydl:
video = yt(ydl_opts).extract_info(entry['link'],
download=False)
video = ydl.extract_info(entry['link'], download=False)
for form in video['formats']:
if form['height']:
@ -76,23 +97,26 @@ class YouTube(Channel):
resolved_link = link
published_parsed = entry['published_parsed']
res = requests.get(thumbnail_link)
content = res.content
if not resolved_link:
continue
content_bytes = io.BytesIO(content)
self.m_cache[key] = {'thumbnail_link': thumbnail_link}
self.m_cache[key]['content'] = content
self.m_cache[key]['resolved_link'] = resolved_link
self.m_cache[key]['description'] = description
self.m_cache[key]['published_parsed'] = published_parsed
self.m_cache[key]['title'] = title
image = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1)
thumbnail = wx.Bitmap(image)
thumbnail = make_bitmap_from_url(thumbnail_link)
item = Item(description, resolved_link, self.m_provider_name,
published_parsed, thumbnail, title)
self.m_items.append(item)
# write to cache file
with open(self.m_cachefile, 'wb') as cachehandle:
pickle.dump(self.m_cache, cachehandle)
def get_info(channel_id: str) -> str:
link = 'https://www.youtube.com/channel/{}'.format(channel_id)
ydl_opts = {'extract_flat': True, 'skip_download': True}
with yt(ydl_opts) as ydl:
info = ydl.extract_info(link, download=False)
return info

@ -1,40 +1,30 @@
#!/usr/bin/env python3
import io
from typing import Union
import requests
import wx
import io
from Items import Item
default_logo = "https://upload.wikimedia.org/wikipedia/commons/"
default_logo += "thumb/f/fd/Cartoon_Hand_Playing_Multiple_Online_Videos.svg/"
default_logo += "480px-Cartoon_Hand_Playing_Multiple_Online_Videos.svg.png"
from Utils import get_default_log_url, make_bitmap_from_url
class Channel:
def __init__(self,
provider_name: str,
feed: str,
logo_url: str = default_logo) -> None:
logo_url: str = '') -> None:
if not logo_url:
logo_url = get_default_logo_url()
self.m_logo_url = logo_url
self.m_logo = make_bitmap_from_url(self.m_logo_url)
self.m_provider_name = provider_name
self.m_feed = feed
self.m_items: Union[list[Item], None] = None
res = requests.get(logo_url)
content = res.content
content_bytes = io.BytesIO(content)
self.m_logo = wx.Image(content_bytes,
type=wx.BITMAP_TYPE_ANY,
index=-1)
def get_logo_as_bitmap(self) -> wx.Bitmap:
"""
[TODO:description]
:rtype wx.Image: [TODO:description]
"""
return wx.Bitmap(self.m_logo)
return self.m_logo
def get_feed(self) -> str:
return self.m_feed

@ -0,0 +1,35 @@
#/usr/bin/env python3
import io
import requests
import wx
from Channel import Channel
from Utils import get_default_log_url, make_bitmap_from_url
class ChannelProvider:
def __init__(self, providerid: str, channels=list()):
self.m_id = providerid
self.m_logo_url = get_default_log_url(providerid)
self.m_logo: wx.Bitmap = make_bitmap_from_url(self.m_logo_url)
self.m_channels: list[Channel] = channels
def append_channel(self, channel: Channel) -> int:
self.m_channels.append(channel)
return len(self.m_channels)
def get_channels(self) -> list[Channel]:
return self.m_channels
def get_channel_by_index(self, channel_index: int) -> Channel:
return self.m_channels[channel_index]
def get_logo_as_bitmap(self) -> wx.Bitmap:
return self.m_logo
def get_name(self) -> str:
return self.m_id_
def get_logo_url(self) -> str:
links: dict = {'ch-svt1'}

@ -1,7 +1,8 @@
#!/usr/bin/env python3
import wx
from datetime import datetime
import wx
class Item(dict):
def __init__(self,

@ -0,0 +1,64 @@
#!/usr/bin/env python3
import io
import requests
import wx
def get_default_log_url(providerid: str = 'default') -> str:
if providerid == 'SVT':
return 'https://upload.wikimedia.org/wikipedia/commons/thumb/4/4b/Logotyp_SVT_Play.png/480px-Logotyp_SVT_Play.jpg'
if providerid == 'YouTube':
return 'https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/Youtube.png/480px-Youtube.jpg'
else:
return 'https://upload.wikimedia.org/wikipedia/commons/thumb/f/fd/Cartoon_Hand_Playing_Multiple_Online_Videos.svg/480px-Cartoon_Hand_Playing_Multiple_Online_Videos.svg.jpg'
def make_bitmap_from_url(logo_url: str = get_default_log_url()) -> wx.Bitmap:
res = requests.get(logo_url)
content = res.content
content_bytes = io.BytesIO(content)
logo = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1)
return wx.Bitmap(logo)
def resolve_svt_channel(svt_id: str) -> dict:
channels = {
"ch-barnkanalen": {
"name":
"Barnkanalen",
"thumbnail_url":
"https://upload.wikimedia.org/wikipedia/commons/thumb/5/52/SVT_Barnkanalen_logo_2008%E2%80%932012.svg/480px-SVT_Barnkanalen_logo_2008%E2%80%932012.svg.jpg"
},
"ch-svt1": {
"name":
"SVT 1",
"thumbnail_url":
"https://upload.wikimedia.org/wikipedia/commons/thumb/c/c2/SVT1_logo_2012.svg/480px-SVT1_logo_2012.svg.jpg"
},
"ch-svt2": {
"name":
"SVT 2",
"thumbnail_url":
"https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/SVT2_logo_2012.svg/480px-SVT2_logo_2012.svg.jpg"
},
"ch-svt24": {
"name":
"SVT 24",
"thumbnail_url":
"https://upload.wikimedia.org/wikipedia/commons/thumb/8/8f/SVT24_logo.svg/480px-SVT24_logo.svg.jpg"
},
"kunskapskanalen": {
"name":
"Kunskapskanalen",
"thumbnail_url":
"https://upload.wikimedia.org/wikipedia/commons/thumb/b/b9/Kunskapskanalen_logo.png/480px-Kunskapskanalen_logo.jpg"
},
"feed": {
"name": "Senaste program",
"thumbnail_url": get_default_log_url('SVT')
},
}
return channels[svt_id]

@ -9,8 +9,7 @@ import wx.lib.scrolledpanel as scrolled
import wx.media
from Channel import SVT, Channel, YouTube
Channel = NewType('Channel', Channel)
from ChannelProvider import ChannelProvider
class Cast(wx.Frame):
@ -33,9 +32,17 @@ class Cast(wx.Frame):
self.m_control = None
self.m_panel.SetupScrolling()
self.m_panel.SetSizer(self.m_sizer)
self.m_providers: list[Channel] = [
YouTube.YouTube('UCu6mSoMNzHQiBIOCkHUa2Aw'),
SVT.SVT()
self.m_providers: list[ChannelProvider] = [
ChannelProvider('SVT',
channels=[
SVT.SVT('feed'),
SVT.SVT('ch-svt1'),
SVT.SVT('ch-svt2'),
SVT.SVT('ch-svt24')
]),
ChannelProvider(
'YouTube',
channels=[YouTube.YouTube('UCu6mSoMNzHQiBIOCkHUa2Aw')])
]
self.m_selected_channel = None
self.show_splash(None)
@ -51,14 +58,34 @@ class Cast(wx.Frame):
def show_splash(self, _) -> None:
self.m_sizer.Clear(delete_windows=True)
self.m_sizer = wx.BoxSizer(wx.VERTICAL)
channel_index = 0
provider_index = 0
for provider in self.m_providers:
bitmap = provider.get_logo_as_bitmap()
btn = wx.BitmapButton(self.m_panel,
id=provider_index,
bitmap=bitmap)
btn.Bind(wx.EVT_BUTTON,
lambda event, index=provider_index: self.
show_channel_list(event, index))
self.m_sizer.Add(btn)
provider_index += 1
self.m_panel.SetSizer(self.m_sizer)
self.m_sizer.Fit(self)
self.m_panel.Layout()
def show_channel_list(self, _, provider_index) -> None:
self.m_sizer.Clear(delete_windows=True)
self.m_sizer = wx.BoxSizer(wx.VERTICAL)
self.m_selected_provider = self.m_providers[provider_index]
channel_index = 0
for channel in self.m_selected_provider.get_channels():
bitmap = channel.get_logo_as_bitmap()
btn = wx.BitmapButton(self.m_panel,
id=channel_index,
bitmap=bitmap)
btn.Bind(wx.EVT_BUTTON,
lambda event, index=channel_index: self.show_list(
lambda event, index=channel_index: self.show_video_list(
event, index))
self.m_sizer.Add(btn)
channel_index += 1
@ -67,7 +94,7 @@ class Cast(wx.Frame):
self.m_sizer.Fit(self)
self.m_panel.Layout()
def show_list(self, _, index=0) -> None:
def show_video_list(self, _, index=0) -> None:
"""
Shows a list of videos
@ -77,7 +104,7 @@ class Cast(wx.Frame):
self.m_sizer.Clear(delete_windows=True)
self.m_sizer = wx.BoxSizer(wx.VERTICAL)
channel = self.m_providers[index]
channel = self.m_selected_provider.get_channel_by_index(index)
if channel.wait():
with wx.BusyInfo("Please wait, working..."):
index = 0
@ -129,9 +156,9 @@ class Cast(wx.Frame):
pause_button = wx.Button(self.m_panel, -1, "Pause")
back_button = wx.Button(self.m_panel, -1, "Back")
back_button.Bind(
wx.EVT_BUTTON,
lambda event, index=provider_index: self.show_list(event, index))
back_button.Bind(wx.EVT_BUTTON,
lambda event, index=provider_index: self.
show_video_list(event, index))
self.m_sizer.Add(self.m_control, (0, 0))
self.m_sizer.SetItemSpan(0, (0, 6))
@ -157,7 +184,7 @@ class Cast(wx.Frame):
play_button.Bind(wx.EVT_BUTTON, self.play)
pause_button.Bind(wx.EVT_BUTTON, self.pause)
self.Bind(wx.media.EVT_MEDIA_FINISHED, self.show_list)
self.Bind(wx.media.EVT_MEDIA_FINISHED, self.show_video_list)
self.load_uri(uri)
self.m_panel.SetSizer(self.m_sizer)
self.m_sizer.Fit(self)

Loading…
Cancel
Save