1
1
import os , datetime
2
2
import pytest
3
3
from unittest import skipIf
4
- from sqlalchemy import create_engine , select , Column
4
+ from sqlalchemy import create_engine , select , insert , Column , MetaData , Table
5
5
from sqlalchemy .orm import declarative_base , Session
6
- from sqlalchemy .types import SMALLINT , Integer , BigInteger , Float , DECIMAL , BOOLEAN , String
6
+ from sqlalchemy .types import SMALLINT , Integer , BOOLEAN , String
7
7
8
8
9
9
@pytest .fixture
@@ -12,8 +12,10 @@ def db_engine():
12
12
HOST = os .environ .get ("host" )
13
13
HTTP_PATH = os .environ .get ("http_path" )
14
14
ACCESS_TOKEN = os .environ .get ("access_token" )
15
+ CATALOG = os .environ .get ("catalog" )
16
+ SCHEMA = os .environ .get ("schema" )
15
17
16
- engine = create_engine (f"databricks+thrift://token:{ ACCESS_TOKEN } @{ HOST } ?http_path={ HTTP_PATH } " )
18
+ engine = create_engine (f"databricks+thrift://token:{ ACCESS_TOKEN } @{ HOST } ?http_path={ HTTP_PATH } &catalog= { CATALOG } &schema= { SCHEMA } " )
17
19
return engine
18
20
19
21
@@ -26,23 +28,57 @@ def base(db_engine):
26
28
def session (db_engine ):
27
29
return Session (bind = db_engine )
28
30
31
+ @pytest .fixture ()
32
+ def metadata_obj (db_engine ):
33
+ return MetaData (bind = db_engine )
34
+
29
35
30
36
def test_can_connect (db_engine ):
31
37
simple_query = "SELECT 1"
32
38
result = db_engine .execute (simple_query ).fetchall ()
33
39
assert len (result ) == 1
34
40
35
- @skipIf (True , 'metadata operations not yet supported' )
36
- def test_create_insert_drop_table (base , session : Session ):
37
- """Make sure we can automatically create and drop a table defined with SQLAlchemy's MetaData object
38
- while exercising all supported types.
41
+
42
+ def test_create_insert_drop_table_core (base , db_engine , metadata_obj : MetaData ):
43
+ """
44
+ """
45
+
46
+ SampleTable = Table (
47
+ "PySQLTest_{}" .format (datetime .datetime .utcnow ().strftime ("%s" )),
48
+ metadata_obj ,
49
+ Column ("name" , String (255 )),
50
+ Column ("episodes" , Integer ),
51
+ Column ("some_bool" , BOOLEAN )
52
+ )
53
+
54
+ metadata_obj .create_all ()
55
+
56
+ insert_stmt = insert (SampleTable ).values (name = "Bim Adwunmi" , episodes = 6 , some_bool = True )
57
+
58
+ with db_engine .connect () as conn :
59
+ conn .execute (insert_stmt )
60
+
61
+ select_stmt = select (SampleTable )
62
+ resp = db_engine .execute (select_stmt )
63
+
64
+ result = resp .fetchall ()
65
+
66
+ assert len (result ) == 1
67
+
68
+ metadata_obj .drop_all ()
69
+
70
+
71
+ @skipIf (True , 'Unity catalog must be supported' )
72
+ def test_create_insert_drop_table_orm (base , session : Session ):
73
+ """ORM classes built on the declarative base class must have a primary key.
74
+ This is restricted to Unity Catalog.
39
75
"""
40
76
41
77
class SampleObject (base ):
42
78
43
79
__tablename__ = "PySQLTest_{}" .format (datetime .datetime .utcnow ().strftime ("%s" ))
44
80
45
- name = Column (String , primary_key = True )
81
+ name = Column (String ( 255 ) , primary_key = True )
46
82
episodes = Column (Integer ),
47
83
some_bool = Column (BOOLEAN )
48
84
0 commit comments