Thursday, June 8, 2017

Kafka and python with a config file

Lets improve the Kafka and Python example by using a shared file which contains the ip address and port number of Kafka. See the previous blog post for the details on how to run this example.

[I lost the link to the original post. Apologies.]


more kafka_config.py 
server_ip_addr = '127.0.0.1'
port_number = '9092'



server_port = server_ip_addr + ':' + port_number



###########################################################################



more consumer.py

from flask import Flask, Response
from kafka import KafkaConsumer
import kafka_config as kc

#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic', group_id='view', bootstrap_servers=[kc.server_port])

if not consumer:
   print('ERROR: consumer not connected. Check the consumer IP address and port numbers.')
   sys.exit()

#Continuously listen to the connection and print messages as recieved
app = Flask(__name__)

@app.route('/')
def index():
    # return a multipart response
    return Response(kafkastream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
def kafkastream():
print('kafka stream ')
i = 0
for msg in consumer:
print('msg number: ' + str(i))
yield (b'--frame\r\n'
      b'Content-Type: image/png\r\n\r\n' + msg.value + b'\r\n\r\n')

if __name__ == '__main__':
print('main')
app.run()
#    app.run(host='127.0.0.1', debug=True)

###########################################################################

# producer.py

import time
import os
import sys
import cv2
import kafka_config as kc

from kafka import SimpleProducer, KafkaClient
#  connect to Kafka
kafka = KafkaClient(kc.server_port)
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'

def video_emitter(video):
    # Open the video

    if not os.path.exists(video):
        print("Exiting: file does not exist: " + video)
        sys.exit()
   
    video = cv2.VideoCapture(video)

    #video = skvideo.io.VideoCapture(video)
    if not video:
        print("ERROR: can't read video file!")            
        sys.exit()
        
    
    print(' emitting.....')

    i = 0
    # read the file
    while (video.isOpened):
        # read the image in each frame
        success, image = video.read()
        # check if the file has read to the end
        if not success:
            if i == 0:
                print("ERROR: can't read video file!")            
                sys.exit()
            else:
                break;
            
        # convert the image png
        ret, jpeg = cv2.imencode('.png', image)
        # Convert the image to bytes and send to kafka
        producer.send_messages(topic, jpeg.tobytes())
        # To reduce CPU usage create sleep time of 0.2sec  
        time.sleep(0.2)
        if (i % 100):
            print("frame = " + str(i * 100))
        i = i + 1

       

    # clear the capture
    video.release()
    print('done emitting')

if __name__ == '__main__':
video_emitter('toy_plane_liftoff.avi')
#    video_emitter('big_buck_bunny_480p_5mb.flv')
#    video_emitter('big_buck_bunny_480p_5mb.mp4')


    

No comments:

Post a Comment