Lesen von Azure Iot Hub Events mit Databricks

Prinzipiell kann dieser Schritt übersprungen werden. Für mich war es allerdings hilfreich, die Events zuerst in der Konsole ausgeben zu lassen.

  1. Erstellen eines Azure IoT Hub

  2. Kopieren des Connection Strings und Event Hub Name

  3. Erstelle ein Notebook

  4. Verbinden zu Event Hub

  5. Ausführen des Kompletten Codes

  6. Senden von Daten über Python

  7. Jetzt sollten die Events in der Konsole ausgegeben werden

2. Kopiere den Connection String und Event Hub Name

  1. Gehe zu IoT Hub

  1. Kopiere "Event Hub-compatible endpoint" und "Event Hub-compatible name"

6. Verbinden zu Event Hub

Ersetze "IOT_CS" und "ehName" mit Ihrem connection string.

IOT_CS = "Endpoint=sb://iothub-ns-iothubd06f-24947894-cf917c9427.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=7nUrbGMSqDVxCTwbkQoL1fkT4Cq5jtl70YPseCOGdVI=;EntityPath=iothubd06f90321d7a48c59a" # dbutils.secrets.get('iot','iothub-cs') # IoT Hub connection string (Event Hub Compatible)
ehConf = { 
  'eventhubs.connectionString':sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(IOT_CS),
  'ehName':"iothubd06f90321d7a48c59a"
}
df = spark.readStream.format("eventhubs").options(**ehConf).load()
df.printSchema()

Die folgende Ausgabe sollte kommen:

7. Ausführen des kompleten Codes

# Import the necessary libraries
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import functions as F

