基于face_recognition的简单人脸识别程序

sean 编辑于2022-03-21 21:41软件相关

之前玩树莓派的一个usb摄像头闲置,碰巧有一个闲置的G4560的pc(下载机,web服务器),于是简单加以改造,兼职成一个简单人脸识别程序

上图:

上服务端代码:

from flask import Flask
from flask import request
app = Flask(__name__)

import subprocess
import os
import time

import cv2
import face_recognition
import numpy as np
from threading import Thread

from pydub import AudioSegment
from pydub.playback import play


# Water-level state shared between the /listen handler (which writes it) and
# query_paused_and_stop_task (which polls it).
status='empty'

# Last motion value received by the /detect handler.
motion='no'
# Worker thread running the face-detection loop, or None when no loop is
# registered; managed by face_detect().
d_thread = None
# FaceDetectTask instance driven by d_thread; managed by face_detect().
face_detect_task = None


# Reference photos of the people to recognise, read from the working directory.
known_my_image = face_recognition.load_image_file("my.jpg")
known_qiqi_image = face_recognition.load_image_file("qiqi.jpg")
known_quanquan_image = face_recognition.load_image_file("quanquan.jpg")

# Get the face encodings for the known images
# NOTE(review): the [0] index assumes at least one face is found in every
# reference photo -- confirm, otherwise this fails at import time.
my_face_encoding = face_recognition.face_encodings(known_my_image)[0]
qiqi_face_encoding = face_recognition.face_encodings(known_qiqi_image)[0]
quanquan_face_encoding = face_recognition.face_encodings(known_quanquan_image)[0]

# Index-aligned with known_face_names below: entry i here belongs to name i.
known_face_encodings = [
    my_face_encoding,
    qiqi_face_encoding,
    quanquan_face_encoding
]

known_face_names = [
    "babi",
    "qiqi",
    "quanquan"
]

# NOTE(review): the four names below are not read by any code visible in this
# listing (FaceDetectTask.run uses locals of the same name) -- possibly
# leftovers from the example this was adapted from.
face_locations = []
face_encodings = []

face_names = []
process_this_frame = True


def query_paused_and_stop_task(subp, length=300):
    """Poll the global ``status`` and stop the child process once it is 'empty'.

    The loop checks once per second, for at most ``length`` iterations. It
    ends early when ``status`` becomes ``'empty'`` (the child is killed and
    reaped) or when the child has already exited on its own. If the budget
    runs out while the child is still running, the child is left untouched.

    Args:
        subp: child-process object created by ``subprocess.Popen``.
        length: approximate duration of the task in seconds; err on the large
            side when unsure. Defaults to 300.
    """
    global status
    counter = 0
    while counter < length:
        if status == 'empty':
            subp.kill()
            # Reap the killed child so it does not linger as a zombie.
            subp.wait()
            break
        if subp.poll() is not None:
            # The child finished by itself; there is nothing left to watch.
            break
        time.sleep(1)
        counter += 1
        print('sleep 1 second')

def play_task():
    """Play the alarm tune through mplayer and return the current ``status``.

    The player is started as a child process and then supervised by
    ``query_paused_and_stop_task`` with a 37-second budget.
    """
    global status
    command = [
        'mplayer',
        '-af',
        'volume=-10',
        'https://blog.freeloong.top/admin/upload/shuimanjinshan.mp3',
    ]
    player = subprocess.Popen(command, shell=False)
    query_paused_and_stop_task(player, 37)
    return status

@app.route("/listen", methods=['GET','POST'])
def water():
    """Handle water-level reports.

    GET returns a static greeting. POST expects a JSON object whose ``water``
    key is ``'full'`` or ``'empty'``:

    * ``'full'``  -- stores the status, plays the alarm via ``play_task`` and
      returns whatever status is current when playback supervision ends.
    * ``'empty'`` -- stores the status and returns it.

    Any other body (not JSON, not an object, missing key, unknown value) is
    answered with HTTP 400 and leaves the stored status untouched, instead of
    failing with a 500 because the view returned nothing.
    """
    global status
    if request.method != 'POST':
        return "<p>Hello, World!</p>"

    # silent=True: a malformed or non-JSON body yields None instead of raising.
    data = request.get_json(silent=True)
    print(data)
    level = data.get('water') if isinstance(data, dict) else None
    if level not in ('full', 'empty'):
        return "invalid water status", 400

    status = level
    if status == 'full':
        return play_task()
    return status

