소스코드
소스코드 모음
소스코드 모음
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import numpy as np
import os

# Prefix and running counter for saved snapshot names (e.g. "ㅎ_1.png").
file_prefix = "ㅎ"
file_num = 0

# Initialize the MediaPipe Hand Landmarker.
base_options = mp.tasks.BaseOptions(model_asset_path="hand_landmarker.task")
options = vision.HandLandmarkerOptions(
    base_options=base_options,
    num_hands=1,
    min_hand_detection_confidence=0.5,
    min_hand_presence_confidence=0.5,
    min_tracking_confidence=0.5,
)

# Initialize the webcam (CAP_DSHOW avoids slow device startup on Windows).
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)

# Make sure the output directory exists before the first save attempt.
os.makedirs("output", exist_ok=True)

# Create the Hand Landmarker.
with vision.HandLandmarker.create_from_options(options) as landmarker:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            continue
        # Convert BGR to RGB for MediaPipe.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
        # Run hand detection.
        detection_result = landmarker.detect(image)
        # Convert RGB back to BGR for OpenCV display.
        image_array = image.numpy_view()
        image_array = cv2.cvtColor(image_array, cv2.COLOR_RGB2BGR)
        # Keep an unannotated copy: this is what gets saved.
        raw = image_array.copy()
        # Draw landmarks for each detected hand.
        if detection_result.hand_landmarks:
            for hand_landmarks in detection_result.hand_landmarks:
                # Draw each landmark point.
                for landmark in hand_landmarks:
                    x = int(landmark.x * image_array.shape[1])
                    y = int(landmark.y * image_array.shape[0])
                    cv2.circle(image_array, (x, y), 2, (0, 255, 0), -1)
                # Draw finger connection lines.
                connections = mp.solutions.hands.HAND_CONNECTIONS
                for connection in connections:
                    start_idx = connection[0]
                    end_idx = connection[1]
                    start_point = hand_landmarks[start_idx]
                    end_point = hand_landmarks[end_idx]
                    x1 = int(start_point.x * image_array.shape[1])
                    y1 = int(start_point.y * image_array.shape[0])
                    x2 = int(end_point.x * image_array.shape[1])
                    y2 = int(end_point.y * image_array.shape[0])
                    cv2.line(image_array, (x1, y1), (x2, y2), (0, 255, 0), 1)
        # 'q' quits; 's' saves the current raw frame.
        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break
        elif key == ord("s"):
            try:
                file_num += 1
                filename = os.path.join(
                    "output",
                    f"{file_prefix}_{file_num}.png",
                )
                # Encode in memory and write the bytes ourselves so that
                # non-ASCII (Korean) paths work where cv2.imwrite cannot
                # handle them.
                success, buffer = cv2.imencode(".png", raw)
                if success:
                    with open(filename, "wb") as f:
                        f.write(buffer)
                    print(f"Saved: {filename}")
                else:
                    print(f"Failed to save: {filename}")
            except Exception as e:
                print(f"Error: {e}")
        # Show the annotated and raw frames.
        cv2.imshow("Hand Tracking", image_array)
        cv2.imshow("raw", raw)
cap.release()
cv2.destroyAllWindows()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import cv2
import numpy as np
import mediapipe as mp
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv1D, MaxPooling1D, Flatten
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
# Initialize MediaPipe Hands in static-image mode: each dataset image is
# processed independently (no temporal tracking), one hand per image.
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(
    static_image_mode=True, max_num_hands=1, min_detection_confidence=0.5
)
def read_korean_image(image_path):
    """Read an image whose path may contain non-ASCII (Korean) characters.

    cv2.imread cannot open non-ASCII paths on some platforms, so the file
    is read as raw bytes and decoded in memory.

    Args:
        image_path: path to the image file.

    Returns:
        The image as an RGB numpy array.

    Raises:
        ValueError: if the file cannot be decoded as an image.
    """
    with open(image_path, "rb") as f:
        # Renamed from `bytes` — the original shadowed the builtin.
        data = np.frombuffer(f.read(), dtype=np.uint8)
    img = cv2.imdecode(data, cv2.IMREAD_COLOR)
    if img is None:
        # Fail with a clear message instead of a cryptic cvtColor error.
        raise ValueError(f"Could not decode image: {image_path}")
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
def extract_landmarks(image_path):
    """Return a (21, 3) array of hand-landmark x, y, z coordinates, or None.

    Runs MediaPipe Hands on the image at *image_path* and extracts the
    coordinates of the first detected hand; returns None when no hand is
    detected.
    """
    image_rgb = read_korean_image(image_path)
    results = hands.process(image_rgb)
    if not results.multi_hand_landmarks:
        return None
    first_hand = results.multi_hand_landmarks[0]
    coords = [[point.x, point.y, point.z] for point in first_hand.landmark]
    return np.array(coords)
