#!/usr/bin/env python3
"""Helper functions for the cast application.

The module groups three kinds of helpers:

* SQLite persistence of channel subscriptions and videos,
* wx bitmap/button construction helpers,
* resolution of SVT channel metadata and YouTube stream links.
"""
import hashlib
import io
import json
import sqlite3
import time
from contextlib import closing
from datetime import datetime
from os import environ, makedirs, path
from typing import Callable, Union
from urllib.parse import urlparse

import requests
import wx
import youtube_dl
from youtubesearchpython import ChannelsSearch

from Items import Item

HEIGHT = int(1440 / 2)
BTN_HEIGHT = 40
SIZE = wx.Size(100, 68)
MYPATH = path.dirname(path.abspath(__file__))
SCREEN_WIDTH = int(720 / 2)
BASEPATH = path.join(str(environ.get("HOME")), '.config/cast')
DB_FILE_NAME = 'cast.db'
SUB_TABLE = 'subscriptions'
VIDEO_TABLE = 'videos'

# Seconds to wait for a thumbnail download before giving up.
REQUEST_TIMEOUT = 10

# Columns read back from the video table, in the order _fetch_items unpacks.
_VIDEO_COLUMNS = ('description, link, provider_id, published, thumbnail, '
                  'title, watchtime')

# SVT channel id -> (display name, logo file name inside the assets folder).
_SVT_CHANNELS = {
    "ch-barnkanalen": ("Barnkanalen", "Barnkanalen.png"),
    "ch-svt1": ("SVT 1", "SVT1.png"),
    "ch-svt2": ("SVT 2", "SVT2.png"),
    "ch-svt24": ("SVT 24", "SVT24.png"),
    "ch-kunskapskanalen": ("Kunskapskanalen", "Kunskapskanalen.png"),
    "feed": ("Senaste program", "SVT.png"),
}


def _open_db(basepath: str, filename: str):
    """Return a context manager that yields a connection and closes it on exit.

    ``sqlite3.Connection`` used directly as a context manager only manages the
    transaction; ``closing`` is what actually releases the file handle.
    """
    return closing(sqlite3.connect(path.join(basepath, filename)))


def add_subscription(channel_id: str,
                     name: str,
                     basepath: str = BASEPATH,
                     filename: str = DB_FILE_NAME) -> None:
    """Store a channel subscription and cache its thumbnail on disk.

    The thumbnail is looked up through a channel search on ``name`` and saved
    as ``<basepath>/thumbnails/<channel_id>`` unless that file already exists.
    An existing row with the same ``channel_id`` is left untouched.

    Args:
        channel_id: Primary key of the subscription row.
        name: Channel name, also used as the search term for the thumbnail.
        basepath: Directory holding the database and the thumbnails folder.
        filename: Database file name inside ``basepath``.
    """
    thumbpath = path.join(basepath, 'thumbnails')
    thumbnail = path.join(thumbpath, channel_id)

    makedirs(thumbpath, exist_ok=True)

    if not path.isfile(thumbnail):
        channels_search = ChannelsSearch(
            name, limit=1).result()['result'][0]  # type: ignore
        bitmap = make_bitmap_from_url(
            'https:' + channels_search['thumbnails'][0]['url'])
        bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)

    create_query = '''CREATE TABLE IF NOT EXISTS {}
                      (channel_id TEXT PRIMARY KEY,
                       channel_name TEXT,
                       thumb_path TEXT)'''.format(SUB_TABLE)
    upsert_query = '''INSERT INTO {} (channel_id, channel_name, thumb_path)
                      VALUES(?,?,?)
                      ON CONFLICT(channel_id) DO NOTHING'''.format(SUB_TABLE)

    with _open_db(basepath, filename) as con:
        con.execute(create_query)
        con.execute(upsert_query, [channel_id, name, thumbnail])
        con.commit()


