ASAM ODS Big Data Connector#

In this example Notebook, we show you how to use the Peak ODS Adapter for Apache Spark to export your data to the big data world (Avro/Parquet) by using the ASAM ODS Big Data Connector functionalities.

The first section covers configuring the Spark framework and the Peak ODS Adapter for Apache Spark. The fun starts with “Export ODS Big Data”.

Happy sparking!

Initialize Spark#

Configure Spark#

Initialize the Spark context and configure it to use the Peak ODS Adapter for Apache Spark as a plugin.

In this example we create and connect to a local Spark Master.

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.jars", "/target/spark-ods.jar")
conf.set("spark.sql.repl.eagerEval.enabled", True)
conf.set("spark.plugins", "com.peaksolution.sparkods.ods.SparkOdsPlugin")
conf.set("spark.odsplugin.host", "fls-test")

spark = SparkSession.builder.master('local[*]').config(conf = conf).getOrCreate() # or 'spark://spark-master:7077'
sc = spark.sparkContext

Initialize Peak ODS Adapter for Apache Spark#

To work with the Peak ODS Adapter for Apache Spark, you need to define the connection information conInfo for the Peak ODS Server, together with the location of the bulk data files on disk.

The connection information is then passed to the connectionManager to establish the ODS connection. This odsConnection has to be provided in all Spark ODS operations.

You have to add an override for the ODS MULTI_VOLUME symbol DISC1 to make the bulk data files accessible in the Spark environment.

conInfo = {
    "url": "http://nvhdemo:8080/api/",
    "user": "sa",
    "password": "sa",
    "override.symbol.DISC1": "file:///data/NVH/"
}

connectionManager = sc._jvm.com.peaksolution.sparkods.ods.ConnectionManager.instance
odsConnection = connectionManager.createODSConnection(conInfo)

Export ODS Big Data#

The Peak ODS Adapter for Apache Spark allows exporting ASAM ODS instance and measurement (mass) data in formats suitable for big data analysis tools, as defined by the ASAM ODS standard: Avro/JSON for instance data and Parquet for mass data.

However, the Peak ODS Adapter for Apache Spark allows using all built-in Spark Data Sources for writing, so you’re free to choose the most suitable format for your application.

Don’t get confused by Data Source formats. The Peak ODS Adapter for Apache Spark exposes the ODS connectivity as a Data Source format, just like any other Spark adapter. So you use the Peak ODS Adapter for Apache Spark Data Source formats to get the data into a Spark DataFrame (read), and the other Spark Data Source formats to export the DataFrame (write).

Exporting data according to the ASAM ODS Big Data standard is a two-step approach:

  • First you “select” the data to be loaded from the Peak ODS Server.

  • Then you specify how your Spark DataFrame is stored.
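The two steps above can be sketched as a small helper. This is a sketch only: the load condition and target path are placeholders, and the `spark` session and `odsConnection` dictionary are assumed to be set up as in the previous section.

```python
def export_to_avro(spark, ods_connection, load_condition, target_path):
    """Two-step export sketch: select data from the Peak ODS Server,
    then store the resulting DataFrame with a built-in Spark writer."""
    # Step 1: "select" the data to be loaded from the Peak ODS Server.
    df = (spark.read.format("ods")
          .options(**ods_connection)
          .load(load_condition))
    # Step 2: specify how the Spark DataFrame is stored -- here as Avro.
    df.write.format("avro").save(target_path)

# Example call (not executed here; the condition matches one used
# later in this notebook):
# export_to_avro(spark, odsConnection, "where MeaResult.Id = 3", "out.avro")
```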

Read Instance Data using an Export Definition File#

To read instance data according to the ASAM ODS Big Data Connector standard, define “mappedodsinstances” as Data Source format.

Define the export definition file by specifying the file name as “mappingrulefile” option.

Define the specific rule of the Export Definition File to be executed - in our example “MeaResultExport”.

Within an ASAM ODS Export Definition File you can select which attributes of a certain entity are loaded, map attributes to different names (aliases), and implicitly join other entities of your data model (schema).

You can also write to Avro using the “odsinstance” Data Source format of the Peak Spark ODS Adapter.

df = spark.read.format("mappedodsinstances")\
    .options(**odsConnection)\
    .option("mappingrulefile","mdmexportdefinition.xml")\
    .load("MeaResultExport")

After loading the data you can export it to Avro:

df.show()
df.write.format("avro").save("mearesult.avro")
+----------+---------+--------------------+------+-----------+-------------+--------+--------+-------------------+-------------------+---+-------------+--------------------+--------------------+-------------------+--------------------+
|ao_element|ao_source|               ao_id|ao_iid|Description|  StorageType|MDMLinks|    Size|     MeasurementEnd|        DateCreated| Id|analytic_path|            MimeType|                Name|   MeasurementBegin|            TestStep|
+----------+---------+--------------------+------+-----------+-------------+--------+--------+-------------------+-------------------+---+-------------+--------------------+--------------------+-------------------+--------------------+
| MeaResult|  NVHDEMO| NVHDEMO_MeaResult_3|     3|           |     database|      []|       0|1970-01-01 00:00:00|2019-07-03 16:02:30|  3|           []|application/x-asa...|             Channel|1970-01-01 00:00:00|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   150|           |external_only|      []|     420|1970-01-01 00:00:00|1970-01-01 00:00:00|150|           []|application/x-asa...|            geometry|1970-01-01 00:00:00|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   151|           |external_only|      []|     192|2014-09-23 13:18:26|2014-09-23 13:18:26|151|           []|application/x-asa...|Slow quantity - a...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   152|           |external_only|      []|     896|2014-09-23 13:18:26|2014-09-23 13:18:26|152|           []|application/x-asa...| Slow quantity - CAN|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   153|           |external_only|      []|  150708|2014-09-23 13:18:26|2014-09-23 13:18:26|153|           []|application/x-asa...|APS - acc and mag...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   154|           |external_only|      []|   20328|2014-09-23 13:18:26|2014-09-23 13:18:26|154|           []|application/x-asa...|    Order APS - mics|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   155|           |external_only|      []|  601908|2014-09-23 13:18:26|2014-09-23 13:18:26|155|           []|application/x-asa...|          APS - mics|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   156|           |external_only|      []|    2188|2014-09-23 13:18:26|2014-09-23 13:18:26|156|           []|application/x-asa...|   1/3 Octave - mics|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   157|           |external_only|      []|   80328|2014-09-23 13:18:26|2014-09-23 13:18:26|157|           []|application/x-asa...|Order APS - acc a...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   158|           |external_only|      []|     128|2014-09-23 13:18:26|2014-09-23 13:18:26|158|           []|application/x-asa...|Order complex;c:2...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   159|           |external_only|      []|     128|2014-09-23 13:18:26|2014-09-23 13:18:26|159|           []|application/x-asa...|Order complex;c:4...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   160|           |external_only|      []|     128|2014-09-23 13:18:26|2014-09-23 13:18:26|160|           []|application/x-asa...|Order complex;c:6...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   161|           |external_only|      []|13107200|2014-09-23 13:18:26|2014-09-23 13:18:26|161|           []|application/x-asa...|   Throughput - mics|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   162|           |     database|      []|     800|2014-09-23 13:18:26|2014-09-23 13:18:26|162|           []|application/x-asa...|    Integrity - mics|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   163|           |     database|      []|     960|2014-09-23 13:18:26|2014-09-23 13:18:26|163|           []|application/x-asa...|        Level - mics|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   164|           |external_only|      []|    3200|2014-09-23 13:18:26|2014-09-23 13:18:26|164|           []|application/x-asa...|   Compressed - mics|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   165|           |external_only|      []| 1966080|2014-09-23 13:18:26|2014-09-23 13:18:26|165|           []|application/x-asa...|Throughput - acc ...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   166|           |     database|      []|     800|2014-09-23 13:18:26|2014-09-23 13:18:26|166|           []|application/x-asa...|Integrity - acc a...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   167|           |     database|      []|     960|2014-09-23 13:18:26|2014-09-23 13:18:26|167|           []|application/x-asa...|Level - acc and m...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
| MeaResult|  NVHDEMO|NVHDEMO_MeaResult...|   168|           |external_only|      []|    3840|2014-09-23 13:18:26|2014-09-23 13:18:26|168|           []|application/x-asa...|Compressed - acc ...|2014-09-23 13:18:26|{TestStep, NVHDEM...|
+----------+---------+--------------------+------+-----------+-------------+--------+--------+-------------------+-------------------+---+-------------+--------------------+--------------------+-------------------+--------------------+
only showing top 20 rows
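Because the result is an ordinary Spark DataFrame, you can also shape it before exporting - for example, keep only a few of the mapped columns. A minimal sketch; the column names are taken from the output above:

```python
def select_export_columns(df):
    # Reduce the mapped instance data to a handful of columns before
    # writing; the names match the DataFrame shown above.
    return df.select("Id", "Name", "StorageType", "Size", "DateCreated")

# Usage (not executed here):
# select_export_columns(df).write.format("avro").save("mearesult_slim.avro")
```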

Read ODS measurement (mass) data#

Use the “ods” format as Data Source to load measurement data.

Depending on which measurements you load, your DataFrame will contain different measurement quantities and thus different columns.

In addition, the DataFrame will contain a special column “idref” holding a globally unique identifier for the measurement to which the values of the respective row belong - especially useful when several measurements are contained in the same DataFrame.

You can additionally load the data as binary (packed) and in chunks (to avoid big blobs), as well as treat column (local column) names as case sensitive.

df = spark.read.format("ods")\
    .options(**odsConnection)\
    .option("mode", "packed")\
    .option("maxChunkCount", 10000)\
    .option("caseSensitive", "true")\
    .load("where MeaResult.Id = 3")

Have a look at the first 10 rows:

df.limit(10)
| name | data | length | flags | nameindep | nameordnum | idref | datatype | compression | min | max |
|---|---|---|---|---|---|---|---|---|---|---|
| CHANNEL05 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL08 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL03 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL01 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL06 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL04 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL07 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL02 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL09 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
| CHANNEL10 | [1F 8B 08 00 00 0... | 3001 | null | X-Axis | 1 | NVHDEMO_SubMatrix_3 | double | gzip | null | null |
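
In packed mode, the data column holds the compressed raw bytes of each channel. A sketch for unpacking one cell follows; it assumes gzip compression (as reported in the compression column) and little-endian IEEE doubles for datatype “double” - verify the byte order against your server before relying on it.

```python
import gzip
import struct

def decode_packed_channel(raw_bytes, datatype="double"):
    """Decode one packed 'data' cell into a list of Python floats.

    Sketch only: assumes gzip compression and little-endian IEEE
    doubles for datatype 'double'.
    """
    payload = gzip.decompress(bytes(raw_bytes))
    if datatype != "double":
        raise NotImplementedError(datatype)
    count = len(payload) // 8  # 8 bytes per IEEE double
    return list(struct.unpack("<%dd" % count, payload))

# Usage against the DataFrame above (not executed here):
# row = df.first()
# values = decode_packed_channel(row["data"], row["datatype"])
```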

After loading the data you can export it to Parquet:

df.write.parquet("mearesult_id_3")

Clean up and delete the Parquet output afterwards for the next run of this notebook:

rm -r mearesult_id_3

Close the SparkContext#

It is good practice to close the SparkContext when you’re done with it.

This will ensure that all Spark-related operations are properly terminated before your program exits.

sc.stop()

License#

Copyright © 2024 Peak Solution GmbH

The training material in this repository is licensed under a Creative Commons BY-NC-SA 4.0 license. See LICENSE file for more information.