#!/usr/bin/env python3 import mpv import json import os import threading import time from typing import Callable from urllib.parse import urlparse import pychromecast import wx import wx.lib.scrolledpanel as scrolled import wx.media from Channel import SVT, YouTube from ChannelProvider import ChannelProvider from Utils import get_subscriptions, import_from_newpipe, make_sized_button WIDTH = int(720 / 2) HEIGHT = int(1440 / 2) BTN_HEIGHT = 40 SPACER_HEIGHT = 10 SCROLL_RATE = 5 BM_BTN_STYLE = wx.BOTTOM | wx.EXPAND | wx.LEFT | wx.TOP BTN_STYLE = wx.BORDER_NONE | wx.BU_AUTODRAW | wx.BU_EXACTFIT | wx.BU_NOTEXT class Cast(wx.Frame): def __init__(self, *args, **kw): """__init__. :param args: :param kw: """ super().__init__(*args, **kw) self.m_selected_chromecast = None self.SetSizeHints(WIDTH, HEIGHT, maxW=WIDTH) self.m_chromecast_thr = threading.Thread(target=self.get_chromecasts, args=(), kwargs={}) self.m_chromecast_thr.start() self.m_sizer: wx.Sizer = wx.BoxSizer(wx.VERTICAL) self.m_panel: wx.lib.scrolledpanel.ScrolledPanel = scrolled.ScrolledPanel( # type: ignore self, -1, style=wx.VSCROLL) self.m_control = None self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True) self.m_panel.SetSizer(self.m_sizer) self.m_providers: list[ChannelProvider] = self.get_providers() # self.m_selected_channel = None # self.m_selected_provider_index = None self.show_provider_list(None) def add_back_button(self, callback: Callable) -> None: backbtn = wx.Button(self.m_panel, -1, label="Go back", size=(WIDTH, BTN_HEIGHT)) backbtn.Bind(wx.EVT_BUTTON, callback) self.m_sizer.Add(backbtn) def get_chromecasts(self) -> None: """ [TODO:description] :rtype None: [TODO:description] """ self.m_chromecasts, self.m_browser = pychromecast.get_chromecasts() def get_providers(self) -> list[ChannelProvider]: providers = list() channels = list() svt = ChannelProvider( "SVT", channels=[ SVT.SVT("feed"), SVT.SVT("ch-svt1"), SVT.SVT("ch-svt2"), SVT.SVT("ch-svt24"), SVT.SVT("ch-barnkanalen"), SVT.SVT("ch-kunskapskanalen"), ], ) providers.append(svt) subscriptions = get_subscriptions() if subscriptions: for channel in subscriptions: channels.append(YouTube.YouTube(channel[0], channel[1])) else: channels.append(YouTube.YouTube("UCs6A_0Jm21SIvpdKyg9Gmxw", "Pine 64")) youtube = ChannelProvider("YouTube", channels=channels) providers.append(youtube) return providers def show_importer(self, _) -> None: with wx.FileDialog(self, "Open Newpipe json file", wildcard="Json files (*.json)|*.json", style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as file_dialog: if file_dialog.ShowModal() == wx.ID_CANCEL: return # the user changed their mind # Proceed loading the file chosen by the user subfile = file_dialog.GetPath() channels = list() if os.path.isfile(subfile): import_from_newpipe(subfile) subscriptions = get_subscriptions() for channel in subscriptions: yt_chan = YouTube.YouTube(channel[0], channel[1]) yt_chan.refresh() channels.append(yt_chan) self.m_providers[1].set_channels(channels) self.m_providers[1].make_latest() self.show_channel_list(None,self.m_selected_provider_index) def show_provider_list(self, _) -> None: self.m_sizer.Clear(delete_windows=True) self.m_sizer = wx.BoxSizer(wx.VERTICAL) # self.m_sizer.AddSpacer(SPACER_HEIGHT * 4) closebtn = wx.Button(self.m_panel, -1, label="Close", size=(WIDTH, BTN_HEIGHT)) closebtn.Bind(wx.EVT_BUTTON, lambda event: self.Destroy()) self.m_sizer.Add(closebtn) provider_index = 0 for provider in self.m_providers: bitmap = provider.get_logo_as_bitmap() callback = lambda event, pindex=provider_index: self.show_channel_list( event, pindex) btn_sizer: wx.BoxSizer = make_sized_button(self.m_panel, bitmap, provider.get_name(), callback) self.m_sizer.Add(btn_sizer) provider_index += 1 self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True) self.m_panel.SetSizer(self.m_sizer) self.m_sizer.Fit(self) self.m_sizer.Layout() def show_channel_list(self, _, provider_index) -> None: self.m_selected_provider_index = provider_index self.m_selected_provider = self.m_providers[provider_index] self.m_sizer.Clear(delete_windows=True) self.m_sizer = wx.BoxSizer(wx.VERTICAL) #self.m_sizer.AddSpacer(SPACER_HEIGHT * 4) bck_callback = lambda event: self.show_provider_list(event) self.add_back_button(bck_callback) if self.m_selected_provider.get_name() == "YouTube": importbtn = wx.Button(self.m_panel, -1, label="Import from NewPipe",size=(WIDTH, BTN_HEIGHT)) importbtn.Bind(wx.EVT_BUTTON, lambda event: self.show_importer(event)) self.m_sizer.Add(importbtn) channel_index = 0 for channel in self.m_selected_provider.get_channels(): bitmap = channel.get_logo_as_bitmap() callback = lambda event, cindex=channel_index: self.show_video_list( event, cindex) btn_sizer: wx.BoxSizer = make_sized_button(self.m_panel, bitmap, channel.get_name(), callback) self.m_sizer.Add(btn_sizer) channel_index += 1 self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True) self.m_panel.SetSizer(self.m_sizer) self.m_sizer.Fit(self) self.m_sizer.Layout() def show_video_list(self, _,channel_index) -> None: self.m_selected_channel = self.m_selected_provider.get_channel_by_index(channel_index) self.m_sizer.Clear(delete_windows=True) self.m_sizer = wx.BoxSizer(wx.VERTICAL) # self.m_sizer.AddSpacer(SPACER_HEIGHT * 4) back_callback = lambda event: self.show_channel_list( event, self.m_selected_provider_index) self.add_back_button(back_callback) if self.m_selected_provider.get_name() == 'YouTube' or (self.m_selected_provider.get_name() == 'SVT' and self.m_selected_channel.get_id() == 'feed'): def refresh_callback(event): if self.m_selected_provider.get_name() == 'YouTube' and channel_index == 0: with wx.BusyInfo("Please wait, working..."): for chan in self.m_selected_provider.get_channels(): chan.refresh() wait = 1 while wait > 0: wait = 0 for chan in self.m_selected_provider.get_channels(): if chan.wait(): wait += 1 time.sleep(1) wx.GetApp().Yield() else: self.m_selected_channel.refresh() self.show_video_list(event,channel_index) refreshbtn = wx.Button(self.m_panel, -1, label="Refresh", size=(WIDTH, BTN_HEIGHT)) refreshbtn.Bind(wx.EVT_BUTTON, refresh_callback) self.m_sizer.Add(refreshbtn) if self.m_selected_channel.wait(): with wx.BusyInfo("Please wait, working..."): while self.m_selected_channel.wait(): time.sleep(1) wx.GetApp().Yield() btnindex = 0 for item in self.m_selected_channel.get_items(): # type: ignore inner_sizer = wx.BoxSizer(wx.VERTICAL) pane_sizer = wx.BoxSizer(wx.VERTICAL) title = wx.StaticText(self.m_panel, -1) title.SetLabelMarkup("{}".format( item["title"])) bitmap = item["thumbnail"] btn = wx.BitmapButton(self.m_panel, id=btnindex, bitmap=bitmap, style=BM_BTN_STYLE) btn.Bind( wx.EVT_BUTTON, lambda event, link=item["link"], cindex=channel_index: self. show_player(event, link, cindex), ) inner_sizer.Add(title) inner_sizer.Add(btn) collapsable_pane = wx.CollapsiblePane(self.m_panel, wx.ID_ANY, "Details:") inner_sizer.Add(collapsable_pane, 0, wx.GROW | wx.ALL, 5) pane_win = collapsable_pane.GetPane() # now add a test label in the collapsible pane using a sizer to layout it: description = wx.StaticText(pane_win, -1, item["description"]) description.Wrap(WIDTH - 2) pane_sizer.Add(description, wx.GROW | wx.ALL, 2) pane_win.SetSizer(pane_sizer) pane_sizer.SetSizeHints(pane_win) def fit_and_layout(_): pane_sizer.Layout() pane_sizer.Fit(pane_win) inner_sizer.Layout() inner_sizer.Fit(self) self.m_sizer.Fit(self) self.m_sizer.Layout() collapsable_pane.Bind(wx.EVT_COLLAPSIBLEPANE_CHANGED, lambda event: fit_and_layout(event) ) #inner_sizer.Add(description) self.m_sizer.Add(inner_sizer) self.m_sizer.AddSpacer(SPACER_HEIGHT) btnindex = btnindex + 1 self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True) self.m_panel.SetSizer(self.m_sizer) self.m_sizer.Fit(self) self.m_sizer.Layout() def show_player(self, _, uri, channel_index: int): """ Show the video player :param _ event: unused :param uri str: the link to the video stream """ self.m_sizer.Clear(delete_windows=True) self.m_sizer = wx.BoxSizer(wx.VERTICAL) #inner_sizer = wx.GridBagSizer() window = wx.Window(self.m_panel, size=(WIDTH, int(HEIGHT / 2)), ) self.m_sizer.Add(window) player = mpv.MPV(ytdl=False,player_operation_mode='pseudo-gui', script_opts='osc-layout=box,osc-seekbarstyle=bar,osc-deadzonesize=0,osc-minmousemove=3', input_default_bindings=True, input_vo_keyboard=True, osc=True) #player = mpv.MPV(ytdl=True, input_default_bindings=True, input_vo_keyboard=True, osc=True, wid=str(window.GetId())) self.m_panel.SetSizer(self.m_sizer) self.m_sizer.Fit(self) self.m_sizer.Layout() try: player.play(uri) player.wait_for_playback() except mpv.ShutdownError: player.terminate() del player self.show_video_list(None,0) #self.m_control = wx.media.MediaCtrl( # self.m_panel, # size=(WIDTH, int(HEIGHT / 2)), # style=wx.SIMPLE_BORDER, # szBackend=wx.media.MEDIABACKEND_GSTREAMER, #) #play_button = wx.Button(self.m_panel, -1, "Play") #pause_button = wx.Button(self.m_panel, -1, "Pause") #back_button = wx.Button(self.m_panel, -1, "Back") #back_button.Bind( # wx.EVT_BUTTON, # lambda event, cindex=channel_index: self.show_video_list(event, cindex), #) #inner_sizer.Add(self.m_control, (0, 0)) #inner_sizer.SetItemSpan(0, (0, 6)) #inner_sizer.Add(play_button, (1, 1)) #inner_sizer.Add(pause_button, (1, 2)) #inner_sizer.Add(back_button, (1, 3)) #if not self.m_chromecast_thr.is_alive( #) and not self.m_selected_chromecast: # chromecast_button = wx.Button(self.m_panel, -1, "Cast") # chromecast_button.Bind( # wx.EVT_BUTTON, # lambda event, muri=uri, cindex=channel_index: self.select_chromecast(event, muri, cindex), # ) # inner_sizer.Add(chromecast_button, (2, 2)) #self.m_sizer.Add(inner_sizer) #if self.m_selected_chromecast: # self.Bind( # wx.media.EVT_MEDIA_LOADED, # lambda event, muri=uri: self.cast(event, muri), # ) # play_button.Bind(wx.EVT_BUTTON, self.play_cast) # pause_button.Bind(wx.EVT_BUTTON, self.pause_cast) #else: # self.Bind(wx.media.EVT_MEDIA_LOADED, self.play) # play_button.Bind(wx.EVT_BUTTON, self.play) # pause_button.Bind(wx.EVT_BUTTON, self.pause) #self.Bind(wx.media.EVT_MEDIA_FINISHED, lambda event: self.show_video_list(event,0)) #self.load_uri(uri) #self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True) #self.m_panel.SetSizer(self.m_sizer) #self.m_sizer.Fit(self) #self.m_sizer.Layout() #self.m_panel.ScrollChildIntoView(self.m_control) def select_chromecast(self, _, uri, channel_index): self.m_sizer.Clear(delete_windows=True) self.m_sizer = wx.BoxSizer(wx.VERTICAL) # self.m_sizer.AddSpacer(SPACER_HEIGHT * 4) cancel_btn = wx.Button(self.m_panel, -1, "Cancel", size=(WIDTH, BTN_HEIGHT), style=wx.BU_EXACTFIT) cancel_btn.Bind( wx.EVT_BUTTON, lambda event, cindex=channel_index: self.show_video_list( event, cindex), ) self.m_sizer.Add(cancel_btn) #, wx.ALIGN_CENTER_VERTICAL) for cast in self.m_chromecasts: friendly_name = cast.cast_info.friendly_name btn = wx.Button(self.m_panel, id=-1, label=friendly_name, size=(WIDTH, BTN_HEIGHT)) btn.Bind( wx.EVT_BUTTON, lambda event, chromecast=cast, muri=uri, cindex=channel_index: self.set_chromecast(event, chromecast, muri, cindex), ) self.m_sizer.Add(btn) #, wx.ALIGN_CENTER_VERTICAL) self.m_panel.SetupScrolling(rate_y=SCROLL_RATE, scrollToTop=True) self.m_panel.SetSizer(self.m_sizer) self.m_sizer.Fit(self) self.m_sizer.Layout() def set_chromecast(self, event, chromecast, uri, channel_index): self.m_selected_chromecast = chromecast self.show_player(event, uri, channel_index) def cast(self, _, uri): mimetype = "video/mp4" cast = self.m_selected_chromecast cast.wait() cast.media_controller.play_media(uri, mimetype) # Wait for player_state PLAYING player_state = None has_played = False while True: if player_state != cast.media_controller.status.player_state: player_state = cast.media_controller.status.player_state # print("Player state:", player_state) if player_state == "PLAYING": has_played = True break if (cast.socket_client.is_connected and has_played and player_state != "PLAYING"): has_played = False cast.media_controller.play_media(uri, mimetype) time.sleep(0.1) self.m_browser.stop_discovery() def pause_cast(self, event): cast = self.m_selected_chromecast cast.media_controller.pause() def play_cast(self, event): cast = self.m_selected_chromecast cast.media_controller.play() def load_uri(self, uri): self.m_control.LoadURI(uri) def play(self, _): self.m_control.Play() def pause(self, _): self.m_control.Pause() def quit(self, _): self.Destroy() if __name__ == "__main__": # When this module is run (not imported) then create the app, the # frame, show it, and start the event loop. app: wx.App = wx.App() frm: Cast = Cast(None, title="Cast") frm.Show() app.MainLoop()