본문 바로가기

RestfulAPI

RestfulAPI Flask에서 JWT 사용하여 로그인 및 로그아웃하기

Flask에서 JWT 사용하여 로그인 및 로그아웃하기 

import datetime
from email_validator import EmailNotValidError, validate_email
from flask import request
from flask_jwt_extended import create_access_token, jwt_required,get_jwt
from mysql.connector import Error
from flask_restful import Resource

from mysql_connection import get_connection
from utils import check_password, hash_password

class UserRegisterResource(Resource) :
    def post(self):
        # 1. 클라이언트가 보낸 데이터를 받아준다.
        data = request.get_json()
        print(data)
        # 2. 데이터가 모두 있는지 확인
        if data.get('email') is None or data.get('email').strip() == '' or \
            data.get('username') is None or data.get('username').strip() == '' or \
            data.get('password') is None or data.get('password').strip() == '':

            return {"result" : "fail"}, 400
        
        # 3. 이메일 주소가 형식이 올바른지 확인
        try :
            validate_email(data['email'])
        except EmailNotValidError :
            return {"result" : "fail", "error" : str(e)}, 400
            
        # 4. 비밀번호 길이가 유효한지 체크한다.
        #    예) 4자리 이상 12자리 이하 !
        if len(data['password']) < 4 or len(data['password']) > 12:
            return {'result' : 'fail'},400
        
        # 5. 비밀번호를 암호화 한다.
        password = hash_password(data['password'])
        print(password)

        # 6. DB에 저장한다.
        try :
            connection = get_connection()
            query = '''insert into user
                (username, email, password)
                values
                (%s, %s, %s);'''
            record = (data['username'], data['email'], password)
            cursor = connection.cursor()
            cursor.execute(query, record)
            connection.commit()

            ### DB에 회원가입하여, user 테이블에 insert된 후,
            ### 이 user 테이블의 id 값을 가져와야 한다.
            user_id = cursor.lastrowid

            cursor.close()
            connection.close()

        except Error as e:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result' : 'fail'}, 500
        
        # 6-2. user_id를 바로 클라이언트에게 보내면 안되고,
        ##     JWT로 암호화 해서, 인증토큰을 보내야 한다.
        #access_token = create_access_token(user_id,
        #                                   expires_delta=datetime.timedelta(minutes=3))
        access_token = create_access_token(user_id)    
        # 7. 응답할 데이터를 JSON으로 만들어서 리턴!

        return {"result" : "success", "access_token" : access_token}

    

class UserLoginResource(Resource) :
    def post(self) :

        
        # 1. 클라이언트로부터 데이터를 받는다.
        data = request.get_json()
        print(data)
        if 'email' not in data or 'password' not in data :
            return {'result' : 'fail'} , 400
        
        if data['email'].strip() == '' or data['password'].strip() == '':
            return {'result' : 'fail'} , 400

        # 2. DB로부터 이메일에 해당하는 유저 정보를 가져온다.
        try:
            connection = get_connection()
            query = '''select *
                    from user
                    where email = %s;'''
            record = (data['email'], )
            print(record)
            cursor = connection.cursor(dictionary=True)
            print(cursor)
            cursor.execute(query,record)
            print(cursor)
            result_list = cursor.fetchall()
            print(result_list)

            cursor.close()
            connection.close()

        except Error as e:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result':'fail','error':str(e)}, 500

        # 3. 회원인지 확인한다.
        if result_list == [] :
            return {'result' : 'fail'} , 401

        # 4. 비밀번호를 체크한다.
        # 유저가 입력한 비번 data['password']
        # DB에 암호화된 비번 result_lsit[0]['password']
        isCorrect = check_password(data['password'], result_list[0]['password'])
        if isCorrect != True :
            return {'result' : 'fail'} , 401

        # 5. 유저 아이디를 가져온다.
        user_id = result_list[0]['id']

        # 6. JWT 토큰을 만든다.
        # access_token = create_access_token(user_id,
        #                                    expires_delta= datetime.timedelta(minutes=3))
        access_token = create_access_token(user_id)
        # 7. 클라이언트에 응답한다.
        
        return {'result' : 'success', 'access':access_token}
    

# 로그아웃 된 토큰을 저장할, set 을 만든다.
jwt_blacklist= set()
class UserLogoutResource(Resource):

    @jwt_required()
    def delete(self):
        
        jti = get_jwt()['jti']
        jwt_blacklist.add(jti)

        return

코드 한 줄씩 설명

1. 필요한 라이브러리 임포트

