Grizabella MCP Server.
This module provides an MCP (Model Context Protocol) server for Grizabella,
exposing its core functionalities as tools that can be called remotely.
It uses FastMCP to define and serve these tools.
Server Description: This MCP server exposes the core functionalities of the Grizabella
knowledge management system, allowing for the creation, retrieval, and querying of
structured data objects and their relationships.
cleanup_resources()
Perform cleanup of all resources.
Source code in grizabella/mcp/server.py
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
def cleanup_resources():
    """Perform cleanup of all resources.

    Runs four best-effort phases in order: connection pools, DB managers,
    resource monitoring, and a garbage-collection pass. Every failure is
    logged and swallowed so that one broken phase never prevents the
    following ones from running.

    Connection pools are closed through a cascade of up to three
    strategies; the cascade stops at the first strategy that does not raise.
    """

    def _pools_via_global_manager():
        # Both names are imported on purpose: a failure to import either one
        # sends the cascade on to the next strategy.
        from grizabella.core.connection_pool import cleanup_global_connection_pool, get_connection_pool_manager  # noqa: F401
        get_connection_pool_manager().close_all_pools()
        logger.info("Connection pools closed via global manager")

    def _pools_via_cleanup_function():
        from grizabella.core.connection_pool import cleanup_global_connection_pool
        cleanup_global_connection_pool()
        logger.info("Connection pools cleaned up via global cleanup function")

    def _pools_via_fresh_manager():
        ConnectionPoolManager().close_all_pools()
        logger.info("Connection pools closed via alternative method")

    def _db_managers():
        cleanup_all_managers()
        logger.info("DB managers cleaned up")

    def _monitoring():
        stop_global_monitoring()
        logger.info("Resource monitoring stopped")

    def _garbage():
        import gc
        reclaimed = gc.collect()
        logger.info(f"Garbage collector cleaned up {reclaimed} objects")

    def _first_success(attempts):
        """Run (callable, error-prefix) pairs in order until one succeeds."""
        for attempt, failure_prefix in attempts:
            try:
                attempt()
            except Exception as exc:
                logger.error(f"{failure_prefix}{exc}")
            else:
                return

    logger.info("Starting resource cleanup...")

    # Connection pools: global manager, then global cleanup function, then a
    # freshly constructed manager as the last resort.
    _first_success((
        (_pools_via_global_manager, "Error closing connection pools: "),
        (_pools_via_cleanup_function, "Error with global pool cleanup: "),
        (_pools_via_fresh_manager, "Error with alternative pool cleanup: "),
    ))

    # The remaining phases are independent single-attempt steps.
    _first_success(((_db_managers, "Error cleaning up DB managers: "),))
    _first_success(((_monitoring, "Error stopping resource monitoring: "),))
    _first_success(((_garbage, "Error during garbage collection: "),))

    logger.info("Resource cleanup completed")
|
get_grizabella_client()
Returns the shared Grizabella client instance.
Source code in grizabella/mcp/server.py
def get_grizabella_client() -> Grizabella:
    """Return the shared Grizabella client instance.

    Raises:
        GrizabellaException: If the module-level ``grizabella_client_instance``
            is still ``None``.
    """
    client = grizabella_client_instance
    if client is not None:
        return client
    raise GrizabellaException("Grizabella client is not initialized.")
|
get_grizabella_db_path(db_path_arg=None)
Determines the database path from arg, env var, or default.
Source code in grizabella/mcp/server.py
def get_grizabella_db_path(db_path_arg: Optional[str] = None) -> Union[str, Path]:
    """Resolve the database path from the argument, the environment, or the default.

    Precedence: a truthy ``db_path_arg`` wins; otherwise the environment
    variable named by ``GRIZABELLA_DB_PATH_ENV_VAR`` is used; otherwise
    ``DEFAULT_GRIZABELLA_DB_PATH``.
    """
    # An empty string is treated like None and falls through to the
    # environment/default lookup.
    return db_path_arg or os.getenv(GRIZABELLA_DB_PATH_ENV_VAR, DEFAULT_GRIZABELLA_DB_PATH)
