|
|
|
#!/usr/bin/env python3
|
|
|
|
import hashlib
|
|
|
|
import io
|
|
|
|
import json
|
|
|
|
import sqlite3
|
|
|
|
import time
|
|
|
|
from datetime import datetime
|
|
|
|
from os import environ, makedirs, path
|
|
|
|
from typing import Callable, Union
|
|
|
|
|
|
|
|
import requests
|
|
|
|
import wx
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
|
|
|
from Items import Item
|
|
|
|
|
|
|
|
# UI dimensions in pixels. HEIGHT and BTN_HEIGHT are not referenced in the
# visible part of this file -- presumably the window height and the height
# of plain buttons; confirm against the GUI code.
HEIGHT = 1440 // 2
BTN_HEIGHT = 40
# Default bounding size (width 68, height 100) used when scaling thumbnails.
SIZE = wx.Size(68, 100)
# Used to size the text button placed next to a thumbnail.
SCREEN_WIDTH = 720 // 2

# Directory that holds the database and the thumbnail cache. When HOME is
# unset, fall back to the platform's idea of the home directory instead of
# building the literal path "None/.config/cast".
BASEPATH = path.join(
    environ.get("HOME") or path.expanduser("~"), '.config/cast')
# File name of the SQLite database inside BASEPATH.
DB_FILE_NAME = 'cast.db'
# Table names used by the query helpers below.
SUB_TABLE = 'subscriptions'
VIDEO_TABLE = 'videos'
|
|
|
|
|
|
|
|
|
|
|
|
def add_video(video_id: str,
              channel_id: str,
              provider_id: str,
              description: str,
              link: str,
              published: Union[datetime, time.struct_time],
              bitmap: wx.Bitmap,
              title: str,
              watchtime: int,
              basepath: str = BASEPATH,
              filename: str = DB_FILE_NAME) -> None:
    """Store a video row and its thumbnail file.

    The thumbnail is written as PNG to ``<basepath>/thumbnails/<video_id>``
    unless a file with that name already exists. The video table is created
    on first use, and a row whose ``video_id`` is already present is left
    untouched (``ON CONFLICT ... DO NOTHING``).

    Args:
        video_id: Primary key of the row; when empty, the SHA-256 hex digest
            of ``link`` (via ``hash_string``) is used instead.
        channel_id: Stored as-is in the ``channel_id`` column.
        provider_id: Stored as-is in the ``provider_id`` column.
        description: Stored as-is in the ``description`` column.
        link: Stored as-is in the ``link`` column.
        published: Publication time; stored as whole seconds since the epoch.
        bitmap: Thumbnail image to save next to the database.
        title: Stored as-is in the ``title`` column.
        watchtime: Stored as-is in the ``watchtime`` column.
        basepath: Directory containing the database and thumbnail folder.
        filename: Database file name inside ``basepath``.
    """
    try:
        timestamp = published.timestamp()  # type: ignore
    except AttributeError:
        # time.struct_time has no timestamp(); mktime() converts it instead.
        timestamp = time.mktime(published)  # type: ignore

    if not video_id:
        video_id = hash_string(link)

    thumbpath = path.join(basepath, 'thumbnails')
    thumbnail = path.join(thumbpath, video_id)
    fullpath = path.join(basepath, filename)

    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    makedirs(thumbpath, exist_ok=True)
    if not path.isfile(thumbnail):
        bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)

    create_query: str = '''CREATE TABLE IF NOT EXISTS {}
    (video_id TEXT PRIMARY KEY, channel_id TEXT, provider_id TEXT,
    title TEXT, link TEXT, description TEXT, thumbnail TEXT,
    published DATETIME, watchtime NUMBER)'''.format(VIDEO_TABLE)

    insert_query: str = '''INSERT INTO {} (video_id, channel_id, provider_id, title, link, description, thumbnail, published, watchtime)
    VALUES(?,?,?,?,?,?,?,?,?) ON CONFLICT(video_id) DO NOTHING'''.format(
        VIDEO_TABLE)

    con = sqlite3.connect(fullpath)
    try:
        cur = con.cursor()
        cur.execute(create_query)
        con.commit()

        cur.execute(insert_query, [
            video_id, channel_id, provider_id, title, link, description,
            thumbnail, int(timestamp), watchtime
        ])
        con.commit()
    finally:
        # Always release the database handle, even when a statement fails.
        con.close()
|
|
|
|
|
|
|
|
def get_svt_thumb_from_id_changed(id: str, changed: str, size: str = "480") -> str:
    """Build the svtstatic.se image URL from size, image id and change stamp.

    The three values are placed, in the order size / id / changed, after
    ``https://www.svtstatic.se/image/custom/``.
    """
    return f"https://www.svtstatic.se/image/custom/{size}/{id}/{changed}"