def add_video(video_id: str,
              channel_id: str,
              provider_id: str,
              description: str,
              link: str,
              published: Union[datetime, time.struct_time],
              bitmap: wx.Bitmap,
              title: str,
              watchtime: int,
              basepath: str = BASEPATH,
              filename: str = DB_FILE_NAME) -> None:
    """Store a video row and save its thumbnail bitmap on disk.

    An existing row with the same ``video_id`` is left untouched.

    Args:
        video_id: Primary key; when empty, the SHA-256 hex digest of ``link``
            is used instead.
        channel_id: Channel the video belongs to.
        provider_id: Provider the video comes from.
        description: Video description.
        link: Video link.
        published: Publication time, either a ``datetime`` or a
            ``time.struct_time``; stored as an integer Unix timestamp.
        bitmap: Thumbnail, saved as ``<basepath>/thumbnails/<video_id>``
            unless that file already exists.
        title: Video title.
        watchtime: Stored as-is in the ``watchtime`` column.
        basepath: Directory holding the database and the thumbnails folder.
        filename: Database file name inside ``basepath``.
    """
    try:
        timestamp = published.timestamp()  # type: ignore
    except AttributeError:
        # struct_time has no timestamp(); convert it as local time instead.
        timestamp = time.mktime(published)  # type: ignore

    if not video_id:
        video_id = hash_string(link)

    thumbpath = path.join(basepath, 'thumbnails')
    thumbnail = path.join(thumbpath, video_id)

    makedirs(thumbpath, exist_ok=True)

    if not path.isfile(thumbnail):
        bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)

    create_query = '''CREATE TABLE IF NOT EXISTS {}
                      (video_id TEXT PRIMARY KEY,
                       channel_id TEXT,
                       provider_id TEXT,
                       title TEXT,
                       link TEXT,
                       description TEXT,
                       thumbnail TEXT,
                       published DATETIME,
                       watchtime NUMBER)'''.format(VIDEO_TABLE)
    upsert_query = '''INSERT INTO {} (video_id, channel_id, provider_id,
                                      title, link, description, thumbnail,
                                      published, watchtime)
                      VALUES(?,?,?,?,?,?,?,?,?)
                      ON CONFLICT(video_id) DO NOTHING'''.format(VIDEO_TABLE)

    with _open_db(basepath, filename) as con:
        con.execute(create_query)
        con.execute(upsert_query, [
            video_id, channel_id, provider_id, title, link, description,
            thumbnail,
            int(timestamp), watchtime
        ])
        con.commit()


def get_default_logo(providerid: str = 'default') -> wx.Bitmap:
    """Return the bundled logo for ``providerid``, or the default logo."""
    if providerid in ('SVT', 'YouTube'):
        return wx.Bitmap('{}/assets/{}.png'.format(MYPATH, providerid))
    return wx.Bitmap('{}/assets/Default.png'.format(MYPATH))


def _fetch_items(select_query: str, params: list, basepath: str,
                 filename: str) -> list[Item]:
    """Run ``select_query`` and build an ``Item`` from every returned row.

    The query must select exactly the columns listed in ``_VIDEO_COLUMNS``.
    Returns an empty list when SQLite reports an operational error, which is
    what happens when the video table has not been created yet.
    """
    videos: list[Item] = []
    try:
        with _open_db(basepath, filename) as con:
            rows = con.execute(select_query, params).fetchall()
    except sqlite3.OperationalError:
        return videos

    for (description, link, provider_id, published, thumbnail, title,
         watchtime) in rows:
        videos.append(
            Item(description,
                 link,
                 provider_id,
                 datetime.fromtimestamp(int(published)),
                 wx.Bitmap(thumbnail),
                 title,
                 watchtime=watchtime))
    return videos


def get_latest(provider_id: str,
               basepath: str = BASEPATH,
               filename: str = DB_FILE_NAME) -> list[Item]:
    """Return up to 50 of the provider's videos, newest first.

    Returns an empty list when the database cannot be queried.
    """
    select_query = '''SELECT {} FROM {}
                      WHERE provider_id = ?
                      ORDER BY published DESC
                      LIMIT 50'''.format(_VIDEO_COLUMNS, VIDEO_TABLE)
    return _fetch_items(select_query, [provider_id], basepath, filename)


