Skip to content

MCP Server

grizabella.mcp.server

Grizabella MCP Server.

This module provides an MCP (Model Context Protocol) server for Grizabella, exposing its core functionalities as tools that can be called remotely. It uses FastMCP to define and serve these tools.

Server Description: This MCP server exposes the core functionalities of the Grizabella knowledge management system, allowing for the creation, retrieval, and querying of structured data objects and their relationships.

cleanup_resources()

Perform cleanup of all resources.

Source code in grizabella/mcp/server.py
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
def cleanup_resources():
    """Run every shutdown cleanup stage, logging failures instead of raising.

    Stages, in order:

    1. Connection pools: try the global pool manager, then the module-level
       cleanup function, then a freshly constructed ``ConnectionPoolManager``.
       The chain stops at the first strategy that succeeds.
    2. DB managers via ``cleanup_all_managers``.
    3. Resource monitoring via ``stop_global_monitoring``.
    4. A forced garbage-collection pass.

    Stages 2-4 always run, whatever happened in the earlier stages.
    """
    logger.info("Starting resource cleanup...")

    def _attempt(stage, failure_prefix):
        """Run one stage; log and report ``False`` if it raised."""
        try:
            stage()
        except Exception as err:
            logger.error(f"{failure_prefix}: {err}")
            return False
        return True

    def _close_via_global_manager():
        from grizabella.core.connection_pool import cleanup_global_connection_pool, get_connection_pool_manager
        get_connection_pool_manager().close_all_pools()
        logger.info("Connection pools closed via global manager")

    def _close_via_cleanup_function():
        from grizabella.core.connection_pool import cleanup_global_connection_pool
        cleanup_global_connection_pool()
        logger.info("Connection pools cleaned up via global cleanup function")

    def _close_via_new_manager():
        ConnectionPoolManager().close_all_pools()
        logger.info("Connection pools closed via alternative method")

    def _cleanup_db_managers():
        cleanup_all_managers()
        logger.info("DB managers cleaned up")

    def _stop_monitoring():
        stop_global_monitoring()
        logger.info("Resource monitoring stopped")

    def _collect_garbage():
        import gc
        freed = gc.collect()
        logger.info(f"Garbage collector cleaned up {freed} objects")

    # Connection pools: each strategy is only tried if the previous one failed.
    pool_strategies = (
        (_close_via_global_manager, "Error closing connection pools"),
        (_close_via_cleanup_function, "Error with global pool cleanup"),
        (_close_via_new_manager, "Error with alternative pool cleanup"),
    )
    for strategy, failure_prefix in pool_strategies:
        if _attempt(strategy, failure_prefix):
            break

    # Remaining stages are independent of each other and of the pool cleanup.
    _attempt(_cleanup_db_managers, "Error cleaning up DB managers")
    _attempt(_stop_monitoring, "Error stopping resource monitoring")
    _attempt(_collect_garbage, "Error during garbage collection")

    logger.info("Resource cleanup completed")

get_grizabella_client()

Returns the shared Grizabella client instance.

Source code in grizabella/mcp/server.py
115
116
117
118
119
def get_grizabella_client() -> Grizabella:
    """Return the module-level Grizabella client.

    Raises:
        GrizabellaException: If the shared client is still ``None``.
    """
    client = grizabella_client_instance
    if client is not None:
        return client
    raise GrizabellaException("Grizabella client is not initialized.")

get_grizabella_db_path(db_path_arg=None)

Determines the database path from arg, env var, or default.

Source code in grizabella/mcp/server.py
 96
 97
 98
 99
100
def get_grizabella_db_path(db_path_arg: Optional[str] = None) -> Union[str, Path]:
    """Resolve the database path.

    Precedence: a truthy ``db_path_arg`` wins; otherwise the value of the
    environment variable named by ``GRIZABELLA_DB_PATH_ENV_VAR`` is used,
    falling back to ``DEFAULT_GRIZABELLA_DB_PATH`` when it is unset.
    """
    # `or` short-circuits, so the environment is only read when no usable
    # argument was supplied.
    return db_path_arg or os.getenv(GRIZABELLA_DB_PATH_ENV_VAR, DEFAULT_GRIZABELLA_DB_PATH)

log_tool_call(func)

Decorator to log detailed information about tool calls.

