You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cast/src/Utils/__init__.py

363 lines
12 KiB

#!/usr/bin/env python3
import hashlib
import io
import json
import sqlite3
import time
from datetime import datetime
from os import environ, makedirs, path
from typing import Callable, Union
from urllib.parse import urlparse
from youtubesearchpython import ChannelsSearch
import requests
import wx
import youtube_dl
from Items import Item
HEIGHT = int(1440 / 2)
BTN_HEIGHT = 40
SIZE = wx.Size(68,100)
MYPATH = path.dirname(path.abspath(__file__))
SCREEN_WIDTH = int(720 / 2)
BASEPATH = path.join(str(environ.get("HOME")), '.config/cast')
DB_FILE_NAME = 'cast.db'
SUB_TABLE = 'subscriptions'
VIDEO_TABLE = 'videos'
def add_subscription(channel_id: str,
name: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> None:
fullpath = path.join(basepath, filename)
thumbpath = path.join(basepath, 'thumbnails')
thumbnail = path.join(thumbpath, channel_id)
fullpath = path.join(basepath, filename)
if not path.isdir(thumbpath):
makedirs(thumbpath)
if not path.isfile(thumbnail):
channels_search = ChannelsSearch(name, limit=1).result()['result'][0] #type: ignore
bitmap = make_bitmap_from_url('https:' + channels_search['thumbnails'][0]['url'])
bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)
con = sqlite3.connect(fullpath)
cur = con.cursor()
create_query: str = '''CREATE TABLE IF NOT EXISTS {}
(channel_id TEXT PRIMARY KEY, channel_name TEXT, thumb_path TEXT)'''.format(
SUB_TABLE)
cur.execute(create_query)
con.commit()
upsert_query: str = '''INSERT INTO {} (channel_id, channel_name, thumb_path)
VALUES(?,?,?) ON CONFLICT(channel_id) DO NOTHING'''.format(
SUB_TABLE )
cur.execute(upsert_query, [channel_id, name, thumbnail])
con.commit()
def add_video(video_id: str,
channel_id: str,
provider_id: str,
description: str,
link: str,
published: Union[datetime, time.struct_time],
bitmap: wx.Bitmap,
title: str,
watchtime: int,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> None:
try:
timestamp = published.timestamp() #type: ignore
except AttributeError:
timestamp = time.mktime(published) #type: ignore
if not video_id:
video_id = hash_string(link)
thumbpath = path.join(basepath, 'thumbnails')
thumbnail = path.join(thumbpath, video_id)
fullpath = path.join(basepath, filename)
if not path.isdir(thumbpath):
makedirs(thumbpath)
if not path.isfile(thumbnail):
bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)
con = sqlite3.connect(fullpath)
cur = con.cursor()
create_query: str = '''CREATE TABLE IF NOT EXISTS {}
(video_id TEXT PRIMARY KEY, channel_id TEXT, provider_id TEXT,
title TEXT, link TEXT, description TEXT, thumbnail TEXT,
published DATETIME, watchtime NUMBER)'''.format(VIDEO_TABLE)
cur.execute(create_query)
con.commit()
upsert_query: str = '''INSERT INTO {} (video_id, channel_id, provider_id, title, link, description, thumbnail, published, watchtime)
VALUES(?,?,?,?,?,?,?,?,?) ON CONFLICT(video_id) DO NOTHING'''.format(
VIDEO_TABLE)
cur.execute(upsert_query, [
video_id, channel_id, provider_id, title, link, description, thumbnail,
int(timestamp), watchtime
])
con.commit()
def get_default_logo(providerid: str = 'default') -> wx.Bitmap:
if providerid == 'SVT':
return wx.Bitmap('{}/assets/SVT.png'.format(MYPATH))
if providerid == 'YouTube':
return wx.Bitmap('{}/assets/YouTube.png'.format(MYPATH))
else:
return wx.Bitmap('{}/assets/Default.png'.format(MYPATH))
def get_latest(provider_id: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> list[Item]:
videos = list()
fullpath = path.join(basepath, filename)
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {} WHERE provider_id = ? ORDER BY published DESC LIMIT 50'''.format(
VIDEO_TABLE)
cur.execute(select_query, [provider_id])
for result in cur.fetchall():
description = result[5]
link = result[4]
provider_id = result[2]
published = datetime.fromtimestamp(int(result[7]))
thumbnail = wx.Bitmap(result[6])
title = result[3]
watchtime = result[8]
videos.append(
Item(description,
link,
provider_id,
published,
thumbnail,
title,
watchtime=watchtime)) # Make an item from db
except sqlite3.OperationalError:
pass
return videos
3 years ago
def get_latest_video_timestamp(channel_id: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> datetime:
3 years ago
fullpath = path.join(basepath, filename)
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT max(published) FROM {} WHERE channel_id = ?'''.format(
VIDEO_TABLE)
cur.execute(select_query, [channel_id])
timestamp = cur.fetchone()[0]
except sqlite3.OperationalError:
timestamp = 0
pass
return datetime.fromtimestamp(timestamp)
def get_subscriptions(basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> list[tuple[str, str]]:
subscriptions = list()
fullpath = path.join(basepath, filename)
try:
3 years ago
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {}'''.format(SUB_TABLE)
cur.execute(select_query)
for result in cur.fetchall():
subscriptions.append(result)
except sqlite3.OperationalError:
pass
return subscriptions
def get_videos(channel_id: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> list[Item]:
videos = list()
fullpath = path.join(basepath, filename)
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {} WHERE channel_id = ? ORDER BY published DESC'''.format(
VIDEO_TABLE)
cur.execute(select_query, [channel_id])
for result in cur.fetchall():
description = result[5]
link = result[4]
provider_id = result[2]
published = datetime.fromtimestamp(int(result[7]))
thumbnail = wx.Bitmap(result[6])
title = result[3]
watchtime = result[8]
videos.append(
Item(description,
link,
provider_id,
published,
thumbnail,
title,
watchtime=watchtime)) # Make an item from db
except sqlite3.OperationalError:
pass
return videos
def hash_string(string: str) -> str:
hash_object = hashlib.sha256(string.encode('utf-8'))
return hash_object.hexdigest()
def import_from_newpipe(filename) -> None:
if path.isfile(filename):
with open(filename, 'r') as subs:
sub_data = json.loads(subs.read())
for channel in sub_data['subscriptions']:
if channel['service_id'] == 0:
channel_id = urlparse(channel['url']).path.split('/').pop()
add_subscription(channel_id, channel['name'])
def make_sized_button(parent_pnl: wx.Panel, bitmap_or_str: Union[wx.Bitmap,
str],
text: str, callback: Callable) -> wx.BoxSizer:
btn_sizer = wx.StaticBoxSizer(wx.HORIZONTAL,parent_pnl)
if type(bitmap_or_str) == type(str):
if bitmap_or_str.startswith('http'): # type: ignore
bitmap = make_bitmap_from_url(bitmap_or_str) # type: ignore
else:
bitmap = wx.Bitmap(bitmap_or_str, wx.BITMAP_TYPE_ANY)
else:
bitmap = bitmap_or_str
btn_style = wx.BORDER_NONE | wx.BU_AUTODRAW | wx.BU_EXACTFIT | wx.BU_NOTEXT
btn_logo = wx.BitmapButton(parent_pnl,
wx.ID_ANY,
bitmap,
style=btn_style,
size=wx.Size(100,68))
btn_logo.SetToolTip(text)
btn_sizer.Add(btn_logo, 0, wx.EXPAND, 1)
btn_text = wx.Button(parent_pnl,
wx.ID_ANY,
text,
style=wx.BORDER_NONE | wx.BU_AUTODRAW,
size=wx.Size(SCREEN_WIDTH - 100,
SIZE.GetHeight()))
btn_text.SetToolTip(text)
btn_sizer.Add(btn_text, 0, wx.EXPAND, 1)
parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_logo)
parent_pnl.Bind(wx.EVT_BUTTON, callback, btn_text)
return btn_sizer
def make_bitmap_from_url(logo_url: str, size: wx.Size = SIZE) -> wx.Bitmap:
res = requests.get(logo_url)
content = res.content
content_bytes = io.BytesIO(content)
image = wx.Image(content_bytes, type=wx.BITMAP_TYPE_ANY, index=-1)
scale_factor = image.GetWidth() / size.GetWidth()
size.SetWidth(image.GetWidth() / scale_factor)
3 years ago
height = image.GetHeight()
size.SetHeight(height / scale_factor)
image.Rescale(size.GetWidth(), size.GetHeight())
return wx.Bitmap(image)
3 years ago
def make_bitmap_from_file(path, size: wx.Size = SIZE) -> wx.Bitmap:
image = wx.Image(path, type=wx.BITMAP_TYPE_ANY, index=-1)
scale_factor = image.GetWidth() / size.GetWidth()
size.SetWidth(image.GetWidth() / scale_factor)
height = image.GetHeight()
size.SetHeight(height / scale_factor)
image.Rescale(size.GetWidth(), size.GetHeight())
3 years ago
return wx.Bitmap(image)
def resolve_svt_channel(svt_id: str) -> dict:
3 years ago
channels = {
"ch-barnkanalen": {
"name":
"Barnkanalen",
"thumbnail":
make_bitmap_from_file('{}/assets/Barnkanalen.png'.format(MYPATH))
},
"ch-svt1": {
"name": "SVT 1",
"thumbnail":
make_bitmap_from_file('{}/assets/SVT1.png'.format(MYPATH))
},
"ch-svt2": {
"name": "SVT 2",
"thumbnail":
make_bitmap_from_file('{}/assets/SVT2.png'.format(MYPATH))
},
"ch-svt24": {
"name":
"SVT 24",
"thumbnail":
make_bitmap_from_file('{}/assets/SVT24.png'.format(MYPATH))
},
"ch-kunskapskanalen": {
"name":
"Kunskapskanalen",
"thumbnail":
make_bitmap_from_file(
'{}/assets/Kunskapskanalen.png'.format(MYPATH))
},
"feed": {
"name": "Senaste program",
"thumbnail":
make_bitmap_from_file('{}/assets/SVT.png'.format(MYPATH))
},
}
return channels[svt_id]
def resolve_youtube_link(link):
ydl_opts = {
'format':
'worstvideo[ext=mp4]+worstaudio[ext=m4a]/worstvideo+worstaudio',
'container': 'webm dash'
}
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
try:
video = ydl.extract_info(link, download=False)
for form in video['formats']: # type: ignore
if form['height']:
if form['height'] < 480 and form['acodec'] != 'none':
link = form['url']
except youtube_dl.utils.ExtractorError and youtube_dl.utils.DownloadError:
pass
return link
def video_exists(video_id: str,
channel_id: str,
basepath: str = BASEPATH,
filename: str = DB_FILE_NAME) -> bool:
fullpath = path.join(basepath, filename)
try:
con = sqlite3.connect(fullpath)
cur = con.cursor()
select_query = '''SELECT * FROM {} WHERE channel_id = ? AND video_id = ?'''.format(
VIDEO_TABLE)
cur.execute(select_query, [channel_id, video_id])
return bool(len(cur.fetchall()))
except sqlite3.OperationalError:
return False