Source code for odsbox.con_i

"""
Helper for ASAM ODS HTTP API conI session

Example::

    from odsbox.con_i import ConI

    with ConI(
        url="http://localhost:8087/api",
        auth=("sa", "sa")
    ) as con_i:
        units = con_i.query_data({"AoUnit": {}})

"""

from __future__ import annotations

import logging
import os
from typing import List, Tuple

import requests
import requests.auth
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
from pandas import DataFrame

import odsbox.proto.ods_pb2 as ods
from odsbox.datamatrices_to_pandas import to_pandas
from odsbox.jaquel import jaquel_to_ods
from odsbox.model_cache import ModelCache
from odsbox.transaction import Transaction


class ConI:
    """
    This is a helper to hold an ASAM ODS HTTP API ConI session.

    Example::

        from odsbox.con_i import ConI

        with ConI(
            url="http://localhost:8087/api",
            auth=("sa", "sa")
        ) as con_i:
            units = con_i.query_data({"AoUnit": {}})

    """

    __log: logging.Logger = logging.getLogger(__name__)
    # Both default to None at class level so that ``__del__`` and the request
    # helpers find a defined value even if ``__init__`` raised early.
    __session: requests.Session | None = None
    __con_i: str | None = None
    __default_http_headers: dict[str, str] = {
        "Content-Type": "application/x-asamods+protobuf",
        "Accept": "application/x-asamods+protobuf",
    }
    # Set by ``model_read``, which ``__init__`` calls after login.
    mc: ModelCache

    def __init__(
        self,
        url: str = "http://localhost:8080/api",
        auth: requests.auth.AuthBase | Tuple[str, str] = ("sa", "sa"),
        context_variables: ods.ContextVariables | dict | None = None,
        verify_certificate: bool = True,
    ):
        """
        Create a session object keeping track of ASAM ODS session URL named `conI`.

        Example::

            from odsbox.con_i import ConI

            # basic auth
            with ConI(
                url="http://localhost:8087/api",
                auth=("sa", "sa")
            ) as con_i:
                units = con_i.query_data({"AoUnit": {}})

        Example::

            import requests
            from odsbox.con_i import ConI

            class BearerAuth(requests.auth.AuthBase):
                def __init__(self, token):
                    self.token = token
                def __call__(self, r):
                    r.headers["authorization"] = "Bearer " + self.token
                    return r

            # bearer auth
            with ConI(
                url="http://localhost:8087/api",
                auth=BearerAuth("YOUR_BEARER_TOKEN")
            ) as con_i:
                units = con_i.query_data({"AoUnit": {}})

        :param str url: base url of the ASAM ODS API of a given server.
            An example is "http://localhost:8080/api".
        :param requests.auth.AuthBase auth: An auth object to be used for the used requests package.
            For basic auth `("USER", "PASSWORD")` can be used.
        :param ods.ContextVariables | dict | None context_variables: If context variables are necessary
            for the connection they are passed here. It defaults to None.
        :param bool verify_certificate: If no certificate is provided for https insecure access can be enabled.
            It defaults to True.
        :raises requests.HTTPError: If connection to ASAM ODS server fails.
        :raises ValueError: If the server answered successfully but did not
            provide a session URL.
        """
        session = requests.Session()
        session.auth = auth
        session.verify = verify_certificate

        if isinstance(context_variables, ods.ContextVariables):
            _context_variables = context_variables
        else:
            _context_variables = ods.ContextVariables()
            if isinstance(context_variables, dict):
                for key, value in context_variables.items():
                    _context_variables.variables[key].string_array.values.append(value)

        try:
            response = session.post(
                url + "/ods",
                data=_context_variables.SerializeToString(),
                timeout=60.0,
                headers=self.__default_http_headers,
            )
            self.check_requests_response(response)
            con_i = response.headers.get("location")
            if response.status_code != 201 or con_i is None:
                # Without a session URL nothing else on this object can work.
                raise ValueError("No session URL (conI) provided by server!")
        except Exception:
            # Do not leak the HTTP connection pool when login fails.
            session.close()
            raise

        self.__log.debug("ConI: %s", con_i)
        self.__session = session
        self.__con_i = con_i

        # lets cache the model
        self.model_read()

    def __del__(self):
        """Close the server session if it is still open when collected."""
        if self.__session is not None:
            self.logout()

    def __enter__(self):
        """Return this instance for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Close the session when leaving the ``with`` block."""
        self.logout()

    def con_i_url(self) -> str:
        """
        Get the ASAM ODS session URL used to work with this session.

        :raises ValueError: If the session was already closed.
        :return str: The ASAM ODS session URL
        """
        if self.__con_i is None:
            raise ValueError("ConI already closed")
        return self.__con_i

    def logout(self):
        """
        Close the attached session at the ODS server by calling delete on the session URL
        and closing the requests session. Calling it on a closed session does nothing.

        The local state is cleared and the requests session is closed even if
        the delete call itself fails.

        :raises requests.HTTPError: If delete the ASAM ODS session fails.
        """
        session = self.__session
        if session is None:
            return
        con_i = self.__con_i

        # Clear state first so a failing delete is not retried from __del__.
        self.__session = None
        self.__con_i = None
        try:
            if con_i is None:
                raise ValueError("ConI already closed")
            response = session.delete(
                con_i,
                timeout=60.0,
                headers={"Accept": "application/x-asamods+protobuf"},
            )
        finally:
            session.close()
        self.check_requests_response(response)

    def query_data(
        self,
        query: str | dict | ods.SelectStatement,
        **kwargs,
    ) -> DataFrame:
        """
        Query ods server for content and return the results as Pandas DataFrame

        :param str | dict | ods.SelectStatement query: Query given as JAQueL query (dict or str)
            or as an ASAM ODS SelectStatement.
        :param kwargs: additional arguments passed to `to_pandas`.
        :raises requests.HTTPError: If query fails.
        :return DataFrame: The DataMatrices as Pandas.DataFrame. The columns are named as `ENTITY_NAME.ATTRIBUTE_NAME`.
            `IsNull` values are not marked invalid.
        """
        data_matrices = (
            self.data_read(query) if isinstance(query, ods.SelectStatement) else self.data_read_jaquel(query)
        )
        return to_pandas(data_matrices, model_cache=self.mc, **kwargs)

    def model(self) -> ods.Model:
        """
        Get the cached ODS server model. This model will return the cached
        application model related to your session.

        :return ods.Model: The application model of the ASAM ODS server.
        """
        return self.mc.model()

    def data_read_jaquel(self, jaquel: str | dict) -> ods.DataMatrices:
        """
        Query ods server for content.

        :param str | dict jaquel: Query given as JAQueL query (dict or str).
        :raises requests.HTTPError: If query fails.
        :return ods.DataMatrices: The DataMatrices representing the result.
            It will contain one ods.DataMatrix for each returned entity type.
        """
        _, ods_query = jaquel_to_ods(self.model(), jaquel)
        return self.data_read(ods_query)

    def data_read(self, select_statement: ods.SelectStatement) -> ods.DataMatrices:
        """
        Query ods server for content.

        :param ods.SelectStatement select_statement: Query given as ASAM ODS SelectStatement.
        :raises requests.HTTPError: If query fails.
        :return ods.DataMatrices: The DataMatrices representing the result.
            It will contain one ods.DataMatrix for each returned entity type.
        """
        response = self.ods_post_request("data-read", select_statement)
        return_value = ods.DataMatrices()
        return_value.ParseFromString(response.content)
        return return_value

    def data_create(self, data: ods.DataMatrices) -> List[int]:
        """
        Create new ASAM ODS instances or write bulk data.

        :param ods.DataMatrices data: Matrices containing columns for instances to be created.
        :raises requests.HTTPError: If creation fails.
        :return List[int]: list of ids created from your request. Empty if the
            server response contains no matrix or no column.
        """
        response = self.ods_post_request("data-create", data)
        return_value = ods.DataMatrices()
        return_value.ParseFromString(response.content)
        # Guard against an empty answer instead of failing with IndexError.
        if not return_value.matrices or not return_value.matrices[0].columns:
            return []
        return list(return_value.matrices[0].columns[0].longlong_array.values)

    def data_update(self, data: ods.DataMatrices) -> None:
        """
        Update existing instances.

        :param ods.DataMatrices data: Matrices containing columns for instances to be updated.
            The `id` column is used to identify the instances to be updated.
        :raises requests.HTTPError: If update fails.
        """
        self.ods_post_request("data-update", data)

    def data_delete(self, data: ods.DataMatrices) -> None:
        """
        Delete existing instances.

        :param ods.DataMatrices data: Matrices containing columns for instances to be deleted.
            The `id` column is used to identify the instances to be deleted.
        :raises requests.HTTPError: If delete fails.
        """
        self.ods_post_request("data-delete", data)

    def data_copy(self, copy_request: ods.CopyRequest) -> ods.Instance:
        """
        Copy an Instance and its related children.

        :param ods.CopyRequest copy_request: Define instance to be copied.
        :raises requests.HTTPError: If copy fails.
        :return ods.Instance: Newly created instance
        """
        response = self.ods_post_request("data-copy", copy_request)
        return_value = ods.Instance()
        return_value.ParseFromString(response.content)
        return return_value

    def n_m_relation_read(self, identifier: ods.NtoMRelationIdentifier) -> ods.NtoMRelatedInstances:
        """
        Read n-m relations for a defined instance.

        :param ods.NtoMRelationIdentifier identifier: identify n to m relation to be read.
        :raises requests.HTTPError: If read fails.
        :return ods.NtoMRelatedInstances: Return n to m related instances that were queried.
        """
        response = self.ods_post_request("n-m-relation-read", identifier)
        return_value = ods.NtoMRelatedInstances()
        return_value.ParseFromString(response.content)
        return return_value

    def n_m_relation_write(self, related_instances: ods.NtoMWriteRelatedInstances) -> None:
        """
        Update, delete or create n-m relations for given instance pairs.

        :param ods.NtoMWriteRelatedInstances related_instances: related instances
            to be updated, deleted or created.
        :raises requests.HTTPError: If write fails.
        """
        self.ods_post_request("n-m-relation-write", related_instances)

    def transaction(self) -> Transaction:
        """
        Open a transaction object to be used in a with clause

        Example::

            with con_i.transaction() as transaction:
                # do writing
                transaction.commit()

        :raises requests.HTTPError: If creation of transaction fails.
        :return Transaction: transaction object that will abort automatically if commit is not called.
        """
        return Transaction(self)

    def transaction_create(self) -> None:
        """
        Open a transaction for writing.

        :raises requests.HTTPError: If creation of transaction fails.
        """
        self.ods_post_request("transaction-create")

    def transaction_commit(self) -> None:
        """
        Commit transaction created before.

        :raises requests.HTTPError: If commit of transaction fails.
        """
        self.ods_post_request("transaction-commit")

    def transaction_abort(self) -> None:
        """
        Abort transaction created before.

        :raises requests.HTTPError: If abort of transaction fails.
        """
        self.ods_post_request("transaction-abort")

    def valuematrix_read(self, request: ods.ValueMatrixRequestStruct) -> ods.DataMatrices:
        """
        Read bulk data from a submatrix or measurement.
        Submatrix access can also be done using data-read.

        :param ods.ValueMatrixRequestStruct request: Define measurement or submatrix
            to create ASAM ODS ValueMatrix for.
        :raises requests.HTTPError: If ValueMatrix access fails.
        :return ods.DataMatrices: DataMatrices containing the bulk data for the request.
        """
        response = self.ods_post_request("valuematrix-read", request)
        return_value = ods.DataMatrices()
        return_value.ParseFromString(response.content)
        return return_value

    def model_read(self) -> ods.Model:
        """
        Read the model from server and update cached version.

        :raises requests.HTTPError: If model read fails.
        :return ods.Model: The application model of the server.
        """
        response = self.ods_post_request("model-read")
        model = ods.Model()
        model.ParseFromString(response.content)
        self.mc = ModelCache(model)
        return model

    def model_update(self, model_parts: ods.Model, update_model: bool = True) -> None:
        """
        Update application model content. This method is used to modify existing items or create new ones.

        :param ods.Model model_parts: parts of the model to be updated or created.
        :param bool update_model: determine if the model cache of this ConI instance
            should be updated by reading the whole model again. It defaults to True.
        :raises requests.HTTPError: If model update fails.
        """
        self.ods_post_request("model-update", model_parts)
        if update_model:
            # cache again if successfully changed
            self.model_read()

    def model_delete(self, model_parts: ods.Model, update_model: bool = True) -> None:
        """
        Delete application model content.

        :param ods.Model model_parts: define model parts to be deleted.
        :param bool update_model: determine if the model cache of this ConI instance
            should be updated by reading the whole model again. It defaults to True.
        :raises requests.HTTPError: If model delete fails.
        """
        self.ods_post_request("model-delete", model_parts)
        if update_model:
            # cache again if successfully changed
            self.model_read()

    def model_check(self) -> None:
        """
        Check if stored application model is consistent.

        :raises requests.HTTPError: If model contains errors.
        """
        self.ods_post_request("model-check")

    def basemodel_read(self) -> ods.BaseModel:
        """
        Read the ODS base model version used by the server.

        :raises requests.HTTPError: If reading base model fails.
        :return ods.BaseModel: used server base model.
        """
        response = self.ods_post_request("basemodel-read")
        base_model = ods.BaseModel()
        base_model.ParseFromString(response.content)
        return base_model

    def asampath_create(self, instance: ods.Instance) -> ods.AsamPath:
        """
        Create a persistent string representing the instance.

        :param ods.Instance instance: Instance to get the AsamPath for.
        :raises requests.HTTPError: If creation fails.
        :return ods.AsamPath: The AsamPath that represents the instance.
        """
        response = self.ods_post_request("asampath-create", instance)
        return_value = ods.AsamPath()
        return_value.ParseFromString(response.content)
        return return_value

    def asampath_resolve(self, asam_path: ods.AsamPath) -> ods.Instance:
        """
        Use the persistent string to get back the instance.

        :param ods.AsamPath asam_path: AsamPath to be resolved.
        :raises requests.HTTPError: If path could not be resolved.
        :return ods.Instance: Instance represented by AsamPath.
        """
        response = self.ods_post_request("asampath-resolve", asam_path)
        return_value = ods.Instance()
        return_value.ParseFromString(response.content)
        return return_value

    def context_read(self, pattern_or_filter: ods.ContextVariablesFilter | str = "*") -> ods.ContextVariables:
        """
        Read the session context variables.

        :param ods.ContextVariablesFilter | str pattern_or_filter: Context variable filter as str
            or ContextVariablesFilter. It defaults to "*" to return all variables.
        :raises requests.HTTPError: If something went wrong.
        :return ods.ContextVariables: ContextVariables where the name matches the filter.
        """
        context_variables_filter = (
            pattern_or_filter
            if isinstance(pattern_or_filter, ods.ContextVariablesFilter)
            else ods.ContextVariablesFilter(pattern=pattern_or_filter)
        )
        response = self.ods_post_request("context-read", context_variables_filter)
        return_value = ods.ContextVariables()
        return_value.ParseFromString(response.content)
        return return_value

    def context_update(self, context_variables: ods.ContextVariables):
        """
        Set context variables for current session. This will set context variables for the given session.
        If new session is created they will fall back to their default.

        :param ods.ContextVariables context_variables: ContextVariables to be set or updated.
        :raises requests.HTTPError: If something went wrong.
        """
        self.ods_post_request("context-update", context_variables)

    def password_update(self, password_update: ods.PasswordUpdate) -> None:
        """
        Update the password of the defined user.

        :param ods.PasswordUpdate password_update: Defines for which user the password should be updated.
        :raises requests.HTTPError: If something went wrong.
        """
        self.ods_post_request("password-update", password_update)

    def file_access(self, file_identifier: ods.FileIdentifier) -> str:
        """
        Get file access URL for file content.

        :param ods.FileIdentifier file_identifier: Define content to be accessed.
            Might be an AoFile or a DT_BLOB attribute.
        :raises requests.HTTPError: If something went wrong.
        :raises ValueError: If no file location provided by server.
        :return str: The server file URL.
        """
        response = self.ods_post_request("file-access", file_identifier)
        server_file_url = response.headers.get("location")
        if server_file_url is None:
            raise ValueError("No file location provided by server!")
        return server_file_url

    @staticmethod
    def _filename_from_content_disposition(content_disposition: str | None, default_filename: str) -> str:
        """
        Extract a plain file name from a `Content-Disposition` header value.

        Directory parts are dropped so that a name supplied by the server
        cannot point outside the folder it is joined to.

        :param str | None content_disposition: Raw header value or None if missing.
        :param str default_filename: Name to use if the header yields no usable name.
        :return str: File name without any directory component.
        """
        if not content_disposition:
            return default_filename

        # Local import: the module-level name ``Message`` is the protobuf one.
        from email.message import Message as EmailMessage

        header = EmailMessage()
        header["Content-Disposition"] = content_disposition
        filename = header.get_filename()
        if not filename:
            return default_filename

        # Treat both separators as path separators regardless of platform.
        filename = os.path.basename(filename.replace("\\", "/"))
        if filename in ("", ".", ".."):
            return default_filename
        return filename

    def file_access_download(
        self,
        file_identifier: ods.FileIdentifier,
        target_file_or_folder: str,
        overwrite_existing: bool = False,
        default_filename: str = "download.bin",
        timeout: float = 600.0,
    ) -> str:
        """
        Read file content from server.

        :param ods.FileIdentifier file_identifier: Define content to be read.
            Might be an AoFile or a DT_BLOB attribute.
        :param str target_file_or_folder: Path to save the file content to. If pointing to an existing folder,
            the filename provided by the server will be used (without directory parts).
            Full path is returned.
        :param bool overwrite_existing: If existing files should be overwritten. It defaults to False.
        :param str default_filename: Default filename if no filename is provided by server.
            It defaults to "download.bin".
        :param float timeout: maximal time to wait for the server. It defaults to 600.0.
        :raises requests.HTTPError: If something went wrong.
        :raises FileExistsError: If file already exists and 'overwrite_existing' is False.
        :raises ValueError: If no open session.
        :return str: file path of saved file.
        """
        server_file_url = self.file_access(file_identifier)

        if self.__session is None:
            raise ValueError("No open session!")
        file_response = self.__session.get(
            server_file_url,
            headers={
                "Accept": "application/octet-stream, application/x-asamods+protobuf, */*",
            },
            timeout=timeout,
        )
        self.check_requests_response(file_response)

        target_file_path = target_file_or_folder
        if os.path.isdir(target_file_path):
            filename = self._filename_from_content_disposition(
                file_response.headers.get("Content-Disposition"), default_filename
            )
            target_file_path = os.path.join(target_file_path, filename)

        if not overwrite_existing and os.path.exists(target_file_path):
            raise FileExistsError(f"File '{target_file_path}' already exists and 'overwrite_existing' is False.")

        with open(target_file_path, "wb") as file:
            file.write(file_response.content)

        return target_file_path

    def file_access_upload(
        self,
        file_identifier: ods.FileIdentifier,
        source_file_path: str,
        timeout: float = 600.0,
    ) -> None:
        """
        Upload file content to server.

        :param ods.FileIdentifier file_identifier: Define content to be written.
            Might be an AoFile or a DT_BLOB attribute.
        :param str source_file_path: Path to the file to be uploaded.
        :param float timeout: maximal time to wait for the server. It defaults to 600.0.
        :raises requests.HTTPError: If something went wrong.
        :raises FileNotFoundError: If source file was not found.
        :raises ValueError: If no open session.
        """
        if not os.path.isfile(source_file_path):
            raise FileNotFoundError(f"File '{source_file_path}' not found.")

        server_file_url = self.file_access(file_identifier)

        if self.__session is None:
            raise ValueError("No open session!")
        with open(source_file_path, "rb") as file:
            put_response = self.__session.put(
                server_file_url,
                data=file,
                headers={"Content-Type": "application/octet-stream", "Accept": "application/x-asamods+protobuf"},
                timeout=timeout,
            )
        self.check_requests_response(put_response)

    def file_access_delete(
        self,
        file_identifier: ods.FileIdentifier,
        timeout: float = 600.0,
    ) -> None:
        """
        Delete file content from server.

        :param ods.FileIdentifier file_identifier: Define content to be deleted.
            Might be an AoFile or a DT_BLOB attribute.
        :param float timeout: maximal time to wait for the server. It defaults to 600.0.
        :raises requests.HTTPError: If something went wrong.
        :raises ValueError: If no open session.
        """
        server_file_url = self.file_access(file_identifier)

        if self.__session is None:
            raise ValueError("No open session!")
        delete_response = self.__session.delete(
            server_file_url,
            headers={"Accept": "application/x-asamods+protobuf"},
            timeout=timeout,
        )
        self.check_requests_response(delete_response)

    def ods_post_request(
        self,
        relative_url_part: str,
        message: Message | None = None,
        timeout: float = 600.0,
        headers: dict[str, str] | None = None,
    ) -> requests.Response:
        """
        Do ODS post call with the given relative URL.

        :param str relative_url_part: url part that is joined to conI URL using `/`.
        :param Message | None message: protobuf message to be send, defaults to None.
        :param float timeout: maximal time to wait for response.
        :param dict[str, str] | None headers: HTTP headers to send. If None the
            default ASAM ODS protobuf headers are used.
        :raises requests.HTTPError: If the server reports an error.
        :raises ValueError: If no open session.
        :return requests.Response: requests response if successful.
        """
        if self.__session is None or self.__con_i is None:
            raise ValueError("No open session!")

        response = self.__session.post(
            self.__con_i + "/" + relative_url_part,
            data=message.SerializeToString() if message is not None else None,
            timeout=timeout,
            headers=(headers if headers is not None else self.__default_http_headers),
        )
        self.check_requests_response(response)
        return response

    @staticmethod
    def check_requests_response(response: requests.Response):
        """
        Raise if the response does not signal success.

        Status codes 200 and 201 pass. For any other status an ASAM ODS
        protobuf body is decoded as `ods.ErrorInfo` and raised as HTTPError
        with the JSON text as message. Otherwise the decision is left to
        `response.raise_for_status()`.

        :param requests.Response response: response to be checked.
        :raises requests.HTTPError: If the server reported an error.
        """
        if response.status_code in (200, 201):
            return

        # Compare only the media type: the header may carry parameters.
        content_type = response.headers.get("Content-Type", "")
        media_type = content_type.split(";", 1)[0].strip().lower()
        if media_type == "application/x-asamods+protobuf":
            from google.protobuf.message import DecodeError

            error_info = ods.ErrorInfo()
            try:
                error_info.ParseFromString(response.content)
            except DecodeError:
                # Body is not a valid ErrorInfo; report the plain HTTP status.
                error_info = None
            if error_info is not None:
                raise requests.HTTPError(
                    MessageToJson(error_info),
                    response=response,
                )

        response.raise_for_status()