Source code for odsbox.con_i

"""
Helper for ASAM ODS HTTP API conI session

Example::

    from odsbox.con_i import ConI

    with ConI(
        url="http://localhost:8087/api",
        auth=("sa", "sa")
    ) as con_i:
        units = con_i.query_data({"AoUnit": {}})

"""

from __future__ import annotations

import logging
import os

import requests
import requests.auth
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
from pandas import DataFrame

import odsbox.proto.ods_pb2 as ods
from odsbox.bulk_reader import BulkReader
from odsbox.datamatrices_to_pandas import to_pandas
from odsbox.jaquel import Jaquel
from odsbox.model_cache import ModelCache
from odsbox.security import Security
from odsbox.transaction import Transaction


[docs] class ConI: """ This is a helper to hold an ASAM ODS HTTP API ConI session. Example:: from odsbox.con_i import ConI with ConI( url="http://localhost:8087/api", auth=("sa", "sa") ) as con_i: units = con_i.query_data({"AoUnit": {}}) """ __log: logging.Logger = logging.getLogger(__name__) __default_http_headers: dict[str, str] = { "Content-Type": "application/x-asamods+protobuf", "Accept": "application/x-asamods+protobuf", }
[docs] def __init__( self, url: str = "http://localhost:8080/api", auth: requests.auth.AuthBase | tuple[str, str] = ("sa", "sa"), context_variables: ods.ContextVariables | dict | None = None, verify_certificate: bool = True, load_model: bool = True, allow_redirects: bool = False, connection_timeout: float = 60.0, request_timeout: float = 600.0, ) -> None: """ Create a session object keeping track of ASAM ODS session URL named `conI`. Example:: from odsbox.con_i import ConI # basic auth with ConI( url="http://localhost:8087/api", auth=("sa", "sa") ) as con_i: units = con_i.query_data({"AoUnit": {}}) Example:: import requests from odsbox.con_i import ConI class BearerAuth(requests.auth.AuthBase): def __init__(self, token): self.token = token def __call__(self, r): r.headers["authorization"] = "Bearer " + self.token return r # bearer auth with ConI( url="http://localhost:8087/api", auth=BearerAuth("YOUR_BEARER_TOKEN") ) as con_i: units = con_i.query_data({"AoUnit": {}}) :param str url: base url of the ASAM ODS API of a given server. An example is "http://localhost:8080/api". :param requests.auth.AuthBase auth: An auth object to be used for the used requests package. For basic auth `("USER", "PASSWORD")` can be used. :param ods.ContextVariables | dict | None context_variables: If context variables are necessary for the connection they are passed here. It defaults to None. :param bool verify_certificate: If no certificate is provided for https insecure access can be enabled. It defaults to True. :param bool load_model: If the model should be read after connection is established. It defaults to True. :param bool allow_redirects: If redirects should be allowed in requests calls. It defaults to False. :param float connection_timeout: Timeout in seconds for establishing connections. It defaults to 60.0. :param float request_timeout: Timeout in seconds for individual requests. It defaults to 600.0. :raises requests.HTTPError: If connection to ASAM ODS server fails. """ self.__session: requests.Session | None = None self.__con_i: str | None = None self.__security: Security | None = None self.__mc: ModelCache | None = None self.__allow_redirects: bool = allow_redirects self.__bulk_reader: BulkReader | None = None self.__connection_timeout: float = connection_timeout self.__request_timeout: float = request_timeout session = requests.Session() session.auth = auth session.verify = verify_certificate _context_variables = None if isinstance(context_variables, ods.ContextVariables): _context_variables = context_variables else: _context_variables = ods.ContextVariables() if isinstance(context_variables, dict): for key, value in context_variables.items(): _context_variables.variables[key].string_array.values.append(value) response = session.post( url + "/ods", data=_context_variables.SerializeToString(), timeout=self.__connection_timeout, headers=self.__default_http_headers, allow_redirects=self.__allow_redirects, ) if 201 == response.status_code: con_i = response.headers["location"] self.__log.debug("ConI: %s", con_i) self.__session = session self.__con_i = con_i self.check_requests_response(response) if load_model: # lets cache the model self.model_read()
def __del__(self) -> None: self.close() def __enter__(self) -> ConI: return self def __exit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_traceback: object ) -> None: self.close()
[docs] def close(self) -> None: """ Close the attached session at the ODS server by calling delete on the session URL and closing the requests session. No exception is raised if logout fails. """ try: self.logout() except Exception as e: self.__log.exception("Exception during logout in close: %s", e)
[docs] def con_i_url(self) -> str: """ Get the ASAM ODS session URL used to work with this session. :return str: The ASAM ODS session URL """ if self.__con_i is None: raise ValueError("ConI already closed") return self.__con_i
[docs] def logout(self) -> None: """ Close the attached session at the ODS server by calling delete on the session URL and closing the requests session. :raises requests.HTTPError: If delete the ASAM ODS session fails. """ if self.__session is not None: try: if self.__con_i is not None: response = self.__session.delete( self.__con_i, timeout=self.__connection_timeout, headers={"Accept": "application/x-asamods+protobuf"}, allow_redirects=self.__allow_redirects, ) self.check_requests_response(response) finally: self.__con_i = None self.__session.close() self.__session = None self.__security = None self.__bulk_reader = None self.__mc = None
[docs] def query( self, jaquel_query: str | dict, enum_as_string: bool = True, date_as_timestamp: bool = True, is_null_to_nan: bool = True, result_naming_mode: str = "query", # "query" or "model" **kwargs, ) -> DataFrame: """ Query ods server for content using JAQueL query and return the results as Pandas DataFrame. This method combines the JAQUEL query language with pandas DataFrames for convenient data access. Result column names can be controlled via the `result_naming_mode` parameter to match either your query specification (JAQUEL mode) or the schema entity names (model mode). Example - Basic Query:: result = con_i.query({"AoUnit": {}}) print(result.columns) # Output: Index(['Name', 'Id', 'PhysDimension'], ...) Example - Query with Column Selection:: # Select specific columns using query-based column names (default) query = { "AoUnit": {}, "$attributes": { "name": 1, "id": 1, "phys_dimension.name": 1 } } result = con_i.query(query) print(result.columns) # Output: Index(['name', 'id', 'phys_dimension.name'], ...) Example - Same Query with Model Column Names:: # Same query but with model/schema column names result = con_i.query(query, result_naming_mode="model") print(result.columns) # Output: Index(['Unit.Name', 'Unit.Id', 'PhysDimension.Name'], ...) :param str | dict jaquel_query: JAQueL query as dict or str. :param bool enum_as_string: Columns of type DT_ENUM or DS_ENUM are returned as int values. If this is set to True the model_cache is used to map the int values to the corresponding string values. Defaults to True. :param bool date_as_timestamp: Columns of type DT_DATE or DS_DATE are returned as string. If this is set to True the strings are converted to pandas Timestamp. Defaults to True. :param bool is_null_to_nan: If set to True, the is_null flags are used to set corresponding values to pd.NA. This uses pandas native nullable data types for better type preservation. Defaults to True. :param str result_naming_mode: Controls how result column names are generated. "query" (default): Uses column names from the JAQUEL query (e.g., 'name', 'phys_dimension.name'). "model": Uses column names from the ods.Model schema (e.g., 'Unit.Name', 'PhysDimension.Name'). :param kwargs: Additional arguments passed to `to_pandas`. :raises requests.HTTPError: If query fails. :return DataFrame: The DataMatrices as Pandas.DataFrame with columns named according to `result_naming_mode`. """ if result_naming_mode not in ("query", "model"): raise ValueError(f"result_naming_mode must be 'query' or 'model', got '{result_naming_mode}'") jaquel = Jaquel(self.model(), jaquel_query) data_matrices = self.data_read(jaquel.select_statement) return to_pandas( data_matrices, model_cache=self.mc, enum_as_string=enum_as_string, date_as_timestamp=date_as_timestamp, is_null_to_nan=is_null_to_nan, jaquel_conversion_result=jaquel if result_naming_mode == "query" else None, **kwargs, )
[docs] def query_data( self, query: str | dict | ods.SelectStatement, enum_as_string: bool = False, date_as_timestamp: bool = False, is_null_to_nan: bool = False, result_naming_mode: str = "model", **kwargs, ) -> DataFrame: """ Query ods server for content and return the results as Pandas DataFrame. This is a lower-level variant of query() with different defaults: - Defaults to model column names (result_naming_mode="model") - No automatic enum/date/null conversions by default - Can accept raw ASAM ODS SelectStatement objects :param str | dict | ods.SelectStatement query: Query given as JAQueL query (dict or str) or as an ASAM ODS SelectStatement. :param bool enum_as_string: Columns of type DT_ENUM or DS_ENUM are returned as int values. If this is set to True the model_cache is used to map the int values to the corresponding string values. Defaults to False. :param bool date_as_timestamp: Columns of type DT_DATE or DS_DATE are returned as string. If this is set to True the strings are converted to pandas Timestamp. Defaults to False. :param bool is_null_to_nan: If set to True, the is_null flags are used to set corresponding values to pd.NA. This uses pandas native nullable data types for better type preservation. Defaults to False. :param str result_naming_mode: Controls how result column names are generated. "query": Uses column names from the JAQUEL query. "model" (default): Uses column names from the ods.Model schema. :param kwargs: Additional arguments passed to `to_pandas`. :raises requests.HTTPError: If query fails. :return DataFrame: The DataMatrices as Pandas.DataFrame with columns named according to `result_naming_mode`. """ if result_naming_mode not in ("query", "model"): raise ValueError(f"result_naming_mode must be 'query' or 'model', got '{result_naming_mode}'") if isinstance(query, ods.SelectStatement): jaquel = None select_statement = query else: jaquel = Jaquel(self.model(), query) select_statement = jaquel.select_statement data_matrices = self.data_read(select_statement) return to_pandas( data_matrices, model_cache=self.mc, enum_as_string=enum_as_string, date_as_timestamp=date_as_timestamp, is_null_to_nan=is_null_to_nan, jaquel_conversion_result=jaquel if result_naming_mode == "query" else None, **kwargs, )
[docs] def model(self) -> ods.Model: """ Get the cache ODS server model. This model will return the cached application model related to your session. :return ods.Model: The application model of the ASAM ODS server. """ return self.mc.model()
[docs] def data_read_jaquel(self, jaquel_query: str | dict) -> ods.DataMatrices: """ Query ods server for content. :param str | dict jaquel_query: Query given as JAQueL query (dict or str). :raises requests.HTTPError: If query fails. :return ods.DataMatrices: The DataMatrices representing the result. It will contain one ods.DataMatrix for each returned entity type. """ jaquel = Jaquel(self.model(), jaquel_query) return self.data_read(jaquel.select_statement)
[docs] def data_read(self, select_statement: ods.SelectStatement) -> ods.DataMatrices: """ Query ods server for content. :param ods.SelectStatement select_statement: Query given as ASAM ODS SelectStatement. :raises requests.HTTPError: If query fails. :return ods.DataMatrices: The DataMatrices representing the result. It will contain one ods.DataMatrix for each returned entity type. """ if not isinstance(select_statement, ods.SelectStatement): raise TypeError(f"data_read expects 'ods.SelectStatement', got '{type(select_statement).__name__}'") response = self.ods_post_request("data-read", select_statement) return_value = ods.DataMatrices() return_value.ParseFromString(response.content) return return_value
[docs] def data_create(self, data: ods.DataMatrices) -> list[int]: """ Create new ASAM ODS instances or write bulk data. :param ods.DataMatrices data: Matrices containing columns for instances to be created. :raises requests.HTTPError: If creation fails. :return List[int]: list of ids created from your request. """ if not isinstance(data, ods.DataMatrices): raise TypeError(f"data_create expects 'ods.DataMatrices', got '{type(data).__name__}'") response = self.ods_post_request("data-create", data) return_value = ods.DataMatrices() return_value.ParseFromString(response.content) return list(return_value.matrices[0].columns[0].longlong_array.values)
[docs] def data_update(self, data: ods.DataMatrices) -> None: """ Update existing instances. :param ods.DataMatrices data: Matrices containing columns for instances to be updated. The `id` column is used to identify the instances to be updated. :raises requests.HTTPError: If update fails. """ if not isinstance(data, ods.DataMatrices): raise TypeError(f"data_update expects 'ods.DataMatrices', got '{type(data).__name__}'") self.ods_post_request("data-update", data)
[docs] def data_delete(self, data: ods.DataMatrices, timeout: float | None = None) -> None: """ Delete existing instances. :param ods.DataMatrices data: Matrices containing columns for instances to be deleted. The `id` column is used to identify the instances to be deleted. :param float | None timeout: maximal time to wait for response. Delete might take longer time. Uses the request_timeout from constructor if None. :raises requests.HTTPError: If delete fails. """ if not isinstance(data, ods.DataMatrices): raise TypeError(f"data_delete expects 'ods.DataMatrices', got '{type(data).__name__}'") self.ods_post_request("data-delete", data, timeout=timeout)
[docs] def data_copy(self, copy_request: ods.CopyRequest) -> ods.Instance: """ Copy an Instance and its related children. :param ods.CopyRequest copy_request: Define instance to be copied. :raises requests.HTTPError: If copy fails. :return ods.Instance: Newly created instance """ if not isinstance(copy_request, ods.CopyRequest): raise TypeError(f"data_copy expects 'ods.CopyRequest', got '{type(copy_request).__name__}'") response = self.ods_post_request("data-copy", copy_request) return_value = ods.Instance() return_value.ParseFromString(response.content) return return_value
[docs] def n_m_relation_read(self, identifier: ods.NtoMRelationIdentifier) -> ods.NtoMRelatedInstances: """ Read n-m relations for a defined instance. :param ods.NtoMRelationIdentifier identifier: identify n to m relation to be read. :raises requests.HTTPError: If read fails. :return ods.NtoMRelatedInstances: Return n to m related instances that were queried. """ if not isinstance(identifier, ods.NtoMRelationIdentifier): raise TypeError( f"n_m_relation_read expects 'ods.NtoMRelationIdentifier', got '{type(identifier).__name__}'" ) response = self.ods_post_request("n-m-relation-read", identifier) return_value = ods.NtoMRelatedInstances() return_value.ParseFromString(response.content) return return_value
[docs] def n_m_relation_write(self, related_instances: ods.NtoMWriteRelatedInstances) -> None: """ Update, delete or create n-m relations for given instance pairs. :raises requests.HTTPError: If write fails. :param ods.NtoMWriteRelatedInstances related_instances: related instances to be updated, deleted or created. """ if not isinstance(related_instances, ods.NtoMWriteRelatedInstances): raise TypeError( f"n_m_relation_write expects 'ods.NtoMWriteRelatedInstances', got '{type(related_instances).__name__}'" ) self.ods_post_request("n-m-relation-write", related_instances)
[docs] def transaction(self) -> Transaction: """ Open a transaction object to be used in a with clause Example:: with con_i.transaction() as transaction: # do writing transaction.commit() :raises requests.HTTPError: If creation of transaction fails. :return Transaction: transaction object that will abort automatically if commit is not called. """ return Transaction(self)
[docs] def transaction_create(self) -> None: """ Open a transaction for writing. :raises requests.HTTPError: If creation of transaction fails. """ self.ods_post_request("transaction-create")
[docs] def transaction_commit(self) -> None: """ Commit transaction created before. :raises requests.HTTPError: If creation of transaction fails. """ self.ods_post_request("transaction-commit")
[docs] def transaction_abort(self) -> None: """ Abort transaction created before. :raises requests.HTTPError: If creation of transaction fails. """ self.ods_post_request("transaction-abort")
[docs] def valuematrix_read(self, request: ods.ValueMatrixRequestStruct) -> ods.DataMatrices: """ Read bulk data from a submatrix or measurement. Submatrix access can also be done using data-read. :param ods.ValueMatrixRequestStruct request: Define measurement or submatrix to create ASAM ODS ValueMatrix for. :raises requests.HTTPError: If ValueMatrix access fails. :return ods.DataMatrices: DataMatrices containing the bulk data for the request. """ if not isinstance(request, ods.ValueMatrixRequestStruct): raise TypeError(f"valuematrix_read expects 'ods.ValueMatrixRequestStruct', got '{type(request).__name__}'") response = self.ods_post_request("valuematrix-read", request) return_value = ods.DataMatrices() return_value.ParseFromString(response.content) return return_value
[docs] def model_read(self) -> ods.Model: """ Read the model from server and update cached version. :raises requests.HTTPError: If model read fails. :return ods.Model: The application model of the server. """ response = self.ods_post_request("model-read") model = ods.Model() model.ParseFromString(response.content) self.__mc = ModelCache(model) return model
[docs] def model_update(self, model_parts: ods.Model, update_model: bool = True) -> None: """ Update application model content. This method is used to modify existing items or create new ones. :param ods.Model model_parts: parts of the model to be updated or created. :param bool update_model: determine if the model cache of this ConI instance should be updated by reading the whole model again. It defaults to True. :raises requests.HTTPError: If model update fails. """ if not isinstance(model_parts, ods.Model): raise TypeError(f"model_update expects 'ods.Model', got '{type(model_parts).__name__}'") self.ods_post_request("model-update", model_parts) if update_model: # cache again if successfully changed self.model_read()
[docs] def model_delete(self, model_parts: ods.Model, update_model: bool = True) -> None: """ Delete application model content. :param ods.Model model_parts: define model parts to be deleted. :param bool update_model: determine if the model cache of this ConI instance should be updated by reading the whole model again. It defaults to True. :raises requests.HTTPError: If model update fails. """ if not isinstance(model_parts, ods.Model): raise TypeError(f"model_delete expects 'ods.Model', got '{type(model_parts).__name__}'") self.ods_post_request("model-delete", model_parts) if update_model: # cache again if successfully changed self.model_read()
[docs] def model_check(self) -> None: """ Check if stored application model is consistent. :raises requests.HTTPError: If model contains errors. """ self.ods_post_request("model-check")
[docs] def basemodel_read(self) -> ods.BaseModel: """ Read the ODS base model version used by the server. :raises requests.HTTPError: If reading base model fails. :return ods.BaseModel: used server base model. """ response = self.ods_post_request("basemodel-read") base_model = ods.BaseModel() base_model.ParseFromString(response.content) return base_model
[docs] def asampath_create(self, instance: ods.Instance) -> ods.AsamPath: """ Create an persistent string representing the instance. :param ods.Instance instance: Instance to be get AsamPath for. :raises requests.HTTPError: If creation fails. :return ods.AsamPath: The AsamPath that represents the instance. """ if not isinstance(instance, ods.Instance): raise TypeError(f"asampath_create expects 'ods.Instance', got '{type(instance).__name__}'") response = self.ods_post_request("asampath-create", instance) return_value = ods.AsamPath() return_value.ParseFromString(response.content) return return_value
[docs] def asampath_resolve(self, asam_path: ods.AsamPath) -> ods.Instance: """ Use the persistent string to get back the instance. :param ods.AsamPath asam_path: AsamPath to be resolved. :raises requests.HTTPError: If path could not be resolved. :return ods.Instance: Instance represented by AsamPath. """ if not isinstance(asam_path, ods.AsamPath): raise TypeError(f"asampath_resolve expects 'ods.AsamPath', got '{type(asam_path).__name__}'") response = self.ods_post_request("asampath-resolve", asam_path) return_value = ods.Instance() return_value.ParseFromString(response.content) return return_value
[docs] def context_read(self, pattern_or_filter: ods.ContextVariablesFilter | str = "*") -> ods.ContextVariables: """ Read the session context variables. :param ods.ContextVariablesFilter | str pattern_or_filter: Context variable filter as str or ContextVariablesFilter. It defaults to "*" to return all variables. :raises requests.HTTPError: If something went wrong. :return ods.ContextVariables: ContextVariables where the name matches the filter. """ context_variables_filter = ( pattern_or_filter if isinstance(pattern_or_filter, ods.ContextVariablesFilter) else ods.ContextVariablesFilter(pattern=pattern_or_filter) ) response = self.ods_post_request("context-read", context_variables_filter) return_value = ods.ContextVariables() return_value.ParseFromString(response.content) return return_value
[docs] def context_update(self, context_variables: ods.ContextVariables) -> None: """ Set context variables for current session. This will set context variables for the given session. If new session is created they will fall back to their default. :param ods.ContextVariables context_variables: ContextVariables to be set or updated. :raises requests.HTTPError: If something went wrong. """ if not isinstance(context_variables, ods.ContextVariables): raise TypeError(f"context_update expects 'ods.ContextVariables', got '{type(context_variables).__name__}'") self.ods_post_request("context-update", context_variables)
[docs] def password_update(self, password_update: ods.PasswordUpdate) -> None: """ Update the password of the defined user. :param ods.PasswordUpdate password_update: Defines for which user the password should eb updated. :raises requests.HTTPError: If something went wrong. """ if not isinstance(password_update, ods.PasswordUpdate): raise TypeError(f"password_update expects 'ods.PasswordUpdate', got '{type(password_update).__name__}'") self.ods_post_request("password-update", password_update)
[docs] def file_access(self, file_identifier: ods.FileIdentifier) -> str: """ Get file access URL for file content. :param ods.FileIdentifier file_identifier: Define content to be accessed. Might be an AoFile or a DT_BLOB attribute. :raises requests.HTTPError: If something went wrong. :raises ValueError: If no file location provided by server. :return str: The server file URL. """ if not isinstance(file_identifier, ods.FileIdentifier): raise TypeError(f"file_access expects 'ods.FileIdentifier', got '{type(file_identifier).__name__}'") response = self.ods_post_request("file-access", file_identifier) server_file_url = response.headers.get("location") if server_file_url is None: raise ValueError("No file location provided by server!") return server_file_url
[docs] def file_access_download( self, file_identifier: ods.FileIdentifier, target_file_or_folder: str, overwrite_existing: bool = False, default_filename: str = "download.bin", ) -> str: """ Read file content from server. :param ods.FileIdentifier file_identifier: Define content to be read. Might be an AoFile or a DT_BLOB attribute. :param str target_file_or_folder: Path to save the file content to. If pointing to an existing folder. Original filename will be used. Full path is returned. :param bool overwrite_existing: If existing files should be overwritten. It defaults to False. :param str default_filename: Default filename if no filename is provided by server. It defaults to "download.bin". :raises requests.HTTPError: If something went wrong. :raises FileExistsError: If file already exists and 'overwrite_existing' is False. :raises ValueError: If no open session. :return str: file path of saved file. """ if not isinstance(file_identifier, ods.FileIdentifier): raise TypeError( f"file_access_download expects 'ods.FileIdentifier', got '{type(file_identifier).__name__}'" ) server_file_url = self.file_access(file_identifier) if self.__session is None: raise ValueError("No open session!") file_response = self.__session.get( server_file_url, headers={ "Accept": "application/octet-stream, application/x-asamods+protobuf, */*", }, timeout=self.__request_timeout, allow_redirects=self.__allow_redirects, ) self.check_requests_response(file_response) target_file_path = target_file_or_folder if os.path.isdir(target_file_path): content_disposition = file_response.headers.get( "Content-Disposition", f'attachment; filename="{default_filename}"' ) filename = ( content_disposition.split("filename=")[1].strip('"') if "filename=" in content_disposition else default_filename ) target_file_path = os.path.join(target_file_path, filename) if not overwrite_existing and os.path.exists(target_file_path): raise FileExistsError(f"File '{target_file_path}' already exists and 'overwrite_existing' is False.") with open(target_file_path, "wb") as file: file.write(file_response.content) return target_file_path
[docs] def file_access_upload( self, file_identifier: ods.FileIdentifier, source_file_path: str, ) -> None: """ Upload file content to server. :param ods.FileIdentifier file_identifier: Define content to be written. Might be an AoFile or a DT_BLOB attribute. :param str source_file_path: Path to the file to be uploaded. :raises requests.HTTPError: If something went wrong. :raises FileNotFoundError: If source file was not found. :raises ValueError: If no open session. """ if not isinstance(file_identifier, ods.FileIdentifier): raise TypeError(f"file_access_upload expects 'ods.FileIdentifier', got '{type(file_identifier).__name__}'") if not os.path.isfile(source_file_path): raise FileNotFoundError(f"File '{source_file_path}' not found.") server_file_url = self.file_access(file_identifier) with open(source_file_path, "rb") as file: if self.__session is None: raise ValueError("No open session!") put_response = self.__session.put( server_file_url, data=file, headers={"Content-Type": "application/octet-stream", "Accept": "application/x-asamods+protobuf"}, timeout=self.__request_timeout, allow_redirects=self.__allow_redirects, ) self.check_requests_response(put_response)
[docs] def file_access_delete( self, file_identifier: ods.FileIdentifier, ) -> None: """ Delete file content from server. :param ods.FileIdentifier file_identifier: Define content to be deleted. Might be an AoFile or a DT_BLOB attribute. :raises requests.HTTPError: If something went wrong. :raises ValueError: If no open session. """ if not isinstance(file_identifier, ods.FileIdentifier): raise TypeError(f"file_access_delete expects 'ods.FileIdentifier', got '{type(file_identifier).__name__}'") server_file_url = self.file_access(file_identifier) if self.__session is None: raise ValueError("No open session!") delete_response = self.__session.delete( server_file_url, headers={"Accept": "application/x-asamods+protobuf"}, timeout=self.__request_timeout, allow_redirects=self.__allow_redirects, ) self.check_requests_response(delete_response)
[docs] def ods_post_request( self, relative_url_part: str, message: Message | None = None, timeout: float | None = None, headers: dict[str, str] | None = None, ) -> requests.Response: """ Do ODS post call with the given relative URL. :param str relative_url_part: url part that is joined to conI URL using `/`. :param Message | None message: protobuf message to be send, defaults to None. :param float | None timeout: maximal time to wait for response. If None, uses the request_timeout from constructor. :raises requests.HTTPError: If status code is not 200 or 201. :return requests.Response: requests response if successful. """ if self.__session is None or self.__con_i is None: raise ValueError("No open session!") response = self.__session.post( self.__con_i + "/" + relative_url_part, data=message.SerializeToString() if message is not None else None, timeout=timeout if timeout is not None else self.__request_timeout, headers=(headers if headers is not None else self.__default_http_headers), allow_redirects=self.__allow_redirects, ) self.check_requests_response(response) return response
@staticmethod def check_requests_response(response: requests.Response) -> None: if response.status_code not in (200, 201): response.headers if ( "Content-Type" in response.headers and "application/x-asamods+protobuf" == response.headers["Content-Type"] ): error_info = ods.ErrorInfo() error_info.ParseFromString(response.content) raise requests.HTTPError( MessageToJson(error_info), response=response, ) response.raise_for_status() @property def mc(self) -> ModelCache: """ Get the model cache for the current session. :return ods.ModelCache: ModelCache object containing the cached application model. """ if self.__mc is None: if self.__con_i is None: raise ValueError("ConI already closed!") raise ValueError("Model not read! Call model_read() first.") return self.__mc @property def security(self) -> Security: """ Get the security information for the current session. :raises requests.HTTPError: If security info retrieval fails. :return ods.Security: Security object containing permissions and roles. """ if self.__session is None: raise ValueError("No open session!") if self.__security is None: self.__security = Security(self) return self.__security @property def bulk(self) -> BulkReader: """ Get the bulk reader for the current session. Example:: from odsbox.con_i import ConI with ConI( url="https://MYSERVER/api", auth=("USER", "PASSWORD"), ) as con_i: submatrix_id = 1234 df = con_i.bulk.data_read(submatrix_id, ["Time", "Co*"]) :return ods.BulkReader: BulkReader object for reading data in bulk. """ if self.__session is None: raise ValueError("No open session!") if self.__bulk_reader is None: self.__bulk_reader = BulkReader(self) return self.__bulk_reader