#Erstellen der Structured Streaming Felder
Schema = StructType([
               StructField("IMU_ATTI(0):Longitude" ,FloatType(),False),
StructField("IMU_ATTI(0):Latitude" ,FloatType(),False),
StructField("IMU_ATTI(0):numSats" ,FloatType(),False),
StructField("IMU_ATTI(0):barometer:Raw" ,FloatType(),False),
StructField("IMU_ATTI(0):barometer:Smooth" ,FloatType(),False),
StructField("IMU_ATTI(0):accel:X" ,FloatType(),False),
StructField("IMU_ATTI(0):accel:Y" ,FloatType(),False),
StructField("IMU_ATTI(0):accel:Z" ,FloatType(),False),
StructField("IMU_ATTI(0):accel:Composite" ,FloatType(),False),
StructField("IMU_ATTI(0):gyro:X" ,FloatType(),False),
StructField("IMU_ATTI(0):gyro:Y" ,FloatType(),False),
StructField("IMU_ATTI(0):gyro:Z" ,FloatType(),False),
StructField("IMU_ATTI(0):gyro:Composite" ,FloatType(),False),
StructField("IMU_ATTI(0):mag:X" ,FloatType(),False),
StructField("IMU_ATTI(0):mag:Y" ,FloatType(),False),
StructField("IMU_ATTI(0):mag:Z" ,FloatType(),False),
StructField("IMU_ATTI(0):mag:Mod" ,FloatType(),False),
StructField("IMU_ATTI(0):velN" ,FloatType(),False),
StructField("IMU_ATTI(0):velE" ,FloatType(),False),
StructField("IMU_ATTI(0):velD" ,FloatType(),False),
StructField("IMU_ATTI(0):velComposite" ,FloatType(),False),
StructField("IMU_ATTI(0):velH" ,FloatType(),False),
StructField("IMU_ATTI(0):GPS-H" ,FloatType(),False),
StructField("IMU_ATTI(0):roll" ,FloatType(),False),
StructField("IMU_ATTI(0):pitch" ,FloatType(),False),
StructField("IMU_ATTI(0):yaw" ,FloatType(),False),
StructField("IMU_ATTI(0):yaw360" ,FloatType(),False),
StructField("IMU_ATTI(0):totalGyro:Z" ,FloatType(),False),
StructField("IMU_ATTI(0):totalGyro:X" ,FloatType(),False),
StructField("IMU_ATTI(0):totalGyro:Y" ,FloatType(),False),
StructField("IMU_ATTI(0):magYaw" ,FloatType(),False),
StructField("IMU_ATTI(0):distanceHP" ,FloatType(),False),
StructField("IMU_ATTI(0):distanceTravelled" ,FloatType(),False),
StructField("IMU_ATTI(0):directionOfTravel[mag]" ,FloatType(),False),
StructField("IMU_ATTI(0):directionOfTravel[true]" ,FloatType(),False),
StructField("IMU_ATTI(0):temperature" ,FloatType(),False),
StructField("General:relativeHeight" ,FloatType(),False),
StructField("GPS(0):Long" ,FloatType(),False),
StructField("GPS(0):Lat" ,FloatType(),False),
StructField("GPS(0):Date" ,FloatType(),False),
StructField("GPS(0):heightMSL" ,FloatType(),False),
StructField("GPS(0):hDOP" ,FloatType(),False),
StructField("GPS(0):pDOP" ,FloatType(),False),
StructField("GPS(0):sAcc" ,FloatType(),False),
StructField("GPS(0):numGPS" ,FloatType(),False),
StructField("GPS(0):numGLNAS" ,FloatType(),False),
StructField("GPS(0):numSV" ,FloatType(),False),
StructField("GPS(0):velN" ,FloatType(),False),
StructField("GPS(0):velE" ,FloatType(),False),
StructField("GPS(0):velD" ,FloatType(),False),
StructField("RC:Aileron" ,FloatType(),False),
StructField("RC:Elevator" ,FloatType(),False),
StructField("RC:Rudder" ,FloatType(),False),
StructField("RC:Throttle" ,FloatType(),False),
StructField("Motor:Speed:RFront" ,FloatType(),False),
StructField("Motor:Speed:LFront" ,FloatType(),False),
StructField("Motor:Speed:LBack" ,FloatType(),False),
StructField("Motor:Speed:RBack" ,FloatType(),False),
StructField("Motor:EscTemp:RFront" ,FloatType(),False),
StructField("Motor:EscTemp:LFront" ,FloatType(),False),
StructField("Motor:EscTemp:LBack" ,FloatType(),False),
StructField("Motor:EscTemp:RBack" ,FloatType(),False),
StructField("Motor:PPMrecv:RFront" ,FloatType(),False),
StructField("Motor:PPMrecv:LFront" ,FloatType(),False),
StructField("Motor:PPMrecv:LBack" ,FloatType(),False),
StructField("Motor:PPMrecv:RBack" ,FloatType(),False),
StructField("Motor:PPMsend:RFront" ,FloatType(),False),
StructField("Motor:PPMsend:LFront" ,FloatType(),False),
StructField("Motor:PPMsend:LBack" ,FloatType(),False),
StructField("Motor:PPMsend:RBack" ,FloatType(),False),
StructField("Motor:V_out:RFront" ,FloatType(),False),
StructField("Motor:V_out:LFront" ,FloatType(),False),
StructField("Motor:V_out:LBack" ,FloatType(),False),
StructField("Motor:V_out:RBack" ,FloatType(),False),
StructField("Motor:Volts:RFront" ,FloatType(),False),
StructField("Motor:Volts:LFront" ,FloatType(),False),
StructField("Motor:Volts:LBack" ,FloatType(),False),
StructField("Motor:Volts:RBack" ,FloatType(),False),
StructField("Motor:Current:RFront" ,FloatType(),False),
StructField("Motor:Current:LFront" ,FloatType(),False),
StructField("Motor:Current:LBack" ,FloatType(),False),
StructField("Motor:Current:RBack" ,FloatType(),False),
StructField("Motor:thrustAngle" ,FloatType(),False),
StructField("AirComp:AirSpeedBody:X" ,FloatType(),False),
StructField("AirComp:AirSpeedBody:Y" ,FloatType(),False),
StructField("AirComp:Alti" ,FloatType(),False),
StructField("AirComp:WindSpeed" ,FloatType(),False),
StructField("AirComp:Wind:X" ,FloatType(),False),
StructField("AirComp:Wind:Y" ,FloatType(),False),
StructField("AirComp:MotorSpeed" ,FloatType(),False),
StructField("AirComp:VelLevel" ,FloatType(),False),
StructField("flyCState" ,FloatType(),False),
StructField("MotorCtrl:PWM:RFront" ,FloatType(),False),
StructField("MotorCtrl:PWM:LFront" ,FloatType(),False),
StructField("MotorCtrl:PWM:LBack" ,FloatType(),False),
StructField("MotorCtrl:PWM:RBack" ,FloatType(),False)
            ])
