Work with Instance Data#
In this example Notebook, we show you how to use the Peak ODS Adapter for Apache Spark to interact with ODS data using Spark SQL and DataFrames.
The first section is on configuring the Spark framework and the Peak ODS Adapter for Apache Spark. The fun starts with “Work with Instance Data”.
Happy sparking!
Initialize Spark#
Configure Spark#
Initialize the Spark context and configure it to use the Peak ODS Adapter for Apache Spark as a plugin.
In this example we create and connect to a local Spark Master.
from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().set("spark.jars", "/target/spark-ods.jar")
conf.set("spark.sql.repl.eagerEval.enabled",True)
spark = SparkSession.builder.master('local[*]').config(conf = conf).getOrCreate() # or 'spark://spark-master:7077'
sc = spark.sparkContext
Initialize the Peak ODS Adapter for Apache Spark#
To work with the Peak ODS Adapter for Apache Spark, you need to define the connection information conInfo for the Peak ODS Server, together with the location of the bulk data files on disk. The connection information is then passed to the connectionManager to establish the ODS connection. This odsConnection has to be provided in all Spark ODS operations.
You have to add an override for the ODS MULTI_VOLUME symbol DISC1 to access the bulk data files in the Spark environment.
conInfo = {
"url": "http://nvhdemo:8080/api/",
"user": "sa",
"password": "sa",
"override.symbol.DISC1": "file:///data/NVH/"
}
connectionManager = sc._jvm.com.peaksolution.sparkods.ods.ConnectionManager.instance
odsConnection = connectionManager.createODSConnection(conInfo)
Work with Instance Data#
Using the “odsinstances” format, individual instances of ODS entities can be loaded by passing the entity name to the load function. The DataFrame then contains all application attributes as columns and all instances of the loaded application element as rows.
So let’s get all “Projects” of your data…
Data will be loaded from the ODS server and transferred to the client only after invoking a terminal method like show(). In a Notebook, show() is implicitly called and the first 20 rows are displayed. You can avoid this by assigning the return value to a variable in your Notebook (see the following examples).
spark.read.format("odsinstances").options(**odsConnection).load("Project")
Id | Name | Classification |
---|---|---|
1 | PMV 2PV | 0 |
2 | PMV Model P | 0 |
3 | PMV Summit | 0 |
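As a minimal sketch of the variable-assignment approach mentioned above (the name projects is just an illustrative choice), nothing is transferred until an action such as show() is invoked explicitly:
# Assumption: reuses the odsConnection created above; "projects" is an illustrative variable name.
projects = spark.read.format("odsinstances").options(**odsConnection).load("Project")
# Nothing has been transferred yet - the DataFrame is evaluated lazily.
projects.show(5)  # explicitly display the first 5 rows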
There are not that many attributes for “Project”, but this changes when working with more complex entities like “MeaResult”. In this case, you can examine the schema of the DataFrame (experts will notice that the ODS datatypes are mapped to Spark datatypes 😉).
Note: The “nullable” property of the column field is always returned as “true”. Please refer to your ASAM ODS datamodel to determine if a column (attribute) is nullable.
mearesults = spark.read.format("odsinstances").options(**odsConnection).load("MeaResult")
mearesults.printSchema()
root
|-- Description: string (nullable = true)
|-- StorageType: integer (nullable = true)
|-- Size: long (nullable = true)
|-- MeasurementEnd: timestamp (nullable = true)
|-- DateCreated: timestamp (nullable = true)
|-- Id: long (nullable = true)
|-- analytic_path: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- description: string (nullable = true)
| | |-- mimeType: string (nullable = true)
| | |-- location: string (nullable = true)
|-- Name: string (nullable = true)
|-- MeasurementBegin: timestamp (nullable = true)
|-- TestStep: long (nullable = true)
|-- Classification: long (nullable = true)
|-- TplMeaResult: long (nullable = true)
|-- TestSequence: long (nullable = true)
|-- TestEquipment: long (nullable = true)
|-- UnitUnderTest: long (nullable = true)
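If you prefer a compact, programmatic view of the column-to-type mapping, here is a minimal sketch using only the standard DataFrame API (no adapter-specific calls):
# List each column together with its mapped Spark datatype as (name, type) pairs.
for name, dtype in mearesults.dtypes:
    print(f"{name}: {dtype}")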
Let’s start working on our Spark DataFrame by selecting columns, defining filter conditions and sorting by defined columns.
Enjoy sparking…. 👍
mearesults.select("Id", "Name", "DateCreated").where("Name like 'C%'").orderBy("DateCreated", ascending=False)
Id | Name | DateCreated |
---|---|---|
3 | Channel | 2019-07-03 16:02:30 |
164 | Compressed - mics | 2014-09-23 13:18:26 |
168 | Compressed - acc ... | 2014-09-23 13:18:26 |
172 | Compressed - CAN | 2014-09-23 13:18:26 |
192 | Compressed - acc ... | 2014-09-23 13:18:26 |
188 | Compressed - mics | 2014-09-23 13:18:26 |
196 | Compressed - CAN | 2014-09-23 13:18:26 |
2000 | Channel | 2014-03-17 15:48:22 |
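The same query can also be written with typed column expressions instead of SQL strings; this is just an equivalent sketch using the standard PySpark functions API, not a requirement of the adapter:
from pyspark.sql import functions as F
# Equivalent query expressed with column expressions instead of SQL snippets.
mearesults.select("Id", "Name", "DateCreated") \
    .where(F.col("Name").startswith("C")) \
    .orderBy(F.col("DateCreated").desc())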
The central entity in our data repository is the measurement (for instance “MeaResult”). To make your life easier when working with measurements, we’ve created the virtual element “measuredContext”.
You can use the “measuredContext” to generate a DataFrame containing the measurement entity (derived from “AoMeasurement”), together with all its parent elements and the context elements (like UUT, Test Sequence, Test Equipment, …).
The column names of a “measuredContext” DataFrame are formatted as <Entity name>___<Attribute name>.
spark.read.format("odsinstances").options(**odsConnection).load("measuredContext") \
.select("Project___Name", "StructureLevel___Name", "Test___Name", "TestStep___Name", "MeaResult___Name", "vehicle___manufacturer") \
.limit(100) \
.where("TestStep___Name like 'PBN%'")
Project___Name | StructureLevel___Name | Test___Name | TestStep___Name | MeaResult___Name | vehicle___manufacturer |
---|---|---|---|---|---|
PMV Model P | PBN Measurements | PBN_UNECE_R51_201... | PBN_UNECE_R51_Rig... | Channel | Peak Motor Vehicles |
PMV Summit | PBN Measurements | PBN_UNECE_R51_201... | PBN_UNECE_R51_Rig... | Channel | Peak Motor Vehicles |
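Since the result is an ordinary DataFrame, you can also aggregate over the combined context; here is a minimal sketch (assuming you assign the measuredContext DataFrame to a variable first) counting measurements per project:
# Assumption: "context" is an illustrative variable name; reuses the odsConnection from above.
context = spark.read.format("odsinstances").options(**odsConnection).load("measuredContext")
# Count measurements per project with a standard DataFrame aggregation.
context.groupBy("Project___Name").count().orderBy("count", ascending=False).show()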
Close the SparkContext#
It is a good practice to close the SparkContext when you’re done with it.
This will ensure that all Spark-related operations are properly terminated before your program exits.
sc.stop()
License#
Copyright © 2025 Peak Solution GmbH
The training material in this repository is licensed under a Creative Commons BY-NC-SA 4.0 license. See LICENSE file for more information.