def load_dataset(dataset_path):
    """Load landmark features and labels from a directory of PNG images.

    File names encode the label before the first underscore
    (e.g. 'ㄱ_1.png' -> label 'ㄱ'). Images where no hand is detected are
    skipped.

    Args:
        dataset_path: directory containing the .png training images.

    Returns:
        (X, y, label_map): landmark arrays, integer class labels, and the
        label -> class-index mapping.
    """
    X = []
    y = []
    label_map = {}
    current_label = 0
    # Sort the listing so label indices are deterministic across runs —
    # os.listdir order is filesystem-dependent, which would otherwise make
    # the saved label_map inconsistent between training runs.
    for filename in sorted(os.listdir(dataset_path)):
        if not filename.endswith(".png"):
            continue
        # Extract the label from the file name (e.g. 'ㄱ_1.png' -> 'ㄱ').
        label = filename.split("_")[0]
        # Register the label the first time it is seen.
        if label not in label_map:
            label_map[label] = current_label
            current_label += 1
        # Extract landmarks from the image; skip it if no hand was found.
        image_path = os.path.join(dataset_path, filename)
        landmarks = extract_landmarks(image_path)
        if landmarks is not None:
            X.append(landmarks)
            y.append(label_map[label])
    return np.array(X), np.array(y), label_map
def build_model(num_classes):
    """Build a 1D-CNN classifier over hand landmarks.

    Input shape is (21, 3): 21 landmarks, each with x, y, z coordinates.
    Output is a softmax over *num_classes* sign classes.
    """
    model = Sequential()
    model.add(Conv1D(64, 3, activation="relu", input_shape=(21, 3)))
    model.add(MaxPooling1D(2))
    model.add(Conv1D(128, 3, activation="relu"))
    model.add(MaxPooling1D(2))
    model.add(Conv1D(64, 3, activation="relu"))
    model.add(Flatten())
    model.add(Dense(128, activation="relu"))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation="softmax"))
    return model
def main(epochs=250, batch_size=32):
    """Train the hand-sign classifier and persist the model and label map.

    Args:
        epochs: number of training epochs (default keeps the original run).
        batch_size: mini-batch size for training.

    Returns:
        (model, history, label_map): the trained Keras model, its training
        history, and the label -> class-index mapping.
    """
    # Load the data.
    dataset_path = "./dataset/"
    X, y, label_map = load_dataset(dataset_path)
    # One-hot encode the labels.
    y = to_categorical(y)
    # Split into training / validation sets.
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # Build the model.
    num_classes = len(label_map)
    model = build_model(num_classes)
    # Compile the model.
    model.compile(
        optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"]
    )
    # Train.
    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
    )
    # Save the trained model.
    model.save("hand_sign_model.h5")
    # Save the label mapping; ensure_ascii=False keeps the Korean jamo
    # readable in the JSON file.
    import json

    with open("label_map.json", "w", encoding="utf-8") as f:
        json.dump(label_map, f, ensure_ascii=False)
    return model, history, label_map
# Function for predicting the sign in a new image.
def predict_sign(model, image_path, label_map):
    """Classify the hand sign in the image at *image_path*.

    Returns:
        (label, confidence) for the predicted jamo, or (None, None) when
        no hand is detected or the class has no label mapping.
    """
    landmarks = extract_landmarks(image_path)
    if landmarks is None:
        return None, None
    # Add a batch dimension and run the model.
    batch = np.expand_dims(landmarks, axis=0)
    probabilities = model.predict(batch)[0]
    predicted_class = np.argmax(probabilities)
    # Reverse-lookup the jamo for the predicted class index.
    for label, idx in label_map.items():
        if idx == predicted_class:
            return label, probabilities[predicted_class]
    return None, None
if __name__ == "__main__":
    # Train the model, then run a quick prediction smoke test.
    model, history, label_map = main()
    # Prediction test.
    test_image_path = "./dataset/ㄱ_1.png"  # image to test against
    predicted_label, confidence = predict_sign(model, test_image_path, label_map)
    if predicted_label:
        print(f"예측된 자모: {predicted_label}")
        print(f"신뢰도: {confidence:.2f}")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# main.py
from fastapi import FastAPI, Request, WebSocket
from fastapi.templating import Jinja2Templates
from fastapi.responses import StreamingResponse, HTMLResponse
import cv2
import mediapipe as mp
import numpy as np
from tensorflow.keras.models import load_model
from PIL import ImageFont, ImageDraw, Image
import json
import asyncio
from picamera2 import Picamera2
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Initialize MediaPipe Hands in video mode (tracking between frames).
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
hands = mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=1,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)

# Load the trained model and the label mapping.
model = load_model("./models/hand_sign_model.h5")
with open("./models/label_map.json", "r", encoding="utf-8") as f:
    label_map = json.load(f)