Source code in grizabella/mcp/server.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def log_tool_call(func):
    """Decorator that logs each call to an async tool function.

    For every call the wrapper logs the tool name (the function name with a
    leading ``mcp_`` removed), the positional and keyword arguments, and
    whether the call succeeded or raised. Exceptions are logged and re-raised
    unchanged.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper with the same metadata as ``func``.
    """
    # Local import so this block does not depend on a file-level import.
    import inspect

    # Extract tool name from function name (remove mcp_ prefix)
    tool_name = func.__name__
    if tool_name.startswith('mcp_'):
        tool_name = tool_name[4:]

    # Decide once whether the first positional argument is a bound receiver.
    # Every Python object has `__class__`, so testing for that attribute would
    # drop the first argument of plain functions from the log as well.
    try:
        param_names = list(inspect.signature(func).parameters)
    except (TypeError, ValueError):
        # Signature not introspectable: log all positional arguments.
        param_names = []
    skip_receiver = bool(param_names) and param_names[0] in ('self', 'cls')

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Log the tool call with details
        logger.info(f"🔧 Tool Call: {tool_name}")

        # Log positional arguments, leaving out `self`/`cls` for methods
        actual_args = args[1:] if skip_receiver else args
        if actual_args:
            logger.info(f"📝 Arguments: {actual_args}")

        if kwargs:
            logger.info(f"📝 Keyword Arguments: {kwargs}")

        # Call the original function
        try:
            result = await func(*args, **kwargs)
            logger.info(f"✅ Tool Call Success: {tool_name}")
            return result
        except Exception as e:
            logger.error(f"❌ Tool Call Failed: {tool_name} - Error: {e}")
            raise

    return wrapper

main()

Initializes client and runs the FastMCP application.

Source code in grizabella/mcp/server.py
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
def main():
    """Initialize the Grizabella client and run the FastMCP application.

    Installs ``shutdown_handler`` for SIGINT and SIGTERM, parses ``--db-path``
    and ``--use-gpu`` from the command line, opens a ``Grizabella`` client as
    a context manager, publishes it through the module-level
    ``grizabella_client_instance``, and runs ``app``.

    On the way out the shared client reference is cleared and
    ``cleanup_resources()`` runs. The process exits with status 0 after a
    normal run and status 1 if the server raised an ``Exception``.
    """
    # Register signal handlers
    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    parser = argparse.ArgumentParser(description="Grizabella MCP Server")
    parser.add_argument("--db-path", help="Path to the Grizabella database.")
    parser.add_argument("--use-gpu", action="store_true", help="Use GPU for embedding models.")
    args = parser.parse_args()

    global grizabella_client_instance
    db_path = get_grizabella_db_path(args.db_path)

    # The exit status is tracked here and applied after the `finally` block.
    # Calling sys.exit() inside `finally` would replace any exit already in
    # flight, turning a failure into a "clean" exit with status 0.
    exit_code = 0
    try:
        with Grizabella(
            db_name_or_path=db_path, create_if_not_exists=True, use_gpu=args.use_gpu
        ) as gb:
            grizabella_client_instance = gb
            app.run(show_banner=False)
    except KeyboardInterrupt:
        # An interrupt that bypassed the signal handler is a normal shutdown.
        logger.info("Interrupted, shutting down...")
    except Exception as e:
        print(f"Server error: {e}", file=sys.stderr)
        logger.error(f"Server error: {e}", exc_info=True)
        exit_code = 1
    finally:
        # Always drop the shared client reference and release resources.
        grizabella_client_instance = None
        cleanup_resources()
        if exit_code == 0:
            print("Server terminated cleanly", file=sys.stderr)

    sys.exit(exit_code)

mcp_find_objects(type_name, filter_criteria=None, limit=None) async

Finds and retrieves a list of objects of a given type, with optional filtering criteria.

