1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
import boto3
# Region used when creating the Bedrock runtime client below.
REGION = "us-west-2"

# Identifier and version of the guardrail sent with every apply_guardrail call.
# NOTE(review): "xxxxx" looks like a placeholder — confirm the real ID before use.
GUARDRAIL_ID = "xxxxx"
GUARDRAIL_VERSION = "DRAFT"

# Module-level client shared by every call in this file; created at import time.
bedrock = boto3.client(service_name="bedrock-runtime", region_name=REGION)
def apply_guardrail(text: str, source: str):
    """Send ``text`` to the configured guardrail and return the raw response.

    Args:
        text: The text to evaluate; wrapped as a single text content item.
        source: Forwarded unchanged as the ``source`` argument of the call.
            NOTE(review): presumably "INPUT" or "OUTPUT" — confirm against the API.

    Returns:
        The value returned by ``bedrock.apply_guardrail``, unmodified.
    """
    content_items = [{"text": {"text": text}}]
    return bedrock.apply_guardrail(
        guardrailIdentifier=GUARDRAIL_ID,
        guardrailVersion=GUARDRAIL_VERSION,
        source=source,
        content=content_items,
    )
def classify(response):
    """Summarize a guardrail response using the API's own action values.

    Returns:
        A tuple ``(action, rule_actions)`` where

        * ``action`` is the top-level ``response["action"]``
          (NONE / GUARDRAIL_INTERVENED), and
        * ``rule_actions`` is the set of non-empty per-rule actions found by
          ``collect_hits``, with values taken straight from the API:
          BLOCKED (rejected), ANONYMIZED (masked), NONE (detected only).

    Raises:
        KeyError: if ``response`` has no ``"action"`` key.
    """
    top_level_action = response["action"]

    per_rule_actions = set()
    for hit in collect_hits(response):
        verdict = hit.get("action")
        # Hits without an action (e.g. unknown policy blocks) contribute nothing.
        if verdict:
            per_rule_actions.add(verdict)

    return top_level_action, per_rule_actions
# Known assessment blocks. Each maps to a list of
# (sub-list key inside the block, policy label, item field used as the rule name).
_POLICY_SUBLISTS = {
    "topicPolicy": [("topics", "topic", "name")],
    "contentPolicy": [("filters", "content", "type")],
    "wordPolicy": [
        ("customWords", "word", "match"),
        ("managedWordLists", "profanity", "type"),
    ],
    "sensitiveInformationPolicy": [
        ("piiEntities", "pii", "type"),
        ("regexes", "regex", "name"),
    ],
    "contextualGroundingPolicy": [("filters", "grounding", "type")],
}

# Assessment keys that are skipped by the unknown-block fallback.
_NON_POLICY_KEYS = ("invocationMetrics", "appliedGuardrailDetails")


def collect_hits(response):
    """Collect every rule hit in a guardrail response as a flat list of dicts.

    Each element has the keys:

    * ``policy``     - policy label (topic / content / word / profanity / pii /
      regex / grounding, or ``unknown:<block name>``)
    * ``name``       - rule name or type, read from the field configured for
      that sub-list
    * ``action``     - the item's ``action`` value as returned by the API
    * ``match``      - the item's ``match`` value, or None when absent
    * ``confidence`` - the item's ``confidence`` value, or None when absent

    Coverage: the blocks listed in ``_POLICY_SUBLISTS`` are parsed item by item,
    and items whose ``detected`` field is falsy are skipped (a missing
    ``detected`` counts as detected). Any other dict-valued key in an
    assessment, except the keys in ``_NON_POLICY_KEYS``, is recorded as a single
    ``unknown:<key>`` hit that additionally carries the untouched block under
    ``raw``. Sub-lists that are not listed for a known block are not inspected.

    Args:
        response: Mapping with an optional ``assessments`` list.

    Returns:
        List of hit dicts, in assessment order; within an assessment, known
        blocks come first (in table order) followed by unknown blocks.
    """
    hits = []
    # `or []` also tolerates an explicit None instead of a missing key.
    for assessment in response.get("assessments") or []:
        for block_name, sublists in _POLICY_SUBLISTS.items():
            block = assessment.get(block_name)
            if not block:
                continue
            for list_key, policy, name_field in sublists:
                for item in block.get(list_key) or []:
                    if not item.get("detected", True):
                        continue
                    hits.append({
                        "policy": policy,
                        "name": item.get(name_field),
                        "action": item.get("action"),
                        "match": item.get("match"),
                        "confidence": item.get("confidence"),
                    })

        # Fallback: keep unrecognized policy blocks visible instead of dropping them.
        for key, value in assessment.items():
            if key in _NON_POLICY_KEYS or key in _POLICY_SUBLISTS:
                continue
            if isinstance(value, dict):
                hits.append({
                    "policy": f"unknown:{key}",
                    "name": None,
                    "action": None,
                    "match": None,
                    # Present on every hit so consumers see a uniform schema.
                    "confidence": None,
                    "raw": value,
                })
    return hits
def analyze(text, source="OUTPUT"):
    """Run the guardrail on ``text`` and return a structured result.

    The result is a plain dict (convenient for statistics or persistence)
    whose fields use the API's native values:

    * ``input`` / ``source`` - the arguments as given
    * ``action``        - NONE / GUARDRAIL_INTERVENED
    * ``actionReason``  - the response's ``actionReason``, or None if absent
    * ``rule_actions``  - sorted list of the per-rule actions that were hit:
      BLOCKED (rejected) / ANONYMIZED (masked) / NONE
    * ``returned_text`` - ``text`` of the first entry in ``outputs``, or None
      when there are no outputs
    * ``hits``          - the list produced by ``collect_hits``
    """
    resp = apply_guardrail(text, source)
    action, rule_actions = classify(resp)
    outputs = resp.get("outputs", [])

    report = {"input": text, "source": source, "action": action}
    report["actionReason"] = resp.get("actionReason")
    report["rule_actions"] = sorted(rule_actions)
    if outputs:
        report["returned_text"] = outputs[0]["text"]
    else:
        report["returned_text"] = None
    report["hits"] = collect_hits(resp)
    return report
def show(text, source="OUTPUT"):
    """Analyze ``text`` with the guardrail and print a readable report to stdout.

    Prints a separator, the summary fields from ``analyze``, one line per hit
    (or a "none" marker when there are no hits), and a trailing blank line.
    """
    result = analyze(text, source)

    print("#" * 60)
    print(f"输入 : {result['input']} (source={result['source']})")
    print(f"action : {result['action']}")
    print(f"actionReason: {result['actionReason']}")
    print(f"rule_actions: {result['rule_actions'] or '(无)'}")
    print(f"返回内容 : {result['returned_text'] or '(无,原样放行)'}")

    hits = result["hits"]
    if not hits:
        print("命中规则 : (无)")
    else:
        print("命中规则 :")
        for hit in hits:
            # Optional suffixes are appended only when the hit carries a value.
            suffix = ""
            if hit.get("match"):
                suffix += f" 匹配='{hit['match']}'"
            if hit.get("confidence"):
                suffix += f" 置信度={hit['confidence']}"
            print(f" - [{hit['policy']}] {hit['name']} -> {hit['action']}{suffix}")
    print()
if __name__ == "__main__":
    # Manual smoke run: print the guardrail report for one sample sentence.
    show(text="You are a fucking asshole.")
|