website/admin/admin.py
ngn c8a4067596
fix the admin script return value
Signed-off-by: ngn <ngn@ngn.tf>
2025-01-22 11:12:37 +03:00

409 lines
13 KiB
Python

#!/bin/python3
"""
website/admin | Administration script for my personal website
written by ngn (https://ngn.tf) (2025)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from urllib.parse import quote_plus
from typing import Dict, List, Any
from datetime import datetime, UTC
from colorama import Fore, Style
from json import dumps, loads
from getpass import getpass
import requests as req
from os import getenv
from sys import argv
# logger used by the script
class Log:
    """Colored console output and interactive prompts for the script.

    Each message is prefixed with a bright, colored three-character tag:
    ``[*]`` for information, ``[-]`` for errors and ``[?]`` for prompts.
    """

    def __init__(self) -> None:
        # sequence that restores the default color and style after a tag
        self.reset = Fore.RESET + Style.RESET_ALL

    def _tagged(self, color: str, tag: str, m: str) -> str:
        """Return `m` prefixed with `tag` rendered in bright `color`."""
        prefix = color + Style.BRIGHT + tag + self.reset
        return prefix + " " + m

    def info(self, m: str) -> None:
        """Print an informational message."""
        print(self._tagged(Fore.BLUE, "[*]", m))

    def error(self, m: str) -> None:
        """Print an error message."""
        print(self._tagged(Fore.RED, "[-]", m))

    def input(self, m: str) -> str:
        """Prompt with `m` and return the line the user typed."""
        prompt = self._tagged(Fore.CYAN, "[?]", m) + ": "
        return input(prompt)

    def password(self, m: str) -> str:
        """Prompt with `m` via getpass and return what the user typed."""
        prompt = self._tagged(Fore.CYAN, "[?]", m) + ": "
        return getpass(prompt)
# API interface for the admin endpoints
class AdminAPI:
    """Client for the website's admin HTTP endpoints.

    Every request sends the admin password in the ``Authorization`` header.
    Responses are decoded as JSON; a 403 status or a non-empty ``"error"``
    field in the response body is raised as an ``Exception``.
    """

    def __init__(self, url: str, password: str) -> None:
        self.languages: List[str] = [
            "en",
            "tr",
        ]  # languages supported by multilang fields
        self.password = password
        self.api_url = url

    def _title_to_id(self, title: str) -> str:
        """Lowercase `title` and replace spaces with underscores."""
        return title.lower().replace(" ", "_")

    def _check_multilang_field(self, ml: Dict[str, str]) -> bool:
        """Return True if `ml` has a non-empty entry for a supported language."""
        # a non-dict value (e.g. a plain string loaded from a JSON file) can
        # not hold per-language entries, so treat it as failing validation
        if not isinstance(ml, dict):
            return False
        return any(ml.get(lang, "") != "" for lang in self.languages)

    def _api_url_join(self, path: str) -> str:
        """Join the API base URL and `path` with exactly one slash between."""
        api_has_slash = self.api_url.endswith("/")
        path_has_slash = path.startswith("/")

        # the "both" case has to be tested first, otherwise the "either" case
        # swallows it and the result contains a double slash
        if api_has_slash and path_has_slash:
            return self.api_url + path[1:]
        if api_has_slash or path_has_slash:
            return self.api_url + path
        return self.api_url + "/" + path

    def _to_json(self, res: "req.Response") -> Dict[str, Any]:
        """Decode the response body as JSON and return it.

        Raises:
            Exception: if the status code is 403, or if the body's "error"
                field is not an empty string.
        """
        if res.status_code == 403:
            raise Exception("Authentication failed")

        body = res.json()

        if body["error"] != "":
            raise Exception("API error: %s" % body["error"])

        return body

    def PUT(self, url: str, data: dict) -> Dict[str, Any]:
        """Send `data` as JSON with a PUT request, return the decoded body."""
        return self._to_json(
            req.put(
                self._api_url_join(url),
                json=data,
                headers={"Authorization": self.password},
            )
        )

    def DELETE(self, url: str) -> Dict[str, Any]:
        """Send a DELETE request, return the decoded body."""
        return self._to_json(
            req.delete(
                self._api_url_join(url), headers={"Authorization": self.password}
            )
        )

    def GET(self, url: str) -> Dict[str, Any]:
        """Send a GET request, return the decoded body."""
        return self._to_json(
            req.get(self._api_url_join(url), headers={"Authorization": self.password})
        )

    def add_service(self, service: Dict[str, Any]) -> None:
        """Validate `service` and send it to the service add endpoint.

        Raises:
            Exception: if "name" is missing or empty, "desc" is missing or has
                no supported language entry, or none of "clear", "onion" and
                "i2p" is set.
        """
        if service.get("name", "") == "":
            raise Exception('Service structure is missing required "name" field')

        if "desc" not in service:
            raise Exception('Service structure is missing required "desc" field')

        if all(service.get(key, "") == "" for key in ("clear", "onion", "i2p")):
            raise Exception(
                'Service structure is missing "clear", "onion" '
                + 'and "i2p" field, at least one needed'
            )

        if not self._check_multilang_field(service["desc"]):
            raise Exception(
                'Service structure field "desc" needs at least '
                + "one supported language entry"
            )

        self.PUT("/v1/admin/service/add", service)

    def del_service(self, name: str) -> None:
        """Request deletion of the service called `name`."""
        if name == "":
            raise Exception("Service name cannot be empty")

        self.DELETE("/v1/admin/service/del?name=%s" % quote_plus(name))

    def add_project(self, project: Dict[str, Any]) -> None:
        """Validate `project` and send it to the project add endpoint.

        Raises:
            Exception: if "name" is missing or empty, or "desc" is missing or
                has no supported language entry.
        """
        if project.get("name", "") == "":
            raise Exception('Project structure is missing required "name" field')

        if "desc" not in project:
            raise Exception('Project structure is missing required "desc" field')

        if not self._check_multilang_field(project["desc"]):
            raise Exception(
                'Project structure field "desc" needs at least '
                + "one supported language entry"
            )

        self.PUT("/v1/admin/project/add", project)

    def del_project(self, name: str) -> None:
        """Request deletion of the project called `name`."""
        if name == "":
            raise Exception("Project name cannot be empty")

        self.DELETE("/v1/admin/project/del?name=%s" % quote_plus(name))

    def check_services(self) -> None:
        """Call the service check endpoint."""
        self.GET("/v1/admin/service/check")

    def add_news(self, news: Dict[str, Any]) -> None:
        """Validate `news` and send it to the news add endpoint.

        Raises:
            Exception: if "id" or "author" is missing or empty, or "title" or
                "content" is missing or has no supported language entry.
        """
        if news.get("id", "") == "":
            raise Exception('News structure is missing required "id" field')

        if news.get("author", "") == "":
            raise Exception('News structure is missing required "author" field')

        if "title" not in news:
            raise Exception('News structure is missing required "title" field')

        if "content" not in news:
            raise Exception('News structure is missing required "content" field')

        if not self._check_multilang_field(news["title"]):
            raise Exception(
                'News structure field "title" needs at least '
                + "one supported language entry"
            )

        if not self._check_multilang_field(news["content"]):
            raise Exception(
                'News structure field "content" needs at least '
                + "one supported language entry"
            )

        self.PUT("/v1/admin/news/add", news)

    def del_news(self, id: str) -> None:
        """Request deletion of the news post identified by `id`."""
        if id == "":
            raise Exception("News ID cannot be empty")

        self.DELETE("/v1/admin/news/del?id=%s" % quote_plus(id))

    def logs(self) -> Dict[str, Any]:
        """Return the decoded body of the logs endpoint."""
        return self.GET("/v1/admin/logs")
class AdminScript:
def __init__(self):
self.log: Log = Log()
self.api: AdminAPI = None
self.commands = {
"add_service": self.add_service,
"del_service": self.del_service,
"add_project": self.add_project,
"del_project": self.del_project,
"add_news": self.add_news,
"del_news": self.del_news,
"check_services": self.check_services,
"logs": self.get_logs,
}
self.api_url_env = "API_URL"
self.password_env = "API_PASSWORD"
def __format_time(self, ts: int) -> str:
return datetime.fromtimestamp(ts, UTC).strftime("%H:%M:%S %d/%m/%Y")
def __load_json_file(self, file: str) -> Dict[str, Any]:
with open(file, "r") as f:
data = loads(f.read())
return data
def __dump_json_file(self, data: Dict[str, Any], file: str) -> None:
with open(file, "w") as f:
data = dumps(data, indent=2)
f.write(data)
def run(self) -> bool:
if len(argv) < 2 or len(argv) > 3:
self.log.error("Usage: %s [command] <file>" % argv[0])
self.log.info("Here is a list of available commands:")
for command in self.commands.keys():
print("\t%s" % command)
return False
url = getenv(self.api_url_env)
valid_cmd = False
if url is None:
self.log.error(
"Please specify the API URL using %s environment variable"
% self.api_url_env
)
return False
for cmd in self.commands:
if argv[1] == cmd:
valid_cmd = True
break
if not valid_cmd:
self.log.error(
"Invalid command, run the script with no commands to list the available commands"
)
return False
try:
password = getenv(self.password_env)
if password is None:
password = self.log.password("Please enter the admin password")
self.api = AdminAPI(url, password)
if len(argv) == 2:
self.handle_command(argv[1])
elif len(argv) == 3:
self.handle_command(argv[1], argv[2])
except KeyboardInterrupt:
self.log.error("Command cancelled")
return False
except Exception as e:
self.log.error("Command failed: %s" % e)
return False
# service commands
def add_service(self, data: Dict[str, Any] = None) -> None:
if data is None:
data: Dict[str, str] = {}
data["desc"] = {}
data["name"] = self.log.input("Serivce name")
for lang in self.api.languages:
data["desc"][lang] = self.log.input("Serivce desc (%s)" % lang)
data["check_url"] = self.log.input("Serivce status check URL")
data["clear"] = self.log.input("Serivce clearnet URL")
data["onion"] = self.log.input("Serivce onion URL")
data["i2p"] = self.log.input("Serivce I2P URL")
self.api.add_service(data)
self.log.info("Service has been added")
def del_service(self, data: Dict[str, Any] = None) -> None:
if data is None:
data: Dict[str, str] = {}
data["name"] = self.log.input("Service name")
self.api.del_service(data["name"])
self.log.info("Service has been deleted")
# project commands
def add_project(self, data: Dict[str, Any] = None) -> None:
if data is None:
data: Dict[str, str] = {}
data["desc"] = {}
data["name"] = self.log.input("Project name")
for lang in self.api.languages:
data["desc"][lang] = self.log.input("Project desc (%s)" % lang)
data["url"] = self.log.input("Project URL")
data["license"] = self.log.input("Project license")
self.api.add_project(data)
self.log.info("Project has been added")
def del_project(self, data: Dict[str, Any] = None) -> None:
if data is None:
data: Dict[str, str] = {}
data["name"] = self.log.input("Project name")
self.api.del_project(data["name"])
self.log.info("Project has been deleted")
# news command
def add_news(self, data: Dict[str, Any] = None) -> None:
if data is None:
data: Dict[str, str] = {}
data["title"] = {}
data["content"] = {}
data["id"] = self.log.input("News ID")
for lang in self.api.languages:
data["title"][lang] = self.log.input("News title (%s)" % lang)
data["author"] = self.log.input("News author")
for lang in self.api.languages:
data["content"][lang] = self.log.input("News content (%s)" % lang)
self.api.add_news(data)
self.log.info("News has been added")
def del_news(self, data: Dict[str, Any] = None) -> None:
if data is None:
data: Dict[str, str] = {}
data["id"] = self.log.input("News ID")
self.api.del_project(data["id"])
self.log.info("News has been deleted")
def check_services(self, data: Dict[str, Any] = None) -> None:
self.api.check_services()
self.log.info("Requested status check for all the services")
def get_logs(self, data: Dict[str, Any] = None) -> None:
logs = self.api.logs()
if logs["result"] is None or len(logs["result"]) == 0:
return self.log.info("No available logs")
for log in logs["result"]:
self.log.info(
"Time: %s | Action: %s"
% (self.__format_time(log["time"]), log["action"])
)
def handle_command(self, cmd: str, file: str = None) -> bool:
for command in self.commands.keys():
if command != cmd:
continue
data = None
try:
if file != "" and file is not None:
data = self.__load_json_file(file)
self.commands[cmd](data)
return True
except Exception as e:
self.log.error("Command failed: %s" % e)
return False
self.log.error("Invalid command: %s", cmd)
return False
if __name__ == "__main__":
    # Exit status mirrors run(): 0 when it returns a truthy value, 1
    # otherwise. SystemExit is raised directly because the exit() builtin is
    # installed by the site module for interactive use and is not available
    # when the interpreter is started without it (e.g. `python -S`).
    script = AdminScript()
    raise SystemExit(0 if script.run() else 1)