def get_latest_video_timestamp(channel_id: str,
                               basepath: str = BASEPATH,
                               filename: str = DB_FILE_NAME) -> datetime:
    """Return the publication time of the channel's most recent stored video.

    Falls back to ``datetime.fromtimestamp(0)`` when the channel has no stored
    videos or the database cannot be queried.
    """
    select_query = '''SELECT max(published) FROM {}
                      WHERE channel_id = ?'''.format(VIDEO_TABLE)
    timestamp = 0
    try:
        with _open_db(basepath, filename) as con:
            row = con.execute(select_query, [channel_id]).fetchone()
        # max() over zero rows yields NULL, which arrives here as None.
        if row is not None and row[0] is not None:
            timestamp = row[0]
    except sqlite3.OperationalError:
        pass
    return datetime.fromtimestamp(timestamp)


def get_subscriptions(
        basepath: str = BASEPATH,
        filename: str = DB_FILE_NAME) -> list[tuple[str, str, str]]:
    """Return every subscription row.

    Each row is ``(channel_id, channel_name, thumb_path)``. Returns an empty
    list when the database cannot be queried.
    """
    select_query = '''SELECT * FROM {}'''.format(SUB_TABLE)
    try:
        with _open_db(basepath, filename) as con:
            return list(con.execute(select_query).fetchall())
    except sqlite3.OperationalError:
        return []


def get_videos(channel_id: str,
               basepath: str = BASEPATH,
               filename: str = DB_FILE_NAME) -> list[Item]:
    """Return all stored videos of a channel, newest first.

    Returns an empty list when the database cannot be queried.
    """
    select_query = '''SELECT {} FROM {}
                      WHERE channel_id = ?
                      ORDER BY published DESC'''.format(
        _VIDEO_COLUMNS, VIDEO_TABLE)
    return _fetch_items(select_query, [channel_id], basepath, filename)


def hash_string(string: str) -> str:
    """Return the SHA-256 hex digest of ``string`` encoded as UTF-8."""
    return hashlib.sha256(string.encode('utf-8')).hexdigest()


def import_from_newpipe(filename) -> None:
    """Add a subscription for every service-0 channel in a JSON export.

    The file is expected to hold a ``subscriptions`` list whose entries have
    ``service_id``, ``url`` and ``name`` keys. The channel id is the last path
    segment of ``url``. Nothing happens when ``filename`` is not a file.
    """
    if not path.isfile(filename):
        return

    with open(filename, 'r', encoding='utf-8') as subs:
        sub_data = json.load(subs)

    for channel in sub_data['subscriptions']:
        if channel['service_id'] != 0:
            continue
        # Strip a trailing slash first so it does not yield an empty id.
        url_path = urlparse(channel['url']).path.rstrip('/')
        channel_id = url_path.rsplit('/', 1)[-1]
        add_subscription(channel_id, channel['name'])


def make_sized_button(parent_pnl: wx.Panel, bitmap_or_str: Union[wx.Bitmap,
                                                                 str],
                      text: str, callback: Callable) -> wx.BoxSizer:
    """Build a horizontal sizer holding a logo button and a text button.

    Both buttons carry ``text`` as tooltip and are bound to ``callback``.

    Args:
        parent_pnl: Panel that owns the buttons and receives the bindings.
        bitmap_or_str: A ready bitmap, an ``http...`` URL to download, or a
            local image file path.
        text: Label of the text button and tooltip of both buttons.
        callback: Handler for ``wx.EVT_BUTTON`` on either button.

    Returns:
        The sizer containing both buttons.
    """
    btn_sizer = wx.BoxSizer(wx.HORIZONTAL)

    if isinstance(bitmap_or_str, str):
        if bitmap_or_str.startswith('http'):
            bitmap = make_bitmap_from_url(bitmap_or_str)
        else:
            bitmap = wx.Bitmap(bitmap_or_str, wx.BITMAP_TYPE_ANY)
    else:
        bitmap = bitmap_or_str

    btn_logo_style = (wx.BU_AUTODRAW | wx.BU_EXACTFIT | wx.BU_NOTEXT
                      | wx.BORDER_NONE)
    btn_logo = wx.BitmapButton(parent_pnl,
                               wx.ID_ANY,
                               bitmap,
                               style=btn_logo_style,
                               size=SIZE)
    btn_logo.SetToolTip(text)
    btn_sizer.Add(btn_logo, 0, wx.EXPAND, 1)

    # NOTE(review): wx.BOTTOM/LEFT/TOP look like sizer flags rather than
    # window styles; kept unchanged to preserve the current appearance.
    btn_text = wx.Button(parent_pnl,
                         wx.ID_ANY,
                         text,
                         style=wx.BU_AUTODRAW | wx.BOTTOM | wx.LEFT | wx.TOP
                         | wx.BORDER_NONE,
                         size=wx.Size(SCREEN_WIDTH - SIZE.GetWidth(),
                                      SIZE.GetHeight()))
    btn_text.SetToolTip(text)
    btn_sizer.Add(btn_text, 0, wx.EXPAND, 1)

    parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_logo)
    parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_text)
    return btn_sizer


