Spark SQL and DataFrames for Peak ODS Server#

In this example Notebook, we show you how to use the Peak ODS Adapter for Apache Spark to interact with ODS data using Spark SQL and DataFrames.

The first section is on configuring the Spark framework and the Peak ODS Adapter for Apache Spark. The fun starts with “Working with Peak ODS Adapter for Apache Spark”.

Happy sparking!

Initialize Spark#

Configure Spark#

Initialize the Spark context and configure it to use the Peak ODS Adapter for Apache Spark as a plugin.

In this example we create and connect to a local Spark Master.

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.jars", "/target/spark-ods.jar")
conf.set("spark.sql.repl.eagerEval.enabled", True)  # render DataFrames as tables in the notebook
conf.set("spark.plugins", "com.peaksolution.sparkods.ods.SparkOdsPlugin")  # register the Peak ODS Adapter plugin
conf.set("spark.odsplugin.host", "fls-test")

spark = SparkSession.builder.master('local[*]').config(conf=conf).getOrCreate() # or 'spark://spark-master:7077'
sc = spark.sparkContext

Initialize the Peak ODS Adapter for Apache Spark#

To work with the Peak ODS Adapter for Apache Spark, you need to define the connection information conInfo for the Peak ODS Server together with the location of the bulk data files on disk.

The connection information is then passed to the connectionManager to establish the ODS connection. This odsConnection has to be provided in all Spark ODS operations.

You have to add an override for the ODS MULTI_VOLUME symbol DISC1 to access the bulk data files in the Spark environment.

conInfo = {
    "url": "http://nvhdemo:8080/api/",
    "user": "sa",
    "password": "sa",
    "override.symbol.DISC1": "file:///data/NVH/"
}

connectionManager = sc._jvm.com.peaksolution.sparkods.ods.ConnectionManager.instance
odsConnection = connectionManager.createODSConnection(conInfo)

Working with Peak ODS Adapter for Apache Spark#

The Peak ODS Adapter for Apache Spark supports two data source formats:

  • odsinstances: Use format("odsinstances") to get instance data. The Spark DataFrame you’re working on will contain all application attributes as columns and all instances of the loaded entity (application element) as rows.

  • ods: Use format("ods") to get measurement (bulk) data. Your Spark DataFrame will contain all channels (MeaQuantities) as columns. When querying data from multiple measurements, the data is concatenated in the DataFrame. The first column in the DataFrame contains the information to which measurement (submatrix) the specific row belongs.
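Both formats follow the same read pattern. Here is a minimal sketch using the odsConnection defined above (the detailed examples below walk through each format):

# instance data: one row per instance, one column per application attribute
projects = spark.read.format("odsinstances").options(**odsConnection).load("Project")

# measurement (bulk) data: one column per channel, selected by a condition
channels = spark.read.format("ods").options(**odsConnection).load("where MeaResult.Id = 3")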

Working with Instance Data#

Using the “odsinstances” format, individual instances of ODS entities can be loaded by passing the entity name to the load function. The DataFrame then contains all application attributes as columns and all instances of the loaded application element as rows.

So let’s get all “Projects” from your data…

Data is only loaded from the ODS server and transferred to the client after invoking a terminal (action) method like show().

In a Notebook, show() is implicitly called and the first 20 rows are displayed. You can avoid this by assigning the return value to a variable in your Notebook (as sketched below, and as done in the later examples).
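As a minimal sketch (using the odsConnection defined above): assigning the DataFrame to a variable suppresses the eager display, and an action such as count() then triggers the actual transfer.

projects = spark.read.format("odsinstances").options(**odsConnection).load("Project")  # no data transferred yet
projects.count()  # action: the instances are now requested from the ODS server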

spark.read.format("odsinstances").options(**odsConnection).load("Project")
Id | Name        | Classification
---|-------------|---------------
1  | PMV 2PV     | 0
2  | PMV Model P | 0
3  | PMV Summit  | 0

There were not that many attributes for “Projects”, but this changes when working with more complex entities like “MeaResult”. In this case, you can examine the schema of the DataFrame (experts will notice that the ODS data types are mapped to Spark data types 😉).

Note: The “nullable” property of the column field is always returned as “true”. Please refer to your ASAM ODS data model to determine whether a column (attribute) is nullable.

mearesults = spark.read.format("odsinstances").options(**odsConnection).load("MeaResult")
mearesults.printSchema()
root
 |-- Description: string (nullable = true)
 |-- StorageType: integer (nullable = true)
 |-- Size: long (nullable = true)
 |-- MeasurementEnd: timestamp (nullable = true)
 |-- DateCreated: timestamp (nullable = true)
 |-- Id: long (nullable = true)
 |-- analytic_path: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- mimeType: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- MeasurementBegin: timestamp (nullable = true)
 |-- TestStep: long (nullable = true)
 |-- Classification: long (nullable = true)
 |-- TplMeaResult: long (nullable = true)
 |-- TestSequence: long (nullable = true)
 |-- TestEquipment: long (nullable = true)
 |-- UnitUnderTest: long (nullable = true)

Let’s start working on our Spark DataFrame by selecting columns, defining filter conditions, and sorting by specific columns.

Enjoy sparking…. 👍

mearesults.select("Id", "Name", "DateCreated").where("Name like 'C%'").orderBy("DateCreated", ascending=False)
Id   | Name                 | DateCreated
-----|----------------------|--------------------
3    | Channel              | 2019-07-03 16:02:30
164  | Compressed - mics    | 2014-09-23 13:18:26
168  | Compressed - acc ... | 2014-09-23 13:18:26
172  | Compressed - CAN     | 2014-09-23 13:18:26
192  | Compressed - acc ... | 2014-09-23 13:18:26
188  | Compressed - mics    | 2014-09-23 13:18:26
196  | Compressed - CAN     | 2014-09-23 13:18:26
2000 | Channel              | 2014-03-17 15:48:22

The central entity in our data repository is the measurement (for instance “MeaResult”). To make your life easier when working with measurements, we’ve created the virtual element “measuredContext”.
You can use the “measuredContext” to generate a DataFrame containing the measurement entity (derived from “AoMeasurement”) together with all its parent elements and the context elements (like UUT, Test Sequence, Test Equipment, …).

The column names of a “measuredContext” DataFrame are formatted as <Entity name>___<Attribute name>.

spark.read.format("odsinstances").options(**odsConnection).load("measuredContext") \
.select("Project___Name", "StructureLevel___Name", "Test___Name", "TestStep___Name", "MeaResult___Name", "vehicle___manufacturer") \
.limit(100) \
.where("TestStep___Name like 'PBN%'")
Project___Name | StructureLevel___Name | Test___Name          | TestStep___Name      | MeaResult___Name | vehicle___manufacturer
---------------|-----------------------|----------------------|----------------------|------------------|-----------------------
PMV Model P    | PBN Measurements      | PBN_UNECE_R51_201... | PBN_UNECE_R51_Rig... | Channel          | Peak Motor Vehicles
PMV Summit     | PBN Measurements      | PBN_UNECE_R51_201... | PBN_UNECE_R51_Rig... | Channel          | Peak Motor Vehicles
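Context attributes can be filtered like any other column. A minimal sketch, assuming the column names and the manufacturer value shown in the output above:

ctx = spark.read.format("odsinstances").options(**odsConnection).load("measuredContext")
# filter on a context attribute of the unit under test (value taken from the output above)
ctx.select("MeaResult___Name", "vehicle___manufacturer") \
    .where("vehicle___manufacturer = 'Peak Motor Vehicles'")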

Working with measurement data#

Now that you know how to work with instance data, let’s have a look at the actual measurement data. You use format("ods") to load measurement data.

In our example we’re looking for a measurement with a specific “Id” - you may want to try fancier queries…

df = spark.read.format("ods").options(**odsConnection).load("where MeaResult.Id = 3")

You can now look at the first 10 rows…

df.limit(10)
idref               | channel01  | channel02 | channel03 | channel04 | channel05  | channel06   | channel07 | channel08 | channel09 | channel10 | x-axis
--------------------|------------|-----------|-----------|-----------|------------|-------------|-----------|-----------|-----------|-----------|-------
NVHDEMO_SubMatrix_3 | 4.38541E-6 | 2.02778   | -4.44111  | -4.51025  | 1.86265E-6 | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 1
NVHDEMO_SubMatrix_3 | 4.38541E-6 | 2.02778   | -2.03551  | -4.51025  | 1.86265E-6 | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 2
NVHDEMO_SubMatrix_3 | 4.38541E-6 | 2.02778   | -4.44111  | -4.51025  | -6.52153   | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 3
NVHDEMO_SubMatrix_3 | 2.40175    | 2.02778   | -4.44111  | 2.00455   | -6.52153   | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 4
NVHDEMO_SubMatrix_3 | 4.38541E-6 | -0.368683 | -4.44111  | 2.00455   | -6.52153   | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 5
NVHDEMO_SubMatrix_3 | 4.38541E-6 | -0.368683 | -2.03551  | -4.51025  | 1.86265E-6 | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 6
NVHDEMO_SubMatrix_3 | 2.40175    | 2.02778   | -4.44111  | -4.51025  | -6.52153   | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 7
NVHDEMO_SubMatrix_3 | 4.38541E-6 | -0.368683 | -2.03551  | -4.51025  | 1.86265E-6 | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 8
NVHDEMO_SubMatrix_3 | 4.38541E-6 | 2.02778   | -4.44111  | -4.51025  | -6.52153   | -5.43436    | -0.192593 | 0.577823  | -0.579521 | 0.371926  | 9
NVHDEMO_SubMatrix_3 | 4.38541E-6 | 2.02778   | -2.03551  | -4.51025  | 1.86265E-6 | -1.74623E-7 | -0.192593 | 0.770431  | -0.579521 | 0.371926  | 10

… or plot them in a line chart …

%matplotlib inline
import pandas as pd
import pyspark
import matplotlib.pyplot as plt

plt.rcParams["figure.figsize"] = (20,10)
pdf = df.toPandas().groupby(["idref", "x-axis"], group_keys=True).sum()
pdf.loc['NVHDEMO_SubMatrix_3'].plot()
plt.show()
(Line chart of the channels of submatrix NVHDEMO_SubMatrix_3 plotted over the x-axis.)

Now that your measurement data is in a pandas.DataFrame, you can also use everything pandas has to offer.
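For example, a minimal sketch computing descriptive statistics for two of the channels shown above (the channel names are taken from the DataFrame output; any other pandas operation works just as well):

# descriptive statistics per channel for one submatrix
stats = pdf.loc['NVHDEMO_SubMatrix_3'].describe()
print(stats[["channel01", "channel02"]])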

Have fun.

Close the SparkContext#

It is a good practice to close the SparkContext when you’re done with it.

This will ensure that all Spark-related operations are properly terminated before your program exits.

sc.stop()

License#

Copyright © 2024 Peak Solution GmbH

The training material in this repository is licensed under a Creative Commons BY-NC-SA 4.0 license. See LICENSE file for more information.