Skip to content

Commit b66770e

Browse files
author
Sam Partee
authored
PandasReader and SearchIndex (#13)
Adds basic functionality and implementations of the PandasReader, SearchIndex, and AsyncSearchIndex, with some slight modifications to the design of the Reader.
1 parent e445a5d commit b66770e

20 files changed

+551
-153
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
__pycache__/
22
redisvl.egg-info/
3+
.coverage

conftest.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,43 @@
11
import os
22
import pytest
3+
import pandas as pd
34

4-
from redisvl.utils.connection import get_async_redis_connection
5+
from redisvl.utils.connection import (
6+
get_async_redis_connection,
7+
get_redis_connection
8+
)
59

610
HOST = os.environ.get("REDIS_HOST", "localhost")
711
PORT = os.environ.get("REDIS_PORT", 6379)
812
USER = os.environ.get("REDIS_USER", "default")
913
PASS = os.environ.get("REDIS_PASSWORD", "")
1014

15+
aredis = get_async_redis_connection(HOST, PORT, PASS)
16+
redis = get_redis_connection(HOST, PORT, PASS)
17+
print(type(redis))
18+
19+
@pytest.fixture
20+
def async_client():
21+
return aredis
22+
1123
@pytest.fixture
12-
def async_redis():
13-
return get_async_redis_connection(HOST, PORT, PASS)
24+
def client():
25+
return redis
26+
27+
@pytest.fixture
28+
def df():
29+
30+
data = pd.DataFrame(
31+
{
32+
"users": ["john", "mary", "joe"],
33+
"age": [1, 2, 3],
34+
"job": ["engineer", "doctor", "dentist"],
35+
"credit_score": ["high", "low", "medium"],
36+
"user_embedding": [
37+
[0.1, 0.1, 0.5],
38+
[0.1, 0.1, 0.5],
39+
[0.9, 0.9, 0.1],
40+
],
41+
}
42+
)
43+
return data
File renamed without changes.
File renamed without changes.

redisvl/cli/load.py

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
import sys
44
import typing as t
55

6-
from redisvl import readers
7-
from redisvl.index import SearchIndex
8-
from redisvl.load import concurrent_store_as_hash
9-
from redisvl.utils.connection import get_async_redis_connection
6+
from redisvl.index import AsyncSearchIndex
107
from redisvl.utils.log import get_logger
118

129
logger = get_logger(__name__)
@@ -26,58 +23,50 @@ def __init__(self):
2623
parser.add_argument(
2724
"-a", "--password", help="Redis password", type=str, default=""
2825
)
26+
parser.add_argument("-r", "--reader", help="Reader", type=str, default="pandas")
27+
parser.add_argument("-f", "--format", help="Format", type=str, default="pickle")
2928
parser.add_argument("-c", "--concurrency", type=int, default=50)
3029
# TODO add argument to optionally not create index
3130
args = parser.parse_args(sys.argv[2:])
3231
if not args.data:
3332
parser.print_help()
3433
exit(0)
3534

36-
# Create Redis Connection
37-
try:
38-
logger.info(f"Connecting to {args.host}:{str(args.port)}")
39-
redis_conn = get_async_redis_connection(args.host, args.port, args.password)
40-
logger.info("Connected.")
41-
except:
42-
# TODO: be more specific about the exception
43-
logger.error("Could not connect to redis.")
44-
exit(1)
45-
4635
# validate schema
47-
index = SearchIndex.from_yaml(redis_conn, args.schema)
36+
index = AsyncSearchIndex.from_yaml(args.schema)
37+
38+
# try to connect to redis
39+
index.connect(host=args.host, port=args.port, password=args.password)
4840

4941
# read in data
5042
logger.info("Reading data...")
51-
data = self.read_data(args) # TODO add other readers and formats
43+
reader = self._get_reader(args)
5244
logger.info("Data read.")
5345

5446
# load data and create the index
55-
asyncio.run(self.load_and_create_index(args.concurrency, data, index))
47+
asyncio.run(self._load_and_create_index(args.concurrency, reader, index))
5648

57-
def read_data(
58-
self, args: t.List[str], reader: str = "pandas", format: str = "pickle"
59-
) -> dict:
60-
if reader == "pandas":
61-
if format == "pickle":
62-
return readers.pandas.from_pickle(args.data)
49+
def _get_reader(self, args: t.List[str]) -> dict:
50+
if args.reader == "pandas":
51+
from redisvl.readers import PandasReader
52+
53+
if args.format == "pickle":
54+
return PandasReader.from_pickle(args.data)
55+
elif args.format == "json":
56+
return PandasReader.from_json(args.data)
6357
else:
6458
raise NotImplementedError(
65-
"Only pickle format is supported for pandas reader."
59+
"Only pickle and json formats are supported for pandas reader using the CLI"
6660
)
6761
else:
6862
raise NotImplementedError("Only pandas reader is supported.")
6963

70-
async def load_and_create_index(
71-
self, concurrency: int, data: dict, index: SearchIndex
64+
async def _load_and_create_index(
65+
self, concurrency: int, reader: t.Iterable[dict], index: AsyncSearchIndex
7266
):
7367

7468
logger.info("Loading data...")
75-
if index.storage_type == "hash":
76-
await concurrent_store_as_hash(
77-
data, concurrency, index.key_field, index.prefix, index.redis_conn
78-
)
79-
else:
80-
raise NotImplementedError("Only hash storage type is supported.")
69+
await index.load(data=reader, concurrency=concurrency)
8170
logger.info("Data loaded.")
8271

8372
# create index

0 commit comments

Comments
 (0)