You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cast/src/Utils/__init__.py

413 lines
14 KiB

#!/usr/bin/env python3
import hashlib
import io
import json
import sqlite3
import time
from datetime import datetime
from os import environ, makedirs, path
from typing import Callable, Union
import requests
import wx
from bs4 import BeautifulSoup
from Items import Item
# Layout constants. 1440 and 720 are halved to obtain the values used by
# the UI code.
HEIGHT = 1440 // 2
SCREEN_WIDTH = 720 // 2
BTN_HEIGHT = 40
# Default target size handed to the bitmap scaling helpers.
SIZE = wx.Size(68, 100)

# Directory holding the SQLite database and the 'thumbnails' sub-directory.
# NOTE: str() turns a missing HOME variable into the literal string "None".
BASEPATH = path.join(str(environ.get("HOME")), '.config/cast')
DB_FILE_NAME = 'cast.db'

# Table names used when building SQL statements.
SUB_TABLE = 'subscriptions'
VIDEO_TABLE = 'videos'
2 years ago
# In-process caches, None until first use:
#   CAT_CACHE  - assigned by get_all_svt_categories()
#   CHAN_CACHE - assigned by resolve_svt_channel()
CAT_CACHE = None
CHAN_CACHE = None
def add_video(video_id: str,
              channel_id: str,
              provider_id: str,
              description: str,
              link: str,
              published: Union[datetime, time.struct_time],
              bitmap: wx.Bitmap,
              title: str,
              watchtime: int,
              basepath: str = BASEPATH,
              filename: str = DB_FILE_NAME) -> None:
    """Store a video row and its thumbnail on disk.

    The thumbnail is written as PNG to ``<basepath>/thumbnails/<video_id>``
    unless a file with that name already exists. The row is inserted into
    the videos table (created on demand); a row whose ``video_id`` already
    exists is left untouched.

    Args:
        video_id: Primary key of the video. When empty, the SHA-256 hex
            digest of ``link`` is used instead.
        channel_id: Channel the video belongs to.
        provider_id: Provider the video belongs to.
        description: Free-text description.
        link: URL of the video.
        published: Publication time, either a ``datetime`` or a
            ``time.struct_time``; stored as an integer Unix timestamp.
        bitmap: Thumbnail to save.
        title: Title of the video.
        watchtime: Value stored in the ``watchtime`` column.
        basepath: Directory containing the database and thumbnails.
        filename: Name of the SQLite database file inside ``basepath``.
    """
    try:
        timestamp = published.timestamp()  # type: ignore
    except AttributeError:
        # struct_time has no timestamp(); mktime interprets it as local time.
        timestamp = time.mktime(published)  # type: ignore
    if not video_id:
        video_id = hash_string(link)

    thumbpath = path.join(basepath, 'thumbnails')
    thumbnail = path.join(thumbpath, video_id)
    fullpath = path.join(basepath, filename)
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    makedirs(thumbpath, exist_ok=True)
    if not path.isfile(thumbnail):
        bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)

    create_query: str = '''CREATE TABLE IF NOT EXISTS {}
    (video_id TEXT PRIMARY KEY, channel_id TEXT, provider_id TEXT,
    title TEXT, link TEXT, description TEXT, thumbnail TEXT,
    published DATETIME, watchtime NUMBER)'''.format(VIDEO_TABLE)
    insert_query: str = '''INSERT INTO {} (video_id, channel_id, provider_id, title, link, description, thumbnail, published, watchtime)
    VALUES(?,?,?,?,?,?,?,?,?) ON CONFLICT(video_id) DO NOTHING'''.format(
        VIDEO_TABLE)

    con = sqlite3.connect(fullpath)
    try:
        cur = con.cursor()
        cur.execute(create_query)
        con.commit()
        cur.execute(insert_query, [
            video_id, channel_id, provider_id, title, link, description,
            thumbnail,
            int(timestamp), watchtime
        ])
        con.commit()
    finally:
        # Always release the database handle, even if a statement fails.
        con.close()
2 years ago
def get_svt_thumb_from_id_changed(id: str,
                                  changed: str,
                                  size: str = "480") -> str:
    """Build the svtstatic.se image URL for an image id and change marker.

    Args:
        id: Image identifier, placed second-to-last in the URL path.
        changed: Change marker, placed last in the URL path.
        size: Size segment of the URL path.

    Returns:
        The assembled URL string.
    """
    return f"https://www.svtstatic.se/image/custom/{size}/{id}/{changed}"