Source code in grizabella/mcp/server.py
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
@app.tool(
    name="find_objects",
    description=(
        "Finds and retrieves a list of objects of a given type, with optional filtering criteria.\n\n"
        "Example:\n"
        "To find all 'Person' objects where the age is greater than 30:\n"
        '{\n'
        '  "args": {\n'
        '    "type_name": "Person",\n'
        '    "filter_criteria": {\n'
        '      "age": {">": 30}\n'
        '    },\n'
        '    "limit": 10\n'
        '  }\n'
        '}'
    ),
)
async def mcp_find_objects(
    type_name: str,
    filter_criteria: Optional[dict[str, Any]] = None,
    limit: Optional[int] = None,
) -> list[ObjectInstance]:
    """Look up objects of ``type_name`` through the shared Grizabella client.

    Args:
        type_name: Name of the object type to search.
        filter_criteria: Optional filter mapping, forwarded unchanged.
        limit: Optional maximum number of results, forwarded unchanged.

    Returns:
        The value returned by the client's ``find_objects``.

    Raises:
        GrizabellaException: Re-raised with an ``MCP:`` prefixed message when
            the client (or obtaining it) raises a ``GrizabellaException``.
        Exception: Any other failure, wrapped with an ``MCP:`` prefixed message.
    """
    try:
        client = get_grizabella_client()
        matches = client.find_objects(
            type_name=type_name,
            filter_criteria=filter_criteria,
            limit=limit,
        )
    except GrizabellaException as err:
        raise GrizabellaException(
            f"MCP: Error finding objects of type '{type_name}': {err}"
        ) from err
    except Exception as err:  # pylint: disable=broad-except
        raise Exception(
            f"MCP: Unexpected error finding objects of type '{type_name}': {err}"
        ) from err
    return matches

mcp_get_embedding_vector_for_text(args) async

Generates an embedding vector for a given text using a specified embedding definition.

Source code in grizabella/mcp/server.py
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
@app.tool(
    name="get_embedding_vector_for_text",
    description="Generates an embedding vector for a given text using a specified embedding definition.",
)
@log_tool_call
async def mcp_get_embedding_vector_for_text(args: GetEmbeddingVectorForTextArgs) -> EmbeddingVector:
    """Embed ``args.text_to_embed`` with the model of a named embedding definition.

    The embedding definition is looked up by ``args.embedding_definition_name``.
    Its model is loaded through the client's LanceDB adapter (a leading
    ``huggingface/`` is stripped from the model name first), and the text is
    embedded with ``compute_query_embeddings``.

    Returns:
        A plain ``{"vector": [...]}`` dict rather than an ``EmbeddingVector``
        instance.

    Raises:
        GrizabellaException: For every failure after the definition lookup,
            including a missing definition, an empty model result, or an
            unsupported vector type. The message is prefixed with
            ``Failed to generate embedding vector:``.
    """
    client = get_grizabella_client()
    definition = client.get_embedding_definition(args.embedding_definition_name)
    try:
        # Step 1: the definition must exist.
        if not definition:
            raise GrizabellaException(f"Embedding definition '{args.embedding_definition_name}' not found.")

        # Step 2: embed the text the same way find_similar_objects_by_embedding does.
        logger.info(f"Generating embedding vector for text using model '{definition.embedding_model}'")

        # The LanceDB registry expects the bare model name, without 'huggingface/'.
        hf_prefix = 'huggingface/'
        registry_name = definition.embedding_model
        if registry_name.startswith(hf_prefix):
            registry_name = registry_name[len(hf_prefix):]
            logger.info(f"Stripped 'huggingface/' prefix from model identifier: '{definition.embedding_model}' -> '{registry_name}'")

        logger.info(f"About to load embedding model with identifier: '{registry_name}'")
        adapter = client._db_manager._connection_helper.lancedb_adapter
        model = adapter.get_embedding_model(
            registry_name,
        )

        embeddings = model.compute_query_embeddings([args.text_to_embed])
        if not embeddings:
            logger.error(f"Model '{definition.embedding_model}' returned empty list for text.")
            raise GrizabellaException(f"Model {definition.embedding_model} returned empty list for text.")

        first_embedding = embeddings[0]

        # Normalize to a plain list; anything with `tolist` (e.g. a numpy array) is converted.
        if hasattr(first_embedding, "tolist"):
            vector = first_embedding.tolist()
        elif isinstance(first_embedding, list):
            vector = first_embedding
        else:
            logger.error(f"Unexpected query vector type from model '{definition.embedding_model}': {type(first_embedding)}")
            raise GrizabellaException(f"Unexpected query vector type from model {definition.embedding_model}")

        # A dimension mismatch is only reported as a warning; it does not abort the call.
        logger.info(f"Generated embedding vector with {len(vector)} dimensions. ED specifies {definition.dimensions} dimensions.")
        if definition.dimensions and len(vector) != definition.dimensions:
            logger.warning(
                f"Query vector dim ({len(vector)}) does not match ED "
                f"'{definition.name}' dim ({definition.dimensions}). Continuing anyway."
            )

        logger.info(f"Successfully generated embedding vector with dimension {len(vector)}")

        # Plain dict payload so MCP serialization works; log its shape before returning.
        payload = {"vector": vector}
        logger.info(f"MCP get_embedding_vector_for_text returning: type={type(payload)}, vector_type={type(payload['vector'])}, vector_length={len(payload['vector'])}")
        logger.info(f"MCP get_embedding_vector_for_text return value preview: {payload['vector'][:5]}...")

        return payload

    except Exception as err:
        logger.error(f"Failed to generate embedding vector: {err}", exc_info=True)
        raise GrizabellaException(f"Failed to generate embedding vector: {err}") from err

