Skip to content

Commit 91cb6b8

Browse files
committed
Added/Updated tests\bugs\gh_5978_test.py: Checked on 4.0.5.3092, 5.0.1.1395, 6.0.0.346. NOTE. TEST REQUIRES FIREBIRD-DRIVER VERSION 1.10.4+
1 parent 0abb5e8 commit 91cb6b8

File tree

1 file changed

+177
-0
lines changed

1 file changed

+177
-0
lines changed

tests/bugs/gh_5978_test.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
#coding:utf-8
2+
3+
"""
4+
ID: issue-5978
5+
ISSUE: https://github.com/FirebirdSQL/firebird/issues/5978
6+
TITLE: Access to the name of DB encryption key [CORE5712]
7+
DESCRIPTION:
8+
Test creates temporary user with system privilege GET_DBCRYPT_INFO in order to allow him to obtain encryption info.
9+
Then we run following:
10+
1) encrypt DB using plugin 'fbSampleDbCrypt' provided in every FB 4.x+ snapshot;
11+
2) make connection as SYSDBA and ask DB-crypt info (DbInfoCode.CRYPT_PLUGIN and DbInfoCode.CRYPT_KEY)
12+
3) decrypt DB
13+
After this we repeat these actions, except that in "2)" we use temporary user ('tmp_senior') instead of SYSDBA
14+
(he must get same info as was obtained in previous step for SYSDBA).
15+
NOTES:
16+
[08.05.2024] pzotov
17+
18+
### ACHTUNG ### TEST REQUIRES FIREBIRD-DRIVER VERSION 1.10.4+ (date: 07-may-2024).
19+
See reply from pcisar, letters with subj: "fb_info_crypt_key: how it can be obtained using firebird-driver ? // GH-5978, 2018"
20+
21+
Checked on 4.0.5.3092, 5.0.1.1395, 6.0.0.346.
22+
FB 3.x is not checked.
23+
"""
24+
import os
25+
import locale
26+
import re
27+
import time
28+
import datetime as py_dt
29+
30+
import pytest
31+
from firebird.qa import *
32+
from firebird.driver import DatabaseError, DbInfoCode
33+
34+
###########################
35+
### S E T T I N G S ###
36+
###########################
37+
38+
# QA_GLOBALS -- dict, is defined in qa/plugin.py, obtain settings
39+
# from act.files_dir/'test_config.ini':
40+
enc_settings = QA_GLOBALS['encryption']
41+
42+
# ACHTUNG: this must be carefully tuned on every new host:
43+
#
44+
MAX_WAITING_ENCR_FINISH = int(enc_settings['MAX_WAIT_FOR_ENCR_FINISH_WIN' if os.name == 'nt' else 'MAX_WAIT_FOR_ENCR_FINISH_NIX'])
45+
assert MAX_WAITING_ENCR_FINISH > 0
46+
47+
ENCRYPTION_PLUGIN = enc_settings['encryption_plugin'] # fbSampleDbCrypt
48+
ENCRYPTION_KEY = enc_settings['encryption_key'] # Red
49+
50+
db = db_factory()
51+
act = python_act('db', substitutions = [('[ \t]+', ' ')])
52+
53+
# Create user to check ability to get info about crypt key name and plugin name
54+
# by granting yto him system privilege 'GET_DBCRYPT_INFO'
55+
# See: https://github.com/FirebirdSQL/firebird/issues/5978#issuecomment-826241686
56+
# Full list of systyem privileges: src/jrd/SystemPrivileges.h
57+
#
58+
tmp_senior = user_factory('db', name='tmp$senior', password='456', plugin = 'Srp')
59+
60+
tmp_role = role_factory('db', name='tmp$role_get_dbcrypt_key')
61+
62+
#-----------------------------------------------------------------------
63+
64+
def run_encr_decr(act: Action, mode, max_wait_encr_thread_finish, capsys):
65+
66+
assert mode in ('encrypt', 'decrypt')
67+
68+
if mode == 'encrypt':
69+
alter_db_sttm = f'alter database encrypt with "{ENCRYPTION_PLUGIN}" key "{ENCRYPTION_KEY}"'
70+
wait_for_state = 'Database encrypted'
71+
elif mode == 'decrypt':
72+
alter_db_sttm = 'alter database decrypt'
73+
wait_for_state = 'Database not encrypted'
74+
75+
e_thread_finished = False
76+
77+
# 0 = non crypted;
78+
# 1 = has been encrypted;
79+
# 2 = is DEcrypting;
80+
# 3 = is Encrypting;
81+
#
82+
REQUIRED_CRYPT_STATE = 1 if mode == 'encrypt' else 0
83+
current_crypt_state = -1
84+
d1 = py_dt.timedelta(0)
85+
with act.db.connect() as con:
86+
cur = con.cursor()
87+
ps = cur.prepare('select mon$crypt_state from mon$database')
88+
89+
t1=py_dt.datetime.now()
90+
try:
91+
d1 = t1-t1
92+
con.execute_immediate(alter_db_sttm)
93+
con.commit()
94+
while True:
95+
t2=py_dt.datetime.now()
96+
d1=t2-t1
97+
if d1.seconds*1000 + d1.microseconds//1000 > max_wait_encr_thread_finish:
98+
break
99+
100+
######################################################
101+
### C H E C K M O N $ C R Y P T _ S T A T E ###
102+
######################################################
103+
cur.execute(ps)
104+
current_crypt_state = cur.fetchone()[0]
105+
con.commit()
106+
if current_crypt_state == REQUIRED_CRYPT_STATE:
107+
e_thread_finished = True
108+
break
109+
else:
110+
time.sleep(0.5)
111+
except DatabaseError as e:
112+
print( e.__str__() )
113+
114+
115+
assert e_thread_finished, f'TIMEOUT EXPIRATION: {mode=} took {d1.seconds*1000 + d1.microseconds//1000} ms which {max_wait_encr_thread_finish=} ms'
116+
117+
#-----------------------------------------------------------------------
118+
119+
@pytest.mark.encryption
120+
@pytest.mark.version('>=4.0')
121+
def test_1(act: Action, tmp_senior: User, tmp_role: Role, capsys):
122+
123+
# src/jrd/SystemPrivileges.h
124+
prepare_sql = f"""
125+
set bail on;
126+
set wng off;
127+
set list on;
128+
alter role {tmp_role.name}
129+
set system privileges to
130+
GET_DBCRYPT_INFO
131+
;
132+
revoke all on all from {tmp_senior.name};
133+
grant default {tmp_role.name} to user {tmp_senior.name};
134+
commit;
135+
"""
136+
137+
# NB: "firebird.driver.types.InterfaceError: An error response was received" will raise if we
138+
# try to run as tmp_senior and miss 'grant default {tmp_role.name} to user {tmp_senior.name};'
139+
140+
act.expected_stdout = ''
141+
act.isql(switches=['-q'], input = prepare_sql, combine_output = True, io_enc = locale.getpreferredencoding())
142+
assert act.clean_stdout == act.clean_expected_stdout
143+
act.reset()
144+
145+
#......................................................
146+
147+
try:
148+
run_encr_decr(act, 'encrypt', MAX_WAITING_ENCR_FINISH, capsys)
149+
150+
for con_user in (act.db.user, tmp_senior.name):
151+
con_pswd = act.db.password if con_user == act.db.user else tmp_senior.password
152+
153+
# ROLE not needed for tmp_senior because it will be granted as default, see above:
154+
with act.db.connect(user = con_user, password = con_pswd) as con:
155+
crypt_plugin = con.info.get_info(DbInfoCode.CRYPT_PLUGIN)
156+
crypt_key = con.info.get_info(DbInfoCode.CRYPT_KEY)
157+
print(f'{con_user=}')
158+
print(f'{crypt_plugin=}')
159+
print(f'{crypt_key=}')
160+
161+
run_encr_decr(act, 'decrypt', MAX_WAITING_ENCR_FINISH, capsys)
162+
163+
except DatabaseError as e:
164+
print(e.__str__())
165+
166+
act.expected_stdout = f"""
167+
con_user='{act.db.user.upper()}'
168+
crypt_plugin='{ENCRYPTION_PLUGIN}'
169+
crypt_key='{ENCRYPTION_KEY}'
170+
171+
con_user='{tmp_senior.name.upper()}'
172+
crypt_plugin='{ENCRYPTION_PLUGIN}'
173+
crypt_key='{ENCRYPTION_KEY}'
174+
"""
175+
176+
act.stdout = capsys.readouterr().out
177+
assert act.clean_stdout == act.clean_expected_stdout

0 commit comments

Comments
 (0)