|
|
|
|
|
|
|
|
def get_all_svt_categories() -> list:
    """Return the ``genres`` list from the SVT Play category page.

    The decoded page entries are scanned in order and the ``genres`` value
    of the first entry that has one is returned; an empty list is returned
    when no entry carries that key.
    """
    entries = get_svt_data("https://www.svtplay.se/kategori")
    return next(
        (entry['genres'] for entry in entries if 'genres' in entry.keys()),
        list(),
    )
|
|
|
|
|
|
|
|
def get_svt_category(category: str) -> list:
    """Return every item listed on one SVT Play category page.

    Only the first decoded entry that has a ``categoryPage`` key is used.
    The ``items`` of every selection in every tab that carries
    ``selections`` are concatenated in page order.
    """
    entries = get_svt_data(
        'https://www.svtplay.se/kategori/{}?tab=all'.format(category))
    page = next(
        (entry for entry in entries if "categoryPage" in entry.keys()), None)

    programs = list()
    if page is None:
        return programs

    for tab in page["categoryPage"]["lazyLoadedTabs"]:
        # Tabs without a "selections" key contribute nothing.
        if "selections" not in tab.keys():
            continue
        for selection in tab["selections"]:
            programs.extend(selection['items'])
    return programs
|
|
|
|
|
|
|
|
def get_svt_data(url: str) -> list:
    """Fetch a page and decode the JSON payloads stored in its urql state.

    The page's ``__NEXT_DATA__`` element holds a JSON document; each value
    under ``props.urqlState`` has a ``data`` field that is itself a JSON
    string. Those strings are decoded and returned as a list.

    Returns:
        The decoded payloads, or an empty list when the page has no usable
        ``__NEXT_DATA__`` element.
    """
    res = requests.get(url)
    soup = BeautifulSoup(res.text, features="lxml")

    script = soup.find(id="__NEXT_DATA__")
    if script is None or not script.string:
        # find() returns None when the element is absent; report "no data"
        # instead of failing with AttributeError on None.
        return []

    state = json.loads(script.string)["props"]["urqlState"]
    return [json.loads(item["data"]) for item in state.values()]
|
|
|
|
|
|
|
|
def get_all_svt_programs() -> list:
    """Return every program item from the SVT Play program listing page.

    Only the first decoded entry that has a ``programAtillO`` key is used.
    Each item is tagged in place with ``selection_name``, the ``name`` of
    the selection it was listed under, before being collected.
    """
    entries = get_svt_data('https://www.svtplay.se/program')
    listing = next(
        (entry for entry in entries if "programAtillO" in entry.keys()), None)

    programs = list()
    if listing is None:
        return programs

    for selection in listing["programAtillO"]["selections"]:
        for item in selection["items"]:
            item['selection_name'] = selection['name']
            programs.append(item)
    return programs
|
|
|
|
|
|
|
|
|
|
|
|
def get_default_logo(providerid: str = 'default',
                     path: str = '/usr/share/cast') -> wx.Bitmap:
    """Return the stock logo bitmap for a provider.

    ``'SVT'`` maps to ``<path>/assets/SVT.png``; every other provider id
    maps to ``<path>/assets/Default.png``.
    """
    if providerid == 'SVT':
        template = '{}/assets/SVT.png'
    else:
        template = '{}/assets/Default.png'
    return wx.Bitmap(template.format(path))
