Add support for importing subscriptions from NewPipe

main
Micke Nordin 3 years ago
parent b0b05cbecc
commit ed4ddf1d01
Signed by: micke
GPG Key ID: 0DA0A7A5708FE257

@ -1,42 +1,46 @@
import hashlib
import io
import os
import pickle
import threading
import time
from typing import Union
import feedparser
import requests
import wx
from bs4 import BeautifulSoup
from youtube_dl import YoutubeDL as yt
from Channel import Channel
from Items import Item
from Utils import get_default_logo, make_bitmap_from_url
from youtube_dl import YoutubeDL as yt
from youtube_dl.utils import DownloadError, ExtractorError
class YouTube(Channel):
m_cache: dict = dict()
m_cachefile = '/tmp/yt_cache'
def __init__(self, channel_id) -> None:
def __init__(self, channel_id: str, name: str) -> None:
self.m_channel_id = channel_id
self.m_info = get_info(channel_id)
self.m_name = name
rss_url = 'https://www.youtube.com/feeds/videos.xml?channel_id={}'.format(
channel_id)
self.m_cachefile = '/tmp/yt_cache_{}'.format(channel_id)
self.m_logo = get_default_logo('YouTube')
super().__init__(channel_id, rss_url, self.m_logo, self.m_info['title'])
super().__init__(channel_id, rss_url, self.m_logo,
self.m_name)
self.m_items: Union[list[Item], None] = None
if os.path.exists(self.m_cachefile):
with open(self.m_cachefile, 'rb') as cachehandle:
self.m_cache = pickle.load(cachehandle)
try:
self.m_cache = pickle.load(cachehandle)
except EOFError or pickle.UnpicklingError:
pass
self.m_thr = threading.Thread(target=self.parse_feed,
args=(),
kwargs={})
self.m_thr.start()
self.pickle()
def wait(self) -> bool:
return self.m_thr.is_alive()
@ -68,13 +72,16 @@ class YouTube(Channel):
description = str(entry['description'])
link = ''
with yt(ydl_opts) as ydl:
video = ydl.extract_info(entry['link'], download=False)
try:
video = ydl.extract_info(entry['link'], download=False)
for form in video['formats']:
if form['height']:
if form['height'] < 480 and form[
'acodec'] != 'none':
link = form['url']
for form in video['formats']:
if form['height']:
if form['height'] < 480 and form[
'acodec'] != 'none':
link = form['url']
except ExtractorError or DownloadError:
pass
resolved_link = link
@ -87,19 +94,16 @@ class YouTube(Channel):
self.m_cache[key]['description'] = description
self.m_cache[key]['published_parsed'] = published_parsed
self.m_cache[key]['title'] = title
thumbnail = make_bitmap_from_url(thumbnail_link, wx.Size(self.m_screen_width,150))
thumbnail = make_bitmap_from_url(thumbnail_link,
wx.Size(self.m_screen_width, 150))
item = Item(description, resolved_link, self.m_provider_name,
published_parsed, thumbnail, title)
self.m_items.append(item)
def pickle(self) -> None:
while self.wait():
time.sleep(1)
# write to cache file
with open(self.m_cachefile, 'wb') as cachehandle:
pickle.dump(self.m_cache, cachehandle)
def get_info(channel_id: str) -> dict:
info: dict = {'title': 'unknown'}
link = 'https://www.youtube.com/channel/{}'.format(channel_id)
ydl_opts = {'extract_flat': True, 'skip_download': True}
with yt(ydl_opts) as ydl:
info = ydl.extract_info(link, download=False)
return info

