基于face_recognition的简单人脸识别程序
sean 编辑于2022-03-21 21:41软件相关
之前玩树莓派的一个usb摄像头闲置,碰巧有一个闲置的G4560的pc(下载机,web服务器),于是简单加以改造,兼职成一个简单人脸识别程序
上图:
上服务端代码:
from flask import Flask
from flask import request
app = Flask(__name__)
import subprocess
import os
import time
import cv2
import face_recognition
import numpy as np
from threading import Thread
from pydub import AudioSegment
from pydub.playback import play
status='empty'
motion='no'
d_thread = None
face_detect_task = None
known_my_image = face_recognition.load_image_file("my.jpg")
known_qiqi_image = face_recognition.load_image_file("qiqi.jpg")
known_quanquan_image = face_recognition.load_image_file("quanquan.jpg")
# Get the face encodings for the known images
my_face_encoding = face_recognition.face_encodings(known_my_image)[0]
qiqi_face_encoding = face_recognition.face_encodings(known_qiqi_image)[0]
quanquan_face_encoding = face_recognition.face_encodings(known_quanquan_image)[0]
known_face_encodings = [
my_face_encoding,
qiqi_face_encoding,
quanquan_face_encoding
]
known_face_names = [
"babi",
"qiqi",
"quanquan"
]
face_locations = []
face_encodings = []
face_names = []
process_this_frame = True
def query_paused_and_stop_task(subp, length=300):
'''
#轮询status状态,如果是则中止传入subprocess生成的子进程。
#参数 length为任务执行的大概时间,不确定可以稍大点,单位为秒,默认300秒
#subp 为 子进程对象
'''
counter = 0
while counter <length:
global status
if status == 'empty':
subp.kill()
break
time.sleep(1)
counter += 1
print('sleep 1 second')
def play_task():
subp = subprocess.Popen(['mplayer', '-af', 'volume=-10', 'https://blog.freeloong.top/admin/upload/shuimanjinshan.mp3'], shell=False)
query_paused_and_stop_task(subp,37)
global status
return status
@app.route("/listen", methods=['GET','POST'])
def water():
if request.method == 'POST':
data = request.get_json()
print(data)
global status
status = data['water']
if status == 'full':
result = play_task()
return result
elif status == 'empty':
return status
else:
return "<p>Hello, World!</p>"
class FaceDetectTask:
def __init__(self):
self._running = True
self.file_dict = {
'qiqi':'helloqiqi.mp3',
'quanquan':'helloquanquan.mp3',
'babi':'hellobabi.mp3',
'mami':'hellomami.mp3',
'guest':'helloguest.mp3'
}
def stop(self):
self._running = False
def play_message_tip(self, name):
file_name = self.file_dict[name]
song = AudioSegment.from_mp3(str(file_name))
try:
play(song)
except FileNotFoundError:
# 如果没有安装ffmpeg 或 libav,就会报错,可以尝试安装ffmpeg或libav即可
# eg: sudo apt install ffmpeg
pass
def run(self):
video_capture = cv2.VideoCapture(0)
while self._running:
ret, frame = video_capture.read()
rgb_frame = frame[:, :, ::-1]
face_locations = face_recognition.face_locations(rgb_frame)
face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
# See if the face is a match for the known face(s)
matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
name = "guest"
# If a match was found in known_face_encodings, just use the first one.
# if True in matches:
# first_match_index = matches.index(True)
# name = known_face_names[first_match_index]
# Or instead, use the known face with the smallest distance to the new face
face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
best_match_index = np.argmin(face_distances)
if matches[best_match_index]:
name = known_face_names[best_match_index]
self.play_message_tip(name)
print(f'hello! {name}')
print('finish detect face')
video_capture.release()
def face_detect(motion):
global d_thread
global face_detect_task
if motion == 'yes':
print(f'motion: {motion}')
if d_thread:
print(d_thread)
print('face_detect is started! do nothing')
return 'analyzing'
else:
face_detect_task = FaceDetectTask()
d_thread = Thread(target=face_detect_task.run)
d_thread.start()
print('detecting the face')
return 'analyzed'
else:
print(f'motion: {motion}')
if d_thread:
print('start close analyze thread')
face_detect_task.stop()
d_thread.join()
d_thread = None
print('finish close analyze thread')
return 'closed analyze'
else:
print('no analyze thread do nothing')
return 'no analyzed'
@app.route("/detect", methods=['GET','POST'])
def radar():
if request.method == 'POST':
data = request.get_json()
global motion
motion = data['motion']
result = face_detect(motion)
return result
else:
return "<p>Hello, World!</p>"
服务器端是跟上一个水溢出报警系统采用的同一个web端后台略加修改而成,出于功耗的考量,仅当它接收esp8266连接的小的感应雷达发来的检测信号时,才开启人脸检测程序比较省电.
esp8266代码radar_sensor.ino跟检测水位传感器类似:
#define Radar_Sensor D2 //attach radar sensor to digital D2
#include <ESP8266WiFi.h>
#include <ESP8266HTTPClient.h>
#define SERVER_DOMIAN_NAME "alarm.freeloong.top"
#ifndef STASSID
#define STASSID "your_wifi_ssid"
#define STAPSK "your_passwd"
#endif
int radar_status = 0; //radar's status 0 not detected, 1 detected
void setup() {
pinMode(Radar_Sensor, INPUT); // The Radar sensor is an Input
pinMode(LED_BUILTIN, OUTPUT); // Initialize the LED_BUILTIN pin as an output
digitalWrite(LED_BUILTIN, HIGH); //Turn the LED off by making the voltage HIGH
Serial.begin(115200);
Serial.println();
Serial.println();
Serial.println();
WiFi.begin(STASSID, STAPSK);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.print("Connected! IP address: ");
Serial.println(WiFi.localIP());
}
void send_motion_detected() {
if ((WiFi.status() == WL_CONNECTED)) {
WiFiClient client;
HTTPClient http;
Serial.print("[HTTP] begin...\n");
// configure traged server and url
http.begin(client, "http://alarm.freeloong.top/detect"); //HTTP
http.addHeader("Content-Type", "application/json");
Serial.print("[HTTP] POST...\n");
// start connection and send HTTP header and body
int httpCode = http.POST("{\"motion\":\"yes\"}");
// httpCode will be negative on error
if (httpCode > 0) {
// HTTP header has been send and Server response header has been handled
Serial.printf("[HTTP] POST... code: %d\n", httpCode);
// file found at server
if (httpCode == HTTP_CODE_OK) {
const String& payload = http.getString();
Serial.println("received payload:\n<<");
Serial.println(payload);
Serial.println(">>");
delay(10000);
}
} else {
Serial.printf("[HTTP] POST... failed, error: %s\n", http.errorToString(httpCode).c_str());
}
http.end();
}
}
void send_no_motion() {
if ((WiFi.status() == WL_CONNECTED)) {
WiFiClient client;
HTTPClient http;
Serial.print("[HTTP] begin...\n");
// configure traged server and url
http.begin(client, "http://alarm.freeloong.top/detect"); //HTTP
http.addHeader("Content-Type", "application/json");
Serial.print("[HTTP] POST...\n");
// start connection and send HTTP header and body
int httpCode = http.POST("{\"motion\":\"no\"}");
// httpCode will be negative on error
if (httpCode > 0) {
// HTTP header has been send and Server response header has been handled
Serial.printf("[HTTP] POST... code: %d\n", httpCode);
// file found at server
if (httpCode == HTTP_CODE_OK) {
const String& payload = http.getString();
Serial.println("received payload:\n<<");
Serial.println(payload);
Serial.println(">>");
}
} else {
Serial.printf("[HTTP] POST... failed, error: %s\n", http.errorToString(httpCode).c_str());
}
http.end();
}
}
// the loop function runs over and over again forever
void loop() {
if ( digitalRead(Radar_Sensor) == HIGH) {
if (radar_status == 0) {
//digitalWrite(LED_BUILTIN, LOW); //Turn the LED on (Note that LOW is the voltage level
send_motion_detected(); //send the full signal to server
radar_status = 1;
}
}
else { //当前没有检测到有移动的物体
if (radar_status == 1) {
digitalWrite(LED_BUILTIN, HIGH); //Turn the LED off by making the voltage HIGH
send_no_motion(); //send the empty signal to server
radar_status = 0;
}
}
delay(200);
}
web服务端的启动脚本run_production.sh:
#!/bin/bash
exec_path=/home/d/alarm
log_path=/home/d/alarm
source /home/d/venvalarm/bin/activate
cd $exec_path
export FLASK_APP=alarm
export FLASK_ENV=prodution
flask run -p 8005
deactivate
关于本站
肥龙软件分享的软件是本站作者开发的免费,无广告,安全可靠,绝不附带任何无关软件,绝不困绑任何插件的实用软件。如果您感觉好用,可以捐赠我们,这样我们可以有更积极的动力去改进升级软件,维持服务器运转,感谢您的捐助,谢谢!
致谢 赞赏/捐助名单
更新时间:2024.8.31
联系作者(邮箱)
软件