|
|
|
|
|
|
|
|
|
|
|
|
def get_latest(provider_id: str,
               basepath: str = BASEPATH,
               filename: str = DB_FILE_NAME) -> list[Item]:
    """Return up to 50 of a provider's videos, newest first.

    Args:
        provider_id: Value matched against the ``provider_id`` column.
        basepath: Directory containing the database.
        filename: Database file name inside ``basepath``.

    Returns:
        One ``Item`` per stored row, ordered by ``published`` descending.
        An empty list is returned when the query fails with
        ``sqlite3.OperationalError`` (for example, the table does not exist).
    """
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT * FROM {} WHERE provider_id = ? ORDER BY published DESC LIMIT 50'''.format(
        VIDEO_TABLE)

    con = None
    try:
        con = sqlite3.connect(fullpath)
        cur = con.cursor()
        cur.execute(select_query, [provider_id])
        rows = cur.fetchall()
    except sqlite3.OperationalError:
        # Deliberate best effort: an unreadable or missing table means
        # "nothing stored yet".
        rows = []
    finally:
        if con is not None:
            con.close()

    # Column order follows the CREATE TABLE in add_video: video_id,
    # channel_id, provider_id, title, link, description, thumbnail,
    # published, watchtime.
    videos = list()
    for row in rows:
        videos.append(
            Item(row[5],                                  # description
                 row[4],                                  # link
                 row[2],                                  # provider_id
                 datetime.fromtimestamp(int(row[7])),     # published
                 wx.Bitmap(row[6]),                       # thumbnail file
                 row[3],                                  # title
                 watchtime=row[8]))
    return videos
|
|
|
|
|
|
|
|
|
|
|
|
def get_latest_video_timestamp(channel_id: str,
                               basepath: str = BASEPATH,
                               filename: str = DB_FILE_NAME) -> datetime:
    """Return the publication time of a channel's newest stored video.

    Args:
        channel_id: Value matched against the ``channel_id`` column.
        basepath: Directory containing the database.
        filename: Database file name inside ``basepath``.

    Returns:
        ``datetime.fromtimestamp`` of the largest ``published`` value for
        the channel, or of ``0`` when the channel has no rows or the query
        fails with ``sqlite3.OperationalError``.
    """
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT max(published) FROM {} WHERE channel_id = ?'''.format(
        VIDEO_TABLE)

    timestamp = 0
    con = None
    try:
        con = sqlite3.connect(fullpath)
        cur = con.cursor()
        cur.execute(select_query, [channel_id])
        row = cur.fetchone()
        # max() over zero rows yields NULL (None); keep the 0 fallback then
        # rather than passing None to fromtimestamp().
        if row is not None and row[0] is not None:
            timestamp = row[0]
    except sqlite3.OperationalError:
        # Best effort: fall back to the epoch when the query cannot run.
        timestamp = 0
    finally:
        if con is not None:
            con.close()
    return datetime.fromtimestamp(timestamp)
|
|
|
|
|
|
|
|
|
|
|
|
def get_subscriptions(basepath: str = BASEPATH,
                      filename: str = DB_FILE_NAME) -> list[tuple[str, str]]:
    """Return every row of the subscriptions table.

    Args:
        basepath: Directory containing the database.
        filename: Database file name inside ``basepath``.

    Returns:
        The rows as tuples, in the order SQLite returns them. An empty list
        is returned when the query fails with ``sqlite3.OperationalError``
        (for example, the table does not exist).
    """
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT * FROM {}'''.format(SUB_TABLE)

    subscriptions = list()
    con = None
    try:
        con = sqlite3.connect(fullpath)
        cur = con.cursor()
        cur.execute(select_query)
        subscriptions = cur.fetchall()
    except sqlite3.OperationalError:
        # Deliberate best effort: no readable table means no subscriptions.
        pass
    finally:
        if con is not None:
            con.close()
    return subscriptions
|
|
|
|
|
|
|
|
|
|
|
|
def get_svt_id(link: str) -> str:
    """Extract an id from the play button link on a fetched page.

    Anchors whose ``data-rt`` attribute equals ``top-area-play-button`` are
    inspected; the id is the part of the ``href`` between the first ``=``
    and the following ``=`` or ``&``. When several anchors match, the last
    usable one wins.

    Returns:
        The extracted id, or an empty string when no matching anchor with a
        usable ``href`` is found.
    """
    svt_id = str()
    page = requests.get(link)
    soup = BeautifulSoup(page.text, 'html.parser')

    for element in soup.find_all('a'):
        if element.get('data-rt') != 'top-area-play-button':
            continue
        href = element.get('href')
        if not href:
            # A play button without href gives nothing to parse.
            continue
        parts = href.split('=')
        if len(parts) < 2:
            # No "=" in the link, so there is no value to extract.
            continue
        svt_id = parts[1].split('&')[0]
    return svt_id
|
|
|
|
|
|
|
|
|
|
|
|
def get_svt_thumbnail(link: str) -> str:
    """Return the ``og:image`` URL advertised by a fetched page.

    Returns:
        The ``content`` attribute of the first element whose ``property``
        attribute is ``og:image``, or an empty string when the page has no
        such element or the element has no ``content`` (same "not found"
        convention as ``get_svt_id``).
    """
    page = requests.get(link)
    soup = BeautifulSoup(page.text, 'html.parser')

    meta = soup.find(property="og:image")
    if meta is None:
        # find() returns None when nothing matches; indexing it would raise.
        return ''
    return meta.get("content", '')
|
|
|
|
|
|
|
|
|
|
|
|
def get_videos(channel_id: str,
               basepath: str = BASEPATH,
               filename: str = DB_FILE_NAME) -> list[Item]:
    """Return all stored videos of a channel, newest first.

    Args:
        channel_id: Value matched against the ``channel_id`` column.
        basepath: Directory containing the database.
        filename: Database file name inside ``basepath``.

    Returns:
        One ``Item`` per stored row, ordered by ``published`` descending.
        An empty list is returned when the query fails with
        ``sqlite3.OperationalError`` (for example, the table does not exist).
    """
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT * FROM {} WHERE channel_id = ? ORDER BY published DESC'''.format(
        VIDEO_TABLE)

    con = None
    try:
        con = sqlite3.connect(fullpath)
        cur = con.cursor()
        cur.execute(select_query, [channel_id])
        rows = cur.fetchall()
    except sqlite3.OperationalError:
        # Deliberate best effort: an unreadable or missing table means
        # "nothing stored yet".
        rows = []
    finally:
        if con is not None:
            con.close()

    # Column order follows the CREATE TABLE in add_video: video_id,
    # channel_id, provider_id, title, link, description, thumbnail,
    # published, watchtime.
    videos = list()
    for row in rows:
        videos.append(
            Item(row[5],                                  # description
                 row[4],                                  # link
                 row[2],                                  # provider_id
                 datetime.fromtimestamp(int(row[7])),     # published
                 wx.Bitmap(row[6]),                       # thumbnail file
                 row[3],                                  # title
                 watchtime=row[8]))
    return videos
