The article at this link has an example of a producer and a consumer written in Python. The producer reads frames from a video and sends them to Kafka. The consumer reads from Kafka and sends the frames to a browser.
https://scotch.io/tutorials/build-a-distributed-streaming-system-with-apache-kafka-and-python
First, create a virtualenv with the Python dependencies.
http://programmingmatrix.blogspot.com/2018/05/mac-osx-virtualenv-instructions.html
mkvirtualenv -p python3 kafka
workon kafka
pip install kafka-python opencv-python Flask
For some reason related to the ffmpeg encoder that is the default on Ubuntu 16.04, the cv2.VideoCapture call does not load mp4 videos. Use an avi file if you just want to test Kafka.
Use an absolute path to your video.
https://cinelerra-cv.org/footage/toy_plane_liftoff.avi
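To find out quickly whether OpenCV can decode a given file on your machine, something like the following sketch may help. The helper name probe_video and its capture_factory parameter are my own for illustration and are not part of the article; by default it uses cv2.VideoCapture, and it reports whether the first frame could be read.

```python
import os


def probe_video(path, capture_factory=None):
    """Return (ok, reason) saying whether the first frame of `path` can be read.

    `capture_factory` is a hypothetical hook for testing; it defaults to
    cv2.VideoCapture.
    """
    if capture_factory is None:
        import cv2  # imported lazily so the helper loads without OpenCV installed
        capture_factory = cv2.VideoCapture

    path = os.path.abspath(path)
    if not os.path.isfile(path):
        return False, "file does not exist: " + path

    capture = capture_factory(path)
    try:
        # isOpened() is False when no backend could open the container
        if not capture.isOpened():
            return False, "OpenCV could not open: " + path
        # read() returns (success, frame); success is False if decoding failed
        success, _frame = capture.read()
        if not success:
            return False, "opened but no frame could be decoded: " + path
        return True, "ok"
    finally:
        capture.release()
```

Calling probe_video('toy_plane_liftoff.avi') and then the same on an mp4 file should show whether the mp4 problem affects your setup.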
Copy the avi file to the directory with the script. The script can be modified to take a command-line file argument instead.
Also, the code has only minimal error checking.
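As a rough sketch of the command-line variant, the video path could be read with argparse and checked before anything is sent to Kafka. The function name parse_video_path is hypothetical, not something from the article.

```python
import argparse
import os


def parse_video_path(argv=None):
    """Parse the video path from the command line and return it as an absolute path."""
    parser = argparse.ArgumentParser(description="Send video frames to Kafka")
    parser.add_argument("video", help="path to the video file, e.g. an .avi")
    args = parser.parse_args(argv)

    # Resolve to an absolute path so the script does not depend on the working directory
    path = os.path.abspath(os.path.expanduser(args.video))
    if not os.path.isfile(path):
        # parser.error() prints a usage message and exits with status 2
        parser.error("file does not exist: " + path)
    return path
```

The producer could then call video_emitter(parse_video_path()) in its main block instead of hard-coding the file name.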
#################################################################################
#!/usr/bin/env python3
# consumer.py
import sys

from flask import Flask, Response
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

# Connect to the Kafka server and pass the topic we want to consume.
# KafkaConsumer typically raises NoBrokersAvailable if it cannot reach a broker.
try:
    consumer = KafkaConsumer('my-topic', group_id='view', bootstrap_servers=['127.0.0.1:9092'])
except NoBrokersAvailable:
    print('ERROR: consumer not connected. Check the broker IP address and port number.')
    sys.exit(1)

app = Flask(__name__)


@app.route('/')
def index():
    # Return a multipart response; the browser replaces the image as each part arrives
    return Response(kafkastream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


def kafkastream():
    # Continuously listen to the connection and yield messages as they are received
    print('kafka stream ')
    for i, msg in enumerate(consumer):
        print('msg number: ' + str(i))
        yield (b'--frame\r\n'
               b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')


if __name__ == '__main__':
    print('main')
    app.run()
    # app.run(host='127.0.0.1', debug=True)
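The multipart/x-mixed-replace framing that the consumer yields can be looked at on its own, without Kafka or Flask. This is only a sketch: multipart_chunk and multipart_stream are names I made up, and the byte layout mirrors what consumer.py sends for each frame.

```python
def multipart_chunk(image_bytes, content_type=b"image/png", boundary=b"frame"):
    """Wrap one encoded image as a single part of a multipart/x-mixed-replace body."""
    return (b"--" + boundary + b"\r\n"
            + b"Content-Type: " + content_type + b"\r\n\r\n"
            + image_bytes + b"\r\n\r\n")


def multipart_stream(frames):
    """Yield one multipart chunk per encoded frame, in order."""
    for frame in frames:
        yield multipart_chunk(frame)


if __name__ == "__main__":
    # Two fake "frames" stand in for the PNG bytes read from Kafka
    for chunk in multipart_stream([b"first", b"second"]):
        print(chunk)
```

The boundary here has to match the boundary=frame value given in the mimetype of the Flask Response.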
#################################################################################
#!/usr/bin/env python3
# producer.py
import os
import sys
import time

import cv2
#import skvideo.io
from kafka import KafkaProducer

# Connect to Kafka. SimpleProducer was removed in kafka-python 2.0,
# so this uses KafkaProducer instead.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Assign a topic
topic = 'my-topic'


def video_emitter(path):
    # Open the video
    if not os.path.exists(path):
        print("Exiting: file does not exist: " + path)
        sys.exit(1)
    video = cv2.VideoCapture(path)
    #video = skvideo.io.VideoCapture(path)
    if not video.isOpened():
        print("ERROR: can't open video file!")
        sys.exit(1)
    print(' emitting.....')
    i = 0
    # Read the file frame by frame
    while video.isOpened():
        # Read the image in each frame
        success, image = video.read()
        # Check whether the file has been read to the end
        if not success:
            if i == 0:
                print("ERROR: can't read video file!")
                sys.exit(1)
            break
        # Encode the image as PNG
        ret, png = cv2.imencode('.png', image)
        # Convert the image to bytes and send it to Kafka
        producer.send(topic, png.tobytes())
        # To reduce CPU usage, sleep for 0.2 sec between frames
        time.sleep(0.2)
        # Report progress every 100 frames
        if i % 100 == 0:
            print("frame = " + str(i))
        i = i + 1
    # Flush pending messages and release the capture
    producer.flush()
    video.release()
    print('done emitting')


if __name__ == '__main__':
    video_emitter('toy_plane_liftoff.avi')
    # video_emitter('big_buck_bunny_480p_5mb.flv')
    # video_emitter('big_buck_bunny_480p_5mb.mp4')