ocm-cli/src/ocm_cli/__init__.py
2024-08-09 11:46:24 +02:00

203 lines
6.7 KiB
Python
Executable file

#!/usr/bin/env python3
"""A very simple ocm implementation"""
__version__ = "0.0.1"
import argparse
import sys
import yaml
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from os import environ as env
from os import path, makedirs
from urllib.parse import urlparse, quote_plus
import pwd
import requests
class OCMHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(b"Hello, world!")
class OCM:
port: int
host: str
server: HTTPServer
dir: str
legacy_mode = False
config_dir: str
config: dict
config_file: str
user: str
fullname: str
def __init__(self, dir: str = env["PWD"]):
self.user = env["USER"]
self.fullname = pwd.getpwnam(self.user).pw_gecos.strip(',')
self.dir = dir
self.config_dir = path.join(env["HOME"], ".config/ocm")
self.config_file = path.join(self.config_dir, "config.yaml")
if not path.isdir(self.config_dir):
makedirs(self.config_dir)
if not path.isfile(self.config_file):
with open(self.config_file, "w") as f:
config = {self.user: {}}
f.write(yaml.dump(config))
self.read_config()
def read_config(self):
with open(self.config_file, "r") as f:
self.config = yaml.load(f, Loader=yaml.FullLoader)
def write_config(self):
with open(self.config_file, "w") as f:
f.write(yaml.dump(self.config))
def get_share_payload(self, provider_id: str) -> dict:
if self.legacy_mode:
return {
"name": self.config[self.user][provider_id]["name"],
"owner": self.user + '@localhost.local',
"permission":
self.config[self.user][provider_id]["permission"],
"protocol": {
"name": "webdav",
"options": {
"sharedSecret":
self.config[self.user][provider_id]["token"],
"permissions":
'{http://open-cloud-mesh.org/ns}share-permissions'
}
},
"providerId": provider_id,
"resourceType": "file",
# self.config[self.user][provider_id]["resourceType"],
"shareType": self.config[self.user][provider_id]["shareType"],
"shareWith": self.config[self.user][provider_id]["shareWith"],
}
return {}
def main(self):
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="command")
receive_parser = subparsers.add_parser("receive")
receive_parser.add_argument("--port", type=int, default=8080)
receive_parser.add_argument("--host", type=str, default="localhost")
receive_parser.add_argument("--directory", type=str, default=self.dir)
share_parser = subparsers.add_parser("share")
share_group = share_parser.add_mutually_exclusive_group(required=True)
share_group.add_argument("--file", type=str)
share_group.add_argument("--directory", type=str)
share_parser.add_argument("--to", type=str, required=True)
args = parser.parse_args()
if args.command == "receive":
self.port = args.port
self.host = args.host
self.dir = args.directory
self.server = HTTPServer((self.host, self.port), OCMHandler)
self.receive()
elif args.command == "share":
self.share(args.file, args.directory, args.to)
else:
parser.print_help()
def receive(self):
self.server.serve_forever()
def create_share(self, receiver, resource_type, name, share_endpoint) -> None:
provider_id = uuid.uuid4().hex
self.config[self.user][provider_id] = {
"name": name,
"permission": "read",
"resourceType": resource_type,
"shareType": "user",
"shareWith": receiver,
"token": uuid.uuid4().hex,
}
self.write_config()
payload = self.get_share_payload(provider_id)
print(payload)
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
share_req = requests.post(
share_endpoint,
headers=headers,
json=payload)
if share_req.status_code != 200:
print("Error: " + str(share_req.status_code))
print(share_req.text)
sys.exit(1)
share_data = share_req.json()
print(share_data)
def share(self, file, directory, to):
uparse = urlparse(to)
cloudid = f'{uparse.username}@{uparse.hostname}'
if not cloudid:
print("Username not specified")
sys.exit(1)
else:
print("Sharing with " + cloudid)
discovery_endpoint = uparse.scheme + "://" + \
uparse.hostname + "/.well-known/ocm"
req = requests.get(discovery_endpoint)
if req.status_code != 200:
print("Could not find ocm-provider at " + discovery_endpoint)
print("Trying again with legacy endpoint")
# Fallback on legacy endpoint
discovery_endpoint = uparse.scheme + \
'://' + uparse.hostname + "/ocm-provider"
req = requests.get(discovery_endpoint)
if req.status_code != 200:
print("Could not find ocm-provider at " + discovery_endpoint)
sys.exit(1)
data = req.json()
share_endpoint = data['endPoint'] + "/shares"
enabled = data['enabled']
if not enabled:
print("OCM is not enabled on this server")
sys.exit(1)
api_version = data['apiVersion']
resource_types = data['resourceTypes']
if api_version == '1.0-proposal1':
self.legacy_mode = True
# provider = uparse.hostname
# else:
# provider = data['provider']
file_or_folder = file
resource_type = "file"
if directory:
file_or_folder = directory
resource_type = "folder"
type_supported = False
for type in resource_types:
if type['name'] == resource_type:
type_supported = True
break
if not type_supported:
print("Resource type " + resource_type + " is not supported")
sys.exit(1)
self.create_share(
cloudid, resource_type, file_or_folder, share_endpoint)
if __name__ == "__main__":
try:
ocm = OCM()
ocm.main()
except KeyboardInterrupt:
print("Exiting...")
sys.exit(0)