@ -89,7 +89,7 @@ def resolve_svt_channel(svt_id: str) -> dict:
"SVT 24",
"thumbnail": make_bitmap_from_file('{}/assets/SVT24.png'.format(MYPATH))
},
"kunskapskanalen": {
"ch-kunskapskanalen": {
"name":
"Kunskapskanalen",
"thumbnail": make_bitmap_from_file('{}/assets/Kunskapskanalen.png'.format(MYPATH))

@ -1,14 +1,16 @@
#!/usr/bin/env python3
import json
import os
import threading
import time
from typing import Callable
from urllib.parse import urlparse
import pychromecast
import wx
import wx.lib.scrolledpanel as scrolled
import wx.media
from typing import Callable
from Channel import SVT, YouTube
from ChannelProvider import ChannelProvider
from Utils import make_sized_button
@ -22,6 +24,7 @@ SCROLL_RATE = 5
BM_BTN_STYLE = wx.BOTTOM | wx.EXPAND | wx.LEFT | wx.TOP
BTN_STYLE = wx.BORDER_NONE | wx.BU_AUTODRAW | wx.BU_EXACTFIT | wx.BU_NOTEXT
class Cast(wx.Frame):
def __init__(self, *args, **kw):
"""__init__.
@ -32,47 +35,27 @@ class Cast(wx.Frame):
self.m_selected_chromecast = None
self.SetSizeHints(WIDTH, HEIGHT, maxW=WIDTH)
self.m_index = 0
self.m_chromecast_thr = threading.Thread(
target=self.get_chromecasts, args=(), kwargs={}
)
self.m_chromecast_thr = threading.Thread(target=self.get_chromecasts,
args=(),
kwargs={})
self.m_chromecast_thr.start()
self.m_sizer: wx.Sizer = wx.BoxSizer(wx.VERTICAL)
self.m_panel: wx.lib.scrolledpanel.ScrolledPanel = scrolled.ScrolledPanel( # type: ignore
self, -1, style=wx.VSCROLL
)
self.m_panel: wx.lib.scrolledpanel.ScrolledPanel = scrolled.ScrolledPanel( # type: ignore
self, -1, style=wx.VSCROLL)
self.m_control = None
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE,scrollToTop=True)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True)
self.m_panel.SetSizer(self.m_sizer)
self.m_providers: list[ChannelProvider] = [
ChannelProvider(
"SVT",
channels=[
SVT.SVT("feed"),
SVT.SVT("ch-svt1"),
SVT.SVT("ch-svt2"),
SVT.SVT("ch-svt24"),
],
),
ChannelProvider(
"YouTube",
channels=[
YouTube.YouTube("UCtESv1e7ntJaLJYKIO1FoYw"),
YouTube.YouTube("UC9-y-6csu5WGm29I7JiwpnA"),
YouTube.YouTube("UCoxcjq-8xIDTYp3uz647V5A"),
YouTube.YouTube("UCu6mSoMNzHQiBIOCkHUa2Aw"),
],
),
]
self.m_providers: list[ChannelProvider] = self.get_providers()
self.m_selected_channel = None
self.m_selected_provider_index = None
self,
self.show_provider_list(None)
def add_back_button(self, callback: Callable) -> None:
backbtn = wx.Button(self.m_panel, -1, label="Go back", size=(WIDTH, BTN_HEIGHT))
backbtn.Bind(
wx.EVT_BUTTON, callback
)
backbtn = wx.Button(self.m_panel,
-1,
label="Go back",
size=(WIDTH, BTN_HEIGHT))
backbtn.Bind(wx.EVT_BUTTON, callback)
self.m_sizer.Add(backbtn)
def get_chromecasts(self) -> None:
@ -83,22 +66,68 @@ class Cast(wx.Frame):
"""
self.m_chromecasts, self.m_browser = pychromecast.get_chromecasts()
def get_providers(self) -> list[ChannelProvider]:
providers = list()
svt = ChannelProvider(
"SVT",
channels=[
SVT.SVT("feed"),
SVT.SVT("ch-svt1"),
SVT.SVT("ch-svt2"),
SVT.SVT("ch-svt24"),
SVT.SVT("ch-barnkanalen"),
SVT.SVT("ch-kunskapskanalen"),
],
)
providers.append(svt)
youtube = ChannelProvider(
"YouTube",
channels=[
YouTube.YouTube("UCs6A_0Jm21SIvpdKyg9Gmxw", "Pine 64"),
],
)
subfile = 'yt_subs.json'
if os.path.isfile(subfile):
with open(subfile, 'r') as subs:
janson = json.loads(subs.read())
for channel in janson['subscriptions']:
if channel['service_id'] == 0:
channel_id = urlparse(
channel['url']).path.split('/').pop()
youtube.append_channel(
YouTube.YouTube(channel_id, channel['name']))
providers.append(youtube)
return providers
def show_provider_list(self, _) -> None:
self.m_sizer.Clear(delete_windows=True)
self.m_sizer = wx.BoxSizer(wx.VERTICAL)
self.m_sizer.AddSpacer(SPACER_HEIGHT * 4)
closebtn = wx.Button(self.m_panel, -1, label="Close", size=(WIDTH, BTN_HEIGHT))
closebtn = wx.Button(self.m_panel,
-1,
label="Close",
size=(WIDTH, BTN_HEIGHT))
closebtn.Bind(wx.EVT_BUTTON, lambda event: self.Destroy())
self.m_sizer.Add(closebtn)
provider_index = 0
for provider in self.m_providers:
bitmap = provider.get_logo_as_bitmap()
callback = lambda event, index=provider_index: self.show_channel_list(event, index)
btn_sizer: wx.BoxSizer = make_sized_button(self.m_panel,bitmap,provider.get_name(),callback)
callback = lambda event, index=provider_index: self.show_channel_list(
event, index)
btn_sizer: wx.BoxSizer = make_sized_button(self.m_panel, bitmap,
provider.get_name(),
callback)
self.m_sizer.Add(btn_sizer)
provider_index += 1
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE,scrollToTop=True)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True)
self.m_panel.SetSizer(self.m_sizer)
self.m_sizer.Fit(self)
self.m_sizer.Layout()
@ -112,14 +141,18 @@ class Cast(wx.Frame):
bck_callback = lambda event: self.show_provider_list(event)
self.add_back_button(bck_callback)
channel_index = 0
for channel in self.m_selected_provider.get_channels():
bitmap = channel.get_logo_as_bitmap()
callback = lambda event, index=channel_index: self.show_video_list(event, index)
btn_sizer: wx.BoxSizer = make_sized_button(self.m_panel,bitmap,channel.get_name(),callback)
callback = lambda event, index=channel_index: self.show_video_list(
event, index)
btn_sizer: wx.BoxSizer = make_sized_button(self.m_panel, bitmap,
channel.get_name(),
callback)
self.m_sizer.Add(btn_sizer)
channel_index += 1
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE,scrollToTop=True)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True)
self.m_panel.SetSizer(self.m_sizer)
self.m_sizer.Fit(self)
self.m_sizer.Layout()
@ -129,30 +162,36 @@ class Cast(wx.Frame):
self.m_sizer = wx.BoxSizer(wx.VERTICAL)
self.m_sizer.AddSpacer(SPACER_HEIGHT * 4)
channel = self.m_selected_provider.get_channel_by_index(index)
if channel.wait():
with wx.BusyInfo("Please wait, working..."):
index = 0
while channel.wait():
time.sleep(1)
wx.GetApp().Yield()
callback = lambda event: self.show_channel_list(event, self.m_selected_provider_index)
callback = lambda event: self.show_channel_list(
event, self.m_selected_provider_index)
self.add_back_button(callback)
for item in channel.get_items(): # type: ignore
for item in channel.get_items(): # type: ignore
inner_sizer = wx.BoxSizer(wx.VERTICAL)
title = wx.StaticText(self.m_panel, -1)
title.SetLabelMarkup("<span weight='bold' >{}</span>".format(item["title"]))
title.SetLabelMarkup("<span weight='bold' >{}</span>".format(
item["title"]))
description = wx.StaticText(self.m_panel, -1, item["description"])
description.Wrap(WIDTH -2)
description.Wrap(WIDTH - 2)
bitmap = item["thumbnail"]
btn = wx.BitmapButton(self.m_panel, id=self.m_index, bitmap=bitmap, style= BM_BTN_STYLE)
btn = wx.BitmapButton(self.m_panel,
id=self.m_index,
bitmap=bitmap,
style=BM_BTN_STYLE)
btn.Bind(
wx.EVT_BUTTON,
lambda event, link=item["link"], provider_index=index: self.show_player(
event, link, provider_index
),
lambda event, link=item["link"], provider_index=index: self.
show_player(event, link, provider_index),
)
inner_sizer.Add(title)
inner_sizer.Add(btn)
@ -160,7 +199,7 @@ class Cast(wx.Frame):
self.m_sizer.Add(inner_sizer)
self.m_sizer.AddSpacer(SPACER_HEIGHT)
self.m_index = self.m_index + 1
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE,scrollToTop=True)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True)
self.m_panel.SetSizer(self.m_sizer)
self.m_sizer.Fit(self)
self.m_sizer.Layout()
@ -178,8 +217,8 @@ class Cast(wx.Frame):
inner_sizer = wx.GridBagSizer()
self.m_control = wx.media.MediaCtrl(
self.m_panel,
# size=(WIDTH, HEIGHT/2),
# style=wx.SIMPLE_BORDER,
size=(WIDTH, HEIGHT / 2),
style=wx.SIMPLE_BORDER,
szBackend=wx.media.MEDIABACKEND_GSTREAMER,
)
play_button = wx.Button(self.m_panel, -1, "Play")
@ -189,7 +228,8 @@ class Cast(wx.Frame):
back_button = wx.Button(self.m_panel, -1, "Back")
back_button.Bind(
wx.EVT_BUTTON,
lambda event, index=provider_index: self.show_video_list(event, index),
lambda event, index=provider_index: self.show_video_list(
event, index),
)
inner_sizer.Add(self.m_control, (0, 0))
@ -198,13 +238,13 @@ class Cast(wx.Frame):
inner_sizer.Add(pause_button, (1, 2))
inner_sizer.Add(back_button, (1, 3))
if not self.m_chromecast_thr.is_alive() and not self.m_selected_chromecast:
if not self.m_chromecast_thr.is_alive(
) and not self.m_selected_chromecast:
chromecast_button = wx.Button(self.m_panel, -1, "Cast")
chromecast_button.Bind(
wx.EVT_BUTTON,
lambda event, muri=uri, index=provider_index: self.select_chromecast(
event, muri, index
),
lambda event, muri=uri, index=provider_index: self.
select_chromecast(event, muri, index),
)
inner_sizer.Add(chromecast_button, (2, 2))
self.m_sizer.Add(inner_sizer)
@ -223,7 +263,7 @@ class Cast(wx.Frame):
self.Bind(wx.media.EVT_MEDIA_FINISHED, self.show_video_list)
self.load_uri(uri)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE,scrollToTop=True)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True)
self.m_panel.SetSizer(self.m_sizer)
self.m_sizer.Fit(self)
self.m_sizer.Layout()
@ -234,23 +274,31 @@ class Cast(wx.Frame):
self.m_sizer = wx.BoxSizer(wx.VERTICAL)
self.m_sizer.AddSpacer(SPACER_HEIGHT * 4)
cancel_btn = wx.Button(self.m_panel, -1, "Cancel", size=(WIDTH, BTN_HEIGHT), style=wx.BU_EXACTFIT)
cancel_btn = wx.Button(self.m_panel,
-1,
"Cancel",
size=(WIDTH, BTN_HEIGHT),
style=wx.BU_EXACTFIT)
cancel_btn.Bind(
wx.EVT_BUTTON,
lambda event, index=provider_index: self.show_video_list(event, index),
lambda event, index=provider_index: self.show_video_list(
event, index),
)
self.m_sizer.Add(cancel_btn) #, wx.ALIGN_CENTER_VERTICAL)
self.m_sizer.Add(cancel_btn) #, wx.ALIGN_CENTER_VERTICAL)
for cast in self.m_chromecasts:
friendly_name = cast.cast_info.friendly_name
btn = wx.Button(self.m_panel, id=-1, label=friendly_name, size=(WIDTH, BTN_HEIGHT))
btn = wx.Button(self.m_panel,
id=-1,
label=friendly_name,
size=(WIDTH, BTN_HEIGHT))
btn.Bind(
wx.EVT_BUTTON,
lambda event, chromecast=cast, muri=uri, index=provider_index: self.set_chromecast(
event, chromecast, muri, index
),
lambda event, chromecast=cast, muri=uri, index=provider_index:
self.set_chromecast(event, chromecast, muri, index),
)
self.m_sizer.Add(btn)#, wx.ALIGN_CENTER_VERTICAL)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE,scrollToTop=True)
self.m_sizer.Add(btn) #, wx.ALIGN_CENTER_VERTICAL)
self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True)
self.m_panel.SetSizer(self.m_sizer)
self.m_sizer.Fit(self)
self.m_sizer.Layout()
@ -278,11 +326,8 @@ class Cast(wx.Frame):
break
if (
cast.socket_client.is_connected
and has_played
and player_state != "PLAYING"
):
if (cast.socket_client.is_connected and has_played
and player_state != "PLAYING"):
has_played = False
cast.media_controller.play_media(uri, mimetype)

Loading…
Cancel
Save