Thursday, June 8, 2017

Kafka and Python

The article linked below has an example of a producer and a consumer written in Python. The producer reads frames from a video and sends them to Kafka. The consumer reads the frames from Kafka and streams them to a browser.

https://scotch.io/tutorials/build-a-distributed-streaming-system-with-apache-kafka-and-python

First, create a virtualenv with the Python dependencies. Instructions for setting up virtualenv on macOS are here:

http://programmingmatrix.blogspot.com/2018/05/mac-osx-virtualenv-instructions.html

mkvirtualenv -p python3 kafka
workon kafka
pip install kafka-python opencv-python Flask
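To confirm the virtualenv is set up, a quick check like the following can be run inside it. This is a minimal sketch: `kafka`, `cv2` and `flask` are the import names of the three pip packages above, and `missing_modules` is a hypothetical helper name.

```python
#!/usr/bin/env python3
# check_deps.py: report which required modules cannot be imported
import importlib.util

# Import names for the pip packages kafka-python, opencv-python and Flask
REQUIRED = ('kafka', 'cv2', 'flask')

def missing_modules(names=REQUIRED):
    """Return the names that cannot be found on the import path."""
    return [name for name in names if importlib.util.find_spec(name) is None]

if __name__ == '__main__':
    missing = missing_modules()
    if missing:
        print('Missing modules: ' + ', '.join(missing))
    else:
        print('All dependencies found.')
```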

For some reason, apparently related to the ffmpeg backend OpenCV uses by default on Ubuntu 16.04, the cv2.VideoCapture call does not load mp4 videos. If you just want to test Kafka, use an avi file.

Use an absolute path to your video.

https://cinelerra-cv.org/footage/toy_plane_liftoff.avi

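Copy the avi file to the directory you run the producer from, since the script opens it by a relative name (or edit the script to use an absolute path). The script could also be modified to take the file name as a command-line argument.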
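A minimal sketch of the command-line variant, using only the standard library. The helper name `parse_video_path` is hypothetical; the default file name is the sample clip above.

```python
#!/usr/bin/env python3
# Sketch: take the video file name from the command line instead of hard-coding it
import argparse
import os

def parse_video_path(argv=None):
    """Parse argv and return the absolute path of the video file."""
    parser = argparse.ArgumentParser(description='Send video frames to Kafka.')
    parser.add_argument('video', nargs='?', default='toy_plane_liftoff.avi',
                        help='path to the video file (default: %(default)s)')
    args = parser.parse_args(argv)
    # Convert to an absolute path; relative names are resolved
    # against the current working directory
    return os.path.abspath(args.video)

if __name__ == '__main__':
    print(parse_video_path())
```

In producer.py, the last line would then become `video_emitter(parse_video_path())`.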

Also, note that the code does very little error checking.
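One cheap piece of error checking is to confirm that something is listening on the broker's host and port before creating the producer or consumer. The sketch below uses a plain TCP connection from the standard library, so it only shows that the port is open, not that a healthy Kafka broker is behind it. The helper name `broker_reachable` is hypothetical; 127.0.0.1:9092 matches the address used in the scripts.

```python
import socket

def broker_reachable(host='127.0.0.1', port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS lookup failures
        return False

if __name__ == '__main__':
    if not broker_reachable():
        print('ERROR: nothing is listening on 127.0.0.1:9092. Is Kafka running?')
```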

#################################################################################

#!/usr/bin/env python3
# consumer.py
import sys

from flask import Flask, Response
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

# Connect to the Kafka server and pass the topic we want to consume.
# KafkaConsumer raises NoBrokersAvailable if it cannot reach the broker.
try:
    consumer = KafkaConsumer('my-topic', group_id='view',
                             bootstrap_servers=['127.0.0.1:9092'])
except NoBrokersAvailable:
    print('ERROR: consumer not connected. Check the Kafka IP address and port number.')
    sys.exit(1)

app = Flask(__name__)

@app.route('/')
def index():
    # Return a multipart response; the browser replaces each part as it arrives
    return Response(kafkastream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

def kafkastream():
    # Continuously listen to Kafka and yield each message as a PNG frame
    print('kafka stream')
    i = 0
    for msg in consumer:
        print('msg number: ' + str(i))
        i += 1
        yield (b'--frame\r\n'
               b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')

if __name__ == '__main__':
    print('main')
    app.run()
    # app.run(host='127.0.0.1', debug=True)
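The browser side works because of the `multipart/x-mixed-replace` response: each chunk starts with the boundary line `--frame`, carries its own `Content-Type` header, and replaces the previous image on the page. The sketch below pulls the chunk construction out of `kafkastream()` into a hypothetical helper, `frame_part`, so the byte layout can be checked without Kafka or Flask running.

```python
def frame_part(image_bytes, content_type='image/png', boundary=b'frame'):
    """Wrap one encoded image as a part of a multipart/x-mixed-replace body."""
    return (b'--' + boundary + b'\r\n'
            + b'Content-Type: ' + content_type.encode('ascii') + b'\r\n\r\n'
            + image_bytes + b'\r\n\r\n')

# Usage inside the consumer's generator:
#     for msg in consumer:
#         yield frame_part(msg.value)
```

The content type has to match what the producer encodes; if the producer is changed to write JPEG, pass `'image/jpeg'` instead.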



#################################################################################


#!/usr/bin/env python3
# producer.py

import os
import sys
import time

import cv2
# import skvideo.io

from kafka import KafkaProducer

# Connect to Kafka. SimpleProducer/KafkaClient are deprecated and were
# removed in kafka-python 2.0; KafkaProducer replaces them.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Assign a topic
topic = 'my-topic'

def video_emitter(video_path):
    if not os.path.exists(video_path):
        print('Exiting: file does not exist: ' + video_path)
        sys.exit(1)

    # Open the video
    video = cv2.VideoCapture(video_path)
    # video = skvideo.io.VideoCapture(video_path)
    # A VideoCapture object is always truthy, so check isOpened() instead
    if not video.isOpened():
        print("ERROR: can't open video file!")
        sys.exit(1)

    print('emitting.....')

    i = 0
    # Read the file frame by frame
    while video.isOpened():
        success, image = video.read()
        # success is False once the end of the file is reached
        if not success:
            if i == 0:
                print("ERROR: can't read video file!")
                sys.exit(1)
            break

        # Encode the frame as PNG
        ok, png = cv2.imencode('.png', image)
        if not ok:
            continue
        # Convert the encoded image to bytes and send it to Kafka
        producer.send(topic, png.tobytes())
        # Sleep 0.2 s between frames to reduce CPU usage
        time.sleep(0.2)
        # Report progress every 100 frames
        if i % 100 == 0:
            print('frame = ' + str(i))
        i += 1

    # Release the capture and make sure all queued messages are delivered
    video.release()
    producer.flush()
    print('done emitting')

if __name__ == '__main__':
    video_emitter('toy_plane_liftoff.avi')
    # video_emitter('big_buck_bunny_480p_5mb.flv')
    # video_emitter('big_buck_bunny_480p_5mb.mp4')



