cast/src/Utils/__init__.py

354 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2022-01-08 21:21:34 +01:00
import hashlib
import io
2022-01-07 18:16:13 +01:00
import json
import sqlite3
2022-01-08 21:21:34 +01:00
import time
2022-01-07 21:00:09 +01:00
from datetime import datetime
2022-01-07 18:16:13 +01:00
from os import environ, makedirs, path
from typing import Callable, Union
from urllib.parse import urlparse
import requests
import wx
import youtube_dl
2022-01-08 21:21:34 +01:00
from Items import Item
HEIGHT = int(1440 / 2)
BTN_HEIGHT = 40
2022-01-07 18:16:13 +01:00
SIZE = wx.Size(100, 68)
MYPATH = path.dirname(path.abspath(__file__))
2022-01-07 18:16:13 +01:00
SCREEN_WIDTH = int(720 / 2)
BASEPATH = path.join(str(environ.get("HOME")), '.config/cast')
DB_FILE_NAME = 'cast.db'
SUB_TABLE = 'subscriptions'
VIDEO_TABLE = 'videos'
2022-01-07 18:16:13 +01:00
def add_subscription(channel_id: str,
name: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> None:
fullpath = path.join(basepath, filename)
if not path.isdir(basepath):
makedirs(basepath)
con = sqlite3.connect(fullpath)
cur = con.cursor()
create_query: str = '''CREATE TABLE IF NOT EXISTS {}
(channel_id TEXT PRIMARY KEY, channel_name TEXT)'''.format(SUB_TABLE)
cur.execute(create_query)
con.commit()
upsert_query: str = '''INSERT INTO {} (channel_id, channel_name)
2022-01-07 21:00:09 +01:00
VALUES('{}',"{}") ON CONFLICT(channel_id) DO NOTHING'''.format(
SUB_TABLE, channel_id, name)
2022-01-07 18:16:13 +01:00
cur.execute(upsert_query)
con.commit()
2022-01-07 21:00:09 +01:00
def add_video(video_id: str,
channel_id: str,
provider_id: str,
description: str,
link: str,
2022-01-08 21:21:34 +01:00
published: Union[datetime, time.struct_time],
2022-01-07 21:00:09 +01:00
bitmap: wx.Bitmap,
title: str,
2022-01-08 21:21:34 +01:00
watchtime: int,
2022-01-07 21:00:09 +01:00
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> None:
2022-01-08 21:21:34 +01:00
try:
timestamp = published.timestamp() #type: ignore
except AttributeError:
timestamp = time.mktime(published) #type: ignore
if not video_id:
video_id = hash_string(link)
thumbpath = path.join(basepath, 'thumbnails')
thumbnail = path.join(thumbpath, video_id)
2022-01-07 21:00:09 +01:00
fullpath = path.join(basepath, filename)
2022-01-08 21:21:34 +01:00
if not path.isdir(thumbpath):
makedirs(thumbpath)
2022-01-09 17:26:07 +01:00
if not path.isfile(thumbnail):
bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)
2022-01-07 21:00:09 +01:00
con = sqlite3.connect(fullpath)
cur = con.cursor()
create_query: str = '''CREATE TABLE IF NOT EXISTS {}
(video_id TEXT PRIMARY KEY, channel_id TEXT, provider_id TEXT,
2022-01-09 17:26:07 +01:00
title TEXT, link TEXT, description TEXT, thumbnail TEXT,
2022-01-08 21:21:34 +01:00
published DATETIME, watchtime NUMBER)'''.format(VIDEO_TABLE)
2022-01-07 21:00:09 +01:00
cur.execute(create_query)
con.commit()
upsert_query: str = '''INSERT INTO {} (video_id, channel_id, provider_id, title, link, description, thumbnail, published, watchtime)
VALUES(?,?,?,?,?,?,?,?,?) ON CONFLICT(video_id) DO NOTHING'''.format(
VIDEO_TABLE)
2022-01-08 21:21:34 +01:00
cur.execute(upsert_query, [
video_id, channel_id, provider_id, title, link, description, thumbnail,
int(timestamp), watchtime
])
2022-01-07 21:00:09 +01:00
con.commit()
def get_default_logo(providerid: str = 'default') -> wx.Bitmap:
if providerid == 'SVT':
return wx.Bitmap('{}/assets/SVT.png'.format(MYPATH))
if providerid == 'YouTube':
return wx.Bitmap('{}/assets/YouTube.png'.format(MYPATH))
else:
return wx.Bitmap('{}/assets/Default.png'.format(MYPATH))
2022-01-07 18:16:13 +01:00
2022-01-08 21:21:34 +01:00
def get_latest(provider_id: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> list[Item]:
videos = list()
fullpath = path.join(basepath, filename)
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {} WHERE provider_id = ? ORDER BY published DESC LIMIT 50'''.format(
VIDEO_TABLE)
cur.execute(select_query, [provider_id])
for result in cur.fetchall():
description = result[5]
link = result[4]
provider_id = result[2]
published = datetime.fromtimestamp(int(result[7]))
thumbnail = wx.Bitmap(result[6])
title = result[3]
watchtime = result[8]
videos.append(
Item(description,
link,
provider_id,
published,
thumbnail,
title,
watchtime=watchtime)) # Make an item from db
except sqlite3.OperationalError:
pass
return videos
2022-01-09 13:24:17 +01:00
def get_latest_video_timestamp(channel_id: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> datetime:
2022-01-09 13:24:17 +01:00
fullpath = path.join(basepath, filename)
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT max(published) FROM {} WHERE channel_id = ?'''.format(
VIDEO_TABLE)
cur.execute(select_query, [channel_id])
timestamp = cur.fetchone()[0]
except sqlite3.OperationalError:
timestamp = 0
pass
return datetime.fromtimestamp(timestamp)
2022-01-08 21:21:34 +01:00
2022-01-07 18:16:13 +01:00
def get_subscriptions(basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> list[tuple[str, str]]:
subscriptions = list()
fullpath = path.join(basepath, filename)
2022-01-08 22:33:55 +01:00
try:
2022-01-09 19:04:48 +01:00
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {}'''.format(SUB_TABLE)
2022-01-08 22:33:55 +01:00
cur.execute(select_query)
for result in cur.fetchall():
subscriptions.append(result)
except sqlite3.OperationalError:
pass
2022-01-07 18:16:13 +01:00
return subscriptions
2022-01-08 21:21:34 +01:00
2022-01-07 21:00:09 +01:00
def get_videos(channel_id: str,
2022-01-08 21:21:34 +01:00
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> list[Item]:
2022-01-07 21:00:09 +01:00
videos = list()
fullpath = path.join(basepath, filename)
2022-01-08 21:21:34 +01:00
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {} WHERE channel_id = ? ORDER BY published DESC'''.format(
VIDEO_TABLE)
cur.execute(select_query, [channel_id])
for result in cur.fetchall():
description = result[5]
link = result[4]
provider_id = result[2]
published = datetime.fromtimestamp(int(result[7]))
thumbnail = wx.Bitmap(result[6])
title = result[3]
watchtime = result[8]
videos.append(
Item(description,
link,
provider_id,
published,
thumbnail,
title,
watchtime=watchtime)) # Make an item from db
except sqlite3.OperationalError:
2022-01-07 21:00:09 +01:00
pass
return videos
2022-01-07 18:16:13 +01:00
2022-01-08 21:21:34 +01:00
def hash_string(string: str) -> str:
hash_object = hashlib.sha256(string.encode('utf-8'))
return hash_object.hexdigest()
2022-01-07 18:16:13 +01:00
def import_from_newpipe(filename) -> None:
if path.isfile(filename):
with open(filename, 'r') as subs:
sub_data = json.loads(subs.read())
for channel in sub_data['subscriptions']:
if channel['service_id'] == 0:
channel_id = urlparse(channel['url']).path.split('/').pop()
add_subscription(channel_id, channel['name'])
def make_sized_button(parent_pnl: wx.Panel, bitmap_or_str: Union[wx.Bitmap,
str],
text: str, callback: Callable) -> wx.BoxSizer:
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
if type(bitmap_or_str) == type(str):
if bitmap_or_str.startswith('http'): # type: ignore
bitmap = make_bitmap_from_url(bitmap_or_str) # type: ignore
else:
2022-01-07 18:16:13 +01:00
bitmap = wx.Bitmap(bitmap_or_str, wx.BITMAP_TYPE_ANY)
else:
bitmap = bitmap_or_str
btn_style = wx.BORDER_NONE | wx.BU_AUTODRAW | wx.BU_EXACTFIT | wx.BU_NOTEXT
btn_logo = wx.BitmapButton(parent_pnl,
wx.ID_ANY,
bitmap,
style=btn_style,
size=bitmap.GetSize())
2022-01-07 18:16:13 +01:00
btn_logo.SetToolTip(text)
btn_sizer.Add(btn_logo, 0, wx.BOTTOM | wx.EXPAND | wx.LEFT | wx.TOP, 1)
2022-01-07 18:16:13 +01:00
btn_text = wx.Button(parent_pnl,
wx.ID_ANY,
text,
style=wx.BORDER_NONE | wx.BU_AUTODRAW,
size=wx.Size(SCREEN_WIDTH - SIZE.GetWidth(),
SIZE.GetHeight()))
2022-01-07 18:16:13 +01:00
btn_text.SetToolTip(text)
btn_sizer.Add(btn_text, 0, wx.BOTTOM | wx.RIGHT | wx.TOP | wx.EXPAND, 1)
parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_logo)
parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_text)
2022-01-07 18:16:13 +01:00
return btn_sizer
2022-01-07 18:16:13 +01:00
def make_bitmap_from_url(logo_url: str, size: wx.Size = SIZE) -> wx.Bitmap:
res = requests.get(logo_url)
content = res.content
content_bytes = io.BytesIO(content)
image = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1)
2022-01-06 17:20:26 +01:00
scale_factor = image.GetWidth() / SCREEN_WIDTH
size.SetWidth(SCREEN_WIDTH)
height = image.GetHeight()
2022-01-07 18:16:13 +01:00
size.SetHeight(height / scale_factor)
image.Rescale(size.GetWidth(), size.GetHeight())
return wx.Bitmap(image)
2022-01-06 17:20:26 +01:00
def make_bitmap_from_file(path, size: wx.Size = SIZE) -> wx.Bitmap:
image = wx.Image(path, type=wx.BITMAP_TYPE_ANY, index=-1)
2022-01-07 18:16:13 +01:00
# scale_factor = image.GetWidth() / SCREEN_WIDTH
# size.SetWidth(SCREEN_WIDTH)
# height = image.GetHeight()
# size.SetHeight(height/scale_factor)
# image.Rescale(size.GetWidth(), size.GetHeight() )
2022-01-06 17:20:26 +01:00
return wx.Bitmap(image)
2022-01-07 18:16:13 +01:00
def resolve_svt_channel(svt_id: str) -> dict:
2022-01-06 17:20:26 +01:00
channels = {
"ch-barnkanalen": {
"name":
"Barnkanalen",
2022-01-07 18:16:13 +01:00
"thumbnail":
make_bitmap_from_file('{}/assets/Barnkanalen.png'.format(MYPATH))
},
"ch-svt1": {
2022-01-07 18:16:13 +01:00
"name": "SVT 1",
"thumbnail":
make_bitmap_from_file('{}/assets/SVT1.png'.format(MYPATH))
},
"ch-svt2": {
2022-01-07 18:16:13 +01:00
"name": "SVT 2",
"thumbnail":
make_bitmap_from_file('{}/assets/SVT2.png'.format(MYPATH))
},
"ch-svt24": {
"name":
"SVT 24",
2022-01-07 18:16:13 +01:00
"thumbnail":
make_bitmap_from_file('{}/assets/SVT24.png'.format(MYPATH))
},
"ch-kunskapskanalen": {
"name":
"Kunskapskanalen",
2022-01-07 18:16:13 +01:00
"thumbnail":
make_bitmap_from_file(
'{}/assets/Kunskapskanalen.png'.format(MYPATH))
},
"feed": {
"name": "Senaste program",
2022-01-07 18:16:13 +01:00
"thumbnail":
make_bitmap_from_file('{}/assets/SVT.png'.format(MYPATH))
},
}
return channels[svt_id]
2022-01-08 21:21:34 +01:00
def resolve_youtube_link(link):
ydl_opts = {
'format':
'worstvideo[ext=mp4]+worstaudio[ext=m4a]/worstvideo+worstaudio',
'container': 'webm dash'
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
try:
video = ydl.extract_info(link, download=False)
for form in video['formats']: # type: ignore
if form['height']:
if form['height'] < 480 and form['acodec'] != 'none':
link = form['url']
except youtube_dl.utils.ExtractorError and youtube_dl.utils.DownloadError:
pass
return link
2022-01-08 21:21:34 +01:00
def video_exists(video_id: str,
channel_id: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> bool:
fullpath = path.join(basepath, filename)
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {} WHERE channel_id = ? AND video_id = ?'''.format(
VIDEO_TABLE)
cur.execute(select_query, [channel_id, video_id])
return bool(len(cur.fetchall()))
except sqlite3.OperationalError:
return False