|
|
|
@ -1,16 +1,19 @@
|
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
import hashlib
|
|
|
|
|
import io
|
|
|
|
|
import json
|
|
|
|
|
import sqlite3
|
|
|
|
|
import time
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from os import environ, makedirs, path
|
|
|
|
|
from typing import Callable, Union
|
|
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
from Items import Item
|
|
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
import wx
|
|
|
|
|
|
|
|
|
|
from Items import Item
|
|
|
|
|
|
|
|
|
|
SIZE = wx.Size(100, 68)
|
|
|
|
|
MYPATH = path.dirname(path.abspath(__file__))
|
|
|
|
|
SCREEN_WIDTH = int(720 / 2)
|
|
|
|
@ -45,22 +48,31 @@ def add_video(video_id: str,
|
|
|
|
|
provider_id: str,
|
|
|
|
|
description: str,
|
|
|
|
|
link: str,
|
|
|
|
|
published: datetime,
|
|
|
|
|
published: Union[datetime, time.struct_time],
|
|
|
|
|
bitmap: wx.Bitmap,
|
|
|
|
|
title: str,
|
|
|
|
|
watchtime: str,
|
|
|
|
|
watchtime: int,
|
|
|
|
|
basepath: str = BASEPATH,
|
|
|
|
|
filename: str = DB_FILE_NAME) -> None:
|
|
|
|
|
thumbnail = bitmap.GetData()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
timestamp = published.timestamp() #type: ignore
|
|
|
|
|
except AttributeError:
|
|
|
|
|
timestamp = time.mktime(published) #type: ignore
|
|
|
|
|
if not video_id:
|
|
|
|
|
video_id = hash_string(link)
|
|
|
|
|
thumbpath = path.join(basepath, 'thumbnails')
|
|
|
|
|
thumbnail = path.join(thumbpath, video_id)
|
|
|
|
|
fullpath = path.join(basepath, filename)
|
|
|
|
|
if not path.isdir(basepath):
|
|
|
|
|
makedirs(basepath)
|
|
|
|
|
if not path.isdir(thumbpath):
|
|
|
|
|
makedirs(thumbpath)
|
|
|
|
|
bitmap.SaveFile(thumbnail, wx.BITMAP_TYPE_PNG)
|
|
|
|
|
con = sqlite3.connect(fullpath)
|
|
|
|
|
cur = con.cursor()
|
|
|
|
|
create_query: str = '''CREATE TABLE IF NOT EXISTS {}
|
|
|
|
|
(video_id TEXT PRIMARY KEY, channel_id TEXT, provider_id TEXT,
|
|
|
|
|
title TEXT, link text, description TEXT, thumbnail BLOB, published DATETIME)'''.format(
|
|
|
|
|
VIDEO_TABLE)
|
|
|
|
|
title TEXT, link text, description TEXT, thumbnail TEXT,
|
|
|
|
|
published DATETIME, watchtime NUMBER)'''.format(VIDEO_TABLE)
|
|
|
|
|
cur.execute(create_query)
|
|
|
|
|
con.commit()
|
|
|
|
|
|
|
|
|
@ -68,8 +80,10 @@ def add_video(video_id: str,
|
|
|
|
|
VALUES(?,?,?,?,?,?,?,?,?) ON CONFLICT(video_id) DO NOTHING'''.format(
|
|
|
|
|
VIDEO_TABLE)
|
|
|
|
|
|
|
|
|
|
cur.execute(upsert_query, video_id, channel_id, provider_id, title, link,
|
|
|
|
|
description, thumbnail, published, watchtime)
|
|
|
|
|
cur.execute(upsert_query, [
|
|
|
|
|
video_id, channel_id, provider_id, title, link, description, thumbnail,
|
|
|
|
|
int(timestamp), watchtime
|
|
|
|
|
])
|
|
|
|
|
|
|
|
|
|
con.commit()
|
|
|
|
|
|
|
|
|
@ -83,6 +97,39 @@ def get_default_logo(providerid: str = 'default') -> wx.Bitmap:
|
|
|
|
|
return wx.Bitmap('{}/assets/Default.png'.format(MYPATH))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_latest(provider_id: str,
|
|
|
|
|
basepath: str = BASEPATH,
|
|
|
|
|
filename: str = DB_FILE_NAME) -> list[Item]:
|
|
|
|
|
videos = list()
|
|
|
|
|
fullpath = path.join(basepath, filename)
|
|
|
|
|
try:
|
|
|
|
|
con = sqlite3.connect(fullpath)
|
|
|
|
|
cur = con.cursor()
|
|
|
|
|
select_query = '''SELECT * FROM {} WHERE provider_id = ? ORDER BY published DESC LIMIT 50'''.format(
|
|
|
|
|
VIDEO_TABLE)
|
|
|
|
|
cur.execute(select_query, [provider_id])
|
|
|
|
|
for result in cur.fetchall():
|
|
|
|
|
description = result[5]
|
|
|
|
|
link = result[4]
|
|
|
|
|
provider_id = result[2]
|
|
|
|
|
published = datetime.fromtimestamp(int(result[7]))
|
|
|
|
|
thumbnail = wx.Bitmap(result[6])
|
|
|
|
|
title = result[3]
|
|
|
|
|
watchtime = result[8]
|
|
|
|
|
|
|
|
|
|
videos.append(
|
|
|
|
|
Item(description,
|
|
|
|
|
link,
|
|
|
|
|
provider_id,
|
|
|
|
|
published,
|
|
|
|
|
thumbnail,
|
|
|
|
|
title,
|
|
|
|
|
watchtime=watchtime)) # Make an item from db
|
|
|
|
|
except sqlite3.OperationalError:
|
|
|
|
|
pass
|
|
|
|
|
return videos
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_subscriptions(basepath: str = BASEPATH,
|
|
|
|
|
filename: str = DB_FILE_NAME) -> list[tuple[str, str]]:
|
|
|
|
|
subscriptions = list()
|
|
|
|
@ -95,20 +142,45 @@ def get_subscriptions(basepath: str = BASEPATH,
|
|
|
|
|
subscriptions.append(result)
|
|
|
|
|
return subscriptions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_videos(channel_id: str,
|
|
|
|
|
basepath: str = BASEPATH,
|
|
|
|
|
filename: str = DB_FILE_NAME) -> list[Item]:
|
|
|
|
|
basepath: str = BASEPATH,
|
|
|
|
|
filename: str = DB_FILE_NAME) -> list[Item]:
|
|
|
|
|
videos = list()
|
|
|
|
|
fullpath = path.join(basepath, filename)
|
|
|
|
|
con = sqlite3.connect(fullpath)
|
|
|
|
|
cur = con.cursor()
|
|
|
|
|
select_query = '''SELECT * FROM {} WHERE channel_id = ?'''.format(VIDEO_TABLE)
|
|
|
|
|
cur.execute(select_query,channel_id)
|
|
|
|
|
for result in cur.fetchall():
|
|
|
|
|
try:
|
|
|
|
|
con = sqlite3.connect(fullpath)
|
|
|
|
|
cur = con.cursor()
|
|
|
|
|
select_query = '''SELECT * FROM {} WHERE channel_id = ? ORDER BY published DESC'''.format(
|
|
|
|
|
VIDEO_TABLE)
|
|
|
|
|
cur.execute(select_query, [channel_id])
|
|
|
|
|
for result in cur.fetchall():
|
|
|
|
|
description = result[5]
|
|
|
|
|
link = result[4]
|
|
|
|
|
provider_id = result[2]
|
|
|
|
|
published = datetime.fromtimestamp(int(result[7]))
|
|
|
|
|
thumbnail = wx.Bitmap(result[6])
|
|
|
|
|
title = result[3]
|
|
|
|
|
watchtime = result[8]
|
|
|
|
|
|
|
|
|
|
videos.append(
|
|
|
|
|
Item(description,
|
|
|
|
|
link,
|
|
|
|
|
provider_id,
|
|
|
|
|
published,
|
|
|
|
|
thumbnail,
|
|
|
|
|
title,
|
|
|
|
|
watchtime=watchtime)) # Make an item from db
|
|
|
|
|
except sqlite3.OperationalError:
|
|
|
|
|
pass
|
|
|
|
|
# videos.append(Item()) # Make an item from db
|
|
|
|
|
return videos
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hash_string(string: str) -> str:
|
|
|
|
|
hash_object = hashlib.sha256(string.encode('utf-8'))
|
|
|
|
|
return hash_object.hexdigest()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def import_from_newpipe(filename) -> None:
|
|
|
|
|
|
|
|
|
|
if path.isfile(filename):
|
|
|
|
@ -215,3 +287,20 @@ def resolve_svt_channel(svt_id: str) -> dict:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return channels[svt_id]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def video_exists(video_id: str,
|
|
|
|
|
channel_id: str,
|
|
|
|
|
basepath: str = BASEPATH,
|
|
|
|
|
filename: str = DB_FILE_NAME) -> bool:
|
|
|
|
|
fullpath = path.join(basepath, filename)
|
|
|
|
|
try:
|
|
|
|
|
con = sqlite3.connect(fullpath)
|
|
|
|
|
cur = con.cursor()
|
|
|
|
|
select_query = '''SELECT * FROM {} WHERE channel_id = ? AND video_id = ?'''.format(
|
|
|
|
|
VIDEO_TABLE)
|
|
|
|
|
cur.execute(select_query, [channel_id, video_id])
|
|
|
|
|
|
|
|
|
|
return bool(len(cur.fetchall()))
|
|
|
|
|
except sqlite3.OperationalError:
|
|
|
|
|
return False
|
|
|
|
|