Let's improve the Kafka and Python example by using a shared file that contains the IP address and port number of the Kafka server. See the previous blog post for details on how to run this example.
[I lost the link to the original post. Apologies.]
more kafka_config.py
# Address of the Kafka broker, shared by the consumer and the producer.
server_ip_addr = '127.0.0.1'
port_number = '9092'
# "host:port" string assembled from the two values above.
server_port = ':'.join((server_ip_addr, port_number))
###########################################################################
more consumer.py
from flask import Flask, Response
from kafka import KafkaConsumer
import kafka_config as kc
import sys

from kafka.errors import NoBrokersAvailable

# Connect to the Kafka server and subscribe to the topic we want to consume.
# A freshly built consumer object is always truthy, so a failed connection is
# detected through the exception kafka-python raises when no broker answers
# (NOTE(review): confirm against the installed kafka-python version).
try:
    consumer = KafkaConsumer('my-topic', group_id='view', bootstrap_servers=[kc.server_port])
except NoBrokersAvailable:
    print('ERROR: consumer not connected. Check the consumer IP address and port numbers.')
    # Non-zero status so a calling shell can tell the start-up failed.
    sys.exit(1)
# Flask application that serves the consumed Kafka messages over HTTP.
app = Flask(__name__)


@app.route('/')
def index():
    """Return a multipart response whose body is produced by kafkastream()."""
    frames = kafkastream()
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')
def kafkastream():
    """Yield every message from the module-level consumer as a multipart chunk.

    Each chunk is the ``--frame`` boundary line, an ``image/png`` content-type
    header, the raw ``msg.value`` bytes and a trailing blank line. A running
    message number is printed for each message.

    Yields:
        bytes: One multipart section per consumed message.
    """
    print('kafka stream ')
    # enumerate supplies the running count; the hand-kept counter was never
    # incremented, so every message used to be reported as number 0.
    for i, msg in enumerate(consumer):
        print('msg number: ' + str(i))
        yield (b'--frame\r\n'
               b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')
if __name__ == '__main__':
    # Announce start-up, then run the Flask app with its default settings.
    print('main')
    app.run()
    # Alternative invocation: app.run(host='127.0.0.1', debug=True)
###########################################################################
# producer.py
import time
import os
import sys
import cv2
import kafka_config as kc
from kafka import SimpleProducer, KafkaClient
# Topic that the encoded frames are published to.
topic = 'my-topic'

# Open a client connection to the configured Kafka address and wrap it in a
# producer that is used to send the messages.
kafka = KafkaClient(kc.server_port)
producer = SimpleProducer(kafka)
def video_emitter(video):
    """Publish every frame of a video file to Kafka as a PNG-encoded message.

    Frames are read one at a time, encoded to PNG and sent to the module-level
    ``topic`` through the module-level ``producer``, with a 0.2 second pause
    after each send. A progress line is printed every 100 frames.

    Args:
        video: Path of the video file to read.

    Raises:
        SystemExit: With status 1 if the file does not exist, cannot be
            opened, or yields no frames at all.
    """
    if not os.path.exists(video):
        print("Exiting: file does not exist: " + video)
        sys.exit(1)

    # Keep the capture under its own name instead of rebinding the path argument.
    capture = cv2.VideoCapture(video)
    try:
        # isOpened must be called: the capture object and the bare method
        # reference are both always truthy, so they cannot signal a failed open.
        if not capture.isOpened():
            print("ERROR: can't read video file!")
            sys.exit(1)

        print(' emitting.....')
        frames_read = 0
        while True:
            success, image = capture.read()
            if not success:
                # A failed read is the normal end of the file, unless nothing
                # was read at all.
                if frames_read == 0:
                    print("ERROR: can't read video file!")
                    sys.exit(1)
                break
            frames_read += 1

            # Convert the frame to PNG; skip it if the encoder reports failure.
            encoded, png = cv2.imencode('.png', image)
            if not encoded:
                print("WARNING: could not encode frame " + str(frames_read))
                continue

            # Send the encoded image bytes to Kafka.
            producer.send_messages(topic, png.tobytes())
            # Pause 0.2 s between frames to throttle the loop.
            time.sleep(0.2)

            # Report progress once per 100 frames, with the real frame count.
            if frames_read % 100 == 0:
                print("frame = " + str(frames_read))
    finally:
        # Release the capture on every path, including the sys.exit ones.
        capture.release()
    print('done emitting')
if __name__ == '__main__':
    # Emit the sample clip; the other sample files are listed below as alternatives.
    video_emitter('toy_plane_liftoff.avi')
    # video_emitter('big_buck_bunny_480p_5mb.flv')
    # video_emitter('big_buck_bunny_480p_5mb.mp4')
No comments:
Post a Comment