API Reference

Connection

duffy.connect(dsn, *, graph=None, **kwargs) → DuffyDriver

Connect to PostgreSQL with AGE and pgvector support.

| Parameter | Type | Description |
| --- | --- | --- |
| dsn | str | PostgreSQL connection string |
| graph | str \| None | Default graph name. Created if it doesn't exist |
| `**kwargs` | | Additional arguments passed to psycopg.connect() |

```python
db = duffy.connect("postgresql://localhost:5432/mydb", graph="my_graph")
```

duffy.connect_pool(dsn, *, graph=None, min_size=2, max_size=10, **kwargs) → DuffyPool

Create a connection pool for multi-threaded applications.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| dsn | str | "" | PostgreSQL connection string |
| graph | str \| None | None | Default graph name |
| min_size | int | 2 | Minimum pool connections |
| max_size | int | 10 | Maximum pool connections |
| `**kwargs` | | | Additional arguments passed to psycopg.connect() |

Requires `psycopg_pool` (`pip install psycopg_pool`).

```python
pool = duffy.connect_pool("postgresql://...", graph="g", min_size=2, max_size=10)

with pool.connection() as db:
    result = db.cypher("MATCH (n) RETURN n LIMIT 10")

pool.close()
```

DuffyDriver

The main entry point for all graph and vector operations. Returned by duffy.connect().

Query methods

cypher(query, *, params=None, columns=None, graph=None) → Result

Execute a Cypher query.

| Parameter | Type | Description |
| --- | --- | --- |
| query | str | Cypher query string |
| params | tuple \| None | Parameters for %s placeholder substitution |
| columns | list[str] \| None | Explicit column names. Auto-extracted from RETURN if None |
| graph | str \| None | Graph name override |

```python
result = db.cypher("MATCH (n:Person {name: %s}) RETURN n.name, n.age", params=("Alice",))
```

sql(query, params=None) → Result

Execute a raw SQL query.

| Parameter | Type | Description |
| --- | --- | --- |
| query | str | SQL query string |
| params | tuple \| None | Query parameters |

```python
result = db.sql("SELECT * FROM my_table WHERE id = %s", (42,))
```

execute(builder, *, graph=None) → Result

Execute a query builder (Match, Create, or Merge) and return a Result.

```python
q = Match("Person", alias="p").where(p__age__gt=30).return_("p.name", "p.age")
result = db.execute(q)
```

vector_search(table, column, query_vector, *, k=10, metric="cosine", where=None, columns=None) → Result

Search for similar vectors using pgvector.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| table | str | | Table containing the vector column |
| column | str | | Name of the vector column |
| query_vector | list[float] | | Query vector |
| k | int | 10 | Number of nearest neighbors |
| metric | str | "cosine" | "cosine", "l2", or "inner_product" |
| where | str \| None | None | SQL WHERE clause (without the WHERE keyword) |
| columns | list[str] \| None | None | Columns to return. Defaults to all |

```python
results = db.vector_search("documents", "embedding", query_vec, k=10, metric="cosine")
```
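The three metric names map to pgvector's distance operators (`<=>` cosine distance, `<->` Euclidean distance, `<#>` negative inner product). A minimal plain-Python sketch of what each one computes:

```python
import math

def cosine_distance(a, b):
    # pgvector's <=> operator: 1 minus cosine similarity
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (na * nb)

def l2_distance(a, b):
    # pgvector's <-> operator: Euclidean distance
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def neg_inner_product(a, b):
    # pgvector's <#> operator: negative inner product
    return -sum(x * y for x, y in zip(a, b))
```

All three are distances, so smaller values mean more similar vectors; that is why the inner product is negated.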

create_vector_index(table, column, *, method="hnsw", metric="cosine", name=None, **kwargs)

Create a vector index on a table column.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| table | str | | Table name |
| column | str | | Vector column name |
| method | str | "hnsw" | "hnsw" or "ivfflat" |
| metric | str | "cosine" | Distance metric |
| name | str \| None | None | Optional index name |
| `**kwargs` | | | Index parameters (m, ef_construction, lists) |
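A sketch of typical calls, assuming a `documents` table with an `embedding` column; `m`, `ef_construction`, and `lists` are the standard pgvector index parameters listed above:

```python
# HNSW index with tuned build parameters
db.create_vector_index("documents", "embedding", method="hnsw",
                       metric="cosine", m=16, ef_construction=64)

# IVFFlat alternative, clustering the vectors into 100 lists
db.create_vector_index("documents", "embedding", method="ivfflat",
                       metric="l2", lists=100)
```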

hybrid_search(*, cypher=None, cypher_params=None, vector_table, vector_column, query_vector, k=10, metric="cosine", join_column="id", graph=None, mode="graph_then_vector") → Result

Combine Cypher graph traversal with pgvector similarity search.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| cypher | str \| None | None | Cypher query producing candidate rows |
| cypher_params | tuple \| None | None | Parameters for the Cypher query |
| vector_table | str | | Table with vector embeddings |
| vector_column | str | | Vector column name |
| query_vector | list[float] | | Query vector |
| k | int | 10 | Number of results |
| metric | str | "cosine" | Distance metric |
| join_column | str | "id" | Column to join graph results with vector table |
| graph | str \| None | None | Graph name override |
| mode | str | "graph_then_vector" | "graph_then_vector" or "vector_then_graph" |

```python
results = db.hybrid_search(
    cypher="MATCH (p:Paper)-[:CITES]->(cited) RETURN cited",
    vector_table="papers",
    vector_column="abstract_embedding",
    query_vector=query_vec,
    k=10,
)
```

Schema

get_schema(graph=None) → dict

Get schema information for a graph. Returns a dict with keys: vertex_labels, edge_labels, vector_indexes.
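For example (the label names in the comments are illustrative):

```python
schema = db.get_schema()
print(schema["vertex_labels"])   # e.g. ["Person", "Paper"]
print(schema["edge_labels"])     # e.g. ["KNOWS", "CITES"]
print(schema["vector_indexes"])
```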

Graph management

set_graph(graph_name)

Switch to a different graph, creating it if needed.

list_graphs() → list[str]

List all AGE graphs in the database.

drop_graph(graph_name)

Drop a graph and all its data.
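Together these support a workflow like the following (graph names are illustrative):

```python
db.set_graph("staging")      # switch, creating the graph if needed
print(db.list_graphs())      # e.g. ["my_graph", "staging"]
db.drop_graph("staging")     # remove the graph and all its data
```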

Transactions

commit()

Commit the current transaction.

rollback()

Roll back the current transaction.

transaction() → context manager

Context manager that auto-commits on success and rolls back on exception.

```python
with db.transaction():
    db.cypher("CREATE (:Person {name: 'Alice'})")
    db.cypher("CREATE (:Person {name: 'Bob'})")
```

Bulk operations

load_nodes(df, label, *, graph=None, id_col=None, property_cols=None, batch_size=1000) → int

Bulk-load nodes from a pandas DataFrame. Returns the number of nodes created.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| df | DataFrame | | Source data |
| label | str | | Vertex label |
| id_col | str \| None | None | Column to use as node ID property |
| property_cols | list[str] \| None | None | Columns to include as properties. All if None |
| batch_size | int | 1000 | Rows per batch |
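A sketch, assuming a `Person` label and a hypothetical `people` DataFrame:

```python
import pandas as pd

people = pd.DataFrame({
    "person_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Carol"],
    "age": [34, 29, 41],
})

# person_id becomes the node ID property; all columns become properties
n = db.load_nodes(people, "Person", id_col="person_id")
```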

load_edges(df, rel_type, source_col, source_label, target_col, target_label, *, source_key=None, target_key=None, graph=None, property_cols=None, batch_size=1000) → int

Bulk-load edges from a pandas DataFrame. Returns the number of edges created.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| df | DataFrame | | Source data |
| rel_type | str | | Relationship type |
| source_col | str | | Column with source node identifiers |
| source_label | str | | Source vertex label |
| target_col | str | | Column with target node identifiers |
| target_label | str | | Target vertex label |
| source_key | str \| None | None | Property name to match source nodes on (default: column name) |
| target_key | str \| None | None | Property name to match target nodes on (default: column name) |
| property_cols | list[str] \| None | None | Columns to include as edge properties |
| batch_size | int | 1000 | Rows per batch |
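A sketch, assuming Person nodes with a `person_id` property already exist; the DataFrame columns are named `src`/`dst`, so `source_key`/`target_key` point at the node property to match on:

```python
import pandas as pd

knows = pd.DataFrame({"src": [1, 1, 2], "dst": [2, 3, 3]})

n = db.load_edges(
    knows, "KNOWS",
    source_col="src", source_label="Person",
    target_col="dst", target_label="Person",
    source_key="person_id", target_key="person_id",
)
```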

export_nodes(label, *, graph=None, properties=None, limit=None) → DataFrame

Export nodes to a pandas DataFrame.

export_edges(rel_type, *, graph=None, properties=None, limit=None) → DataFrame

Export edges to a pandas DataFrame.

Graph algorithms

All algorithms work by converting the AGE graph to NetworkX internally.

to_networkx(*, graph=None, labels=None, rel_types=None, directed=True, node_properties=True, edge_properties=True) → nx.DiGraph | nx.Graph

Convert the AGE graph to a NetworkX graph object.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| labels | list[str] \| None | None | Vertex labels to include. None = all |
| rel_types | list[str] \| None | None | Edge types to include. None = all |
| directed | bool | True | Return DiGraph vs Graph |
| node_properties | bool | True | Include vertex properties as node attributes |
| edge_properties | bool | True | Include edge properties as edge attributes |
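For example, to pull an undirected social subgraph into NetworkX (labels are illustrative):

```python
g = db.to_networkx(labels=["Person"], rel_types=["KNOWS"], directed=False)
print(g.number_of_nodes(), g.number_of_edges())
```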

pagerank(*, graph=None, label=None, rel_type=None, damping=0.85) → DataFrame

Compute PageRank over the graph.

communities(*, graph=None, method="louvain", label=None, rel_type=None) → DataFrame

Detect communities. Methods: "louvain".

shortest_path(source, target, *, graph=None, weight=None, label=None, rel_type=None) → list[int]

Find shortest path between two node IDs. Returns list of node IDs.

centrality(*, graph=None, measure="degree", label=None, rel_type=None) → DataFrame

Compute centrality measures. Measures: "degree", "betweenness", "closeness".
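A sketch of the algorithm helpers on a citation graph. Labels are illustrative, and the column names of the returned DataFrames are not specified here, so inspect `df.columns` before relying on them:

```python
ranks = db.pagerank(label="Paper", rel_type="CITES", damping=0.85)
comms = db.communities(method="louvain", label="Paper", rel_type="CITES")
central = db.centrality(measure="betweenness", label="Paper", rel_type="CITES")

# source_id and target_id are AGE node IDs obtained from earlier queries
path = db.shortest_path(source_id, target_id)
```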

Connection

connection → psycopg.Connection

The underlying psycopg connection.

graph → str | None

The default graph name.

close()

Close the database connection. Also usable as a context manager (with duffy.connect(...) as db:).


DuffyPool

Connection pool for multi-threaded / web applications. Returned by duffy.connect_pool().

connection() → context manager

Get a DuffyDriver backed by a pooled connection.

```python
with pool.connection() as db:
    db.cypher("MATCH (n) RETURN n")
```

close()

Close the pool and all connections.


Result

Container for query results. Returned by cypher(), sql(), vector_search(), and hybrid_search().

Properties

| Property | Type | Description |
| --- | --- | --- |
| columns | list[str] | Column names |
| records | list[Record] | List of Record objects |

Methods

to_df(*, expand=False) → DataFrame

Convert to a pandas DataFrame. With expand=True, Vertex/Edge objects are flattened into separate columns (e.g., n._id, n._label, n.name).

to_dicts(*, expand=False) → list[dict]

Convert to a list of plain Python dicts.

to_arrow(*, expand=False) → pyarrow.Table

Convert to a PyArrow Table.

single() → Record

Return the single record. Raises ValueError if count != 1.

Iteration

Result supports len(), bool(), and iteration over Record objects.
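Putting the pieces together:

```python
result = db.cypher("MATCH (p:Person) RETURN p.name, p.age")

for record in result:        # iterate Record objects
    print(record["p.name"])

df = result.to_df()          # pandas DataFrame
rows = result.to_dicts()     # list of plain dicts
if result:                   # truthy when non-empty
    print(len(result), "rows")
```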

Record

A single row with keyed and positional access:

```python
record = result.single()
record["n.name"]   # keyed access
record[0]          # positional access
record.keys()      # column names
record.values()    # row values
```

Query Builders

Programmatic construction of Cypher queries. All builders produce (cypher_string, params_tuple) via .build(), or can be passed directly to db.execute(builder).

Match(label=None, alias=None, **properties)

Build a MATCH query with fluent chaining.

```python
q = (
    Match("Person", alias="p", name="Alice")
    .rel("KNOWS", alias="r")
    .node("Person", alias="q")
    .where(p__age__gt=30)
    .return_("p.name", "q.name")
    .order_by("p.name")
    .limit(10)
)
result = db.execute(q)
```

Chaining methods:

| Method | Description |
| --- | --- |
| `.rel(rel_type, alias=None, direction="out", **props)` | Add a relationship pattern |
| `.node(label=None, alias=None, **props)` | Add a node pattern |
| `.where(raw=None, **kwargs)` | Add WHERE conditions (Django-style: alias__prop__op=value) |
| `.return_(*exprs)` | Set RETURN expressions |
| `.return_all()` | RETURN * |
| `.with_(*exprs)` | Add WITH clause |
| `.set(alias, **properties)` | Add SET clause |
| `.delete(*aliases, detach=False)` | Add DELETE clause |
| `.order_by(*exprs, desc=False)` | Add ORDER BY |
| `.limit(n)` | Add LIMIT |
| `.skip(n)` | Add SKIP |
| `.build()` → (str, tuple) | Build the Cypher string and params |

WHERE operators: gt, gte, lt, lte, ne, contains, starts_with, ends_with, in, is_null, is_not_null.
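To make the `alias__prop__op` convention concrete, here is a hypothetical sketch of how one such kwarg could be translated into a parameterized condition. This is illustrative only, not duffy's actual implementation; `OPS` and `translate` are invented names:

```python
# Hypothetical sketch -- not duffy's real translation code.
OPS = {"gt": ">", "gte": ">=", "lt": "<", "lte": "<=", "ne": "<>"}

def translate(key: str, value):
    """Turn e.g. p__age__gt=30 into ('p.age > %s', 30)."""
    alias, prop, op = key.split("__")
    return f"{alias}.{prop} {OPS[op]} %s", value
```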

Create(label, alias="n", **properties)

Build a CREATE query.

```python
q = Create("Person", name="Alice", age=30).return_("n")
result = db.execute(q)
```

Chaining methods:

| Method | Description |
| --- | --- |
| `.rel(rel_type, target_label, direction="out", target_alias=None, **props)` | Chain a relationship + target node |
| `.return_(*exprs)` | Set RETURN expressions |

Merge(label, alias="n", **match_properties)

Build a MERGE query with optional ON CREATE SET / ON MATCH SET.

```python
q = (
    Merge("Person", name="Alice")
    .on_create(created_at="2024-01-01")
    .on_match(last_seen="2024-06-01")
    .set(active=True)
    .return_("n")
)
result = db.execute(q)
```

Chaining methods:

| Method | Description |
| --- | --- |
| `.on_create(**properties)` | Set properties only when creating |
| `.on_match(**properties)` | Set properties only when matching |
| `.set(**properties)` | Set properties always |
| `.return_(*exprs)` | Set RETURN expressions |

Types

Frozen dataclasses returned by Cypher queries when vertices, edges, or paths appear in results.

Vertex

| Field | Type | Description |
| --- | --- | --- |
| id | int | AGE vertex ID |
| label | str | Vertex label |
| properties | dict[str, Any] | Property map |

Supports vertex["prop_name"] for property access.

Edge

| Field | Type | Description |
| --- | --- | --- |
| id | int | AGE edge ID |
| label | str | Edge label (relationship type) |
| start_id | int | Source vertex ID |
| end_id | int | Target vertex ID |
| properties | dict[str, Any] | Property map |

Supports edge["prop_name"] for property access.

Path

An alternating sequence of vertices and edges.

| Property/Method | Type | Description |
| --- | --- | --- |
| entities | tuple[Vertex \| Edge, ...] | All entities in order |
| vertices | list[Vertex] | Just the vertices |
| edges | list[Edge] | Just the edges |

Supports len(), indexing, and iteration.
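For example, consuming a path returned by a Cypher query (the pattern and property names are illustrative):

```python
row = db.cypher(
    "MATCH p = (a:Person)-[:KNOWS]->(b:Person) RETURN p LIMIT 1"
).single()
path = row["p"]                     # Path
for v in path.vertices:             # Vertex objects
    print(v.label, v["name"])
for e in path.edges:                # Edge objects
    print(e.label, e.start_id, e.end_id)
```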