diff --git a/industries/predictive_maintenance_agent/.gitignore b/industries/predictive_maintenance_agent/.gitignore index b47a17cb..f186afac 100644 --- a/industries/predictive_maintenance_agent/.gitignore +++ b/industries/predictive_maintenance_agent/.gitignore @@ -1,46 +1,123 @@ -# macOS system files +# Misc +config_examples.yml +config_examples.yaml +env.sh +frontend/ +prompts.md + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +*.egg +*.egg-info/ +dist/ +build/ +*.whl +pip-wheel-metadata/ .DS_Store -.DS_Store? -._* -.Spotlight-V100 -.Trashes -ehthumbs.db -Thumbs.db -# Database and vector store files -database/ -*.db -*.sqlite3 +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# IDEs and Editors +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.hypothesis/ + +# Jupyter Notebook +.ipynb_checkpoints/ +*.ipynb_checkpoints/ -# Output and generated files +# Output and Data Directories output_data/ -moment/ -readmes/ -*.html -*.csv -*.npy +eval_output/ +example_eval_output/ +output/ +results/ +logs/ -# Python package metadata -src/**/*.egg-info/ -*.egg-info/ +# Database files +*.db +*.sqlite +*.sqlite3 +database/*.db +database/*.sqlite -# Environment files (if they contain secrets) -env.sh +# Vector store data (ChromaDB) +database/ +chroma_db/ +vector_store/ +vanna_vector_store/ -# Model files (if large/binary) +# Model files (large binary files) models/*.pkl -models/*.joblib -models/*.model +models/*.h5 +models/*.pt +models/*.pth +models/*.ckpt +*.pkl +*.h5 +*.pt +*.pth +moment/ -# Logs -*.log -logs/ +# Data files (CSV, JSON, etc. - be selective) +*.csv +*.json +!training_data.json +!vanna_training_data.yaml +!config*.json +!config*.yaml +!config*.yml +!pyproject.toml +!package.json + +# Frontend build artifacts +frontend/node_modules/ +frontend/dist/ +frontend/build/ +frontend/.next/ +frontend/out/ + +# Environment and secrets +.env +.env.local +.env.*.local +*.secret +secrets/ +credentials/ # Temporary files *.tmp *.temp -.pytest_cache/ -__pycache__/ +*.log +*.cache + +# OS specific +Thumbs.db +Desktop.ini + +# Experiment tracking +mlruns/ +wandb/ -# dot env -mydot.env +# Documentation builds +docs/_build/ +docs/.doctrees/ +site/ diff --git a/industries/predictive_maintenance_agent/INSTALLATION.md b/industries/predictive_maintenance_agent/INSTALLATION.md new file mode 100644 index 00000000..612d6399 --- /dev/null +++ b/industries/predictive_maintenance_agent/INSTALLATION.md @@ -0,0 +1,196 @@ +# Installation Guide + +This guide explains how to install the Predictive Maintenance Agent with different database and vector store options. + +## Base Installation + +Install the core package with default dependencies (ChromaDB + SQLite): + +```bash +pip install -e . 
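+# Editable install with the default backends described below: ChromaDB (vector store) and SQLite (database)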
+``` + +This includes: +- **ChromaDB** - Default vector store for SQL retriever +- **SQLite** - Built-in database support (no additional packages needed) +- **SQLAlchemy** - Generic SQL database support framework +- All core ML and visualization dependencies + +## Optional Dependencies + +Install additional packages based on your needs: + +### Elasticsearch Vector Store + +For production deployments with Elasticsearch as the vector store: + +```bash +pip install -e ".[elasticsearch]" +``` + +### PostgreSQL Database + +For PostgreSQL database support: + +```bash +pip install -e ".[postgres]" +``` + +### MySQL Database + +For MySQL database support: + +```bash +pip install -e ".[mysql]" +``` + +### SQL Server Database + +For Microsoft SQL Server support: + +```bash +pip install -e ".[sqlserver]" +``` + +**Note:** You also need to install the Microsoft ODBC Driver for SQL Server from [Microsoft's website](https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server). + +### Oracle Database + +For Oracle database support: + +```bash +pip install -e ".[oracle]" +``` + +**Note:** You also need to install Oracle Instant Client from [Oracle's website](https://www.oracle.com/database/technologies/instant-client.html). + +## Combined Installations + +### All Databases + +Install support for all SQL databases at once: + +```bash +pip install -e ".[all-databases]" +``` + +This includes: PostgreSQL, MySQL, SQL Server, and Oracle drivers. + +### Everything + +Install all optional dependencies (Elasticsearch + all databases): + +```bash +pip install -e ".[all]" +``` + +## Installation Examples by Use Case + +### Development Setup (Simplest) +```bash +# Base installation - ChromaDB + SQLite +pip install -e . +``` + +### Production with PostgreSQL +```bash +# Base + PostgreSQL +pip install -e ".[postgres]" +``` + +### Production with Elasticsearch and PostgreSQL +```bash +# Base + Elasticsearch + PostgreSQL +pip install -e ".[elasticsearch,postgres]" +``` + +### Enterprise with All Options +```bash +# Everything +pip install -e ".[all]" +``` + +## Verification + +After installation, verify your setup: + +```python +# Check installed packages +import chromadb # Should work with base install +import sqlalchemy # Should work with base install + +# Optional packages (only if installed) +import elasticsearch # If [elasticsearch] installed +import psycopg2 # If [postgres] installed +import pymysql # If [mysql] installed +import pyodbc # If [sqlserver] installed +import cx_Oracle # If [oracle] installed +``` + +## System Requirements + +- **Python:** 3.11 or 3.12 (Python 3.13 not yet supported) +- **OS:** Linux, macOS, or Windows +- **Memory:** Minimum 8GB RAM recommended +- **Disk:** Minimum 10GB free space + +## External Service Requirements + +Depending on your configuration, you may need: + +### Elasticsearch (Optional) +- Elasticsearch 8.0 or higher running +- Network access to Elasticsearch cluster +- Authentication credentials (API key or username/password) + +### Database Servers (Optional) +- **PostgreSQL:** PostgreSQL 12 or higher +- **MySQL:** MySQL 8.0 or higher +- **SQL Server:** SQL Server 2016 or higher +- **Oracle:** Oracle 19c or higher + +## Troubleshooting + +### Import Errors + +**Problem:** `ModuleNotFoundError: No module named 'elasticsearch'` +**Solution:** Install elasticsearch support: `pip install -e ".[elasticsearch]"` + +**Problem:** `ModuleNotFoundError: No module named 'psycopg2'` +**Solution:** Install PostgreSQL support: `pip install -e ".[postgres]"` + 
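+Before digging into driver-specific problems, it can help to confirm which optional drivers are actually importable in the active environment. The snippet below is a minimal sketch using only the Python standard library; the module-to-extra mapping simply mirrors the optional dependencies listed above.
+
+```python
+import importlib.util
+
+# Optional driver modules and the pip extra that provides each one
+optional_modules = {
+    "elasticsearch": "elasticsearch",
+    "psycopg2": "postgres",
+    "pymysql": "mysql",
+    "pyodbc": "sqlserver",
+    "cx_Oracle": "oracle",
+}
+
+for module, extra in optional_modules.items():
+    if importlib.util.find_spec(module) is None:
+        print(f'{module}: missing (install with: pip install -e ".[{extra}]")')
+    else:
+        print(f"{module}: installed")
+```
+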
+### Binary Dependencies + +**SQL Server on Linux/Mac:** +```bash +# Install unixODBC first +# macOS: +brew install unixodbc + +# Ubuntu/Debian: +sudo apt-get install unixodbc unixodbc-dev + +# Then install ODBC driver from Microsoft +``` + +**Oracle:** +- Download and install Oracle Instant Client +- Set environment variables: + ```bash + export ORACLE_HOME=/path/to/instantclient + export LD_LIBRARY_PATH=$ORACLE_HOME:$LD_LIBRARY_PATH + ``` + +## Next Steps + +After installation, see: +- **Configuration Guide:** `configs/README.md` - How to configure vector stores and databases +- **Examples:** `config_examples.yaml` - Sample configurations +- **Getting Started:** Run the predictive maintenance workflow + +## Support + +For issues or questions: +1. Check the configuration guide: `configs/README.md` +2. Review example configs: `config_examples.yaml` +3. See troubleshooting sections in the README diff --git a/industries/predictive_maintenance_agent/README.md b/industries/predictive_maintenance_agent/README.md index 262a6937..f33099df 100644 --- a/industries/predictive_maintenance_agent/README.md +++ b/industries/predictive_maintenance_agent/README.md @@ -180,6 +180,21 @@ Now install the PDM workflow: uv pip install -e . ``` +#### Installation Options + +**Base Installation** (default - includes ChromaDB + SQLite): +```bash +uv pip install -e . +``` + +**Optional Database Support:** +- PostgreSQL: `uv pip install -e ".[postgres]"` +- MySQL: `uv pip install -e ".[mysql]"` +- All databases: `uv pip install -e ".[all-databases]"` + +**Optional Vector Store:** +- Elasticsearch: `uv pip install -e ".[elasticsearch]"` + ### [Optional] Verify if all prerequisite packages are installed ```bash uv pip list | grep -E "nvidia-nat|nvidia-nat-ragaai|nvidia-nat-phoenix|vanna|chromadb|xgboost|pytest|torch|matplotlib" @@ -320,6 +335,31 @@ INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit) During startup, you'll see Vanna training logs as the SQL agent automatically loads the domain knowledge from `vanna_training_data.yaml` (as described in Section 6). +### Start Modern Web UI (Recommended) + +We now provide a **custom modern web interface** inspired by the NVIDIA AIQ Research Assistant design! This UI offers a superior experience compared to the generic NeMo-Agent-Toolkit-UI. + +**In a new terminal**, navigate to the frontend directory and start the UI: + +```bash +cd frontend +npm install # First time only +npm start +``` + +The UI will be available at `http://localhost:3000` + +**Features of the Modern UI:** +- 🎨 Clean, professional NVIDIA-branded design +- 📊 Embedded visualization display for plots and charts +- 🎯 Quick-start example prompts for common queries +- ⚙️ Configurable settings panel +- 🌓 Dark/Light theme support +- 📱 Fully responsive mobile design +- 🔄 Real-time streaming responses + +See `frontend/README.md` for detailed documentation. + ### Start Code Execution Sandbox The code generation assistant requires a standalone Python sandbox that can execute the generated code. This step starts that sandbox. @@ -443,7 +483,9 @@ def your_custom_utility(file_path: str, param: int = 100) -> str: 4. **Consistent Interface**: All utilities return descriptive success messages 5. 
**Documentation**: Use `utils.show_utilities()` to discover available functions
 
-### Setup Web Interface
+### Alternative: Generic NeMo-Agent-Toolkit UI
+
+If you prefer the generic NeMo Agent Toolkit UI instead of our custom interface:
 
 ```bash
 git clone https://github.com/NVIDIA/NeMo-Agent-Toolkit-UI.git
@@ -459,6 +501,8 @@ The UI is available at `http://localhost:3000`
 - Configure theme and WebSocket URL as needed
 - Check "Enable intermediate results" and "Enable intermediate results by default" if you prefer to see all agent calls while the workflow runs
 
+**Note:** The custom modern UI (described above) provides better visualization embedding, domain-specific examples, and a more polished experience tailored for predictive maintenance workflows.
+
 ## Example Prompts
 
 Test the system with these prompts:
@@ -487,7 +531,7 @@ Retrieve and detect anomalies in sensor 4 measurements for engine number 78 in t
 
 **Workspace Utilities Demo**
 ```
-Retrieve ground truth RUL values and time in cycles from FD001 train dataset. Apply piecewise RUL transformation with MAXLIFE=100. Finally, Plot a line chart of the transformed values across time.
+Retrieve RUL values and time in cycles for engine unit 24 from FD001 train dataset. Use the piecewise RUL transformation code utility to perform the piecewise RUL transformation on the ground truth RUL values with MAXLIFE=100. Finally, plot a comparison line chart of the RUL values and their transformed values across time.
 ```
 
 *This example demonstrates how to discover and use workspace utilities directly. The system will show available utilities and then apply the RUL transformation using the pre-built, reliable utility functions.*
@@ -496,9 +540,9 @@
 **Prediction with Transformed Ground Truth RUL**
 ```
 Perform the following steps:
-1.Retrieve the time in cycles, all sensor measurements, and ground truth RUL values for engine unit 24 from FD001 train dataset.
+1.Retrieve the time in cycles, all sensor measurements, and ground truth RUL values, partitioned by unit number, for engine unit 24 from FD001 train dataset.
 2.Use the retrieved data to predict the Remaining Useful Life (RUL).
-3.Use the piece wise RUL transformation code utility to apply piecewise RUL transformation only to the observed RUL column.
+3.Use the piecewise RUL transformation code utility to apply the piecewise RUL transformation only to the observed RUL column with a MAXLIFE of 100.
 4.Generate a plot that compares the transformed RUL values and the predicted RUL values across time.
 ```
 ![Prediction Example](imgs/test_prompt_3.png)
diff --git a/industries/predictive_maintenance_agent/configs/README.md b/industries/predictive_maintenance_agent/configs/README.md
new file mode 100644
index 00000000..bb7218bc
--- /dev/null
+++ b/industries/predictive_maintenance_agent/configs/README.md
@@ -0,0 +1,571 @@
+# SQL Query and Retrieve Tool Configuration Guide
+
+This comprehensive guide explains how to configure the SQL Query and Retrieve Tool, covering both vector store backends and SQL database connections.
+
+## Table of Contents
+1. [Vector Store Configuration](#vector-store-configuration)
+2. [SQL Database Configuration](#sql-database-configuration)
+3. [Complete Configuration Examples](#complete-configuration-examples)
+4. 
[Troubleshooting](#troubleshooting) + +--- + +## Vector Store Configuration + +### Overview + +The tool supports **two vector store backends** for storing Vanna AI SQL training data: +- **ChromaDB** (local, file-based) - Default +- **Elasticsearch** (distributed, server-based) + +Both vector stores provide identical functionality and store the same data (DDL, documentation, question-SQL pairs). + +### Quick Start - Vector Stores + +#### Option 1: ChromaDB (Recommended for Development) + +```yaml +functions: + - name: my_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + + # ChromaDB Configuration (DEFAULT) + vector_store_type: chromadb + vector_store_path: ./vanna_vector_store + + # Database and other settings... + db_connection_string_or_path: ./database.db + db_type: sqlite + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +**Requirements:** +- No additional services required +- No extra Python packages needed + +#### Option 2: Elasticsearch (Recommended for Production) + +```yaml +functions: + - name: my_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + + # Elasticsearch Configuration + vector_store_type: elasticsearch + elasticsearch_url: http://localhost:9200 + elasticsearch_index_name: vanna_sql_vectors # Optional + elasticsearch_username: elastic # Optional + elasticsearch_password: changeme # Optional + + # Database and other settings... + db_connection_string_or_path: ./database.db + db_type: sqlite + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +**Requirements:** +- Elasticsearch service must be running +- Install: `pip install elasticsearch` + +### Detailed Comparison - Vector Stores + +| Feature | ChromaDB | Elasticsearch | +|---------|----------|---------------| +| **Setup Complexity** | Simple | Moderate | +| **External Services** | None required | Requires ES cluster | +| **Storage Type** | Local file-based | Distributed | +| **High Availability** | No | Yes (with clustering) | +| **Horizontal Scaling** | No | Yes | +| **Best For** | Dev, testing, single-server | Production, multi-user | +| **Authentication** | File system | API key or basic auth | +| **Performance** | Fast for single-user | Fast for multi-user | +| **Backup** | Copy directory | ES snapshots | + +### When to Use Each Vector Store + +#### Use ChromaDB When: +✅ Getting started or prototyping +✅ Single-server deployment +✅ Local development environment +✅ Simple setup required +✅ No existing Elasticsearch infrastructure +✅ Small to medium data volume + +#### Use Elasticsearch When: +✅ Production environment +✅ Multiple instances/users need access +✅ Need high availability and clustering +✅ Already have Elasticsearch infrastructure +✅ Need advanced search capabilities +✅ Distributed deployment required +✅ Large scale deployments + +### Vector Store Configuration Parameters + +#### Common Parameters (Both Vector Stores) +```yaml +llm_name: string # LLM to use +embedding_name: string # Embedding model to use +db_connection_string_or_path: string # Database connection +db_type: string # 'sqlite', 'postgres', or 'sql' +output_folder: string # Output directory +vanna_training_data_path: string # Training data YAML file +``` + +#### ChromaDB-Specific Parameters +```yaml +vector_store_type: chromadb # Set to 'chromadb' +vector_store_path: string # Directory for ChromaDB storage +``` + +#### Elasticsearch-Specific Parameters +```yaml 
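+# The settings below are read only when vector_store_type is set to 'elasticsearch';
+# the ChromaDB-only 'vector_store_path' parameter can be omitted in that case.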
+vector_store_type: elasticsearch # Set to 'elasticsearch' +elasticsearch_url: string # ES URL (e.g., http://localhost:9200) +elasticsearch_index_name: string # Index name (default: vanna_vectors) +elasticsearch_username: string # Optional: for basic auth +elasticsearch_password: string # Optional: for basic auth +elasticsearch_api_key: string # Optional: alternative to username/password +``` + +### Elasticsearch Authentication + +Choose one of these authentication methods: + +#### Option 1: API Key (Recommended) +```yaml +elasticsearch_api_key: your-api-key-here +``` + +#### Option 2: Basic Auth +```yaml +elasticsearch_username: elastic +elasticsearch_password: changeme +``` + +#### Option 3: No Auth (Development Only) +```yaml +# Omit all auth parameters +``` + +### Data Migration Between Vector Stores + +#### From ChromaDB to Elasticsearch +1. Export training data from ChromaDB +2. Update configuration to use Elasticsearch +3. Run tool - it will auto-initialize Elasticsearch with training data + +#### From Elasticsearch to ChromaDB +1. Training data is reloaded from YAML file automatically +2. Update configuration to use ChromaDB +3. Run tool - it will auto-initialize ChromaDB + +### Vector Store Troubleshooting + +#### ChromaDB Issues +**Problem:** `FileNotFoundError` or permission errors +**Solution:** Ensure directory exists and has write permissions + +**Problem:** Slow performance +**Solution:** ChromaDB is single-threaded, consider Elasticsearch for better performance + +#### Elasticsearch Issues +**Problem:** `ConnectionError` or `ConnectionTimeout` +**Solution:** Verify Elasticsearch is running: `curl http://localhost:9200` + +**Problem:** `AuthenticationException` +**Solution:** Check username/password or API key + +**Problem:** Index already exists with different mapping +**Solution:** Delete index and let tool recreate: `curl -X DELETE http://localhost:9200/vanna_vectors` + +--- + +## SQL Database Configuration + +### Overview + +The tool supports **multiple SQL database types** through a unified `db_connection_string_or_path` parameter: +- **SQLite** (local, file-based) - Default +- **PostgreSQL** (open-source RDBMS) +- **MySQL** (open-source RDBMS) +- **SQL Server** (Microsoft database) +- **Oracle** (enterprise database) +- **Any SQLAlchemy-compatible database** + +### Quick Start - SQL Databases + +#### Option 1: SQLite (File-Based, No Server Required) + +```yaml +db_connection_string_or_path: ./database.db # Just a file path +db_type: sqlite +``` + +**Requirements:** +- No additional services required +- No extra Python packages needed (sqlite3 is built-in) + +#### Option 2: PostgreSQL + +```yaml +db_connection_string_or_path: postgresql://user:password@localhost:5432/database +db_type: postgres +``` + +**Requirements:** +- PostgreSQL server must be running +- Install: `pip install psycopg2-binary` + +#### Option 3: MySQL + +```yaml +db_connection_string_or_path: mysql+pymysql://user:password@localhost:3306/database +db_type: sql # Generic SQL via SQLAlchemy +``` + +**Requirements:** +- MySQL server must be running +- Install: `pip install pymysql sqlalchemy` + +#### Option 4: SQL Server + +```yaml +db_connection_string_or_path: mssql+pyodbc://user:pass@host:1433/db?driver=ODBC+Driver+17+for+SQL+Server +db_type: sql # Generic SQL via SQLAlchemy +``` + +**Requirements:** +- SQL Server must be running +- Install: `pip install pyodbc sqlalchemy` +- Install ODBC Driver for SQL Server + +#### Option 5: Oracle + +```yaml +db_connection_string_or_path: 
oracle+cx_oracle://user:password@host:1521/?service_name=service +db_type: sql # Generic SQL via SQLAlchemy +``` + +**Requirements:** +- Oracle database must be running +- Install: `pip install cx_Oracle sqlalchemy` + +### Detailed Comparison - SQL Databases + +| Feature | SQLite | PostgreSQL | MySQL | SQL Server | Oracle | +|---------|--------|------------|-------|------------|--------| +| **Setup** | None | Server required | Server required | Server required | Server required | +| **Cost** | Free | Free | Free | Licensed | Licensed | +| **Use Case** | Dev/testing | Production | Production | Enterprise | Enterprise | +| **Concurrent Users** | Limited | Excellent | Excellent | Excellent | Excellent | +| **File-Based** | Yes | No | No | No | No | +| **Advanced Features** | Basic | Advanced | Good | Advanced | Advanced | +| **Python Driver** | Built-in | psycopg2 | pymysql | pyodbc | cx_Oracle | + +### When to Use Each Database + +#### Use SQLite When: +✅ Development and testing +✅ Prototyping and demos +✅ Single-user applications +✅ No server infrastructure required +✅ Small to medium data volume +✅ Embedded applications +✅ Quick setup needed + +#### Use PostgreSQL When: +✅ Production deployments +✅ Multi-user applications +✅ Need advanced SQL features +✅ Open-source preference +✅ Need strong data integrity +✅ Complex queries and analytics +✅ GIS data support needed + +#### Use MySQL When: +✅ Web applications +✅ Read-heavy workloads +✅ Need wide compatibility +✅ Open-source preference +✅ Large-scale deployments +✅ Replication required + +#### Use SQL Server When: +✅ Microsoft ecosystem +✅ Enterprise applications +✅ .NET integration needed +✅ Advanced analytics (T-SQL) +✅ Business intelligence +✅ Existing SQL Server infrastructure + +#### Use Oracle When: +✅ Large enterprise deployments +✅ Mission-critical applications +✅ Need advanced features (RAC, Data Guard) +✅ Existing Oracle infrastructure +✅ High-availability requirements +✅ Maximum performance needed + +### Connection String Formats + +#### SQLite +``` +Format: /path/to/database.db +Example: ./data/sales.db +Example: /var/app/database.db +``` + +#### PostgreSQL +``` +Format: postgresql://username:password@host:port/database +Example: postgresql://admin:secret@db.example.com:5432/sales_db +Example: postgresql://user:pass@localhost:5432/mydb +``` + +#### MySQL +``` +Format: mysql+pymysql://username:password@host:port/database +Example: mysql+pymysql://root:password@localhost:3306/inventory +Example: mysql+pymysql://dbuser:pass@192.168.1.10:3306/analytics +``` + +#### SQL Server +``` +Format: mssql+pyodbc://user:pass@host:port/db?driver=ODBC+Driver+XX+for+SQL+Server +Example: mssql+pyodbc://sa:MyPass@localhost:1433/sales?driver=ODBC+Driver+17+for+SQL+Server +Example: mssql+pyodbc://user:pwd@server:1433/db?driver=ODBC+Driver+18+for+SQL+Server +``` + +#### Oracle +``` +Format: oracle+cx_oracle://username:password@host:port/?service_name=service +Example: oracle+cx_oracle://admin:secret@localhost:1521/?service_name=ORCLPDB +Example: oracle+cx_oracle://user:pass@oracledb:1521/?service_name=PROD +``` + +### Database Configuration Parameters + +```yaml +db_connection_string_or_path: string # Path (SQLite) or connection string (others) +db_type: string # 'sqlite', 'postgres', or 'sql' +``` + +**db_type values:** +- `sqlite` - For SQLite databases (uses connect_to_sqlite internally) +- `postgres` or `postgresql` - For PostgreSQL databases (uses connect_to_postgres) +- `sql` - For generic SQL databases via SQLAlchemy (MySQL, SQL Server, Oracle, 
etc.) + +### SQL Database Troubleshooting + +#### SQLite Issues +**Problem:** `database is locked` error +**Solution:** Close all connections or use WAL mode + +**Problem:** `unable to open database file` +**Solution:** Check file path and permissions + +#### PostgreSQL Issues +**Problem:** `connection refused` +**Solution:** Check PostgreSQL is running: `systemctl status postgresql` + +**Problem:** `authentication failed` +**Solution:** Verify credentials and check pg_hba.conf + +**Problem:** `database does not exist` +**Solution:** Create database: `createdb database_name` + +#### MySQL Issues +**Problem:** `Access denied for user` +**Solution:** Check credentials and user permissions: `GRANT ALL ON db.* TO 'user'@'host'` + +**Problem:** `Can't connect to MySQL server` +**Solution:** Check MySQL is running: `systemctl status mysql` + +#### SQL Server Issues +**Problem:** `Login failed for user` +**Solution:** Check SQL Server authentication mode and user permissions + +**Problem:** `ODBC Driver not found` +**Solution:** Install ODBC Driver: Download from Microsoft + +**Problem:** `SSL Provider: No credentials are available` +**Solution:** Add `TrustServerCertificate=yes` to connection string + +#### Oracle Issues +**Problem:** `ORA-12541: TNS:no listener` +**Solution:** Start Oracle listener: `lsnrctl start` + +**Problem:** `ORA-01017: invalid username/password` +**Solution:** Verify credentials and user exists + +**Problem:** `cx_Oracle.DatabaseError` +**Solution:** Check Oracle client libraries are installed + +### Required Python Packages by Database + +```bash +# SQLite (built-in, no packages needed) +# Already included with Python + +# PostgreSQL +pip install psycopg2-binary + +# MySQL +pip install pymysql sqlalchemy + +# SQL Server +pip install pyodbc sqlalchemy +# Also install: Microsoft ODBC Driver for SQL Server + +# Oracle +pip install cx_Oracle sqlalchemy +# Also install: Oracle Instant Client + +# Generic SQL (covers MySQL, SQL Server, Oracle via SQLAlchemy) +pip install sqlalchemy +``` + +--- + +## Complete Configuration Examples + +### Example 1: SQLite with ChromaDB (Simplest Setup) +```yaml +functions: + - name: simple_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + # Vector store + vector_store_type: chromadb + vector_store_path: ./vanna_vector_store + # Database + db_connection_string_or_path: ./database.db + db_type: sqlite + # Output + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +### Example 2: PostgreSQL with Elasticsearch (Production Setup) +```yaml +functions: + - name: production_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + # Vector store + vector_store_type: elasticsearch + elasticsearch_url: http://elasticsearch:9200 + elasticsearch_username: elastic + elasticsearch_password: changeme + # Database + db_connection_string_or_path: postgresql://dbuser:dbpass@postgres:5432/analytics + db_type: postgres + # Output + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +### Example 3: MySQL with ChromaDB +```yaml +functions: + - name: mysql_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + # Vector store + vector_store_type: chromadb + vector_store_path: ./vanna_vector_store + # Database + db_connection_string_or_path: mysql+pymysql://root:password@localhost:3306/sales + db_type: sql + # Output + output_folder: ./output + 
vanna_training_data_path: ./training_data.yaml +``` + +--- + +## Architecture Notes + +Both vector stores: +- Use the same NVIDIA embedding models +- Store identical training data +- Provide the same vector similarity search +- Are managed automatically by VannaManager +- Support the same training data YAML format + +The tool automatically: +- Detects if vector store needs initialization +- Loads training data from YAML file +- Creates embeddings using NVIDIA models +- Manages vector store lifecycle + +### Performance Tips + +#### ChromaDB +- Keep on SSD for faster I/O +- Regular directory backups +- Monitor disk space + +#### Elasticsearch +- Use SSD-backed storage +- Configure appropriate heap size +- Enable index caching +- Use snapshots for backups +- Monitor cluster health + +--- + +## Quick Reference + +### Configuration Matrix + +| Database | Vector Store | db_type | Connection Format | +|----------|--------------|---------|-------------------| +| SQLite | ChromaDB | sqlite | `./database.db` | +| SQLite | Elasticsearch | sqlite | `./database.db` | +| PostgreSQL | ChromaDB | postgres | `postgresql://user:pass@host:port/db` | +| PostgreSQL | Elasticsearch | postgres | `postgresql://user:pass@host:port/db` | +| MySQL | ChromaDB | sql | `mysql+pymysql://user:pass@host:port/db` | +| MySQL | Elasticsearch | sql | `mysql+pymysql://user:pass@host:port/db` | +| SQL Server | ChromaDB | sql | `mssql+pyodbc://user:pass@host:port/db?driver=...` | +| SQL Server | Elasticsearch | sql | `mssql+pyodbc://user:pass@host:port/db?driver=...` | +| Oracle | ChromaDB | sql | `oracle+cx_oracle://user:pass@host:port/?service_name=...` | +| Oracle | Elasticsearch | sql | `oracle+cx_oracle://user:pass@host:port/?service_name=...` | + +### Recommended Combinations + +| Use Case | Vector Store | Database | Why | +|----------|--------------|----------|-----| +| **Development** | ChromaDB | SQLite | Simplest setup, no servers | +| **Production (Small)** | ChromaDB | PostgreSQL | Reliable, open-source | +| **Production (Large)** | Elasticsearch | PostgreSQL | Scalable, distributed | +| **Enterprise** | Elasticsearch | SQL Server/Oracle | Advanced features, HA | +| **Web App** | ChromaDB | MySQL | Standard web stack | +| **Analytics** | Elasticsearch | PostgreSQL | Complex queries, multi-user | + +### Default Values + +```yaml +vector_store_type: chromadb # Default +elasticsearch_index_name: vanna_vectors # Default ES index +db_type: sqlite # Default +``` + +--- + +## Additional Resources + +For more detailed examples, see: +- **`config_examples.yaml`** - Complete working examples with all combinations of vector stores and databases +- **`vanna_manager.py`** - Implementation details for connection management +- **`vanna_util.py`** - Vector store implementations (ChromaDB and Elasticsearch) diff --git a/industries/predictive_maintenance_agent/configs/config-reasoning.yml b/industries/predictive_maintenance_agent/configs/config-reasoning.yml index cd7b144e..6a9ef3c4 100644 --- a/industries/predictive_maintenance_agent/configs/config-reasoning.yml +++ b/industries/predictive_maintenance_agent/configs/config-reasoning.yml @@ -15,24 +15,24 @@ general: use_uvloop: true - # telemetry: - # logging: - # console: - # _type: console - # level: INFO - # file: - # _type: file - # path: "pdm.log" - # level: DEBUG - # tracing: - # phoenix: - # _type: phoenix - # endpoint: http://localhost:6006/v1/traces - # project: pdm-test - # catalyst: - # _type: catalyst - # project: "pdm-test" - # dataset: "pdm-dataset" + telemetry: + 
logging: + console: + _type: console + level: INFO + file: + _type: file + path: "pdm.log" + level: DEBUG + tracing: + phoenix: + _type: phoenix + endpoint: http://localhost:6006/v1/traces + project: pdm-demo-day + catalyst: + _type: catalyst + project: "pdm-demo-day" + dataset: "pdm-demo-day" llms: # SQL query generation model @@ -42,13 +42,11 @@ llms: # Data analysis and tool calling model analyst_llm: - _type: nim - model_name: "qwen/qwen2.5-coder-32b-instruct" - # _type: openai - # model_name: "gpt-4.1-mini" + _type: openai + model_name: "gpt-4.1-mini" # Python code generation model - coding_llm: + coding_llm: _type: nim model_name: "qwen/qwen2.5-coder-32b-instruct" @@ -66,15 +64,20 @@ embedders: # Text embedding model for vector database operations vanna_embedder: _type: nim - model_name: "nvidia/nv-embed-v1" + model_name: "nvidia/llama-3.2-nv-embedqa-1b-v2" functions: sql_retriever: _type: generate_sql_query_and_retrieve_tool llm_name: sql_llm embedding_name: vanna_embedder + # Vector store configuration + vector_store_type: chromadb # Optional, chromadb is default vector_store_path: "database" - db_path: "database/nasa_turbo.db" + # Database configuration + db_type: sqlite # Optional, sqlite is default + db_connection_string_or_path: "database/nasa_turbo.db" + # Output configuration output_folder: "output_data" vanna_training_data_path: "vanna_training_data.yaml" @@ -128,8 +131,8 @@ functions: plot_line_chart, plot_comparison, anomaly_detection, - plot_anomaly, - code_generation_assistant + plot_anomaly + # code_generation_assistant ] parse_agent_response_max_retries: 2 system_prompt: | @@ -154,7 +157,7 @@ functions: Executing step: the step you are currently executing from the plan along with any instructions provided Thought: describe how you are going to execute the step Final Answer: the final answer to the original input question including the absolute file paths of the generated files with - `/Users/vikalluru/Documents/GenerativeAIExamples/industries/manufacturing/predictive_maintenance_agent/output_data/` prepended to the filename. + `/Users/vikalluru/Documents/GenerativeAIExamples/industries/predictive_maintenance_agent/output_data/` prepended to the filename. **FORMAT 3 (when using a tool)** Input plan: Summarize all the steps in the plan. 
diff --git a/industries/predictive_maintenance_agent/pyproject.toml b/industries/predictive_maintenance_agent/pyproject.toml index 5b08d7e0..4d896ac5 100644 --- a/industries/predictive_maintenance_agent/pyproject.toml +++ b/industries/predictive_maintenance_agent/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "pydantic ~= 2.10.0, <2.11.0", "vanna==0.7.9", "chromadb", + "sqlalchemy>=2.0.0", "xgboost", "matplotlib", "torch", @@ -23,6 +24,36 @@ classifiers = ["Programming Language :: Python"] authors = [{ name = "Vineeth Kalluru" }] maintainers = [{ name = "NVIDIA Corporation" }] +[project.optional-dependencies] +elasticsearch = [ + "elasticsearch>=8.0.0" +] +postgres = [ + "psycopg2-binary>=2.9.0" +] +mysql = [ + "pymysql>=1.0.0" +] +sqlserver = [ + "pyodbc>=4.0.0" +] +oracle = [ + "cx_Oracle>=8.0.0" +] +all-databases = [ + "psycopg2-binary>=2.9.0", + "pymysql>=1.0.0", + "pyodbc>=4.0.0", + "cx_Oracle>=8.0.0" +] +all = [ + "elasticsearch>=8.0.0", + "psycopg2-binary>=2.9.0", + "pymysql>=1.0.0", + "pyodbc>=4.0.0", + "cx_Oracle>=8.0.0" +] + [project.entry-points.'nat.components'] predictive_maintenance_agent = "predictive_maintenance_agent.register" diff --git a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/__init__.py b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/__init__.py index 1b79187b..57913e71 100644 --- a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/__init__.py +++ b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "1.0.0" +__version__ = "1.5.0" diff --git a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/plotting/code_generation_assistant.py b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/plotting/code_generation_assistant.py index 6088df20..0b3ea1a4 100644 --- a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/plotting/code_generation_assistant.py +++ b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/plotting/code_generation_assistant.py @@ -36,7 +36,7 @@ class CodeGenerationAssistantConfig(FunctionBaseConfig, name="code_generation_as code_execution_tool: FunctionRef = Field(description="The code execution tool to run generated code") output_folder: str = Field(description="The path to the output folder for generated files", default="/output_data") verbose: bool = Field(description="Enable verbose logging", default=True) - max_retries: int = Field(description="Maximum number of retries if code execution fails", default=3) + max_retries: int = Field(description="Maximum number of retries if code execution fails", default=0) @register_function(config_type=CodeGenerationAssistantConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN]) @@ -73,15 +73,14 @@ async def _generate_and_execute_code( **UTILITIES AVAILABLE:** A 'utils' folder contains a pre-built function for predictive maintenance tasks: - utils.apply_piecewise_rul_transformation - INPUTS: - - file_path: Path to JSON file with time series data + - DESCRIPTION: Takes an input pandas DataFrame with time series data and create realistic "knee" patterns on the provided RUL column. 
+ - INPUTS: + - df: pandas DataFrame with time series data - maxlife: Maximum life threshold for the piecewise function (default: 100) - time_col: Name of the time/cycle column (default: 'time_in_cycles') - rul_col: Name of the RUL column to transform (default: 'RUL') OUTPUTS: - - pandas DataFrame with original data plus new 'transformed_RUL' column - * Transform RUL data with realistic knee pattern - * Returns a pandas DataFrame with original data plus new 'transformed_RUL' column + - df - a pandas DataFrame with original data plus new 'transformed_RUL' column - utils.show_utilities(): Show all available utilities if you need to see them @@ -93,6 +92,15 @@ async def _generate_and_execute_code( import sys sys.path.append(".") import utils + + + + + +# Saving files to the current working directory.) +print("Original RUL column name: ", rul column name) +print("Transformed RUL column name: ", 'transformed_RUL') +print("File saved to: filename.json") ``` **UTILITY USAGE GUIDELINES:** @@ -217,8 +225,6 @@ def is_code_incomplete(code): fix_prompt_text = f"""The previous code needs to be fixed. Please analyze the issue and generate corrected Python code. -ORIGINAL INSTRUCTIONS: {instructions} - PREVIOUS CODE: {code} @@ -339,7 +345,6 @@ def _clean_generated_code(raw_code: str) -> str: return '\n'.join(clean_lines).strip() - def _extract_file_paths(stdout: str, output_folder: str) -> list: """Extract generated file paths from execution output.""" import re diff --git a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/generate_sql_query_and_retrieve_tool.py b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/generate_sql_query_and_retrieve_tool.py index 319285c5..0785ce45 100644 --- a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/generate_sql_query_and_retrieve_tool.py +++ b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/generate_sql_query_and_retrieve_tool.py @@ -30,12 +30,58 @@ class GenerateSqlQueryAndRetrieveToolConfig(FunctionBaseConfig, name="generate_sql_query_and_retrieve_tool"): """ NeMo Agent Toolkit function to generate SQL queries and retrieve data. + + Supports multiple database types through flexible connection configuration. 
""" # Runtime configuration parameters llm_name: str = Field(description="The name of the LLM to use for the function.") embedding_name: str = Field(description="The name of the embedding to use for the function.") - vector_store_path: str = Field(description="The path to the vector store to use for the function.") - db_path: str = Field(description="The path to the SQL database to use for the function.") + + # Vector store configuration + vector_store_type: str = Field( + default="chromadb", + description="Type of vector store: 'chromadb' or 'elasticsearch'" + ) + vector_store_path: str = Field( + default=None, + description="Path to ChromaDB vector store (required if vector_store_type='chromadb')" + ) + elasticsearch_url: str = Field( + default=None, + description="Elasticsearch URL (required if vector_store_type='elasticsearch', e.g., 'http://localhost:9200')" + ) + elasticsearch_index_name: str = Field( + default="vanna_vectors", + description="Elasticsearch index name (used if vector_store_type='elasticsearch')" + ) + elasticsearch_username: str = Field( + default=None, + description="Elasticsearch username for basic auth (optional)" + ) + elasticsearch_password: str = Field( + default=None, + description="Elasticsearch password for basic auth (optional)" + ) + elasticsearch_api_key: str = Field( + default=None, + description="Elasticsearch API key for authentication (optional)" + ) + + # Database configuration + db_connection_string_or_path: str = Field( + description=( + "Database connection (path for SQLite, connection string for others). Format depends on db_type:\n" + "- sqlite: Path to .db file (e.g., './database.db')\n" + "- postgres: Connection string (e.g., 'postgresql://user:pass@host:port/db')\n" + "- sql: SQLAlchemy connection string (e.g., 'mysql+pymysql://user:pass@host/db')" + ) + ) + db_type: str = Field( + default="sqlite", + description="Type of database: 'sqlite', 'postgres', or 'sql' (generic SQL via SQLAlchemy)" + ) + + # Output configuration output_folder: str = Field(description="The path to the output folder to use for the function.") vanna_training_data_path: str = Field(description="The path to the YAML file containing Vanna training data.") @@ -106,8 +152,15 @@ class GenerateSqlQueryInputSchema(BaseModel): vanna_manager = VannaManager.create_with_config( vanna_llm_config=vanna_llm_config, vanna_embedder_config=vanna_embedder_config, + vector_store_type=config.vector_store_type, vector_store_path=config.vector_store_path, - db_path=config.db_path, + elasticsearch_url=config.elasticsearch_url, + elasticsearch_index_name=config.elasticsearch_index_name, + elasticsearch_username=config.elasticsearch_username, + elasticsearch_password=config.elasticsearch_password, + elasticsearch_api_key=config.elasticsearch_api_key, + db_connection_string_or_path=config.db_connection_string_or_path, + db_type=config.db_type, training_data_path=config.vanna_training_data_path ) @@ -173,27 +226,27 @@ async def _response_fn(input_question_in_english: str) -> str: import re llm_response = re.sub(r',\[object Object\],?', '', llm_response) - if "save" in llm_response.lower(): - # Clean the question for filename - clean_question = re.sub(r'[^\w\s-]', '', input_question_in_english.lower()) - clean_question = re.sub(r'\s+', '_', clean_question.strip())[:30] - suggested_filename = f"{clean_question}_results.json" - - sql_output_path = os.path.join(config.output_folder, suggested_filename) + # if "save" in llm_response.lower(): + # Clean the question for filename + clean_question = 
re.sub(r'[^\w\s-]', '', input_question_in_english.lower()) + clean_question = re.sub(r'\s+', '_', clean_question.strip())[:30] + suggested_filename = f"{clean_question}_results.json" - # Save the data to JSON file - os.makedirs(config.output_folder, exist_ok=True) - json_result = df.to_json(orient="records") - with open(sql_output_path, 'w') as f: - json.dump(json.loads(json_result), f, indent=4) + sql_output_path = os.path.join(config.output_folder, suggested_filename) - logger.info(f"Data saved to {sql_output_path}") + # Save the data to JSON file + os.makedirs(config.output_folder, exist_ok=True) + json_result = df.to_json(orient="records") + with open(sql_output_path, 'w') as f: + json.dump(json.loads(json_result), f, indent=4) - llm_response += f"\n\nData has been saved to file: {suggested_filename}" + logger.info(f"Data saved to {sql_output_path}") - return llm_response + llm_response += f"\n\nData has been saved to file: {suggested_filename}" return llm_response + + # return llm_response except Exception as e: return f"Error running SQL query '{sql}': {e}" diff --git a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_manager.py b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_manager.py index a5a3a93e..413aba3f 100644 --- a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_manager.py +++ b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_manager.py @@ -21,7 +21,7 @@ import threading import hashlib from typing import Dict, Optional -from .vanna_util import NIMVanna, initVanna, CustomEmbeddingFunction +from .vanna_util import NIMVanna, ElasticNIMVanna, initVanna, NVIDIAEmbeddingFunction logger = logging.getLogger(__name__) @@ -33,6 +33,7 @@ class VannaManager: - Singleton pattern to ensure only one instance per configuration - Thread-safe operations - Simple instance management + - Support for multiple database types: SQLite, generic SQL, and PostgreSQL """ _instances: Dict[str, 'VannaManager'] = {} @@ -49,8 +50,31 @@ def __new__(cls, config_key: str): logger.debug(f"VannaManager: Returning existing singleton instance for config: {config_key}") return cls._instances[config_key] - def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config=None, vector_store_path: str = None, db_path: str = None, training_data_path: str = None): - """Initialize the VannaManager and create Vanna instance immediately if all config is provided""" + def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config=None, + vector_store_type: str = "chromadb", vector_store_path: str = None, + elasticsearch_url: str = None, elasticsearch_index_name: str = "vanna_vectors", + elasticsearch_username: str = None, elasticsearch_password: str = None, + elasticsearch_api_key: str = None, + db_connection_string_or_path: str = None, db_type: str = "sqlite", + training_data_path: str = None, nvidia_api_key: str = None): + """Initialize the VannaManager and create Vanna instance immediately if all config is provided + + Args: + config_key: Unique key for this configuration + vanna_llm_config: LLM configuration object + vanna_embedder_config: Embedder configuration object + vector_store_type: Type of vector store - 'chromadb' or 'elasticsearch' + vector_store_path: Path to ChromaDB vector store (required if vector_store_type='chromadb') + elasticsearch_url: Elasticsearch URL (required if vector_store_type='elasticsearch') + 
elasticsearch_index_name: Elasticsearch index name + elasticsearch_username: Elasticsearch username for basic auth + elasticsearch_password: Elasticsearch password for basic auth + elasticsearch_api_key: Elasticsearch API key + db_connection_string_or_path: Database connection (path for SQLite, connection string for others) + db_type: Type of database - 'sqlite', 'postgres', or 'sql' (generic SQL with SQLAlchemy) + training_data_path: Path to YAML training data file + nvidia_api_key: NVIDIA API key (optional, can use NVIDIA_API_KEY env var) + """ if hasattr(self, '_initialized') and self._initialized: return @@ -60,17 +84,29 @@ def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config # Store configuration self.vanna_llm_config = vanna_llm_config self.vanna_embedder_config = vanna_embedder_config + self.vector_store_type = vector_store_type self.vector_store_path = vector_store_path - self.db_path = db_path + self.elasticsearch_url = elasticsearch_url + self.elasticsearch_index_name = elasticsearch_index_name + self.elasticsearch_username = elasticsearch_username + self.elasticsearch_password = elasticsearch_password + self.elasticsearch_api_key = elasticsearch_api_key + self.db_connection_string_or_path = db_connection_string_or_path + self.db_type = db_type self.training_data_path = training_data_path + self.nvidia_api_key = nvidia_api_key or os.getenv("NVIDIA_API_KEY") # Create and initialize Vanna instance immediately if all required config is provided self.vanna_instance = None - if all([vanna_llm_config, vanna_embedder_config, vector_store_path, db_path]): + has_vector_config = ( + (vector_store_type == "chromadb" and vector_store_path) or + (vector_store_type == "elasticsearch" and elasticsearch_url) + ) + if all([vanna_llm_config, vanna_embedder_config, has_vector_config, self.db_connection_string_or_path]): logger.debug(f"VannaManager: Initializing with immediate Vanna instance creation") self.vanna_instance = self._create_instance() else: - if any([vanna_llm_config, vanna_embedder_config, vector_store_path, db_path]): + if any([vanna_llm_config, vanna_embedder_config, vector_store_path, elasticsearch_url, self.db_connection_string_or_path]): logger.debug(f"VannaManager: Partial configuration provided, Vanna instance will be created later") else: logger.debug(f"VannaManager: No configuration provided, Vanna instance will be created later") @@ -78,7 +114,11 @@ def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config self._initialized = True logger.debug(f"VannaManager initialized for config: {config_key}") - def get_instance(self, vanna_llm_config=None, vanna_embedder_config=None, vector_store_path: str = None, db_path: str = None, training_data_path: str = None) -> NIMVanna: + def get_instance(self, vanna_llm_config=None, vanna_embedder_config=None, + vector_store_type: str = None, vector_store_path: str = None, + elasticsearch_url: str = None, + db_connection_string_or_path: str = None, db_type: str = None, + training_data_path: str = None, nvidia_api_key: str = None): """ Get the Vanna instance. If not created during init, create it now with provided parameters. 
""" @@ -89,59 +129,125 @@ def get_instance(self, vanna_llm_config=None, vanna_embedder_config=None, vector # Update configuration with provided parameters self.vanna_llm_config = vanna_llm_config or self.vanna_llm_config self.vanna_embedder_config = vanna_embedder_config or self.vanna_embedder_config + self.vector_store_type = vector_store_type or self.vector_store_type self.vector_store_path = vector_store_path or self.vector_store_path - self.db_path = db_path or self.db_path + self.elasticsearch_url = elasticsearch_url or self.elasticsearch_url + self.db_connection_string_or_path = db_connection_string_or_path or self.db_connection_string_or_path + self.db_type = db_type or self.db_type self.training_data_path = training_data_path or self.training_data_path + self.nvidia_api_key = nvidia_api_key or self.nvidia_api_key + + # Check if we have required vector store config + has_vector_config = ( + (self.vector_store_type == "chromadb" and self.vector_store_path) or + (self.vector_store_type == "elasticsearch" and self.elasticsearch_url) + ) - if all([self.vanna_llm_config, self.vanna_embedder_config, self.vector_store_path, self.db_path]): + if all([self.vanna_llm_config, self.vanna_embedder_config, has_vector_config, self.db_connection_string_or_path]): self.vanna_instance = self._create_instance() else: raise RuntimeError("VannaManager: Missing required configuration parameters") else: logger.debug(f"VannaManager: Returning pre-initialized Vanna instance (ID: {id(self.vanna_instance)})") + logger.debug(f"VannaManager: Vector store type: {self.vector_store_type}") # Show vector store status for pre-initialized instances try: - if os.path.exists(self.vector_store_path): - list_of_folders = [d for d in os.listdir(self.vector_store_path) - if os.path.isdir(os.path.join(self.vector_store_path, d))] - logger.debug(f"VannaManager: Vector store contains {len(list_of_folders)} collections/folders") - if list_of_folders: - logger.debug(f"VannaManager: Vector store folders: {list_of_folders}") - else: - logger.debug(f"VannaManager: Vector store directory does not exist") + if self.vector_store_type == "chromadb" and self.vector_store_path: + if os.path.exists(self.vector_store_path): + list_of_folders = [d for d in os.listdir(self.vector_store_path) + if os.path.isdir(os.path.join(self.vector_store_path, d))] + logger.debug(f"VannaManager: ChromaDB contains {len(list_of_folders)} collections/folders") + if list_of_folders: + logger.debug(f"VannaManager: ChromaDB folders: {list_of_folders}") + else: + logger.debug(f"VannaManager: ChromaDB directory does not exist") + elif self.vector_store_type == "elasticsearch": + logger.debug(f"VannaManager: Using Elasticsearch at {self.elasticsearch_url}") except Exception as e: logger.warning(f"VannaManager: Could not check vector store status: {e}") return self.vanna_instance - def _create_instance(self) -> NIMVanna: + def _create_instance(self): """ Create a new Vanna instance using the stored configuration. + Returns NIMVanna (ChromaDB) or ElasticNIMVanna (Elasticsearch) based on vector_store_type. 
""" logger.info(f"VannaManager: Creating instance for {self.config_key}") - logger.debug(f"VannaManager: Vector store path: {self.vector_store_path}") - logger.debug(f"VannaManager: Database path: {self.db_path}") + logger.debug(f"VannaManager: Vector store type: {self.vector_store_type}") + logger.debug(f"VannaManager: Database connection: {self.db_connection_string_or_path}") + logger.debug(f"VannaManager: Database type: {self.db_type}") logger.debug(f"VannaManager: Training data path: {self.training_data_path}") - # Create instance - vn_instance = NIMVanna( - VectorConfig={ - "client": "persistent", - "path": self.vector_store_path, - "embedding_function": CustomEmbeddingFunction( - api_key=os.getenv("NVIDIA_API_KEY"), - model=self.vanna_embedder_config.model_name) - }, - LLMConfig={ - "api_key": os.getenv("NVIDIA_API_KEY"), - "model": self.vanna_llm_config.model_name - } + # Create embedding function (used by both ChromaDB and Elasticsearch) + embedding_function = NVIDIAEmbeddingFunction( + api_key=self.nvidia_api_key, + model=self.vanna_embedder_config.model_name ) - # Connect to database - logger.debug(f"VannaManager: Connecting to SQLite database...") - vn_instance.connect_to_sqlite(self.db_path) + # LLM configuration (common for both) + llm_config = { + "api_key": self.nvidia_api_key, + "model": self.vanna_llm_config.model_name + } + + # Create instance based on vector store type + if self.vector_store_type == "chromadb": + logger.debug(f"VannaManager: Creating NIMVanna with ChromaDB") + logger.debug(f"VannaManager: ChromaDB path: {self.vector_store_path}") + vn_instance = NIMVanna( + VectorConfig={ + "client": "persistent", + "path": self.vector_store_path, + "embedding_function": embedding_function + }, + LLMConfig=llm_config + ) + elif self.vector_store_type == "elasticsearch": + logger.debug(f"VannaManager: Creating ElasticNIMVanna with Elasticsearch") + logger.debug(f"VannaManager: Elasticsearch URL: {self.elasticsearch_url}") + logger.debug(f"VannaManager: Elasticsearch index: {self.elasticsearch_index_name}") + + # Build Elasticsearch vector config + es_config = { + "url": self.elasticsearch_url, + "index_name": self.elasticsearch_index_name, + "embedding_function": embedding_function + } + + # Add authentication if provided + if self.elasticsearch_api_key: + es_config["api_key"] = self.elasticsearch_api_key + logger.debug("VannaManager: Using Elasticsearch API key authentication") + elif self.elasticsearch_username and self.elasticsearch_password: + es_config["username"] = self.elasticsearch_username + es_config["password"] = self.elasticsearch_password + logger.debug("VannaManager: Using Elasticsearch basic authentication") + + vn_instance = ElasticNIMVanna( + VectorConfig=es_config, + LLMConfig=llm_config + ) + else: + raise ValueError( + f"Unsupported vector store type: {self.vector_store_type}. " + "Supported types: 'chromadb', 'elasticsearch'" + ) + + # Connect to database based on type + logger.debug(f"VannaManager: Connecting to {self.db_type} database...") + if self.db_type == "sqlite": + vn_instance.connect_to_sqlite(self.db_connection_string_or_path) + elif self.db_type == "postgres" or self.db_type == "postgresql": + self._connect_to_postgres(vn_instance, self.db_connection_string_or_path) + elif self.db_type == "sql": + self._connect_to_sql(vn_instance, self.db_connection_string_or_path) + else: + raise ValueError( + f"Unsupported database type: {self.db_type}. 
" + "Supported types: 'sqlite', 'postgres', 'sql'" + ) # Set configuration - allow LLM to see data for database introspection vn_instance.allow_llm_to_see_data = True @@ -163,29 +269,126 @@ def _create_instance(self) -> NIMVanna: logger.info(f"VannaManager: Instance created successfully") return vn_instance + def _connect_to_postgres(self, vn_instance: NIMVanna, connection_string: str): + """ + Connect to a PostgreSQL database. + + Args: + vn_instance: The Vanna instance to connect + connection_string: PostgreSQL connection string in format: + postgresql://user:password@host:port/database + """ + try: + import psycopg2 + from psycopg2.pool import SimpleConnectionPool + + logger.info("Connecting to PostgreSQL database...") + + # Parse connection string if needed + if connection_string.startswith("postgresql://"): + # Use SQLAlchemy-style connection for Vanna + vn_instance.connect_to_postgres(url=connection_string) + else: + # Assume it's a psycopg2 connection string + vn_instance.connect_to_postgres(url=f"postgresql://{connection_string}") + + logger.info("Successfully connected to PostgreSQL database") + except ImportError: + logger.error( + "psycopg2 is required for PostgreSQL connections. " + "Install it with: pip install psycopg2-binary" + ) + raise + except Exception as e: + logger.error(f"Error connecting to PostgreSQL: {e}") + raise + + def _connect_to_sql(self, vn_instance: NIMVanna, connection_string: str): + """ + Connect to a generic SQL database using SQLAlchemy. + + Args: + vn_instance: The Vanna instance to connect + connection_string: SQLAlchemy-compatible connection string, e.g.: + - MySQL: mysql+pymysql://user:password@host:port/database + - PostgreSQL: postgresql://user:password@host:port/database + - SQL Server: mssql+pyodbc://user:password@host:port/database?driver=ODBC+Driver+17+for+SQL+Server + - Oracle: oracle+cx_oracle://user:password@host:port/?service_name=service + """ + try: + from sqlalchemy import create_engine + + logger.info("Connecting to SQL database via SQLAlchemy...") + + # Create SQLAlchemy engine + engine = create_engine(connection_string) + + # Connect Vanna to the database using the engine + vn_instance.connect_to_sqlalchemy(engine) + + logger.info("Successfully connected to SQL database") + except ImportError: + logger.error( + "SQLAlchemy is required for generic SQL connections. " + "Install it with: pip install sqlalchemy" + ) + raise + except Exception as e: + logger.error(f"Error connecting to SQL database: {e}") + raise + def _needs_initialization(self) -> bool: """ Check if the vector store needs initialization by checking if it's empty. 
+ For ChromaDB: checks directory existence and contents + For Elasticsearch: checks if index exists and has data """ logger.debug(f"VannaManager: Checking if vector store needs initialization...") - logger.debug(f"VannaManager: Vector store path: {self.vector_store_path}") + logger.debug(f"VannaManager: Vector store type: {self.vector_store_type}") try: - if not os.path.exists(self.vector_store_path): - logger.debug(f"VannaManager: Vector store directory does not exist -> needs initialization") - return True - - # Check if there are any subdirectories (ChromaDB creates subdirectories when data is stored) - list_of_folders = [d for d in os.listdir(self.vector_store_path) - if os.path.isdir(os.path.join(self.vector_store_path, d))] - - logger.debug(f"VannaManager: Found {len(list_of_folders)} folders in vector store") - if list_of_folders: - logger.debug(f"VannaManager: Vector store folders: {list_of_folders}") - logger.debug(f"VannaManager: Vector store is populated -> skipping initialization") - return False + if self.vector_store_type == "chromadb": + logger.debug(f"VannaManager: Checking ChromaDB at: {self.vector_store_path}") + + if not os.path.exists(self.vector_store_path): + logger.debug(f"VannaManager: ChromaDB directory does not exist -> needs initialization") + return True + + # Check if there are any subdirectories (ChromaDB creates subdirectories when data is stored) + list_of_folders = [d for d in os.listdir(self.vector_store_path) + if os.path.isdir(os.path.join(self.vector_store_path, d))] + + logger.debug(f"VannaManager: Found {len(list_of_folders)} folders in ChromaDB") + if list_of_folders: + logger.debug(f"VannaManager: ChromaDB folders: {list_of_folders}") + logger.debug(f"VannaManager: ChromaDB is populated -> skipping initialization") + return False + else: + logger.debug(f"VannaManager: ChromaDB is empty -> needs initialization") + return True + + elif self.vector_store_type == "elasticsearch": + logger.debug(f"VannaManager: Checking Elasticsearch at: {self.elasticsearch_url}") + + # For Elasticsearch, check if training data is available in the instance + # This is a simplified check - we assume if we can connect, we should initialize if no training data exists + try: + if hasattr(self.vanna_instance, 'get_training_data'): + training_data = self.vanna_instance.get_training_data() + if training_data and len(training_data) > 0: + logger.debug(f"VannaManager: Elasticsearch has {len(training_data)} training data entries -> skipping initialization") + return False + else: + logger.debug(f"VannaManager: Elasticsearch has no training data -> needs initialization") + return True + else: + logger.debug(f"VannaManager: Cannot check Elasticsearch training data -> needs initialization") + return True + except Exception as e: + logger.debug(f"VannaManager: Error checking Elasticsearch data ({e}) -> needs initialization") + return True else: - logger.debug(f"VannaManager: Vector store is empty -> needs initialization") + logger.warning(f"VannaManager: Unknown vector store type: {self.vector_store_type}") return True except Exception as e: @@ -233,16 +436,42 @@ def get_stats(self) -> Dict: return { "config_key": self.config_key, "instance_id": id(self.vanna_instance) if self.vanna_instance else None, - "has_instance": self.vanna_instance is not None + "has_instance": self.vanna_instance is not None, + "db_type": self.db_type, } @classmethod - def create_with_config(cls, vanna_llm_config, vanna_embedder_config, vector_store_path: str, db_path: str, training_data_path: str = None): + 
def create_with_config(cls, vanna_llm_config, vanna_embedder_config, + vector_store_type: str = "chromadb", vector_store_path: str = None, + elasticsearch_url: str = None, elasticsearch_index_name: str = "vanna_vectors", + elasticsearch_username: str = None, elasticsearch_password: str = None, + elasticsearch_api_key: str = None, + db_connection_string_or_path: str = None, db_type: str = "sqlite", + training_data_path: str = None, nvidia_api_key: str = None): """ Class method to create a VannaManager with full configuration. Uses create_config_key to ensure singleton behavior based on configuration. + + Args: + vanna_llm_config: LLM configuration object + vanna_embedder_config: Embedder configuration object + vector_store_type: Type of vector store - 'chromadb' or 'elasticsearch' + vector_store_path: Path to ChromaDB vector store (required if vector_store_type='chromadb') + elasticsearch_url: Elasticsearch URL (required if vector_store_type='elasticsearch') + elasticsearch_index_name: Elasticsearch index name + elasticsearch_username: Elasticsearch username for basic auth + elasticsearch_password: Elasticsearch password for basic auth + elasticsearch_api_key: Elasticsearch API key + db_connection_string_or_path: Database connection (path for SQLite, connection string for others) + db_type: Type of database - 'sqlite', 'postgres', or 'sql' + training_data_path: Path to YAML training data file + nvidia_api_key: NVIDIA API key (optional) """ - config_key = create_config_key(vanna_llm_config, vanna_embedder_config, vector_store_path, db_path) + config_key = create_config_key( + vanna_llm_config, vanna_embedder_config, + vector_store_type, vector_store_path, elasticsearch_url, + db_connection_string_or_path, db_type + ) # Create instance with just config_key (singleton pattern) instance = cls(config_key) @@ -251,9 +480,17 @@ def create_with_config(cls, vanna_llm_config, vanna_embedder_config, vector_stor if not hasattr(instance, 'vanna_llm_config') or instance.vanna_llm_config is None: instance.vanna_llm_config = vanna_llm_config instance.vanna_embedder_config = vanna_embedder_config + instance.vector_store_type = vector_store_type instance.vector_store_path = vector_store_path - instance.db_path = db_path + instance.elasticsearch_url = elasticsearch_url + instance.elasticsearch_index_name = elasticsearch_index_name + instance.elasticsearch_username = elasticsearch_username + instance.elasticsearch_password = elasticsearch_password + instance.elasticsearch_api_key = elasticsearch_api_key + instance.db_connection_string_or_path = db_connection_string_or_path + instance.db_type = db_type instance.training_data_path = training_data_path + instance.nvidia_api_key = nvidia_api_key # Create Vanna instance immediately if all config is available if instance.vanna_instance is None: @@ -262,9 +499,24 @@ def create_with_config(cls, vanna_llm_config, vanna_embedder_config, vector_stor return instance -def create_config_key(vanna_llm_config, vanna_embedder_config, vector_store_path: str, db_path: str) -> str: +def create_config_key(vanna_llm_config, vanna_embedder_config, + vector_store_type: str, vector_store_path: str, elasticsearch_url: str, + db_connection_string_or_path: str, db_type: str = "sqlite") -> str: """ Create a unique configuration key for the VannaManager singleton. 
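+
+    The key is the first 12 hex characters of an MD5 hash over the LLM and embedder
+    model names, the vector store identity (ChromaDB path or Elasticsearch URL), and
+    the database connection settings, so VannaManager instances created with the
+    same configuration resolve to the same singleton.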
+ + Args: + vanna_llm_config: LLM configuration object + vanna_embedder_config: Embedder configuration object + vector_store_type: Type of vector store + vector_store_path: Path to ChromaDB vector store + elasticsearch_url: Elasticsearch URL + db_connection_string_or_path: Database connection (path for SQLite, connection string for others) + db_type: Type of database + + Returns: + str: Unique configuration key """ - config_str = f"{vanna_llm_config.model_name}_{vanna_embedder_config.model_name}_{vector_store_path}_{db_path}" + vector_id = vector_store_path if vector_store_type == "chromadb" else elasticsearch_url + config_str = f"{vanna_llm_config.model_name}_{vanna_embedder_config.model_name}_{vector_store_type}_{vector_id}_{db_connection_string_or_path}_{db_type}" return hashlib.md5(config_str.encode()).hexdigest()[:12] diff --git a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_util.py b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_util.py index 2c90fd85..f4764e55 100644 --- a/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_util.py +++ b/industries/predictive_maintenance_agent/src/predictive_maintenance_agent/retrievers/vanna_util.py @@ -13,12 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from vanna.chromadb import ChromaDB_VectorStore -from vanna.base import VannaBase -from langchain_nvidia import ChatNVIDIA +"""Vanna utilities for SQL generation using NVIDIA NIM services.""" + +import logging + +from langchain_nvidia import ChatNVIDIA, NVIDIAEmbeddings from tqdm import tqdm +from vanna.base import VannaBase +from vanna.chromadb import ChromaDB_VectorStore + +logger = logging.getLogger(__name__) class NIMCustomLLM(VannaBase): + """Custom LLM implementation for Vanna using NVIDIA NIM.""" + def __init__(self, config=None): VannaBase.__init__(self, config=config) @@ -27,10 +35,10 @@ def __init__(self, config=None): # default parameters - can be overrided using config self.temperature = 0.7 - + if "temperature" in config: self.temperature = config["temperature"] - + # If only config is passed if "api_key" not in config: raise ValueError("config must contain a NIM api_key") @@ -40,7 +48,7 @@ def __init__(self, config=None): api_key = config["api_key"] model = config["model"] - + # Initialize ChatNVIDIA client self.client = ChatNVIDIA( api_key=api_key, @@ -49,16 +57,23 @@ def __init__(self, config=None): ) self.model = model - def system_message(self, message: str) -> any: - return {"role": "system", "content": message+"\n DO NOT PRODUCE MARKDOWN, ONLY RESPOND IN PLAIN TEXT"} + def system_message(self, message: str) -> dict: + """Create a system message.""" + return { + "role": "system", + "content": message + "\n DO NOT PRODUCE MARKDOWN, ONLY RESPOND IN PLAIN TEXT", + } - def user_message(self, message: str) -> any: + def user_message(self, message: str) -> dict: + """Create a user message.""" return {"role": "user", "content": message} - def assistant_message(self, message: str) -> any: + def assistant_message(self, message: str) -> dict: + """Create an assistant message.""" return {"role": "assistant", "content": message} def submit_prompt(self, prompt, **kwargs) -> str: + """Submit a prompt to the LLM.""" if prompt is None: raise Exception("Prompt is None") @@ -70,60 +85,547 @@ def submit_prompt(self, prompt, **kwargs) -> str: num_tokens = 0 for message in prompt: num_tokens += 
len(message["content"]) / 4 - print(f"Using model {self.model} for {num_tokens} tokens (approx)") - - response = self.client.invoke(prompt) - return response.content + logger.debug(f"Using model {self.model} for {num_tokens} tokens (approx)") + + logger.debug(f"Submitting prompt with {len(prompt)} messages") + logger.debug(f"Prompt content preview: {str(prompt)[:500]}...") + + try: + response = self.client.invoke(prompt) + logger.debug(f"Response type: {type(response)}") + logger.debug(f"Response content type: {type(response.content)}") + logger.debug( + f"Response content length: {len(response.content) if response.content else 0}" + ) + logger.debug( + f"Response content preview: {response.content[:200] if response.content else 'None'}..." + ) + return response.content + except Exception as e: + logger.error(f"Error in submit_prompt: {e}") + logger.error(f"Error type: {type(e)}") + import traceback + + logger.error(f"Full traceback: {traceback.format_exc()}") + raise class NIMVanna(ChromaDB_VectorStore, NIMCustomLLM): - def __init__(self, VectorConfig = None, LLMConfig = None): + """Vanna implementation using NVIDIA NIM for LLM and ChromaDB for vector storage.""" + + def __init__(self, VectorConfig=None, LLMConfig=None): ChromaDB_VectorStore.__init__(self, config=VectorConfig) NIMCustomLLM.__init__(self, config=LLMConfig) + + +class ElasticVectorStore(VannaBase): + """ + Elasticsearch-based vector store for Vanna. + + This class provides vector storage and retrieval capabilities using Elasticsearch's + dense_vector field type and kNN search functionality. + + Configuration: + config: Dictionary with the following keys: + - url: Elasticsearch connection URL (e.g., "http://localhost:9200") + - index_name: Name of the Elasticsearch index to use (default: "vanna_vectors") + - api_key: Optional API key for authentication + - username: Optional username for basic auth + - password: Optional password for basic auth + - embedding_function: Function to generate embeddings (required) + """ + + def __init__(self, config=None): + VannaBase.__init__(self, config=config) + + if not config: + raise ValueError("config must be passed for ElasticVectorStore") + + # Elasticsearch connection parameters + self.url = config.get("url", "http://localhost:9200") + self.index_name = config.get("index_name", "vanna_vectors") + self.api_key = config.get("api_key") + self.username = config.get("username") + self.password = config.get("password") + + # Embedding function (required) + if "embedding_function" not in config: + raise ValueError("embedding_function must be provided in config") + self.embedding_function = config["embedding_function"] + + # Initialize Elasticsearch client + self._init_elasticsearch_client() + + # Create index if it doesn't exist + self._create_index_if_not_exists() + + logger.info(f"ElasticVectorStore initialized with index: {self.index_name}") + + def _init_elasticsearch_client(self): + """Initialize the Elasticsearch client with authentication.""" + try: + from elasticsearch import Elasticsearch + except ImportError: + raise ImportError( + "elasticsearch package is required for ElasticVectorStore. 
" + "Install it with: pip install elasticsearch" + ) + + # Build client kwargs + client_kwargs = {} + + if self.api_key: + client_kwargs["api_key"] = self.api_key + elif self.username and self.password: + client_kwargs["basic_auth"] = (self.username, self.password) + + self.es_client = Elasticsearch(self.url, **client_kwargs) + + # Test connection (try but don't fail if ping doesn't work) + try: + if self.es_client.ping(): + logger.info(f"Successfully connected to Elasticsearch at {self.url}") + else: + logger.warning(f"Elasticsearch ping failed, but will try to proceed at {self.url}") + except Exception as e: + logger.warning(f"Elasticsearch ping check failed ({e}), but will try to proceed") + + def _create_index_if_not_exists(self): + """Create the Elasticsearch index with appropriate mappings if it doesn't exist.""" + if self.es_client.indices.exists(index=self.index_name): + logger.debug(f"Index {self.index_name} already exists") + return + + # Get embedding dimension by creating a test embedding + test_embedding = self._generate_embedding("test") + embedding_dim = len(test_embedding) + + # Index mapping with dense_vector field for embeddings + index_mapping = { + "mappings": { + "properties": { + "id": {"type": "keyword"}, + "text": {"type": "text"}, + "embedding": { + "type": "dense_vector", + "dims": embedding_dim, + "index": True, + "similarity": "cosine" + }, + "metadata": {"type": "object", "enabled": True}, + "type": {"type": "keyword"}, # ddl, documentation, sql + "created_at": {"type": "date"} + } + } + } + + self.es_client.indices.create(index=self.index_name, body=index_mapping) + logger.info(f"Created Elasticsearch index: {self.index_name}") + + def _generate_embedding(self, text: str) -> list[float]: + """Generate embedding for a given text using the configured embedding function.""" + if hasattr(self.embedding_function, 'embed_query'): + # NVIDIA embedding function returns [[embedding]] + result = self.embedding_function.embed_query(text) + if isinstance(result, list) and len(result) > 0: + if isinstance(result[0], list): + return result[0] # Extract the inner list + return result # type: ignore[return-value] + return result # type: ignore[return-value] + elif callable(self.embedding_function): + # Generic callable + result = self.embedding_function(text) + if isinstance(result, list) and len(result) > 0: + if isinstance(result[0], list): + return result[0] + return result # type: ignore[return-value] + return result # type: ignore[return-value] + else: + raise ValueError("embedding_function must be callable or have embed_query method") + + def add_ddl(self, ddl: str, **kwargs) -> str: + """ + Add a DDL statement to the vector store. + + Args: + ddl: The DDL statement to store + **kwargs: Additional metadata + + Returns: + Document ID + """ + import hashlib + from datetime import datetime + + # Generate document ID + doc_id = hashlib.md5(ddl.encode()).hexdigest() + + # Generate embedding + embedding = self._generate_embedding(ddl) + + # Create document + doc = { + "id": doc_id, + "text": ddl, + "embedding": embedding, + "type": "ddl", + "metadata": kwargs, + "created_at": datetime.utcnow().isoformat() + } + + # Index document + self.es_client.index(index=self.index_name, id=doc_id, document=doc) + logger.debug(f"Added DDL to Elasticsearch: {doc_id}") + + return doc_id + + def add_documentation(self, documentation: str, **kwargs) -> str: + """ + Add documentation to the vector store. 
+ + Args: + documentation: The documentation text to store + **kwargs: Additional metadata + + Returns: + Document ID + """ + import hashlib + from datetime import datetime + + doc_id = hashlib.md5(documentation.encode()).hexdigest() + embedding = self._generate_embedding(documentation) + + doc = { + "id": doc_id, + "text": documentation, + "embedding": embedding, + "type": "documentation", + "metadata": kwargs, + "created_at": datetime.utcnow().isoformat() + } + + self.es_client.index(index=self.index_name, id=doc_id, document=doc) + logger.debug(f"Added documentation to Elasticsearch: {doc_id}") + + return doc_id -class CustomEmbeddingFunction: + def add_question_sql(self, question: str, sql: str, **kwargs) -> str: + """ + Add a question-SQL pair to the vector store. + + Args: + question: The natural language question + sql: The corresponding SQL query + **kwargs: Additional metadata + + Returns: + Document ID + """ + import hashlib + from datetime import datetime + + # Combine question and SQL for embedding + combined_text = f"Question: {question}\nSQL: {sql}" + doc_id = hashlib.md5(combined_text.encode()).hexdigest() + embedding = self._generate_embedding(question) + + doc = { + "id": doc_id, + "text": combined_text, + "embedding": embedding, + "type": "sql", + "metadata": { + "question": question, + "sql": sql, + **kwargs + }, + "created_at": datetime.utcnow().isoformat() + } + + self.es_client.index(index=self.index_name, id=doc_id, document=doc) + logger.debug(f"Added question-SQL pair to Elasticsearch: {doc_id}") + + return doc_id + + def get_similar_question_sql(self, question: str, **kwargs) -> list: + """ + Retrieve similar question-SQL pairs using vector similarity search. + + Args: + question: The question to find similar examples for + **kwargs: Additional parameters (e.g., top_k) + + Returns: + List of similar documents + """ + top_k = kwargs.get("top_k", 10) + + # Generate query embedding + query_embedding = self._generate_embedding(question) + + # Build kNN search query + search_query = { + "knn": { + "field": "embedding", + "query_vector": query_embedding, + "k": top_k, + "num_candidates": top_k * 2, + "filter": {"term": {"type": "sql"}} + }, + "_source": ["text", "metadata", "type"] + } + + # Execute search + response = self.es_client.search(index=self.index_name, body=search_query) + + # Extract results + results = [] + for hit in response["hits"]["hits"]: + source = hit["_source"] + results.append({ + "question": source["metadata"].get("question", ""), + "sql": source["metadata"].get("sql", ""), + "score": hit["_score"] + }) + + logger.debug(f"Found {len(results)} similar question-SQL pairs") + return results + + def get_related_ddl(self, question: str, **kwargs) -> list: + """ + Retrieve related DDL statements using vector similarity search. 
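+
+        Only entries indexed with type "ddl" are searched (the kNN query applies a
+        term filter on the type field), and hits are ranked by cosine similarity of
+        the stored embeddings.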
+ + Args: + question: The question to find related DDL for + **kwargs: Additional parameters (e.g., top_k) + + Returns: + List of related DDL statements + """ + top_k = kwargs.get("top_k", 10) + query_embedding = self._generate_embedding(question) + + search_query = { + "knn": { + "field": "embedding", + "query_vector": query_embedding, + "k": top_k, + "num_candidates": top_k * 2, + "filter": {"term": {"type": "ddl"}} + }, + "_source": ["text"] + } + + response = self.es_client.search(index=self.index_name, body=search_query) + + results = [hit["_source"]["text"] for hit in response["hits"]["hits"]] + logger.debug(f"Found {len(results)} related DDL statements") + return results + + def get_related_documentation(self, question: str, **kwargs) -> list: + """ + Retrieve related documentation using vector similarity search. + + Args: + question: The question to find related documentation for + **kwargs: Additional parameters (e.g., top_k) + + Returns: + List of related documentation + """ + top_k = kwargs.get("top_k", 10) + query_embedding = self._generate_embedding(question) + + search_query = { + "knn": { + "field": "embedding", + "query_vector": query_embedding, + "k": top_k, + "num_candidates": top_k * 2, + "filter": {"term": {"type": "documentation"}} + }, + "_source": ["text"] + } + + response = self.es_client.search(index=self.index_name, body=search_query) + + results = [hit["_source"]["text"] for hit in response["hits"]["hits"]] + logger.debug(f"Found {len(results)} related documentation entries") + return results + + def remove_training_data(self, id: str, **kwargs) -> bool: + """ + Remove a training data entry by ID. + + Args: + id: The document ID to remove + **kwargs: Additional parameters + + Returns: + True if successful + """ + try: + self.es_client.delete(index=self.index_name, id=id) + logger.debug(f"Removed training data: {id}") + return True + except Exception as e: + logger.error(f"Error removing training data {id}: {e}") + return False + + def generate_embedding(self, data: str, **kwargs) -> list[float]: + """ + Generate embedding for given data (required by Vanna base class). + + Args: + data: Text to generate embedding for + **kwargs: Additional parameters + + Returns: + Embedding vector + """ + return self._generate_embedding(data) + + def get_training_data(self, **kwargs) -> list: + """ + Get all training data from the vector store (required by Vanna base class). + + Args: + **kwargs: Additional parameters + + Returns: + List of training data entries + """ + try: + # Query all documents + query = { + "query": {"match_all": {}}, + "size": 10000 # Adjust based on expected data size + } + + response = self.es_client.search(index=self.index_name, body=query) + + training_data = [] + for hit in response["hits"]["hits"]: + source = hit["_source"] + training_data.append({ + "id": hit["_id"], + "type": source.get("type"), + "text": source.get("text"), + "metadata": source.get("metadata", {}) + }) + + return training_data + except Exception as e: + logger.error(f"Error getting training data: {e}") + return [] + + +class ElasticNIMVanna(ElasticVectorStore, NIMCustomLLM): + """ + Vanna implementation using NVIDIA NIM for LLM and Elasticsearch for vector storage. + + This class combines ElasticVectorStore for vector operations with NIMCustomLLM + for SQL generation, providing an alternative to ChromaDB-based storage. + + Example: + >>> vanna = ElasticNIMVanna( + ... VectorConfig={ + ... "url": "http://localhost:9200", + ... "index_name": "my_sql_vectors", + ... 
"username": "elastic", + ... "password": "changeme", + ... "embedding_function": NVIDIAEmbeddingFunction( + ... api_key="your-api-key", + ... model="nvidia/llama-3.2-nv-embedqa-1b-v2" + ... ) + ... }, + ... LLMConfig={ + ... "api_key": "your-api-key", + ... "model": "meta/llama-3.1-70b-instruct" + ... } + ... ) + """ + + def __init__(self, VectorConfig=None, LLMConfig=None): + ElasticVectorStore.__init__(self, config=VectorConfig) + NIMCustomLLM.__init__(self, config=LLMConfig) + + +class NVIDIAEmbeddingFunction: """ A class that can be used as a replacement for chroma's DefaultEmbeddingFunction. It takes in input (text or list of texts) and returns embeddings using NVIDIA's API. + + This class fixes two major interface compatibility issues between ChromaDB and NVIDIA embeddings: + + 1. INPUT FORMAT MISMATCH: + - ChromaDB passes ['query text'] (list) to embed_query() + - But langchain_nvidia's embed_query() expects 'query text' (string) + - When list is passed, langchain does [text] internally → [['query text']] → API 500 error + - FIX: Detect list input and extract string before calling langchain + + 2. OUTPUT FORMAT MISMATCH: + - ChromaDB expects embed_query() to return [[embedding_vector]] (list of embeddings) + - But langchain returns [embedding_vector] (single embedding vector) + - This causes: TypeError: 'float' object cannot be converted to 'Sequence' + - FIX: Wrap single embedding in list: return [embeddings] """ - def __init__(self, api_key, model="nvidia/nv-embedqa-e5-v5"): + def __init__(self, api_key, model="nvidia/llama-3.2-nv-embedqa-1b-v2"): """ Initialize the embedding function with the API key and model name. Parameters: - api_key (str): The API key for authentication. - - model (str): The model name to use for embeddings (default is "nvidia/nv-embedqa-e5-v5"). + - model (str): The model name to use for embeddings. + Default: nvidia/llama-3.2-nv-embedqa-1b-v2 (tested and working) """ - from langchain_nvidia import NVIDIAEmbeddings - + self.api_key = api_key + self.model = model + + logger.info(f"Initializing NVIDIA embeddings with model: {model}") + logger.debug(f"API key length: {len(api_key) if api_key else 0}") + self.embeddings = NVIDIAEmbeddings( - api_key=api_key, - model_name=model, - input_type="query", - truncate="NONE" + api_key=api_key, model_name=model, input_type="query", truncate="NONE" ) + logger.info("Successfully initialized NVIDIA embeddings") def __call__(self, input): """ Call method to make the object callable, as required by chroma's EmbeddingFunction interface. + NOTE: This method is used by ChromaDB for batch embedding operations. + The embed_query() method above handles the single query case with the critical fixes. + Parameters: - input (str or list): The input data for which embeddings need to be generated. Returns: - embedding (list): The embedding vector(s) for the input data. 
""" - # Ensure input is a list, as required by the API - input_data = [input] if isinstance(input, str) else input - - # Generate embeddings + logger.debug(f"__call__ method called with input type: {type(input)}") + logger.debug(f"__call__ input: {input}") + + # Ensure input is a list, as required by ChromaDB + if isinstance(input, str): + input_data = [input] + else: + input_data = input + + logger.debug(f"Processing {len(input_data)} texts for embedding") + + # Generate embeddings for each text embeddings = [] - for text in input_data: + for i, text in enumerate(input_data): + logger.debug(f"Embedding text {i+1}/{len(input_data)}: {text[:50]}...") embedding = self.embeddings.embed_query(text) embeddings.append(embedding) - - return embeddings[0] if len(embeddings) == 1 and isinstance(input, str) else embeddings - + + logger.debug(f"Generated {len(embeddings)} embeddings") + # Always return a list of embeddings for ChromaDB + return embeddings + def name(self): """ Returns a custom name for the embedding function. @@ -132,192 +634,78 @@ def name(self): str: The name of the embedding function. """ return "NVIDIA Embedding Function" - -def initVannaBackup(vn): - """ - Backup initialization function for Vanna with hardcoded NASA Turbofan Engine training data. - - This function provides the original hardcoded training approach for NASA Turbofan Engine - predictive maintenance queries. Use this as a fallback if the JSON-based training fails. - - Args: - vn: Vanna instance to be trained and configured - - Returns: - None: Modifies the Vanna instance in-place - - Example: - >>> from vanna.chromadb import ChromaDB_VectorStore - >>> vn = NIMCustomLLM(config) & ChromaDB_VectorStore() - >>> vn.connect_to_sqlite("path/to/nasa_turbo.db") - >>> initVannaBackup(vn) - >>> # Vanna is now ready with hardcoded NASA Turbofan training - """ - import json - import os - - # Get and train DDL from sqlite_master - df_ddl = vn.run_sql("SELECT type, sql FROM sqlite_master WHERE sql is not null") - for ddl in df_ddl['sql'].to_list(): - vn.train(ddl=ddl) - - # Fallback to default NASA Turbofan training - fd_datasets = ["FD001", "FD002", "FD003", "FD004"] - for fd in fd_datasets: - vn.train(ddl=f""" - CREATE TABLE IF NOT EXISTS RUL_{fd} ( - "unit_number" INTEGER, - "RUL" INTEGER + + def embed_query(self, input: str) -> list[list[float]]: + """ + Generate embeddings for a single query. + + ChromaDB calls this method with ['query text'] (list) but langchain_nvidia expects 'query text' (string). + We must extract the string from the list to prevent API 500 errors. + + ChromaDB expects this method to return [[embedding_vector]] (list of embeddings) + but langchain returns [embedding_vector] (single embedding). We wrap it in a list. 
+ """ + logger.debug(f"Embedding query: {input}") + logger.debug(f"Input type: {type(input)}") + logger.debug(f"Using model: {self.model}") + + # Handle ChromaDB's list input format + # ChromaDB sometimes passes a list instead of a string + # Extract the string from the list if needed + if isinstance(input, list): + if len(input) == 1: + query_text = input[0] + logger.debug(f"Extracted string from list: {query_text}") + else: + logger.error(f"Unexpected list length: {len(input)}") + raise ValueError( + f"Expected single string or list with one element, got list with {len(input)} elements" + ) + else: + query_text = input + + try: + # Call langchain_nvidia with the extracted string + embeddings = self.embeddings.embed_query(query_text) + logger.debug( + f"Successfully generated embeddings of length: {len(embeddings) if embeddings else 0}" ) - """) - - sensor_columns = """ - "unit_number" INTEGER, - "time_in_cycles" INTEGER, - "operational_setting_1" REAL, - "operational_setting_2" REAL, - "operational_setting_3" REAL, - "sensor_measurement_1" REAL, - "sensor_measurement_2" REAL, - "sensor_measurement_3" REAL, - "sensor_measurement_4" REAL, - "sensor_measurement_5" REAL, - "sensor_measurement_6" REAL, - "sensor_measurement_7" REAL, - "sensor_measurement_8" REAL, - "sensor_measurement_9" REAL, - "sensor_measurement_10" REAL, - "sensor_measurement_11" REAL, - "sensor_measurement_12" REAL, - "sensor_measurement_13" REAL, - "sensor_measurement_14" REAL, - "sensor_measurement_15" REAL, - "sensor_measurement_16" REAL, - "sensor_measurement_17" INTEGER, - "sensor_measurement_18" INTEGER, - "sensor_measurement_19" REAL, - "sensor_measurement_20" REAL, - "sensor_measurement_21" REAL - """ - for fd in fd_datasets: - vn.train(ddl=f"CREATE TABLE IF NOT EXISTS train_{fd} ({sensor_columns})") - vn.train(ddl=f"CREATE TABLE IF NOT EXISTS test_{fd} ({sensor_columns})") - - # Default documentation for NASA Turbofan - dataset_documentation = """ - This SQL database contains train and test splits of four different datasets: FD001, FD002, FD003, FD004. - Each dataset consists of multiple multivariate time series from different engines of the same type. 
- - DATABASE STRUCTURE: - The data is organized into separate tables for each dataset: - - Training Tables: train_FD001, train_FD002, train_FD003, train_FD004 - Test Tables: test_FD001, test_FD002, test_FD003, test_FD004 - RUL Tables: RUL_FD001, RUL_FD002, RUL_FD003, RUL_FD004 - - Each training and test table contains 26 columns with identical structure: - - unit_number: INTEGER - Identifier for each engine unit - - time_in_cycles: INTEGER - Time step in operational cycles - - operational_setting_1: REAL - First operational setting affecting performance - - operational_setting_2: REAL - Second operational setting affecting performance - - operational_setting_3: REAL - Third operational setting affecting performance - - sensor_measurement_1 through sensor_measurement_21: REAL/INTEGER - Twenty-one sensor measurements - - Each RUL table contains 2 columns: - - unit_number: INTEGER - Engine unit identifier - - RUL: INTEGER - Remaining Useful Life value for that test unit - - QUERY PATTERNS: - - Table References: - - "train_FD001" or "dataset train_FD001" → Use table train_FD001 - - "test_FD002" or "dataset test_FD002" → Use table test_FD002 - - "FD003" (without train/test prefix) → Determine from context whether to use train_FD003 or test_FD003 - - For RUL queries: Use specific RUL table (RUL_FD001, RUL_FD002, RUL_FD003, or RUL_FD004) - - Counting Patterns: - - "How many units" → Use COUNT(DISTINCT unit_number) to count unique engines - - "How many records/data points/measurements/entries/rows" → Use COUNT(*) to count all records - - RUL Handling (CRITICAL DISTINCTION): - - 1. GROUND TRUTH RUL (for test data): - - Use when query asks for "actual RUL", "true RUL", "ground truth", or "what is the RUL" - - Query specific RUL table: SELECT RUL FROM RUL_FD001 WHERE unit_number=N - - For time-series with ground truth: ((SELECT MAX(time_in_cycles) FROM test_FDxxx WHERE unit_number=N) + (SELECT RUL FROM RUL_FDxxx WHERE unit_number=N) - time_in_cycles) - - 2. PREDICTED/CALCULATED RUL (for training data or prediction requests): - - Use when query asks to "predict RUL", "calculate RUL", "estimate RUL", or "find RUL" for training data - - For training data: Calculate as remaining cycles until failure = (MAX(time_in_cycles) - current_time_in_cycles + 1) - - Training RUL query: SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FDxxx - - DEFAULT BEHAVIOR: If unclear, assume user wants PREDICTION (since this is more common) - - Column Names (consistent across all training and test tables): - - unit_number: Engine identifier - - time_in_cycles: Time step - - operational_setting_1, operational_setting_2, operational_setting_3: Operational settings - - sensor_measurement_1, sensor_measurement_2, ..., sensor_measurement_21: Sensor readings - - IMPORTANT NOTES: - - Each dataset (FD001, FD002, FD003, FD004) has its own separate RUL table - - RUL tables do NOT have a 'dataset' column - they are dataset-specific by table name - - Training tables contain data until engine failure - - Test tables contain data that stops before failure - - RUL tables provide the actual remaining cycles for test units - - ENGINE OPERATION CONTEXT: - Each engine starts with different degrees of initial wear and manufacturing variation. - The engine operates normally at the start of each time series and develops a fault at some point during the series. - In the training set, the fault grows in magnitude until system failure. 
- In the test set, the time series ends some time prior to system failure. - The objective is to predict the number of remaining operational cycles before failure in the test set. - """ - vn.train(documentation=dataset_documentation) + # Wrap single embedding in list for ChromaDB compatibility + # ChromaDB expects a list of embeddings, even for a single query + return [embeddings] + except Exception as e: + logger.error(f"Error generating embeddings for query: {e}") + logger.error(f"Error type: {type(e)}") + logger.error(f"Query text: {query_text}") + import traceback - # Default training for NASA Turbofan - queries = [ - # 1. JOIN pattern between training and RUL tables - "SELECT t.unit_number, t.time_in_cycles, t.operational_setting_1, r.RUL FROM train_FD001 AS t JOIN RUL_FD001 AS r ON t.unit_number = r.unit_number WHERE t.unit_number = 1 ORDER BY t.time_in_cycles", - - # 2. Aggregation with multiple statistical functions - "SELECT unit_number, AVG(sensor_measurement_1) AS avg_sensor1, MAX(sensor_measurement_2) AS max_sensor2, MIN(sensor_measurement_3) AS min_sensor3 FROM train_FD002 GROUP BY unit_number", - - # 3. Test table filtering with time-based conditions - "SELECT * FROM test_FD003 WHERE time_in_cycles > 50 AND sensor_measurement_1 > 500 ORDER BY unit_number, time_in_cycles", - - # 4. Window function for predicted RUL calculation on training data - "SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FD004 WHERE unit_number <= 3 ORDER BY unit_number, time_in_cycles", - - # 5. Direct RUL table query with filtering - "SELECT unit_number, RUL FROM RUL_FD001 WHERE RUL > 100 ORDER BY RUL DESC" - ] + logger.error(f"Full traceback: {traceback.format_exc()}") + raise - for query in tqdm(queries, desc="Training NIMVanna"): - vn.train(sql=query) + def embed_documents(self, input: list[str]) -> list[list[float]]: + """ + Generate embeddings for multiple documents. + + This function expects a list of strings. If it's a list of lists of strings, flatten it to handle cases + where the input is unexpectedly nested. 
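+
+        Example (illustrative shapes only; vectors are placeholders):
+            embed_documents(["engine DDL text", "sensor documentation"])
+                -> [[0.01, ...], [0.02, ...]]   # one embedding per input document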
+ """ + logger.debug(f"Embedding {len(input)} documents...") + logger.debug(f"Using model: {self.model}") + + try: + embeddings = self.embeddings.embed_documents(input) + logger.debug("Successfully generated document embeddings") + return embeddings + except Exception as e: + logger.error(f"Error generating document embeddings: {e}") + logger.error(f"Error type: {type(e)}") + logger.error(f"Input documents count: {len(input)}") + import traceback + + logger.error(f"Full traceback: {traceback.format_exc()}") + raise - # Essential question-SQL training pairs (covering key RUL distinction) - vn.train(question="Get time cycles and operational setting 1 for unit 1 from test FD001", - sql="SELECT time_in_cycles, operational_setting_1 FROM test_FD001 WHERE unit_number = 1") - - # Ground Truth RUL (from RUL tables) - vn.train(question="What is the actual remaining useful life for unit 1 in test dataset FD001", - sql="SELECT RUL FROM RUL_FD001 WHERE unit_number = 1") - - # Predicted RUL (calculated for training data) - vn.train(question="Predict the remaining useful life for each time cycle of unit 1 in training dataset FD001", - sql="SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FD001 WHERE unit_number = 1 ORDER BY time_in_cycles") - - vn.train(question="How many units are in the training data for FD002", - sql="SELECT COUNT(DISTINCT unit_number) FROM train_FD002") - - # Additional RUL distinction training - vn.train(question="Calculate RUL for training data in FD003", - sql="SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FD003 ORDER BY unit_number, time_in_cycles") - - vn.train(question="Get ground truth RUL values for all units in test FD002", - sql="SELECT unit_number, RUL FROM RUL_FD002 ORDER BY unit_number") def chunk_documentation(text: str, max_chars: int = 1500) -> list: """