mcp_get_incoming_relations(object_id, type_name, relation_type_name=None) async

Retrieves all incoming relations to a specific object.

Source code in grizabella/mcp/server.py
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
@app.tool(
    name="get_incoming_relations",
    description=(
        "Retrieves all incoming relations to a specific object.\n\n"
        "Example:\n"
        "To get all incoming relations to Jane Doe's 'Person' object:\n"
        '{\n'
        '  "args": {\n'
        '    "object_id": "jane_doe_456",\n'
        '    "type_name": "Person"\n'
        '  }\n'
        '}'
    ),
)
async def mcp_get_incoming_relations(
    object_id: str, type_name: str, relation_type_name: Optional[str] = None,
) -> list[RelationInstance]:
    """Fetch the relations that point at the given object.

    Args:
        object_id: Identifier of the target object.
        type_name: Type name of the target object.
        relation_type_name: Optional relation type, forwarded unchanged.

    Returns:
        The value returned by the client's ``get_incoming_relations``.

    Raises:
        GrizabellaException: Re-raised with an ``MCP:`` prefixed message when
            the client (or obtaining it) raises a ``GrizabellaException``.
        Exception: Any other failure, wrapped with an ``MCP:`` prefixed message.
    """
    try:
        client = get_grizabella_client()
        incoming = client.get_incoming_relations(
            object_id=object_id,
            type_name=type_name,
            relation_type_name=relation_type_name,
        )
    except GrizabellaException as err:
        raise GrizabellaException(
            f"MCP: Error getting incoming relations for object '{object_id}': {err}"
        ) from err
    except Exception as err:  # pylint: disable=broad-except
        raise Exception(
            f"MCP: Unexpected error getting incoming relations for object '{object_id}': {err}"
        ) from err
    return incoming

mcp_get_outgoing_relations(object_id, type_name, relation_type_name=None) async

Retrieves all outgoing relations from a specific object.

Source code in grizabella/mcp/server.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
@app.tool(
    name="get_outgoing_relations",
    description=(
        "Retrieves all outgoing relations from a specific object.\n\n"
        "Example:\n"
        "To get all outgoing relations from John Doe's 'Person' object:\n"
        '{\n'
        '  "args": {\n'
        '    "object_id": "john_doe_123",\n'
        '    "type_name": "Person"\n'
        '  }\n'
        '}'
    ),
)
async def mcp_get_outgoing_relations(
    object_id: str, type_name: str, relation_type_name: Optional[str] = None,
) -> list[RelationInstance]:
    """Fetch the relations that originate from the given object.

    Args:
        object_id: Identifier of the source object.
        type_name: Type name of the source object.
        relation_type_name: Optional relation type, forwarded unchanged.

    Returns:
        The value returned by the client's ``get_outgoing_relations``.

    Raises:
        GrizabellaException: Re-raised with an ``MCP:`` prefixed message when
            the client (or obtaining it) raises a ``GrizabellaException``.
        Exception: Any other failure, wrapped with an ``MCP:`` prefixed message.
    """
    try:
        client = get_grizabella_client()
        outgoing = client.get_outgoing_relations(
            object_id=object_id,
            type_name=type_name,
            relation_type_name=relation_type_name,
        )
    except GrizabellaException as err:
        raise GrizabellaException(
            f"MCP: Error getting outgoing relations for object '{object_id}': {err}"
        ) from err
    except Exception as err:  # pylint: disable=broad-except
        raise Exception(
            f"MCP: Unexpected error getting outgoing relations for object '{object_id}': {err}"
        ) from err
    return outgoing

mcp_search_similar_objects(object_id, type_name, n_results=5, search_properties=None) async

Searches for objects that are semantically similar to a given object, based on embeddings of their properties.

