diff --git a/.gitignore b/.gitignore index 6c3a791..78911cf 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ venv temp tmp __pycache__ +.hypothesis *.pyc *.sqlite diff --git a/project/server/__init__.py b/project/server/__init__.py index 311df63..a32a698 100644 --- a/project/server/__init__.py +++ b/project/server/__init__.py @@ -1,11 +1,17 @@ # project/server/__init__.py import os +import sys from flask import Flask from flask_bcrypt import Bcrypt from flask_sqlalchemy import SQLAlchemy +if sys.version_info >= (3, 4): + py_version = 3 +elif sys.version_info >= (2, 7): + py_version = 2 + app = Flask(__name__) app_settings = os.getenv( @@ -18,4 +24,5 @@ db = SQLAlchemy(app) from project.server.auth.views import auth_blueprint + app.register_blueprint(auth_blueprint) diff --git a/project/server/auth/views.py b/project/server/auth/views.py index 1a63067..7656717 100644 --- a/project/server/auth/views.py +++ b/project/server/auth/views.py @@ -4,7 +4,7 @@ from flask import Blueprint, request, make_response, jsonify from flask.views import MethodView -from project.server import bcrypt, db +from project.server import bcrypt, db, py_version from project.server.models import User, BlacklistToken auth_blueprint = Blueprint('auth', __name__) @@ -18,81 +18,113 @@ class RegisterAPI(MethodView): def post(self): # get the post data post_data = request.get_json() - # check if user already exists - user = User.query.filter_by(email=post_data.get('email')).first() - if not user: - try: - user = User( - email=post_data.get('email'), - password=post_data.get('password') - ) - # insert the user - db.session.add(user) - db.session.commit() - # generate the auth token - auth_token = user.encode_auth_token(user.id) - responseObject = { - 'status': 'success', - 'message': 'Successfully registered.', - 'auth_token': auth_token.decode() - } - return make_response(jsonify(responseObject)), 201 - except Exception as e: - responseObject = { - 'status': 'fail', - 'message': 'Some error occurred. Please try again.' - } - return make_response(jsonify(responseObject)), 401 - else: - responseObject = { - 'status': 'fail', - 'message': 'User already exists. Please Log in.', - } - return make_response(jsonify(responseObject)), 202 + # validate post data + if py_version == 3: + if isinstance(post_data['email'], str) and isinstance(post_data['password'], str): + is_str = True + elif py_version == 2: + if isinstance(post_data['email'], unicode) and isinstance(post_data['password'], unicode): + is_str = True + if is_str: + if '@' in post_data['email'] and '.' in post_data['email'] and len(post_data['email']) >= 4 and len( + post_data['password']) >= 6: + # check if user already exists + user = User.query.filter_by(email=post_data.get('email')).first() + if not user: + try: + user = User( + email=post_data.get('email'), + password=post_data.get('password') + ) + # insert the user + db.session.add(user) + db.session.commit() + # generate the auth token + auth_token = user.encode_auth_token(user.id) + responseObject = { + 'status': 'success', + 'message': 'Successfully registered.', + 'auth_token': auth_token.decode() + } + return make_response(jsonify(responseObject)), 201 + except Exception as e: + responseObject = { + 'status': 'fail', + 'message': 'Some error occurred. Please try again.' + } + return make_response(jsonify(responseObject)), 401 + else: + responseObject = { + 'status': 'fail', + 'message': 'User already exists. Please Log in.', + } + return make_response(jsonify(responseObject)), 202 + responseObject = { + 'status': 'fail', + 'message': 'Email or Password format is not correct.', + } + return make_response(jsonify(responseObject)), 202 class LoginAPI(MethodView): """ User Login Resource """ + def post(self): # get the post data post_data = request.get_json() - try: - # fetch the user data - user = User.query.filter_by( - email=post_data.get('email') - ).first() - if user and bcrypt.check_password_hash( - user.password, post_data.get('password') - ): - auth_token = user.encode_auth_token(user.id) - if auth_token: + # validate post data + if py_version == 3: + if isinstance(post_data['email'], str) and isinstance(post_data['password'], str): + is_str = True + elif py_version == 2: + if isinstance(post_data['email'], unicode) and isinstance(post_data['password'], unicode): + is_str = True + if is_str: + if '@' in post_data['email'] and '.' in post_data['email'] and len(post_data['email']) >= 4 and len( + post_data['password']) >= 6: + try: + # fetch the user data + user = User.query.filter_by( + email=post_data.get('email') + ).first() + if user and bcrypt.check_password_hash( + user.password, post_data.get('password') + ): + auth_token = user.encode_auth_token(user.id) + if auth_token: + responseObject = { + 'status': 'success', + 'message': 'Successfully logged in.', + 'auth_token': auth_token.decode() + } + return make_response(jsonify(responseObject)), 200 + else: + responseObject = { + 'status': 'fail', + 'message': 'User does not exist.' + } + return make_response(jsonify(responseObject)), 404 + except Exception as e: + print(e) responseObject = { - 'status': 'success', - 'message': 'Successfully logged in.', - 'auth_token': auth_token.decode() + 'status': 'fail', + 'message': 'Email or Password format is not correct.' } - return make_response(jsonify(responseObject)), 200 - else: - responseObject = { - 'status': 'fail', - 'message': 'User does not exist.' - } - return make_response(jsonify(responseObject)), 404 - except Exception as e: - print(e) - responseObject = { - 'status': 'fail', - 'message': 'Try again' - } - return make_response(jsonify(responseObject)), 500 + return make_response(jsonify(responseObject)), 202 + responseObject = { + 'status': 'fail', + 'message': 'Email or Password format is not correct.', + } + return make_response(jsonify(responseObject)), 202 class UserAPI(MethodView): """ User Resource """ + def get(self): # get the auth token auth_header = request.headers.get('Authorization') @@ -131,6 +163,7 @@ class LogoutAPI(MethodView): """ Logout Resource """ + def post(self): # get auth token auth_header = request.headers.get('Authorization') @@ -171,6 +204,7 @@ def post(self): } return make_response(jsonify(responseObject)), 401 + # define the API resources registration_view = RegisterAPI.as_view('register_api') login_view = LoginAPI.as_view('login_api') diff --git a/project/tests/test_property.py b/project/tests/test_property.py new file mode 100644 index 0000000..182a1c4 --- /dev/null +++ b/project/tests/test_property.py @@ -0,0 +1,185 @@ +# project/server/tests/property_tests.py + +import unittest +import json + +# from project.server import db +# from project.server.models import User, BlacklistToken +from project.tests.base import BaseTestCase +from hypothesis import given, assume, strategies as st + +# data structures for testing +user_data = st.fixed_dictionaries({ + 'email': st.text(), + 'password': st.text(min_size=6) +}) + + +class PropertyBasedTestAuthBlueprint(BaseTestCase): + """ Property Based testing for existing test suite """ + + @given(user_data) + def test_user_registration(self, data): + """ + Property Based testing user registration + :param data: user_data dict + :return: bool + """ + # assuming email and password exists + assume(data['email']) + assume(data['password']) + + response = self.client.post( + '/auth/register', + data=json.dumps(data), + content_type='application/json' + ) + if response.status_code == 202: + response_data = json.loads(response.data.decode()) + self.assertTrue(response_data['status'] == 'fail') + self.assertTrue(response_data['message'] == 'Email or Password format is not correct.') + self.assertTrue(response.content_type == 'application/json') + elif response.status_code == 201: + response_data = json.loads(response.data.decode()) + self.assertTrue(response_data['status'] == 'success') + self.assertTrue(response_data['message'] == 'Successfully registered.') + self.assertTrue(response_data['auth_token']) + self.assertTrue(response.content_type == 'application/json') + + @given(user_data) + def test_registered_user_login(self, data): + """ + Property Based testing for login of registered-user + :param data: user_data dict + :return: bool + """ + with self.client: + # user registration + resp_register = self.client.post( + '/auth/register', + data=json.dumps(data), + content_type='application/json' + ) + if resp_register.status_code == 202: + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'fail') + self.assertTrue(data_register['message'] == 'Email or Password format is not correct.') + self.assertTrue(resp_register.content_type == 'application/json') + elif resp_register.status_code == 201: + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'success') + self.assertTrue(data_register['message'] == 'Successfully registered.') + self.assertTrue(data_register['auth_token']) + self.assertTrue(resp_register.content_type == 'application/json') + + # registered user login + response = self.client.post( + '/auth/login', + data=json.dumps(data), + content_type='application/json' + ) + if response.status_code == 202: + response_data = json.loads(response.data.decode()) + self.assertTrue(response_data['status'] == 'fail') + self.assertTrue(response_data['message'] == 'Email or Password format is not correct.') + self.assertTrue(response.content_type == 'application/json') + elif response.status_code == 200: + response_data = json.loads(response.data.decode()) + self.assertTrue(response_data['status'] == 'success') + self.assertTrue(response_data['message'] == 'Successfully logged in.') + self.assertTrue(response_data['auth_token']) + self.assertTrue(response.content_type == 'application/json') + + @given(user_data) + def test_user_status(self, data): + """ + Test for user status + :param data: user_data dict + :return: bool + """ + with self.client: + resp_register = self.client.post( + '/auth/register', + data=json.dumps(data), + content_type='application/json' + ) + if resp_register.status_code == 202: + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'fail') + self.assertTrue(data_register['message'] == 'Email or Password format is not correct.') + self.assertTrue(resp_register.content_type == 'application/json') + elif resp_register.status_code == 201: + response = self.client.get( + '/auth/status', + headers=dict( + Authorization='Bearer ' + json.loads( + resp_register.data.decode() + )['auth_token'] + ) + ) + response_data = json.loads(response.data.decode()) + self.assertTrue(response_data['status'] == 'success') + self.assertTrue(response_data['data'] is not None) + self.assertTrue(response_data['data']['email'] == data['email']) + self.assertTrue(response_data['data']['admin'] is 'true' or 'false') + self.assertEqual(response.status_code, 200) + + @given(user_data) + def test_valid_logout(self, data): + """ + Test for logout before token expires + :param data: user_data dict + :return: bool + """ + with self.client: + # user registration + resp_register = self.client.post( + '/auth/register', + data=json.dumps(data), + content_type='application/json', + ) + if resp_register.status_code == 201: + data_register = json.loads(resp_register.data.decode()) + self.assertTrue(data_register['status'] == 'success') + self.assertTrue( + data_register['message'] == 'Successfully registered.') + self.assertTrue(data_register['auth_token']) + self.assertTrue(resp_register.content_type == 'application/json') + # user login + resp_login = self.client.post( + '/auth/login', + data=json.dumps(dict( + email='joe@gmail.com', + password='123456' + )), + content_type='application/json' + ) + + if resp_login.status_code == 200: + data_login = json.loads(resp_login.data.decode()) + self.assertTrue(data_login['status'] == 'success') + self.assertTrue(data_login['message'] == 'Successfully logged in.') + self.assertTrue(data_login['auth_token']) + self.assertTrue(resp_login.content_type == 'application/json') + + # valid token logout + response = self.client.post( + '/auth/logout', + headers=dict( + Authorization='Bearer ' + json.loads( + resp_login.data.decode() + )['auth_token'] + ) + ) + if response.status_code == 401: + response_data = json.loads(response.data.decode()) + self.assertTrue(response_data['status'] == 'fail') + elif response.status_code == 200: + response_data = json.loads(response.data.decode()) + self.assertTrue(response_data['status'] == 'success') + self.assertTrue(response_data['message'] == 'Successfully logged out.') + self.assertEqual(response.status_code, 200) + + +if __name__ == '__main__': + unittest.main() diff --git a/requirements.txt b/requirements.txt index f8bacd0..e8aee54 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ Flask-Migrate==2.0.2 Flask-Script==2.0.5 Flask-SQLAlchemy==2.1 Flask-Testing==0.6.1 +hypothesis==3.6.1 itsdangerous==0.24 Jinja2==2.8 Mako==1.0.6