|
log_tool_call(func)
Decorator to log detailed information about tool calls.
Source code in grizabella/mcp/server.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def log_tool_call(func):
    """Decorator to log detailed information about tool calls.

    Wraps an ``async`` tool function so that each invocation logs the tool
    name (the function name with a leading ``mcp_`` removed), its positional
    and keyword arguments, and whether the call succeeded or failed.
    Exceptions are logged and re-raised unchanged.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An ``async`` wrapper with the same metadata as ``func``.
    """
    import inspect  # local import keeps this block self-contained

    # Extract tool name from function name (remove mcp_ prefix).
    tool_name = func.__name__
    if tool_name.startswith('mcp_'):
        tool_name = tool_name[4:]

    # Decide once whether the first positional argument is a method receiver.
    # The previous check, hasattr(args[0], '__class__'), is true for every
    # Python object, so it silently dropped the first real argument of every
    # plain function from the log.
    try:
        param_names = list(inspect.signature(func).parameters)
    except (TypeError, ValueError):
        # Some callables expose no introspectable signature; log all args.
        param_names = []
    skips_receiver = bool(param_names) and param_names[0] in ("self", "cls")

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info(f"🔧 Tool Call: {tool_name}")

        logged_args = args[1:] if skips_receiver else args
        if logged_args:
            logger.info(f"📝 Arguments: {logged_args}")
        if kwargs:
            logger.info(f"📝 Keyword Arguments: {kwargs}")

        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            logger.error(f"❌ Tool Call Failed: {tool_name} - Error: {e}")
            raise
        else:
            logger.info(f"✅ Tool Call Success: {tool_name}")
            return result

    return wrapper
|
main()
Initializes client and runs the FastMCP application.
Source code in grizabella/mcp/server.py
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
def main():
    """Initializes client and runs the FastMCP application.

    Registers SIGINT/SIGTERM handlers, parses ``--db-path`` and ``--use-gpu``,
    opens the shared Grizabella client, and runs the FastMCP app until it
    returns or fails. Resources are always cleaned up before the process
    exits.

    Exit status:
        0 when the server stops normally, 1 when it stops because of an
        unhandled exception.
    """
    global grizabella_client_instance

    # Register signal handlers
    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    parser = argparse.ArgumentParser(description="Grizabella MCP Server")
    parser.add_argument("--db-path", help="Path to the Grizabella database.")
    parser.add_argument("--use-gpu", action="store_true", help="Use GPU for embedding models.")
    args = parser.parse_args()

    db_path = get_grizabella_db_path(args.db_path)

    # Track the exit status in a variable instead of calling sys.exit(1) from
    # the except branch: an exit raised there would be replaced by the exit
    # raised in ``finally``, making every failure report status 0.
    exit_code = 0
    try:
        with Grizabella(
            db_name_or_path=db_path, create_if_not_exists=True, use_gpu=args.use_gpu
        ) as gb:
            grizabella_client_instance = gb
            app.run(show_banner=False)
    except Exception as e:
        print(f"Server error: {e}", file=sys.stderr)
        logger.error(f"Server error: {e}", exc_info=True)
        exit_code = 1
    finally:
        # Ensure clean termination
        grizabella_client_instance = None
        cleanup_resources()
        if exit_code == 0:
            print("Server terminated cleanly", file=sys.stderr)
        # Exiting from ``finally`` replaces any exception still in flight
        # (e.g. the SystemExit raised by the signal handler) with this status.
        sys.exit(exit_code)
