koi-demo #1

Open · wants to merge 3 commits into base: main
55 changes: 55 additions & 0 deletions Dockerfile
@@ -0,0 +1,55 @@
FROM python:3.12-slim AS build

# Install build dependencies and UV
RUN apt-get update && \
    apt-get install -y build-essential curl && \
    pip install --no-cache-dir uv && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy only requirements file first to leverage Docker cache
COPY requirements.txt* ./

# Install dependencies using UV for faster installation
RUN if [ -f "requirements.txt" ]; then \
        uv pip install --system -r requirements.txt; \
    fi

# Final stage
FROM python:3.12-slim

# Accept module name and port as build arguments
ARG MODULE_NAME=hackmd_sensor_node
ARG PORT=8002

# Set environment variables from build args
ENV PORT=$PORT
ENV MODULE_NAME=$MODULE_NAME

WORKDIR /app

# Install only runtime dependencies (curl for healthcheck)
RUN apt-get update && \
    apt-get install -y curl && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Copy installed dependencies from build stage
COPY --from=build /usr/local/lib/python3.12/site-packages/ /usr/local/lib/python3.12/site-packages/
COPY --from=build /usr/local/bin/ /usr/local/bin/

# Copy project files and source code
COPY . /app/

# Expose port for container
EXPOSE $PORT

# Configure healthcheck
HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \
    CMD curl --fail http://localhost:$PORT/koi-net/health || exit 1

# Start server using environment variables
# The module name determines which server module uvicorn loads
CMD uvicorn $MODULE_NAME.server:app --host 0.0.0.0 --port $PORT
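
To exercise the container the same way the HEALTHCHECK does, here is a minimal stdlib probe; it is a sketch that assumes the container's port is published to localhost on the default 8002 and simply mirrors the curl call above:

# Health probe mirroring the Dockerfile HEALTHCHECK (assumes port 8002 is published).
import urllib.request

def probe(port: int = 8002) -> bool:
    url = f"http://localhost:{port}/koi-net/health"
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200  # healthy iff the route returns 200
    except OSError:
        return False  # connection refused, timeout, or non-2xx status

if __name__ == "__main__":
    print("healthy" if probe() else "unhealthy")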
30 changes: 19 additions & 11 deletions hackmd_sensor_node/__init__.py
@@ -1,23 +1,31 @@
import logging
from rich.logging import RichHandler

log_level_str = "DEBUG"

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.setLevel(log_level_str.upper())

# Remove existing handlers to avoid duplicates if this module is reloaded
for handler in logger.handlers[:]:
    logger.removeHandler(handler)

rich_handler = RichHandler()
rich_handler.setLevel(logging.INFO)
# Pass console=Console(stderr=True) if you want logs to go to stderr instead of stdout
rich_handler = RichHandler(rich_tracebacks=True, show_path=False, log_time_format="%Y-%m-%d %H:%M:%S")
rich_handler.setLevel(log_level_str.upper()) # Set level for this handler
rich_handler.setFormatter(logging.Formatter(
    "%(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
    fmt="%(name)s - %(message)s",  # Simplified format for console
    datefmt="[%X]"  # Use RichHandler's default time format
))
logger.addHandler(rich_handler)

file_handler = logging.FileHandler("node-log.txt")
file_handler.setLevel(logging.DEBUG)
# Add file handler to write logs to node.sensor.log
file_handler = logging.FileHandler("node.sensor.log")
file_handler.setLevel(log_level_str.upper())
file_handler.setFormatter(logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    fmt="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
))
logger.addHandler(file_handler)

# Add both
logger.addHandler(rich_handler)
logger.addHandler(file_handler)
logger.info(f"Logging initialized for HackMD Sensor Node. Level: {log_level_str.upper()}.")
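
Since this module configures the root logger at import time, every module logger inherits both handlers. A quick sketch of the expected behavior, assuming the package is importable:

# Importing the package configures the root logger as a side effect, so a
# plain module logger writes to both the Rich console and node.sensor.log.
import logging
import hackmd_sensor_node  # noqa: F401  # side effect: logging setup

log = logging.getLogger("demo")
log.debug("visible on the console and appended to node.sensor.log")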
21 changes: 16 additions & 5 deletions hackmd_sensor_node/__main__.py
@@ -1,8 +1,19 @@
import uvicorn
import logging
from .core import node

uvicorn.run(
    "hackmd_sensor_node.server:app",
    host=node.config.server.host,
    port=node.config.server.port,
)
logger = logging.getLogger(__name__)

if __name__ == "__main__":
    if not node.config.server or not node.config.server.host or node.config.server.port is None:
        logger.critical("Server configuration (host/port) is missing in the loaded config. Cannot start.")
        exit(1)

    logger.info(f"Starting HackMD sensor node server on {node.config.server.host}:{node.config.server.port}")
    uvicorn.run(
        "hackmd_sensor_node.server:app",
        host=node.config.server.host,
        port=node.config.server.port,
        log_config=None,
        reload=False
    )
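
With the new guard, the server only starts when the package is run as a script (python -m hackmd_sensor_node). For completeness, a sketch of the equivalent programmatic launch using only the standard library:

# Equivalent of `python -m hackmd_sensor_node` from Python code.
import runpy

runpy.run_module("hackmd_sensor_node", run_name="__main__")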
197 changes: 175 additions & 22 deletions hackmd_sensor_node/backfill.py
@@ -1,30 +1,183 @@
import logging
import asyncio
from rid_lib.ext import Bundle
from rid_types import HackMDNote
from . import hackmd_api
from .core import node
from .config import StateType

logger = logging.getLogger(__name__)

async def backfill(team_path="blockscience"):
    notes = await hackmd_api.async_request(f"/teams/{team_path}/notes")

    logger.debug(f"Found {len(notes)} in team")

    for note in notes:
        note_rid = HackMDNote(note["id"])

        note_bundle = Bundle.generate(
            rid=note_rid,
            contents=note
        )

        node.processor.handle(bundle=note_bundle)

if __name__ == "__main__":
    node.start()
    asyncio.run(
        backfill()
    )
    node.stop()
async def _process_team_notes(team_path, state):
    processed_count = 0
    bundled_count = 0
    logger.info(f"Processing all notes in team path: '{team_path}'")
    if not team_path:
        logger.error("HackMD team path is not configured. Cannot backfill all team notes.")
        return processed_count, bundled_count
    team_notes = await hackmd_api.async_request(f"/teams/{team_path}/notes")
    if not team_notes:
        logger.warning(f"No notes found or error fetching notes for team '{team_path}'. Backfill ending.")
        return processed_count, bundled_count
    logger.debug(f"Found {len(team_notes)} notes in team summary.")
    for note_summary in team_notes:
        processed_count += 1
        note_id = note_summary.get("id")
        last_modified_str = note_summary.get("lastChangedAt")
        title = note_summary.get("title", f"Note {note_id}")
        if not note_id or not last_modified_str:
            logger.warning(f"Skipping note from team list due to missing ID or lastChangedAt: {note_summary}")
            continue
        if note_id not in state or last_modified_str > state[note_id]:
            logger.info(f"Processing note '{title}' (ID: {note_id}) from team list - New or updated.")
            # Fetch full content only when needed
            note_details = await hackmd_api.async_request(f"/notes/{note_id}")
            if not note_details:
                logger.error(f"Failed to fetch details for note ID {note_id} from team list. Skipping.")
                continue
            try:
                rid = HackMDNote(note_id=note_id)
                contents = {
                    "id": note_id,
                    "title": title,
                    "content": note_details.get("content"),
                    "createdAt": note_details.get("createdAt"),
                    "lastChangedAt": note_details.get("lastChangedAt", last_modified_str),
                    "publishLink": note_details.get("publishLink"),
                    "tags": note_details.get("tags", []),
                }
                if contents["content"] is None:
                    logger.error(f"Content missing for note ID {note_id} from team list. Skipping bundle.")
                    continue
                bundle = Bundle.generate(rid=rid, contents=contents)
                logger.debug(f"Making backfill note bundle {rid} from team list available locally.")
                node.processor.handle(bundle=bundle)
                bundled_count += 1
                state[note_id] = contents["lastChangedAt"]
            except Exception as e:
                logger.error(f"Error creating/handling bundle for note {note_id} from team list: {e}", exc_info=True)
        else:
            logger.debug(f"Skipping note '{title}' (ID: {note_id}) from team list - Already up-to-date.")
    return processed_count, bundled_count

def _process_target_notes(target_note_ids, state):
    processed_count = 0
    bundled_count = 0
    logger.info(f"Targeting specific HackMD notes for backfill: {target_note_ids}")
    for note_id in target_note_ids:
        processed_count += 1
        logger.debug(f"Fetching targeted note ID: {note_id}")
        note_details = hackmd_api.request(f"/notes/{note_id}")
        if not note_details:
            logger.warning(f"Could not fetch details for targeted note ID {note_id}. Skipping.")
            continue
        last_modified_str = note_details.get("lastChangedAt")
        title = note_details.get("title", f"Note {note_id}")
        if not last_modified_str:
            logger.warning(f"Skipping targeted note {note_id} ('{title}') due to missing lastChangedAt.")
            continue
        if note_id not in state or last_modified_str > state[note_id]:
            logger.info(f"Processing targeted note '{title}' (ID: {note_id}) - New or updated.")
            try:
                rid = HackMDNote(note_id=note_id)
                contents = {
                    "id": note_id,
                    "title": title,
                    "content": note_details.get("content"),
                    "createdAt": note_details.get("createdAt"),
                    "lastChangedAt": last_modified_str,
                    "publishLink": note_details.get("publishLink"),
                    "tags": note_details.get("tags", []),
                }
                if contents["content"] is None:
                    logger.error(f"Content missing for targeted note ID {note_id}. Skipping bundle.")
                    continue
                bundle = Bundle.generate(rid=rid, contents=contents)
                logger.debug(f"Making backfill targeted note bundle {rid} available locally.")
                node.processor.handle(bundle=bundle)
                bundled_count += 1
                state[note_id] = last_modified_str
            except Exception as e:
                logger.error(f"Error bundling targeted note {note_id}: {e}", exc_info=True)
        else:
            logger.debug(f"Skipping targeted note '{title}' (ID: {note_id}) - Already up-to-date.")
    return processed_count, bundled_count
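
Both helpers funnel into the same bundling step. A sketch of that step in isolation, using only APIs already imported in this module; the note ID and field values are hypothetical samples:

# The shared bundling step, extracted for illustration (hypothetical values).
from rid_lib.ext import Bundle
from rid_types import HackMDNote

rid = HackMDNote(note_id="abc123")  # hypothetical note ID
bundle = Bundle.generate(rid=rid, contents={
    "id": "abc123",
    "title": "Example note",
    "content": "# Hello",
    "createdAt": "2024-01-01T00:00:00.000Z",
    "lastChangedAt": "2024-01-02T00:00:00.000Z",
    "publishLink": None,
    "tags": [],
})
# node.processor.handle(bundle=bundle)  # hand off to the KOI-net processor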


async def backfill(state: StateType):
    """Fetches notes, compares with state, and bundles new/updated notes."""

    team_path = getattr(node.config.hackmd, 'team_path', "blockscience")  # Safer access with default
    target_note_ids = getattr(node.config.hackmd, 'target_note_ids', None)

    logger.info("Starting HackMD backfill.")

    try:
        processed_count = 0
        bundled_count = 0

        # Decide whether to process specific notes or all team notes;
        # both paths share the bundling logic in the helpers above.
        if target_note_ids:
            pc, bc = _process_target_notes(target_note_ids, state)
        else:
            pc, bc = await _process_team_notes(team_path, state)
        processed_count += pc
        bundled_count += bc

        logger.info(f"HackMD backfill complete. Processed {processed_count} notes, bundled {bundled_count} new/updated notes.")

    except Exception as e:
        logger.error(f"Unexpected error during HackMD backfill: {e}", exc_info=True)
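
One reviewer note: the freshness check compares last_modified_str against the stored value as plain strings, not datetimes. That is sound as long as lastChangedAt is a fixed-width ISO-8601 UTC timestamp, as the variable naming suggests; a quick sanity check with hypothetical values:

# Lexicographic order equals chronological order for fixed-width ISO-8601 UTC.
older = "2024-01-05T09:30:00.000Z"  # hypothetical timestamps
newer = "2024-03-17T21:05:44.000Z"
assert newer > older  # string comparison matches time order here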
43 changes: 41 additions & 2 deletions hackmd_sensor_node/config.py
@@ -1,16 +1,28 @@
import json
import logging
from pydantic import BaseModel, Field
from koi_net.protocol.node import NodeProfile, NodeType, NodeProvides
from koi_net.config import NodeConfig, EnvConfig, KoiNetConfig
from rid_types import HackMDNote
from pathlib import Path
from typing import Dict

# Define StateType here to avoid circular import
StateType = Dict[str, str]

logger = logging.getLogger(__name__)

HACKMD_STATE_FILE_PATH = Path(".koi/hackmd/hackmd_state.json")

class HackMDConfig(BaseModel):
    team_path: str | None = "blockscience"
    target_note_ids: list[str] | None = None

class HackMDEnvConfig(EnvConfig):
    hackmd_api_token: str | None = "HACKMD_API_TOKEN"

class HackMDSensorNodeConfig(NodeConfig):
    koi_net: KoiNetConfig | None = Field(default_factory = lambda:
    koi_net: KoiNetConfig = Field(default_factory=lambda:
        KoiNetConfig(
            node_name="hackmd-sensor",
            node_profile=NodeProfile(
@@ -23,4 +35,31 @@ class HackMDSensorNodeConfig(NodeConfig):
            )
        )
    env: HackMDEnvConfig | None = Field(default_factory=HackMDEnvConfig)
    hackmd: HackMDConfig | None = Field(default_factory=HackMDConfig)

# --- State Management Functions ---
def load_hackmd_state() -> StateType:
    """Loads the last modified timestamp state from the JSON file."""
    try:
        HACKMD_STATE_FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
        if HACKMD_STATE_FILE_PATH.exists():
            with open(HACKMD_STATE_FILE_PATH, "r") as f:
                state_data = json.load(f)
            logger.info(f"Loaded HackMD state from '{HACKMD_STATE_FILE_PATH}': {len(state_data)} notes tracked.")
            return state_data
        else:
            logger.info(f"HackMD state file '{HACKMD_STATE_FILE_PATH}' not found. Starting empty.")
            return {}
    except Exception as e:
        logger.error(f"Error loading HackMD state file '{HACKMD_STATE_FILE_PATH}': {e}", exc_info=True)
        return {}

def save_hackmd_state(state: StateType):
    """Saves the state dictionary to the JSON file."""
    try:
        HACKMD_STATE_FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
        with open(HACKMD_STATE_FILE_PATH, "w") as f:
            json.dump(state, f, indent=4)
        logger.debug(f"Saved HackMD state to '{HACKMD_STATE_FILE_PATH}'.")
    except Exception as e:
        logger.error(f"Error writing HackMD state file '{HACKMD_STATE_FILE_PATH}': {e}", exc_info=True)
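
A minimal round-trip of the two helpers above, as a sketch; the note ID and timestamp are hypothetical, and running it writes .koi/hackmd/hackmd_state.json:

# State round-trip (hypothetical note ID and timestamp).
from hackmd_sensor_node.config import load_hackmd_state, save_hackmd_state

state = load_hackmd_state()  # {} on first run
state["note-abc123"] = "2024-03-17T21:05:44.000Z"
save_hackmd_state(state)
assert load_hackmd_state()["note-abc123"] == "2024-03-17T21:05:44.000Z"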