Source code for flask_restful_dbbase.resources.collection_model_resource

# flask_restful_dbbase/resources/collection_model_resource.py
"""
This module implements a starting point for collection model resources.

"""
import json
import inspect

from dbbase.utils import xlate
from flask_restful import request
from .dbbase_resource import DBBaseResource
from ..validations import validate_process
import logging

# Name the logger after the module's dotted import name rather than its
# file-system path, so it takes part in the package's logger hierarchy and
# can be configured through the parent package logger.
logger = logging.getLogger(__name__)


class CollectionModelResource(DBBaseResource):
    """
    CollectionModelResource Class

    This model class implements the base class. This class supports only
    gets to return collections of records.

    Like the ModelResource class there is a provision for a
    process_get_input function.

    The usual filtering is available for gathering records if a variable
    is a specific value. Most of the time, this will be all that is
    necessary.

    However, there is a means to select records by a comparison operator.

    Use a format as follows:
        query_string: {
            var1: [operator, comparison_value]
        }

    The list of operators for single comparison value is:
        ["eq", "ne", "gt", "ge", "lt", "le"]

    In addition, if comparison is made such as selecting where var1 > var2,
    use `{var1: ["gt", var:var2]}` to signal this is another variable.

    To gather records that are found in a list use
    var1[]: [val1, val2, ...].

    Or, for more flexibility use:
        query_string: {
            var1: [operator, [val1, val2, ...]]
        }

    The supported list of operators are
        ["like", "ilike", "notlike", "notilike"]

    To control record set sizes, a variable `page_config` is a dict
    of page or record variables. Bear in mind that all variables entering
    can be either camel or snake based.

    Page_config variables:
        "orderBy": ["id", "statusId"],
            Can be a single value or a list
            To sort in descending order use the format -var1
        "pageSize": "50",
            The maximum page size can be limited by the class variable
            `max_page_size`
        "offset": 30,
            The number of records to skip
        "limit": "100",
            limits the size of the record set
        "serialFields": ["id", "statusId"],
            can specify specific columns to be returned
            fallback: serial fields can be specified by resource
                or dbbase model
        "debug": "False",
            covered below

        if debug is true, the recordset is not returned. Instead,
        data returned consists of variables used, the default variable
        amounts, and the sqlalchemy query that would be executed.

            "class_defaults": {
                "model_name": self.model_name,
                "process_get_input": get_input_doc,
                "max_page_size": self.max_page_size,
                "order_by": None,
                "op_codes": self.OP_CODES1 + self.OP_CODES2,
            },
            "original_data": orig_data,
            "converted_data": query_data,
            "page_configs": configs,
            "query": str(query)
    """

    model_name = None
    process_get_input = None
    max_page_size = None
    order_by = None

    # page config keys accepted from the client, in snake case
    _PAGE_CONFIGS = [
        "page_size",
        "offset",
        "limit",
        "debug",
        "order_by",
        "serial_fields",
    ]

    # comparison operators mapped to the column's __op__ methods
    OP_CODES1 = ["eq", "ne", "gt", "ge", "lt", "le"]
    # operators mapped to column methods of the same name
    OP_CODES2 = ["like", "ilike", "notlike", "notilike"]

    def __init__(self):
        """
        Verifies that a model class is attached and records its name.

        Raises:
            ValueError: if `model_class` is None.
        """
        if self.model_class is None:
            msg = "A model class must be set for this resource to function."
            raise ValueError(msg)
        self.model_name = self.model_class._class()

    def _page_configs(self, configs):
        """
        Converts any config variables to snake case and verifies
        the variables are part of page configs.

        Args:
            configs: (list | str | dict) : the raw page config. A list is
                reduced to its first element, and a string is parsed as
                JSON after single quotes are swapped for double quotes.

        Returns:
            (dict) : the cleaned page configs keyed by snake case name. If
                the client gave no order_by and the class defines
                `order_by`, that default is included as a list.

        Raises:
            ValueError: if the payload is not a dictionary once parsed,
                holds a key not found in `_PAGE_CONFIGS`, cannot be
                parsed as JSON, or a numeric setting cannot be converted
                by `int`.
            TypeError: if a numeric setting is of a type `int` rejects,
                such as None.
        """
        if isinstance(configs, list):
            configs = configs[0] if configs else {}

        if isinstance(configs, str):
            configs = json.loads(configs.replace("'", '"'))

        # a null or scalar payload has no items to walk through
        if not isinstance(configs, dict):
            raise ValueError(
                "Page config must be a dictionary of page variables"
            )

        tmp = {}
        for key, value in configs.items():
            new_key = xlate(key, camel_case=False)
            if new_key not in self._PAGE_CONFIGS:
                raise ValueError(f"Unknown page config value: {new_key}")

            if new_key in ("order_by", "serial_fields"):
                # account for field name conversion; a single name is
                # wrapped so it is not iterated character by character
                names = value if isinstance(value, list) else [value]
                new_value = [xlate(val, camel_case=False) for val in names]
            elif new_key == "page_size":
                new_value = int(value)
                if (
                    self.max_page_size is not None
                    and new_value > self.max_page_size
                ):
                    new_value = self.max_page_size
            elif new_key in ("offset", "limit"):
                new_value = int(value)
            else:
                # debug: accept a JSON boolean as well as "true"/"false"
                if isinstance(value, bool):
                    new_value = value
                else:
                    new_value = str(value).lower() == "true"

            tmp[new_key] = new_value

        if "order_by" not in tmp and self.order_by is not None:
            if isinstance(self.order_by, list):
                tmp["order_by"] = self.order_by
            else:
                tmp["order_by"] = [self.order_by]

        return tmp

    def _classify_op(self, var, value):
        """
        Classifies the operation using the variable. Also, translation
        from camel to snake takes place as well.

        var, value
            select where var = value, the normal case
        var[], list | single value
            select item1 or item2 or item3 ...
        var, [op, value]
            op is one of "eq ne gt lt ..."

        Args:
            var: (str) : the variable name as received.
            value: (obj) : the value, or list of values, received.

        Returns:
            (tuple) : new_var, op, value

        Raises:
            ValueError: if a two item list starts with an unknown op code
                or a one item list holds only an op code.
        """
        if var.endswith("[]"):
            return xlate(var[:-2], camel_case=False), "in", value

        new_var = xlate(var, camel_case=False)
        op_codes = self.OP_CODES1 + self.OP_CODES2

        if isinstance(value, list) and len(value) == 2:
            # Note that val could be a variable such as var:my_variable
            # xlate of my_variable is handled when adding to query filter
            op, val = value
            if op not in op_codes:
                str_list = str(op_codes).replace("'", "")
                raise ValueError(
                    f'Op code "{op}" wrong. Must be in {str_list}'
                )
            return new_var, op, val

        if isinstance(value, list) and len(value) == 1:
            if value[0] in op_codes:
                # an operator arrived without anything to compare to
                raise ValueError(
                    "There must be a value paired with the operator. "
                    "Example: [operator, value]"
                )

        # default
        return new_var, "eq", value

    def get(self):
        """
        Returns a collection of records selected by the query string.

        The query string is split into page configs and filter variables.
        If `process_get_input` is set, it can alter the query and data, or
        end the request early. The filters, ordering, offset and limits
        are then applied to the query.

        Returns:
            (tuple) : (dict, int) : the response body and status code.
                200 with the records under the model name, or with the
                debug data if debug is true
                400 with a message if the page configs, an op code, or
                a column name cannot be used
                500 with a message if the records cannot be serialized
        """
        FUNC_NAME = "get"
        name = self.model_class._class()
        url = request.path
        data = request.args.to_dict(flat=False)
        orig_data = request.args.to_dict(flat=False)
        not_column_msg = "{order} is not a column in {name}"

        configs = {}
        for key in ["pageConfig", "page_config"]:
            if key in data:
                try:
                    configs = self._page_configs(data.pop(key))
                except (ValueError, TypeError) as err:
                    # malformed page configs are a client error
                    return {"message": str(err)}, 400
                break

        order_by = configs.get("order_by")
        page_size = configs.get("page_size")
        limit = configs.get("limit")
        offset = configs.get("offset")
        serial_fields = configs.get("serial_fields")
        serial_field_relations = configs.get("serial_field_relations")
        debug = configs.get("debug", False)

        query = self.model_class.query
        if self.process_get_input is not None:
            # single values will still be in a list at this point
            # Ex: {id: [40]}"
            # format for output:
            #   To continue processing after updates or changes:
            #     {"status": True, "query", query, "data": data}
            #   To exit the scene:
            #     {
            #         "status": False,
            #         "message", msg,
            #         "status_code": status_code
            #     }
            # Anything other than that results in a 500 error
            output = self.process_get_input(query, data)

            # bare minimum check
            validate_process(output, true_keys=["query", "data"])
            if output["status"]:
                query = output["query"]
                data = output["data"]
            else:
                message = output["message"]
                status_code = output["status_code"]
                return {"message": message}, status_code

        query_data = []
        for var, value in data.items():
            # classify op and convert var from camel_case
            try:
                query_data.append(self._classify_op(var, value))
            except Exception as err:
                return {"message": f"{err.args}"}, 400

        # only variables known to the model are used for filtering
        obj_params = self.get_obj_params()
        query_data = [
            [var, op, value]
            for var, op, value in query_data
            if var in obj_params
        ]

        # query filtering
        for new_var, op, value in query_data:
            var = getattr(self.model_class, new_var)
            if isinstance(value, str) and value[:4] == "var:":
                # also a variable
                comp_var = xlate(value[4:], camel_case=False)
                if not hasattr(self.model_class, comp_var):
                    message = not_column_msg.format(
                        order=comp_var, name=name
                    )
                    return {"message": message}, 400
                value = getattr(self.model_class, comp_var)

            if isinstance(value, list):
                query = query.filter(var.in_(value))
            elif op == "in":
                # a var[] entry that was reduced to a single value
                query = query.filter(var.in_([value]))
            elif op in self.OP_CODES1:
                func = getattr(var, f"__{op}__")
                query = query.filter(func(value))
            else:
                # bad op codes already weeded out above
                func = getattr(var, op)
                query = query.filter(func(value))

        if order_by:
            order_list = []
            for order in order_by:
                # a leading minus sign requests a descending sort
                descending = order.startswith("-")
                if descending:
                    order = order[1:]
                if not hasattr(self.model_class, order):
                    message = not_column_msg.format(order=order, name=name)
                    return {"message": message}, 400
                column = getattr(self.model_class, order)
                order_list.append(column.desc() if descending else column)

            query = query.order_by(*order_list)

        if offset is not None:
            query = query.offset(offset)

        if page_size is not None:
            query = query.limit(page_size)

        if limit is not None:
            # works same as page size, more familiar for dbs
            # NOTE(review): limit is not capped by max_page_size -- confirm
            # that this is intended
            query = query.limit(limit)

        if debug:
            if self.process_get_input is None:
                get_input_doc = None
            else:
                get_input_doc = inspect.getdoc(self.process_get_input)

            return {
                "class_defaults": {
                    "model_name": self.model_name,
                    "process_get_input": get_input_doc,
                    "max_page_size": self.max_page_size,
                    "order_by": None,
                    "op_codes": self.OP_CODES1 + self.OP_CODES2,
                },
                "original_data": orig_data,
                "converted_data": query_data,
                "page_configs": configs,
                "query": str(query),
            }, 200

        records = query.all()

        if serial_fields is None:
            serial_fields, serial_field_relations = self._get_serializations(
                "get"
            )

        try:
            return (
                {
                    name: [
                        item.to_dict(
                            serial_fields=serial_fields,
                            serial_field_relations=serial_field_relations,
                        )
                        for item in records
                    ],
                },
                200,
            )
        except Exception as err:
            # err.args can be empty, so the exception itself is logged
            return_msg = f"Internal Server Error: method {FUNC_NAME}: {url}"
            logger.exception("%s method %s: %s", url, FUNC_NAME, err)
            return {"message": return_msg}, 500