Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions pysteps/mongo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# mongo

## Executable scripts

### create_mongo_user.py

This script is run by the database administrator to register a new user for the STEPS database

### delete_files.py

House keeping utility to delete records from the database

### init_steps_db.py

This script creates the STEPS database with the expected colletions and indices.

### load_config.py

This script loads the JSON configuration file into the STEPS database.

### write_nc_files.py

Read the database and generate the netCDF files for exporting to users.

### write_ensembles.py

An example of a product that is supplied to an end-user.

## modules

### gridfs_io.py

Functions to read and write the binary data to GridFS

### mongo_access.py

Functions to read and write the metadata and parameters

### nc_utils.py

Functions to read and write the rain fields as CF netCDF binaries.

44 changes: 44 additions & 0 deletions pysteps/mongo/create_mongo_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import secrets
import string
import os
from pymongo import MongoClient
from urllib.parse import quote_plus

# === CONFIGURATION ===
MONGO_HOST = "localhost"
MONGO_PORT = 27017
AUTH_DB = "admin"
MONGO_ADMIN_USER = os.getenv("MONGO_USER")
MONGO_ADMIN_PASS = os.getenv("MONGO_PWD")
TARGET_DB = "STEPS"
PWD_DEFAULT = "c-bandBox"

# === FUNCTIONS ===
def generate_password(length=16):
alphabet = string.ascii_letters + string.digits + "!@#$%^&*()-_=+"
return ''.join(secrets.choice(alphabet) for _ in range(length))

def create_user(username, role="readWrite"):
# password = generate_password()
password = PWD_DEFAULT
client = MongoClient(f"mongodb://{quote_plus(MONGO_ADMIN_USER)}:{quote_plus(MONGO_ADMIN_PASS)}@{MONGO_HOST}:{MONGO_PORT}/?authSource={AUTH_DB}")
db = client[TARGET_DB]

try:
db.command("createUser", username, pwd=password, roles=[{"role": role, "db": TARGET_DB}])
print(f"\n✅ User '{username}' created with role '{role}'.\n")
print("Connection string:")
print(f" mongodb://{quote_plus(username)}:{quote_plus(password)}@{MONGO_HOST}:{MONGO_PORT}/{TARGET_DB}?authSource={TARGET_DB}\n")
except Exception as e:
print(f"❌ Failed to create user '{username}': {e}")

# === ENTRY POINT ===
if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(description="Create a MongoDB user with a random password.")
parser.add_argument("username", help="Username to create")
parser.add_argument("--role", default="readWrite", help="MongoDB role (default: readWrite)")
args = parser.parse_args()
create_user(args.username, args.role)

125 changes: 125 additions & 0 deletions pysteps/mongo/delete_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from models import get_db
from pymongo import MongoClient
import logging
import argparse
import gridfs
import pymongo
import datetime

def is_valid_iso8601(time_str: str) -> bool:
"""Check if the given string is a valid ISO 8601 datetime."""
try:
datetime.datetime.fromisoformat(time_str)
return True
except ValueError:
return False


def main():
parser = argparse.ArgumentParser(
description="Delete rainfall and/or state GridFS files.")

parser.add_argument('-s', '--start', type=str, required=True,
help='Start time yyyy-mm-ddTHH:MM:SS')
parser.add_argument('-e', '--end', type=str, required=True,
help='End time yyyy-mm-ddTHH:MM:SS')
parser.add_argument('-n', '--name', type=str, required=True,
help='Name of domain [AKL]')
parser.add_argument('-p', '--product', type=str, required=True,
help='Name of product to delete [QPE, auckprec, qpesim]')
parser.add_argument('-c', '--cascade', default=False, action='store_true',
help='Delete the cascade files')
parser.add_argument('-r', '--rain', default=False, action='store_true',
help='Delete the rainfall files')
parser.add_argument('--params', default=False, action='store_true',
help='Delete the parameter documents')

parser.add_argument('--dry_run', default=False, action='store_true',
help='Only list files that would be deleted, don’t delete them.')

args = parser.parse_args()
logging.basicConfig(level=logging.INFO)

if not (args.rain or args.cascade or args.params):
logging.warning("Nothing to delete: specify --rain, --cascade, or --params")
return

# Validate and parse times
def parse_time(time_str):
if not is_valid_iso8601(time_str):
logging.error(f"Invalid time format: {time_str}")
exit(1)
t = datetime.datetime.fromisoformat(time_str)
return t.replace(tzinfo=datetime.timezone.utc) if t.tzinfo is None else t

start_time = parse_time(args.start)
end_time = parse_time(args.end)

name = args.name
product = args.product
dry_run = args.dry_run

if product not in ["QPE", "auckprec", "qpesim", "nwpblend"]:
logging.error(f"Invalid product: {product}")
return

db = get_db()

def delete_files(collection_name):
coll = db[f"{collection_name}.files"]
fs = gridfs.GridFS(db, collection=collection_name)

if product == "QPE":
query = {
"metadata.product": product,
"metadata.valid_time": {"$gte": start_time, "$lte": end_time}
}
else:
query = {
"metadata.product": product,
"metadata.base_time": {"$gte": start_time, "$lte": end_time}
}

ids = list(coll.find(query, {"_id": 1,"filename":1}))
count = len(ids)

if dry_run:
logging.info(f"[Dry Run] {count} files matched in {collection_name}. Listing _id values:")
for doc in ids:
logging.info(f" Would delete: {doc['filename']}")
else:
for doc in ids:
fs.delete(doc["_id"])
logging.info(f"Deleted {count} files from {collection_name}")

if args.rain:
delete_files(f"{name}.rain")

if args.cascade:
delete_files(f"{name}.state")

if args.params:
collection_name = f"{name}.params"
coll = db[collection_name]
if product == "QPE":
query = {
"metadata.product": product,
"metadata.valid_time": {"$gte": start_time, "$lte": end_time}
}
else:
query = {
"metadata.product": product,
"metadata.base_time": {"$gte": start_time, "$lte": end_time}
}

ids = list(coll.find(query, {"_id": 1}))
count = len(ids)
if dry_run:
logging.info(f"[Dry Run] {count} files matched in {collection_name}")
else:
coll.delete_many(query)
logging.info(f"Deleted {count} files from {collection_name}")


if __name__ == "__main__":
main()
Loading
Loading