Para completar esta lección, debe tener una instalación activa de Kafka en su máquina. Lea para saber cómo hacer esto.
Instalación del cliente Python para Apache Kafka
Antes de que podamos comenzar a trabajar con Apache Kafka en el programa Python, necesitamos instalar el cliente Python para Apache Kafka. Esto se puede hacer usando pip (índice de paquetes de Python). Aquí hay un comando para lograr esto:
Esta será una instalación rápida en la terminal:
Instalación del cliente Python Kafka mediante PIP
Ahora que tenemos una instalación activa para Apache Kafka y también hemos instalado el cliente Python Kafka, estamos listos para comenzar a codificar.
Haciendo un productor
Lo primero que debe tener para publicar mensajes en Kafka es una aplicación de producción que puede enviar mensajes a temas en Kafka.
Tenga en cuenta que los productores de Kafka son productores de mensajes asincrónicos. Esto significa que las operaciones realizadas mientras se publica un mensaje en la partición de tema de Kafka no son bloqueantes. Para simplificar las cosas, escribiremos un editor JSON simple para esta lección.
Para empezar, crea una instancia para Kafka Producer:
import json
import pprint
producer= KafkaProducer(
bootstrap_servers=‘localhost:9092’,
value_serializer=lambda v: json.dumps(v).encode(‘utf-8’))
El atributo bootstrap_servers informa sobre el host y el puerto del servidor Kafka. El atributo value_serializer es solo para el propósito de la serialización JSON de los valores JSON encontrados.
Para jugar con el productor de Kafka, intentemos imprimir las métricas relacionadas con el productor y el clúster de Kafka:
pprint.pprint(metrics)
Veremos lo siguiente ahora:
Kafka Mterics
Ahora, intentemos finalmente enviar algún mensaje a Kafka Queue. Un objeto JSON simple será un buen ejemplo:
El linuxhint es la partición tema sobre el cual se enviará el objeto JSON en. Cuando ejecute el script, no obtendrá ningún resultado ya que el mensaje simplemente se envía a la partición del tema. Es hora de escribir un consumidor para que podamos probar nuestra aplicación.
Hacer un consumidor
Ahora, estamos listos para establecer una nueva conexión como una aplicación de consumidor y recibir los mensajes del tema de Kafka. Empiece por crear una nueva instancia para el consumidor:
from kafka import TopicPartition
print(‘Making connection.’)
consumer= KafkaConsumer(bootstrap_servers=‘localhost:9092’)
Ahora, asigne un tema a esta conexión y también un posible valor de compensación.
consumer.assign([TopicPartition(‘linuxhint’, 2)])
Finalmente, estamos listos para imprimir el mensaje:
for message in consumer:
print(“OFFSET: “ + str(message[0])+ “t MSG: “ + str(message))
A través de esto, obtendremos una lista de todos los mensajes publicados en la partición de temas del consumidor de Kafka. El resultado de este programa será:
Consumidor de Kafka
Solo para una referencia rápida, aquí está el guión completo de Producer:
import json
import pprint
producer= KafkaProducer(
bootstrap_servers=‘localhost:9092’,
value_serializer=lambda v: json.dumps(v).encode(‘utf-8’))
producer.send(‘linuxhint’, {‘topic’: ‘kafka’})
# metrics= producer.metrics()
# pprint.pprint(metrics)
Y aquí está el programa completo para consumidores que usamos:
from kafka import TopicPartition
print(‘Making connection.’)
consumer= KafkaConsumer(bootstrap_servers=‘localhost:9092’)
print(‘Assigning Topic.’)
consumer.assign([TopicPartition(‘linuxhint’, 2)])
print(‘Getting message.’)
for message in consumer:
print(“OFFSET: “ + str(message[0])+ “t MSG: “ + str(message))
Conclusión
En esta lección, vimos cómo podemos instalar y comenzar a usar Apache Kafka en nuestros programas Python. Mostramos lo fácil que es realizar tareas simples relacionadas con Kafka en Python con el cliente Kafka para Python demostrado.