Module ARgorithmToolkit.cli
CLI tool for ARgorithm made using typer.
Example
$ ARgorithm –help
Expand source code
# pylint: disable=no-self-use
# pylint: disable=too-many-statements
# pylint: disable=raise-missing-from
"""CLI tool for ARgorithm made using typer.
Example:
$ ARgorithm --help
"""
import os
import sys
import re
import json
import traceback
import requests
import typer
from halo import Halo
import ARgorithmToolkit
from ARgorithmToolkit.security import injection_check,execution_check
from ARgorithmToolkit.parser import input_data,create,validateconfig,ValidationError
CLOUD_URL = "https://argorithm.el.r.appspot.com"
CACHE_DIR = typer.get_app_dir("ARgorithm")
app = typer.Typer(help="ARgorithm CLI")
class Messager():
"""Class for pretty printing messages using typer."""
def msg(self,tag:str,title:str,message:str,color:str):
"""Pretty messaging for standard log messages."""
code = typer.style(f"[{tag.upper()}]: {title.upper()}" , fg=color , bold=True)
typer.echo(code)
if message:
typer.echo(message)
def info(self,title:str,message:str=""):
"""Information message."""
self.msg("info",title,message,typer.colors.BLUE)
def warn(self,title:str,message:str=""):
"""Warning message."""
self.msg("error",title,message,typer.colors.YELLOW)
def fail(self,title:str,message:str=""):
"""Error message."""
self.msg("critical error",title,message,typer.colors.RED)
def good(self,title:str,message:str=""):
"""Success message."""
self.msg("success",title,message,typer.colors.GREEN)
def menuitem(self,argorithm):
"""pretty print argorithm details."""
head = typer.style(f"- {argorithm['argorithmID']}",fg=typer.colors.GREEN,bold=True)
typer.echo(head)
typer.secho(f"by {argorithm['maintainer']}",fg=typer.colors.CYAN)
if argorithm['description']:
typer.echo(f"{argorithm['description']}")
if argorithm['parameters']:
typer.echo("Parameters")
for key in argorithm['parameters']:
if argorithm['parameters'][key]['description']:
typer.echo(f"- {key} : {argorithm['parameters'][key]['description']}")
else:
typer.echo(f"- {key}")
typer.echo()
def state(self,states):
"""pretty print states."""
states = states['data']
for state in states:
typer.echo('\n'+'-'*50)
typer.secho(state['state_type'],bold=True)
typer.secho('\t'+state['comments'],fg=typer.colors.CYAN)
if state['state_def']:
for key in state['state_def']:
typer.echo(f"\t{key} : {state['state_def'][key]}")
msg = Messager()
class Settings():
"""handles connection endpoints."""
endpoint:str=CLOUD_URL
def get_endpoint(self):
"""returns required endpoint."""
config_file = os.path.join(CACHE_DIR , "config")
if os.path.isfile(config_file):
with open(config_file,"r") as conf:
self.endpoint = conf.read()
return self.endpoint
def set_endpoint(self,url):
"""set up cloud endpoint."""
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.get(url+"/argorithm")
if rq.status_code == 200:
msg.good("Connected",f"web requests will now go to {url}")
else:
raise AttributeError("Not a server endpoint")
except ValueError as ve:
msg.warn("Please try again with proper URL")
raise typer.Exit(1) from ve
except AttributeError as ex:
msg.fail(str(ex))
raise typer.Exit(1) from ex
except Exception as ex:
msg.fail("Endpoint couldnt be found.")
print(ex)
raise typer.Exit(2) from ex
config_file = os.path.join(CACHE_DIR , "config")
with open(config_file,'w') as config:
config.write(url)
app_settings = Settings()
class AuthManager():
"""Handles authentication."""
def __init__(self):
"""sets up credfile to store credentials."""
self.credfile = os.path.join(CACHE_DIR,".credentials")
def register(self):
"""registers account."""
email = typer.prompt("Enter email address")
msg.info("Password criteria",'- between 8 to 25 characters\n - contains atleast one number\n - contains atleast lower case alphabet\n - contains atleast uppercase alphabet')
rules = r"^(?=.*[0-9])(?=.*[a-z])(?=.*[A-Z]).{8,25}$"
password = typer.prompt("Enter password",confirmation_prompt=True,hide_input=True)
if not re.search(rules,password):
msg.warn("password unacceptable")
typer.Exit(1)
return
url = app_settings.get_endpoint()+'/programmers/register'
data = {
"username" : email,
"password" : password
}
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,data)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("Account created","These credentials will be used as both programmer and user credentials")
return
if rq.status_code == 409:
msg.warn("Invalid email","email is already in use. Try login")
raise typer.Exit(0)
msg.fail("Error","Contact developer")
def login_prompt(self):
"""login prompt to enter credentials."""
email = typer.prompt("Enter email address")
password = typer.prompt("Enter password",hide_input=True)
url = f"{app_settings.get_endpoint()}/programmers/login"
data = {
"username" : email,
"password" : password
}
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,data)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 404:
msg.warn("User not found","please enter valid email")
elif rq.status_code == 500:
msg.fail("Server error","contact developer")
elif rq.status_code == 401:
msg.warn("incorrect password")
elif rq.status_code == 402:
msg.warn("please verify account first")
elif rq.status_code == 200:
msg.good("logged in successfully","credentials saved in cache")
token = json.loads(rq.content)['access_token']
with open(self.credfile,'w+') as cred:
cred.write(token)
return token
raise typer.Exit(1)
def get_token(self,flag=False):
"""returns valid authorization token."""
url = app_settings.get_endpoint()
if os.path.isfile(self.credfile):
with open(self.credfile,'r') as cred:
token = cred.read()
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(f"{url}/programmers/verify" , headers = {"authorization" : "Bearer "+token})
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
override = False
if flag:
override = typer.confirm("Found existing valid token. do you want to login again?")
if not override:
return token
else:
msg.warn("Valid credentials not found","Please enter login credentials again")
token = self.login_prompt()
with open(self.credfile,'w+') as cred:
cred.write(token)
return token
def get_header(self):
"""Get authentication header."""
header=None
if self.auth_check():
token = self.get_token()
header={"authorization":"Bearer "+token}
return header
def auth_check(self):
"""checks if AUTH is enabled on server."""
try:
url = app_settings.get_endpoint() + "/auth"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.get(url)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
return True
return False
except Exception as ex:
msg.warn("authentication error",str(ex))
raise typer.Abort()
def remove_token(self):
"""logout by deleting access token."""
try:
os.remove(self.credfile)
msg.good("Logged out")
except Exception:
msg.warn("No credentials found")
authmanager = AuthManager()
def name_check(value:str):
"""checks validity of argorithmID."""
rules = r"^[A-Za-z_]+$"
m = re.search(rules,value)
if m is None:
msg.fail("Invalid name" , "argorithm name should be [A-Za-z_]")
raise typer.Exit(code=1)
return value
def autocomplete(incomplete:str):
"""autocomplete function for finding argorithms."""
local_directory , incomplete = os.path.split(incomplete)
if local_directory == '':
local_directory = '.'
files = os.listdir(local_directory)
res = []
l = len(incomplete)
for filename in files:
if filename[:l] == incomplete and filename[-3:] == '.py':
if local_directory == '.':
res.append(filename)
else:
res.append(os.path.join(local_directory,filename))
return res
class CodeManager():
"""Handles file verification, testing and submissions."""
def __init__(self,filename):
"""gets filepath for code file and config file."""
directory , argorithm_file = os.path.split(filename)
argorithmID = name_check(argorithm_file[:-3])
directory = os.getcwd() if not directory else directory
self.codepath = os.path.join(directory,argorithm_file)
self.configpath = os.path.join(directory,argorithmID+".config.json")
if not os.path.isfile(self.codepath):
msg.warn("Python file not found",'use the init command first')
raise typer.Abort()
if not os.path.isfile(self.configpath):
msg.warn("config file not found",'use the configure command first')
raise typer.Abort()
def verify(self):
"""checks whether files are valid or not."""
injection_check(self.codepath)
validateconfig(self.configpath)
def test(self,prompt:bool=False):
"""Execute code locally."""
try:
with open(self.configpath,'r') as configfile:
config = json.load(configfile)
parameters = config['example']
if prompt:
user = typer.confirm("Do you want to add user input?")
if user:
parameters = input_data(config['parameters'])
if parameters and prompt:
typer.echo("using parameters:")
for key in parameters:
typer.echo(f"- {key} : {parameters[key]}")
return execution_check(self.codepath,self.configpath,parameters)
except AssertionError:
msg.warn("Execution Failed","execution should return ARgorithmToolkit.StateSet")
raise typer.Exit(1)
except Exception:
msg.warn("Execution Failed")
traceback.print_exception(*sys.exc_info())
raise typer.Exit(1)
def generate_submission(self):
"""Generate the files for submission."""
_ , local_file = os.path.split(self.codepath)
with open(self.configpath,'r') as configfile:
data = json.load(configfile)
files = [
('file', (local_file, open(self.codepath, 'rb'), 'application/octet')),
('data', ('data', json.dumps(data), 'application/json')),
]
header = authmanager.get_header()
return files,header
def submit(self):
"""Submit code to server."""
files,header = self.generate_submission()
url = app_settings.get_endpoint()+"/argorithms/insert"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,files=files,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("Submitted")
elif rq.status_code == 409:
msg.warn("Already exists","An argorithm with this name already exists,try another argorithm name")
elif rq.status_code == 406:
msg.warn("File name was invalid","The name shoud be of type [A-Za-z_]")
elif rq.status_code == 400:
msg.warn("Incorrect file format","please refer documentation")
else:
msg.fail("Application error")
def update(self):
"""Update code in servers."""
files,header = self.generate_submission()
url = app_settings.get_endpoint()+"/argorithms/update"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,files=files,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("updated")
elif rq.status_code == 404:
msg.warn("Not found","Try submit command to add argorithm to server")
elif rq.status_code == 401:
msg.warn("Unauthorized","Only author of argorithm or admin is allowed to alter argorithms")
else:
msg.fail("Application error")
@app.command()
def connect(
local:bool = typer.Option(False,"--local",'-l',help="Connects to server running on localhost",show_default=False)
):
"""Connect to your endpoint.
More info at https://argorithm.github.io/toolkit/cli#connect
"""
if local:
endpoint = "http://localhost"
else:
endpoint = typer.prompt("Enter server endpoint",default=app_settings.get_endpoint())
app_settings.set_endpoint(endpoint)
@app.command()
def init(
name:str = typer.Argument(...,help="The name given to the argorithm [A-Za-z_]",callback=name_check)
):
"""Create Blank code template and config template for ARgorithm.
More info at https://argorithm.github.io/toolkit/cli#init
"""
typer.echo(f"Creating empty template for {name}")
with open(os.path.join(ARgorithmToolkit.__path__[0],'data/template.py',),'rb') as template:
starter = template.read()
filename = f"{name}.py"
with open(filename , "wb") as codefile:
codefile.write(starter)
msg.good("Template generated","refer documentation at https://argorithm.github.io/toolkit")
@app.command()
def configure(
filepath:str=typer.Argument(... , help="The code file to be configured" , autocompletion=autocomplete),
blank:bool=typer.Option(False,'-b','--blank',help="create blank config file"),
validate:bool=typer.Option(False,'-v','--validate',help="only validate existing config file,Dont create")
):
"""Create configuration file for argorithm.
More info at https://argorithm.github.io/toolkit/cli#configure
"""
directory,filename = os.path.split(filepath)
name = filename[:-3]
if not os.path.isfile(filepath):
msg.warn("Python file not found",'use the init command first')
raise typer.Abort()
configpath = os.path.join(os.getcwd(),directory,name+".config.json")
try:
validateconfig(configpath)
typer.echo("Valid config file found")
if not blank:
if not validate:
redo = typer.confirm(f"Do you remake the {name}.config.json?")
if redo:
create(configpath)
raise typer.Exit(0)
except ValidationError:
if not validate and not blank:
create(configpath)
raise typer.Exit(0)
except FileNotFoundError:
if validate:
msg.warn("Config file not found")
raise typer.Exit(0)
if not blank:
create(configpath)
raise typer.Exit(0)
config = {
"argorithmID" : name,
"file" : name+".py",
"function" : "run",
"parameters" : {},
"example" : {},
"description" : ""
}
with open(configpath, "w") as configfile:
json.dump(config,configfile,indent=4)
@app.command()
def submit(
filename:str=typer.Argument(... , help="The code file to be submitted" , autocompletion=autocomplete)
):
"""Submit argorithms to server.
More info at https://argorithm.github.io/toolkit/cli#submit
"""
code = CodeManager(filename)
with Halo(text='Verifying', spinner='dots'):
code.verify()
msg.good("Files verified")
with Halo(text='Testing', spinner='dots'):
code.test(prompt=False)
msg.good("Files verified")
code.submit()
@app.command()
def update(
filename:str=typer.Argument(... , help="The code file to be submitted" , autocompletion=autocomplete)
):
"""Updates pre existing argorithms at server.
More info at https://argorithm.github.io/toolkit/cli#update
"""
code = CodeManager(filename)
with Halo(text='Verifying', spinner='dots'):
code.verify()
msg.good("Files verified")
with Halo(text='Testing', spinner='dots'):
code.test(prompt=False)
msg.good("Files verified")
code.update()
@app.command()
def delete(
argorithm_id:str = typer.Argument(... , help="argorithmID of function to be deleted.")
):
"""Deletes argorithm from server.
More info at https://argorithm.github.io/toolkit/cli#delete
"""
params = search(argorithm_id)
flag = typer.confirm("Are you sure you want to delete it?")
if not flag:
typer.echo("Not deleting")
raise typer.Abort()
header=authmanager.get_header()
data = {
"argorithmID" : params["argorithmID"],
}
url = app_settings.get_endpoint()+"/argorithms/delete"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,json=data,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.info("Deleted successfully",)
elif rq.status_code == 401:
msg.warn("Not authorized","only author of argorithm or admin can delete it")
else:
msg.fail("application error")
def search(argid,show=True):
"""Searches argorithm on server."""
url = app_settings.get_endpoint()+"/argorithms/view/"+argid
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.get(url)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
data = json.loads(rq.content)
if show:
typer.echo("Found argorithm")
msg.menuitem(data)
return data
if rq.status_code == 404:
msg.warn("Not found",f"{argid} not found in database")
raise typer.Exit(1)
msg.fail("Application error")
raise typer.Exit(1)
@app.command("list")
def list_argorithms():
"""Get list of argorithms in server.
More info at https://argorithm.github.io/toolkit/cli#list
"""
url = app_settings.get_endpoint()+"/argorithms/list"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.get(url)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
menu = json.loads(rq.content)
if len(menu) == 0:
msg.warn("No argorithms")
raise typer.Exit(1)
for item in menu:
msg.menuitem(item)
@app.command()
def test(
argorithm_id:str = typer.Argument(... , help="argorithmID of function to be called. If not passed then menu will be presented"),
output:bool = typer.Option(False,'--output','-o',help="print results in json format",show_default=False),
user_input:bool = typer.Option(False,'--user-input','-u',help="if present, takes input from user",show_default=False),
):
"""Test argorithms stored in server.
More info at https://argorithm.github.io/toolkit/cli#test
"""
params = search(argorithm_id,show=not output)
header=authmanager.get_header()
data = {
"argorithmID" : params["argorithmID"],
"parameters" : params["example"]
}
if user_input:
data["parameters"] = input_data(params["parameters"])
url = app_settings.get_endpoint()+"/argorithms/run"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,json=data,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
if output:
print(rq.text)
else:
msg.state(json.loads(rq.content))
elif rq.status_code == 401:
msg.warn("Authentication failed",json.loads(rq.content)['detail'])
else:
msg.fail("application error")
@app.command()
def execute(
filename:str=typer.Argument(... , help="The code file to be submitted" , autocompletion=autocomplete)
):
"""Execute locally stored ARgorithms.
More info at https://argorithm.github.io/toolkit/cli#execute
"""
code = CodeManager(filename)
code.verify()
states = [x.content for x in code.test(prompt=True)]
msg.state({"data" : states})
account_app = typer.Typer(help="Manages account")
app.add_typer(account_app,name="account")
@account_app.command()
def login():
"""Log in to ARgorithmServer.
Only is AUTH is enabled on server. More info at
https://argorithm.github.io/toolkit/cli#login
"""
if not authmanager.auth_check():
msg.warn("AUTH disabled at server")
raise typer.Exit(0)
authmanager.get_token(flag=True)
@account_app.command()
def signup():
"""Create new programmer and user account in ARgorithmServer.
Only is AUTH is enabled on server. More info at
https://argorithm.github.io/toolkit/cli#signup
"""
if not authmanager.auth_check():
msg.warn("AUTH disabled at server")
raise typer.Exit(0)
authmanager.register()
@account_app.command()
def logout():
"""Remove pre-existing login credentials.
More info at https://argorithm.github.io/toolkit/cli#logout
"""
authmanager.remove_token()
admin_app = typer.Typer(help="Administrator level methods")
app.add_typer(admin_app,name="admin")
@admin_app.callback()
def admin_auth_check():
"""Check if auth is enabled for admin routes."""
if not authmanager.auth_check():
msg.warn("AUTH is disabled at this endpoint")
raise typer.Exit(1)
@admin_app.command()
def grant(
email:str=typer.Argument( ... , help="The account email that would be granted admin access")
):
"""Grants admin acess.
More info at https://argorithm.github.io/toolkit/cli#grant
"""
data = {
"email" : email
}
header=authmanager.get_header()
url = app_settings.get_endpoint()+"/admin/grant"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,json=data,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("granted admin priveleges")
elif rq.status_code == 401:
msg.warn("Not authorized","You need admin priveleges")
elif rq.status_code == 406:
msg.warn("Blacklisted email","This email is blacklisted thus cannot be granted admin priveleges")
elif rq.status_code == 404:
msg.warn("Not found","No such email registered")
else:
msg.fail("Application Error")
@admin_app.command()
def revoke(
email:str=typer.Argument( ... , help="The account email that would be lose admin access")
):
"""Revokes admin acess.
More info at https://argorithm.github.io/toolkit/cli#revoke
"""
data = {
"email" : email
}
header=authmanager.get_header()
url = app_settings.get_endpoint()+"/admin/revoke"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,json=data,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("revoked admin priveleges")
elif rq.status_code == 401:
msg.warn("Not authorized","You need admin priveleges")
elif rq.status_code == 404:
msg.warn("Not found","No such email registered")
else:
msg.fail("Application Error")
@admin_app.command("delete")
def account_delete(
email:str=typer.Argument( ... , help="The account email that would be granted admin access"),
user:bool=typer.Option(False,'-u','--user',help="If flag is present, it will delete the user account")
):
"""Deletes account.
Requires admin priveleges. More info at
https://argorithm.github.io/toolkit/cli#delete_1
"""
data = {
"email" : email
}
header=authmanager.get_header()
if user:
url = app_settings.get_endpoint()+"/admin/delete_user"
else:
url = app_settings.get_endpoint()+"/admin/delete_programmer"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,json=data,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("deleted account")
elif rq.status_code == 401:
msg.warn("Not authorized","You need admin priveleges")
elif rq.status_code == 404:
msg.warn("Not found","No such email registered")
else:
msg.fail("Application Error")
@admin_app.command()
def blacklist(
email:str=typer.Argument( ... , help="The account email that would be granted admin access")
):
"""Blacklists account.
Requires admin priveleges. More info at
https://argorithm.github.io/toolkit/cli#blacklist
"""
data = {
"email" : email
}
header=authmanager.get_header()
url = app_settings.get_endpoint()+"/admin/black_list"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,json=data,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("blacklisted account")
elif rq.status_code == 401:
msg.warn("Not authorized","You need admin priveleges")
elif rq.status_code == 404:
msg.warn("Not found","No such email registered")
else:
msg.fail("Application Error")
@admin_app.command()
def whitelist(
email:str=typer.Argument( ... , help="The account email that would be granted admin access")
):
"""Whitelist accounts.
Requires admin priveleges. More info at
https://argorithm.github.io/toolkit/cli#whitelist
"""
data = {
"email" : email
}
header=authmanager.get_header()
url = app_settings.get_endpoint()+"/admin/white_list"
try:
with Halo(text='Connecting', spinner='dots'):
rq = requests.post(url,json=data,headers=header)
except requests.RequestException as rqe:
msg.fail("Connection failed",str(rqe))
raise typer.Abort()
if rq.status_code == 200:
msg.good("whitelisted account")
elif rq.status_code == 401:
msg.warn("Not authorized","You need admin priveleges")
elif rq.status_code == 404:
msg.warn("Not found","No such email registered")
else:
msg.fail("Application Error")
Functions
def account_delete(email: str = <typer.models.ArgumentInfo object>, user: bool = <typer.models.OptionInfo object>)
-
Deletes account.
Requires admin priveleges. More info at https://argorithm.github.io/toolkit/cli#delete_1
Expand source code
@admin_app.command("delete") def account_delete( email:str=typer.Argument( ... , help="The account email that would be granted admin access"), user:bool=typer.Option(False,'-u','--user',help="If flag is present, it will delete the user account") ): """Deletes account. Requires admin priveleges. More info at https://argorithm.github.io/toolkit/cli#delete_1 """ data = { "email" : email } header=authmanager.get_header() if user: url = app_settings.get_endpoint()+"/admin/delete_user" else: url = app_settings.get_endpoint()+"/admin/delete_programmer" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,json=data,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("deleted account") elif rq.status_code == 401: msg.warn("Not authorized","You need admin priveleges") elif rq.status_code == 404: msg.warn("Not found","No such email registered") else: msg.fail("Application Error")
def admin_auth_check()
-
Check if auth is enabled for admin routes.
Expand source code
@admin_app.callback() def admin_auth_check(): """Check if auth is enabled for admin routes.""" if not authmanager.auth_check(): msg.warn("AUTH is disabled at this endpoint") raise typer.Exit(1)
def autocomplete(incomplete: str)
-
autocomplete function for finding argorithms.
Expand source code
def autocomplete(incomplete:str): """autocomplete function for finding argorithms.""" local_directory , incomplete = os.path.split(incomplete) if local_directory == '': local_directory = '.' files = os.listdir(local_directory) res = [] l = len(incomplete) for filename in files: if filename[:l] == incomplete and filename[-3:] == '.py': if local_directory == '.': res.append(filename) else: res.append(os.path.join(local_directory,filename)) return res
def blacklist(email: str = <typer.models.ArgumentInfo object>)
-
Blacklists account.
Requires admin priveleges. More info at https://argorithm.github.io/toolkit/cli#blacklist
Expand source code
@admin_app.command() def blacklist( email:str=typer.Argument( ... , help="The account email that would be granted admin access") ): """Blacklists account. Requires admin priveleges. More info at https://argorithm.github.io/toolkit/cli#blacklist """ data = { "email" : email } header=authmanager.get_header() url = app_settings.get_endpoint()+"/admin/black_list" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,json=data,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("blacklisted account") elif rq.status_code == 401: msg.warn("Not authorized","You need admin priveleges") elif rq.status_code == 404: msg.warn("Not found","No such email registered") else: msg.fail("Application Error")
def configure(filepath: str = <typer.models.ArgumentInfo object>, blank: bool = <typer.models.OptionInfo object>, validate: bool = <typer.models.OptionInfo object>)
-
Create configuration file for argorithm.
More info at https://argorithm.github.io/toolkit/cli#configure
Expand source code
@app.command() def configure( filepath:str=typer.Argument(... , help="The code file to be configured" , autocompletion=autocomplete), blank:bool=typer.Option(False,'-b','--blank',help="create blank config file"), validate:bool=typer.Option(False,'-v','--validate',help="only validate existing config file,Dont create") ): """Create configuration file for argorithm. More info at https://argorithm.github.io/toolkit/cli#configure """ directory,filename = os.path.split(filepath) name = filename[:-3] if not os.path.isfile(filepath): msg.warn("Python file not found",'use the init command first') raise typer.Abort() configpath = os.path.join(os.getcwd(),directory,name+".config.json") try: validateconfig(configpath) typer.echo("Valid config file found") if not blank: if not validate: redo = typer.confirm(f"Do you remake the {name}.config.json?") if redo: create(configpath) raise typer.Exit(0) except ValidationError: if not validate and not blank: create(configpath) raise typer.Exit(0) except FileNotFoundError: if validate: msg.warn("Config file not found") raise typer.Exit(0) if not blank: create(configpath) raise typer.Exit(0) config = { "argorithmID" : name, "file" : name+".py", "function" : "run", "parameters" : {}, "example" : {}, "description" : "" } with open(configpath, "w") as configfile: json.dump(config,configfile,indent=4)
def connect(local: bool = <typer.models.OptionInfo object>)
-
Connect to your endpoint.
More info at https://argorithm.github.io/toolkit/cli#connect
Expand source code
@app.command() def connect( local:bool = typer.Option(False,"--local",'-l',help="Connects to server running on localhost",show_default=False) ): """Connect to your endpoint. More info at https://argorithm.github.io/toolkit/cli#connect """ if local: endpoint = "http://localhost" else: endpoint = typer.prompt("Enter server endpoint",default=app_settings.get_endpoint()) app_settings.set_endpoint(endpoint)
def delete(argorithm_id: str = <typer.models.ArgumentInfo object>)
-
Deletes argorithm from server.
More info at https://argorithm.github.io/toolkit/cli#delete
Expand source code
@app.command() def delete( argorithm_id:str = typer.Argument(... , help="argorithmID of function to be deleted.") ): """Deletes argorithm from server. More info at https://argorithm.github.io/toolkit/cli#delete """ params = search(argorithm_id) flag = typer.confirm("Are you sure you want to delete it?") if not flag: typer.echo("Not deleting") raise typer.Abort() header=authmanager.get_header() data = { "argorithmID" : params["argorithmID"], } url = app_settings.get_endpoint()+"/argorithms/delete" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,json=data,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.info("Deleted successfully",) elif rq.status_code == 401: msg.warn("Not authorized","only author of argorithm or admin can delete it") else: msg.fail("application error")
def execute(filename: str = <typer.models.ArgumentInfo object>)
-
Execute locally stored ARgorithms.
More info at https://argorithm.github.io/toolkit/cli#execute
Expand source code
@app.command() def execute( filename:str=typer.Argument(... , help="The code file to be submitted" , autocompletion=autocomplete) ): """Execute locally stored ARgorithms. More info at https://argorithm.github.io/toolkit/cli#execute """ code = CodeManager(filename) code.verify() states = [x.content for x in code.test(prompt=True)] msg.state({"data" : states})
def grant(email: str = <typer.models.ArgumentInfo object>)
-
Grants admin acess.
More info at https://argorithm.github.io/toolkit/cli#grant
Expand source code
@admin_app.command() def grant( email:str=typer.Argument( ... , help="The account email that would be granted admin access") ): """Grants admin acess. More info at https://argorithm.github.io/toolkit/cli#grant """ data = { "email" : email } header=authmanager.get_header() url = app_settings.get_endpoint()+"/admin/grant" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,json=data,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("granted admin priveleges") elif rq.status_code == 401: msg.warn("Not authorized","You need admin priveleges") elif rq.status_code == 406: msg.warn("Blacklisted email","This email is blacklisted thus cannot be granted admin priveleges") elif rq.status_code == 404: msg.warn("Not found","No such email registered") else: msg.fail("Application Error")
def init(name: str = <typer.models.ArgumentInfo object>)
-
Create Blank code template and config template for ARgorithm.
More info at https://argorithm.github.io/toolkit/cli#init
Expand source code
@app.command() def init( name:str = typer.Argument(...,help="The name given to the argorithm [A-Za-z_]",callback=name_check) ): """Create Blank code template and config template for ARgorithm. More info at https://argorithm.github.io/toolkit/cli#init """ typer.echo(f"Creating empty template for {name}") with open(os.path.join(ARgorithmToolkit.__path__[0],'data/template.py',),'rb') as template: starter = template.read() filename = f"{name}.py" with open(filename , "wb") as codefile: codefile.write(starter) msg.good("Template generated","refer documentation at https://argorithm.github.io/toolkit")
def list_argorithms()
-
Get list of argorithms in server.
More info at https://argorithm.github.io/toolkit/cli#list
Expand source code
@app.command("list") def list_argorithms(): """Get list of argorithms in server. More info at https://argorithm.github.io/toolkit/cli#list """ url = app_settings.get_endpoint()+"/argorithms/list" try: with Halo(text='Connecting', spinner='dots'): rq = requests.get(url) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: menu = json.loads(rq.content) if len(menu) == 0: msg.warn("No argorithms") raise typer.Exit(1) for item in menu: msg.menuitem(item)
def login()
-
Log in to ARgorithmServer.
Only is AUTH is enabled on server. More info at https://argorithm.github.io/toolkit/cli#login
Expand source code
@account_app.command() def login(): """Log in to ARgorithmServer. Only is AUTH is enabled on server. More info at https://argorithm.github.io/toolkit/cli#login """ if not authmanager.auth_check(): msg.warn("AUTH disabled at server") raise typer.Exit(0) authmanager.get_token(flag=True)
def logout()
-
Remove pre-existing login credentials.
More info at https://argorithm.github.io/toolkit/cli#logout
Expand source code
@account_app.command() def logout(): """Remove pre-existing login credentials. More info at https://argorithm.github.io/toolkit/cli#logout """ authmanager.remove_token()
def name_check(value: str)
-
checks validity of argorithmID.
Expand source code
def name_check(value:str): """checks validity of argorithmID.""" rules = r"^[A-Za-z_]+$" m = re.search(rules,value) if m is None: msg.fail("Invalid name" , "argorithm name should be [A-Za-z_]") raise typer.Exit(code=1) return value
def revoke(email: str = <typer.models.ArgumentInfo object>)
-
Revokes admin acess.
More info at https://argorithm.github.io/toolkit/cli#revoke
Expand source code
@admin_app.command() def revoke( email:str=typer.Argument( ... , help="The account email that would be lose admin access") ): """Revokes admin acess. More info at https://argorithm.github.io/toolkit/cli#revoke """ data = { "email" : email } header=authmanager.get_header() url = app_settings.get_endpoint()+"/admin/revoke" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,json=data,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("revoked admin priveleges") elif rq.status_code == 401: msg.warn("Not authorized","You need admin priveleges") elif rq.status_code == 404: msg.warn("Not found","No such email registered") else: msg.fail("Application Error")
def search(argid, show=True)
-
Searches argorithm on server.
Expand source code
def search(argid,show=True): """Searches argorithm on server.""" url = app_settings.get_endpoint()+"/argorithms/view/"+argid try: with Halo(text='Connecting', spinner='dots'): rq = requests.get(url) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: data = json.loads(rq.content) if show: typer.echo("Found argorithm") msg.menuitem(data) return data if rq.status_code == 404: msg.warn("Not found",f"{argid} not found in database") raise typer.Exit(1) msg.fail("Application error") raise typer.Exit(1)
def signup()
-
Create new programmer and user account in ARgorithmServer.
Only is AUTH is enabled on server. More info at https://argorithm.github.io/toolkit/cli#signup
Expand source code
@account_app.command() def signup(): """Create new programmer and user account in ARgorithmServer. Only is AUTH is enabled on server. More info at https://argorithm.github.io/toolkit/cli#signup """ if not authmanager.auth_check(): msg.warn("AUTH disabled at server") raise typer.Exit(0) authmanager.register()
def submit(filename: str = <typer.models.ArgumentInfo object>)
-
Submit argorithms to server.
More info at https://argorithm.github.io/toolkit/cli#submit
Expand source code
@app.command() def submit( filename:str=typer.Argument(... , help="The code file to be submitted" , autocompletion=autocomplete) ): """Submit argorithms to server. More info at https://argorithm.github.io/toolkit/cli#submit """ code = CodeManager(filename) with Halo(text='Verifying', spinner='dots'): code.verify() msg.good("Files verified") with Halo(text='Testing', spinner='dots'): code.test(prompt=False) msg.good("Files verified") code.submit()
def test(argorithm_id: str = <typer.models.ArgumentInfo object>, output: bool = <typer.models.OptionInfo object>, user_input: bool = <typer.models.OptionInfo object>)
-
Test argorithms stored in server.
More info at https://argorithm.github.io/toolkit/cli#test
Expand source code
@app.command() def test( argorithm_id:str = typer.Argument(... , help="argorithmID of function to be called. If not passed then menu will be presented"), output:bool = typer.Option(False,'--output','-o',help="print results in json format",show_default=False), user_input:bool = typer.Option(False,'--user-input','-u',help="if present, takes input from user",show_default=False), ): """Test argorithms stored in server. More info at https://argorithm.github.io/toolkit/cli#test """ params = search(argorithm_id,show=not output) header=authmanager.get_header() data = { "argorithmID" : params["argorithmID"], "parameters" : params["example"] } if user_input: data["parameters"] = input_data(params["parameters"]) url = app_settings.get_endpoint()+"/argorithms/run" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,json=data,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: if output: print(rq.text) else: msg.state(json.loads(rq.content)) elif rq.status_code == 401: msg.warn("Authentication failed",json.loads(rq.content)['detail']) else: msg.fail("application error")
def update(filename: str = <typer.models.ArgumentInfo object>)
-
Updates pre existing argorithms at server.
More info at https://argorithm.github.io/toolkit/cli#update
Expand source code
@app.command() def update( filename:str=typer.Argument(... , help="The code file to be submitted" , autocompletion=autocomplete) ): """Updates pre existing argorithms at server. More info at https://argorithm.github.io/toolkit/cli#update """ code = CodeManager(filename) with Halo(text='Verifying', spinner='dots'): code.verify() msg.good("Files verified") with Halo(text='Testing', spinner='dots'): code.test(prompt=False) msg.good("Files verified") code.update()
def whitelist(email: str = <typer.models.ArgumentInfo object>)
-
Whitelist accounts.
Requires admin priveleges. More info at https://argorithm.github.io/toolkit/cli#whitelist
Expand source code
@admin_app.command() def whitelist( email:str=typer.Argument( ... , help="The account email that would be granted admin access") ): """Whitelist accounts. Requires admin priveleges. More info at https://argorithm.github.io/toolkit/cli#whitelist """ data = { "email" : email } header=authmanager.get_header() url = app_settings.get_endpoint()+"/admin/white_list" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,json=data,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("whitelisted account") elif rq.status_code == 401: msg.warn("Not authorized","You need admin priveleges") elif rq.status_code == 404: msg.warn("Not found","No such email registered") else: msg.fail("Application Error")
Classes
class AuthManager
-
Handles authentication.
sets up credfile to store credentials.
Expand source code
class AuthManager(): """Handles authentication.""" def __init__(self): """sets up credfile to store credentials.""" self.credfile = os.path.join(CACHE_DIR,".credentials") def register(self): """registers account.""" email = typer.prompt("Enter email address") msg.info("Password criteria",'- between 8 to 25 characters\n - contains atleast one number\n - contains atleast lower case alphabet\n - contains atleast uppercase alphabet') rules = r"^(?=.*[0-9])(?=.*[a-z])(?=.*[A-Z]).{8,25}$" password = typer.prompt("Enter password",confirmation_prompt=True,hide_input=True) if not re.search(rules,password): msg.warn("password unacceptable") typer.Exit(1) return url = app_settings.get_endpoint()+'/programmers/register' data = { "username" : email, "password" : password } try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,data) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("Account created","These credentials will be used as both programmer and user credentials") return if rq.status_code == 409: msg.warn("Invalid email","email is already in use. Try login") raise typer.Exit(0) msg.fail("Error","Contact developer") def login_prompt(self): """login prompt to enter credentials.""" email = typer.prompt("Enter email address") password = typer.prompt("Enter password",hide_input=True) url = f"{app_settings.get_endpoint()}/programmers/login" data = { "username" : email, "password" : password } try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,data) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 404: msg.warn("User not found","please enter valid email") elif rq.status_code == 500: msg.fail("Server error","contact developer") elif rq.status_code == 401: msg.warn("incorrect password") elif rq.status_code == 402: msg.warn("please verify account first") elif rq.status_code == 200: msg.good("logged in successfully","credentials saved in cache") token = json.loads(rq.content)['access_token'] with open(self.credfile,'w+') as cred: cred.write(token) return token raise typer.Exit(1) def get_token(self,flag=False): """returns valid authorization token.""" url = app_settings.get_endpoint() if os.path.isfile(self.credfile): with open(self.credfile,'r') as cred: token = cred.read() try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(f"{url}/programmers/verify" , headers = {"authorization" : "Bearer "+token}) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: override = False if flag: override = typer.confirm("Found existing valid token. do you want to login again?") if not override: return token else: msg.warn("Valid credentials not found","Please enter login credentials again") token = self.login_prompt() with open(self.credfile,'w+') as cred: cred.write(token) return token def get_header(self): """Get authentication header.""" header=None if self.auth_check(): token = self.get_token() header={"authorization":"Bearer "+token} return header def auth_check(self): """checks if AUTH is enabled on server.""" try: url = app_settings.get_endpoint() + "/auth" try: with Halo(text='Connecting', spinner='dots'): rq = requests.get(url) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: return True return False except Exception as ex: msg.warn("authentication error",str(ex)) raise typer.Abort() def remove_token(self): """logout by deleting access token.""" try: os.remove(self.credfile) msg.good("Logged out") except Exception: msg.warn("No credentials found")
Methods
def auth_check(self)
-
checks if AUTH is enabled on server.
Expand source code
def auth_check(self): """checks if AUTH is enabled on server.""" try: url = app_settings.get_endpoint() + "/auth" try: with Halo(text='Connecting', spinner='dots'): rq = requests.get(url) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: return True return False except Exception as ex: msg.warn("authentication error",str(ex)) raise typer.Abort()
def get_header(self)
-
Get authentication header.
Expand source code
def get_header(self): """Get authentication header.""" header=None if self.auth_check(): token = self.get_token() header={"authorization":"Bearer "+token} return header
def get_token(self, flag=False)
-
returns valid authorization token.
Expand source code
def get_token(self,flag=False): """returns valid authorization token.""" url = app_settings.get_endpoint() if os.path.isfile(self.credfile): with open(self.credfile,'r') as cred: token = cred.read() try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(f"{url}/programmers/verify" , headers = {"authorization" : "Bearer "+token}) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: override = False if flag: override = typer.confirm("Found existing valid token. do you want to login again?") if not override: return token else: msg.warn("Valid credentials not found","Please enter login credentials again") token = self.login_prompt() with open(self.credfile,'w+') as cred: cred.write(token) return token
def login_prompt(self)
-
login prompt to enter credentials.
Expand source code
def login_prompt(self): """login prompt to enter credentials.""" email = typer.prompt("Enter email address") password = typer.prompt("Enter password",hide_input=True) url = f"{app_settings.get_endpoint()}/programmers/login" data = { "username" : email, "password" : password } try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,data) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 404: msg.warn("User not found","please enter valid email") elif rq.status_code == 500: msg.fail("Server error","contact developer") elif rq.status_code == 401: msg.warn("incorrect password") elif rq.status_code == 402: msg.warn("please verify account first") elif rq.status_code == 200: msg.good("logged in successfully","credentials saved in cache") token = json.loads(rq.content)['access_token'] with open(self.credfile,'w+') as cred: cred.write(token) return token raise typer.Exit(1)
def register(self)
-
registers account.
Expand source code
def register(self): """registers account.""" email = typer.prompt("Enter email address") msg.info("Password criteria",'- between 8 to 25 characters\n - contains atleast one number\n - contains atleast lower case alphabet\n - contains atleast uppercase alphabet') rules = r"^(?=.*[0-9])(?=.*[a-z])(?=.*[A-Z]).{8,25}$" password = typer.prompt("Enter password",confirmation_prompt=True,hide_input=True) if not re.search(rules,password): msg.warn("password unacceptable") typer.Exit(1) return url = app_settings.get_endpoint()+'/programmers/register' data = { "username" : email, "password" : password } try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,data) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("Account created","These credentials will be used as both programmer and user credentials") return if rq.status_code == 409: msg.warn("Invalid email","email is already in use. Try login") raise typer.Exit(0) msg.fail("Error","Contact developer")
def remove_token(self)
-
logout by deleting access token.
Expand source code
def remove_token(self): """logout by deleting access token.""" try: os.remove(self.credfile) msg.good("Logged out") except Exception: msg.warn("No credentials found")
class CodeManager (filename)
-
Handles file verification, testing and submissions.
gets filepath for code file and config file.
Expand source code
class CodeManager(): """Handles file verification, testing and submissions.""" def __init__(self,filename): """gets filepath for code file and config file.""" directory , argorithm_file = os.path.split(filename) argorithmID = name_check(argorithm_file[:-3]) directory = os.getcwd() if not directory else directory self.codepath = os.path.join(directory,argorithm_file) self.configpath = os.path.join(directory,argorithmID+".config.json") if not os.path.isfile(self.codepath): msg.warn("Python file not found",'use the init command first') raise typer.Abort() if not os.path.isfile(self.configpath): msg.warn("config file not found",'use the configure command first') raise typer.Abort() def verify(self): """checks whether files are valid or not.""" injection_check(self.codepath) validateconfig(self.configpath) def test(self,prompt:bool=False): """Execute code locally.""" try: with open(self.configpath,'r') as configfile: config = json.load(configfile) parameters = config['example'] if prompt: user = typer.confirm("Do you want to add user input?") if user: parameters = input_data(config['parameters']) if parameters and prompt: typer.echo("using parameters:") for key in parameters: typer.echo(f"- {key} : {parameters[key]}") return execution_check(self.codepath,self.configpath,parameters) except AssertionError: msg.warn("Execution Failed","execution should return ARgorithmToolkit.StateSet") raise typer.Exit(1) except Exception: msg.warn("Execution Failed") traceback.print_exception(*sys.exc_info()) raise typer.Exit(1) def generate_submission(self): """Generate the files for submission.""" _ , local_file = os.path.split(self.codepath) with open(self.configpath,'r') as configfile: data = json.load(configfile) files = [ ('file', (local_file, open(self.codepath, 'rb'), 'application/octet')), ('data', ('data', json.dumps(data), 'application/json')), ] header = authmanager.get_header() return files,header def submit(self): """Submit code to server.""" files,header = self.generate_submission() url = app_settings.get_endpoint()+"/argorithms/insert" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,files=files,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("Submitted") elif rq.status_code == 409: msg.warn("Already exists","An argorithm with this name already exists,try another argorithm name") elif rq.status_code == 406: msg.warn("File name was invalid","The name shoud be of type [A-Za-z_]") elif rq.status_code == 400: msg.warn("Incorrect file format","please refer documentation") else: msg.fail("Application error") def update(self): """Update code in servers.""" files,header = self.generate_submission() url = app_settings.get_endpoint()+"/argorithms/update" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,files=files,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("updated") elif rq.status_code == 404: msg.warn("Not found","Try submit command to add argorithm to server") elif rq.status_code == 401: msg.warn("Unauthorized","Only author of argorithm or admin is allowed to alter argorithms") else: msg.fail("Application error")
Methods
def generate_submission(self)
-
Generate the files for submission.
Expand source code
def generate_submission(self): """Generate the files for submission.""" _ , local_file = os.path.split(self.codepath) with open(self.configpath,'r') as configfile: data = json.load(configfile) files = [ ('file', (local_file, open(self.codepath, 'rb'), 'application/octet')), ('data', ('data', json.dumps(data), 'application/json')), ] header = authmanager.get_header() return files,header
def submit(self)
-
Submit code to server.
Expand source code
def submit(self): """Submit code to server.""" files,header = self.generate_submission() url = app_settings.get_endpoint()+"/argorithms/insert" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,files=files,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("Submitted") elif rq.status_code == 409: msg.warn("Already exists","An argorithm with this name already exists,try another argorithm name") elif rq.status_code == 406: msg.warn("File name was invalid","The name shoud be of type [A-Za-z_]") elif rq.status_code == 400: msg.warn("Incorrect file format","please refer documentation") else: msg.fail("Application error")
def test(self, prompt: bool = False)
-
Execute code locally.
Expand source code
def test(self,prompt:bool=False): """Execute code locally.""" try: with open(self.configpath,'r') as configfile: config = json.load(configfile) parameters = config['example'] if prompt: user = typer.confirm("Do you want to add user input?") if user: parameters = input_data(config['parameters']) if parameters and prompt: typer.echo("using parameters:") for key in parameters: typer.echo(f"- {key} : {parameters[key]}") return execution_check(self.codepath,self.configpath,parameters) except AssertionError: msg.warn("Execution Failed","execution should return ARgorithmToolkit.StateSet") raise typer.Exit(1) except Exception: msg.warn("Execution Failed") traceback.print_exception(*sys.exc_info()) raise typer.Exit(1)
def update(self)
-
Update code in servers.
Expand source code
def update(self): """Update code in servers.""" files,header = self.generate_submission() url = app_settings.get_endpoint()+"/argorithms/update" try: with Halo(text='Connecting', spinner='dots'): rq = requests.post(url,files=files,headers=header) except requests.RequestException as rqe: msg.fail("Connection failed",str(rqe)) raise typer.Abort() if rq.status_code == 200: msg.good("updated") elif rq.status_code == 404: msg.warn("Not found","Try submit command to add argorithm to server") elif rq.status_code == 401: msg.warn("Unauthorized","Only author of argorithm or admin is allowed to alter argorithms") else: msg.fail("Application error")
def verify(self)
-
checks whether files are valid or not.
Expand source code
def verify(self): """checks whether files are valid or not.""" injection_check(self.codepath) validateconfig(self.configpath)
class Messager
-
Class for pretty printing messages using typer.
Expand source code
class Messager(): """Class for pretty printing messages using typer.""" def msg(self,tag:str,title:str,message:str,color:str): """Pretty messaging for standard log messages.""" code = typer.style(f"[{tag.upper()}]: {title.upper()}" , fg=color , bold=True) typer.echo(code) if message: typer.echo(message) def info(self,title:str,message:str=""): """Information message.""" self.msg("info",title,message,typer.colors.BLUE) def warn(self,title:str,message:str=""): """Warning message.""" self.msg("error",title,message,typer.colors.YELLOW) def fail(self,title:str,message:str=""): """Error message.""" self.msg("critical error",title,message,typer.colors.RED) def good(self,title:str,message:str=""): """Success message.""" self.msg("success",title,message,typer.colors.GREEN) def menuitem(self,argorithm): """pretty print argorithm details.""" head = typer.style(f"- {argorithm['argorithmID']}",fg=typer.colors.GREEN,bold=True) typer.echo(head) typer.secho(f"by {argorithm['maintainer']}",fg=typer.colors.CYAN) if argorithm['description']: typer.echo(f"{argorithm['description']}") if argorithm['parameters']: typer.echo("Parameters") for key in argorithm['parameters']: if argorithm['parameters'][key]['description']: typer.echo(f"- {key} : {argorithm['parameters'][key]['description']}") else: typer.echo(f"- {key}") typer.echo() def state(self,states): """pretty print states.""" states = states['data'] for state in states: typer.echo('\n'+'-'*50) typer.secho(state['state_type'],bold=True) typer.secho('\t'+state['comments'],fg=typer.colors.CYAN) if state['state_def']: for key in state['state_def']: typer.echo(f"\t{key} : {state['state_def'][key]}")
Methods
def fail(self, title: str, message: str = '')
-
Error message.
Expand source code
def fail(self,title:str,message:str=""): """Error message.""" self.msg("critical error",title,message,typer.colors.RED)
def good(self, title: str, message: str = '')
-
Success message.
Expand source code
def good(self,title:str,message:str=""): """Success message.""" self.msg("success",title,message,typer.colors.GREEN)
def info(self, title: str, message: str = '')
-
Information message.
Expand source code
def info(self,title:str,message:str=""): """Information message.""" self.msg("info",title,message,typer.colors.BLUE)
-
pretty print argorithm details.
Expand source code
def menuitem(self,argorithm): """pretty print argorithm details.""" head = typer.style(f"- {argorithm['argorithmID']}",fg=typer.colors.GREEN,bold=True) typer.echo(head) typer.secho(f"by {argorithm['maintainer']}",fg=typer.colors.CYAN) if argorithm['description']: typer.echo(f"{argorithm['description']}") if argorithm['parameters']: typer.echo("Parameters") for key in argorithm['parameters']: if argorithm['parameters'][key]['description']: typer.echo(f"- {key} : {argorithm['parameters'][key]['description']}") else: typer.echo(f"- {key}") typer.echo()
def msg(self, tag: str, title: str, message: str, color: str)
-
Pretty messaging for standard log messages.
Expand source code
def msg(self,tag:str,title:str,message:str,color:str): """Pretty messaging for standard log messages.""" code = typer.style(f"[{tag.upper()}]: {title.upper()}" , fg=color , bold=True) typer.echo(code) if message: typer.echo(message)
def state(self, states)
-
pretty print states.
Expand source code
def state(self,states): """pretty print states.""" states = states['data'] for state in states: typer.echo('\n'+'-'*50) typer.secho(state['state_type'],bold=True) typer.secho('\t'+state['comments'],fg=typer.colors.CYAN) if state['state_def']: for key in state['state_def']: typer.echo(f"\t{key} : {state['state_def'][key]}")
def warn(self, title: str, message: str = '')
-
Warning message.
Expand source code
def warn(self,title:str,message:str=""): """Warning message.""" self.msg("error",title,message,typer.colors.YELLOW)
class Settings
-
handles connection endpoints.
Expand source code
class Settings(): """handles connection endpoints.""" endpoint:str=CLOUD_URL def get_endpoint(self): """returns required endpoint.""" config_file = os.path.join(CACHE_DIR , "config") if os.path.isfile(config_file): with open(config_file,"r") as conf: self.endpoint = conf.read() return self.endpoint def set_endpoint(self,url): """set up cloud endpoint.""" try: with Halo(text='Connecting', spinner='dots'): rq = requests.get(url+"/argorithm") if rq.status_code == 200: msg.good("Connected",f"web requests will now go to {url}") else: raise AttributeError("Not a server endpoint") except ValueError as ve: msg.warn("Please try again with proper URL") raise typer.Exit(1) from ve except AttributeError as ex: msg.fail(str(ex)) raise typer.Exit(1) from ex except Exception as ex: msg.fail("Endpoint couldnt be found.") print(ex) raise typer.Exit(2) from ex config_file = os.path.join(CACHE_DIR , "config") with open(config_file,'w') as config: config.write(url)
Class variables
var endpoint : str
Methods
def get_endpoint(self)
-
returns required endpoint.
Expand source code
def get_endpoint(self): """returns required endpoint.""" config_file = os.path.join(CACHE_DIR , "config") if os.path.isfile(config_file): with open(config_file,"r") as conf: self.endpoint = conf.read() return self.endpoint
def set_endpoint(self, url)
-
set up cloud endpoint.
Expand source code
def set_endpoint(self,url): """set up cloud endpoint.""" try: with Halo(text='Connecting', spinner='dots'): rq = requests.get(url+"/argorithm") if rq.status_code == 200: msg.good("Connected",f"web requests will now go to {url}") else: raise AttributeError("Not a server endpoint") except ValueError as ve: msg.warn("Please try again with proper URL") raise typer.Exit(1) from ve except AttributeError as ex: msg.fail(str(ex)) raise typer.Exit(1) from ex except Exception as ex: msg.fail("Endpoint couldnt be found.") print(ex) raise typer.Exit(2) from ex config_file = os.path.join(CACHE_DIR , "config") with open(config_file,'w') as config: config.write(url)