|
mcp_find_objects(type_name, filter_criteria=None, limit=None)
async
Finds and retrieves a list of objects of a given type, with optional filtering criteria.
Source code in grizabella/mcp/server.py
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
@app.tool(
    name="find_objects",
    description=(
        "Finds and retrieves a list of objects of a given type, with optional filtering criteria.\n\n"
        "Example:\n"
        "To find all 'Person' objects where the age is greater than 30:\n"
        '{\n'
        ' "args": {\n'
        ' "type_name": "Person",\n'
        ' "filter_criteria": {\n'
        ' "age": {">": 30}\n'
        ' },\n'
        ' "limit": 10\n'
        ' }\n'
        '}'
    ),
)
async def mcp_find_objects(
    type_name: str,
    filter_criteria: Optional[dict[str, Any]] = None,
    limit: Optional[int] = None,
) -> list[ObjectInstance]:
    """Find objects of one type, optionally filtered and limited.

    Delegates to the shared client's ``find_objects``. Client errors are
    re-raised as ``GrizabellaException`` and anything else as ``Exception``,
    each with an ``MCP:``-prefixed message and the original error chained.
    """
    try:
        return get_grizabella_client().find_objects(
            type_name=type_name,
            filter_criteria=filter_criteria,
            limit=limit,
        )
    except GrizabellaException as exc:
        raise GrizabellaException(
            f"MCP: Error finding objects of type '{type_name}': {exc}"
        ) from exc
    except Exception as exc:  # pylint: disable=broad-except
        raise Exception(
            f"MCP: Unexpected error finding objects of type '{type_name}': {exc}"
        ) from exc
|
mcp_get_embedding_vector_for_text(args)
async
Generates an embedding vector for a given text using a specified embedding definition.
Source code in grizabella/mcp/server.py
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
@app.tool(
    name="get_embedding_vector_for_text",
    description="Generates an embedding vector for a given text using a specified embedding definition.",
)
@log_tool_call
async def mcp_get_embedding_vector_for_text(args: GetEmbeddingVectorForTextArgs) -> EmbeddingVector:
    """Generate an embedding vector for ``args.text_to_embed``.

    Looks up the embedding definition named by
    ``args.embedding_definition_name``, loads its model through the client's
    LanceDB adapter, and embeds the text with ``compute_query_embeddings``.

    Returns:
        A plain ``{"vector": [...]}`` dict (not an ``EmbeddingVector``
        instance), so that MCP serialization works.

    Raises:
        GrizabellaException: For any failure inside the generation step,
            including an unknown embedding definition; the original error is
            chained. A dimension mismatch only produces a warning.
    """
    gb = get_grizabella_client()
    embedding_def = gb.get_embedding_definition(args.embedding_definition_name)
    try:
        if not embedding_def:
            raise GrizabellaException(f"Embedding definition '{args.embedding_definition_name}' not found.")

        configured_model = embedding_def.embedding_model
        logger.info(f"Generating embedding vector for text using model '{configured_model}'")

        # The LanceDB registry expects the bare model name, without the
        # 'huggingface/' namespace.
        hf_prefix = 'huggingface/'
        model_identifier = configured_model
        if configured_model.startswith(hf_prefix):
            model_identifier = configured_model[len(hf_prefix):]
            logger.info(f"Stripped 'huggingface/' prefix from model identifier: '{configured_model}' -> '{model_identifier}'")
        logger.info(f"About to load embedding model with identifier: '{model_identifier}'")

        lancedb_adapter = gb._db_manager._connection_helper.lancedb_adapter
        embedding_model_func = lancedb_adapter.get_embedding_model(model_identifier)

        raw_query_embeddings = embedding_model_func.compute_query_embeddings([args.text_to_embed])
        if not raw_query_embeddings:
            logger.error(f"Model '{configured_model}' returned empty list for text.")
            raise GrizabellaException(f"Model {configured_model} returned empty list for text.")

        # Accept anything exposing tolist() (e.g. a numpy array) or a plain
        # list; reject every other type.
        raw_query_vector = raw_query_embeddings[0]
        has_tolist = hasattr(raw_query_vector, "tolist")
        if not has_tolist and not isinstance(raw_query_vector, list):
            logger.error(f"Unexpected query vector type from model '{configured_model}': {type(raw_query_vector)}")
            raise GrizabellaException(f"Unexpected query vector type from model {configured_model}")
        final_query_vector = raw_query_vector.tolist() if has_tolist else raw_query_vector

        # A dimension mismatch is reported but does not abort the call.
        vector_length = len(final_query_vector)
        logger.info(f"Generated embedding vector with {vector_length} dimensions. ED specifies {embedding_def.dimensions} dimensions.")
        if embedding_def.dimensions and vector_length != embedding_def.dimensions:
            logger.warning(
                f"Query vector dim ({vector_length}) does not match ED "
                f"'{embedding_def.name}' dim ({embedding_def.dimensions}). Continuing anyway."
            )
        logger.info(f"Successfully generated embedding vector with dimension {vector_length}")

        payload = {"vector": final_query_vector}
        logger.info(f"MCP get_embedding_vector_for_text returning: type={type(payload)}, vector_type={type(payload['vector'])}, vector_length={len(payload['vector'])}")
        logger.info(f"MCP get_embedding_vector_for_text return value preview: {payload['vector'][:5]}...")
        return payload
    except Exception as e:
        logger.error(f"Failed to generate embedding vector: {e}", exc_info=True)
        raise GrizabellaException(f"Failed to generate embedding vector: {e}") from e