import datetime
from email_validator import EmailNotValidError, validate_email
from flask import request
from flask_jwt_extended import create_access_token, jwt_required, get_jwt
from mysql.connector import Error
from flask_restful import Resource

from mysql_connection import get_connection
from utils import check_password, hash_password
  • 필요한 라이브러리를 불러온다. 이메일 유효성 검사, 요청 처리, JWT 토큰 생성 및 검증, MySQL 데이터베이스 연결, 비밀번호 암호화와 같은 기능을 사용하기 위해 라이브러리를 임포트한다.

2. 회원가입 엔드포인트

class UserRegisterResource(Resource):
    def post(self):
        data = request.get_json()
        print(data)
  • 클라이언트가 보낸 데이터를 JSON 형식으로 받아와 출력한다.
        if data.get('email') is None or data.get('email').strip() == '' or \
           data.get('username') is None or data.get('username').strip() == '' or \
           data.get('password') is None or data.get('password').strip() == '':
            return {"result" : "fail"}, 400
  • 필수 데이터(이메일, 사용자 이름, 비밀번호)가 모두 있는지 확인합니다. 하나라도 없으면 실패 응답을 반환한다.
        try:
            validate_email(data['email'])
        except EmailNotValidError:
            return {"result" : "fail", "error" : str(e)}, 400
  • 이메일 주소가 올바른 형식인지 확인. 올바르지 않으면 실패 응답을 반환한다.
        if len(data['password']) < 4 or len(data['password']) > 12:
            return {'result' : 'fail'}, 400
  • 비밀번호 길이가 4자리 이상 12자리 이하인지 확인한. 조건에 맞지 않으면 실패 응답을 반환한다.
        password = hash_password(data['password'])
        print(password)
  • 비밀번호를 암호화하여 저장한다.
        try:
            connection = get_connection()
            query = '''insert into user
                       (username, email, password)
                       values
                       (%s, %s, %s);'''
            record = (data['username'], data['email'], password)
            cursor = connection.cursor()
            cursor.execute(query, record)
            connection.commit()
            user_id = cursor.lastrowid
            cursor.close()
            connection.close()
        except Error as e:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result' : 'fail'}, 500
  • 데이터를 데이터베이스에 저장합니다. 저장 후 사용자 ID를 가져온. 오류가 발생하면 실패 응답을 반환한다.
        access_token = create_access_token(user_id)
  • JWT 토큰을 생성한. 이 토큰은 클라이언트가 이후 요청에서 자신을 인증하는 데 사용된다.
       return {"result" : "success", "access_token" : access_token}

성공 응답과 함께 JWT 토큰을 반환한다.

3. 로그인 엔드포인트

class UserLoginResource(Resource):
    def post(self):
        data = request.get_json()
        print(data)
  • 클라이언트가 보낸 데이터를 JSON 형식으로 받아와 출력한다.
        if 'email' not in data or 'password' not in data:
            return {'result' : 'fail'}, 400

        if data['email'].strip() == '' or data['password'].strip() == '':
            return {'result' : 'fail'}, 400

이메일과 비밀번호가 있는지 확인한다. 없거나 빈 값이면 실패 응답을 반환한다.

        try:
            connection = get_connection()
            query = '''select *
                       from user
                       where email = %s;'''
            record = (data['email'],)
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, record)
            result_list = cursor.fetchall()
            cursor.close()
            connection.close()
        except Error as e:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
            return {'result':'fail','error':str(e)}, 500
  • 데이터베이스에서 이메일에 해당하는 사용자 정보를 가져온다. 오류가 발생하면 실패 응답을 반환한다.
        if result_list == []:
            return {'result' : 'fail'}, 401
  • 사용자가 존재하는지 확인한다. 존재하지 않으면 실패 응답을 반환한.
        isCorrect = check_password(data['password'], result_list[0]['password'])
        if isCorrect != True:
            return {'result' : 'fail'}, 401
  • 비밀번호를 확인한. 맞지 않으면 실패 응답을 반환한다.
        user_id = result_list[0]['id']
        access_token = create_access_token(user_id)
        return {'result' : 'success', 'access':access_token}
  • 사용자 ID로 JWT 토큰을 생성하여 성공 응답과 함께 반환합니다.

4. 로그아웃 엔드포인트

jwt_blacklist = set()

class UserLogoutResource(Resource):
    @jwt_required()
    def delete(self):
        jti = get_jwt()['jti']
        jwt_blacklist.add(jti)
        return {'result': 'success'}
  • 로그아웃 요청을 처리한다. 토큰을 블랙리스트에 추가하여 무효화한다.