first commit

This commit is contained in:
Que 2024-12-05 14:24:03 +08:00
commit e1ef5996dd
36 changed files with 2262 additions and 0 deletions

.gitignore vendored Normal file
View File

@ -0,0 +1,18 @@
__pycache__
.vscode
.idea
build
dist
out
o
*.jpg
*.jpeg
*.png
*.txt
*.spec
test.py
*.bin
client.py
file2py.py
character.py
demo_kpt.py

View File

@ -0,0 +1 @@
from .authorization import Authorization

View File

@ -0,0 +1,21 @@
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
class MyAES:
    def __init__(self, key: str, iv: str):
        # key and iv must each be 16 bytes
        self.key = key.encode('utf-8')
        self.iv = iv.encode('utf-8')

    def encrypt(self, plain_text: str) -> bytes:
        # an AES-CBC cipher object is single-use, so build a fresh one per call
        cryptor = AES.new(self.key, AES.MODE_CBC, self.iv)
        pad_text = pad(plain_text.encode('utf-8'), AES.block_size)
        return cryptor.encrypt(pad_text)

    def decrypt(self, encrypted_text: bytes) -> str:
        cryptor = AES.new(self.key, AES.MODE_CBC, self.iv)
        plain_text = cryptor.decrypt(encrypted_text)
        return unpad(plain_text, AES.block_size).decode('utf-8')
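
A quick round-trip sketch of MyAES (assuming pycryptodome is installed); the key/iv below are placeholders, not the values used elsewhere in this commit:

aes = MyAES("0123456789ABCDEF", "FEDCBA9876543210")  # 16-char key and iv
cipher = aes.encrypt("hello")   # bytes, padded to a 16-byte boundary
print(aes.decrypt(cipher))      # -> hello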

View File

@ -0,0 +1,51 @@
import os
import hashlib
from .machine_code import MachineCodeGenerator
from .aes import MyAES
class Authorization:
def __init__(self):
self.aes_key = "9B8FD68A366F4D03"
self.aes_iv = "305FB72D83134CA0"
self.pre_str = "ZHDC" # 前缀
self.suf_str = "HYKJ" # 后缀
self.aes = MyAES(self.aes_key, self.aes_iv)
self.machine_code = MachineCodeGenerator(self.pre_str, self.suf_str).get(use_mac=True)
encrypt_code = self.aes.encrypt(self.machine_code)
self.md5_code = hashlib.md5(encrypt_code).hexdigest().upper()
def activate(self):
        print('Please send', self.machine_code, 'to the Huayan technical staff to obtain an activation code')
        activate_code = input('Please enter the activation code: ')
if activate_code:
activate_code = activate_code.upper()
if activate_code == self.md5_code:
print("激活成功!")
with open('register.bin', 'w') as f:
f.write(activate_code)
return True
else:
print("激活码错误,请重新输入!")
return False
else:
return False
    # On startup, read the registration file and check that the stored code
    # still matches the code derived from the current hardware
def check(self):
if os.path.exists("register.bin"):
with open("register.bin", "r") as f:
activate_code = f.read()
if activate_code == self.md5_code:
return True
return False
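
The expected activation code is fully determined by the machine code: the MD5 of the AES-encrypted machine code, uppercased. A vendor-side sketch that mirrors the constructor's computation (MyAES and MachineCodeGenerator imported from this package):

import hashlib
aes = MyAES("9B8FD68A366F4D03", "305FB72D83134CA0")
machine_code = MachineCodeGenerator("ZHDC", "HYKJ").get(use_mac=True)
activation_code = hashlib.md5(aes.encrypt(machine_code)).hexdigest().upper()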

View File

@ -0,0 +1,80 @@
import uuid
import hashlib
# import wmi
class MachineCodeGenerator:
    def __init__(self, pre_str, suf_str):
        self.pre_str = pre_str
        self.suf_str = suf_str
        # The machine code is concatenated from up to four parts:
        # 1. CPU serial  2. MAC address  3. disk serial  4. motherboard serial
        # self.m_wmi = wmi.WMI()

    # MAC address, 12 hex chars
def get_mac_address(self):
node = uuid.getnode()
macHex = uuid.UUID(int=node).hex[-12:]
return macHex.upper()[:12]
    # # CPU serial number, 16 chars
# def get_cpu_serial(self):
# cpu_info = self.m_wmi.Win32_Processor()
# if len(cpu_info) > 0:
# serial_number = cpu_info[0].ProcessorId
# return serial_number[:16]
# else:
# return "ABCDEFGHIJKLMNOP"
    # # disk serial number, 15 chars
# def get_disk_serial(self):
# disk_info = self.m_wmi.Win32_PhysicalMedia()
# disk_info.sort()
# if len(disk_info) > 0:
# serial_number = disk_info[0].SerialNumber.strip()
# return serial_number[:15]
# else:
# return "WD-ABCDEFGHIJKL"
    # # motherboard serial number, 14 chars
# def get_board_serial(self):
# board_info = self.m_wmi.Win32_BaseBoard()
# if len(board_info) > 0:
# board_id = board_info[0].SerialNumber.strip().strip('.')
# return board_id[:14]
# else:
# return "ABCDEFGHIJKLMN"
    # Concatenate the enabled parts and hash them into the machine code
    def get(self, use_mac=False, use_cpu=False, use_disk=False, use_board=False):
combine_str = self.pre_str
if use_mac:
self.mac_address = self.get_mac_address()
combine_str += self.mac_address
if use_cpu:
self.cpu_serial = self.get_cpu_serial()
combine_str += self.cpu_serial
if use_disk:
self.disk_serial = self.get_disk_serial()
combine_str += self.disk_serial
if use_board:
self.board_serial = self.get_board_serial()
combine_str += self.board_serial
combine_str += self.suf_str
combine_byte = combine_str.encode("utf-8")
machine_code = hashlib.md5(combine_byte).hexdigest()
return machine_code.upper()
    # Get the machine hardware info
# def getHardwareInfo(self):
# info = {
# "mac_address": self.mac_address,
# "cpu_serial": self.cpu_serial,
# "disk_serial": self.disk_serial,
# "board_serial": self.board_serial,
# }
# return info
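
For reference, a minimal sketch of how the generator is invoked (prefix/suffix as in authorization.py, where only the MAC-based part is enabled):

gen = MachineCodeGenerator("ZHDC", "HYKJ")
code = gen.get(use_mac=True)   # 32-char uppercase MD5 of "ZHDC" + MAC + "HYKJ"
print(code)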

View File

@ -0,0 +1,33 @@
config_certificate = {
"Global": {
"text_score": 0.4,
"use_det": True,
"use_cls": False,
"use_rec": True,
"print_verbose": False,
"min_height": 30,
"width_height_ratio": 8,
"inference_num_threads": -1,
},
"Det": {
"inference_num_threads": -1,
"use_cuda": False,
"model_path": "models/det_certificate_encrypt",
"limit_side_len": 960,
"limit_type": "max",
"thresh": 0.3,
"box_thresh": 0.5,
"max_candidates": 1000,
"unclip_ratio": 1.6,
"use_dilation": True,
"score_mode": "fast",
},
"Rec": {
"inference_num_threads": -1,
"use_cuda": False,
"model_path": "models/rec_certificate_encrypt",
"rec_keys_path": "models/common_dict.txt",
"rec_img_shape": [3, 48, 320],
"rec_batch_num": 6,
},
}

View File

@ -0,0 +1,38 @@
Global:
text_score: 0.4
use_det: true
use_cls: false
use_rec: true
print_verbose: false
min_height: 30
width_height_ratio: 8
inference_num_threads: &infer_num_threads -1
Det:
inference_num_threads: *infer_num_threads
use_cuda: false
model_path: models/det_certificate_encrypt
limit_side_len: 960
limit_type: max
thresh: 0.3
box_thresh: 0.5
max_candidates: 1000
unclip_ratio: 1.6
use_dilation: true
score_mode: fast
Rec:
inference_num_threads: *infer_num_threads
use_cuda: false
model_path: models/rec_certificate_encrypt
rec_keys_path: models/common_dict.txt
rec_img_shape: [3, 48, 320]
rec_batch_num: 6
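
The YAML mirror of the config uses an anchor so Det and Rec inherit the Global thread count. A quick check, assuming PyYAML is available and a hypothetical file name for this config:

import yaml
with open("config_certificate.yaml", encoding="utf-8") as f:  # hypothetical name
    cfg = yaml.safe_load(f)
assert cfg["Det"]["inference_num_threads"] == cfg["Global"]["inference_num_threads"]  # both -1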

Binary file not shown.

View File

@ -0,0 +1,70 @@
import os
import traceback
from pathlib import Path
import numpy as np
from openvino.runtime import Core
from cryptography.fernet import Fernet
def model_decrypt(encrypt_file, key):
    with open(encrypt_file, 'rb') as fr:
encrypted_data = fr.read()
decrypted_data = Fernet(key).decrypt(encrypted_data)
return decrypted_data
class OpenVINOInferSession:
def __init__(self, model_path=None, config=None, infer_num_threads=-1):
core = Core()
if config is not None:
model_path = config["model_path"]
infer_num_threads = config.get("inference_num_threads", -1)
if Path(model_path).is_file():
# self._verify_model(model_path)
model = core.read_model(model_path)
else:
            key = b'QO_XTswXrn-GWc3hnFzOmM6c5MC2stRZzYYTSeKX3Wk='
model_data = model_decrypt(os.path.join(model_path, '__model__.encrypted'), key)
params_data = model_decrypt(os.path.join(model_path, '__params__.encrypted'), key)
model = core.read_model(model_data, params_data)
cpu_nums = os.cpu_count()
if infer_num_threads != -1 and 1 <= infer_num_threads <= cpu_nums:
core.set_property("CPU", {"INFERENCE_NUM_THREADS": str(infer_num_threads)})
compile_model = core.compile_model(model=model, device_name="CPU")
self.session = compile_model.create_infer_request()
# print(self.session.get_input_tensor().get_shape())
def __call__(self, input_content: np.ndarray) -> np.ndarray:
try:
outputs = self.session.infer(inputs=[input_content])
return list(outputs.values())
# return self.session.get_output_tensor().data
except Exception as e:
error_info = traceback.format_exc()
raise OpenVINOError(error_info) from e
def get_input_size(self):
# [n,c,h,w]
input_shape = self.session.get_input_tensor().get_shape()
# (w, h)
input_size = (input_shape[3], input_shape[2])
return input_size
@staticmethod
def _verify_model(model_path):
model_path = Path(model_path)
if not model_path.exists():
            raise FileNotFoundError(f"{model_path} does not exist.")
if not model_path.is_file():
raise FileExistsError(f"{model_path} is not a file.")
class OpenVINOError(Exception):
pass
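
A minimal sketch of driving the session directly, assuming a plain (unencrypted) OpenVINO IR file at a hypothetical path and a static NCHW input; callers in this commit usually pass a config dict instead:

import numpy as np
sess = OpenVINOInferSession(model_path="models/some_model.xml")  # hypothetical path
w, h = sess.get_input_size()
dummy = np.zeros((1, 3, h, w), dtype=np.float32)
outputs = sess(dummy)   # list of np.ndarray, one per model output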

View File

@ -0,0 +1 @@
from .rtmpose import RTMPose

View File

@ -0,0 +1,70 @@
from typing import Tuple
import numpy as np
def get_simcc_maximum(simcc_x: np.ndarray,
simcc_y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Get maximum response location and value from simcc representations.
Note:
instance number: N
num_keypoints: K
heatmap height: H
heatmap width: W
Args:
simcc_x (np.ndarray): x-axis SimCC in shape (K, Wx) or (N, K, Wx)
simcc_y (np.ndarray): y-axis SimCC in shape (K, Wy) or (N, K, Wy)
Returns:
tuple:
- locs (np.ndarray): locations of maximum heatmap responses in shape
(K, 2) or (N, K, 2)
- vals (np.ndarray): values of maximum heatmap responses in shape
(K,) or (N, K)
"""
N, K, Wx = simcc_x.shape
simcc_x = simcc_x.reshape(N * K, -1)
simcc_y = simcc_y.reshape(N * K, -1)
# get maximum value locations
x_locs = np.argmax(simcc_x, axis=1)
y_locs = np.argmax(simcc_y, axis=1)
locs = np.stack((x_locs, y_locs), axis=-1).astype(np.float32)
max_val_x = np.amax(simcc_x, axis=1)
max_val_y = np.amax(simcc_y, axis=1)
# get maximum value across x and y axis
# mask = max_val_x > max_val_y
# max_val_x[mask] = max_val_y[mask]
vals = 0.5 * (max_val_x + max_val_y)
locs[vals <= 0.] = -1
# reshape
locs = locs.reshape(N, K, 2)
vals = vals.reshape(N, K)
return locs, vals
def convert_coco_to_openpose(keypoints, scores):
keypoints_info = np.concatenate((keypoints, scores[..., None]), axis=-1)
# compute neck joint
neck = np.mean(keypoints_info[:, [5, 6]], axis=1)
# neck score when visualizing pred
neck[:,
2:3] = np.where(keypoints_info[:, 5, 2:3] > keypoints_info[:, 6, 2:3],
keypoints_info[:, 6, 2:3], keypoints_info[:, 5, 2:3])
new_keypoints_info = np.insert(keypoints_info, 17, neck, axis=1)
mmpose_idx = [17, 6, 8, 10, 7, 9, 12, 14, 16, 13, 15, 2, 1, 4, 3]
openpose_idx = [1, 2, 3, 4, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17]
new_keypoints_info[:, openpose_idx] = \
new_keypoints_info[:, mmpose_idx]
keypoints_info = new_keypoints_info
keypoints, scores = keypoints_info[..., :2], keypoints_info[..., 2]
return keypoints, scores
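
A tiny worked example of get_simcc_maximum, with one instance and two keypoints over 4 SimCC bins per axis:

import numpy as np
simcc_x = np.array([[[0.1, 0.9, 0.2, 0.0],
                     [0.0, 0.1, 0.2, 0.8]]])   # (N=1, K=2, Wx=4)
simcc_y = np.array([[[0.7, 0.2, 0.0, 0.1],
                     [0.1, 0.1, 0.6, 0.2]]])   # (N=1, K=2, Wy=4)
locs, vals = get_simcc_maximum(simcc_x, simcc_y)
# locs -> [[[1., 0.], [3., 2.]]]; vals -> [[0.8, 0.7]] (mean of the two axis maxima)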

View File

@ -0,0 +1,165 @@
from typing import Tuple
import cv2
import numpy as np
def bbox_xyxy2cs(bbox: np.ndarray,
padding: float = 1.) -> Tuple[np.ndarray, np.ndarray]:
"""Transform the bbox format from (x,y,w,h) into (center, scale)
Args:
bbox (ndarray): Bounding box(es) in shape (4,) or (n, 4), formatted
as (left, top, right, bottom)
        padding (float): BBox padding factor that will be multiplied to scale.
Default: 1.0
Returns:
tuple: A tuple containing center and scale.
- np.ndarray[float32]: Center (x, y) of the bbox in shape (2,) or
(n, 2)
- np.ndarray[float32]: Scale (w, h) of the bbox in shape (2,) or
(n, 2)
"""
# convert single bbox from (4, ) to (1, 4)
dim = bbox.ndim
if dim == 1:
bbox = bbox[None, :]
# get bbox center and scale
x1, y1, x2, y2 = np.hsplit(bbox, [1, 2, 3])
center = np.hstack([x1 + x2, y1 + y2]) * 0.5
scale = np.hstack([x2 - x1, y2 - y1]) * padding
if dim == 1:
center = center[0]
scale = scale[0]
return center, scale
def _rotate_point(pt: np.ndarray, angle_rad: float) -> np.ndarray:
"""Rotate a point by an angle.
Args:
pt (np.ndarray): 2D point coordinates (x, y) in shape (2, )
angle_rad (float): rotation angle in radian
Returns:
np.ndarray: Rotated point in shape (2, )
"""
sn, cs = np.sin(angle_rad), np.cos(angle_rad)
rot_mat = np.array([[cs, -sn], [sn, cs]])
return rot_mat @ pt
def _get_3rd_point(a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""To calculate the affine matrix, three pairs of points are required. This
function is used to get the 3rd point, given 2D points a & b.
The 3rd point is defined by rotating vector `a - b` by 90 degrees
anticlockwise, using b as the rotation center.
Args:
a (np.ndarray): The 1st point (x,y) in shape (2, )
b (np.ndarray): The 2nd point (x,y) in shape (2, )
Returns:
np.ndarray: The 3rd point.
"""
direction = a - b
c = b + np.r_[-direction[1], direction[0]]
return c
def get_warp_matrix(center: np.ndarray,
scale: np.ndarray,
rot: float,
output_size: Tuple[int, int],
shift: Tuple[float, float] = (0., 0.),
inv: bool = False) -> np.ndarray:
"""Calculate the affine transformation matrix that can warp the bbox area
in the input image to the output size.
Args:
center (np.ndarray[2, ]): Center of the bounding box (x, y).
scale (np.ndarray[2, ]): Scale of the bounding box
wrt [width, height].
rot (float): Rotation angle (degree).
output_size (np.ndarray[2, ] | list(2,)): Size of the
destination heatmaps.
shift (0-100%): Shift translation ratio wrt the width/height.
Default (0., 0.).
inv (bool): Option to inverse the affine transform direction.
(inv=False: src->dst or inv=True: dst->src)
Returns:
np.ndarray: A 2x3 transformation matrix
"""
shift = np.array(shift)
src_w = scale[0]
dst_w = output_size[0]
dst_h = output_size[1]
# compute transformation matrix
rot_rad = np.deg2rad(rot)
src_dir = _rotate_point(np.array([0., src_w * -0.5]), rot_rad)
dst_dir = np.array([0., dst_w * -0.5])
# get four corners of the src rectangle in the original image
src = np.zeros((3, 2), dtype=np.float32)
src[0, :] = center + scale * shift
src[1, :] = center + src_dir + scale * shift
src[2, :] = _get_3rd_point(src[0, :], src[1, :])
# get four corners of the dst rectangle in the input image
dst = np.zeros((3, 2), dtype=np.float32)
dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
dst[2, :] = _get_3rd_point(dst[0, :], dst[1, :])
if inv:
warp_mat = cv2.getAffineTransform(np.float32(dst), np.float32(src))
else:
warp_mat = cv2.getAffineTransform(np.float32(src), np.float32(dst))
return warp_mat
def top_down_affine(input_size: Tuple[int, int], bbox_scale: np.ndarray,
                    bbox_center: np.ndarray,
                    img: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Get the bbox image as the model input by affine transform.
    Args:
        input_size (tuple): The (w, h) input size of the model.
        bbox_scale (np.ndarray): The bbox scale of the img.
        bbox_center (np.ndarray): The bbox center of the img.
        img (np.ndarray): The original image.
Returns:
tuple: A tuple containing center and scale.
- np.ndarray[float32]: img after affine transform.
- np.ndarray[float32]: bbox scale after affine transform.
"""
w, h = input_size
warp_size = (int(w), int(h))
# reshape bbox to fixed aspect ratio
aspect_ratio = w / h
b_w, b_h = np.hsplit(bbox_scale, [1])
bbox_scale = np.where(b_w > b_h * aspect_ratio,
np.hstack([b_w, b_w / aspect_ratio]),
np.hstack([b_h * aspect_ratio, b_h]))
# get the affine matrix
center = bbox_center
scale = bbox_scale
rot = 0
warp_mat = get_warp_matrix(center, scale, rot, output_size=(w, h))
# do affine transform
img = cv2.warpAffine(img, warp_mat, warp_size, flags=cv2.INTER_LINEAR)
return img, bbox_scale
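
A quick numeric check of bbox_xyxy2cs with the same padding RTMPose uses (1.25):

import numpy as np
center, scale = bbox_xyxy2cs(np.array([100, 50, 300, 250]), padding=1.25)
# center -> [200., 150.]; scale -> [250., 250.] (a 200x200 box scaled by 1.25)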

View File

@ -0,0 +1,106 @@
from typing import List, Tuple
import numpy as np
from infer_engine import OpenVINOInferSession
from .postprocess import get_simcc_maximum
from .preprocess import bbox_xyxy2cs, top_down_affine
class RTMPose():
    def __init__(self, model_path: str):
self.mean = (123.675, 116.28, 103.53)
self.std = (58.395, 57.12, 57.375)
self.session = OpenVINOInferSession(model_path)
# print(self.session.get_input_size())
self.model_input_size = self.session.get_input_size()
    def __call__(self, image: np.ndarray, bboxes: list = None):
        # avoid a mutable default argument; treat no bboxes as the whole image
        if not bboxes:
            bboxes = [[0, 0, image.shape[1], image.shape[0]]]
keypoints, scores = [], []
for bbox in bboxes:
img_data, center, scale = self.preprocess(image, bbox)
# print(img)
outputs = self.session(img_data)
# print(outputs)
kpts, score = self.postprocess(outputs, center, scale)
keypoints.append(kpts)
scores.append(score)
keypoints = np.concatenate(keypoints, axis=0)
scores = np.concatenate(scores, axis=0)
return keypoints, scores
def preprocess(self, img: np.ndarray, bbox: list):
"""Do preprocessing for RTMPose model inference.
Args:
            img (np.ndarray): Input image.
bbox (list): xyxy-format bounding box of target.
Returns:
tuple:
- resized_img (np.ndarray): Preprocessed image.
- center (np.ndarray): Center of image.
- scale (np.ndarray): Scale of image.
"""
bbox = np.array(bbox)
# get center and scale
center, scale = bbox_xyxy2cs(bbox, padding=1.25)
# do affine transformation
resized_img, scale = top_down_affine(self.model_input_size, scale,
center, img)
# print(resized_img)
# normalize image
if self.mean is not None:
self.mean = np.array(self.mean)
self.std = np.array(self.std)
img_data = (resized_img - self.mean) / self.std
img_data = np.transpose(img_data, (2, 0, 1)) # Channel first
img_data = np.expand_dims(img_data, axis=0).astype(np.float32)
return img_data, center, scale
def postprocess(
self,
outputs: List[np.ndarray],
center: Tuple[int, int],
scale: Tuple[int, int],
simcc_split_ratio: float = 2.0) -> Tuple[np.ndarray, np.ndarray]:
"""Postprocess for RTMPose model output.
Args:
            outputs (List[np.ndarray]): Output of RTMPose model (simcc_x, simcc_y).
center (tuple): Center of bbox in shape (x, y).
scale (tuple): Scale of bbox in shape (w, h).
simcc_split_ratio (float): Split ratio of simcc.
Returns:
tuple:
- keypoints (np.ndarray): Rescaled keypoints.
- scores (np.ndarray): Model predict scores.
"""
# decode simcc
simcc_x, simcc_y = outputs
# print(simcc_x.shape)
locs, scores = get_simcc_maximum(simcc_x, simcc_y)
keypoints = locs / simcc_split_ratio
# rescale keypoints
keypoints = keypoints / self.model_input_size * scale
keypoints = keypoints + center - scale / 2
return keypoints, scores
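
End-to-end usage sketch, mirroring how pipeline.py drives the model; the image path is hypothetical:

import cv2
pose = RTMPose('models/kpt_certificate_encrypt')
img = cv2.imread('certificate.jpg')   # hypothetical sample image
keypoints, scores = pose(img)         # no bboxes: the whole image is one bbox
print(keypoints.shape, scores.shape)  # (1, K, 2) and (1, K)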

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,34 @@
from keypoint_detector import RTMPose
from rapidocr_openvino import RapidOCR
from utils import four_point_transform
from result import Result
from configs.config_certificate import config_certificate
class PipePredictor():
def __init__(self) -> None:
self.kpt_detector = RTMPose('models/kpt_certificate_encrypt')
self.ocr = RapidOCR(config_certificate)
self.result = Result()
def __call__(self, img):
self.result.clear()
keypoints, scores = self.kpt_detector(img)
self.warped_img = four_point_transform(img, keypoints[0])
self.ocr_res, _ = self.ocr(self.warped_img)
# print(self.ocr_res)
self.result.update(self.ocr_res)
return self.result.get()
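
A direct invocation sketch of the pipeline, bypassing the HTTP layer (hypothetical image path):

import cv2
predictor = PipePredictor()
res = predictor(cv2.imread('certificate.jpg'))  # hypothetical sample
print(res["code"], res["data"]["HGZBH"])        # status code and certificate number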

View File

@ -0,0 +1,2 @@
from .rapidocr import RapidOCR

View File

@ -0,0 +1 @@
from .text_cls import TextClassifier

View File

@ -0,0 +1,86 @@
import copy
import math
import time
from typing import Any, Dict, List, Tuple, Union
import cv2
import numpy as np
from infer_engine import OpenVINOInferSession
from .utils import ClsPostProcess
class TextClassifier:
def __init__(self, config: Dict[str, Any]):
self.cls_image_shape = config["cls_image_shape"]
self.cls_batch_num = config["cls_batch_num"]
self.cls_thresh = config["cls_thresh"]
self.postprocess_op = ClsPostProcess(config["label_list"])
self.infer = OpenVINOInferSession(config=config)
def __call__(
self, img_list: Union[np.ndarray, List[np.ndarray]]
) -> Tuple[List[np.ndarray], List[List[Union[str, float]]], float]:
if isinstance(img_list, np.ndarray):
img_list = [img_list]
img_list = copy.deepcopy(img_list)
# Calculate the aspect ratio of all text bars
width_list = [img.shape[1] / float(img.shape[0]) for img in img_list]
# Sorting can speed up the cls process
indices = np.argsort(np.array(width_list))
img_num = len(img_list)
cls_res = [["", 0.0]] * img_num
batch_num = self.cls_batch_num
elapse = 0
for beg_img_no in range(0, img_num, batch_num):
end_img_no = min(img_num, beg_img_no + batch_num)
norm_img_batch = []
for ino in range(beg_img_no, end_img_no):
norm_img = self.resize_norm_img(img_list[indices[ino]])
norm_img = norm_img[np.newaxis, :]
norm_img_batch.append(norm_img)
norm_img_batch = np.concatenate(norm_img_batch).astype(np.float32)
starttime = time.time()
prob_out = self.infer(norm_img_batch)[0]
cls_result = self.postprocess_op(prob_out)
elapse += time.time() - starttime
for rno, (label, score) in enumerate(cls_result):
cls_res[indices[beg_img_no + rno]] = [label, score]
if "180" in label and score > self.cls_thresh:
                    img_list[indices[beg_img_no + rno]] = cv2.rotate(
                        img_list[indices[beg_img_no + rno]], cv2.ROTATE_180
                    )
return img_list, cls_res, elapse
def resize_norm_img(self, img: np.ndarray) -> np.ndarray:
img_c, img_h, img_w = self.cls_image_shape
h, w = img.shape[:2]
ratio = w / float(h)
if math.ceil(img_h * ratio) > img_w:
resized_w = img_w
else:
resized_w = int(math.ceil(img_h * ratio))
resized_image = cv2.resize(img, (resized_w, img_h))
resized_image = resized_image.astype("float32")
if img_c == 1:
resized_image = resized_image / 255
resized_image = resized_image[np.newaxis, :]
else:
resized_image = resized_image.transpose((2, 0, 1)) / 255
resized_image -= 0.5
resized_image /= 0.5
padding_im = np.zeros((img_c, img_h, img_w), dtype=np.float32)
padding_im[:, :, :resized_w] = resized_image
return padding_im

View File

@ -0,0 +1,15 @@
from typing import List, Tuple
import numpy as np
class ClsPostProcess:
def __init__(self, label_list: List[str]):
self.label_list = label_list
def __call__(self, preds: np.ndarray) -> List[Tuple[str, float]]:
pred_idxs = preds.argmax(axis=1)
decode_out = [
(self.label_list[idx], preds[i, idx]) for i, idx in enumerate(pred_idxs)
]
return decode_out

View File

@ -0,0 +1 @@
from .text_detect import TextDetector

View File

@ -0,0 +1,96 @@
import time
from typing import Any, Dict, Optional, Tuple
import numpy as np
from infer_engine import OpenVINOInferSession
from .utils import DBPostProcess, DetPreProcess
class TextDetector:
def __init__(self, config: Dict[str, Any]):
limit_side_len = config.get("limit_side_len", 736)
limit_type = config.get("limit_type", "min")
self.preprocess_op = DetPreProcess(limit_side_len, limit_type)
post_process = {
"thresh": config.get("thresh", 0.3),
"box_thresh": config.get("box_thresh", 0.5),
"max_candidates": config.get("max_candidates", 1000),
"unclip_ratio": config.get("unclip_ratio", 1.6),
"use_dilation": config.get("use_dilation", True),
"score_mode": config.get("score_mode", "fast"),
}
self.postprocess_op = DBPostProcess(**post_process)
self.infer = OpenVINOInferSession(config=config)
def __call__(self, img: np.ndarray) -> Tuple[Optional[np.ndarray], float]:
start_time = time.perf_counter()
if img is None:
raise ValueError("img is None")
ori_img_shape = img.shape[0], img.shape[1]
prepro_img = self.preprocess_op(img)
if prepro_img is None:
return None, 0
preds = self.infer(prepro_img)[0]
dt_boxes, dt_boxes_scores = self.postprocess_op(preds, ori_img_shape)
# print(dt_boxes)
dt_boxes = self.filter_tag_det_res(dt_boxes, ori_img_shape)
elapse = time.perf_counter() - start_time
return dt_boxes, elapse
def filter_tag_det_res(
self, dt_boxes: np.ndarray, image_shape: Tuple[int, int]
) -> np.ndarray:
img_height, img_width = image_shape
dt_boxes_new = []
for box in dt_boxes:
box = self.order_points_clockwise(box)
box = self.clip_det_res(box, img_height, img_width)
rect_width = int(np.linalg.norm(box[0] - box[1]))
rect_height = int(np.linalg.norm(box[0] - box[3]))
if rect_width <= 3 or rect_height <= 3:
continue
dt_boxes_new.append(box)
return np.array(dt_boxes_new)
def order_points_clockwise(self, pts: np.ndarray) -> np.ndarray:
"""
reference from:
https://github.com/jrosebr1/imutils/blob/master/imutils/perspective.py
sort the points based on their x-coordinates
"""
xSorted = pts[np.argsort(pts[:, 0]), :]
        # grab the left-most and right-most points from the sorted
        # x-coordinate points
leftMost = xSorted[:2, :]
rightMost = xSorted[2:, :]
# now, sort the left-most coordinates according to their
# y-coordinates so we can grab the top-left and bottom-left
# points, respectively
leftMost = leftMost[np.argsort(leftMost[:, 1]), :]
(tl, bl) = leftMost
rightMost = rightMost[np.argsort(rightMost[:, 1]), :]
(tr, br) = rightMost
rect = np.array([tl, tr, br, bl], dtype="float32")
return rect
def clip_det_res(
self, points: np.ndarray, img_height: int, img_width: int
) -> np.ndarray:
for pno in range(points.shape[0]):
points[pno, 0] = int(min(max(points[pno, 0], 0), img_width - 1))
points[pno, 1] = int(min(max(points[pno, 1], 0), img_height - 1))
return points

View File

@ -0,0 +1,226 @@
from typing import List, Optional, Tuple
import cv2
import numpy as np
import pyclipper
from shapely.geometry import Polygon
class DetPreProcess:
def __init__(self, limit_side_len: int = 736, limit_type: str = "min"):
self.mean = np.array([0.485, 0.456, 0.406])
self.std = np.array([0.229, 0.224, 0.225])
self.scale = 1 / 255.0
self.limit_side_len = limit_side_len
self.limit_type = limit_type
def __call__(self, img: np.ndarray) -> Optional[np.ndarray]:
resized_img = self.resize(img)
if resized_img is None:
return None
img = self.normalize(resized_img)
img = self.permute(img)
img = np.expand_dims(img, axis=0).astype(np.float32)
return img
def normalize(self, img: np.ndarray) -> np.ndarray:
return (img.astype("float32") * self.scale - self.mean) / self.std
def permute(self, img: np.ndarray) -> np.ndarray:
return img.transpose((2, 0, 1))
def resize(self, img: np.ndarray) -> Optional[np.ndarray]:
"""resize image to a size multiple of 32 which is required by the network"""
h, w = img.shape[:2]
if self.limit_type == "max":
if max(h, w) > self.limit_side_len:
if h > w:
ratio = float(self.limit_side_len) / h
else:
ratio = float(self.limit_side_len) / w
else:
ratio = 1.0
else:
if min(h, w) < self.limit_side_len:
if h < w:
ratio = float(self.limit_side_len) / h
else:
ratio = float(self.limit_side_len) / w
else:
ratio = 1.0
resize_h = int(h * ratio)
resize_w = int(w * ratio)
resize_h = int(round(resize_h / 32) * 32)
resize_w = int(round(resize_w / 32) * 32)
try:
if int(resize_w) <= 0 or int(resize_h) <= 0:
return None
img = cv2.resize(img, (int(resize_w), int(resize_h)))
except Exception as exc:
raise ResizeImgError from exc
return img
class ResizeImgError(Exception):
pass
class DBPostProcess:
"""The post process for Differentiable Binarization (DB)."""
def __init__(
self,
thresh: float = 0.3,
box_thresh: float = 0.7,
max_candidates: int = 1000,
unclip_ratio: float = 2.0,
score_mode: str = "fast",
use_dilation: bool = False,
):
self.thresh = thresh
self.box_thresh = box_thresh
self.max_candidates = max_candidates
self.unclip_ratio = unclip_ratio
self.min_size = 3
self.score_mode = score_mode
self.dilation_kernel = None
if use_dilation:
self.dilation_kernel = np.array([[1, 1], [1, 1]])
def __call__(
self, pred: np.ndarray, ori_shape: Tuple[int, int]
) -> Tuple[np.ndarray, List[float]]:
src_h, src_w = ori_shape
pred = pred[:, 0, :, :]
segmentation = pred > self.thresh
mask = segmentation[0]
if self.dilation_kernel is not None:
mask = cv2.dilate(
np.array(segmentation[0]).astype(np.uint8), self.dilation_kernel
)
boxes, scores = self.boxes_from_bitmap(pred[0], mask, src_w, src_h)
return boxes, scores
def boxes_from_bitmap(
self, pred: np.ndarray, bitmap: np.ndarray, dest_width: int, dest_height: int
) -> Tuple[np.ndarray, List[float]]:
"""
bitmap: single map with shape (1, H, W),
whose values are binarized as {0, 1}
"""
height, width = bitmap.shape
outs = cv2.findContours(
(bitmap * 255).astype(np.uint8), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE
)
if len(outs) == 3:
img, contours, _ = outs[0], outs[1], outs[2]
elif len(outs) == 2:
contours, _ = outs[0], outs[1]
num_contours = min(len(contours), self.max_candidates)
boxes, scores = [], []
for index in range(num_contours):
contour = contours[index]
points, sside = self.get_mini_boxes(contour)
if sside < self.min_size:
continue
if self.score_mode == "fast":
score = self.box_score_fast(pred, points.reshape(-1, 2))
else:
score = self.box_score_slow(pred, contour)
if self.box_thresh > score:
continue
box = self.unclip(points)
box, sside = self.get_mini_boxes(box)
if sside < self.min_size + 2:
continue
box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
box[:, 1] = np.clip(
np.round(box[:, 1] / height * dest_height), 0, dest_height
)
boxes.append(box.astype(np.int32))
scores.append(score)
return np.array(boxes, dtype=np.int32), scores
def get_mini_boxes(self, contour: np.ndarray) -> Tuple[np.ndarray, float]:
bounding_box = cv2.minAreaRect(contour)
points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])
index_1, index_2, index_3, index_4 = 0, 1, 2, 3
if points[1][1] > points[0][1]:
index_1 = 0
index_4 = 1
else:
index_1 = 1
index_4 = 0
if points[3][1] > points[2][1]:
index_2 = 2
index_3 = 3
else:
index_2 = 3
index_3 = 2
box = np.array(
[points[index_1], points[index_2], points[index_3], points[index_4]]
)
return box, min(bounding_box[1])
@staticmethod
def box_score_fast(bitmap: np.ndarray, _box: np.ndarray) -> float:
h, w = bitmap.shape[:2]
box = _box.copy()
xmin = np.clip(np.floor(box[:, 0].min()).astype(np.int32), 0, w - 1)
xmax = np.clip(np.ceil(box[:, 0].max()).astype(np.int32), 0, w - 1)
ymin = np.clip(np.floor(box[:, 1].min()).astype(np.int32), 0, h - 1)
ymax = np.clip(np.ceil(box[:, 1].max()).astype(np.int32), 0, h - 1)
mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
box[:, 0] = box[:, 0] - xmin
box[:, 1] = box[:, 1] - ymin
cv2.fillPoly(mask, box.reshape(1, -1, 2).astype(np.int32), 1)
return cv2.mean(bitmap[ymin : ymax + 1, xmin : xmax + 1], mask)[0]
def box_score_slow(self, bitmap: np.ndarray, contour: np.ndarray) -> float:
"""use polyon mean score as the mean score"""
h, w = bitmap.shape[:2]
contour = contour.copy()
contour = np.reshape(contour, (-1, 2))
xmin = np.clip(np.min(contour[:, 0]), 0, w - 1)
xmax = np.clip(np.max(contour[:, 0]), 0, w - 1)
ymin = np.clip(np.min(contour[:, 1]), 0, h - 1)
ymax = np.clip(np.max(contour[:, 1]), 0, h - 1)
mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
contour[:, 0] = contour[:, 0] - xmin
contour[:, 1] = contour[:, 1] - ymin
cv2.fillPoly(mask, contour.reshape(1, -1, 2).astype(np.int32), 1)
return cv2.mean(bitmap[ymin : ymax + 1, xmin : xmax + 1], mask)[0]
def unclip(self, box: np.ndarray) -> np.ndarray:
unclip_ratio = self.unclip_ratio
poly = Polygon(box)
distance = poly.area * unclip_ratio / poly.length
offset = pyclipper.PyclipperOffset()
offset.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
expanded = np.array(offset.Execute(distance)).reshape((-1, 1, 2))
return expanded

View File

@ -0,0 +1 @@
from .text_recognize import TextRecognizer

View File

@ -0,0 +1,91 @@
import math
import time
from typing import List, Tuple, Union
import cv2
import numpy as np
from infer_engine import OpenVINOInferSession
from .utils import CTCLabelDecode
# from character import character_list
class TextRecognizer:
def __init__(self, config):
self.rec_image_shape = config["rec_img_shape"]
self.rec_batch_num = config["rec_batch_num"]
self.character_dict_path = config["rec_keys_path"]
self.postprocess_op = CTCLabelDecode(character_path=self.character_dict_path)
# self.postprocess_op = CTCLabelDecode(character=character_list)
self.infer = OpenVINOInferSession(config=config)
def __call__(
self, img_list: Union[np.ndarray, List[np.ndarray]]
) -> Tuple[List[Tuple[str, float]], float]:
if isinstance(img_list, np.ndarray):
img_list = [img_list]
# Calculate the aspect ratio of all text bars
width_list = [img.shape[1] / float(img.shape[0]) for img in img_list]
# Sorting can speed up the recognition process
indices = np.argsort(np.array(width_list))
img_num = len(img_list)
rec_res = [("", 0.0)] * img_num
batch_num = self.rec_batch_num
elapse = 0
for beg_img_no in range(0, img_num, batch_num):
end_img_no = min(img_num, beg_img_no + batch_num)
max_wh_ratio = 0
for ino in range(beg_img_no, end_img_no):
h, w = img_list[indices[ino]].shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
norm_img_batch = []
for ino in range(beg_img_no, end_img_no):
norm_img = self.resize_norm_img(img_list[indices[ino]], max_wh_ratio)
norm_img_batch.append(norm_img[np.newaxis, :])
norm_img_batch = np.concatenate(norm_img_batch).astype(np.float32)
starttime = time.time()
preds = self.infer(norm_img_batch)[0]
rec_result = self.postprocess_op(preds)
for rno, one_res in enumerate(rec_result):
rec_res[indices[beg_img_no + rno]] = one_res
elapse += time.time() - starttime
return rec_res, elapse
def resize_norm_img(self, img: np.ndarray, max_wh_ratio: float) -> np.ndarray:
img_channel, img_height, img_width = self.rec_image_shape
assert img_channel == img.shape[2]
img_width = int(img_height * max_wh_ratio)
h, w = img.shape[:2]
ratio = w / float(h)
if math.ceil(img_height * ratio) > img_width:
resized_w = img_width
else:
resized_w = int(math.ceil(img_height * ratio))
resized_image = cv2.resize(img, (resized_w, img_height))
resized_image = resized_image.astype("float32")
resized_image = resized_image.transpose((2, 0, 1)) / 255
resized_image -= 0.5
resized_image /= 0.5
padding_im = np.zeros((img_channel, img_height, img_width), dtype=np.float32)
padding_im[:, :, 0:resized_w] = resized_image
return padding_im

View File

@ -0,0 +1,98 @@
from pathlib import Path
from typing import List, Optional, Tuple, Union
import numpy as np
class CTCLabelDecode:
def __init__(
self,
character: Optional[List[str]] = None,
character_path: Union[str, Path, None] = None,
):
self.character = self.get_character(character, character_path)
self.dict = {char: i for i, char in enumerate(self.character)}
def __call__(self, preds: np.ndarray) -> List[Tuple[str, float]]:
preds_idx = preds.argmax(axis=2)
preds_prob = preds.max(axis=2)
text = self.decode(preds_idx, preds_prob, is_remove_duplicate=True)
return text
def get_character(
self,
character: Optional[List[str]] = None,
character_path: Union[str, Path, None] = None,
) -> List[str]:
if character is None and character_path is None:
raise ValueError("character must not be None")
character_list = None
if character:
character_list = character
if character_path:
character_list = self.read_character_file(character_path)
if character_list is None:
raise ValueError("character must not be None")
character_list = self.insert_special_char(
character_list, " ", len(character_list)
)
character_list = self.insert_special_char(character_list, "blank", 0)
return character_list
@staticmethod
def read_character_file(character_path: Union[str, Path]) -> List[str]:
character_list = []
with open(character_path, "rb") as f:
lines = f.readlines()
for line in lines:
line = line.decode("utf-8").strip("\n").strip("\r\n")
character_list.append(line)
return character_list
@staticmethod
def insert_special_char(
character_list: List[str], special_char: str, loc: int = -1
) -> List[str]:
character_list.insert(loc, special_char)
return character_list
def decode(
self,
text_index: np.ndarray,
text_prob: Optional[np.ndarray] = None,
is_remove_duplicate: bool = False,
) -> List[Tuple[str, float]]:
"""convert text-index into text-label."""
result_list = []
ignored_tokens = self.get_ignored_tokens()
batch_size = len(text_index)
for batch_idx in range(batch_size):
char_list, conf_list = [], []
cur_pred_ids = text_index[batch_idx]
for idx, cur_idx in enumerate(cur_pred_ids):
if cur_idx in ignored_tokens:
continue
if is_remove_duplicate:
# only for predict
if idx > 0 and cur_pred_ids[idx - 1] == cur_idx:
continue
char_list.append(self.character[int(cur_idx)])
if text_prob is None:
conf_list.append(1)
else:
conf_list.append(text_prob[batch_idx][idx])
text = "".join(char_list)
result_list.append((text, np.mean(conf_list if any(conf_list) else [0])))
return result_list
@staticmethod
def get_ignored_tokens() -> List[int]:
return [0] # for ctc blank
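
A small worked example of the CTC decode path: with character=['a', 'b'] the effective list becomes ['blank', 'a', 'b', ' '], blanks are dropped, and repeated indices collapse:

import numpy as np
ctc = CTCLabelDecode(character=['a', 'b'])
preds = np.array([[[0.1, 0.8, 0.05, 0.05],   # 'a'
                   [0.1, 0.8, 0.05, 0.05],   # duplicate 'a', removed
                   [0.9, 0.05, 0.03, 0.02],  # blank, ignored
                   [0.1, 0.1, 0.7, 0.1],     # 'b'
                   [0.1, 0.1, 0.7, 0.1]]])   # duplicate 'b', removed
print(ctc(preds))   # [('ab', 0.75)] -- mean of the kept per-step confidences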

View File

@ -0,0 +1,275 @@
import copy
from pathlib import Path
from typing import Any, List, Optional, Tuple, Union
import cv2
import numpy as np
from .ch_ppocr_cls import TextClassifier
from .ch_ppocr_det import TextDetector
from .ch_ppocr_rec import TextRecognizer
from template import template_960
class RapidOCR:
    def __init__(self, config=None, **kwargs):
# if kwargs:
# updater = UpdateParameters()
# config = updater(config, **kwargs)
global_config = config["Global"]
self.print_verbose = global_config["print_verbose"]
self.text_score = global_config["text_score"]
self.min_height = global_config["min_height"]
self.width_height_ratio = global_config["width_height_ratio"]
self.use_det = global_config["use_det"]
if self.use_det:
self.text_det = TextDetector(config["Det"])
self.use_cls = global_config["use_cls"]
if self.use_cls:
self.text_cls = TextClassifier(config["Cls"])
self.use_rec = global_config["use_rec"]
if self.use_rec:
self.text_rec = TextRecognizer(config["Rec"])
# self.load_img = LoadImage()
def __call__(
self,
img_content: Union[str, np.ndarray, bytes, Path],
use_det: Optional[bool] = None,
use_cls: Optional[bool] = None,
use_rec: Optional[bool] = None,
**kwargs,
) -> Tuple[Optional[List[List[Union[Any, str]]]], Optional[List[float]]]:
use_det = self.use_det if use_det is None else use_det
use_cls = self.use_cls if use_cls is None else use_cls
use_rec = self.use_rec if use_rec is None else use_rec
if kwargs:
box_thresh = kwargs.get("box_thresh", 0.5)
unclip_ratio = kwargs.get("unclip_ratio", 1.6)
text_score = kwargs.get("text_score", 0.5)
self.text_det.postprocess_op.box_thresh = box_thresh
self.text_det.postprocess_op.unclip_ratio = unclip_ratio
self.text_score = text_score
# img = self.load_img(img_content)
img = img_content
dt_boxes, cls_res, rec_res = None, None, None
det_elapse, cls_elapse, rec_elapse = 0.0, 0.0, 0.0
if use_det:
img, padding_h = self.maybe_add_letterbox(img)
dt_boxes, det_elapse = self.auto_text_det(img)
if dt_boxes is None:
return None, None
template_boxes, keys = [], []
# label_ids = []
for index, value in template_960.items():
box = value['box']
key = value['key']
# label = index
template_boxes.append(box)
keys.append(key)
# label_ids.append(label)
matched_flags = [False] * len(template_boxes)
matched_boxes = []
matched_keys = []
for dt_box in dt_boxes:
dt_pt0 = dt_box[0]
min_distance = float('inf')
best_index = None
for idx, box in enumerate(template_boxes):
pt0 = np.array([box[0], box[1]])
distance = np.linalg.norm(dt_pt0 - pt0)
if not matched_flags[idx] and distance <= 30.0:
if distance < min_distance:
min_distance = distance
best_index = idx
if best_index is not None:
matched_boxes.append(dt_box)
matched_keys.append(keys[best_index])
matched_flags[best_index] = True
dt_boxes = matched_boxes
dt_keys = matched_keys
img = self.get_crop_img_list(img, dt_boxes)
if use_cls:
img, cls_res, cls_elapse = self.text_cls(img)
if use_rec:
rec_res, rec_elapse = self.text_rec(img)
if dt_boxes is not None and padding_h > 0:
            for box in dt_boxes:
                box[:, 1] -= padding_h
ocr_res = self.get_final_res(
dt_boxes, dt_keys, cls_res, rec_res, det_elapse, cls_elapse, rec_elapse
)
return ocr_res
def maybe_add_letterbox(self, img: np.ndarray) -> Tuple[np.ndarray, int]:
h, w = img.shape[:2]
if self.width_height_ratio == -1:
use_limit_ratio = False
else:
use_limit_ratio = w / h > self.width_height_ratio
if h <= self.min_height or use_limit_ratio:
new_h = max(int(w / self.width_height_ratio), self.min_height) * 2
padding_h = int(abs(new_h - h) / 2)
block_img = cv2.copyMakeBorder(
img, padding_h, padding_h, 0, 0, cv2.BORDER_CONSTANT, value=(0, 0, 0)
)
return block_img, padding_h
return img, 0
def auto_text_det(
self, img: np.ndarray
) -> Tuple[Optional[List[np.ndarray]], float]:
dt_boxes, det_elapse = self.text_det(img)
if dt_boxes is None or len(dt_boxes) < 1:
return None, 0.0
dt_boxes = self.sorted_boxes(dt_boxes)
return dt_boxes, det_elapse
def get_crop_img_list(
self, img: np.ndarray, dt_boxes: List
) -> List[np.ndarray]:
def get_rotate_crop_image(img: np.ndarray, points: np.ndarray) -> np.ndarray:
img_crop_width = int(
max(
np.linalg.norm(points[0] - points[1]),
np.linalg.norm(points[2] - points[3]),
)
)
img_crop_height = int(
max(
np.linalg.norm(points[0] - points[3]),
np.linalg.norm(points[1] - points[2]),
)
)
pts_std = np.array(
[
[0, 0],
[img_crop_width, 0],
[img_crop_width, img_crop_height],
[0, img_crop_height],
]
).astype(np.float32)
M = cv2.getPerspectiveTransform(points, pts_std)
dst_img = cv2.warpPerspective(
img,
M,
(img_crop_width, img_crop_height),
borderMode=cv2.BORDER_REPLICATE,
flags=cv2.INTER_CUBIC,
)
dst_img_height, dst_img_width = dst_img.shape[0:2]
if dst_img_height * 1.0 / dst_img_width >= 1.5:
dst_img = np.rot90(dst_img)
return dst_img
img_crop_list = []
for box in dt_boxes:
tmp_box = copy.deepcopy(box)
img_crop = get_rotate_crop_image(img, tmp_box)
img_crop_list.append(img_crop)
return img_crop_list
@staticmethod
def sorted_boxes(dt_boxes: np.ndarray) -> List[np.ndarray]:
"""
Sort text boxes in order from top to bottom, left to right
        args:
            dt_boxes (array): detected text boxes with shape [N, 4, 2]
        return:
            sorted boxes (list of arrays), each with shape [4, 2]
"""
num_boxes = dt_boxes.shape[0]
sorted_boxes = sorted(dt_boxes, key=lambda x: (x[0][1], x[0][0]))
_boxes = list(sorted_boxes)
for i in range(num_boxes - 1):
for j in range(i, -1, -1):
if (
abs(_boxes[j + 1][0][1] - _boxes[j][0][1]) < 10
and _boxes[j + 1][0][0] < _boxes[j][0][0]
):
tmp = _boxes[j]
_boxes[j] = _boxes[j + 1]
_boxes[j + 1] = tmp
else:
break
return _boxes
def get_final_res(
self,
dt_boxes: Optional[List[np.ndarray]],
dt_keys,
cls_res: Optional[List[List[Union[str, float]]]],
rec_res: Optional[List[Tuple[str, float]]],
det_elapse: float,
cls_elapse: float,
rec_elapse: float,
) -> Tuple[Optional[List[List[Union[Any, str]]]], Optional[List[float]]]:
if dt_boxes is None and rec_res is None and cls_res is not None:
return cls_res, [cls_elapse]
if dt_boxes is None and rec_res is None:
return None, None
if dt_boxes is None and rec_res is not None:
return [[res[0], res[1]] for res in rec_res], [rec_elapse]
if dt_boxes is not None and rec_res is None:
return [box.tolist() for box in dt_boxes], [det_elapse]
dt_boxes, dt_keys, rec_res = self.filter_result(dt_boxes, dt_keys, rec_res)
if not dt_boxes or not rec_res or len(dt_boxes) <= 0:
return None, None
ocr_res = [
[box.tolist(), key, res[0], res[1]] for box, key, res in zip(dt_boxes, dt_keys, rec_res)
], [det_elapse, cls_elapse, rec_elapse]
return ocr_res
def filter_result(
self,
dt_boxes: Optional[List[Tuple[np.ndarray, int]]],
dt_keys,
rec_res: Optional[List[Tuple[str, float]]],
) -> Tuple[Optional[List[np.ndarray]], Optional[List[Tuple[str, float]]]]:
if dt_boxes is None or rec_res is None:
return None, None
filter_boxes, filter_keys, filter_rec_res = [], [], []
        for box, key, rec_result in zip(dt_boxes, dt_keys, rec_res):
            text, score = rec_result
            if float(score) >= self.text_score:
                filter_boxes.append(box)
                filter_keys.append(key)
                filter_rec_res.append(rec_result)
return filter_boxes, filter_keys, filter_rec_res

View File

@ -0,0 +1,115 @@
import wmi
import os
import base64
import hashlib
import pyDes
import uuid
class Register:
def __init__(self):
self.Des_key = "zhdchykj" #Key,需八位
self.Des_IV = "12345678" # 自定IV向量
self.pre_str = "ZHDC" # 前缀
self.suf_str = "HYKJ" # 后缀
# 获取机器码,机器码由以下四部分拼接组成
# 1、CPU序列号 2、MAC地址 3.硬盘序列号 4.主板序列号
self.m_wmi = wmi.WMI()
    # CPU serial number, 16 chars
def get_cpu_serial(self):
cpu_info = self.m_wmi.Win32_Processor()
if len(cpu_info) > 0:
serial_number = cpu_info[0].ProcessorId
return serial_number
else:
return "ABCDEFGHIJKLMNOP"
    # disk serial number, 15 chars
def get_disk_serial(self):
disk_info = self.m_wmi.Win32_PhysicalMedia()
disk_info.sort()
if len(disk_info) > 0:
serial_number = disk_info[0].SerialNumber.strip()
return serial_number
else:
return "WD-ABCDEFGHIJKL"
    # MAC address, 12 chars
# def get_mac_address(self):
# for network in self.m_wmi.Win32_NetworkAdapterConfiguration():
# mac_address = network.MacAddress
# if mac_address != None:
# return mac_address.replace(":", "")
# return "ABCDEF123456"
def get_mac_address(self):
node = uuid.getnode()
macHex = uuid.UUID(int=node).hex[-12:]
return macHex.upper()[:12]
    # motherboard serial number, 14 chars
def get_board_serial(self):
board_info = self.m_wmi.Win32_BaseBoard()
if len(board_info) > 0:
board_id = board_info[0].SerialNumber.strip().strip('.')
return board_id
else:
return "ABCDEFGHIJKLMN"
    # Concatenate the parts and hash them into the machine code
def getMachineCode(self):
mac_address = self.get_mac_address()
# cpu_serial = self.get_cpu_serial()
# disk_serial = self.get_disk_serial()
# board_serial = self.get_board_serial()
# combine_str = self.pre_str + mac_address + cpu_serial + disk_serial + board_serial + self.suf_str
combine_str = self.pre_str + mac_address + self.suf_str
combine_byte = combine_str.encode("utf-8")
machine_code = hashlib.md5(combine_byte).hexdigest()
return machine_code.upper()
    # DES encryption followed by base32 encoding
    def Encrypt(self, tr):
        k = pyDes.des(self.Des_key, pyDes.CBC, self.Des_IV, pad=None, padmode=pyDes.PAD_PKCS5)
        EncryptStr = k.encrypt(tr)
        return base64.b32encode(EncryptStr)  # return the base32-encoded ciphertext
    # After successful activation, write the registration file
def register(self):
machine_code = self.getMachineCode()
        print('Please send', machine_code, 'to the Huayan technical staff to obtain an activation code')
        key_code = input('Please enter the activation code: ')
if key_code:
encrypt_code = self.Encrypt(machine_code.encode("utf-8"))
md5_code = hashlib.md5(encrypt_code).hexdigest().upper()
key_code = key_code.upper()
if md5_code == key_code:
print("激活成功!")
with open('register.bin', 'w') as f:
f.write(key_code)
return True
else:
print("激活码错误,请重新输入!")
return False
else:
return False
    # On startup, read the registration file and check that the stored code
    # still matches the code derived from the current hardware
def checkAuthored(self):
machine_code = self.getMachineCode()
encrypt_code = self.Encrypt(machine_code.encode("utf-8"))
md5_code = hashlib.md5(encrypt_code).hexdigest().upper()
if os.path.exists("register.bin"):
with open("register.bin", "r") as f:
key_code = f.read()
if key_code == md5_code:
return True
return False

View File

@ -0,0 +1,121 @@
import json
class Result(object):
def __init__(self) -> None:
# self.res_dict = {
# "合格证编号": "",
# "发证日期":"",
# "车辆制造企业名称":"",
# "车辆品牌":"",
# "车辆名称":"",
# "车辆型号":"",
# "车架号":"",
# "车身颜色":"",
# "底盘型号":"",
# "底盘ID":"",
# "底盘合格证编号":"",
# "发动机型号":"",
# "发动机号":"",
# "燃料种类":"",
# "排量":"",
# "功率":"",
# "排放标准":"",
# "油耗":"",
# "外廓尺寸1":"",
# "外廓尺寸2":"",
# "外廓尺寸3":"",
# "货箱内部尺寸1":"",
# "货箱内部尺寸2":"",
# "货箱内部尺寸3":"",
# "钢板弹簧片数":"",
# "轮胎数":"",
# "轮胎规格":"",
# "前轮距":"",
# "后轮距":"",
# "轴距":"",
# "轴荷":"",
# "轴数":"",
# "转向形式":"",
# "总质量":"",
# "整备质量":"",
# "额定载质量":"",
# "载质量利用系数":"",
# "准牵引总质量":"",
# "半挂车鞍座最大允许总质量":"",
# "驾驶室准乘人数":"",
# "额定载客":"",
# "最高设计车速":"",
# "车辆制造日期":"",
# }
self.res_dict = {
"HGZBH": "",
"FZRQ":"",
"CLZZQYMC":"",
"CLPP":"",
"CLMC":"",
"CLXH":"",
"CJH":"",
"CSYS":"",
"DPXH":"",
"DPID":"",
"DPHGZBH":"",
"FDJXH":"",
"FDJH":"",
"RLZL":"",
"PL":"",
"GL":"",
"PFBZ":"",
"YH":"",
"WKCC1":"",
"WKCC2":"",
"WKCC3":"",
"HXNBCC1":"",
"HXNBCC2":"",
"HXNBCC3":"",
"GBTHPS":"",
"LTS":"",
"LTGG":"",
"QLJ":"",
"HLJ":"",
"ZJ":"",
"ZH":"",
"ZS":"",
"ZXXS":"",
"ZZL":"",
"ZBZL":"",
"EDZZL":"",
"ZZLLYXS":"",
"ZQYZZL":"",
"BGCAZZDYXZZL":"",
"JSSZCRS":"",
"EDZK":"",
"ZGSJCS":"",
"ZLZZRQ":"",
}
def update(self, ocr_res):
if ocr_res is not None:
# boxes, keys, txts, scores = list(zip(*ocr_res))
for box, key, text, score in ocr_res:
self.res_dict[key] = text
# if len(ocr_res)==len(self.res_dict):
# for key, (text, score) in zip(self.res_dict.keys(), ocr_res):
# if score < 0.5:
# continue
# self.res_dict[key] = text
def clear(self):
for key in self.res_dict.keys():
self.res_dict[key] = ""
def get(self):
final_res = {"code":"1",
"msg":"识别成功",
"data":self.res_dict}
return final_res

View File

@ -0,0 +1,57 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
from pipeline import PipePredictor
from utils import base64_to_cv2
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# class Item(BaseModel):
#     img_base64: str = None  # image as base64
class Item(BaseModel):
imgbase64: str = None
@app.get("/ocr/vehicle_certificate/state")
async def get_state():
return "OCR Vehicle Certificate is running!"
@app.post('/ocr/vehicle_certificate')
async def predict(request_data: Item):
img_base64 = request_data.imgbase64
img = base64_to_cv2(img_base64)
result = pipe_predictor(img)
return result
if __name__ == '__main__':
# from register import Register
# register = Register()
# while(not register.checkAuthored()):
# register.register()
from authorization import Authorization
authorization = Authorization()
    while not authorization.check():
authorization.activate()
pipe_predictor = PipePredictor()
print("OCR Vehicle Certificate Service start!")
uvicorn.run(app=app,
host="0.0.0.0",
port=8002,
# reload=True,
workers=1,
)
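
A minimal client sketch for the endpoint above, assuming the service is running locally on port 8002 and requests is installed; the image path is hypothetical:

import cv2
import requests
from utils import cv2_to_base64

img = cv2.imread('certificate.jpg')   # hypothetical sample
payload = {"imgbase64": cv2_to_base64(img)}
r = requests.post("http://127.0.0.1:8002/ocr/vehicle_certificate", json=payload)
print(r.json())   # {"code": "1", "msg": ..., "data": {...}} on success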

View File

@ -0,0 +1,266 @@
template_960 = {
0:
{
"chn_key": "合格证编号",
"box": [210,0,480,35],
"key": "HGZBH"
},
1:
{
"chn_key": "发证日期",
"box": [690,0,960,35],
"key": "FZRQ"
},
2:
{
"chn_key": "车辆制造企业名称",
"box": [210,35,600,70],
"key": "CLZZQYMC"
},
3:
{
"chn_key": "车辆品牌",
"box": [210,65,400,100],
"key": "CLPP"
},
4:
{
"chn_key": "车辆名称",
"box": [480,65,690,100],
"key": "CLMC"
},
5:
{
"chn_key": "车辆型号",
"box": [210,95,480,130],
"key": "CLXH"
},
6:
{
"chn_key": "车架号",
"box": [690,95,960,130],
"key": "CJH"
},
7:
{
"chn_key": "车身颜色",
"box": [210,125,480,160],
"key": "CSYS"
},
8:
{
"chn_key": "底盘型号",
"box": [210,155,480,190],
"key": "DPXH"
},
9:
{
"chn_key": "底盘ID",
"box": [690,155,960,190],
"key": "DPID"
},
10:
{
"chn_key": "底盘合格证编号",
"box": [210,185,480,220],
"key": "DPHGZBH"
},
11:
{
"chn_key": "发动机型号",
"box": [690,185,960,220],
"key": "FDJXH"
},
12:
{
"chn_key": "发动机号",
"box": [210,215,480,250],
"key": "FDJH"
},
13:
{
"chn_key": "燃料种类",
"box": [210,245,300,280],
"key": "RLZL"
},
14:
{
"chn_key": "排量",
"box": [690,250,770,280],
"key": "PL"
},
15:
{
"chn_key": "功率",
"box": [820,250,960,280],
"key": "GL"
},
16:
{
"chn_key": "排放标准",
"box": [210,280,500,310],
"key": "PFBZ"
},
17:
{
"chn_key": "油耗",
"box": [210,310,300,340],
"key": "YH"
},
18:
{
"chn_key": "外廓尺寸1",
"box": [210,340,300,370],
"key": "WKCC1"
},
19:
{
"chn_key": "外廓尺寸2",
"box": [300,340,390,370],
"key": "WKCC2"
},
20:
{
"chn_key": "外廓尺寸3",
"box": [390,340,480,370],
"key": "WKCC3"
},
21:
{
"chn_key": "货箱内部尺寸1",
"box": [690,340,780,370],
"key": "HXNBCC1"
},
22:
{
"chn_key": "货箱内部尺寸2",
"box": [780,340,870,370],
"key": "HXNBCC2"
},
23:
{
"chn_key": "货箱内部尺寸3",
"box": [865,340,960,370],
"key": "HXNBCC3"
},
24:
{
"chn_key": "钢板弹簧片数",
"box": [210,370,300,400],
"key": "GBTHPS"
},
25:
{
"chn_key": "轮胎数",
"box": [690,370,780,410],
"key": "LTS"
},
26:
{
"chn_key": "轮胎规格",
"box": [210,400,480,435],
"key": "LTGG"
},
27:
{
"chn_key": "前轮距",
"box": [210,430,310,465],
"key": "QLJ"
},
28:
{
"chn_key": "后轮距",
"box": [480,430,580,465],
"key": "HLJ"
},
29:
{
"chn_key": "轴距",
"box": [210,460,310,495],
"key": "ZJ"
},
30:
{
"chn_key": "轴荷",
"box": [210,490,400,530],
"key": "ZH"
},
31:
{
"chn_key": "轴数",
"box": [210,530,310,560],
"key": "ZS"
},
32:
{
"chn_key": "转向形式",
"box": [690,530,810,560],
"key": "ZXXS"
},
33:
{
"chn_key": "总质量",
"box": [210,560,310,590],
"key": "ZZL"
},
34:
{
"chn_key": "整备质量",
"box": [690,560,810,590],
"key": "ZBZL"
},
35:
{
"chn_key": "额定载质量",
"box": [210,590,310,620],
"key": "EDZZL"
},
36:
{
"chn_key": "载质量利用系数",
"box": [690,590,810,620],
"key": "ZZLLYXS"
},
37:
{
"chn_key": "准牵引总质量",
"box": [210,630,300,670],
"key": "ZQYZZL"
},
38:
{
"chn_key": "半挂车鞍座最大允许总质量",
"box": [690,620,800,660],
"key": "BGCAZZDYXZZL"
},
39:
{
"chn_key": "驾驶室准乘人数",
"box": [210,680,300,710],
"key": "JSSZCRS"
},
40:
{
"chn_key": "额定载客",
"box": [210,710,300,740],
"key": "EDZK"
},
41:
{
"chn_key": "最高设计车速",
"box": [210,740,300,770],
"key": "ZGSJCS"
},
42:
{
"chn_key": "车辆制造日期",
"box": [210,770,400,805],
"key": "ZLZZRQ"
},
# 43:
# {
# "chn_key": "备注",
# "box": [0,810,480,960],
# "key": "BZ"
# }
}

View File

@ -0,0 +1,117 @@
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
data = cv2.imencode('.jpg', image)[1]
return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)  # np.fromstring is deprecated
data = cv2.imdecode(data, cv2.IMREAD_COLOR)
return data
def expand_crop(image, box, expand_ratio=0.0):
imgh, imgw, c = image.shape
xmin, ymin, xmax, ymax = box
org_rect = [xmin, ymin, xmax, ymax]
h_half = (ymax - ymin) * (1 + expand_ratio) / 2.
w_half = (xmax - xmin) * (1 + expand_ratio) / 2.
if h_half > w_half * 4 / 3:
w_half = h_half * 0.75
center = [(ymin + ymax) / 2., (xmin + xmax) / 2.]
ymin = max(0, int(center[0] - h_half))
ymax = min(imgh - 1, int(center[0] + h_half))
xmin = max(0, int(center[1] - w_half))
xmax = min(imgw - 1, int(center[1] + w_half))
return image[ymin:ymax, xmin:xmax, :].copy(), [xmin, ymin, xmax, ymax], org_rect
def crop_image_with_box(image, boxes):
crop_res = []
for box in boxes:
crop_image, new_box, ori_box = expand_crop(image, box)
# import cv2
# cv2.imshow('1',crop_image)
# cv2.waitKey(0)
if crop_image is not None:
crop_res.append(crop_image)
return crop_res
def four_point_transform(image, pts):
    # # get the four input corner points
    # (tl, tr, br, bl) = pts
    # # compute the output width and height
# widthA = np.sqrt(((br[0] - bl[0])**2) + ((br[1] - bl[1])**2))
# widthB = np.sqrt(((tr[0] - tl[0])**2) + ((tr[1] - tl[1])**2))
# maxWidth = max(int(widthA), int(widthB))
# heightA = np.sqrt(((tr[0] - br[0])**2) + ((tr[1] - br[1])**2))
# heightB = np.sqrt(((tl[0] - bl[0])**2) + ((tl[1] - bl[1])**2))
# maxHeight = max(int(heightA), int(heightB))
maxWidth = 960
maxHeight = 960
    # destination corner coordinates after the transform
dst = np.array([[0, 0],
[maxWidth - 1, 0],
[maxWidth - 1, maxHeight - 1],
[0, maxHeight - 1]],
dtype='float32')
    # compute the perspective transform matrix
warp_mat = cv2.getPerspectiveTransform(np.float32(pts), dst)
warped = cv2.warpPerspective(image, warp_mat, (maxWidth, maxHeight))
return warped
def rect_intersect_area(rect1, rect2):
"""
计算两个矩形的相交面积
rect1, rect2: 形状为(4,)的数组包含[x1, y1, x2, y2]
"""
x1, y1, x2, y2 = rect1
X1, Y1, X2, Y2 = rect2
    # coordinates of the intersection rectangle
ix1 = max(x1, X1)
iy1 = max(y1, Y1)
ix2 = min(x2, X2)
iy2 = min(y2, Y2)
    # check whether the rectangles intersect
    if ix1 < ix2 and iy1 < iy2:
        # compute the intersection area
area = (ix2 - ix1) * (iy2 - iy1)
return area
else:
return 0
def rect_intersect(rect1, rect2):
    """
    Check whether two rectangles overlap by more than 10 px
    in both dimensions.
    rect1, rect2: arrays of shape (4,) holding [x1, y1, x2, y2]
    """
    x1, y1, x2, y2 = rect1
    X1, Y1, X2, Y2 = rect2
    # coordinates of the intersection rectangle
    ix1 = max(x1, X1)
    iy1 = max(y1, Y1)
    ix2 = min(x2, X2)
    iy2 = min(y2, Y2)
    # treat overlaps of at most 10 px in either dimension as no intersection
    return (ix2 - ix1) > 10 and (iy2 - iy1) > 10
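
A round-trip sanity check for the base64 helpers, using a synthetic image so no file I/O is needed:

import numpy as np
img = np.zeros((8, 8, 3), dtype=np.uint8)
restored = base64_to_cv2(cv2_to_base64(img))
print(restored.shape)   # (8, 8, 3); values may differ slightly since JPEG is lossy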