2 years ago
def get_all_svt_categories() -> list:
    """Return the SVT Play genre list, fetching it once per process.

    A non-empty cached list is returned directly. Otherwise the category
    page is fetched and the ``genres`` value of the first data entry that
    has one is used (an empty list if none does). The result is stored in
    ``CAT_CACHE``.
    """
    global CAT_CACHE
    if CAT_CACHE:
        return CAT_CACHE

    entries = get_svt_data("https://www.svtplay.se/kategori")
    genres: list = next(
        (entry['genres'] for entry in entries if 'genres' in entry.keys()),
        list())
    CAT_CACHE = genres
    return genres
2 years ago
def get_all_svt_channels() -> dict:
    """Collect the SVT channels that are currently running a programme.

    Fetches the channel page data and walks every list found under an
    entry's ``channels`` mapping. Items whose ``__typename`` is
    ``"Channel"`` and that carry a ``running`` key are included.

    Returns:
        A dict keyed by channel id. Each value is a dict with ``name``
        (the channel name) and ``thumbnail`` (a bitmap built from the
        running programme's image).
    """
    url = "https://www.svtplay.se/kanaler"
    result: dict = dict()
    for entry in get_svt_data(url):
        if "channels" not in entry:
            continue
        for group in entry["channels"].values():
            # Only list-valued groups hold channel items.
            if not isinstance(group, list):
                continue
            for item in group:
                if item["__typename"] != "Channel" or "running" not in item:
                    continue
                image = item["running"]["image"]
                # The URL helper declares ``size`` as str, so convert the
                # integer width explicitly.
                thumb_url = get_svt_thumb_from_id_changed(
                    image['id'], image['changed'], str(SCREEN_WIDTH))
                result[item["id"]] = {
                    "thumbnail":
                    make_bitmap_from_url(
                        thumb_url,
                        wx.Size(width=SCREEN_WIDTH, height=480)),
                    "name":
                    item["name"],
                }
    return result
2 years ago
def get_svt_category(category: str) -> list:
    """Return every programme item listed for an SVT Play category.

    Args:
        category: Category identifier inserted into the category URL.

    Returns:
        The concatenated ``items`` of every selection in every lazily
        loaded tab of the first data entry that contains ``categoryPage``.
    """
    url = 'https://www.svtplay.se/kategori/{}?tab=all'.format(category)
    programs = list()
    for entry in get_svt_data(url):
        if "categoryPage" not in entry.keys():
            continue
        for tab in entry["categoryPage"]["lazyLoadedTabs"]:
            if "selections" not in tab.keys():
                continue
            for selection in tab["selections"]:
                programs.extend(selection['items'])
        # NOTE(review): only the first entry with a category page is used.
        break
    return programs
2 years ago
def get_svt_data(url: str) -> list:
    """Download a page and decode the data embedded in ``__NEXT_DATA__``.

    The element with id ``__NEXT_DATA__`` is parsed as JSON. Each value
    under ``props.urqlState`` carries a JSON string in its ``data`` field,
    and those strings are decoded in turn.

    Args:
        url: Page to download.

    Returns:
        One decoded object per ``urqlState`` entry, in mapping order.
    """
    response = requests.get(url)
    document = BeautifulSoup(response.text, features="lxml")
    next_data = document.find(id="__NEXT_DATA__")
    urql_state = json.loads(
        next_data.string)["props"]["urqlState"]  # type: ignore
    return [json.loads(record["data"]) for record in urql_state.values()]
2 years ago
def get_all_svt_programs() -> list:
    """Return every programme item from the SVT Play programme index.

    Each returned item is tagged in place with a ``selection_name`` key
    holding the name of the selection it was found in. Only the first
    data entry that contains ``programAtillO`` is used.
    """
    url = 'https://www.svtplay.se/program'
    programs = list()
    for entry in get_svt_data(url):
        if "programAtillO" not in entry.keys():
            continue
        for selection in entry["programAtillO"]["selections"]:
            for item in selection["items"]:
                item['selection_name'] = selection['name']
                programs.append(item)
        break
    return programs
def get_default_logo(providerid: str = 'default',
                     path: str = '/usr/share/cast') -> wx.Bitmap:
    """Load the fallback logo bitmap for a provider.

    Args:
        providerid: Provider identifier; ids starting with ``SVT`` get the
            SVT logo, everything else the default logo.
        path: Installation directory that contains the ``assets`` folder.

    Returns:
        A bitmap created from the chosen asset file.
    """
    if providerid.startswith('SVT'):
        template = '{}/assets/SVT.png'
    else:
        template = '{}/assets/Default.png'
    return wx.Bitmap(template.format(path))
