Micke Nordin 4 months ago
parent 7a1a3e1da0
commit 0fb3c2d178

@ -0,0 +1,18 @@
[build-system]
requires = ["flit_core >=3.2,<4"]
build-backend = "flit_core.buildapi"
[project]
name = "ocm_cli"
authors = [{name = "Micke Nordin, email = "hej@mic.ke"}]
readme = "README.md"
dynamic = ["version", "description"]
classifiers = [
"License :: OSI Approved :: GNU Affero General Public License version 3",
]
[project.urls]
Home = "https://code.smolnet.org/micke/ocm-cli"
[project.scripts]
ocm-cli = "ocm_cli:main"

@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""A very simple ocm implementation"""
__version__ = "0.0.1"
import argparse
import sys
import yaml
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from os import environ as env
from os import path, makedirs
from urllib.parse import urlparse
import pwd
import requests
class OCMHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(b"Hello, world!")
class OCM:
port: int
host: str
server: HTTPServer
dir: str
legacy_mode = False
config_dir: str
config: dict
config_file: str
user: str
fullname: str
def __init__(self, dir: str = env["PWD"]):
self.user = env["USER"]
self.fullname = pwd.getpwnam(self.user).pw_gecos.strip(',')
self.dir = dir
self.config_dir = path.join(env["HOME"], ".config/ocm")
self.config_file = path.join(self.config_dir, "config.yaml")
if not path.isdir(self.config_dir):
makedirs(self.config_dir)
if not path.isfile(self.config_file):
with open(self.config_file, "w") as f:
config = {self.user: {}}
f.write(yaml.dump(config))
self.read_config()
def read_config(self):
with open(self.config_file, "r") as f:
self.config = yaml.load(f, Loader=yaml.FullLoader)
def write_config(self):
with open(self.config_file, "w") as f:
f.write(yaml.dump(self.config))
def get_share_payload(self, provider_id: str) -> dict:
if self.legacy_mode:
return {
"name": self.config[self.user][provider_id]["name"],
"owner": self.user,
"permission":
self.config[self.user][provider_id]["permission"],
"protocol": {
"name": "webdav",
"options": {
"sharedSecret":
self.config[self.user][provider_id]["token"],
"permissions":
'{http://open-cloud-mesh.org/ns}share-permissions'
}
},
"providerId": provider_id,
"resourceType": "file",
# self.config[self.user][provider_id]["resourceType"],
"shareType": self.config[self.user][provider_id]["shareType"],
"shareWith": self.config[self.user][provider_id]["shareWith"],
}
return {}
def main(self):
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="command")
receive_parser = subparsers.add_parser("receive")
receive_parser.add_argument("--port", type=int, default=8080)
receive_parser.add_argument("--host", type=str, default="localhost")
receive_parser.add_argument("--directory", type=str, default=self.dir)
share_parser = subparsers.add_parser("share")
share_group = share_parser.add_mutually_exclusive_group(required=True)
share_group.add_argument("--file", type=str)
share_group.add_argument("--directory", type=str)
share_parser.add_argument("--to", type=str, required=True)
args = parser.parse_args()
if args.command == "receive":
self.port = args.port
self.host = args.host
self.dir = args.directory
self.server = HTTPServer((self.host, self.port), OCMHandler)
self.receive()
elif args.command == "share":
self.share(args.file, args.directory, args.to)
else:
parser.print_help()
def receive(self):
self.server.serve_forever()
def create_share(self, receiver, resource_type, name) -> str:
provider_id = uuid.uuid4().hex
self.config[self.user][provider_id] = {
"name": name,
"permission": "read",
"resourceType": resource_type,
"shareType": "user",
"shareWith": receiver,
"token": uuid.uuid4().hex,
}
self.write_config()
return provider_id
def share(self, file, directory, to):
uparse = urlparse(to)
receiver = uparse.username
if not receiver:
print("Username not specified")
sys.exit(1)
else:
print("Sharing with " + receiver)
discovery_endpoint = uparse.scheme + "://" + \
uparse.hostname + "/.well-known/ocm"
req = requests.get(discovery_endpoint)
if req.status_code != 200:
print("Could not find ocm-provider at " + discovery_endpoint)
print("Trying again with legacy endpoint")
# Fallback on legacy endpoint
discovery_endpoint = uparse.scheme + \
'://' + uparse.hostname + "/ocm-provider"
req = requests.get(discovery_endpoint)
if req.status_code != 200:
print("Could not find ocm-provider at " + discovery_endpoint)
sys.exit(1)
data = req.json()
share_endpoint = data['endPoint'] + "/shares"
enabled = data['enabled']
if not enabled:
print("OCM is not enabled on this server")
sys.exit(1)
api_version = data['apiVersion']
resource_types = data['resourceTypes']
if api_version == '1.0-proposal1':
self.legacy_mode = True
# provider = uparse.hostname
# else:
# provider = data['provider']
file_or_folder = file
resource_type = "file"
if directory:
file_or_folder = directory
resource_type = "folder"
type_supported = False
for type in resource_types:
if type['name'] == resource_type:
type_supported = True
break
if not type_supported:
print("Resource type " + resource_type + " is not supported")
sys.exit(1)
provider_id = self.create_share(
receiver, resource_type, file_or_folder)
payload = self.get_share_payload(provider_id)
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
share_req = requests.post(
share_endpoint,
headers=headers,
json=payload)
if share_req.status_code != 200:
print("Error: " + str(share_req.status_code))
print(share_req.text)
sys.exit(1)
share_data = share_req.json()
print(share_data)
if __name__ == "__main__":
try:
ocm = OCM()
ocm.main()
except KeyboardInterrupt:
print("Exiting...")
sys.exit(0)
Loading…
Cancel
Save