Flask에서 JWT 사용하여 로그인 및 로그아웃하기
import datetime
from email_validator import EmailNotValidError, validate_email
from flask import request
from flask_jwt_extended import create_access_token, jwt_required,get_jwt
from mysql.connector import Error
from flask_restful import Resource
from mysql_connection import get_connection
from utils import check_password, hash_password
class UserRegisterResource(Resource) :
def post(self):
# 1. 클라이언트가 보낸 데이터를 받아준다.
data = request.get_json()
print(data)
# 2. 데이터가 모두 있는지 확인
if data.get('email') is None or data.get('email').strip() == '' or \
data.get('username') is None or data.get('username').strip() == '' or \
data.get('password') is None or data.get('password').strip() == '':
return {"result" : "fail"}, 400
# 3. 이메일 주소가 형식이 올바른지 확인
try :
validate_email(data['email'])
except EmailNotValidError :
return {"result" : "fail", "error" : str(e)}, 400
# 4. 비밀번호 길이가 유효한지 체크한다.
# 예) 4자리 이상 12자리 이하 !
if len(data['password']) < 4 or len(data['password']) > 12:
return {'result' : 'fail'},400
# 5. 비밀번호를 암호화 한다.
password = hash_password(data['password'])
print(password)
# 6. DB에 저장한다.
try :
connection = get_connection()
query = '''insert into user
(username, email, password)
values
(%s, %s, %s);'''
record = (data['username'], data['email'], password)
cursor = connection.cursor()
cursor.execute(query, record)
connection.commit()
### DB에 회원가입하여, user 테이블에 insert된 후,
### 이 user 테이블의 id 값을 가져와야 한다.
user_id = cursor.lastrowid
cursor.close()
connection.close()
except Error as e:
if cursor is not None:
cursor.close()
if connection is not None:
connection.close()
return {'result' : 'fail'}, 500
# 6-2. user_id를 바로 클라이언트에게 보내면 안되고,
## JWT로 암호화 해서, 인증토큰을 보내야 한다.
#access_token = create_access_token(user_id,
# expires_delta=datetime.timedelta(minutes=3))
access_token = create_access_token(user_id)
# 7. 응답할 데이터를 JSON으로 만들어서 리턴!
return {"result" : "success", "access_token" : access_token}
class UserLoginResource(Resource) :
def post(self) :
# 1. 클라이언트로부터 데이터를 받는다.
data = request.get_json()
print(data)
if 'email' not in data or 'password' not in data :
return {'result' : 'fail'} , 400
if data['email'].strip() == '' or data['password'].strip() == '':
return {'result' : 'fail'} , 400
# 2. DB로부터 이메일에 해당하는 유저 정보를 가져온다.
try:
connection = get_connection()
query = '''select *
from user
where email = %s;'''
record = (data['email'], )
print(record)
cursor = connection.cursor(dictionary=True)
print(cursor)
cursor.execute(query,record)
print(cursor)
result_list = cursor.fetchall()
print(result_list)
cursor.close()
connection.close()
except Error as e:
if cursor is not None:
cursor.close()
if connection is not None:
connection.close()
return {'result':'fail','error':str(e)}, 500
# 3. 회원인지 확인한다.
if result_list == [] :
return {'result' : 'fail'} , 401
# 4. 비밀번호를 체크한다.
# 유저가 입력한 비번 data['password']
# DB에 암호화된 비번 result_lsit[0]['password']
isCorrect = check_password(data['password'], result_list[0]['password'])
if isCorrect != True :
return {'result' : 'fail'} , 401
# 5. 유저 아이디를 가져온다.
user_id = result_list[0]['id']
# 6. JWT 토큰을 만든다.
# access_token = create_access_token(user_id,
# expires_delta= datetime.timedelta(minutes=3))
access_token = create_access_token(user_id)
# 7. 클라이언트에 응답한다.
return {'result' : 'success', 'access':access_token}
# 로그아웃 된 토큰을 저장할, set 을 만든다.
jwt_blacklist= set()
class UserLogoutResource(Resource):
@jwt_required()
def delete(self):
jti = get_jwt()['jti']
jwt_blacklist.add(jti)
return
코드 한 줄씩 설명
1. 필요한 라이브러리 임포트
import datetime
from email_validator import EmailNotValidError, validate_email
from flask import request
from flask_jwt_extended import create_access_token, jwt_required, get_jwt
from mysql.connector import Error
from flask_restful import Resource
from mysql_connection import get_connection
from utils import check_password, hash_password
- 필요한 라이브러리를 불러온다. 이메일 유효성 검사, 요청 처리, JWT 토큰 생성 및 검증, MySQL 데이터베이스 연결, 비밀번호 암호화와 같은 기능을 사용하기 위해 라이브러리를 임포트한다.
2. 회원가입 엔드포인트
class UserRegisterResource(Resource):
def post(self):
data = request.get_json()
print(data)
- 클라이언트가 보낸 데이터를 JSON 형식으로 받아와 출력한다.
if data.get('email') is None or data.get('email').strip() == '' or \
data.get('username') is None or data.get('username').strip() == '' or \
data.get('password') is None or data.get('password').strip() == '':
return {"result" : "fail"}, 400
- 필수 데이터(이메일, 사용자 이름, 비밀번호)가 모두 있는지 확인합니다. 하나라도 없으면 실패 응답을 반환한다.
try:
validate_email(data['email'])
except EmailNotValidError:
return {"result" : "fail", "error" : str(e)}, 400
- 이메일 주소가 올바른 형식인지 확인. 올바르지 않으면 실패 응답을 반환한다.
if len(data['password']) < 4 or len(data['password']) > 12:
return {'result' : 'fail'}, 400
- 비밀번호 길이가 4자리 이상 12자리 이하인지 확인한. 조건에 맞지 않으면 실패 응답을 반환한다.
password = hash_password(data['password'])
print(password)
- 비밀번호를 암호화하여 저장한다.
try:
connection = get_connection()
query = '''insert into user
(username, email, password)
values
(%s, %s, %s);'''
record = (data['username'], data['email'], password)
cursor = connection.cursor()
cursor.execute(query, record)
connection.commit()
user_id = cursor.lastrowid
cursor.close()
connection.close()
except Error as e:
if cursor is not None:
cursor.close()
if connection is not None:
connection.close()
return {'result' : 'fail'}, 500
- 데이터를 데이터베이스에 저장합니다. 저장 후 사용자 ID를 가져온. 오류가 발생하면 실패 응답을 반환한다.
access_token = create_access_token(user_id)
- JWT 토큰을 생성한. 이 토큰은 클라이언트가 이후 요청에서 자신을 인증하는 데 사용된다.
return {"result" : "success", "access_token" : access_token}
성공 응답과 함께 JWT 토큰을 반환한다.
3. 로그인 엔드포인트
class UserLoginResource(Resource):
def post(self):
data = request.get_json()
print(data)
- 클라이언트가 보낸 데이터를 JSON 형식으로 받아와 출력한다.
if 'email' not in data or 'password' not in data:
return {'result' : 'fail'}, 400
if data['email'].strip() == '' or data['password'].strip() == '':
return {'result' : 'fail'}, 400
이메일과 비밀번호가 있는지 확인한다. 없거나 빈 값이면 실패 응답을 반환한다.
try:
connection = get_connection()
query = '''select *
from user
where email = %s;'''
record = (data['email'],)
cursor = connection.cursor(dictionary=True)
cursor.execute(query, record)
result_list = cursor.fetchall()
cursor.close()
connection.close()
except Error as e:
if cursor is not None:
cursor.close()
if connection is not None:
connection.close()
return {'result':'fail','error':str(e)}, 500
- 데이터베이스에서 이메일에 해당하는 사용자 정보를 가져온다. 오류가 발생하면 실패 응답을 반환한다.
if result_list == []:
return {'result' : 'fail'}, 401
- 사용자가 존재하는지 확인한다. 존재하지 않으면 실패 응답을 반환한.
isCorrect = check_password(data['password'], result_list[0]['password'])
if isCorrect != True:
return {'result' : 'fail'}, 401
- 비밀번호를 확인한. 맞지 않으면 실패 응답을 반환한다.
user_id = result_list[0]['id']
access_token = create_access_token(user_id)
return {'result' : 'success', 'access':access_token}
- 사용자 ID로 JWT 토큰을 생성하여 성공 응답과 함께 반환합니다.
4. 로그아웃 엔드포인트
jwt_blacklist = set()
class UserLogoutResource(Resource):
@jwt_required()
def delete(self):
jti = get_jwt()['jti']
jwt_blacklist.add(jti)
return {'result': 'success'}
- 로그아웃 요청을 처리한다. 토큰을 블랙리스트에 추가하여 무효화한다.
'RestfulAPI' 카테고리의 다른 글
Serverless Framework와 Github Acitons를 이용한 AWS 자동 배포 (0) | 2024.05.24 |
---|---|
AWS Serverless Framework 설치 방법 (0) | 2024.05.23 |
Flask에서 JWT 사용하여 회원가입 API 만들기 및 토큰처리 (2) | 2024.05.23 |
RestfulAPI Hashing 을 이용하여 단방향 암호화 하는방법 (0) | 2024.05.23 |
RestfulAPI Python MySQL Connector 딜리트 방법과 코드 예제 (0) | 2024.05.22 |