def get_latest(provider_id: str,
               basepath: str = BASEPATH,
               filename: str = DB_FILE_NAME) -> list[Item]:
    """Return the 50 most recently published videos of a provider.

    Args:
        provider_id: Provider whose videos are selected.
        basepath: Directory containing the database file.
        filename: Name of the SQLite database file inside ``basepath``.

    Returns:
        Items built from the matching rows, newest first. The list is
        empty when the query fails with ``sqlite3.OperationalError``
        (for example when the table does not exist yet).
    """
    videos = list()
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT * FROM {} WHERE provider_id = ? ORDER BY published DESC LIMIT 50'''.format(
        VIDEO_TABLE)
    try:
        con = sqlite3.connect(fullpath)
        try:
            cur = con.cursor()
            cur.execute(select_query, [provider_id])
            # Column order follows the CREATE TABLE statement: video_id,
            # channel_id, provider_id, title, link, description,
            # thumbnail, published, watchtime.
            for result in cur.fetchall():
                videos.append(
                    Item(result[5],
                         result[4],
                         result[2],
                         datetime.fromtimestamp(int(result[7])),
                         wx.Bitmap(result[6]),
                         result[3],
                         watchtime=result[8]))
        finally:
            # Release the database handle on success and on failure.
            con.close()
    except sqlite3.OperationalError:
        # Best effort: an unreadable database or missing table is treated
        # as having no videos.
        pass
    return videos
2 years ago
def get_latest_video_timestamp(channel_id: str,
                               basepath: str = BASEPATH,
                               filename: str = DB_FILE_NAME) -> datetime:
    """Return the publication time of a channel's newest stored video.

    Args:
        channel_id: Channel to look up.
        basepath: Directory containing the database file.
        filename: Name of the SQLite database file inside ``basepath``.

    Returns:
        The largest ``published`` value for the channel as a ``datetime``.
        When the channel has no stored videos, or the query fails with
        ``sqlite3.OperationalError``, the datetime for timestamp 0 is
        returned.
    """
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT max(published) FROM {} WHERE channel_id = ?'''.format(
        VIDEO_TABLE)
    timestamp = 0
    try:
        con = sqlite3.connect(fullpath)
        try:
            row = con.execute(select_query, [channel_id]).fetchone()
        finally:
            con.close()
        # max() over zero rows yields NULL (None); keep the 0 fallback so
        # fromtimestamp() is never handed None.
        if row is not None and row[0] is not None:
            timestamp = row[0]
    except sqlite3.OperationalError:
        # Missing database or table: fall back to timestamp 0.
        pass
    return datetime.fromtimestamp(timestamp)
def get_subscriptions(basepath: str = BASEPATH,
                      filename: str = DB_FILE_NAME) -> list[tuple[str, str]]:
    """Return every row of the subscriptions table.

    Args:
        basepath: Directory containing the database file.
        filename: Name of the SQLite database file inside ``basepath``.

    Returns:
        The rows as tuples, or an empty list when the query fails with
        ``sqlite3.OperationalError`` (for example when the table does not
        exist yet).
    """
    subscriptions = list()
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT * FROM {}'''.format(SUB_TABLE)
    try:
        con = sqlite3.connect(fullpath)
        try:
            subscriptions.extend(con.execute(select_query).fetchall())
        finally:
            # Release the database handle on success and on failure.
            con.close()
    except sqlite3.OperationalError:
        # Best effort: no readable table means no subscriptions.
        pass
    return subscriptions
def get_svt_id(link: str) -> str:
    """Extract the SVT id from the play button on a programme page.

    Every ``<a>`` element whose ``data-rt`` attribute equals
    ``top-area-play-button`` is inspected. The id is the text between the
    first ``=`` of its ``href`` and the following ``&``. If several
    elements match, the last one wins.

    Args:
        link: URL of the page to download.

    Returns:
        The extracted id, or an empty string when no element matches.
    """
    svt_id = ''
    response = requests.get(link)
    document = BeautifulSoup(response.text, 'html.parser')
    for anchor in document.find_all('a'):
        if anchor.get('data-rt') != 'top-area-play-button':
            continue
        after_equals = anchor.get('href').split('=')[1]
        svt_id = after_equals.split('&')[0]
    return svt_id
def get_svt_thumbnail(link: str) -> str:
    """Return the ``og:image`` URL advertised by a page.

    Args:
        link: URL of the page to download.

    Returns:
        The ``content`` attribute of the element whose ``property`` is
        ``og:image``.
    """
    response = requests.get(link)
    document = BeautifulSoup(response.text, 'html.parser')
    og_image = document.find(property="og:image")
    return og_image["content"]  # type: ignore
def get_videos(channel_id: str,
               basepath: str = BASEPATH,
               filename: str = DB_FILE_NAME) -> list[Item]:
    """Return all stored videos of a channel, newest first.

    Args:
        channel_id: Channel whose videos are selected.
        basepath: Directory containing the database file.
        filename: Name of the SQLite database file inside ``basepath``.

    Returns:
        Items built from the matching rows ordered by ``published``
        descending. The list is empty when the query fails with
        ``sqlite3.OperationalError`` (for example when the table does not
        exist yet).
    """
    videos = list()
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT * FROM {} WHERE channel_id = ? ORDER BY published DESC'''.format(
        VIDEO_TABLE)
    try:
        con = sqlite3.connect(fullpath)
        try:
            cur = con.cursor()
            cur.execute(select_query, [channel_id])
            # Column order follows the CREATE TABLE statement: video_id,
            # channel_id, provider_id, title, link, description,
            # thumbnail, published, watchtime.
            for result in cur.fetchall():
                videos.append(
                    Item(result[5],
                         result[4],
                         result[2],
                         datetime.fromtimestamp(int(result[7])),
                         wx.Bitmap(result[6]),
                         result[3],
                         watchtime=result[8]))
        finally:
            # Release the database handle on success and on failure.
            con.close()
    except sqlite3.OperationalError:
        # Best effort: an unreadable database or missing table is treated
        # as having no videos.
        pass
    return videos
