From a504ab818734393c689369d03adf66c5fd38ad87 Mon Sep 17 00:00:00 2001 From: Sreeram Thoom Date: Tue, 25 Nov 2025 08:31:14 -0600 Subject: [PATCH] Adding UC ABAC policy agent base code as is. --- .../utils/abac_policy_agent/README.md | 212 +++++++ uc-quickstart/utils/abac_policy_agent/app.py | 493 +++++++++++++++ .../utils/abac_policy_agent/app.yaml | 11 + .../utils/abac_policy_agent/driver.py | 456 ++++++++++++++ .../utils/abac_policy_agent/messages.py | 124 ++++ .../abac_policy_agent/model_serving_utils.py | 267 ++++++++ .../utils/abac_policy_agent/requirements.txt | 2 + .../sample_udfs_and_abac_policies.sql | 577 ++++++++++++++++++ .../uc_demo_schema_setup.sql | 407 ++++++++++++ .../abac_policy_agent/uc_function_tools.sql | 60 ++ .../utils/abac_policy_agent/uc_tags_setup.sql | 118 ++++ 11 files changed, 2727 insertions(+) create mode 100644 uc-quickstart/utils/abac_policy_agent/README.md create mode 100644 uc-quickstart/utils/abac_policy_agent/app.py create mode 100644 uc-quickstart/utils/abac_policy_agent/app.yaml create mode 100644 uc-quickstart/utils/abac_policy_agent/driver.py create mode 100644 uc-quickstart/utils/abac_policy_agent/messages.py create mode 100644 uc-quickstart/utils/abac_policy_agent/model_serving_utils.py create mode 100644 uc-quickstart/utils/abac_policy_agent/requirements.txt create mode 100644 uc-quickstart/utils/abac_policy_agent/sample_udfs_and_abac_policies.sql create mode 100644 uc-quickstart/utils/abac_policy_agent/uc_demo_schema_setup.sql create mode 100644 uc-quickstart/utils/abac_policy_agent/uc_function_tools.sql create mode 100644 uc-quickstart/utils/abac_policy_agent/uc_tags_setup.sql diff --git a/uc-quickstart/utils/abac_policy_agent/README.md b/uc-quickstart/utils/abac_policy_agent/README.md new file mode 100644 index 00000000..10efd4fb --- /dev/null +++ b/uc-quickstart/utils/abac_policy_agent/README.md @@ -0,0 +1,212 @@ +# šŸ›”ļø ABAC Policy Assistant + +An AI-powered Unity Catalog Attribute-Based Access Control (ABAC) 
policy generation assistant built with Databricks Agent Framework and Streamlit. + +## šŸš€ Features + +- **Intelligent Table Analysis** - Automatically examines Unity Catalog table structures, columns, and metadata +- **ABAC Policy Generation** - Creates ROW FILTER and COLUMN MASK policy recommendations +- **Tag-Based Conditions** - Generates MATCH COLUMNS and FOR TABLES conditions using `hasTag()` and `hasTagValue()` +- **Real-time Streaming** - Provides streaming responses with tool call visualization +- **Professional UI** - Clean, Databricks-branded interface with responsive design +- **Unity Catalog Integration** - Direct integration with UC functions for metadata retrieval + +## šŸ—ļø Architecture + +``` +ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā” +│ Streamlit Chat UI │ +ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤ +│ Databricks Agent Framework │ +ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤ +│ Unity Catalog Tools │ LLM Endpoint │ +│ • describe_extended_table │ • Claude Sonnet 4 │ +│ • get_table_tags │ • Streaming │ +│ • get_column_tags │ • Tool Calling │ +│ • list_row_filter_column_masking │ │ +│ • list_uc_tables │ │ +ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤ +│ Unity Catalog Metastore │ 
+ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜ +``` + +## šŸ› ļø Prerequisites + +- Databricks workspace with Unity Catalog enabled +- Model serving endpoint with agent capabilities +- Python 3.8+ +- Required Unity Catalog functions deployed + +## šŸ“¦ Installation + +1. **Clone the repository** + ```bash + git clone + cd e2e-chatbot-app + ``` + +2. **Install dependencies** + ```bash + pip install -r requirements.txt + ``` + +3. **Set up Unity Catalog functions** + + Deploy the following functions to your Unity Catalog: + - `enterprise_gov.gov_admin.describe_extended_table` + - `enterprise_gov.gov_admin.get_table_tags` + - `enterprise_gov.gov_admin.get_column_tags` + - `enterprise_gov.gov_admin.list_row_filter_column_masking` + - `enterprise_gov.gov_admin.list_uc_tables` + +4. **Configure environment variables** + ```bash + export SERVING_ENDPOINT="your-agent-endpoint-name" + ``` + +## šŸš€ Usage + +### Local Development + +```bash +streamlit run app.py +``` + +### Databricks Apps Deployment + +1. **Create app.yaml** (already configured) + ```yaml + command: ["streamlit", "run", "app.py"] + env: + - name: STREAMLIT_BROWSER_GATHER_USAGE_STATS + value: "false" + - name: "SERVING_ENDPOINT" + valueFrom: "serving-endpoint" + ``` + +2. **Deploy to Databricks** + ```bash + databricks apps create your-app-name + databricks apps deploy your-app-name --source-dir . + ``` + +## šŸ’¬ Example Queries + +Ask the assistant natural language questions about your Unity Catalog tables: + +``` +"Suggest ABAC policies for enterprise_gov.hr_finance.customers" + +"What table-level access controls should I implement for sensitive customer data?" 
+ +"Generate tag-based ABAC policies with MATCH conditions for the customers table" + +"Analyze my table schema and recommend governance policies" + +"What are the recommended ABAC FOR table conditions for PII data?" +``` + +## šŸ”§ Configuration + +### Agent Configuration (agent.py) + +- **LLM Endpoint**: Configure in `LLM_ENDPOINT_NAME` +- **System Prompt**: Customize ABAC policy generation behavior +- **UC Tools**: Add/remove Unity Catalog functions as needed +- **Vector Search**: Optional integration for document retrieval + +### UI Configuration (app.py) + +- **Databricks Branding**: Colors and styling in CSS +- **Page Layout**: Streamlit page configuration +- **Chat Interface**: Message rendering and interaction flow + +## šŸ“‹ Available Tools + +| Tool Name | Description | +|-----------|-------------| +| `describe_extended_table` | Get detailed table schema and metadata | +| `get_table_tags` | Retrieve table-level tag information | +| `get_column_tags` | Retrieve column-level tag information | +| `list_row_filter_column_masking` | Review existing ABAC policies | +| `list_uc_tables` | Discover tables in catalogs and schemas | + +## šŸ” ABAC Policy Types Supported + +- **ROW FILTER** policies for row-level security +- **COLUMN MASK** policies for column-level protection +- **Tag-based conditions** using `hasTag()` and `hasTagValue()` +- **Multi-table policies** with FOR TABLES conditions +- **Principal-specific** policies with TO/EXCEPT clauses + +## šŸ“Š Example Policy Output + +```sql +CREATE POLICY hide_sensitive_customers +ON SCHEMA enterprise_gov.hr_finance +COMMENT 'Hide rows with sensitive customer data from general analysts' +ROW FILTER filter_sensitive_data +TO general_analysts +FOR TABLES +WHEN hasTag('sensitivity_level') +MATCH COLUMNS + hasTagValue('data_classification', 'sensitive') AS sensitive_col +USING COLUMNS (sensitive_col); +``` + +## šŸ” Troubleshooting + +### Common Issues + +1. 
**Endpoint Connection Errors** + - Verify `SERVING_ENDPOINT` environment variable + - Check model serving endpoint permissions + - Ensure endpoint supports agent/chat completions + +2. **Unity Catalog Function Errors** + - Verify UC functions are deployed and accessible + - Check function permissions (CAN_EXECUTE) + - Validate function signatures match expected format + +3. **UI Rendering Issues** + - Clear browser cache + - Check Streamlit version compatibility + - Verify CSS styling in different browsers + +### Debug Mode + +Enable debug logging: +```python +import logging +logging.basicConfig(level=logging.DEBUG) +``` + +## šŸ¤ Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests if applicable +5. Submit a pull request + +## šŸ“„ License + +This project is licensed under the MIT License - see the LICENSE file for details. + +## šŸ†˜ Support + +For support and questions: +- Check Databricks documentation: [Agent Framework](https://docs.databricks.com/generative-ai/agent-framework/) +- Review Unity Catalog ABAC docs: [ABAC Policies](https://docs.databricks.com/data-governance/unity-catalog/abac/) +- Open an issue in this repository + +## šŸ“š Additional Resources + +- [Databricks Agent Framework](https://docs.databricks.com/generative-ai/agent-framework/) +- [Unity Catalog ABAC Tutorial](https://docs.databricks.com/data-governance/unity-catalog/abac/tutorial) +- [Streamlit Documentation](https://docs.streamlit.io/) +- [MLflow Agent Evaluation](https://docs.databricks.com/generative-ai/agent-evaluation/) + +--- + +Built with ā¤ļø using Databricks Agent Framework diff --git a/uc-quickstart/utils/abac_policy_agent/app.py b/uc-quickstart/utils/abac_policy_agent/app.py new file mode 100644 index 00000000..41e539a9 --- /dev/null +++ b/uc-quickstart/utils/abac_policy_agent/app.py @@ -0,0 +1,493 @@ +import logging +import os +import streamlit as st +from model_serving_utils import ( + endpoint_supports_feedback, + 
query_endpoint, + query_endpoint_stream, + _get_endpoint_task_type, +) +from collections import OrderedDict +from messages import UserMessage, AssistantResponse, render_message + +# Configure page +st.set_page_config( + page_title="ABAC Policy Assistant", + page_icon="šŸ›”ļø", + layout="centered", + initial_sidebar_state="auto" +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SERVING_ENDPOINT = os.getenv('SERVING_ENDPOINT') +assert SERVING_ENDPOINT, \ + ("Unable to determine serving endpoint to use for chatbot app. If developing locally, " + "set the SERVING_ENDPOINT environment variable to the name of your serving endpoint. If " + "deploying to a Databricks app, include a serving endpoint resource named " + "'serving_endpoint' with CAN_QUERY permissions, as described in " + "https://docs.databricks.com/aws/en/generative-ai/agent-framework/chat-app#deploy-the-databricks-app") + +ENDPOINT_SUPPORTS_FEEDBACK = endpoint_supports_feedback(SERVING_ENDPOINT) + +def reduce_chat_agent_chunks(chunks): + """ + Reduce a list of ChatAgentChunk objects corresponding to a particular + message into a single ChatAgentMessage + """ + deltas = [chunk.delta for chunk in chunks] + first_delta = deltas[0] + result_msg = first_delta + msg_contents = [] + + # Accumulate tool calls properly + tool_call_map = {} # Map call_id to tool call for accumulation + + for delta in deltas: + # Handle content + if delta.content: + msg_contents.append(delta.content) + + # Handle tool calls + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + call_id = getattr(tool_call, 'id', None) + tool_type = getattr(tool_call, 'type', "function") + function_info = getattr(tool_call, 'function', None) + if function_info: + func_name = getattr(function_info, 'name', "") + func_args = getattr(function_info, 'arguments', "") + else: + func_name = "" + func_args = "" + + if call_id: + if call_id not in tool_call_map: + # New tool call + 
tool_call_map[call_id] = { + "id": call_id, + "type": tool_type, + "function": { + "name": func_name, + "arguments": func_args + } + } + else: + # Accumulate arguments for existing tool call + existing_args = tool_call_map[call_id]["function"]["arguments"] + tool_call_map[call_id]["function"]["arguments"] = existing_args + func_args + + # Update function name if provided + if func_name: + tool_call_map[call_id]["function"]["name"] = func_name + + # Handle tool call IDs (for tool response messages) + if hasattr(delta, 'tool_call_id') and delta.tool_call_id: + result_msg = result_msg.model_copy(update={"tool_call_id": delta.tool_call_id}) + + # Convert tool call map back to list + if tool_call_map: + accumulated_tool_calls = list(tool_call_map.values()) + result_msg = result_msg.model_copy(update={"tool_calls": accumulated_tool_calls}) + + result_msg = result_msg.model_copy(update={"content": "".join(msg_contents)}) + return result_msg + + + +# --- Init state --- +if "history" not in st.session_state: + st.session_state.history = [] + +# Databricks-themed CSS styling +st.markdown(""" + +""", unsafe_allow_html=True) + +# Clean, professional header +st.markdown(""" +
+<div class="main-header">
+    <h1 class="header-title">
+        šŸ›”ļø ABAC Policy Assistant
+    </h1>
+    <p class="header-subtitle">
+        Generate Unity Catalog access control policies with AI
+    </p>
+</div>
+
+""", unsafe_allow_html=True) + +# Clean sidebar +with st.sidebar: + st.markdown("### šŸ“‹ Quick Guide") + + st.markdown("**How to use:**") + st.markdown(""" + 1. Enter a Unity Catalog table name + 2. Ask for policy recommendations + 3. Review the generated ABAC policies + """) + + + st.markdown("---") + + if st.button("šŸ—‘ļø Clear History"): + st.session_state.history = [] + st.rerun() + + st.markdown("---") + st.caption(f"Endpoint: {SERVING_ENDPOINT}") + +# Simple example section +if len(st.session_state.history) == 0: + st.markdown(""" +
+<div class="get-started-box">
+    <h3 class="get-started-title">
+        Get Started
+    </h3>
+    <p>
+        Ask me to analyze any Unity Catalog table and generate ABAC policies. For example:
+    </p>
+    <div class="example-query">
+        "Suggest ABAC policies for catalog.schema.table"
+    </div>
+</div>
+ """, unsafe_allow_html=True) + + + +# --- Render chat history --- +for i, element in enumerate(st.session_state.history): + element.render(i) + +def query_endpoint_and_render(task_type, input_messages): + """Handle streaming response based on task type.""" + if task_type == "agent/v1/responses": + return query_responses_endpoint_and_render(input_messages) + elif task_type == "agent/v2/chat": + return query_chat_agent_endpoint_and_render(input_messages) + else: # chat/completions + return query_chat_completions_endpoint_and_render(input_messages) + + +def query_chat_completions_endpoint_and_render(input_messages): + """Handle ChatCompletions streaming format.""" + with st.chat_message("assistant"): + response_area = st.empty() + response_area.markdown("_Thinking..._") + + accumulated_content = "" + request_id = None + + try: + for chunk in query_endpoint_stream( + endpoint_name=SERVING_ENDPOINT, + messages=input_messages, + return_traces=ENDPOINT_SUPPORTS_FEEDBACK + ): + if "choices" in chunk and chunk["choices"]: + delta = chunk["choices"][0].get("delta", {}) + content = delta.get("content", "") + if content: + accumulated_content += content + response_area.markdown(accumulated_content) + + if "databricks_output" in chunk: + req_id = chunk["databricks_output"].get("databricks_request_id") + if req_id: + request_id = req_id + + return AssistantResponse( + messages=[{"role": "assistant", "content": accumulated_content}], + request_id=request_id + ) + except Exception: + response_area.markdown("_Ran into an error. 
Retrying without streaming..._") + messages, request_id = query_endpoint( + endpoint_name=SERVING_ENDPOINT, + messages=input_messages, + return_traces=ENDPOINT_SUPPORTS_FEEDBACK + ) + response_area.empty() + with response_area.container(): + for message in messages: + render_message(message) + return AssistantResponse(messages=messages, request_id=request_id) + + +def query_chat_agent_endpoint_and_render(input_messages): + """Handle ChatAgent streaming format.""" + from mlflow.types.agent import ChatAgentChunk + + with st.chat_message("assistant"): + response_area = st.empty() + response_area.markdown("_Thinking..._") + + message_buffers = OrderedDict() + request_id = None + + try: + for raw_chunk in query_endpoint_stream( + endpoint_name=SERVING_ENDPOINT, + messages=input_messages, + return_traces=ENDPOINT_SUPPORTS_FEEDBACK + ): + response_area.empty() + chunk = ChatAgentChunk.model_validate(raw_chunk) + delta = chunk.delta + message_id = delta.id + + req_id = raw_chunk.get("databricks_output", {}).get("databricks_request_id") + if req_id: + request_id = req_id + if message_id not in message_buffers: + message_buffers[message_id] = { + "chunks": [], + "render_area": st.empty(), + } + message_buffers[message_id]["chunks"].append(chunk) + + partial_message = reduce_chat_agent_chunks(message_buffers[message_id]["chunks"]) + render_area = message_buffers[message_id]["render_area"] + message_content = partial_message.model_dump_compat(exclude_none=True) + with render_area.container(): + render_message(message_content) + + messages = [] + for msg_id, msg_info in message_buffers.items(): + messages.append(reduce_chat_agent_chunks(msg_info["chunks"])) + + return AssistantResponse( + messages=[message.model_dump_compat(exclude_none=True) for message in messages], + request_id=request_id + ) + except Exception: + response_area.markdown("_Ran into an error. 
Retrying without streaming..._") + messages, request_id = query_endpoint( + endpoint_name=SERVING_ENDPOINT, + messages=input_messages, + return_traces=ENDPOINT_SUPPORTS_FEEDBACK + ) + response_area.empty() + with response_area.container(): + for message in messages: + render_message(message) + return AssistantResponse(messages=messages, request_id=request_id) + + +def query_responses_endpoint_and_render(input_messages): + """Handle ResponsesAgent streaming format using MLflow types.""" + from mlflow.types.responses import ResponsesAgentStreamEvent + + with st.chat_message("assistant"): + response_area = st.empty() + response_area.markdown("_Thinking..._") + + # Track all the messages that need to be rendered in order + all_messages = [] + request_id = None + + try: + for raw_event in query_endpoint_stream( + endpoint_name=SERVING_ENDPOINT, + messages=input_messages, + return_traces=ENDPOINT_SUPPORTS_FEEDBACK + ): + # Extract databricks_output for request_id + if "databricks_output" in raw_event: + req_id = raw_event["databricks_output"].get("databricks_request_id") + if req_id: + request_id = req_id + + # Parse using MLflow streaming event types, similar to ChatAgentChunk + if "type" in raw_event: + event = ResponsesAgentStreamEvent.model_validate(raw_event) + + if hasattr(event, 'item') and event.item: + item = event.item # This is a dict, not a parsed object + + if item.get("type") == "message": + # Extract text content from message if present + content_parts = item.get("content", []) + for content_part in content_parts: + if content_part.get("type") == "output_text": + text = content_part.get("text", "") + if text: + all_messages.append({ + "role": "assistant", + "content": text + }) + + elif item.get("type") == "function_call": + # Tool call + call_id = item.get("call_id") + function_name = item.get("name") + arguments = item.get("arguments", "") + + # Add to messages for history + all_messages.append({ + "role": "assistant", + "content": "", + "tool_calls": [{ 
+ "id": call_id, + "type": "function", + "function": { + "name": function_name, + "arguments": arguments + } + }] + }) + + elif item.get("type") == "function_call_output": + # Tool call output/result + call_id = item.get("call_id") + output = item.get("output", "") + + # Add to messages for history + all_messages.append({ + "role": "tool", + "content": output, + "tool_call_id": call_id + }) + + # Update the display by rendering all accumulated messages + if all_messages: + with response_area.container(): + for msg in all_messages: + render_message(msg) + + return AssistantResponse(messages=all_messages, request_id=request_id) + except Exception: + response_area.markdown("_Ran into an error. Retrying without streaming..._") + messages, request_id = query_endpoint( + endpoint_name=SERVING_ENDPOINT, + messages=input_messages, + return_traces=ENDPOINT_SUPPORTS_FEEDBACK + ) + response_area.empty() + with response_area.container(): + for message in messages: + render_message(message) + return AssistantResponse(messages=messages, request_id=request_id) + + + + +# --- Chat input (must run BEFORE rendering messages) --- +prompt = st.chat_input("Enter a table name or ask about ABAC policies...") +if prompt: + # Get the task type for this endpoint + task_type = _get_endpoint_task_type(SERVING_ENDPOINT) + + # Add user message to chat history + user_msg = UserMessage(content=prompt) + st.session_state.history.append(user_msg) + user_msg.render(len(st.session_state.history) - 1) + + # Convert history to standard chat message format for the query methods + input_messages = [msg for elem in st.session_state.history for msg in elem.to_input_messages()] + + # Handle the response using the appropriate handler + assistant_response = query_endpoint_and_render(task_type, input_messages) + + # Add assistant response to history + st.session_state.history.append(assistant_response) diff --git a/uc-quickstart/utils/abac_policy_agent/app.yaml b/uc-quickstart/utils/abac_policy_agent/app.yaml 
new file mode 100644 index 00000000..d81e38a3 --- /dev/null +++ b/uc-quickstart/utils/abac_policy_agent/app.yaml @@ -0,0 +1,11 @@ +command: [ + "streamlit", + "run", + "app.py" +] + +env: + - name: STREAMLIT_BROWSER_GATHER_USAGE_STATS + value: "false" + - name: "SERVING_ENDPOINT" + valueFrom: "serving-endpoint" diff --git a/uc-quickstart/utils/abac_policy_agent/driver.py b/uc-quickstart/utils/abac_policy_agent/driver.py new file mode 100644 index 00000000..dd91684c --- /dev/null +++ b/uc-quickstart/utils/abac_policy_agent/driver.py @@ -0,0 +1,456 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC #Tool-calling Agent +# MAGIC +# MAGIC This is an auto-generated notebook created by an AI playground export. In this notebook, you will: +# MAGIC - Author a tool-calling [MLflow's `ResponsesAgent`](https://mlflow.org/docs/latest/api_reference/python_api/mlflow.pyfunc.html#mlflow.pyfunc.ResponsesAgent) that uses the OpenAI client +# MAGIC - Manually test the agent's output +# MAGIC - Evaluate the agent with Mosaic AI Agent Evaluation +# MAGIC - Log and deploy the agent +# MAGIC +# MAGIC This notebook should be run on serverless or a cluster with DBR<17. +# MAGIC +# MAGIC **_NOTE:_** This notebook uses the OpenAI SDK, but AI Agent Framework is compatible with any agent authoring framework, including LlamaIndex or LangGraph. To learn more, see the [Authoring Agents](https://docs.databricks.com/generative-ai/agent-framework/author-agent) Databricks documentation. +# MAGIC +# MAGIC ## Prerequisites +# MAGIC +# MAGIC - Address all `TODO`s in this notebook. + +# COMMAND ---------- + +# MAGIC %pip install -U -qqqq backoff databricks-openai uv databricks-agents mlflow-skinny[databricks] +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +# MAGIC %md ## Define the agent in code +# MAGIC Below we define our agent code in a single cell, enabling us to easily write it to a local Python file for subsequent logging and deployment using the `%%writefile` magic command. 
+# MAGIC +# MAGIC For more examples of tools to add to your agent, see [docs](https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html). + +# COMMAND ---------- + +# MAGIC %%writefile agent.py +# MAGIC import json +# MAGIC from typing import Any, Callable, Generator, Optional +# MAGIC from uuid import uuid4 +# MAGIC import warnings +# MAGIC +# MAGIC import backoff +# MAGIC import mlflow +# MAGIC import openai +# MAGIC from databricks.sdk import WorkspaceClient +# MAGIC from databricks_openai import UCFunctionToolkit, VectorSearchRetrieverTool +# MAGIC from mlflow.entities import SpanType +# MAGIC from mlflow.pyfunc import ResponsesAgent +# MAGIC from mlflow.types.responses import ( +# MAGIC ResponsesAgentRequest, +# MAGIC ResponsesAgentResponse, +# MAGIC ResponsesAgentStreamEvent, +# MAGIC ) +# MAGIC from openai import OpenAI +# MAGIC from pydantic import BaseModel +# MAGIC from unitycatalog.ai.core.base import get_uc_function_client +# MAGIC +# MAGIC ############################################ +# MAGIC # Define your LLM endpoint and system prompt +# MAGIC ############################################ +# MAGIC LLM_ENDPOINT_NAME = "databricks-claude-sonnet-4" +# MAGIC +# MAGIC SYSTEM_PROMPT = """You are an agent to review the Unity catalog schema tables and suggest the Attribute-based access control(ABAC) policies using the metadata obtained using the tool functions and the documentation as below. 
Feel free to research additional documentation to understand the concepts and policy generation aspects +# MAGIC +# MAGIC https://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/ +# MAGIC +# MAGIC https://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/tutorial +# MAGIC +# MAGIC https://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/policies +# MAGIC +# MAGIC – +# MAGIC The following is the general syntax for creating a policy: +# MAGIC SQL +# MAGIC +# MAGIC CREATE POLICY +# MAGIC ON +# MAGIC COMMENT '' +# MAGIC -- One of the following: +# MAGIC ROW FILTER +# MAGIC | COLUMN MASK ON COLUMN +# MAGIC TO [, , ...] +# MAGIC [EXCEPT [, , ...]] +# MAGIC FOR TABLES +# MAGIC [WHEN hasTag('') OR hasTagValue('', '')] +# MAGIC MATCH COLUMNS hasTag('') OR hasTagValue('', '') AS +# MAGIC USING COLUMNS [, , ...]; +# MAGIC This example defines a row filter policy that excludes rows for European customers from queries by US-based analysts: +# MAGIC SQL +# MAGIC +# MAGIC CREATE POLICY hide_eu_customers +# MAGIC ON SCHEMA prod.customers +# MAGIC COMMENT 'Hide rows with European customers from sensitive tables' +# MAGIC ROW FILTER non_eu_region +# MAGIC TO us_analysts +# MAGIC FOR TABLES +# MAGIC MATCH COLUMNS +# MAGIC hasTag('geo_region') AS region +# MAGIC USING COLUMNS (region); +# MAGIC This example defines a column mask policy that hides social security numbers from US analysts, except for those with in the admins group: +# MAGIC SQL +# MAGIC +# MAGIC CREATE POLICY mask_SSN +# MAGIC ON SCHEMA prod.customers +# MAGIC COMMENT 'Mask social security numbers' +# MAGIC COLUMN MASK mask_SSN +# MAGIC TO us_analysts +# MAGIC EXCEPT admins +# MAGIC FOR TABLES +# MAGIC MATCH COLUMNS +# MAGIC hasTagValue('pii', 'ssn') AS ssn +# MAGIC ON COLUMN ssn; +# MAGIC +# MAGIC +# MAGIC Find additional examples in the documentation - https://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/policies?language=SQL +# MAGIC +# MAGIC Usually table name 
is given as catalog_name.schem_name.table_name. +# MAGIC Considering the table metadata, column metadata, and corresponding tag details, +# MAGIC """ +# MAGIC +# MAGIC +# MAGIC ############################################################################### +# MAGIC ## Define tools for your agent, enabling it to retrieve data or take actions +# MAGIC ## beyond text generation +# MAGIC ## To create and see usage examples of more tools, see +# MAGIC ## https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html +# MAGIC ############################################################################### +# MAGIC class ToolInfo(BaseModel): +# MAGIC """ +# MAGIC Class representing a tool for the agent. +# MAGIC - "name" (str): The name of the tool. +# MAGIC - "spec" (dict): JSON description of the tool (matches OpenAI Responses format) +# MAGIC - "exec_fn" (Callable): Function that implements the tool logic +# MAGIC """ +# MAGIC +# MAGIC name: str +# MAGIC spec: dict +# MAGIC exec_fn: Callable +# MAGIC +# MAGIC +# MAGIC def create_tool_info(tool_spec, exec_fn_param: Optional[Callable] = None): +# MAGIC tool_spec["function"].pop("strict", None) +# MAGIC tool_name = tool_spec["function"]["name"] +# MAGIC udf_name = tool_name.replace("__", ".") +# MAGIC +# MAGIC # Define a wrapper that accepts kwargs for the UC tool call, +# MAGIC # then passes them to the UC tool execution client +# MAGIC def exec_fn(**kwargs): +# MAGIC function_result = uc_function_client.execute_function(udf_name, kwargs) +# MAGIC if function_result.error is not None: +# MAGIC return function_result.error +# MAGIC else: +# MAGIC return function_result.value +# MAGIC return ToolInfo(name=tool_name, spec=tool_spec, exec_fn=exec_fn_param or exec_fn) +# MAGIC +# MAGIC +# MAGIC TOOL_INFOS = [] +# MAGIC +# MAGIC # You can use UDFs in Unity Catalog as agent tools +# MAGIC # TODO: Add additional tools +# MAGIC UC_TOOL_NAMES = ["enterprise_gov.gov_admin.describe_extended_table", 
"enterprise_gov.gov_admin.list_uc_tables", "enterprise_gov.gov_admin.list_row_filter_column_masking", "enterprise_gov.gov_admin.get_table_tags", "enterprise_gov.gov_admin.get_column_tags"] +# MAGIC +# MAGIC uc_toolkit = UCFunctionToolkit(function_names=UC_TOOL_NAMES) +# MAGIC uc_function_client = get_uc_function_client() +# MAGIC for tool_spec in uc_toolkit.tools: +# MAGIC TOOL_INFOS.append(create_tool_info(tool_spec)) +# MAGIC +# MAGIC +# MAGIC # Use Databricks vector search indexes as tools +# MAGIC # See [docs](https://docs.databricks.com/generative-ai/agent-framework/unstructured-retrieval-tools.html) for details +# MAGIC +# MAGIC # # (Optional) Use Databricks vector search indexes as tools +# MAGIC # # See https://docs.databricks.com/generative-ai/agent-framework/unstructured-retrieval-tools.html +# MAGIC # # for details +# MAGIC VECTOR_SEARCH_TOOLS = [] +# MAGIC # # TODO: Add vector search indexes as tools or delete this block +# MAGIC # VECTOR_SEARCH_TOOLS.append( +# MAGIC # VectorSearchRetrieverTool( +# MAGIC # index_name="", +# MAGIC # # filters="..." 
# MAGIC #     )
# MAGIC # )
# MAGIC # Wrap each configured vector-search tool so the agent can invoke it through
# MAGIC # the same ToolInfo interface as the UC function tools.
# MAGIC for vs_tool in VECTOR_SEARCH_TOOLS:
# MAGIC     TOOL_INFOS.append(create_tool_info(vs_tool.tool, vs_tool.execute))
# MAGIC
# MAGIC
# MAGIC
# MAGIC class ToolCallingAgent(ResponsesAgent):
# MAGIC     """
# MAGIC     Class representing a tool-calling Agent
# MAGIC
# MAGIC     Streams chat completions from a Databricks model-serving endpoint and
# MAGIC     executes any tool (function) calls the LLM requests, feeding the results
# MAGIC     back into the conversation until the model emits a final assistant message.
# MAGIC     """
# MAGIC
# MAGIC     def __init__(self, llm_endpoint: str, tools: list[ToolInfo]):
# MAGIC         """Initializes the ToolCallingAgent with tools."""
# MAGIC         self.llm_endpoint = llm_endpoint
# MAGIC         self.workspace_client = WorkspaceClient()
# MAGIC         # OpenAI-compatible client backed by Databricks model serving.
# MAGIC         self.model_serving_client: OpenAI = (
# MAGIC             self.workspace_client.serving_endpoints.get_open_ai_client()
# MAGIC         )
# MAGIC         # Internal name -> ToolInfo lookup used by execute_tool().
# MAGIC         self._tools_dict = {tool.name: tool for tool in tools}
# MAGIC
# MAGIC     def get_tool_specs(self) -> list[dict]:
# MAGIC         """Returns tool specifications in the format OpenAI expects."""
# MAGIC         return [tool_info.spec for tool_info in self._tools_dict.values()]
# MAGIC
# MAGIC     @mlflow.trace(span_type=SpanType.TOOL)
# MAGIC     def execute_tool(self, tool_name: str, args: dict) -> Any:
# MAGIC         """Executes the specified tool with the given arguments."""
# MAGIC         # Raises KeyError if the LLM names a tool that was never registered.
# MAGIC         return self._tools_dict[tool_name].exec_fn(**args)
# MAGIC
# MAGIC     def call_llm(self, messages: list[dict[str, Any]]) -> Generator[dict[str, Any], None, None]:
# MAGIC         """Stream raw completion chunks (as dicts) from the serving endpoint."""
# MAGIC         with warnings.catch_warnings():
# MAGIC             # Silence noisy Pydantic serialization warnings emitted while
# MAGIC             # streaming chunks through the OpenAI client.
# MAGIC             warnings.filterwarnings("ignore", message="PydanticSerializationUnexpectedValue")
# MAGIC             for chunk in self.model_serving_client.chat.completions.create(
# MAGIC                 model=self.llm_endpoint,
# MAGIC                 messages=self.prep_msgs_for_cc_llm(messages),
# MAGIC                 tools=self.get_tool_specs(),
# MAGIC                 stream=True,
# MAGIC             ):
# MAGIC                 yield chunk.to_dict()
# MAGIC
# MAGIC     def handle_tool_call(
# MAGIC         self,
# MAGIC         tool_call: dict[str, Any],
# MAGIC         messages: list[dict[str, Any]],
# MAGIC     ) -> ResponsesAgentStreamEvent:
# MAGIC         """
# MAGIC         Execute tool calls, add them to the running message history, and return a ResponsesStreamEvent w/ tool output
# MAGIC         """
# MAGIC         args = json.loads(tool_call["arguments"])
# MAGIC         result = str(self.execute_tool(tool_name=tool_call["name"], args=args))
# MAGIC
# MAGIC         # Append the tool output so the next LLM turn can observe it.
# MAGIC         tool_call_output = self.create_function_call_output_item(tool_call["call_id"], result)
# MAGIC         messages.append(tool_call_output)
# MAGIC         return ResponsesAgentStreamEvent(type="response.output_item.done", item=tool_call_output)
# MAGIC
# MAGIC     def call_and_run_tools(
# MAGIC         self,
# MAGIC         messages: list[dict[str, Any]],
# MAGIC         max_iter: int = 10,
# MAGIC     ) -> Generator[ResponsesAgentStreamEvent, None, None]:
# MAGIC         """
# MAGIC         Alternate between LLM calls and tool execution until the model returns
# MAGIC         a plain assistant message, or max_iter rounds elapse.
# MAGIC         """
# MAGIC         for _ in range(max_iter):
# MAGIC             last_msg = messages[-1]
# MAGIC             if last_msg.get("role", None) == "assistant":
# MAGIC                 # Final answer produced; stop streaming.
# MAGIC                 return
# MAGIC             elif last_msg.get("type", None) == "function_call":
# MAGIC                 yield self.handle_tool_call(last_msg, messages)
# MAGIC             else:
# MAGIC                 # Stream LLM output; the aggregator also appends the resulting
# MAGIC                 # items onto `messages` for the next loop iteration.
# MAGIC                 yield from self.output_to_responses_items_stream(
# MAGIC                     chunks=self.call_llm(messages), aggregator=messages
# MAGIC                 )
# MAGIC
# MAGIC         # Loop exhausted without the model producing a final assistant message.
# MAGIC         yield ResponsesAgentStreamEvent(
# MAGIC             type="response.output_item.done",
# MAGIC             item=self.create_text_output_item("Max iterations reached. Stopping.", str(uuid4())),
# MAGIC         )
# MAGIC
# MAGIC     def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# MAGIC         """Non-streaming entry point: collect all completed output items."""
# MAGIC         outputs = [
# MAGIC             event.item
# MAGIC             for event in self.predict_stream(request)
# MAGIC             if event.type == "response.output_item.done"
# MAGIC         ]
# MAGIC         return ResponsesAgentResponse(output=outputs, custom_outputs=request.custom_inputs)
# MAGIC
# MAGIC     def predict_stream(
# MAGIC         self, request: ResponsesAgentRequest
# MAGIC     ) -> Generator[ResponsesAgentStreamEvent, None, None]:
# MAGIC         """Streaming entry point: prepend the system prompt and run the tool loop."""
# MAGIC         messages = self.prep_msgs_for_cc_llm([i.model_dump() for i in request.input])
# MAGIC         if SYSTEM_PROMPT:
# MAGIC             messages.insert(0, {"role": "system", "content": SYSTEM_PROMPT})
# MAGIC         yield from self.call_and_run_tools(messages=messages)
# MAGIC
# MAGIC
# MAGIC # Log the model using MLflow
# MAGIC mlflow.openai.autolog()
# MAGIC AGENT = ToolCallingAgent(llm_endpoint=LLM_ENDPOINT_NAME, tools=TOOL_INFOS)
# MAGIC mlflow.models.set_model(AGENT)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test the agent
# MAGIC
# MAGIC Interact with the agent to test its output. Since we manually traced methods within `ResponsesAgent`, you can view the trace for each step the agent takes, with any LLM calls made via the OpenAI SDK automatically traced by autologging.
# MAGIC
# MAGIC Replace this placeholder input with an appropriate domain-specific example for your agent.

# COMMAND ----------

# Restart Python so freshly-installed libraries are importable below.
dbutils.library.restartPython()

# COMMAND ----------

from agent import AGENT

# Smoke-test the non-streaming API.
AGENT.predict({"input": [{"role": "user", "content": "what is 4*3 in python"}]})

# COMMAND ----------

# Smoke-test the streaming API, printing each event as it arrives.
for chunk in AGENT.predict_stream(
    {"input": [{"role": "user", "content": "What is 4*3 in Python?"}]}
):
    print(chunk.model_dump(exclude_none=True))

# COMMAND ----------

# MAGIC %md
# MAGIC ### Log the `agent` as an MLflow model
# MAGIC Determine Databricks resources to specify for automatic auth passthrough at deployment time
# MAGIC - **TODO**: If your Unity Catalog Function queries a [vector search index](https://docs.databricks.com/generative-ai/agent-framework/unstructured-retrieval-tools.html) or leverages [external functions](https://docs.databricks.com/generative-ai/agent-framework/external-connection-tools.html), you need to include the dependent vector search index and UC connection objects, respectively, as resources. See [docs](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough) for more details.
# MAGIC
# MAGIC Log the agent as code from the `agent.py` file. See [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code).

# COMMAND ----------

# Determine Databricks resources to specify for automatic auth passthrough at deployment time
import mlflow
from agent import UC_TOOL_NAMES, VECTOR_SEARCH_TOOLS, LLM_ENDPOINT_NAME
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint
# NOTE(review): pkg_resources is deprecated in modern setuptools;
# importlib.metadata.version() is the drop-in replacement — consider migrating.
from pkg_resources import get_distribution

resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
for tool in VECTOR_SEARCH_TOOLS:
    resources.extend(tool.resources)
for tool_name in UC_TOOL_NAMES:
    # TODO: If the UC function includes dependencies like external connection or vector search, please include them manually.
    # See the TODO in the markdown above for more information.
    resources.append(DatabricksFunction(function_name=tool_name))

input_example = {
    "input": [
        {
            "role": "user",
            "content": "What is an LLM agent?"
        }
    ]
}

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="agent",
        python_model="agent.py",
        input_example=input_example,
        pip_requirements=[
            "databricks-openai",
            "backoff",
            f"databricks-connect=={get_distribution('databricks-connect').version}",
        ],
        # NOTE(review): `resources` is built above but not passed here, so
        # automatic auth passthrough is disabled; the deploy step below instead
        # injects service-principal env vars. Confirm whether this line should
        # be re-enabled.
        #resources=resources,
    )

# COMMAND ----------

# MAGIC %md
# MAGIC ## Evaluate the agent with [Agent Evaluation](https://docs.databricks.com/mlflow3/genai/eval-monitor)
# MAGIC
# MAGIC You can edit the requests or expected responses in your evaluation dataset and run evaluation as you iterate your agent, leveraging mlflow to track the computed quality metrics.
# MAGIC
# MAGIC Evaluate your agent with one of our [predefined LLM scorers](https://docs.databricks.com/mlflow3/genai/eval-monitor/predefined-judge-scorers), or try adding [custom metrics](https://docs.databricks.com/mlflow3/genai/eval-monitor/custom-scorers).

# COMMAND ----------

import mlflow
from mlflow.genai.scorers import RelevanceToQuery, Safety, RetrievalRelevance, RetrievalGroundedness

# Single-example evaluation dataset. The system message embeds the ABAC
# policy-generation instructions and CREATE POLICY syntax reference the agent
# is expected to follow.
eval_dataset = [
    {
        "inputs": {
            "input": [
                {
                    "role": "system",
                    "content": "You are an agent to review the Unity catalog schema tables and suggest the Attribute-based access control(ABAC) policies using the metadata obtained using the tool functions and the documentation as below. Feel free to research additional documentation to understand the concepts and policy generation aspects\n\nhttps://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/\n\nhttps://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/tutorial\n\nhttps://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/policies\n\n–\nThe following is the general syntax for creating a policy:\nSQL\n\nCREATE POLICY \nON \nCOMMENT ''\n-- One of the following:\n ROW FILTER \n | COLUMN MASK ON COLUMN \nTO [, , ...]\n[EXCEPT [, , ...]]\nFOR TABLES\n[WHEN hasTag('') OR hasTagValue('', '')]\nMATCH COLUMNS hasTag('') OR hasTagValue('', '') AS \nUSING COLUMNS [, , ...];\nThis example defines a row filter policy that excludes rows for European customers from queries by US-based analysts:\nSQL\n\nCREATE POLICY hide_eu_customers\nON SCHEMA prod.customers\nCOMMENT 'Hide rows with European customers from sensitive tables'\nROW FILTER non_eu_region\nTO us_analysts\nFOR TABLES\nMATCH COLUMNS\n hasTag('geo_region') AS region\nUSING COLUMNS (region);\nThis example defines a column mask policy that hides social security numbers from US analysts, except for those with in the admins group:\nSQL\n\nCREATE POLICY mask_SSN\nON SCHEMA prod.customers\nCOMMENT 'Mask social security numbers'\nCOLUMN MASK mask_SSN\nTO us_analysts\nEXCEPT admins\nFOR TABLES\nMATCH COLUMNS\n hasTagValue('pii', 'ssn') AS ssn\nON COLUMN ssn;\n\n\nFind additional examples in the documentation - https://docs.databricks.com/aws/en/data-governance/unity-catalog/abac/policies?language=SQL\n\nUsually table name is given as catalog_name.schem_name.table_name. \nConsidering the table metadata, column metadata, and corresponding tag details, \n"
                },
                {
                    "role": "user",
                    "content": "What are some of the available functions for masking and filtering data in the enterprise_gov.hr_finance schema, and what do they do?"
                }
            ]
        },
        # No reference answer: the judge-based scorers below do not require one.
        "expected_response": None
    }
]

eval_results = mlflow.genai.evaluate(
    data=eval_dataset,
    predict_fn=lambda input: AGENT.predict({"input": input}),  # NOTE: `input` shadows the builtin; harmless in this lambda.
    scorers=[RelevanceToQuery(), Safety()],  # add more scorers here if they're applicable
)

# Review the evaluation results in the MLflow UI (see console output)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Perform pre-deployment validation of the agent
# MAGIC Before registering and deploying the agent, we perform pre-deployment checks via the [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API. See [documentation](https://docs.databricks.com/machine-learning/model-serving/model-serving-debug.html#validate-inputs) for details

# COMMAND ----------

mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data={"input": [{"role": "user", "content": "Hello!"}]},
    env_manager="uv",
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Register the model to Unity Catalog
# MAGIC
# MAGIC Update the `catalog`, `schema`, and `model_name` below to register the MLflow model to Unity Catalog.

# COMMAND ----------

mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = "enterprise_gov"
schema = "gov_admin"
model_name = "governance_abac_model"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Deploy the agent

# COMMAND ----------

# NOTE(review): workspace-specific secret scope name — parameterize before sharing.
secret_scope = 'david_scope'
client_secret_key = 'DATABRICKS_CLIENT_SECRET'
client_id_key = 'DATABRICKS_CLIENT_ID'


# COMMAND ----------

from databricks import agents

# Deploy the registered model version. Auth is supplied manually via service
# principal environment variables read from the secret scope above.
# NOTE(review): DATABRICKS_HOST is hard-coded to one workspace — replace with
# your workspace URL or derive it from the environment.
deployment_info = agents.deploy(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    environment_vars={
        "DATABRICKS_HOST": "https://dbc-a612b3a4-f0ff.cloud.databricks.com",
        "DATABRICKS_CLIENT_ID": dbutils.secrets.get(scope=secret_scope, key=client_id_key),
        "DATABRICKS_CLIENT_SECRET": dbutils.secrets.get(scope=secret_scope, key=client_secret_key),
    },
)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Next steps
# MAGIC
# MAGIC After your agent is deployed, you can chat with it in AI playground to perform additional checks, share it with SMEs in your organization for feedback, or embed it in a production application. See [docs](https://docs.databricks.com/generative-ai/deploy-agent.html) for details
\ No newline at end of file
diff --git a/uc-quickstart/utils/abac_policy_agent/messages.py b/uc-quickstart/utils/abac_policy_agent/messages.py
new file mode 100644
index 00000000..7fcc569e
--- /dev/null
+++ b/uc-quickstart/utils/abac_policy_agent/messages.py
@@ -0,0 +1,124 @@
"""
Message classes for the chatbot application.

This module contains the message classes used throughout the app.
By keeping them in a separate module, they remain stable across
Streamlit app reruns, avoiding isinstance comparison issues.
+""" +import streamlit as st +from abc import ABC, abstractmethod + + +class Message(ABC): + def __init__(self): + pass + + @abstractmethod + def to_input_messages(self): + """Convert this message into a list of dicts suitable for the model API.""" + pass + + @abstractmethod + def render(self, idx): + """Render the message in the Streamlit app.""" + pass + + +class UserMessage(Message): + def __init__(self, content): + super().__init__() + self.content = content + + def to_input_messages(self): + return [{ + "role": "user", + "content": self.content + }] + + def render(self, _): + with st.chat_message("user"): + st.markdown(self.content) + + +class AssistantResponse(Message): + def __init__(self, messages, request_id): + super().__init__() + self.messages = messages + # Request ID tracked to enable submitting feedback on assistant responses via the feedback endpoint + self.request_id = request_id + + def to_input_messages(self): + return self.messages + + def render(self, idx): + with st.chat_message("assistant"): + for msg in self.messages: + render_message(msg) + + if self.request_id is not None: + render_assistant_message_feedback(idx, self.request_id) + + +def render_message(msg): + """Render a single message with enhanced formatting for ABAC content.""" + if msg["role"] == "assistant": + # Render content first if it exists + if msg.get("content"): + st.markdown(msg["content"]) + + # Then render tool calls if they exist + if "tool_calls" in msg and msg["tool_calls"]: + for call in msg["tool_calls"]: + fn_name = call["function"]["name"] + args = call["function"]["arguments"] + + # Databricks-themed display for function calls + st.markdown(f""" +
+
+ šŸ” {fn_name.replace('enterprise_gov__gov_admin__', '').replace('_', ' ').title()} +
+
+ """, unsafe_allow_html=True) + + # Only show parameters if they're not empty + try: + import json + parsed_args = json.loads(args) + if parsed_args: + with st.expander("View parameters", expanded=False): + st.json(parsed_args) + except: + pass + + elif msg["role"] == "tool": + # Clean, minimal tool response display + try: + import json + parsed = json.loads(msg["content"]) + + # Show results in an expandable section for cleaner UI + with st.expander("šŸ“Š View Results", expanded=True): + st.json(parsed) + except: + # If not JSON, show as code + with st.expander("šŸ“Š View Results", expanded=True): + st.text(msg["content"]) + + +@st.fragment +def render_assistant_message_feedback(i, request_id): + """Render feedback UI for assistant messages.""" + from model_serving_utils import submit_feedback + import os + + def save_feedback(index): + serving_endpoint = os.getenv('SERVING_ENDPOINT') + if serving_endpoint: + submit_feedback( + endpoint=serving_endpoint, + request_id=request_id, + rating=st.session_state[f"feedback_{index}"] + ) + + st.feedback("thumbs", key=f"feedback_{i}", on_change=save_feedback, args=[i]) \ No newline at end of file diff --git a/uc-quickstart/utils/abac_policy_agent/model_serving_utils.py b/uc-quickstart/utils/abac_policy_agent/model_serving_utils.py new file mode 100644 index 00000000..e25f61e6 --- /dev/null +++ b/uc-quickstart/utils/abac_policy_agent/model_serving_utils.py @@ -0,0 +1,267 @@ +from mlflow.deployments import get_deploy_client +from databricks.sdk import WorkspaceClient +import json +import uuid + +import logging + +logging.basicConfig( + format="%(levelname)s [%(asctime)s] %(name)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.DEBUG +) + +def _get_endpoint_task_type(endpoint_name: str) -> str: + """Get the task type of a serving endpoint.""" + try: + w = WorkspaceClient() + ep = w.serving_endpoints.get(endpoint_name) + return ep.task if ep.task else "chat/completions" + except Exception: + return 
"chat/completions"  # (continuation of _get_endpoint_task_type's fallback return, split at chunk boundary)

def _convert_to_responses_format(messages):
    """Convert chat messages to ResponsesAgent API format."""
    # NOTE(review): only user/assistant/tool roles are translated; any other
    # role (e.g. system) is silently dropped — confirm that is intended.
    input_messages = []
    for msg in messages:
        if msg["role"] == "user":
            input_messages.append({"role": "user", "content": msg["content"]})
        elif msg["role"] == "assistant":
            # Handle assistant messages with tool calls
            if msg.get("tool_calls"):
                # Add function calls
                for tool_call in msg["tool_calls"]:
                    input_messages.append({
                        "type": "function_call",
                        "id": tool_call["id"],
                        "call_id": tool_call["id"],
                        "name": tool_call["function"]["name"],
                        "arguments": tool_call["function"]["arguments"]
                    })
                # Add assistant message if it has content
                if msg.get("content"):
                    input_messages.append({
                        "type": "message",
                        "id": msg.get("id", str(uuid.uuid4())),
                        "content": [{"type": "output_text", "text": msg["content"]}],
                        "role": "assistant"
                    })
            else:
                # Regular assistant message
                input_messages.append({
                    "type": "message",
                    "id": msg.get("id", str(uuid.uuid4())),
                    "content": [{"type": "output_text", "text": msg["content"]}],
                    "role": "assistant"
                })
        elif msg["role"] == "tool":
            input_messages.append({
                "type": "function_call_output",
                "call_id": msg.get("tool_call_id"),
                "output": msg["content"]
            })
    return input_messages

def _throw_unexpected_endpoint_format():
    # Raised when a response shape matches none of the supported endpoint APIs.
    raise Exception("This app can only run against ChatModel, ChatAgent, or ResponsesAgent endpoints")

def query_endpoint_stream(endpoint_name: str, messages: list[dict[str, str]], return_traces: bool):
    """Dispatch a streaming query to the handler for the endpoint's task type."""
    task_type = _get_endpoint_task_type(endpoint_name)

    if task_type == "agent/v1/responses":
        return _query_responses_endpoint_stream(endpoint_name, messages, return_traces)
    else:
        return _query_chat_endpoint_stream(endpoint_name, messages, return_traces)

def _query_chat_endpoint_stream(endpoint_name: str, messages: list[dict[str, str]], return_traces: bool):
    """Invoke an endpoint that implements either chat completions or ChatAgent and stream the response"""
    client = get_deploy_client("databricks")

    # Prepare input payload
    inputs = {
        "messages": messages,
    }
    if return_traces:
        inputs["databricks_options"] = {"return_trace": True}

    for chunk in client.predict_stream(endpoint=endpoint_name, inputs=inputs):
        if "choices" in chunk:
            # chat/completions-style streaming chunk
            yield chunk
        elif "delta" in chunk:
            # ChatAgent-style streaming chunk
            yield chunk
        else:
            _throw_unexpected_endpoint_format()

def _query_responses_endpoint_stream(endpoint_name: str, messages: list[dict[str, str]], return_traces: bool):
    """Stream responses from agent/v1/responses endpoints using MLflow deployments client."""
    client = get_deploy_client("databricks")

    input_messages = _convert_to_responses_format(messages)

    # Prepare input payload for ResponsesAgent
    inputs = {
        "input": input_messages,
        "context": {},
        "stream": True
    }
    if return_traces:
        inputs["databricks_options"] = {"return_trace": True}

    for event_data in client.predict_stream(endpoint=endpoint_name, inputs=inputs):
        # Just yield the raw event data, let app.py handle the parsing
        yield event_data

def query_endpoint(endpoint_name, messages, return_traces):
    """
    Query an endpoint, returning the string message content and request
    ID for feedback
    """
    task_type = _get_endpoint_task_type(endpoint_name)

    if task_type == "agent/v1/responses":
        return _query_responses_endpoint(endpoint_name, messages, return_traces)
    else:
        return _query_chat_endpoint(endpoint_name, messages, return_traces)

def _query_chat_endpoint(endpoint_name, messages, return_traces):
    """Calls a model serving endpoint with chat/completions format."""
    inputs = {'messages': messages}
    if return_traces:
        inputs['databricks_options'] = {'return_trace': True}

    res = get_deploy_client('databricks').predict(
        endpoint=endpoint_name,
        inputs=inputs,
    )
    request_id = res.get("databricks_output", {}).get("databricks_request_id")
    if "messages" in res:
        # ChatAgent-style response: already a list of chat messages.
        return res["messages"], request_id
    elif "choices" in res:
        choice_message = res["choices"][0]["message"]
        choice_content = choice_message.get("content")

        # Case 1: The content is a list of structured objects
        if isinstance(choice_content, list):
            combined_content = "".join([part.get("text", "") for part in choice_content if part.get("type") == "text"])
            reformatted_message = {
                "role": choice_message.get("role"),
                "content": combined_content
            }
            return [reformatted_message], request_id

        # Case 2: The content is a simple string
        elif isinstance(choice_content, str):
            return [choice_message], request_id

    # NOTE(review): reached when content is neither a list nor a str
    # (e.g. None); such responses are treated as unsupported.
    _throw_unexpected_endpoint_format()

def _query_responses_endpoint(endpoint_name, messages, return_traces):
    """Query agent/v1/responses endpoints using MLflow deployments client."""
    client = get_deploy_client("databricks")

    input_messages = _convert_to_responses_format(messages)

    # Prepare input payload for ResponsesAgent
    inputs = {
        "input": input_messages,
        "context": {}
    }
    if return_traces:
        inputs["databricks_options"] = {"return_trace": True}

    # Make the prediction call
    response = client.predict(endpoint=endpoint_name, inputs=inputs)

    # Extract messages from the response
    result_messages = []
    request_id = response.get("databricks_output", {}).get("databricks_request_id")

    # Process the output items from ResponsesAgent response
    output_items = response.get("output", [])

    for item in output_items:
        item_type = item.get("type")

        if item_type == "message":
            # Extract text content from message
            text_content = ""
            content_parts = item.get("content", [])

            for content_part in content_parts:
                if content_part.get("type") == "output_text":
                    text_content += content_part.get("text", "")

            if text_content:
                result_messages.append({
                    "role": "assistant",
                    "content": text_content
                })

        elif item_type == "function_call":
            # Handle function calls
            call_id = item.get("call_id")
            function_name = item.get("name")
            arguments = item.get("arguments", "")

            tool_calls = [{
                "id": call_id,
                "type": "function",
                "function": {
                    "name": function_name,
                    "arguments": arguments
                }
            }]
            result_messages.append({
                "role": "assistant",
                "content": "",
                "tool_calls": tool_calls
            })

        elif item_type == "function_call_output":
            # Handle function call output/result
            call_id = item.get("call_id")
            output_content = item.get("output", "")

            result_messages.append({
                "role": "tool",
                "content": output_content,
                "tool_call_id": call_id
            })

    # Fall back to a stub message so callers always receive at least one entry.
    return result_messages or [{"role": "assistant", "content": "No response found"}], request_id

def submit_feedback(endpoint, request_id, rating):
    """Submit feedback to the agent."""
    # rating: 1 -> "positive", anything else (e.g. 0) -> "negative";
    # None means no assessment and sends an empty assessment list.
    rating_string = "positive" if rating == 1 else "negative"
    text_assessments = [] if rating is None else [{
        "ratings": {
            "answer_correct": {"value": rating_string},
        },
        "free_text_comment": None
    }]

    proxy_payload = {
        "dataframe_records": [
            {
                "source": json.dumps({
                    "id": "e2e-chatbot-app",  # Or extract from auth
                    "type": "human"
                }),
                "request_id": request_id,
                "text_assessments": json.dumps(text_assessments),
                "retrieval_assessments": json.dumps([]),
            }
        ]
    }
    w = WorkspaceClient()
    # POST directly to the endpoint's feedback invocation route.
    return w.api_client.do(
        method='POST',
        path=f"/serving-endpoints/{endpoint}/served-models/feedback/invocations",
        body=proxy_payload,
    )


def endpoint_supports_feedback(endpoint_name):
    """True if the endpoint serves a 'feedback' entity (agent deployments do)."""
    w = WorkspaceClient()
    endpoint = w.serving_endpoints.get(endpoint_name)
    # NOTE(review): assumes endpoint.config.served_entities is populated —
    # confirm behavior for endpoints without a ready config.
    return "feedback" in [entity.name for entity in endpoint.config.served_entities]
diff --git a/uc-quickstart/utils/abac_policy_agent/requirements.txt b/uc-quickstart/utils/abac_policy_agent/requirements.txt
new file mode 100644
index 00000000..6dc0a944
--- /dev/null
+++ b/uc-quickstart/utils/abac_policy_agent/requirements.txt
@@ -0,0 +1,2 @@
+mlflow>=2.21.2
+streamlit==1.44.1
diff --git a/uc-quickstart/utils/abac_policy_agent/sample_udfs_and_abac_policies.sql 
b/uc-quickstart/utils/abac_policy_agent/sample_udfs_and_abac_policies.sql new file mode 100644 index 00000000..9a025f01 --- /dev/null +++ b/uc-quickstart/utils/abac_policy_agent/sample_udfs_and_abac_policies.sql @@ -0,0 +1,577 @@ +-- ================================================================ +-- COMPLETE UNITY CATALOG ABAC POLICIES IMPLEMENTATION +-- Based on existing Unity Catalog demo with enterprise governance +-- ================================================================ + +-- Prerequisites: +-- 1. Enable ABAC in workspace: Admin > Previews > Attribute Based Access Control = ON +-- 2. Enable Tag Policies: Account Console > Previews > Tag policies = ON +-- 3. Compute: Databricks Runtime 16.4+ or Serverless +-- 4. Groups created in Databricks workspace (see section at bottom) + +USE CATALOG enterprise_gov; +USE SCHEMA hr_finance; + + +-- ================================================================ +-- STEP 1: CREATE OPTIMIZED UDFS FOR ABAC POLICIES +-- ================================================================ +-- Following best practices: simple, deterministic, no external calls + +-- UDF for hierarchical employee row filtering +CREATE OR REPLACE FUNCTION abac_employee_row_filter(emp_email STRING, emp_department STRING, emp_region STRING, emp_manager STRING) +RETURNS BOOLEAN +DETERMINISTIC +COMMENT 'ABAC row filter for employee data based on hierarchy and department' +RETURN ( + -- Self access + emp_email = current_user() + OR + -- Manager access (lookup current user's direct reports) + emp_manager = current_user() + OR + -- Department admin access + EXISTS( + SELECT 1 FROM user_access_control uac1 + JOIN user_access_control uac2 ON uac1.department = uac2.department + WHERE uac1.username = current_user() + AND uac2.username = emp_email + AND uac1.access_level = 'admin' + ) + OR + -- Regional manager access + EXISTS( + SELECT 1 FROM user_access_control uac1 + JOIN user_access_control uac2 ON uac1.region = uac2.region + WHERE uac1.username = 
current_user() + AND uac2.username = emp_email + AND uac1.access_level IN ('manager', 'super_admin') + ) + OR + -- Super admin access + EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND access_level = 'super_admin' + ) +); + +-- UDF for sales territory customer filtering +CREATE OR REPLACE FUNCTION abac_customer_row_filter(assigned_rep STRING, cust_region STRING) +RETURNS BOOLEAN +DETERMINISTIC +COMMENT 'ABAC row filter for customer data based on sales territory' +RETURN ( + -- Sales rep sees assigned customers + assigned_rep = current_user() + OR + -- Regional managers see customers in their region + EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND region = cust_region + AND access_level IN ('manager', 'super_admin') + ) + OR + -- Admins see everything + EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND access_level IN ('admin', 'super_admin') + ) +); + +-- UDF for personal record filtering (user can only see their own) +CREATE OR REPLACE FUNCTION abac_personal_record_filter(record_user_email STRING) +RETURNS BOOLEAN +DETERMINISTIC +COMMENT 'ABAC row filter for personal records - users can only see their own' +RETURN record_user_email = current_user(); + +-- UDF for department-based filtering +CREATE OR REPLACE FUNCTION abac_department_filter(record_department STRING) +RETURNS BOOLEAN +DETERMINISTIC +COMMENT 'ABAC row filter based on department membership' +RETURN ( + EXISTS( + SELECT 1 FROM user_access_control uac1 + WHERE uac1.username = current_user() + AND ( + uac1.department = record_department -- Same department + OR uac1.access_level = 'super_admin' -- Super admin override + ) + ) +); + +-- ================================================================ +-- STEP 4: CREATE COLUMN MASKING UDFS +-- ================================================================ + +-- SSN masking function +CREATE OR REPLACE FUNCTION abac_mask_ssn(ssn_value STRING) +RETURNS 
STRING +DETERMINISTIC +COMMENT 'ABAC column mask for SSN - only Human Resource admins see full SSN' +RETURN ( + CASE + WHEN EXISTS( + SELECT 1 FROM user_access_control uac + JOIN department_privileges dp ON uac.department = dp.department + WHERE uac.username = current_user() + AND dp.can_view_ssn = true + ) THEN ssn_value + ELSE CONCAT('XXX-XX-', RIGHT(ssn_value, 4)) + END +); + +-- Salary masking function +CREATE OR REPLACE FUNCTION abac_mask_salary(salary_value DECIMAL(10,2)) +RETURNS DECIMAL(10,2) +DETERMINISTIC +COMMENT 'ABAC column mask for salary - only authorized roles see full salary' +RETURN ( + CASE + WHEN EXISTS( + SELECT 1 FROM user_access_control uac + JOIN department_privileges dp ON uac.department = dp.department + WHERE uac.username = current_user() + AND dp.can_view_salary = true + ) THEN salary_value + ELSE 0.00 + END +); + +-- Phone number partial masking +CREATE OR REPLACE FUNCTION abac_mask_phone(phone_value STRING, record_owner STRING) +RETURNS STRING +DETERMINISTIC +COMMENT 'ABAC column mask for phone numbers with partial masking' +RETURN ( + CASE + WHEN record_owner = current_user() THEN phone_value -- Own record + WHEN EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND access_level IN ('admin', 'super_admin') + ) THEN phone_value -- Admins see all + ELSE CONCAT(LEFT(phone_value, 8), 'XXXX') -- Partial masking + END +); + +-- Address masking with hierarchical logic +CREATE OR REPLACE FUNCTION abac_mask_address(address_value STRING, record_owner STRING, record_manager STRING) +RETURNS STRING +DETERMINISTIC +COMMENT 'ABAC column mask for addresses based on hierarchy' +RETURN ( + CASE + WHEN record_owner = current_user() THEN address_value -- Own record + WHEN record_manager = current_user() THEN address_value -- Direct reports + WHEN EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND access_level IN ('admin', 'super_admin') + ) THEN address_value -- Admins see all + ELSE 
'[CONFIDENTIAL ADDRESS]' + END +); + +-- Emergency contact masking (Human Resource only) +CREATE OR REPLACE FUNCTION abac_mask_emergency_contact(contact_value STRING, record_owner STRING) +RETURNS STRING +DETERMINISTIC +COMMENT 'ABAC column mask for emergency contacts - Human Resource and self only' +RETURN ( + CASE + WHEN record_owner = current_user() THEN contact_value -- Own record + WHEN EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND department = 'Human Resource' + AND access_level IN ('admin', 'super_admin') + ) THEN contact_value -- Human Resource admin only + ELSE '[Human Resource CONFIDENTIAL]' + END +); + +-- Credit score masking for customers +CREATE OR REPLACE FUNCTION abac_mask_credit_score(score_value INT, cust_region STRING) +RETURNS INT +DETERMINISTIC +COMMENT 'ABAC column mask for customer credit scores' +RETURN ( + CASE + WHEN EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND department IN ('Finance', 'Management') + AND access_level IN ('admin', 'super_admin') + ) THEN score_value + WHEN EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND region = cust_region + AND access_level = 'manager' + ) THEN score_value + ELSE 0 + END +); + +-- Generic PII masking function +CREATE OR REPLACE FUNCTION abac_mask_pii(pii_value STRING, sensitivity_level STRING) +RETURNS STRING +DETERMINISTIC +COMMENT 'Generic ABAC PII masking based on sensitivity level' +RETURN ( + CASE + WHEN EXISTS( + SELECT 1 FROM user_access_control + WHERE username = current_user() + AND access_level = 'super_admin' + ) THEN pii_value -- Super admin bypass + WHEN sensitivity_level = 'low' AND LENGTH(pii_value) > 4 + THEN CONCAT(LEFT(pii_value, LENGTH(pii_value) - 4), REPEAT('*', 4)) + WHEN sensitivity_level = 'medium' AND LENGTH(pii_value) > 6 + THEN CONCAT(LEFT(pii_value, 3), REPEAT('*', LENGTH(pii_value) - 6), RIGHT(pii_value, 3)) + WHEN sensitivity_level = 'high' + THEN '[REDACTED]' + 
ELSE pii_value + END +); + +-- ================================================================ +-- STEP 5: CREATE ABAC POLICIES AT CATALOG LEVEL +-- ================================================================ +-- Policies are created through the UI but here's the equivalent SQL syntax + +-- Row filter policy for employee hierarchical access +CREATE POLICY employee_hierarchical_access ON CATALOG enterprise_governance +COMMENT 'Hierarchical row filtering for employee data based on management structure' +ROW FILTER abac_employee_row_filter +TO `hr_users`, `hr_admins`, `managers`, `system_admins`, `all_employees` +EXCEPT `data_stewards` -- Data stewards can see all for governance +FOR TABLES +WHEN hasTagValue('department', 'Human Resource') OR hasTagValue('pii_level', 'high') +MATCH COLUMNS hasTag('department') AS dept, hasTag('region') AS reg, hasTag('access_level') AS mgr +USING COLUMNS (email, dept, reg, manager_email); + +-- Row filter policy for sales territory access +CREATE POLICY sales_territory_access ON CATALOG enterprise_governance +COMMENT 'Sales territory-based row filtering for customer data' +ROW FILTER abac_customer_row_filter +TO `sales_users`, `sales_managers`, `regional_managers` +EXCEPT `finance_admins` -- Finance can see all for reporting +FOR TABLES +WHEN hasTagValue('department', 'Sales') OR hasTagValue('business_unit', 'Field_Sales') +MATCH COLUMNS hasTag('region') AS region +USING COLUMNS (assigned_sales_rep, region); + +-- Row filter policy for personal records +CREATE POLICY personal_record_access ON CATALOG enterprise_governance +COMMENT 'Users can only access their own personal records' +ROW FILTER abac_personal_record_filter +TO `all_employees` +EXCEPT `hr_admins`, `system_admins` +FOR TABLES +WHEN hasTagValue('hr_data_classification', 'restricted') AND hasTagValue('pii_level', 'high') +MATCH COLUMNS hasTag('pii_level') AS pii +USING COLUMNS (user_email); + +-- Row filter policy for department-based access +CREATE POLICY 
department_based_access ON CATALOG enterprise_gov
+COMMENT 'Department-based row filtering for organizational data'
+ROW FILTER abac_department_filter
+TO `all_employees`
+EXCEPT `system_admins`
+FOR TABLES
+WHEN hasTagValue('hr_data_classification', 'internal') OR hasTagValue('hr_data_classification', 'confidential')
+MATCH COLUMNS hasTag('department') AS dept
+USING COLUMNS (dept);
+
+-- ================================================================
+-- STEP 6: CREATE COLUMN MASK POLICIES
+-- ================================================================
+-- NOTE: all policies are created ON CATALOG enterprise_gov -- the catalog
+-- created by uc_demo_schema_setup.sql and referenced by the cleanup commands
+-- below (the original 'enterprise_governance' catalog does not exist).
+
+-- SSN masking policy
+CREATE POLICY mask_ssn_policy ON CATALOG enterprise_gov
+COMMENT 'Mask SSN except for Human Resource admins and system admins'
+COLUMN MASK abac_mask_ssn
+TO `all_employees`
+EXCEPT `hr_admins`, `system_admins`
+FOR TABLES
+WHEN hasTagValue('pii_level', 'high')
+MATCH COLUMNS hasTagValue('role_required', 'hr_admin') AS ssn_col
+USING COLUMNS (ssn_col);
+
+-- Salary masking policy
+CREATE POLICY mask_salary_policy ON CATALOG enterprise_gov
+COMMENT 'Mask salary except for authorized roles'
+COLUMN MASK abac_mask_salary
+TO `all_employees`
+EXCEPT `finance_admins`, `hr_admins`, `system_admins`
+FOR TABLES
+WHEN hasTagValue('financial_data', 'true')
+MATCH COLUMNS hasTagValue('access_level', 'manager') AS sal_col
+USING COLUMNS (sal_col);
+
+-- Phone number masking policy
+CREATE POLICY mask_phone_policy ON CATALOG enterprise_gov
+COMMENT 'Partially mask phone numbers with hierarchical access'
+COLUMN MASK abac_mask_phone
+TO `all_employees`
+EXCEPT `system_admins`
+FOR TABLES
+WHEN hasTagValue('pii_level', 'medium')
+MATCH COLUMNS hasTag('pii_level') AS phone_col
+USING COLUMNS (phone_col, email);
+
+-- Address masking policy
+CREATE POLICY mask_address_policy ON CATALOG enterprise_gov
+COMMENT 'Mask addresses based on hierarchical relationships'
+COLUMN MASK abac_mask_address
+TO `all_employees`
+EXCEPT `hr_admins`, `system_admins`
+FOR TABLES
+WHEN hasTagValue('pii_level', 'high') AND hasTagValue('hr_data_classification', 'confidential')
+MATCH COLUMNS hasTag('pii_level') AS addr_col
+USING COLUMNS (addr_col, email, manager_email);
+
+-- Emergency contact masking policy
+CREATE POLICY mask_emergency_contact_policy ON CATALOG enterprise_gov
+COMMENT 'Mask emergency contacts - Human Resource admins and self only'
+COLUMN MASK abac_mask_emergency_contact
+TO `all_employees`
+EXCEPT `hr_admins`, `system_admins`
+FOR TABLES
+WHEN hasTagValue('role_required', 'hr_admin')
+MATCH COLUMNS hasTagValue('role_required', 'hr_admin') AS emerg_col
+USING COLUMNS (emerg_col, email);
+
+-- Credit score masking policy
+CREATE POLICY mask_credit_score_policy ON CATALOG enterprise_gov
+COMMENT 'Mask customer credit scores except for finance and regional managers'
+COLUMN MASK abac_mask_credit_score
+TO `sales_users`, `all_employees`
+EXCEPT `finance_admins`, `regional_managers`, `system_admins`
+FOR TABLES
+WHEN hasTagValue('financial_data', 'true') AND hasTagValue('role_required', 'finance_admin')
+MATCH COLUMNS hasTagValue('role_required', 'finance_admin') AS credit_col, hasTag('region') AS reg_col
+USING COLUMNS (credit_col, reg_col);
+
+-- ================================================================
+-- STEP 7: SPECIALIZED POLICIES FOR DIFFERENT SCENARIOS
+-- ================================================================
+
+-- Policy for financial data access (multi-table)
+-- NOTE(review): CREATE POLICY ... ROW FILTER normally references a named SQL
+-- UDF; confirm that inline expressions like the CASE below are supported by
+-- the ABAC preview on your runtime before relying on this policy.
+CREATE POLICY financial_data_access ON CATALOG enterprise_gov
+COMMENT 'Comprehensive financial data access control across all tables'
+ROW FILTER (
+  CASE
+    WHEN EXISTS(
+      SELECT 1 FROM user_access_control
+      WHERE username = current_user()
+      AND department IN ('Finance', 'Management')
+      AND access_level IN ('admin', 'super_admin')
+    ) THEN TRUE
+    ELSE FALSE
+  END
+)
+TO `all_employees`
+EXCEPT `finance_admins`, `system_admins`
+FOR TABLES
+WHEN hasTagValue('financial_data', 'true');
+
+-- Policy for regional data compliance (GDPR-like)
+CREATE POLICY regional_compliance_policy ON CATALOG enterprise_gov
+COMMENT 'Regional data access control for compliance requirements'
+ROW FILTER (
+  EXISTS(
+    SELECT 1 FROM user_access_control uac1
+    WHERE uac1.username = current_user()
+    AND (
+      uac1.access_level = 'super_admin'
+      OR EXISTS(
+        SELECT 1 FROM user_access_control uac2
+        WHERE uac2.region = uac1.region
+        AND uac2.username = current_user()
+      )
+    )
+  )
+)
+TO `all_employees`
+EXCEPT `system_admins`, `data_stewards`
+FOR TABLES
+WHEN hasTag('region')
+MATCH COLUMNS hasTag('region') AS reg_col
+USING COLUMNS (reg_col);
+
+-- ================================================================
+-- STEP 8: MONITORING AND AUDIT POLICIES
+-- ================================================================
+
+-- Create audit view for policy effectiveness
+CREATE OR REPLACE VIEW abac_policy_audit AS
+SELECT
+  current_user() as accessing_user,
+  current_timestamp() as access_time,
+  'employees' as table_name,
+  COUNT(*) as visible_records,
+  AVG(
+    CASE
+      WHEN salary > 0 THEN 1
+      ELSE 0
+    END
+  ) as salary_visibility_rate
+FROM employees
+UNION ALL
+SELECT
+  current_user(),
+  current_timestamp(),
+  'customers',
+  COUNT(*),
+  AVG(
+    CASE
+      WHEN credit_score > 0 THEN 1
+      ELSE 0
+    END
+  )
+FROM customers;
+
+-- Create user access summary view
+CREATE OR REPLACE VIEW user_access_summary AS
+SELECT
+  current_user() as user,
+  uac.department,
+  uac.region,
+  uac.access_level,
+  dp.can_view_salary,
+  dp.can_view_ssn,
+  dp.can_view_all_regions
+FROM user_access_control uac
+LEFT JOIN department_privileges dp ON uac.department = dp.department
+WHERE uac.username = current_user();
+
+-- ================================================================
+-- STEP 9: TEST QUERIES FOR DIFFERENT USER SCENARIOS
+-- ================================================================
+
+-- Test query 1: Employee data visibility
+-- SELECT employee_id, first_name, last_name, department, salary, ssn, phone_number, 
address +-- FROM employees +-- ORDER BY employee_id; + +-- Test query 2: Customer data visibility +-- SELECT customer_id, company_name, region, account_value, credit_score +-- FROM customers +-- ORDER BY customer_id; + +-- Test query 3: Personal records access +-- SELECT * FROM personal_records; + +-- Test query 4: Department data access +-- SELECT * FROM department_data; + +-- Test query 5: Cross-table join with masking +-- SELECT +-- e.first_name, +-- e.last_name, +-- e.department, +-- e.salary, +-- e.ssn, +-- c.company_name, +-- c.credit_score +-- FROM employees e +-- LEFT JOIN customers c ON e.email = c.assigned_sales_rep; + +-- Test query 6: Audit current user access +-- SELECT * FROM user_access_summary; + +-- Test query 7: Policy effectiveness audit +-- SELECT * FROM abac_policy_audit; + +-- ================================================================ +-- STEP 10: REQUIRED DATABRICKS GROUPS +-- ================================================================ +/* +Create these groups in Databricks Admin Console > Groups: + +Core Groups: +1. all_employees - All company employees +2. system_admins - IT administrators with full access +3. data_stewards - Data governance team with audit access + +Department Groups: +4. hr_users - Human Resource department standard users +5. hr_admins - Human Resource department administrators +6. finance_users - Finance department standard users +7. finance_admins - Finance department administrators +8. sales_users - Sales department standard users +9. sales_managers - Sales department managers +10. it_users - IT department users + +Regional Groups: +11. us_west_users - US West region users +12. us_east_users - US East region users +13. us_central_users - US Central region users +14. apac_users - APAC region users +15. europe_users - Europe region users + +Role-based Groups: +16. managers - All management levels +17. regional_managers - Regional management +18. senior_managers - Senior management + +Business Unit Groups: +19. 
corporate_users - Corporate functions +20. field_sales_users - Field sales teams +21. support_users - Support functions +22. operations_users - Operations teams + +Group Assignments should be based on: +- user_access_control table data +- Employee department and access_level +- Regional assignments +- Management hierarchy + +Example assignments: +- john.smith@databricks.com -> sales_users, us_west_users, all_employees +- jane.doe@databricks.com -> hr_users, hr_admins, us_east_users, all_employees +- sarah.johnson@databricks.com -> system_admins, managers, senior_managers, all_employees +*/ + +-- ================================================================ +-- STEP 11: POLICY MANAGEMENT AND MAINTENANCE +-- ================================================================ + +-- View all active policies +-- SELECT policy_name, policy_type, scope, principals +-- FROM system.information_schema.policies +-- WHERE catalog_name = 'enterprise_gov'; + +-- Monitor policy performance +-- SELECT * FROM system.access_control.audit() +-- WHERE source_type = 'ABAC_POLICY' +-- AND event_time > current_timestamp() - INTERVAL 1 DAY; + +-- ================================================================ +-- CLEANUP COMMANDS (FOR TESTING ONLY) +-- ================================================================ +-- Uncomment to remove policies during testing + +-- DROP POLICY IF EXISTS employee_hierarchical_access ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS sales_territory_access ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS personal_record_access ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS department_based_access ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS mask_ssn_policy ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS mask_salary_policy ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS mask_phone_policy ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS mask_address_policy ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS 
mask_emergency_contact_policy ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS mask_credit_score_policy ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS financial_data_access ON CATALOG enterprise_gov; +-- DROP POLICY IF EXISTS regional_compliance_policy ON CATALOG enterprise_gov; + +-- ================================================================ +-- END OF COMPLETE ABAC POLICIES IMPLEMENTATION +-- ================================================================ diff --git a/uc-quickstart/utils/abac_policy_agent/uc_demo_schema_setup.sql b/uc-quickstart/utils/abac_policy_agent/uc_demo_schema_setup.sql new file mode 100644 index 00000000..b7daba88 --- /dev/null +++ b/uc-quickstart/utils/abac_policy_agent/uc_demo_schema_setup.sql @@ -0,0 +1,407 @@ +-- ================================================================ +-- Unity Catalog Row Filter & Column Mask Demo +-- Comprehensive examples showcasing enterprise data governance +-- ================================================================ + +-- Create Demo Catalog and Schema +CREATE CATALOG IF NOT EXISTS enterprise_gov; +USE CATALOG enterprise_gov; + +CREATE SCHEMA IF NOT EXISTS hr_finance; +USE SCHEMA hr_finance; + +-- ================================================================ +-- STEP 1: Create User Groups and Mapping Tables +-- ================================================================ + +-- User access mapping table +CREATE OR REPLACE TABLE user_access_control ( + username STRING, + department STRING, + region STRING, + access_level STRING, + manager_email STRING +); + +INSERT INTO user_access_control VALUES +('john.smith@databricks.com', 'Sales', 'US-West', 'standard', 'sarah.johnson@databricks.com'), +('jane.doe@databricks.com', 'Human Resource', 'US-East', 'admin', 'sarah.johnson@databricks.com'), +('mike.wilson@databricks.com', 'Finance', 'US-Central', 'standard', 'sarah.johnson@databricks.com'), +('sarah.johnson@databricks.com', 'Management', 'US-West', 'super_admin', null), 
+('raj.patel@databricks.com', 'Sales', 'APAC', 'standard', 'lisa.chen@databricks.com'), +('lisa.chen@databricks.com', 'Sales', 'APAC', 'manager', 'sarah.johnson@databricks.com'), +('emma.brown@databricks.com', 'Human Resource', 'Europe', 'admin', 'david.garcia@databricks.com'), +('david.garcia@databricks.com', 'Management', 'Europe', 'manager', 'sarah.johnson@databricks.com'), +('alex.kim@databricks.com', 'Finance', 'APAC', 'standard', 'mike.wilson@databricks.com'), +('maria.rodriguez@databricks.com', 'IT', 'US-Central', 'admin', 'sarah.johnson@databricks.com'); + +-- Department access levels +CREATE OR REPLACE TABLE department_privileges ( + department STRING, + can_view_salary BOOLEAN, + can_view_ssn BOOLEAN, + can_view_all_regions BOOLEAN +); + +INSERT INTO department_privileges VALUES +('Human Resource', true, true, true), +('Management', true, true, true), +('Finance', true, false, false), +('Sales', false, false, false), +('IT', false, false, true); + +-- ================================================================ +-- STEP 2: Create Main Employee Data Table +-- ================================================================ + +CREATE OR REPLACE TABLE employees ( + employee_id INT, + first_name STRING, + last_name STRING, + email STRING, + department STRING, + region STRING, + salary DECIMAL(10,2), + ssn STRING, + hire_date DATE, + performance_rating STRING, + manager_email STRING, + phone_number STRING, + address STRING, + emergency_contact STRING +); + +INSERT INTO employees VALUES +(1001, 'John', 'Smith', 'john.smith@databricks.com', 'Sales', 'US-West', 95000.00, '123-45-6789', '2022-03-15', 'Exceeds', 'sarah.johnson@databricks.com', '555-0101', '123 Main St, San Francisco, CA', 'spouse: 555-0102'), +(1002, 'Jane', 'Doe', 'jane.doe@databricks.com', 'Human Resource', 'US-East', 87000.00, '234-56-7890', '2021-08-20', 'Meets', 'sarah.johnson@databricks.com', '555-0103', '456 Oak Ave, New York, NY', 'parent: 555-0104'), +(1003, 'Mike', 'Wilson', 
'mike.wilson@databricks.com', 'Finance', 'US-Central', 105000.00, '345-67-8901', '2020-11-10', 'Exceeds', 'sarah.johnson@databricks.com', '555-0105', '789 Pine Rd, Chicago, IL', 'sibling: 555-0106'), +(1004, 'Sarah', 'Johnson', 'sarah.johnson@databricks.com', 'Management', 'US-West', 150000.00, '456-78-9012', '2019-01-05', 'Outstanding', null, '555-0107', '321 Elm Dr, Los Angeles, CA', 'spouse: 555-0108'), +(1005, 'Raj', 'Patel', 'raj.patel@databricks.com', 'Sales', 'APAC', 78000.00, '567-89-0123', '2022-07-12', 'Meets', 'lisa.chen@databricks.com', '555-0109', '654 Maple Ln, Singapore', 'parent: 555-0110'), +(1006, 'Lisa', 'Chen', 'lisa.chen@databricks.com', 'Sales', 'APAC', 120000.00, '678-90-1234', '2021-02-28', 'Exceeds', 'sarah.johnson@databricks.com', '555-0111', '987 Cedar St, Tokyo, Japan', 'spouse: 555-0112'), +(1007, 'Emma', 'Brown', 'emma.brown@databricks.com', 'Human Resource', 'Europe', 82000.00, '789-01-2345', '2021-09-15', 'Meets', 'david.garcia@databricks.com', '555-0113', '147 Birch Ave, London, UK', 'friend: 555-0114'), +(1008, 'David', 'Garcia', 'david.garcia@databricks.com', 'Management', 'Europe', 135000.00, '890-12-3456', '2020-04-22', 'Exceeds', 'sarah.johnson@databricks.com', '555-0115', '258 Willow Rd, Madrid, Spain', 'parent: 555-0116'), +(1009, 'Alex', 'Kim', 'alex.kim@databricks.com', 'Finance', 'APAC', 92000.00, '901-23-4567', '2022-12-01', 'Meets', 'mike.wilson@databricks.com', '555-0117', '369 Spruce Dr, Seoul, Korea', 'sibling: 555-0118'), +(1010, 'Maria', 'Rodriguez', 'maria.rodriguez@databricks.com', 'IT', 'US-Central', 98000.00, '012-34-5678', '2021-06-30', 'Exceeds', 'sarah.johnson@databricks.com', '555-0119', '741 Ash Ln, Austin, TX', 'spouse: 555-0120'); + +-- ================================================================ +-- STEP 3: Create Customer Data Table for Sales Filtering +-- ================================================================ + +CREATE OR REPLACE TABLE customers ( + customer_id INT, + company_name STRING, 
+ contact_email STRING, + region STRING, + account_value DECIMAL(12,2), + assigned_sales_rep STRING, + credit_score INT, + payment_terms STRING, + created_date DATE +); + +INSERT INTO customers VALUES +(2001, 'TechCorp Inc', 'admin@techcorp.com', 'US-West', 250000.00, 'john.smith@databricks.com', 780, 'NET30', '2022-01-15'), +(2002, 'DataSystems LLC', 'contact@datasystems.com', 'US-East', 180000.00, 'john.smith@databricks.com', 720, 'NET45', '2022-03-20'), +(2003, 'Global Analytics', 'info@globalanalytics.com', 'APAC', 320000.00, 'raj.patel@databricks.com', 820, 'NET30', '2022-02-10'), +(2004, 'InnovateTech', 'hello@innovatetech.com', 'Europe', 195000.00, 'lisa.chen@databricks.com', 750, 'NET30', '2022-04-05'), +(2005, 'SmartSolutions', 'team@smartsolutions.com', 'APAC', 280000.00, 'raj.patel@databricks.com', 790, 'NET15', '2022-05-12'), +(2006, 'CloudFirst Systems', 'support@cloudfirst.com', 'US-Central', 150000.00, 'john.smith@databricks.com', 680, 'NET60', '2022-06-18'), +(2007, 'SecureData Corp', 'admin@securedata.com', 'Europe', 420000.00, 'lisa.chen@databricks.com', 850, 'NET30', '2022-07-25'), +(2008, 'AI Innovations', 'contact@aiinnovations.com', 'US-West', 380000.00, 'john.smith@databricks.com', 800, 'NET30', '2022-08-14'), +(2009, 'NextGen Analytics', 'info@nextgenanalytics.com', 'APAC', 220000.00, 'raj.patel@databricks.com', 740, 'NET45', '2022-09-30'), +(2010, 'Future Systems', 'hello@futuresystems.com', 'Europe', 160000.00, 'lisa.chen@databricks.com', 710, 'NET30', '2022-10-22'); + +-- ================================================================ +-- STEP 4: ROW FILTER FUNCTIONS +-- ================================================================ + +-- 1. User-based row filter - Users can only see their own employee record +CREATE OR REPLACE FUNCTION user_own_record_filter(email STRING) +RETURN email = current_user(); + +-- 2. 
Department-based row filter - Users can see records in their department +CREATE OR REPLACE FUNCTION department_filter(emp_email STRING) +RETURN EXISTS( + SELECT 1 FROM user_access_control u1 + JOIN user_access_control u2 ON u1.department = u2.department + WHERE u1.username = current_user() + AND u2.username = emp_email +); + +-- 3. Regional access filter with admin override +CREATE OR REPLACE FUNCTION regional_access_filter(emp_region STRING, emp_email STRING) +RETURN + -- Super admins see everything + EXISTS(SELECT 1 FROM user_access_control WHERE username = current_user() AND access_level = 'super_admin') + OR + -- Regional managers see their region + EXISTS(SELECT 1 FROM user_access_control u1 + JOIN user_access_control u2 ON u1.region = u2.region + WHERE u1.username = current_user() + AND u2.username = emp_email + AND u1.access_level IN ('manager', 'admin')) + OR + -- Users see their own record + emp_email = current_user(); + +-- 4. Hierarchical access filter - Managers can see their subordinates +CREATE OR REPLACE FUNCTION hierarchical_filter(emp_email STRING, emp_manager STRING) +RETURN + -- Super admins see everything + EXISTS(SELECT 1 FROM user_access_control WHERE username = current_user() AND access_level = 'super_admin') + OR + -- Managers see their direct reports + emp_manager = current_user() + OR + -- Users see their own record + emp_email = current_user() + OR + -- Department admins see department records + EXISTS(SELECT 1 FROM user_access_control u1 + JOIN user_access_control u2 ON u1.department = u2.department + WHERE u1.username = current_user() + AND u2.username = emp_email + AND u1.access_level = 'admin'); + +-- 5. 
Sales territory filter for customers
+CREATE OR REPLACE FUNCTION sales_territory_filter(assigned_rep STRING, cust_region STRING)
+RETURN
+  -- Sales reps see their assigned customers
+  assigned_rep = current_user()
+  OR
+  -- Managers see customers in their region
+  EXISTS(SELECT 1 FROM user_access_control
+         WHERE username = current_user()
+         AND region = cust_region
+         AND access_level IN ('manager', 'super_admin'))
+  OR
+  -- Admins see everything
+  EXISTS(SELECT 1 FROM user_access_control
+         WHERE username = current_user()
+         AND access_level IN ('admin', 'super_admin'));
+
+-- ================================================================
+-- STEP 5: COLUMN MASK FUNCTIONS
+-- ================================================================
+
+-- 1. SSN masking - Only Human Resource and Management can see full SSN
+CREATE OR REPLACE FUNCTION ssn_mask(ssn STRING)
+RETURN
+  CASE
+    WHEN EXISTS(SELECT 1 FROM user_access_control u
+                JOIN department_privileges d ON u.department = d.department
+                WHERE u.username = current_user() AND d.can_view_ssn = true)
+    THEN ssn
+    ELSE 'XXX-XX-' || RIGHT(ssn, 4)
+  END;
+
+-- 2. Salary masking - Human Resource, Management, and Finance can see salaries
+CREATE OR REPLACE FUNCTION salary_mask(salary DECIMAL(10,2))
+RETURN
+  CASE
+    WHEN EXISTS(SELECT 1 FROM user_access_control u
+                JOIN department_privileges d ON u.department = d.department
+                WHERE u.username = current_user() AND d.can_view_salary = true)
+    THEN salary
+    ELSE 0.00
+  END;
+
+-- 3. Phone number partial masking
+CREATE OR REPLACE FUNCTION phone_mask(phone STRING, emp_email STRING)
+RETURN
+  CASE
+    WHEN emp_email = current_user() THEN phone -- Own record
+    WHEN EXISTS(SELECT 1 FROM user_access_control WHERE username = current_user() AND access_level IN ('admin', 'super_admin'))
+    THEN phone -- Admins see all
+    -- BUG FIX: the seeded numbers are exactly 8 chars ('555-0101'), so
+    -- LEFT(phone, 8) returned the full number and masked nothing. Keep only
+    -- the 4-char prefix so partial masking actually hides the line number.
+    ELSE LEFT(phone, 4) || 'XXXX' -- Others see partial
+  END;
+
+-- 4. 
Address masking based on hierarchy +CREATE OR REPLACE FUNCTION address_mask(address STRING, emp_email STRING, emp_manager STRING) +RETURN + CASE + WHEN emp_email = current_user() THEN address -- Own record + WHEN emp_manager = current_user() THEN address -- Direct reports + WHEN EXISTS(SELECT 1 FROM user_access_control WHERE username = current_user() AND access_level IN ('admin', 'super_admin')) + THEN address -- Admins see all + ELSE '[CONFIDENTIAL]' + END; + +-- 5. Emergency contact masking +CREATE OR REPLACE FUNCTION emergency_contact_mask(contact STRING, emp_email STRING) +RETURN + CASE + WHEN emp_email = current_user() THEN contact -- Own record + WHEN EXISTS(SELECT 1 FROM user_access_control u + WHERE u.username = current_user() + AND u.department IN ('Human Resource', 'Management')) + THEN contact -- Human Resource and Management only + ELSE '[REDACTED]' + END; + +-- 6. Credit score masking for customers +CREATE OR REPLACE FUNCTION credit_score_mask(score INT, cust_region STRING) +RETURN + CASE + WHEN EXISTS(SELECT 1 FROM user_access_control + WHERE username = current_user() + AND department IN ('Finance', 'Management')) + THEN score + WHEN EXISTS(SELECT 1 FROM user_access_control + WHERE username = current_user() + AND region = cust_region + AND access_level = 'manager') + THEN score + ELSE 0 + END; + +-- ================================================================ +-- STEP 6: APPLY FILTERS AND MASKS TO TABLES +-- ================================================================ + +-- Apply comprehensive governance to employees table +ALTER TABLE employees SET ROW FILTER hierarchical_filter ON (email, manager_email); + +-- Apply column masks +ALTER TABLE employees ALTER COLUMN ssn SET MASK ssn_mask; +ALTER TABLE employees ALTER COLUMN salary SET MASK salary_mask; +ALTER TABLE employees ALTER COLUMN phone_number SET MASK phone_mask USING COLUMNS (email); +ALTER TABLE employees ALTER COLUMN address SET MASK address_mask USING COLUMNS (email, 
manager_email);
+ALTER TABLE employees ALTER COLUMN emergency_contact SET MASK emergency_contact_mask USING COLUMNS (email);
+
+-- Apply sales territory filter to customers
+ALTER TABLE customers SET ROW FILTER sales_territory_filter ON (assigned_sales_rep, region);
+
+-- Apply credit score masking
+ALTER TABLE customers ALTER COLUMN credit_score SET MASK credit_score_mask USING COLUMNS (region);
+
+-- ================================================================
+-- STEP 7: CREATE ADDITIONAL DEMO TABLES FOR SPECIFIC USE CASES
+-- ================================================================
+
+-- Table with user-level filtering only
+CREATE OR REPLACE TABLE personal_records (
+  record_id INT,
+  user_email STRING,
+  personal_notes STRING,
+  confidential_data STRING
+) WITH ROW FILTER user_own_record_filter ON (user_email);
+
+INSERT INTO personal_records VALUES
+(1, 'john.smith@databricks.com', 'Performance goals for Q4', 'Salary negotiation notes'),
+(2, 'jane.doe@databricks.com', 'Training completion status', 'Human Resource disciplinary records'),
+(3, 'mike.wilson@databricks.com', 'Budget planning notes', 'Financial projections');
+
+-- Table with department-only access
+CREATE OR REPLACE TABLE department_data (
+  dept_id INT,
+  department STRING,
+  budget DECIMAL(12,2),
+  headcount INT,
+  sensitive_info STRING
+) WITH ROW FILTER department_filter ON (department);
+
+-- Create department-specific masking for budget
+CREATE OR REPLACE FUNCTION budget_mask(budget DECIMAL(12,2))
+RETURN
+  CASE
+    WHEN EXISTS(SELECT 1 FROM user_access_control
+                WHERE username = current_user()
+                AND access_level IN ('super_admin', 'admin')
+                AND department IN ('Management', 'Finance'))
+    THEN budget
+    ELSE 0.00
+  END;
+
+ALTER TABLE department_data ALTER COLUMN budget SET MASK budget_mask;
+
+-- BUG FIX: ALTER COLUMN ... SET MASK requires a *named* SQL UDF; the original
+-- passed an inline CASE expression, which is not valid syntax. Define the
+-- mask as a function (matching every other mask in this script) and apply it.
+CREATE OR REPLACE FUNCTION sensitive_info_mask(sensitive_info STRING)
+RETURN
+  CASE
+    WHEN EXISTS(SELECT 1 FROM user_access_control
+                WHERE username = current_user()
+                AND access_level = 'super_admin')
+    THEN sensitive_info
+    ELSE '[DEPARTMENT CONFIDENTIAL]'
+  END;
+
+ALTER TABLE department_data ALTER COLUMN sensitive_info SET MASK sensitive_info_mask;
+
+INSERT INTO department_data VALUES
+(1, 'Sales', 2500000.00, 25, 'Commission structure details'),
+(2, 'Human Resource', 1800000.00, 15, 'Employee relations issues'),
+(3, 'Finance', 1200000.00, 12, 'Audit findings'),
+(4, 'Management', 3000000.00, 8, 'Strategic planning documents'),
+(5, 'IT', 2200000.00, 20, 'Security incident reports');
+
+-- ================================================================
+-- STEP 8: TEST QUERIES AND EXAMPLES
+-- ================================================================
+
+-- Example queries to test different access patterns:
+
+-- 1. View all employees (filtered by hierarchy)
+-- SELECT * FROM employees;
+
+-- 2. View customers (filtered by territory)
+-- SELECT * FROM customers;
+
+-- 3. View personal records (user can only see their own)
+-- SELECT * FROM personal_records;
+
+-- 4. View department data (filtered by department membership)
+-- SELECT * FROM department_data;
+
+-- 5. 
Complex join with multiple filters and masks +-- SELECT +-- e.first_name, +-- e.last_name, +-- e.department, +-- e.salary, +-- e.ssn, +-- c.company_name, +-- c.account_value, +-- c.credit_score +-- FROM employees e +-- LEFT JOIN customers c ON e.email = c.assigned_sales_rep; + +-- ================================================================ +-- STEP 9: ADMINISTRATIVE QUERIES FOR TESTING +-- ================================================================ + +-- View current user context +-- SELECT current_user() as current_user; + +-- Check user access levels +-- SELECT * FROM user_access_control WHERE username = current_user(); + +-- View department privileges +-- SELECT * FROM department_privileges; + +-- ================================================================ +-- STEP 10: CLEANUP COMMANDS (OPTIONAL) +-- ================================================================ + +-- To remove filters and masks for testing: +-- ALTER TABLE employees DROP ROW FILTER; +-- ALTER TABLE employees ALTER COLUMN ssn DROP MASK; +-- ALTER TABLE employees ALTER COLUMN salary DROP MASK; +-- ALTER TABLE employees ALTER COLUMN phone_number DROP MASK; +-- ALTER TABLE employees ALTER COLUMN address DROP MASK; +-- ALTER TABLE employees ALTER COLUMN emergency_contact DROP MASK; + +-- ALTER TABLE customers DROP ROW FILTER; +-- ALTER TABLE customers ALTER COLUMN credit_score DROP MASK; + +-- DROP FUNCTION IF EXISTS user_own_record_filter; +-- DROP FUNCTION IF EXISTS department_filter; +-- DROP FUNCTION IF EXISTS regional_access_filter; +-- DROP FUNCTION IF EXISTS hierarchical_filter; +-- DROP FUNCTION IF EXISTS sales_territory_filter; +-- DROP FUNCTION IF EXISTS ssn_mask; +-- DROP FUNCTION IF EXISTS salary_mask; +-- DROP FUNCTION IF EXISTS phone_mask; +-- DROP FUNCTION IF EXISTS address_mask; +-- DROP FUNCTION IF EXISTS emergency_contact_mask; +-- DROP FUNCTION IF EXISTS credit_score_mask; +-- DROP FUNCTION IF EXISTS budget_mask; + +-- 
================================================================
+-- END OF DEMO SCRIPT
+-- ================================================================
diff --git a/uc-quickstart/utils/abac_policy_agent/uc_function_tools.sql b/uc-quickstart/utils/abac_policy_agent/uc_function_tools.sql
new file mode 100644
index 00000000..5e8e3e9b
--- /dev/null
+++ b/uc-quickstart/utils/abac_policy_agent/uc_function_tools.sql
+-- Create Demo Catalog and Schema
+CREATE CATALOG IF NOT EXISTS enterprise_gov;
+USE CATALOG enterprise_gov;
+
+CREATE SCHEMA IF NOT EXISTS gov_admin;
+USE SCHEMA gov_admin;
+
+-- Create Agent tool UC UDFs
+
+
+-- List all row-filter / column-mask UDFs in a schema (agent tool).
+CREATE OR REPLACE FUNCTION enterprise_gov.gov_admin.list_row_filter_column_masking(catalog_name STRING, schema_name STRING)
+RETURNS TABLE (routine_catalog STRING, routine_schema STRING, routine_name STRING, comment STRING, full_data_type STRING, routine_definition STRING)
+RETURN (
+select routine_catalog, routine_schema, routine_name, comment, full_data_type, routine_definition
+FROM system.information_schema.routines
+WHERE routine_catalog = catalog_name
+and routine_schema = schema_name
+and routine_type = 'FUNCTION'
+);
+
+
+-- Describe the columns of one table (agent tool).
+-- BUG FIX: a bare `table_name` in the WHERE clause resolves to the *column*
+-- of information_schema.columns, not the parameter, so the original predicate
+-- `table_name = table_name` was always true and returned every table's
+-- columns in the schema. Qualify the parameter with the function name.
+CREATE OR REPLACE FUNCTION enterprise_gov.gov_admin.describe_extended_table(catalog_name STRING, schema_name STRING, table_name STRING)
+RETURNS TABLE (table_catalog STRING, table_schema STRING, table_name STRING, column_name STRING, data_type STRING, comment STRING)
+RETURN (
+  SELECT table_catalog,table_schema, table_name, column_name, data_type, comment
+  FROM system.information_schema.columns
+  WHERE table_catalog = catalog_name
+  AND table_schema = schema_name
+  AND table_name = describe_extended_table.table_name
+);
+
+
+-- List the tables of one schema (agent tool).
+CREATE OR REPLACE FUNCTION enterprise_gov.gov_admin.list_uc_tables(catalog_name STRING, schema_name STRING)
+RETURNS TABLE (table_name STRING, table_type STRING, created TIMESTAMP)
+RETURN (
+  SELECT table_name, table_type, created
+  FROM system.information_schema.tables
+  WHERE table_catalog = catalog_name
+  AND table_schema = schema_name
+);
+
+
+-- Get table-level tags for one table (agent tool).
+-- BUG FIX: system.information_schema.table_tags has columns literally named
+-- catalog_name / schema_name / table_name, so all three original predicates
+-- were column self-comparisons (always true) and returned tags for every
+-- table in the metastore. Qualify the parameters with the function name.
+CREATE OR REPLACE FUNCTION enterprise_gov.gov_admin.get_table_tags(catalog_name STRING, schema_name STRING, table_name STRING)
+RETURNS TABLE (table_name STRING, tag_name STRING, tag_value STRING)
+RETURN (
+  SELECT table_name, tag_name, tag_value
+  FROM system.information_schema.table_tags
+  WHERE catalog_name = get_table_tags.catalog_name AND schema_name = get_table_tags.schema_name AND table_name = get_table_tags.table_name
+);
+
+
+-- Get column-level tags for one table (agent tool).
+-- Same self-comparison fix as get_table_tags above.
+CREATE OR REPLACE FUNCTION enterprise_gov.gov_admin.get_column_tags(catalog_name STRING, schema_name STRING, table_name STRING)
+RETURNS TABLE (table_name STRING, column_name STRING, tag_name STRING, tag_value STRING)
+RETURN (
+  SELECT table_name, column_name, tag_name, tag_value
+  FROM system.information_schema.column_tags
+  WHERE catalog_name = get_column_tags.catalog_name AND schema_name = get_column_tags.schema_name AND table_name = get_column_tags.table_name
+);
+
+
+
diff --git a/uc-quickstart/utils/abac_policy_agent/uc_tags_setup.sql b/uc-quickstart/utils/abac_policy_agent/uc_tags_setup.sql
new file mode 100644
index 00000000..847d47ba
--- /dev/null
+++ b/uc-quickstart/utils/abac_policy_agent/uc_tags_setup.sql
+-- ================================================================
+-- UNITY CATALOG ABAC POLICIES IMPLEMENTATION
+-- Based on existing Unity Catalog demo with enterprise governance
+-- ================================================================
+
+-- Prerequisites:
+-- 1. Enable ABAC in workspace: Admin > Previews > Attribute Based Access Control = ON
+-- 2. Enable Tag Policies: Account Console > Previews > Tag policies = ON
+-- 3. Compute: Databricks Runtime 16.4+ or Serverless
+-- 4. 
Groups created in Databricks workspace (see section at bottom) + +USE CATALOG enterprise_gov; +USE SCHEMA hr_finance; + +-- ================================================================ +-- STEP 1: CREATE GOVERNED TAGS AT ACCOUNT LEVEL +-- ================================================================ +-- Note: These must be created in the Account Console > Governed tags section + +/* +Account-level governed tags to be created via UI: +1. hr_data_classification: public, internal, confidential, restricted +2. pii_level: none, low, medium, high +3. financial_data: true, false +4. department: Human Resource, Finance, Sales, IT, Management +5. region: US-West, US-East, US-Central, APAC, Europe +6. access_level: standard, manager, admin, super_admin +7. role_required: employee, manager, hr_admin, finance_admin, system_admin +8. business_unit: Corporate, Field_Sales, Support, Operations +*/ + +-- ================================================================ +-- STEP 2: APPLY TAGS TO EXISTING TABLES AND COLUMNS +-- ================================================================ + +-- Tag employees table and columns +ALTER TABLE employees SET TAGS ( + 'hr_data_classification' = 'confidential', + 'department' = 'Human Resource', + 'pii_level' = 'high', + 'business_unit' = 'Corporate' +); + +-- Tag sensitive employee columns +ALTER TABLE employees ALTER COLUMN ssn SET TAGS ( + 'pii_level' = 'high', + 'hr_data_classification' = 'restricted', + 'role_required' = 'hr_admin' +); + +ALTER TABLE employees ALTER COLUMN salary SET TAGS ( + 'financial_data' = 'true', + 'hr_data_classification' = 'confidential', + 'access_level' = 'manager' +); + +ALTER TABLE employees ALTER COLUMN phone_number SET TAGS ( + 'pii_level' = 'medium', + 'hr_data_classification' = 'internal' +); + +ALTER TABLE employees ALTER COLUMN address SET TAGS ( + 'pii_level' = 'high', + 'hr_data_classification' = 'confidential' +); + +ALTER TABLE employees ALTER COLUMN emergency_contact SET TAGS ( + 
'pii_level' = 'high', + 'hr_data_classification' = 'restricted', + 'role_required' = 'hr_admin' +); + +-- Tag customers table and columns +ALTER TABLE customers SET TAGS ( + 'hr_data_classification' = 'internal', + 'department' = 'Sales', + 'business_unit' = 'Field_Sales' +); + +ALTER TABLE customers ALTER COLUMN credit_score SET TAGS ( + 'financial_data' = 'true', + 'hr_data_classification' = 'confidential', + 'role_required' = 'finance_admin' +); + +ALTER TABLE customers ALTER COLUMN account_value SET TAGS ( + 'financial_data' = 'true', + 'hr_data_classification' = 'internal', + 'access_level' = 'manager' +); + +-- Tag other demo tables +ALTER TABLE user_access_control SET TAGS ( + 'hr_data_classification' = 'internal', + 'department' = 'IT', + 'business_unit' = 'Operations' +); + +ALTER TABLE department_privileges SET TAGS ( + 'hr_data_classification' = 'internal', + 'department' = 'IT' +); + +ALTER TABLE personal_records SET TAGS ( + 'hr_data_classification' = 'restricted', + 'pii_level' = 'high' +); + +ALTER TABLE department_data SET TAGS ( + 'hr_data_classification' = 'confidential', + 'financial_data' = 'true' +); + +ALTER TABLE department_data ALTER COLUMN budget SET TAGS ( + 'financial_data' = 'true', + 'hr_data_classification' = 'restricted', + 'role_required' = 'finance_admin' +);