class FaceDetectTask:
    """Camera loop that recognises known faces and plays a spoken greeting.

    ``run`` is written to be used as a thread target; ``stop`` asks the loop
    to finish after the frame that is currently being processed.
    """

    def __init__(self):
        self._running = True
        # Maps a recognised name to the greeting clip played for it.
        self.file_dict = {
            'qiqi': 'helloqiqi.mp3',
            'quanquan': 'helloquanquan.mp3',
            'babi': 'hellobabi.mp3',
            'mami': 'hellomami.mp3',
            'guest': 'helloguest.mp3',
        }

    def stop(self):
        """Ask ``run`` to leave its loop at the next iteration."""
        self._running = False

    def play_message_tip(self, name):
        """Play the greeting clip registered for ``name``.

        Args:
            name: a key of ``self.file_dict``.

        Raises:
            KeyError: if ``name`` has no entry in ``self.file_dict``.
        """
        file_name = self.file_dict[name]
        try:
            # Decoding is inside the try as well: loading the clip is also a
            # step that can raise FileNotFoundError.
            song = AudioSegment.from_mp3(str(file_name))
            play(song)
        except FileNotFoundError as err:
            # Raised when ffmpeg / libav is not installed (or the clip is
            # missing); installing ffmpeg usually fixes it,
            # e.g.: sudo apt install ffmpeg
            print(f'cannot play {file_name}: {err}')

    def run(self):
        """Grab frames from camera 0 and greet every face until stopped.

        Each detected face is compared with ``known_face_encodings``; the
        closest known face is used when it is also an accepted match,
        otherwise the face is greeted as ``"guest"``. The camera is released
        even when the loop ends because of an exception.
        """
        video_capture = cv2.VideoCapture(0)
        try:
            while self._running:
                ret, frame = video_capture.read()
                if not ret or frame is None:
                    # No frame available (camera busy or unplugged): wait a
                    # moment instead of crashing on ``None`` or spinning.
                    time.sleep(0.1)
                    continue

                # OpenCV delivers BGR; reverse the channel axis to get RGB.
                # The reversed view is copied into a contiguous array because
                # some dlib builds reject negatively-strided input.
                rgb_frame = np.ascontiguousarray(frame[:, :, ::-1])
                face_locations = face_recognition.face_locations(rgb_frame)
                face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
                for face_encoding in face_encodings:
                    # See if the face is a match for the known face(s)
                    matches = face_recognition.compare_faces(known_face_encodings, face_encoding)

                    name = "guest"

                    # Use the known face with the smallest distance to the new
                    # face, but only if it also counts as a match.
                    face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
                    best_match_index = np.argmin(face_distances)
                    if matches[best_match_index]:
                        name = known_face_names[best_match_index]
                    self.play_message_tip(name)
                    print(f'hello! {name}')

            print('finish detect face')
        finally:
            video_capture.release()


def face_detect(motion):
    """Start or stop the background face-detection thread.

    Args:
        motion: ``'yes'`` requests detection; any other value requests a stop.

    Returns:
        ``'analyzing'`` when detection was requested but a thread is already
        registered, ``'analyzed'`` when a new thread was started,
        ``'closed analyze'`` when a registered thread was stopped and joined,
        and ``'no analyzed'`` when there was nothing to stop.
    """
    global d_thread
    global face_detect_task

    print(f'motion: {motion}')

    if motion == 'yes':
        if d_thread:
            print(d_thread)
            print('face_detect is started! do nothing')
            return 'analyzing'

        face_detect_task = FaceDetectTask()
        d_thread = Thread(target=face_detect_task.run)
        d_thread.start()
        print('detecting the face')
        return 'analyzed'

    if not d_thread:
        print('no analyze thread do nothing')
        return 'no analyzed'

    print('start close analyze thread')
    face_detect_task.stop()
    d_thread.join()
    d_thread = None
    print('finish close analyze thread')
    return 'closed analyze'
        
@app.route("/detect", methods=['GET','POST'])
def radar():
    """Handle motion reports from the radar sensor.

    GET returns a static greeting. POST expects a JSON object with a
    ``motion`` key; its value is stored in the global ``motion`` and passed
    to ``face_detect``, whose result string is returned.

    A body that is not a JSON object, or that lacks the ``motion`` key, is
    answered with HTTP 400 instead of raising inside the view.
    """
    global motion
    if request.method != 'POST':
        return "<p>Hello, World!</p>"

    # silent=True: a malformed or non-JSON body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'motion' not in data:
        return "invalid motion payload", 400

    motion = data['motion']
    return face_detect(motion)

服务器端是在上一个水溢出报警系统所用的同一个web后台的基础上略加修改而成。出于功耗的考量,仅当它接收到esp8266所连接的小型感应雷达发来的检测信号时,才开启人脸检测程序,这样比较省电。

esp8266代码radar_sensor.ino跟检测水位传感器类似:

#define Radar_Sensor D2 //attach radar sensor to digital D2

#include <ESP8266WiFi.h>
#include <ESP8266HTTPClient.h>

// NOTE(review): this macro is not referenced by the code visible in this
// sketch; the request URL is written out literally in the send functions.
#define SERVER_DOMIAN_NAME "alarm.freeloong.top"