Source code in grizabella/mcp/server.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
@app.tool(
    name="search_similar_objects",
    description=(
        "Searches for objects that are semantically similar to a given object, based on embeddings "
        "of their properties. Note: This feature is not yet fully implemented.\n\n"
        "Example:\n"
        "To find 5 objects similar to John Doe's 'Person' object:\n"
        '{\n'
        '  "args": {\n'
        '    "object_id": "john_doe_123",\n'
        '    "type_name": "Person",\n'
        '    "n_results": 5\n'
        '  }\n'
        '}'
    ),
)
async def mcp_search_similar_objects(
    object_id: str,
    type_name: str,
    n_results: int = 5,
    search_properties: Optional[list[str]] = None,
) -> list[tuple[ObjectInstance, float]]:
    """Search for objects semantically similar to a given object.

    Delegates to the shared client's ``search_similar_objects``.

    Args:
        object_id: Identifier of the reference object.
        type_name: Type name of the reference object.
        n_results: Number of results requested from the client.
        search_properties: Optional property names, forwarded unchanged.

    Returns:
        The client's result: a list of ``(ObjectInstance, score)`` tuples.

    Raises:
        Exception: When the client raises ``NotImplementedError`` (reported
            as "not yet implemented"), or for any other unexpected failure
            (reported as "Unexpected error").
        GrizabellaException: Re-raised with an ``MCP:`` prefixed message when
            the client (or obtaining it) raises a ``GrizabellaException``.
    """
    try:
        gb = get_grizabella_client()
        return gb.search_similar_objects(
            object_id=object_id,
            type_name=type_name,
            n_results=n_results,
            search_properties=search_properties,
        )
    except NotImplementedError as nie:
        # Known state: the client does not implement this feature yet.
        # Handled as a sibling clause so the "not yet implemented" message is
        # not caught again below and relabelled "Unexpected error".
        msg = f"MCP: search_similar_objects feature is not yet implemented in the Grizabella client: {nie}"
        raise Exception(msg) from nie
    except GrizabellaException as e:
        # Grizabella-specific errors keep their type.
        msg = f"MCP: Error searching similar objects for '{object_id}': {e}"
        raise GrizabellaException(msg) from e
    except Exception as e:  # pylint: disable=broad-except
        # Anything else is reported as an unexpected error.
        msg = f"MCP: Unexpected error searching similar objects for '{object_id}': {e}"
        raise Exception(msg) from e

shutdown_handler(signum, frame)

Handle shutdown signals gracefully.

Source code in grizabella/mcp/server.py
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
def shutdown_handler(signum, frame):
    """Handle a shutdown signal: announce it, force cleanup, and exit.

    The cleanup here is synchronous and works directly on the module-level
    singletons instead of going through their normal cleanup APIs:

    * stops global resource monitoring;
    * clears the DB manager factory's ``_instances`` and ``_reference_counts``
      while holding its lock;
    * sets the connection pool manager's ``_shutdown`` flag, waits up to one
      second for its cleanup thread, and clears ``_connection_count`` while
      holding its lock.

    Cleanup errors are logged, not raised. The process then exits with
    status 0.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame supplied by the signal machinery (unused).
    """
    import sys

    # The announcement is best-effort: stderr might not be available during
    # shutdown. It goes to stderr (never stdout) to avoid stdout contamination;
    # the logger call below records the event either way.
    try:
        print(f"Received signal {signum}, shutting down...", file=sys.stderr)
    except Exception:
        pass

    logger.info(f"Received signal {signum}, shutting down...")

    # Perform forceful cleanup during signal handling to avoid async issues
    try:
        # Stop monitoring first (sync)
        stop_global_monitoring()

        # Force cleanup DB managers without async operations
        from grizabella.core.db_manager_factory import _db_manager_factory
        if _db_manager_factory:
            with _db_manager_factory._lock:
                _db_manager_factory._instances.clear()
                _db_manager_factory._reference_counts.clear()

        # Force cleanup connection pools without async operations
        from grizabella.core.connection_pool import _connection_pool_manager
        if _connection_pool_manager:
            _connection_pool_manager._shutdown = True
            cleanup_thread = _connection_pool_manager._cleanup_thread
            if cleanup_thread and cleanup_thread.is_alive():
                # Bounded wait so a stuck cleanup thread cannot block shutdown.
                cleanup_thread.join(timeout=1)
            with _connection_pool_manager._lock:
                _connection_pool_manager._connection_count.clear()

        logger.info("Force cleanup completed during shutdown")
    except Exception as e:
        logger.error(f"Error during force cleanup: {e}")

    # Exit immediately
    sys.exit(0)