def _rescale_to_width(image: wx.Image, size: wx.Size) -> wx.Bitmap:
    """Scale ``image`` to the width of ``size``, keeping its aspect ratio.

    ``size`` is only read, never modified, so the shared module-level ``SIZE``
    default stays intact between calls.
    """
    target_width = size.GetWidth()
    scale_factor = image.GetWidth() / target_width
    # Rescale needs integer, non-zero dimensions.
    target_height = max(1, int(image.GetHeight() / scale_factor))
    image.Rescale(target_width, target_height)
    return wx.Bitmap(image)


def make_bitmap_from_url(logo_url: str, size: wx.Size = SIZE) -> wx.Bitmap:
    """Download an image and return it as a bitmap scaled to ``size``'s width.

    The height follows from the image's aspect ratio; the height of ``size``
    is not used.
    """
    res = requests.get(logo_url, timeout=REQUEST_TIMEOUT)
    image = wx.Image(io.BytesIO(res.content), type=wx.BITMAP_TYPE_ANY,
                     index=-1)
    return _rescale_to_width(image, size)


def make_bitmap_from_file(path, size: wx.Size = SIZE) -> wx.Bitmap:
    """Load an image file and return it as a bitmap scaled to ``size``'s width.

    The height follows from the image's aspect ratio; the height of ``size``
    is not used.
    """
    # The ``path`` parameter shadows the os.path import inside this function.
    image = wx.Image(path, type=wx.BITMAP_TYPE_ANY, index=-1)
    return _rescale_to_width(image, size)


def resolve_svt_channel(svt_id: str) -> dict:
    """Return the display name and logo bitmap of an SVT channel.

    Args:
        svt_id: One of the keys of ``_SVT_CHANNELS``.

    Returns:
        A dict with the keys ``name`` and ``thumbnail``.

    Raises:
        KeyError: If ``svt_id`` is not a known channel id.
    """
    name, asset = _SVT_CHANNELS[svt_id]
    # Only the requested logo is loaded from disk.
    return {
        "name": name,
        "thumbnail": make_bitmap_from_file('{}/assets/{}'.format(
            MYPATH, asset)),
    }


def resolve_youtube_link(link):
    """Return a direct stream URL for ``link``, or ``link`` itself.

    Among the formats reported by youtube-dl, the last one that is lower than
    480 pixels high and carries audio is chosen. When extraction fails or no
    format qualifies, ``link`` is returned unchanged.
    """
    ydl_opts = {
        'format':
        'worstvideo[ext=mp4]+worstaudio[ext=m4a]/worstvideo+worstaudio',
        'container': 'webm dash'
    }

    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        try:
            video = ydl.extract_info(link, download=False) or {}
        except (youtube_dl.utils.ExtractorError,
                youtube_dl.utils.DownloadError):
            return link

    for form in video.get('formats') or []:
        height = form.get('height')
        if height and height < 480 and form.get('acodec', 'none') != 'none':
            link = form['url']
    return link


def video_exists(video_id: str,
                 channel_id: str,
                 basepath: str = BASEPATH,
                 filename: str = DB_FILE_NAME) -> bool:
    """Return whether a video with this id is stored for the channel.

    Returns ``False`` when the database cannot be queried.
    """
    select_query = '''SELECT 1 FROM {}
                      WHERE channel_id = ? AND video_id = ?
                      LIMIT 1'''.format(VIDEO_TABLE)
    try:
        with _open_db(basepath, filename) as con:
            row = con.execute(select_query, [channel_id, video_id]).fetchone()
    except sqlite3.OperationalError:
        return False
    return row is not None