def hash_string(string: str) -> str:
    """Return the SHA-256 hex digest of ``string`` encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(string.encode('utf-8'))
    return digest.hexdigest()
def make_sized_button(parent_pnl: wx.Panel,
                      bitmap_or_str: Union[wx.Bitmap, str],
                      text: str,
                      callback: Callable) -> wx.BoxSizer:
    """Build a horizontal sizer holding a logo button and a text button.

    Both buttons show ``text`` as tooltip and are bound to ``callback``
    on ``parent_pnl``.

    Args:
        parent_pnl: Panel that owns the buttons and receives the bindings.
        bitmap_or_str: Either a ready bitmap, a URL starting with ``http``
            (fetched via ``make_bitmap_from_url``), or a file path loaded
            with ``wx.Bitmap``.
        text: Label of the text button and tooltip of both buttons.
        callback: Handler for ``wx.EVT_BUTTON`` from either button.

    Returns:
        The static box sizer containing both buttons.
    """
    btn_sizer = wx.StaticBoxSizer(wx.HORIZONTAL, parent_pnl)
    # The previous check compared type(value) with type(str), i.e. with
    # ``type`` itself, which is never true; strings were therefore passed
    # on unconverted.
    if isinstance(bitmap_or_str, str):
        if bitmap_or_str.startswith('http'):
            bitmap = make_bitmap_from_url(bitmap_or_str)
        else:
            bitmap = wx.Bitmap(bitmap_or_str, wx.BITMAP_TYPE_ANY)
    else:
        bitmap = bitmap_or_str
    btn_style = wx.BORDER_NONE | wx.BU_AUTODRAW | wx.BU_EXACTFIT | wx.BU_NOTEXT
    btn_logo = wx.BitmapButton(parent_pnl,
                               wx.ID_ANY,
                               bitmap,
                               style=btn_style,
                               size=wx.Size(100, 68))
    btn_logo.SetToolTip(text)
    btn_sizer.Add(btn_logo, 0, wx.EXPAND, 1)
    # The text button takes the width left over next to the 100 px logo.
    btn_text = wx.Button(parent_pnl,
                         wx.ID_ANY,
                         text,
                         style=wx.BORDER_NONE | wx.BU_AUTODRAW,
                         size=wx.Size(SCREEN_WIDTH - 100, SIZE.GetHeight()))
    btn_text.SetToolTip(text)
    btn_sizer.Add(btn_text, 0, wx.EXPAND, 1)
    parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_logo)
    parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_text)
    return btn_sizer
2 years ago
def make_bitmap_from_url(logo_url: str,
                         size: wx.Size = SIZE,
                         video_id: str = "") -> wx.Bitmap:
    """Return a scaled bitmap for an image URL, using a disk cache.

    The cache file is ``<BASEPATH>/thumbnails/<video_id>``. When it exists
    it is loaded instead of downloading. Otherwise the image is fetched,
    scaled to the width of ``size`` keeping its aspect ratio, saved as PNG
    and returned.

    Args:
        logo_url: Address of the image.
        size: Target size; only its width determines the scale. The
            object is read but never modified.
        video_id: Cache file name. When empty, the SHA-256 hex digest of
            ``logo_url`` is used.

    Returns:
        The scaled bitmap.
    """
    if not video_id:
        video_id = hash_string(logo_url)
    thumbpath = path.join(BASEPATH, 'thumbnails')
    thumbnail = path.join(thumbpath, video_id)
    if path.isfile(thumbnail):
        # Hand over a copy so the callee cannot alter the caller's size
        # object (which is the shared SIZE constant by default).
        return make_bitmap_from_file(
            thumbnail, wx.Size(size.GetWidth(), size.GetHeight()))
    res = requests.get(logo_url)
    content_bytes = io.BytesIO(res.content)
    image = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1)
    # Compute the target dimensions in locals; writing them back into
    # ``size`` would permanently change the module-level default.
    scale_factor = image.GetWidth() / size.GetWidth()
    target_width = int(image.GetWidth() / scale_factor)
    target_height = int(image.GetHeight() / scale_factor)
    image.Rescale(target_width, target_height)
    bitmap = wx.Bitmap(image)
    makedirs(thumbpath, exist_ok=True)
    if not path.isfile(thumbnail):
        bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)
    return bitmap
2 years ago
def make_bitmap_from_file(path, size: wx.Size = SIZE) -> wx.Bitmap:
    """Load an image file and scale it to the width of ``size``.

    The aspect ratio of the image is kept; the height of ``size`` is not
    used.

    Args:
        path: Image file to load.
        size: Target size; only its width determines the scale. The
            object is read but never modified.

    Returns:
        The scaled bitmap.
    """
    image = wx.Image(path, type=wx.BITMAP_TYPE_ANY, index=-1)
    # Compute the target dimensions in locals; writing them back into
    # ``size`` would permanently change the module-level default.
    scale_factor = image.GetWidth() / size.GetWidth()
    target_width = int(image.GetWidth() / scale_factor)
    target_height = int(image.GetHeight() / scale_factor)
    image.Rescale(target_width, target_height)
    return wx.Bitmap(image)
def resolve_svt_channel(svt_id: str, path: str = '/usr/share/cast') -> dict:
    """Look up name and thumbnail for an SVT channel-like id.

    On the first call the lookup table is assembled from the live
    channels, two fixed entries (``feed`` and ``allprograms``) and one
    entry per category, then kept in ``CHAN_CACHE``.

    Args:
        svt_id: Key to look up: a channel id, category id, ``feed`` or
            ``allprograms``.
        path: Installation directory that contains the ``assets`` folder.

    Returns:
        A dict with ``name`` and ``thumbnail`` for ``svt_id``.

    Raises:
        KeyError: If ``svt_id`` is not in the table.
    """
    global CHAN_CACHE
    if not CHAN_CACHE:
        table = get_all_svt_channels()
        svt_logo = '{}/assets/SVT.png'.format(path)
        table["feed"] = {
            "name": "Senaste program",
            "thumbnail": make_bitmap_from_file(svt_logo),
        }
        table["allprograms"] = {
            "name": "Alla program",
            "thumbnail": make_bitmap_from_file(svt_logo),
        }
        for category in get_all_svt_categories():
            info = {
                "name":
                category["name"],
                "thumbnail":
                make_bitmap_from_url(
                    get_svt_thumb_from_id_changed(
                        category['image']['id'],
                        category['image']['changed'])),
            }
            table[category['id']] = info
        CHAN_CACHE = table
    return CHAN_CACHE[svt_id]
def video_exists(video_id: str,
                 channel_id: str,
                 basepath: str = BASEPATH,
                 filename: str = DB_FILE_NAME) -> bool:
    """Tell whether a video of a given channel is already stored.

    Args:
        video_id: Id of the video.
        channel_id: Id of the channel the video must belong to.
        basepath: Directory containing the database file.
        filename: Name of the SQLite database file inside ``basepath``.

    Returns:
        True when at least one matching row exists. False when none does
        or the query fails with ``sqlite3.OperationalError``.
    """
    fullpath = path.join(basepath, filename)
    select_query = '''SELECT * FROM {} WHERE channel_id = ? AND video_id = ?'''.format(
        VIDEO_TABLE)
    try:
        con = sqlite3.connect(fullpath)
        try:
            cur = con.cursor()
            cur.execute(select_query, [channel_id, video_id])
            return bool(cur.fetchall())
        finally:
            # Runs before the return value is handed back, so the handle
            # is released on every path.
            con.close()
    except sqlite3.OperationalError:
        # Missing database or table: nothing is stored yet.
        return False