IOT_CS = "Endpoint=sb://iothub-ns-iothubd06f-24947894-cf917c9427.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=7nUrbGMSqDVxCTwbkQoL1fkT4Cq5jtl70YPseCOGdVI=;EntityPath=iothubd06f90321d7a48c59a" # dbutils.secrets.get('iot','iothub-cs') # IoT Hub connection string (Event Hub Compatible)
ehConf = { 
  'eventhubs.connectionString':sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(IOT_CS),
  'ehName':"iothubd06f90321d7a48c59a"
}
df = spark.readStream.format("eventhubs").options(**ehConf).load()
df.printSchema()

df = df.withColumn("body", df["body"].cast("string"))
df1 = df.select(F.from_json(F.col("body").cast("string"), Schema).alias("payload"))

# Umbenennen der Spalten
df2 = df1.select(
   F.col("payload.IMU_ATTI(0):Longitude").alias("IMU_ATTI(0):Longitude"),
    F.col("payload.IMU_ATTI(0):Latitude").alias("IMU_ATTI(0):Latitude"),
    F.col("payload.IMU_ATTI(0):numSats").alias("IMU_ATTI(0):numSats"),
    F.col("payload.IMU_ATTI(0):barometer:Raw").alias("IMU_ATTI(0):barometer:Raw"),
    F.col("payload.IMU_ATTI(0):barometer:Smooth").alias("IMU_ATTI(0):barometer:Smooth"),
    F.col("payload.IMU_ATTI(0):accel:X").alias("IMU_ATTI(0):accel:X"),
    F.col("payload.IMU_ATTI(0):accel:Y").alias("IMU_ATTI(0):accel:Y"),
    F.col("payload.IMU_ATTI(0):accel:Z").alias("IMU_ATTI(0):accel:Z"),
    F.col("payload.IMU_ATTI(0):accel:Composite").alias("IMU_ATTI(0):accel:Composite"),
    F.col("payload.IMU_ATTI(0):gyro:X").alias("IMU_ATTI(0):gyro:X"),
    F.col("payload.IMU_ATTI(0):gyro:Y").alias("IMU_ATTI(0):gyro:Y"),
    F.col("payload.IMU_ATTI(0):gyro:Z").alias("IMU_ATTI(0):gyro:Z"),
    F.col("payload.IMU_ATTI(0):gyro:Composite").alias("IMU_ATTI(0):gyro:Composite"),
    F.col("payload.IMU_ATTI(0):mag:X").alias("IMU_ATTI(0):mag:X"),
    F.col("payload.IMU_ATTI(0):mag:Y").alias("IMU_ATTI(0):mag:Y"),
    F.col("payload.IMU_ATTI(0):mag:Z").alias("IMU_ATTI(0):mag:Z"),
    F.col("payload.IMU_ATTI(0):mag:Mod").alias("IMU_ATTI(0):mag:Mod"),
    F.col("payload.IMU_ATTI(0):velN").alias("IMU_ATTI(0):velN"),
    F.col("payload.IMU_ATTI(0):velE").alias("IMU_ATTI(0):velE"),
    F.col("payload.IMU_ATTI(0):velD").alias("IMU_ATTI(0):velD"),
    F.col("payload.IMU_ATTI(0):velComposite").alias("IMU_ATTI(0):velComposite"),
    F.col("payload.IMU_ATTI(0):velH").alias("IMU_ATTI(0):velH"),
    F.col("payload.IMU_ATTI(0):GPS-H").alias("IMU_ATTI(0):GPS-H"),
    F.col("payload.IMU_ATTI(0):roll").alias("IMU_ATTI(0):roll"),
    F.col("payload.IMU_ATTI(0):pitch").alias("IMU_ATTI(0):pitch"),
    F.col("payload.IMU_ATTI(0):yaw").alias("IMU_ATTI(0):yaw"),
    F.col("payload.IMU_ATTI(0):yaw360").alias("IMU_ATTI(0):yaw360"),
    F.col("payload.IMU_ATTI(0):totalGyro:Z").alias("IMU_ATTI(0):totalGyro:Z"),
    F.col("payload.IMU_ATTI(0):totalGyro:X").alias("IMU_ATTI(0):totalGyro:X"),
    F.col("payload.IMU_ATTI(0):totalGyro:Y").alias("IMU_ATTI(0):totalGyro:Y"),
    F.col("payload.IMU_ATTI(0):magYaw").alias("IMU_ATTI(0):magYaw"),
    F.col("payload.IMU_ATTI(0):distanceHP").alias("IMU_ATTI(0):distanceHP"),
    F.col("payload.IMU_ATTI(0):distanceTravelled").alias("IMU_ATTI(0):distanceTravelled"),
    F.col("payload.IMU_ATTI(0):directionOfTravel[mag]").alias("IMU_ATTI(0):directionOfTravel[mag]"),
    F.col("payload.IMU_ATTI(0):directionOfTravel[true]").alias("IMU_ATTI(0):directionOfTravel[true]"),
    F.col("payload.IMU_ATTI(0):temperature").alias("IMU_ATTI(0):temperature"),
    F.col("payload.General:relativeHeight").alias("General:relativeHeight"),
    F.col("payload.GPS(0):Long").alias("GPS(0):Long"),
    F.col("payload.GPS(0):Lat").alias("GPS(0):Lat"),
    F.col("payload.GPS(0):Date").alias("GPS(0):Date"),
    F.col("payload.GPS(0):heightMSL").alias("GPS(0):heightMSL"),
    F.col("payload.GPS(0):hDOP").alias("GPS(0):hDOP"),
    F.col("payload.GPS(0):pDOP").alias("GPS(0):pDOP"),
    F.col("payload.GPS(0):sAcc").alias("GPS(0):sAcc"),
    F.col("payload.GPS(0):numGPS").alias("GPS(0):numGPS"),
    F.col("payload.GPS(0):numGLNAS").alias("GPS(0):numGLNAS"),
    F.col("payload.GPS(0):numSV").alias("GPS(0):numSV"),
    F.col("payload.GPS(0):velN").alias("GPS(0):velN"),
    F.col("payload.GPS(0):velE").alias("GPS(0):velE"),
    F.col("payload.GPS(0):velD").alias("GPS(0):velD"),
    F.col("payload.RC:Aileron").alias("RC:Aileron"),
    F.col("payload.RC:Elevator").alias("RC:Elevator"),
    F.col("payload.RC:Rudder").alias("RC:Rudder"),
    F.col("payload.RC:Throttle").alias("RC:Throttle"),
    F.col("payload.Motor:Speed:RFront").alias("Motor:Speed:RFront"),
    F.col("payload.Motor:Speed:LFront").alias("Motor:Speed:LFront"),
    F.col("payload.Motor:Speed:LBack").alias("Motor:Speed:LBack"),
    F.col("payload.Motor:Speed:RBack").alias("Motor:Speed:RBack"),
    F.col("payload.Motor:EscTemp:RFront").alias("Motor:EscTemp:RFront"),
    F.col("payload.Motor:EscTemp:LFront").alias("Motor:EscTemp:LFront"),
    F.col("payload.Motor:EscTemp:LBack").alias("Motor:EscTemp:LBack"),
    F.col("payload.Motor:EscTemp:RBack").alias("Motor:EscTemp:RBack"),
    F.col("payload.Motor:PPMrecv:RFront").alias("Motor:PPMrecv:RFront"),
    F.col("payload.Motor:PPMrecv:LFront").alias("Motor:PPMrecv:LFront"),
    F.col("payload.Motor:PPMrecv:LBack").alias("Motor:PPMrecv:LBack"),
    F.col("payload.Motor:PPMrecv:RBack").alias("Motor:PPMrecv:RBack"),
    F.col("payload.Motor:PPMsend:RFront").alias("Motor:PPMsend:RFront"),
    F.col("payload.Motor:PPMsend:LFront").alias("Motor:PPMsend:LFront"),
    F.col("payload.Motor:PPMsend:LBack").alias("Motor:PPMsend:LBack"),
    F.col("payload.Motor:PPMsend:RBack").alias("Motor:PPMsend:RBack"),
    F.col("payload.Motor:V_out:RFront").alias("Motor:V_out:RFront"),
    F.col("payload.Motor:V_out:LFront").alias("Motor:V_out:LFront"),
    F.col("payload.Motor:V_out:LBack").alias("Motor:V_out:LBack"),
    F.col("payload.Motor:V_out:RBack").alias("Motor:V_out:RBack"),
    F.col("payload.Motor:Volts:RFront").alias("Motor:Volts:RFront"),
    F.col("payload.Motor:Volts:LFront").alias("Motor:Volts:LFront"),
    F.col("payload.Motor:Volts:LBack").alias("Motor:Volts:LBack"),
    F.col("payload.Motor:Volts:RBack").alias("Motor:Volts:RBack"),
    F.col("payload.Motor:Current:RFront").alias("Motor:Current:RFront"),
    F.col("payload.Motor:Current:LFront").alias("Motor:Current:LFront"),
    F.col("payload.Motor:Current:LBack").alias("Motor:Current:LBack"),
    F.col("payload.Motor:Current:RBack").alias("Motor:Current:RBack"),
    F.col("payload.Motor:thrustAngle").alias("Motor:thrustAngle"),
    F.col("payload.AirComp:AirSpeedBody:X").alias("AirComp:AirSpeedBody:X"),
    F.col("payload.AirComp:AirSpeedBody:Y").alias("AirComp:AirSpeedBody:Y"),
    F.col("payload.AirComp:Alti").alias("AirComp:Alti"),
    F.col("payload.AirComp:WindSpeed").alias("AirComp:WindSpeed"),
    F.col("payload.AirComp:Wind:X").alias("AirComp:Wind:X"),
    F.col("payload.AirComp:Wind:Y").alias("AirComp:Wind:Y"),
    F.col("payload.AirComp:MotorSpeed").alias("AirComp:MotorSpeed"),
    F.col("payload.AirComp:VelLevel").alias("AirComp:VelLevel"),
    F.col("payload.flyCState").alias("flyCState"),
    F.col("payload.MotorCtrl:PWM:RFront").alias("MotorCtrl:PWM:RFront"),
    F.col("payload.MotorCtrl:PWM:LFront").alias("MotorCtrl:PWM:LFront"),
    F.col("payload.MotorCtrl:PWM:LBack").alias("MotorCtrl:PWM:LBack"),
    F.col("payload.MotorCtrl:PWM:RBack").alias("MotorCtrl:PWM:RBack")
)
 
 df2.printSchema()
 
    
def output (writeDF, _):
    writeDF.show()
    
# Ausgabe in der Console   
df2.writeStream \
    .foreachBatch(output) \
    .outputMode("update") \
    .start()\
    .awaitTermination()
df2.show()

Last updated