Hubungkan Kontrol PLCnext melalui MQTT ke Apache Kafka
Latar Belakang Teknis
Kafka
Apache Kafka adalah kerangka kerja untuk penyerapan data, penyimpanan, pemrosesan, dan redistribusi. Saat ini, ini banyak digunakan di perusahaan-perusahaan di seluruh dunia. Situs web resmi Kafka menawarkan lebih banyak informasi tentang idenya dan cara menerapkannya. Salah satu fitur utamanya adalah banyaknya konektor yang sudah ada ke aplikasi lain dan protokol komunikasi seperti MQTT.
MQTT
MQTT adalah protokol perpesanan berbasis TCP yang ringan, sering digunakan untuk komunikasi IoT karena kekokohan dan tapaknya yang kecil. Detail tentang MQTT standar OASIS dapat ditemukan di situs webnya.
Di sini Anda dapat menemukan artikel Blog Makers tentang cara mengkompilasi silang mosquitto untuk PLCnext, sebuah implementasi MQTT dari Eclipse. Atau, PLCnext Store menawarkan aplikasi MQTT siap pakai.
Persyaratan
- Klien MQTT di PLCnext (lihat bagian sebelumnya untuk petunjuk implementasi)
- pengontrol terhubung ke PC/VM
- Broker MQTT di PC/VM (mis., nyamuk)
- Instance Kafka di PC/VM (lihat panduan memulai cepat Kafka)
Penyiapan
Gambar berikut menunjukkan gambaran umum dari setup yang akan kita terapkan untuk menyerap data dari kontrol PLCselanjutnya ke Kafka. Meskipun dimungkinkan untuk menggunakan Proxy MQTT Confluent untuk versi Kafka mereka (2) kami akan fokus pada solusi yang lebih umum (1). Ini terdiri dari broker MQTT di mana klien terhubung ke dan menerbitkan pesan dan konektor yang berlangganan topik di broker, memproses pesan dan meneruskannya ke Kafka.
Membuat Konektor
Dalam tutorial ini, konektor kami didasarkan pada repositori evokly/kafka-connect-mqtt dari GitHub, dilisensikan di bawah Lisensi MIT (informasi lisensi terperinci). Pertama, kami mengunduh dan mengekstrak repositori. Karena versi repositori terbaru adalah akhir tahun 2016, kami memperbarui build.gradle
file, dengan mengganti dependensi lama dengan versi barunya:
ext { kafkaVersion = '2.6.0' }
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.13'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
compile 'org.bouncycastle:bcprov-jdk15on:1.67'
compile 'org.bouncycastle:bcpkix-jdk15on:1.67'
compile 'org.bouncycastle:bcpg-jdk15on:1.67'
compile 'commons-io:commons-io:2.8.0'
compile 'org.slf4j:slf4j-api:1.7.30'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
Dalam contoh ini kita akan mengirim pesan String biasa ke Kafka. Oleh karena itu kita harus mengedit kelas Java DumbProcessor.java
dalam folder /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt
, yang merupakan pemroses pesan default:
@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[]{new SourceRecord(null, //sourcePartition
null, //sourceOffset
kafkaTopic, //topic
null, //partition
null, //keySchema
mTopic, //key
null, //valueSchema
mMessage.toString(), //value
new Long(123L))}; //long timestamp
}
Setelah itu, kami membangun File Arsip Java (JAR) yang berisi dependensi:./gradlew clean jar
. Kami menyalin JAR keluaran kafka-connect-mqtt-1.1-SNAPSHOT.jar
yang dapat ditemukan di folder /kafka-connect-mqtt-master/build/libs
ke libs
direktori Kafka.
Kami juga membutuhkan salinan arsip org.Eclipse.paho.client.mqttv3-1.2.5.jar di direktori libs Kafka. Kita dapat mengunduhnya di sini.
Selanjutnya, kita harus membuat file konfigurasi untuk konektor mqtt.properties
di config
Kafka map. File memiliki konten berikut:
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=test_in # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://172.17.0.1:1883 # address of the MQTT broker
mqtt.topic=test/# # MQTT topic where the messages should be collected
#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
Tes Lokal
Sekarang kita dapat menguji konektor kita secara lokal. Buka direktori Kafka dan mulai contoh ZooKeeper dan Broker:
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka:
bin/kafka-server-start.sh config/server.properties
# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto
# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true
# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123
Pesan tersebut muncul di konsol konsumen.