// WiFi credentials; the placeholders apply only when STASSID was not already
// defined before this point.
#ifndef STASSID
#define STASSID "your_wifi_ssid"
#define STAPSK  "your_passwd"
#endif

// Last state reported to the server, so loop() only sends on a change.
int radar_status = 0; //radar's status 0 not detected, 1 detected 


void setup() {
  // The radar output is read as a digital input. The built-in LED is an
  // output and is parked HIGH, which switches it off.
  pinMode(Radar_Sensor, INPUT);
  pinMode(LED_BUILTIN, OUTPUT);
  digitalWrite(LED_BUILTIN, HIGH);

  Serial.begin(115200);

  // Three blank lines on the serial console before the first real output.
  for (int blank = 0; blank < 3; ++blank) {
    Serial.println();
  }

  WiFi.begin(STASSID, STAPSK);

  // Block until the station is associated, printing a dot every 500 ms.
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.print("Connected! IP address: ");
  Serial.println(WiFi.localIP());
}

void send_motion_detected() {
  if ((WiFi.status() == WL_CONNECTED)) {
    WiFiClient client;
    HTTPClient http;

    Serial.print("[HTTP] begin...\n");
    // configure traged server and url
    http.begin(client, "http://alarm.freeloong.top/detect"); //HTTP
    http.addHeader("Content-Type", "application/json");

    Serial.print("[HTTP] POST...\n");
    // start connection and send HTTP header and body
    int httpCode = http.POST("{\"motion\":\"yes\"}");

    // httpCode will be negative on error
    if (httpCode > 0) {
      // HTTP header has been send and Server response header has been handled
      Serial.printf("[HTTP] POST... code: %d\n", httpCode);

      // file found at server
      if (httpCode == HTTP_CODE_OK) {
        const String& payload = http.getString();
        Serial.println("received payload:\n<<");
        Serial.println(payload);
        Serial.println(">>");
        delay(10000);
      }
    } else {
      Serial.printf("[HTTP] POST... failed, error: %s\n", http.errorToString(httpCode).c_str());
    }

    http.end();
  }
}

void send_no_motion() {
  if ((WiFi.status() == WL_CONNECTED)) {

    WiFiClient client;
    HTTPClient http;

    Serial.print("[HTTP] begin...\n");
    // configure traged server and url
    http.begin(client, "http://alarm.freeloong.top/detect"); //HTTP
    http.addHeader("Content-Type", "application/json");

    Serial.print("[HTTP] POST...\n");
    // start connection and send HTTP header and body
    int httpCode = http.POST("{\"motion\":\"no\"}");

    // httpCode will be negative on error
    if (httpCode > 0) {
      // HTTP header has been send and Server response header has been handled
      Serial.printf("[HTTP] POST... code: %d\n", httpCode);

      // file found at server
      if (httpCode == HTTP_CODE_OK) {
        const String& payload = http.getString();
        Serial.println("received payload:\n<<");
        Serial.println(payload);
        Serial.println(">>");
      }
    } else {
      Serial.printf("[HTTP] POST... failed, error: %s\n", http.errorToString(httpCode).c_str());
    }

    http.end();
  }
}

// the loop function runs over and over again forever
void loop() {
  // Sample the radar once per pass and report only when the state changes.
  const bool motion_now = (digitalRead(Radar_Sensor) == HIGH);

  if (motion_now && radar_status == 0) {
    // Rising edge: something started moving.
    // (The LED is not switched on here; only the "off" write below is active.)
    send_motion_detected(); // send the "motion" signal to the server
    radar_status = 1;
  } else if (!motion_now && radar_status == 1) {
    // Falling edge: no moving object is detected any more.
    digitalWrite(LED_BUILTIN, HIGH); // Turn the LED off by making the voltage HIGH
    send_no_motion(); // send the "no motion" signal to the server
    radar_status = 0;
  }

  delay(200);
}

web服务端的启动脚本run_production.sh:

#!/bin/bash
# Start the alarm Flask application on port 8005 from inside its virtualenv.
exec_path=/home/d/alarm
log_path=/home/d/alarm  # not used by the commands below
source /home/d/venvalarm/bin/activate
# Stop here if the app directory is missing, rather than running flask from
# whatever directory the script was started in.
cd "$exec_path" || exit 1
export FLASK_APP=alarm
export FLASK_ENV=production

flask run -p 8005
deactivate

关于本站

肥龙软件分享的软件是本站作者开发的免费、无广告、安全可靠的实用软件,绝不附带任何无关软件,绝不捆绑任何插件。如果您感觉好用,可以捐赠我们,这样我们可以有更积极的动力去改进升级软件,维持服务器运转,感谢您的捐助,谢谢!

致谢 赞赏/捐助名单

2024-8-13 **军 ¥16.8

更新时间:2024.8.31

联系作者(邮箱)
分类