|
mcp_get_incoming_relations(object_id, type_name, relation_type_name=None)
async
Retrieves all incoming relations to a specific object.
Source code in grizabella/mcp/server.py
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
@app.tool(
    name="get_incoming_relations",
    description=(
        "Retrieves all incoming relations to a specific object.\n\n"
        "Example:\n"
        "To get all incoming relations to Jane Doe's 'Person' object:\n"
        '{\n'
        ' "args": {\n'
        ' "object_id": "jane_doe_456",\n'
        ' "type_name": "Person"\n'
        ' }\n'
        '}'
    ),
)
async def mcp_get_incoming_relations(
    object_id: str, type_name: str, relation_type_name: Optional[str] = None,
) -> list[RelationInstance]:
    """Return the relations that point at the given object.

    Delegates to the shared client's ``get_incoming_relations``. Client
    errors are re-raised as ``GrizabellaException`` and anything else as
    ``Exception``, each with an ``MCP:``-prefixed message and the original
    error chained.
    """
    try:
        return get_grizabella_client().get_incoming_relations(
            object_id=object_id,
            type_name=type_name,
            relation_type_name=relation_type_name,
        )
    except GrizabellaException as exc:
        raise GrizabellaException(
            f"MCP: Error getting incoming relations for object '{object_id}': {exc}"
        ) from exc
    except Exception as exc:  # pylint: disable=broad-except
        raise Exception(
            f"MCP: Unexpected error getting incoming relations for object '{object_id}': {exc}"
        ) from exc
|
mcp_get_outgoing_relations(object_id, type_name, relation_type_name=None)
async
Retrieves all outgoing relations from a specific object.
Source code in grizabella/mcp/server.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
@app.tool(
    name="get_outgoing_relations",
    description=(
        "Retrieves all outgoing relations from a specific object.\n\n"
        "Example:\n"
        "To get all outgoing relations from John Doe's 'Person' object:\n"
        '{\n'
        ' "args": {\n'
        ' "object_id": "john_doe_123",\n'
        ' "type_name": "Person"\n'
        ' }\n'
        '}'
    ),
)
async def mcp_get_outgoing_relations(
    object_id: str, type_name: str, relation_type_name: Optional[str] = None,
) -> list[RelationInstance]:
    """Return the relations that originate from the given object.

    Delegates to the shared client's ``get_outgoing_relations``. Client
    errors are re-raised as ``GrizabellaException`` and anything else as
    ``Exception``, each with an ``MCP:``-prefixed message and the original
    error chained.
    """
    try:
        return get_grizabella_client().get_outgoing_relations(
            object_id=object_id,
            type_name=type_name,
            relation_type_name=relation_type_name,
        )
    except GrizabellaException as exc:
        raise GrizabellaException(
            f"MCP: Error getting outgoing relations for object '{object_id}': {exc}"
        ) from exc
    except Exception as exc:  # pylint: disable=broad-except
        raise Exception(
            f"MCP: Unexpected error getting outgoing relations for object '{object_id}': {exc}"
        ) from exc
