-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobject_detection_stream_distributed.py
More file actions
90 lines (74 loc) · 3.13 KB
/
object_detection_stream_distributed.py
File metadata and controls
90 lines (74 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from imutils import face_utils
from scipy.spatial import distance
import numpy as np
import imutils
import dlib
import cv2
from imageai.Detection import ObjectDetection
import os
from PIL import Image
import tensorflow as tf
import time
# Spark configuration: YARN master, FAIR scheduling, and an explicit
# scheduler-pool definition file.
conf = (
    SparkConf()
    .setAppName("object detection streaming")
    .setMaster("yarn")
    .set("spark.scheduler.mode", "FAIR")
    .set("spark.scheduler.allocation.file", "/opt/spark-2.4.3-bin-hadoop2.7/conf/fairscheduler.xml")
)
sc = SparkContext(conf=conf)
# Jobs submitted from this thread are tagged with scheduler pool "pool3".
sc.setLocalProperty("spark.scheduler.pool", "pool3")
# Streaming context built on the Spark context with a batch interval of 0.5.
ssc = StreamingContext(sc, 0.5)
sql_sc = SQLContext(sc)
# Kafka topic names used by this job.
input_topic, output_topic = 'input', 'output3'
# Comma-separated host:port list covering G01-01 .. G01-16, all on port 2181.
brokers = ",".join("G01-%02d:2181" % node for node in range(1, 17))
def my_decoder(s):
    """Return the Kafka message value exactly as received.

    Passed as ``valueDecoder`` when the input stream is created so the
    payload reaches the detection step untouched.
    """
    untouched_payload = s
    return untouched_payload
# Kafka input stream for `input_topic`, consumed under the group id
# 'test-consumer-group-3'. Values pass through `my_decoder`, i.e. unchanged.
# NOTE(review): `brokers` lists port 2181 hosts, which looks like a ZooKeeper
# quorum rather than Kafka brokers; the name may be misleading - confirm.
# NOTE(review): the 15 in the topics dict is presumably the number of consumer
# threads/partitions for the topic - confirm against the createStream docs.
kafkaStream = KafkaUtils.createStream(ssc, brokers, 'test-consumer-group-3', {input_topic: 15},
valueDecoder=my_decoder)
# Producer used to publish results; gzip-compressed, with explicit batch,
# buffer and maximum request sizes (values are in bytes per kafka-python).
producer = KafkaProducer(bootstrap_servers='G01-01:9092', compression_type='gzip', batch_size=163840,
buffer_memory=33554432, max_request_size=20485760)
# Build the ImageAI detector: Tiny YOLOv3 weights loaded from a local .h5 file,
# using the "flash" detection speed setting.
detector = ObjectDetection()
detector.setModelTypeAsTinyYOLOv3() # !!!tiny
detector.setModelPath('/home/hduser/yolo-tiny.h5')
detector.loadModel(detection_speed="flash")
# Restrict detection to these five object classes.
custom = detector.CustomObjects(person=True, bottle=True, knife=True, cell_phone=True, fork=True)
# Capture the default TensorFlow graph after the model has been loaded;
# obj_detection re-enters it via `as_default()` before running detection.
graph = tf.get_default_graph()
# Wrap the shared objects in Spark broadcast variables; obj_detection reads
# them through `.value`.
# NOTE(review): broadcasting requires these objects to be serializable; a
# loaded model, a TF graph and a live KafkaProducer may not be - confirm this
# works on the target cluster.
broadcast_detector = sc.broadcast(detector)
broadcast_custom = sc.broadcast(custom)
broadcast_graph = sc.broadcast(graph)
broadcast_producer = sc.broadcast(producer)
def obj_detection(ss):
    """Run object detection on one streamed frame and publish the result.

    The frame is decoded, resized to a width of 600, passed through the
    broadcast detector restricted to the broadcast custom-object set, and the
    annotated image is re-encoded as JPEG. The result is sent to
    ``output_topic`` only if fewer than 3000 ms separate the current time
    from the timestamp carried in the key.

    Args:
        ss: A ``(key, value)`` pair from the Kafka stream. ``key`` is parsed
            with ``int()`` and compared against the current time in
            milliseconds (assumed to be a string of epoch-milliseconds -
            TODO confirm against the upstream producer). ``value`` holds the
            encoded image bytes.

    Returns:
        Always ``0``. Frames that are empty, undecodable, or too old are
        dropped without being published.
    """
    key = ss[0]
    value = ss[1]

    if not value:
        # An empty payload cannot be decoded; drop it instead of failing the task.
        print('skip empty frame')
        return 0

    # Use one producer handle for both send and flush so the two calls can
    # never target different objects.
    kafka_producer = broadcast_producer.value

    with broadcast_graph.value.as_default():
        encoded = np.asarray(bytearray(value), dtype="uint8")
        img = cv2.imdecode(encoded, cv2.IMREAD_ANYCOLOR)
        if img is None:
            # imdecode yields None for corrupt/unsupported data; resizing
            # None would raise, so drop the frame here.
            print('skip undecodable frame')
            return 0

        frame = imutils.resize(img, width=600)
        detected_image_array, _ = broadcast_detector.value.detectCustomObjectsFromImage(
            custom_objects=broadcast_custom.value,
            input_type="array",
            input_image=frame,
            output_type="array")

        # Freshness is checked after detection so processing time counts
        # toward the 3-second budget.
        current = int(time.time() * 1000)
        if current - int(key) < 3000:
            jpeg_bytes = cv2.imencode('.jpg', detected_image_array)[1].tobytes()
            kafka_producer.send(output_topic, value=jpeg_bytes, key=key.encode('utf-8'))
            kafka_producer.flush()
            print('send over!')
    return 0
def handler(message):
    """Process one micro-batch RDD.

    Applies ``obj_detection`` to every record of ``message``, gathers the
    per-record results with ``collect()``, and prints each one.
    """
    statuses = message.map(obj_detection).collect()
    for status in statuses:
        print(status)
# Register `handler` to run on every RDD produced by the Kafka stream.
kafkaStream.foreachRDD(handler)
# Start the streaming computation and block until it terminates.
ssc.start()
ssc.awaitTermination()