|
|
|
|
|
|
|
|
|
|
|
|
def hash_string(string: str) -> str:
    """Return the hexadecimal SHA-256 digest of *string* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(string.encode('utf-8'))
    return digest.hexdigest()
|
|
|
|
|
|
|
|
|
|
|
|
def make_sized_button(parent_pnl: wx.Panel,
                      bitmap_or_str: Union[wx.Bitmap, str],
                      text: str, callback: Callable) -> wx.BoxSizer:
    """Build a horizontal sizer holding a thumbnail button and a text button.

    Both buttons are children of ``parent_pnl``, show ``text`` as tooltip
    and are bound to ``callback`` for ``wx.EVT_BUTTON``.

    Args:
        parent_pnl: Panel that owns the buttons and receives the bindings.
        bitmap_or_str: A ready bitmap, or a string. Strings starting with
            ``http`` are downloaded via ``make_bitmap_from_url``; any other
            string is loaded as an image file path.
        text: Label of the text button and tooltip of both buttons.
        callback: Handler invoked when either button is pressed.

    Returns:
        The ``wx.StaticBoxSizer`` containing the two buttons.
    """
    btn_sizer = wx.StaticBoxSizer(wx.HORIZONTAL, parent_pnl)

    # isinstance() is required here: comparing type(x) with type(str)
    # compares against `type` itself and is never true for a string.
    if isinstance(bitmap_or_str, str):
        if bitmap_or_str.startswith('http'):
            bitmap = make_bitmap_from_url(bitmap_or_str)
        else:
            bitmap = wx.Bitmap(bitmap_or_str, wx.BITMAP_TYPE_ANY)
    else:
        bitmap = bitmap_or_str

    btn_style = wx.BORDER_NONE | wx.BU_AUTODRAW | wx.BU_EXACTFIT | wx.BU_NOTEXT
    btn_logo = wx.BitmapButton(parent_pnl,
                               wx.ID_ANY,
                               bitmap,
                               style=btn_style,
                               size=wx.Size(100, 68))
    btn_logo.SetToolTip(text)
    btn_sizer.Add(btn_logo, 0, wx.EXPAND, 1)

    # The text button takes the width left over beside the 100 px logo.
    btn_text = wx.Button(parent_pnl,
                         wx.ID_ANY,
                         text,
                         style=wx.BORDER_NONE | wx.BU_AUTODRAW,
                         size=wx.Size(SCREEN_WIDTH - 100, SIZE.GetHeight()))
    btn_text.SetToolTip(text)
    btn_sizer.Add(btn_text, 0, wx.EXPAND, 1)

    parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_logo)
    parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_text)

    return btn_sizer
|
|
|
|
|
|
|
|
|
|
|
|
def make_bitmap_from_url(logo_url: str, size: wx.Size = SIZE) -> wx.Bitmap:
    """Download an image and return it scaled to the width of ``size``.

    The image is scaled so its width equals ``size.GetWidth()``; the height
    follows from the image's own aspect ratio (the height of ``size`` is
    not used).

    ``size`` is only read. It defaults to the shared module-level ``SIZE``,
    so writing the computed dimensions back into it would change that
    constant for every later user.

    Args:
        logo_url: Address of the image to download.
        size: Size whose width is the target width in pixels.
    """
    res = requests.get(logo_url)
    image = wx.Image(io.BytesIO(res.content), type=wx.BITMAP_TYPE_ANY, index=-1)

    target_width = size.GetWidth()
    scale_factor = image.GetWidth() / target_width
    target_height = int(image.GetHeight() / scale_factor)

    image.Rescale(target_width, target_height)
    return wx.Bitmap(image)
|
|
|
|
|
|
|
|
|
|
|
|
def make_bitmap_from_file(path, size: wx.Size = SIZE) -> wx.Bitmap:
    """Load an image file and return it scaled to the width of ``size``.

    The image is scaled so its width equals ``size.GetWidth()``; the height
    follows from the image's own aspect ratio (the height of ``size`` is
    not used).

    ``size`` is only read. It defaults to the shared module-level ``SIZE``,
    so writing the computed dimensions back into it would change that
    constant for every later user.

    Args:
        path: Location of the image file, passed straight to ``wx.Image``.
        size: Size whose width is the target width in pixels.
    """
    image = wx.Image(path, type=wx.BITMAP_TYPE_ANY, index=-1)

    target_width = size.GetWidth()
    scale_factor = image.GetWidth() / target_width
    target_height = int(image.GetHeight() / scale_factor)

    image.Rescale(target_width, target_height)
    return wx.Bitmap(image)
|
|
|
|
|
|
|
|
|
|
|
|
def resolve_svt_channel(svt_id: str, path: str = '/usr/share/cast') -> dict:
    """Return the display name and thumbnail bitmap for an SVT channel id.

    The id is looked up among the categories reported by
    ``get_all_svt_categories`` first and among the built-in channels second,
    so a category with the same id as a built-in channel wins. Only the
    bitmap of the requested channel is created.

    Args:
        svt_id: A category id or one of the built-in channel ids.
        path: Installation directory that contains the ``assets`` folder.

    Returns:
        A dict with the keys ``name`` and ``thumbnail``.

    Raises:
        KeyError: ``svt_id`` is neither a category nor a built-in channel.
    """
    # Built-in channels: id -> (display name, asset file name).
    builtin_channels = {
        "ch-barnkanalen": ("Barnkanalen", "Barnkanalen.png"),
        "ch-svt1": ("SVT 1", "SVT1.png"),
        "ch-svt2": ("SVT 2", "SVT2.png"),
        "ch-svt24": ("SVT 24", "SVT24.png"),
        "ch-kunskapskanalen": ("Kunskapskanalen", "Kunskapskanalen.png"),
        "feed": ("Senaste program", "SVT.png"),
        "allprograms": ("Alla program", "SVT.png"),
    }

    # NOTE(review): the category page is still fetched for built-in ids so
    # that categories keep precedence; if category ids can never collide
    # with the built-in ids, the built-ins could be checked first to skip
    # the request.
    matched = None
    for category in get_all_svt_categories():
        if category['id'] == svt_id:
            # Keep scanning: with duplicate ids the last entry wins.
            matched = category

    if matched is not None:
        image = matched['image']
        thumb_url = get_svt_thumb_from_id_changed(image['id'], image['changed'])
        return {
            "name": matched["name"],
            "thumbnail": make_bitmap_from_url(thumb_url),
        }

    # Raises KeyError(svt_id) for unknown ids.
    name, asset = builtin_channels[svt_id]
    return {
        "name": name,
        "thumbnail": make_bitmap_from_file('{}/assets/{}'.format(path, asset)),
    }
|
|
|
|
|
|
|
|
|
|
|
|
def video_exists(video_id: str,
                 channel_id: str,
                 basepath: str = BASEPATH,
                 filename: str = DB_FILE_NAME) -> bool:
    """Tell whether a video of the given channel is stored in the database.

    Args:
        video_id: Value matched against the ``video_id`` column.
        channel_id: Value matched against the ``channel_id`` column.
        basepath: Directory containing the database.
        filename: Database file name inside ``basepath``.

    Returns:
        True when at least one row matches both ids; False when none does
        or the query fails with ``sqlite3.OperationalError``.
    """
    fullpath = path.join(basepath, filename)
    # Only existence matters, so ask for a single constant column and row.
    select_query = '''SELECT 1 FROM {} WHERE channel_id = ? AND video_id = ? LIMIT 1'''.format(
        VIDEO_TABLE)

    con = None
    try:
        con = sqlite3.connect(fullpath)
        cur = con.cursor()
        cur.execute(select_query, [channel_id, video_id])
        return cur.fetchone() is not None
    except sqlite3.OperationalError:
        # Best effort: an unreadable or missing table means "not stored".
        return False
    finally:
        if con is not None:
            con.close()
|