CREATE OR REPLACE TABLE SALES_DATA (
SALE_DATE DATE,
REGION STRING,
PRODUCT STRING,
QUANTITY NUMBER,
AMOUNT NUMBER
);
INSERT INTO SALES_DATA (SALE_DATE, REGION, PRODUCT, QUANTITY, AMOUNT) VALUES
('2024-01-05', 'US', 'Laptop', 10, 15000),
('2024-01-15', 'US', 'Mouse', 200, 4000),
('2024-02-01', 'US', 'Keyboard', 150, 9000),
('2024-03-20', 'US', 'Monitor', 50, 25000),
('2024-03-28', 'EU', 'Laptop', 20, 30000),
('2024-03-30', 'US', 'Laptop', 5, 7500);
import snowflake.connector

# Use qmark binding so the "?" placeholders below are bound positionally
# (the connector's default paramstyle is pyformat, which would reject "?").
snowflake.connector.paramstyle = "qmark"

conn = snowflake.connector.connect(
    user="MY_USER",
    password="MY_PASSWORD",
    account="MY_ACCOUNT",
    warehouse="MY_WH",
    database="MY_DB",
    schema="PUBLIC",
)
def ask_table_question(table_name: str, question: str) -> str:
    # Normalize to uppercase to match Snowflake's default identifier casing.
    table_name = table_name.upper()
    # COMPLETE's two-argument form takes the model name and a prompt string,
    # and returns the completion as a plain string.
    sql = f"""
    SELECT SNOWFLAKE.CORTEX.COMPLETE(
        'mistral-large',
        'You are a data analyst for the data in the {table_name} table. '
        || 'Use only the data from {table_name} to answer. Question: ' || ?
    ) AS ANSWER;
    """
    cur = conn.cursor()
    cur.execute(sql, (question,))
    row = cur.fetchone()
    return row[0]
print(ask_table_question(
"SALES_DATA",
"What were total sales for Q1 2024 in the US?"
))
print(ask_table_question("SALES_DATA", "Show US laptop sales in January."))
print(ask_table_question("ORDERS_2024", "Which day had the highest revenue?"))
WITH QUESTION AS (
    SELECT
        'What is the termination notice period in the ACME contract?' AS Q,
        SNOWFLAKE.CORTEX.EMBED_TEXT_768(
            'snowflake-arctic-embed-m',
            'What is the termination notice period in the ACME contract?'
        ) AS QVEC
),
RANKED AS (
    SELECT
        c.CONTRACT_ID,
        c.FILE_NAME,
        c.TEXT,
        VECTOR_COSINE_SIMILARITY(c.VECTOR, q.QVEC) AS SIM
    FROM CONTRACT_EMBEDDINGS c
    CROSS JOIN QUESTION q
    ORDER BY SIM DESC
    LIMIT 5
)
SELECT SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large',
    'You are a contract analyst. Using ONLY the context below, answer the question.' ||
    '\n\nCONTEXT:\n' ||
    LISTAGG(TEXT, '\n\n') || '\n\nQUESTION: ' ||
    (SELECT Q FROM QUESTION)
) AS ANSWER
FROM RANKED;
Five stages: ingest (load source documents into a Snowflake stage or table), parse + chunk (extract text, split into retrieval-sized passages), embed (call SNOWFLAKE.CORTEX.EMBED_TEXT_768 to produce vectors stored in a VECTOR column or indexed by Cortex Search), retrieve (vector similarity or hybrid search at query time), and generate (pass top-k chunks plus the question to CORTEX.COMPLETE with a grounding prompt). Each stage is plain SQL or Snowpark, so the whole pipeline is observable, governable, and auditable through Snowflake's existing tooling.
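The chunk-and-embed stages can be sketched as follows. Table and column names here (DOCS, DOC_CHUNKS) are illustrative, and the chunk size and overlap passed to Cortex's recursive-character splitter are assumptions to tune for your corpus:

```sql
-- Hypothetical source table DOCS(DOC_ID, TEXT) holds parsed document text.
CREATE OR REPLACE TABLE DOC_CHUNKS (
    DOC_ID   STRING,
    CHUNK_IX INTEGER,
    CHUNK    STRING,
    VECTOR   VECTOR(FLOAT, 768)
);

INSERT INTO DOC_CHUNKS
SELECT
    d.DOC_ID,
    c.INDEX         AS CHUNK_IX,
    c.VALUE::STRING AS CHUNK,
    SNOWFLAKE.CORTEX.EMBED_TEXT_768('snowflake-arctic-embed-m', c.VALUE::STRING)
FROM DOCS d,
     LATERAL FLATTEN(
         -- ~1000-character chunks with 200-character overlap (assumed values)
         SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(d.TEXT, 'none', 1000, 200)
     ) c;
```

Because the chunks land in an ordinary table, the retrieve stage is just the similarity query shown earlier, pointed at DOC_CHUNKS.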
Place a STREAM on the source documents table to capture inserts, updates, and deletes; create a TASK on a schedule (e.g., WHEN SYSTEM$STREAM_HAS_DATA('docs_stream')) that consumes the stream, re-chunks and re-embeds only the changed rows, and merges them into the embeddings table. Cortex Search has its own auto-refresh based on the underlying table's change tracking — set TARGET_LAG = '5 minutes' for near-real-time. The stream/task pattern is right for DIY embedding pipelines; the auto-refresh is right when Cortex Search owns the index.
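A minimal sketch of the stream/task pattern, assuming hypothetical DOCS(DOC_ID, TEXT) and DOC_CHUNKS tables; for brevity it handles inserts (and the insert half of updates) only — deletes would need a companion DELETE or MERGE keyed on METADATA$ACTION:

```sql
-- Capture row-level changes on the source table.
CREATE OR REPLACE STREAM DOCS_STREAM ON TABLE DOCS;

-- Re-chunk and re-embed only the changed rows; the task is skipped
-- entirely unless the stream actually has data.
CREATE OR REPLACE TASK REFRESH_EMBEDDINGS
    WAREHOUSE = MY_WH
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('DOCS_STREAM')
AS
    INSERT INTO DOC_CHUNKS
    SELECT
        s.DOC_ID,
        c.INDEX,
        c.VALUE::STRING,
        SNOWFLAKE.CORTEX.EMBED_TEXT_768('snowflake-arctic-embed-m', c.VALUE::STRING)
    FROM DOCS_STREAM s,
         LATERAL FLATTEN(
             SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(s.TEXT, 'none', 1000, 200)
         ) c
    WHERE s.METADATA$ACTION = 'INSERT';

-- Tasks are created suspended; resume to start the schedule.
ALTER TASK REFRESH_EMBEDDINGS RESUME;
```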
Materialize when you embed once and serve many — store one row per chunk with the precomputed vector and metadata, which is the standard approach. A view-based pattern (computing embeddings or transformations at query time) only makes sense for tiny corpora or when freshness requirements outpace any refresh schedule, and it's prohibitively expensive at scale because you re-embed on every query. Cortex Search itself is a managed materialized index — you don't see the underlying table but it behaves like one. Pick materialized embeddings; use dynamic tables when you want declarative refresh-managed materialization.
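The declarative variant might look like the sketch below, again with hypothetical DOCS source and illustrative chunk parameters. Note that calling Cortex functions inside a dynamic table can force full rather than incremental refreshes, so verify the behavior and cost on your account before committing to it:

```sql
-- Refresh-managed materialization: Snowflake schedules refreshes
-- to keep the table within TARGET_LAG of its sources.
CREATE OR REPLACE DYNAMIC TABLE DOC_CHUNKS_DT
    TARGET_LAG = '1 hour'
    WAREHOUSE  = MY_WH
AS
SELECT
    d.DOC_ID,
    c.INDEX         AS CHUNK_IX,
    c.VALUE::STRING AS CHUNK,
    SNOWFLAKE.CORTEX.EMBED_TEXT_768('snowflake-arctic-embed-m', c.VALUE::STRING) AS VECTOR
FROM DOCS d,
     LATERAL FLATTEN(
         SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(d.TEXT, 'none', 1000, 200)
     ) c;
```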
Track three layers: cost via SNOWFLAKE.ACCOUNT_USAGE.CORTEX_FUNCTIONS_USAGE_HISTORY and CORTEX_SEARCH_SERVING_USAGE_HISTORY (token counts, credits per function); freshness via task history (TASK_HISTORY()) and the Cortex Search service's refreshed_on timestamp, alerting when lag exceeds the SLA; and quality via a labeled eval set replayed nightly that scores recall@k and answer correctness with an LLM-as-judge. Persist every production query, its retrieved chunks, and the final answer to an audit table so you can replay any incident.
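A sketch of the cost layer. The column names used here (FUNCTION_NAME, MODEL_NAME, TOKENS, TOKEN_CREDITS) should be checked against the view on your account, since ACCOUNT_USAGE schemas evolve:

```sql
-- Daily Cortex LLM-function spend by function and model, last 7 days.
SELECT
    DATE_TRUNC('day', START_TIME) AS USAGE_DAY,
    FUNCTION_NAME,
    MODEL_NAME,
    SUM(TOKENS)        AS TOTAL_TOKENS,
    SUM(TOKEN_CREDITS) AS TOTAL_CREDITS
FROM SNOWFLAKE.ACCOUNT_USAGE.CORTEX_FUNCTIONS_USAGE_HISTORY
WHERE START_TIME >= DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY 1, 2, 3
ORDER BY USAGE_DAY, TOTAL_CREDITS DESC;
```

Feed the result to a scheduled alert or dashboard; a sudden jump in TOTAL_CREDITS for one function is usually a runaway prompt or retry loop.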
Three options: (1) one Cortex Search service per tenant — strongest isolation, simple security, but operational overhead grows with tenant count; (2) shared service with a tenant_id metadata column and a row access policy that filters by current role — single service to operate, governance is enforced by Snowflake, scales to many small tenants; (3) shared service with application-level filtering — only acceptable when tenants are not security boundaries. For B2B SaaS with strong isolation requirements, prefer the row access policy approach because it survives application bugs.
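Option (2) sketched below, assuming a hypothetical TENANT_ROLE_MAP(TENANT_ID, ROLE_NAME) mapping table and a TENANT_ID column on a DOC_CHUNKS embeddings table:

```sql
-- Allow a row only when the current role is mapped to that row's tenant.
CREATE OR REPLACE ROW ACCESS POLICY TENANT_ISOLATION
AS (ROW_TENANT_ID STRING) RETURNS BOOLEAN ->
    EXISTS (
        SELECT 1
        FROM TENANT_ROLE_MAP m          -- hypothetical role-to-tenant mapping
        WHERE m.TENANT_ID = ROW_TENANT_ID
          AND m.ROLE_NAME = CURRENT_ROLE()
    );

ALTER TABLE DOC_CHUNKS
    ADD ROW ACCESS POLICY TENANT_ISOLATION ON (TENANT_ID);
```

Because the policy is attached to the table itself, every consumer — retrieval queries, Cortex Search indexing, ad hoc analysts — sees only the rows its role is entitled to.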
Version the embedding column: store vector_v1 from the old model and vector_v2 from the new model side by side, with a metadata column pointing to the active version per row. Backfill vector_v2 in batches with a Snowpark job, evaluate offline against a labeled set, then flip the retrieval query to read vector_v2 through a view. Keep vector_v1 until you've shipped and watched metrics for a week — instantaneous rollback is a view definition change. For Cortex Search, create a parallel service against the new model and dual-write queries during validation; cut over by changing the service name in the app config.
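The flip itself, sketched against a hypothetical DOC_CHUNKS table whose existing column is VECTOR_V1; rollback is just re-pointing the view:

```sql
-- Add the new model's column alongside the old one; backfill it in
-- batches (e.g., via a Snowpark job), then serve through a view.
ALTER TABLE DOC_CHUNKS ADD COLUMN VECTOR_V2 VECTOR(FLOAT, 768);

CREATE OR REPLACE VIEW DOC_CHUNKS_ACTIVE AS
SELECT DOC_ID, CHUNK_IX, CHUNK, VECTOR_V2 AS VECTOR
FROM DOC_CHUNKS
WHERE VECTOR_V2 IS NOT NULL;   -- exclude rows the backfill hasn't reached

-- Rollback is one view change back to the old column:
-- CREATE OR REPLACE VIEW DOC_CHUNKS_ACTIVE AS
-- SELECT DOC_ID, CHUNK_IX, CHUNK, VECTOR_V1 AS VECTOR FROM DOC_CHUNKS;
```

Retrieval queries read DOC_CHUNKS_ACTIVE, so neither the cutover nor a rollback touches application code.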