# Invert the mapping: class index (int) -> jamo label.
reverse_label_map = {int(v): k for k, v in label_map.items()}
class ConnectionManager:
    """Tracks open WebSocket connections and broadcasts messages to them."""

    def __init__(self):
        # Currently open client connections.
        self.active_connections = []

    async def connect(self, websocket: WebSocket):
        """Accept a new client connection and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking a client; safe to call for an untracked socket."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send *message* to every connected client.

        A client that died mid-send is dropped; without this guard one
        closed socket would abort the whole broadcast and the dead
        connection would be retried forever.
        """
        # Iterate a copy so disconnect() can mutate the list safely.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                self.disconnect(connection)


manager = ConnectionManager()
def put_text(
    img,
    text,
    position,
    font_size=30,
    font_color=(0, 255, 0),
    font_path="/usr/share/fonts/truetype/nanum/NanumGothic.ttf",
):
    """Draw *text* (including Korean) on an image via Pillow.

    OpenCV's putText cannot render Hangul, so the frame is round-tripped
    through PIL. The font path is now a parameter (defaulting to the
    original NanumGothic location) so deployments without that font can
    supply another TTF.

    Args:
        img: numpy image array to draw on.
        text: string to render.
        position: (x, y) pixel position of the text.
        font_size: point size of the font.
        font_color: fill color tuple.
        font_path: path to a TrueType font file.

    Returns:
        A new numpy image array with the text drawn in.
    """
    img_pil = Image.fromarray(img)
    draw = ImageDraw.Draw(img_pil)
    font = ImageFont.truetype(font_path, font_size)
    draw.text(position, text, font=font, fill=font_color)
    return np.array(img_pil)
def process_landmarks(hand_landmarks):
    """Flatten MediaPipe landmarks into a model-ready batch.

    Returns a (1, num_landmarks, 3) array of x, y, z coordinates.
    """
    coords = [[point.x, point.y, point.z] for point in hand_landmarks.landmark]
    return np.array(coords)[np.newaxis, ...]
async def generate_frames(websocket: WebSocket = None):
    """Yield MJPEG frame chunks from the Pi camera with predictions drawn in.

    Each frame is run through MediaPipe Hands; when a hand is found, the
    sign classifier's prediction is drawn on the frame and, if *websocket*
    was supplied, also broadcast as JSON to all connected clients.
    """
    try:
        cap = Picamera2()
        height = 480
        width = 640
        middle = (int(width / 2), int(height / 2))
        cap.configure(
            cap.create_video_configuration(
                main={
                    "format": "RGB888",
                    "size": (width, height),
                },
            )
        )
        cap.set_controls({"FrameRate": 120})
        cap.start()
    except Exception as e:
        # Bail out instead of falling through with a half-initialized
        # camera: the original bare `except:` swallowed the error and the
        # stream then crashed on the first capture_array call.
        print(f"camera_error1: {e}")
        return
    try:
        while True:
            frame = cap.capture_array()
            if frame is None:
                break
            # MediaPipe expects RGB input; convert back for drawing/encoding.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = hands.process(image)
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            if results.multi_hand_landmarks:
                for hand_landmarks in results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        image, hand_landmarks, mp_hands.HAND_CONNECTIONS
                    )
                    landmarks = process_landmarks(hand_landmarks)
                    prediction = model.predict(landmarks, verbose=0)
                    predicted_class = np.argmax(prediction[0])
                    confidence = prediction[0][predicted_class]
                    predicted_label = reverse_label_map.get(
                        predicted_class, "인식할 수 없음"
                    )
                    text = f"{predicted_label} ({confidence:.2f})"
                    image = put_text(image, text, (10, 30))
                    # Send the prediction result over the WebSocket.
                    if websocket:
                        prediction_data = {
                            "label": predicted_label,
                            "confidence": float(confidence),
                        }
                        await manager.broadcast(json.dumps(prediction_data))
            ret, buffer = cv2.imencode(".jpg", image)
            frame = buffer.tobytes()
            yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")
            await asyncio.sleep(0.1)  # throttle the frame rate
    except Exception as e:
        print(f"Camera Error: {e}")
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Serve the main page template."""
    return templates.TemplateResponse("index.html", {"request": request})
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Stream prediction results to a WebSocket client.

    The video frames yielded by generate_frames are discarded here — the
    browser fetches video via /video_feed; this socket only drives the
    JSON prediction broadcasts that generate_frames performs as a side
    effect.
    """
    await manager.connect(websocket)
    try:
        async for _frame in generate_frames(websocket):
            pass  # frames are consumed only for their broadcast side effect
    except Exception as e:
        print(f"WebSocket Error: {e}")
    finally:
        # Guard against removing a socket that is no longer tracked
        # (e.g. already dropped elsewhere) — list.remove would raise.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)
@app.get("/video_feed")
async def video_feed():
    """Serve the annotated camera stream as multipart MJPEG."""
    return StreamingResponse(
        generate_frames(), media_type="multipart/x-mixed-replace; boundary=frame"
    )
if __name__ == "__main__":
    import uvicorn

    # Local development entry point.
    uvicorn.run(app, host="127.0.0.1", port=8080)
This post is licensed under CC BY 4.0 by the author.