|
mcp_search_similar_objects(object_id, type_name, n_results=5, search_properties=None)
async
Searches for objects that are semantically similar to a given object, based on embeddings of their properties.
Source code in grizabella/mcp/server.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
@app.tool(
    name="search_similar_objects",
    description=(
        "Searches for objects that are semantically similar to a given object, based on embeddings "
        "of their properties. Note: This feature is not yet fully implemented.\n\n"
        "Example:\n"
        "To find 5 objects similar to John Doe's 'Person' object:\n"
        '{\n'
        ' "args": {\n'
        ' "object_id": "john_doe_123",\n'
        ' "type_name": "Person",\n'
        ' "n_results": 5\n'
        ' }\n'
        '}'
    ),
)
async def mcp_search_similar_objects(
    object_id: str,
    type_name: str,
    n_results: int = 5,
    search_properties: Optional[list[str]] = None,
) -> list[tuple[ObjectInstance, float]]:
    """Searches for objects that are semantically similar to a given object, based on embeddings of their properties.

    Delegates to the shared client's ``search_similar_objects``.

    Raises:
        Exception: If the client reports the feature as not implemented
            (``NotImplementedError``), or on any other unexpected error.
        GrizabellaException: On Grizabella-specific errors.

    All raised errors carry an ``MCP:``-prefixed message and chain the
    original exception.
    """
    try:
        return get_grizabella_client().search_similar_objects(
            object_id=object_id,
            type_name=type_name,
            n_results=n_results,
            search_properties=search_properties,
        )
    except NotImplementedError as nie:
        # Handled as a sibling clause of the single try: when this was raised
        # from a nested try, the broad handler below caught it again and
        # relabelled the known "not implemented" state as an
        # "Unexpected error".
        msg = f"MCP: search_similar_objects feature is not yet implemented in the Grizabella client: {nie}"
        raise Exception(msg) from nie
    except GrizabellaException as e:
        msg = f"MCP: Error searching similar objects for '{object_id}': {e}"
        raise GrizabellaException(msg) from e
    except Exception as e:  # pylint: disable=broad-except
        msg = f"MCP: Unexpected error searching similar objects for '{object_id}': {e}"
        raise Exception(msg) from e
|
shutdown_handler(signum, frame)
Handle shutdown signals gracefully.
Source code in grizabella/mcp/server.py
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
def shutdown_handler(signum, frame):
    """Handle shutdown signals gracefully.

    Announces the signal on stderr and in the log, performs a synchronous
    forced cleanup of monitoring, DB-manager bookkeeping and connection-pool
    bookkeeping, then exits the process with status 0.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame supplied by the ``signal`` module (unused).
    """
    import sys

    announcement = f"Received signal {signum}, shutting down..."
    try:
        # stderr rather than stdout, to avoid stdout contamination.
        print(announcement, file=sys.stderr)
    except Exception:
        # stderr might not be available during shutdown. Retrying the same
        # write cannot succeed, so rely on the logger call below instead.
        pass
    logger.info(announcement)

    # Perform forceful cleanup during signal handling to avoid async issues
    try:
        # Stop monitoring first (sync)
        stop_global_monitoring()

        # Force cleanup DB managers without async operations
        from grizabella.core.db_manager_factory import _db_manager_factory
        if _db_manager_factory:
            with _db_manager_factory._lock:
                _db_manager_factory._instances.clear()
                _db_manager_factory._reference_counts.clear()

        # Force cleanup connection pools without async operations
        from grizabella.core.connection_pool import _connection_pool_manager
        if _connection_pool_manager:
            _connection_pool_manager._shutdown = True
            cleanup_thread = _connection_pool_manager._cleanup_thread
            if cleanup_thread and cleanup_thread.is_alive():
                # Bounded wait so a stuck cleanup thread cannot block exit.
                cleanup_thread.join(timeout=1)
            with _connection_pool_manager._lock:
                _connection_pool_manager._connection_count.clear()

        logger.info("Force cleanup completed during shutdown")
    except Exception as e:
        logger.error(f"Error during force cleanup: {e}")

    # Exit immediately
    sys.exit(0)
|