CVE-2026-12203
Description
An unauthenticated information disclosure vulnerability in AI-Trader's research export endpoint exposes agent IDs and financial balances.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
An unauthenticated information disclosure vulnerability in AI-Trader's research export endpoint exposes agent IDs and financial balances.
Vulnerability
The vulnerability exists in the /api/research/agents.csv endpoint of HKUDS AI-Trader up to commit 74caf996f78dcc0c657df8365c8544678a16e215. The endpoint lacks authentication, allowing any remote user to download the full database of registered agents, including internal Agent IDs and precise financial balances [1][4]. The product follows a rolling release model, so no version numbers are provided [1].
Exploitation
An attacker can exploit this by sending a simple HTTP GET request to /api/research/agents.csv without any authentication or user interaction. The exploit has been made public, increasing the risk of automated scraping [4]. No special network position is required beyond internet access to the platform.
Impact
Successful exploitation results in information disclosure of sensitive data: internal Agent IDs and financial balances of all registered agents. While some fields attempt anonymization via hashing, the bulk data can be used for competitive intelligence gathering and targeted attacks [4]. The attacker gains unauthorized read access to the entire agent database.
Mitigation
The vendor has released a patch in commit 91a31aac1b0f4dbc6b8bef9f6eff0b7912e0bc65 [2]. The fix adds permission checks, requiring an authenticated agent with the research_exports capability to access the endpoint [3]. Users should apply the patch immediately. No workaround is available if the patch cannot be applied.
- GitHub - HKUDS/AI-Trader: "AI-Trader: 100% Fully-Automated Agent-Native Trading"
- Add admin permissions and experiment notice tracking (#227) · HKUDS/AI-Trader@91a31aa
- Add admin permissions and experiment notice tracking by TianyuFan0504 · Pull Request #227 · HKUDS/AI-Trader
- Unauthenticated Sensitive Data Exposure in Research Export
AI Insight generated on Jun 15, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected products
2Patches
191a31aac1b0fAdd admin permissions and experiment notice tracking (#227)
27 files changed · +1867 −166
service/frontend/src/appChrome.tsx+13 −8 modified@@ -2,7 +2,7 @@ import { useEffect, useState } from 'react' import { Link, useLocation } from 'react-router-dom' -import { useLanguage, useTheme } from './appShared' +import { type AgentInfo, hasPermission, useLanguage, useTheme } from './appShared' export function Toast({ message, type, onClose }: { message: string, type: 'success' | 'error', onClose: () => void }) { useEffect(() => { @@ -76,7 +76,7 @@ export function Sidebar({ onMarkCategoryRead }: { token: string | null - agentInfo: any + agentInfo: AgentInfo | null onLogout: () => void notificationCounts: NotificationCounts onMarkCategoryRead: (category: 'discussion' | 'strategy' | 'experiment') => void @@ -85,14 +85,19 @@ export function Sidebar({ const { t, language } = useLanguage() const [showToken, setShowToken] = useState(false) + const canUseExperiments = hasPermission(agentInfo, 'experiment_admin') + const canUseResearchExports = hasPermission(agentInfo, 'research_exports') + const canUseTeamMissionAdmin = hasPermission(agentInfo, 'team_mission_admin') + const agentToken = agentInfo?.token + const navItems = [ { path: '/financial-events', icon: '🗞️', label: language === 'zh' ? '金融事件看板' : 'Financial Events', requiresAuth: false }, { path: '/market', icon: '📊', label: t.nav.signals, requiresAuth: false }, { path: '/leaderboard', icon: '🏆', label: language === 'zh' ? '排行榜' : 'Leaderboard', requiresAuth: false }, { path: '/challenges', icon: '⚔️', label: language === 'zh' ? '挑战赛' : 'Challenges', requiresAuth: false }, - { path: '/team-missions', icon: '▦', label: language === 'zh' ? '团队任务' : 'Team Missions', requiresAuth: false }, - { path: '/experiments', icon: '◇', label: language === 'zh' ? '实验' : 'Experiments', requiresAuth: true, badge: notificationCounts.experiment, category: 'experiment' as const }, - { path: '/research-exports', icon: '⇩', label: language === 'zh' ? '研究导出' : 'Research Exports', requiresAuth: false }, + ...(canUseTeamMissionAdmin ? [{ path: '/team-missions', icon: '▦', label: language === 'zh' ? '团队任务' : 'Team Missions', requiresAuth: true }] : []), + ...(canUseExperiments ? [{ path: '/experiments', icon: '◇', label: language === 'zh' ? '实验' : 'Experiments', requiresAuth: true, badge: notificationCounts.experiment, category: 'experiment' as const }] : []), + ...(canUseResearchExports ? [{ path: '/research-exports', icon: '⇩', label: language === 'zh' ? '研究导出' : 'Research Exports', requiresAuth: true }] : []), { path: '/copytrading', icon: '📋', label: language === 'zh' ? '跟单' : 'Copy Trading', requiresAuth: true }, { path: '/strategies', icon: '📈', label: t.nav.strategies, requiresAuth: false, badge: notificationCounts.strategy, category: 'strategy' as const }, { path: '/discussions', icon: '💬', label: t.nav.discussions, requiresAuth: false, badge: notificationCounts.discussion, category: 'discussion' as const }, @@ -181,7 +186,7 @@ export function Sidebar({ )} </div> - {agentInfo.token && ( + {agentToken && ( <div style={{ marginTop: '12px', padding: '8px', background: 'var(--bg-secondary)', borderRadius: '8px' }}> <div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', marginBottom: '4px' }}> <div style={{ fontSize: '11px', color: 'var(--text-muted)' }}> @@ -210,11 +215,11 @@ export function Sidebar({ wordBreak: 'break-all' }} onClick={() => { - navigator.clipboard.writeText(agentInfo.token) + navigator.clipboard.writeText(agentToken) alert(language === 'zh' ? 'Token 已复制到剪贴板' : 'Token copied to clipboard') }} > - {showToken ? agentInfo.token : agentInfo.token.substring(0, 10) + '***'} + {showToken ? agentToken : agentToken.substring(0, 10) + '***'} </div> </div> )}
service/frontend/src/AppPages.tsx+2 −1 modified@@ -4,6 +4,7 @@ import { CartesianGrid, Line, LineChart, ResponsiveContainer, Tooltip, XAxis, YA import { API_BASE, + type AgentInfo, COPY_TRADING_PAGE_SIZE, FINANCIAL_NEWS_PAGE_SIZE, LEADERBOARD_LINE_COLORS, @@ -2471,7 +2472,7 @@ export function PositionsPage() { } // Trade Page - Place Order -export function TradePage({ token, agentInfo, onTradeSuccess }: { token: string, agentInfo?: any, onTradeSuccess?: () => void }) { +export function TradePage({ token, agentInfo, onTradeSuccess }: { token: string, agentInfo?: AgentInfo | null, onTradeSuccess?: () => void }) { const { t, language } = useLanguage() const navigate = useNavigate() const [loading, setLoading] = useState(false)
service/frontend/src/appShared.tsx+23 −0 modified@@ -10,6 +10,29 @@ interface LanguageContextType { export type ThemeMode = 'dark' | 'light' +export type AgentPermissions = { + experiment_admin?: boolean + research_exports?: boolean + team_mission_admin?: boolean +} + +export type AgentInfo = { + id: number + name: string + token?: string + role?: string + permissions?: AgentPermissions + wallet_address?: string | null + points?: number + cash?: number + reputation_score?: number + experiment_assignments?: any[] +} + +export function hasPermission(agentInfo: AgentInfo | null | undefined, permission: keyof AgentPermissions) { + return Boolean(agentInfo?.permissions?.[permission]) +} + interface ThemeContextType { theme: ThemeMode setTheme: (theme: ThemeMode) => void
service/frontend/src/App.tsx+31 −7 modified@@ -3,6 +3,7 @@ import { BrowserRouter, Navigate, Route, Routes, useLocation } from 'react-route import { API_BASE, + type AgentInfo, ExchangePage, FinancialEventsPage, LandingPage, @@ -30,6 +31,7 @@ import { ExperimentAdminPage } from './ExperimentAdminPage' import { ResearchExportsPage } from './ResearchExportsPage' import { TeamMissionsPage } from './TeamMissionsPage' import { Language, getT } from './i18n' +import { hasPermission } from './appShared' const DISCUSSION_NOTIFICATION_TYPES = new Set([ 'discussion_started', @@ -63,7 +65,8 @@ function App() { return savedTheme === 'light' ? 'light' : 'dark' }) const [token, setToken] = useState<string | null>(localStorage.getItem('claw_token')) - const [agentInfo, setAgentInfo] = useState<any>(null) + const [agentInfo, setAgentInfo] = useState<AgentInfo | null>(null) + const [agentInfoLoading, setAgentInfoLoading] = useState(Boolean(localStorage.getItem('claw_token'))) const [toast, setToast] = useState<{ message: string, type: 'success' | 'error' } | null>(null) const [notificationCounts, setNotificationCounts] = useState<NotificationCounts>({ discussion: 0, strategy: 0, experiment: 0 }) @@ -72,12 +75,14 @@ function App() { const login = (newToken: string) => { localStorage.setItem('claw_token', newToken) setToken(newToken) + setAgentInfoLoading(true) } const logout = () => { localStorage.removeItem('claw_token') setToken(null) setAgentInfo(null) + setAgentInfoLoading(false) setNotificationCounts({ discussion: 0, strategy: 0, experiment: 0 }) } @@ -87,22 +92,33 @@ function App() { }, [theme]) const fetchAgentInfo = async () => { + if (!token) return + setAgentInfoLoading(true) try { const res = await fetch(`${API_BASE}/claw/agents/me`, { headers: { 'Authorization': `Bearer ${token}` } }) if (res.ok) { const data = await res.json() setAgentInfo(data) + } else if (res.status === 401) { + localStorage.removeItem('claw_token') + setToken(null) + setAgentInfo(null) } } catch (e) { console.error(e) + } finally { + setAgentInfoLoading(false) } } useEffect(() => { if (token) { fetchAgentInfo() + } else { + setAgentInfo(null) + setAgentInfoLoading(false) } }, [token]) @@ -185,6 +201,7 @@ function App() { <AppRouter token={token} agentInfo={agentInfo} + agentInfoLoading={agentInfoLoading} login={login} logout={logout} fetchAgentInfo={fetchAgentInfo} @@ -208,14 +225,16 @@ function App() { function AppRouter({ token, agentInfo, + agentInfoLoading, login, logout, fetchAgentInfo, notificationCounts, markCategoryRead, }: { token: string | null - agentInfo: any + agentInfo: AgentInfo | null + agentInfoLoading: boolean login: (token: string) => void logout: () => void fetchAgentInfo: () => Promise<void> @@ -224,6 +243,11 @@ function AppRouter({ }) { const location = useLocation() const isLanding = location.pathname === '/' + const canUseExperiments = hasPermission(agentInfo, 'experiment_admin') + const canUseResearchExports = hasPermission(agentInfo, 'research_exports') + const canUseTeamMissionAdmin = hasPermission(agentInfo, 'team_mission_admin') + const permissionLoading = Boolean(token && agentInfoLoading) + const permissionLoadingView = <div className="loading"><div className="spinner"></div></div> if (isLanding) { return ( @@ -254,11 +278,11 @@ function AppRouter({ <Route path="/leaderboard" element={<LeaderboardPage token={token} />} /> <Route path="/challenges" element={<ChallengePage token={token} />} /> <Route path="/challenges/:challengeKey" element={<ChallengePage token={token} />} /> - <Route path="/team-missions" element={<TeamMissionsPage token={token} />} /> - <Route path="/team-missions/:missionKey" element={<TeamMissionsPage token={token} />} /> - <Route path="/teams/:teamKey" element={<TeamMissionsPage token={token} />} /> - <Route path="/experiments" element={<ExperimentAdminPage token={token} />} /> - <Route path="/research-exports" element={<ResearchExportsPage />} /> + <Route path="/team-missions" element={permissionLoading ? permissionLoadingView : canUseTeamMissionAdmin ? <TeamMissionsPage token={token} canAdmin={canUseTeamMissionAdmin} /> : <Navigate to="/market" replace />} /> + <Route path="/team-missions/:missionKey" element={permissionLoading ? permissionLoadingView : canUseTeamMissionAdmin ? <TeamMissionsPage token={token} canAdmin={canUseTeamMissionAdmin} /> : <Navigate to="/market" replace />} /> + <Route path="/teams/:teamKey" element={permissionLoading ? permissionLoadingView : canUseTeamMissionAdmin ? <TeamMissionsPage token={token} canAdmin={canUseTeamMissionAdmin} /> : <Navigate to="/market" replace />} /> + <Route path="/experiments" element={permissionLoading ? permissionLoadingView : canUseExperiments ? <ExperimentAdminPage token={token} /> : <Navigate to="/market" replace />} /> + <Route path="/research-exports" element={permissionLoading ? permissionLoadingView : canUseResearchExports && token ? <ResearchExportsPage token={token} /> : <Navigate to="/market" replace />} /> <Route path="/financial-events" element={<FinancialEventsPage />} /> <Route path="/copytrading" element={token ? <CopyTradingPage token={token} /> : <Navigate to="/login" replace />} /> <Route path="/strategies" element={<StrategiesPage />} />
service/frontend/src/ExperimentAdminPage.tsx+7 −2 modified@@ -57,8 +57,11 @@ export function ExperimentAdminPage({ token }: ExperimentAdminPageProps) { const loadExperiments = async () => { setLoading(true) try { - const res = await fetch(`${API_BASE}/experiments?limit=100`) + const res = await fetch(`${API_BASE}/experiments?limit=100`, { + headers: token ? { 'Authorization': `Bearer ${token}` } : {} + }) const data = await res.json() + if (!res.ok) throw new Error(data.detail || 'experiment_load_failed') setExperiments(data.experiments || []) if (!notificationForm.experiment_key && data.experiments?.[0]?.experiment_key) { setNotificationForm((prev) => ({ ...prev, experiment_key: data.experiments[0].experiment_key })) @@ -73,7 +76,9 @@ export function ExperimentAdminPage({ token }: ExperimentAdminPageProps) { const loadAssignments = async (experimentKey: string) => { try { - const res = await fetch(`${API_BASE}/experiments/${experimentKey}/assignments?limit=500`) + const res = await fetch(`${API_BASE}/experiments/${experimentKey}/assignments?limit=500`, { + headers: token ? { 'Authorization': `Bearer ${token}` } : {} + }) const data = await res.json() if (!res.ok) throw new Error(data.detail || 'assignment_load_failed') setSelectedExperiment(data.experiment)
service/frontend/src/ResearchExportsPage.tsx+47 −6 modified@@ -25,10 +25,12 @@ const exportSpecs = [ } ] -export function ResearchExportsPage() { +export function ResearchExportsPage({ token }: { token: string }) { const { language } = useLanguage() const [experiments, setExperiments] = useState<any[]>([]) const [events, setEvents] = useState<any[]>([]) + const [busyDownload, setBusyDownload] = useState<string | null>(null) + const [error, setError] = useState<string | null>(null) const [filters, setFilters] = useState({ start_at: '', end_at: '', @@ -55,22 +57,59 @@ export function ResearchExportsPage() { const loadExperiments = async () => { try { - const res = await fetch(`${API_BASE}/experiments?limit=200`) + const res = await fetch(`${API_BASE}/experiments?limit=200`, { + headers: { 'Authorization': `Bearer ${token}` } + }) const data = await res.json() + if (!res.ok) throw new Error(data.detail || 'experiment_load_failed') setExperiments(data.experiments || []) } catch (e) { console.error(e) + setExperiments([]) } } const loadEvents = async () => { try { - const res = await fetch(`${API_BASE}/research/events?${queryString}`) + const res = await fetch(`${API_BASE}/research/events?${queryString}`, { + headers: { 'Authorization': `Bearer ${token}` } + }) const data = await res.json() + if (!res.ok) throw new Error(data.detail || 'events_load_failed') setEvents(data.events || []) + setError(null) } catch (e) { console.error(e) setEvents([]) + setError(language === 'zh' ? '研究数据加载失败' : 'Failed to load research data') + } + } + + const downloadCsv = async (filename: string) => { + setBusyDownload(filename) + try { + const res = await fetch(`${API_BASE}/research/${filename}?${queryString}`, { + headers: { 'Authorization': `Bearer ${token}` } + }) + if (!res.ok) { + const detail = await res.text() + throw new Error(detail || 'download_failed') + } + const blob = await res.blob() + const url = URL.createObjectURL(blob) + const link = document.createElement('a') + link.href = url + link.download = filename + document.body.appendChild(link) + link.click() + link.remove() + URL.revokeObjectURL(url) + setError(null) + } catch (e) { + console.error(e) + setError(language === 'zh' ? 'CSV 下载失败' : 'CSV download failed') + } finally { + setBusyDownload(null) } } @@ -93,6 +132,8 @@ export function ResearchExportsPage() { </div> </div> + {error && <div className="empty-state"><div className="empty-title">{error}</div></div>} + <section className="experiment-panel"> <div className="experiment-section-header"><h2>{language === 'zh' ? '过滤条件' : 'Filters'}</h2></div> <div className="research-filter-grid"> @@ -125,9 +166,9 @@ export function ResearchExportsPage() { <span className="experiment-badge">{spec.filename}</span> </div> <p>{spec.columns}</p> - <a className="btn btn-primary" href={`${API_BASE}/research/${spec.filename}?${queryString}`}> - {language === 'zh' ? '下载 CSV' : 'Download CSV'} - </a> + <button className="btn btn-primary" disabled={busyDownload === spec.filename} onClick={() => downloadCsv(spec.filename)}> + {busyDownload === spec.filename ? (language === 'zh' ? '下载中' : 'Downloading') : (language === 'zh' ? '下载 CSV' : 'Download CSV')} + </button> </article> ))} </div>
service/frontend/src/TeamMissionsPage.tsx+9 −7 modified@@ -5,6 +5,7 @@ import { API_BASE, MARKETS, useLanguage } from './appShared' type TeamMissionsPageProps = { token?: string | null + canAdmin?: boolean } const missionStatuses = ['upcoming', 'active', 'settled'] as const @@ -29,7 +30,7 @@ function marketLabel(value: string, language: string) { return MARKETS.find((market) => market.value === value)?.[language === 'zh' ? 'labelZh' : 'label'] || value } -export function TeamMissionsPage({ token }: TeamMissionsPageProps) { +export function TeamMissionsPage({ token, canAdmin = false }: TeamMissionsPageProps) { const { missionKey, teamKey } = useParams() const { language } = useLanguage() const [status, setStatus] = useState<'upcoming' | 'active' | 'settled'>('active') @@ -378,9 +379,11 @@ export function TeamMissionsPage({ token }: TeamMissionsPageProps) { <button className="btn btn-primary" disabled={busy || isJoined} onClick={() => handleJoinMission(mission.mission_key)}> {isJoined ? (language === 'zh' ? '已加入' : 'Joined') : (language === 'zh' ? '加入 Mission' : 'Join mission')} </button> - <button className="btn btn-secondary" disabled={busy} onClick={handleAutoForm}> - {language === 'zh' ? '自动组队' : 'Auto-form teams'} - </button> + {canAdmin && ( + <button className="btn btn-secondary" disabled={busy} onClick={handleAutoForm}> + {language === 'zh' ? '自动组队' : 'Auto-form teams'} + </button> + )} </> )} </div> @@ -464,7 +467,7 @@ export function TeamMissionsPage({ token }: TeamMissionsPageProps) { {language === 'zh' ? '组队、分工、协作提交和贡献结算的实验工作台' : 'An experiment workspace for teams, roles, submissions, and contribution scoring'} </p> </div> - {token && <button className="btn btn-primary" onClick={() => setShowCreate(!showCreate)}>{language === 'zh' ? '创建 Mission' : 'Create mission'}</button>} + {token && canAdmin && <button className="btn btn-primary" onClick={() => setShowCreate(!showCreate)}>{language === 'zh' ? '创建 Mission' : 'Create mission'}</button>} </div> <div className="team-tabs"> @@ -475,7 +478,7 @@ export function TeamMissionsPage({ token }: TeamMissionsPageProps) { ))} </div> - {showCreate && ( + {canAdmin && showCreate && ( <section className="team-panel"> <form className="team-create-grid" onSubmit={handleCreateMission}> <input className="form-input" value={createForm.title} onChange={(event) => setCreateForm({ ...createForm, title: event.target.value })} placeholder={language === 'zh' ? 'Mission 标题' : 'Mission title'} required /> @@ -538,4 +541,3 @@ export function TeamMissionsPage({ token }: TeamMissionsPageProps) { </div> ) } -
service/server/database.py+17 −0 modified@@ -377,6 +377,7 @@ def init_database(): token_expires_at TEXT, password_hash TEXT, wallet_address TEXT, + role TEXT DEFAULT 'agent', points INTEGER DEFAULT 0, cash REAL DEFAULT 100000.0, deposited REAL DEFAULT 0.0, @@ -1102,6 +1103,12 @@ def init_database(): except Exception: pass + # Add role column if it doesn't exist (for existing databases) + try: + cursor.execute("ALTER TABLE agents ADD COLUMN role TEXT DEFAULT 'agent'") + except Exception: + pass + # Add password_reset_token column if it doesn't exist (for existing databases) try: cursor.execute("ALTER TABLE agents ADD COLUMN password_reset_token TEXT") @@ -1198,6 +1205,16 @@ def init_database(): ON signals(market, token_id) """) + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_agent_messages_agent_read_created + ON agent_messages(agent_id, read, created_at) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_agent_messages_agent_type_created + ON agent_messages(agent_id, type, created_at) + """) + cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_polymarket_settlements_agent ON polymarket_settlements(agent_id, settled_at DESC)
service/server/experiment_notifications.py+119 −0 modified@@ -35,6 +35,8 @@ MAX_LIMIT = 5289 TARGET_PREVIEW_LIMIT = 20 CONTENT_MAX_LENGTH = 4000 +EXPERIMENT_READ_CONVERSION_REMINDER_TYPE = "experiment_reminder" +EXPERIMENT_READ_CONVERSION_EVENT_TYPE = "experiment_read_conversion_reminder_sent" class ExperimentNotificationError(ValueError): @@ -236,6 +238,123 @@ def resolve_recent_active_experiment_targets( return _apply_agent_filter(rows, normalized_agent_ids) +def resolve_unread_active_experiment_targets( + experiment_key: str, + *, + variant_key: Optional[str] = None, + agent_ids: Optional[list[int] | list[str]] = None, + limit: Optional[int] = None, + active_since: Optional[str] = None, + reminder_since: Optional[str] = None, +) -> list[dict[str, Any]]: + """Resolve fixed-cohort agents who are active but still have unread experiment messages.""" + normalized_agent_ids = _coerce_agent_ids(agent_ids) + clamped_limit = _clamp_limit(limit) + params: list[Any] = [experiment_key] + where = [ + "ea.experiment_key = ?", + "ea.unit_type = 'agent'", + """ + EXISTS ( + SELECT 1 + FROM agent_messages unread + WHERE unread.agent_id = ea.unit_id + AND unread.read = 0 + AND unread.type IN ( + 'experiment_announcement', + 'experiment_assignment', + 'experiment_reminder', + 'experiment_rule_update', + 'experiment_result_update', + 'challenge_invite', + 'team_mission_invite' + ) + ) + """, + ] + if variant_key: + where.append("ea.variant_key = ?") + params.append(variant_key) + max_unit_id = get_experiment_enrollment_max_unit_id(experiment_key) + if max_unit_id is not None: + where.append("ea.unit_id <= ?") + params.append(max_unit_id) + if active_since: + where.append( + """ + EXISTS ( + SELECT 1 + FROM experiment_events active_event + WHERE active_event.actor_agent_id = ea.unit_id + AND active_event.created_at >= ? + AND active_event.event_type IN ( + 'agent_heartbeat', + 'agent_tasks_read', + 'signal_published', + 'experiment_notice_exposed' + ) + ) + """ + ) + params.append(active_since) + if reminder_since: + where.append( + """ + NOT EXISTS ( + SELECT 1 + FROM agent_messages recent_reminder + WHERE recent_reminder.agent_id = ea.unit_id + AND recent_reminder.type = ? + AND recent_reminder.created_at >= ? + AND recent_reminder.data LIKE ? + ) + """ + ) + params.extend([ + EXPERIMENT_READ_CONVERSION_REMINDER_TYPE, + reminder_since, + '%"purpose": "read_conversion"%', + ]) + + rows = _target_rows_from_query( + f""" + SELECT + ea.unit_id AS agent_id, + a.name AS agent_name, + ea.variant_key, + ea.experiment_key, + ( + SELECT MAX(activity.created_at) + FROM experiment_events activity + WHERE activity.actor_agent_id = ea.unit_id + ) AS latest_activity_at, + ( + SELECT COUNT(*) + FROM agent_messages unread + WHERE unread.agent_id = ea.unit_id + AND unread.read = 0 + AND unread.type IN ( + 'experiment_announcement', + 'experiment_assignment', + 'experiment_reminder', + 'experiment_rule_update', + 'experiment_result_update', + 'challenge_invite', + 'team_mission_invite' + ) + ) AS unread_experiment_count + FROM experiment_assignments ea + JOIN agents a ON a.id = ea.unit_id + WHERE {' AND '.join(where)} + ORDER BY latest_activity_at DESC NULLS LAST, ea.id ASC + LIMIT ? + """, + tuple(params), + limit=clamped_limit, + ) + return _apply_agent_filter(rows, normalized_agent_ids) + + def resolve_challenge_notification_targets( challenge_key: str, *,
service/server/experiments.py+193 −0 modified@@ -5,6 +5,7 @@ import hashlib import json import re +from datetime import datetime, timedelta, timezone from typing import Any, Optional from database import begin_write_transaction, get_db_connection @@ -24,6 +25,29 @@ "enrollment_status", } +EXPERIMENT_NOTIFICATION_TYPES = ( + "experiment_announcement", + "experiment_assignment", + "experiment_reminder", + "experiment_rule_update", + "experiment_result_update", + "challenge_invite", + "team_mission_invite", +) + +EXPERIMENT_BEHAVIOR_EVENT_TYPES = ( + "agent_heartbeat", + "agent_tasks_read", + "signal_published", + "reply_created", + "reply_accepted", + "experiment_notice_exposed", +) + +EXPERIMENT_PRIMARY_METRIC_FAMILY = "active_agent_behavior" +EXPERIMENT_READ_RECEIPTS_ROLE = "diagnostic_only" +EXPERIMENT_BEHAVIOR_WINDOW_HOURS = 24 + class ExperimentError(ValueError): pass @@ -158,6 +182,140 @@ def get_experiment_enrollment_max_unit_id(experiment_key: str) -> Optional[int]: return experiment_enrollment_max_unit_id(dict(row)) +def _behavior_window_since(hours: int = EXPERIMENT_BEHAVIOR_WINDOW_HOURS) -> str: + return (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat().replace("+00:00", "Z") + + +def _rows_by_variant(rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: + return {str(row.get("variant_key") or ""): row for row in rows} + + +def _experiment_behavior_metrics(cursor: Any, experiment_key: str, *, since: str) -> list[dict[str, Any]]: + """Return recent behavior metrics by variant without depending on message read state.""" + behavior_placeholders = ",".join("?" for _ in EXPERIMENT_BEHAVIOR_EVENT_TYPES) + cursor.execute( + f""" + SELECT + ea.variant_key, + COUNT(DISTINCT ea.unit_id) AS assigned_agent_count, + COUNT(DISTINCT CASE + WHEN ee.event_type IN ({behavior_placeholders}) THEN ea.unit_id + END) AS active_agent_count_24h, + SUM(CASE + WHEN ee.event_type IN ({behavior_placeholders}) THEN 1 ELSE 0 + END) AS active_behavior_event_count_24h, + COUNT(DISTINCT CASE WHEN ee.event_type = 'agent_heartbeat' THEN ea.unit_id END) AS heartbeat_agent_count_24h, + SUM(CASE WHEN ee.event_type = 'agent_heartbeat' THEN 1 ELSE 0 END) AS heartbeat_count_24h, + COUNT(DISTINCT CASE WHEN ee.event_type = 'agent_tasks_read' THEN ea.unit_id END) AS task_read_agent_count_24h, + SUM(CASE WHEN ee.event_type = 'agent_tasks_read' THEN 1 ELSE 0 END) AS task_read_count_24h, + COUNT(DISTINCT CASE WHEN ee.event_type = 'signal_published' THEN ea.unit_id END) AS signal_agent_count_24h, + SUM(CASE WHEN ee.event_type = 'signal_published' THEN 1 ELSE 0 END) AS signal_count_24h, + COUNT(DISTINCT CASE WHEN ee.event_type IN ('reply_created', 'reply_accepted') THEN ea.unit_id END) AS reply_agent_count_24h, + SUM(CASE WHEN ee.event_type IN ('reply_created', 'reply_accepted') THEN 1 ELSE 0 END) AS reply_count_24h, + COUNT(DISTINCT CASE WHEN ee.event_type = 'experiment_notice_exposed' THEN ea.unit_id END) AS experiment_notice_exposure_agent_count_24h, + SUM(CASE WHEN ee.event_type = 'experiment_notice_exposed' THEN 1 ELSE 0 END) AS experiment_notice_exposure_count_24h + FROM experiment_assignments ea + LEFT JOIN experiment_events ee + ON ee.actor_agent_id = ea.unit_id + AND ee.created_at >= ? + WHERE ea.experiment_key = ? + AND ea.unit_type = 'agent' + GROUP BY ea.variant_key + ORDER BY ea.variant_key + """, + (*EXPERIMENT_BEHAVIOR_EVENT_TYPES, *EXPERIMENT_BEHAVIOR_EVENT_TYPES, since, experiment_key), + ) + return [dict(row) for row in cursor.fetchall()] + + +def _experiment_read_diagnostics(cursor: Any, experiment_key: str) -> list[dict[str, Any]]: + """Return message read diagnostics by variant; these are not primary experiment metrics.""" + notification_placeholders = ",".join("?" for _ in EXPERIMENT_NOTIFICATION_TYPES) + cursor.execute( + f""" + SELECT + ea.variant_key, + COUNT(DISTINCT CASE WHEN am.read = 1 THEN ea.unit_id END) AS read_receipt_agent_count, + SUM(CASE WHEN am.read = 1 THEN 1 ELSE 0 END) AS read_receipt_message_count, + COUNT(DISTINCT CASE WHEN COALESCE(am.read, 0) = 0 AND am.id IS NOT NULL THEN ea.unit_id END) AS unread_experiment_agent_count, + SUM(CASE WHEN COALESCE(am.read, 0) = 0 AND am.id IS NOT NULL THEN 1 ELSE 0 END) AS unread_experiment_message_count + FROM experiment_assignments ea + LEFT JOIN agent_messages am + ON am.agent_id = ea.unit_id + AND am.type IN ({notification_placeholders}) + WHERE ea.experiment_key = ? + AND ea.unit_type = 'agent' + GROUP BY ea.variant_key + ORDER BY ea.variant_key + """, + (*EXPERIMENT_NOTIFICATION_TYPES, experiment_key), + ) + return [dict(row) for row in cursor.fetchall()] + + +def agent_experiment_behavior_context(agent_id: int) -> Optional[dict[str, Any]]: + """Return active experiment context for high-frequency agent APIs without enrolling new agents.""" + now = utc_now_iso_z() + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + SELECT + e.experiment_key, + e.title, + e.description, + e.status, + e.unit_type, + e.start_at, + e.end_at, + ea.variant_key, + ea.assignment_reason, + ea.created_at AS assignment_created_at + FROM experiment_assignments ea + JOIN experiments e ON e.experiment_key = ea.experiment_key + WHERE ea.unit_type = 'agent' + AND ea.unit_id = ? + AND e.status = 'active' + AND (e.start_at IS NULL OR e.start_at <= ?) + AND (e.end_at IS NULL OR e.end_at >= ?) + ORDER BY e.created_at DESC, e.id DESC + """, + (agent_id, now, now), + ) + rows = [dict(row) for row in cursor.fetchall()] + conn.close() + if not rows: + return None + + assignments = [] + for row in rows: + assignments.append({ + "experiment_key": row.get("experiment_key"), + "title": row.get("title"), + "variant_key": row.get("variant_key"), + "assignment_reason": row.get("assignment_reason"), + "assignment_created_at": row.get("assignment_created_at"), + "status": row.get("status"), + "primary_metric_family": EXPERIMENT_PRIMARY_METRIC_FAMILY, + "read_receipts_role": EXPERIMENT_READ_RECEIPTS_ROLE, + "message_read_state_required": False, + "tracked_behaviors": [ + "agent_heartbeat", + "agent_tasks_read", + "signal_published", + "reply_created", + "reply_accepted", + ], + }) + + return { + "primary_metric_family": EXPERIMENT_PRIMARY_METRIC_FAMILY, + "read_receipts_role": EXPERIMENT_READ_RECEIPTS_ROLE, + "message_read_state_required": False, + "assignments": assignments, + } + + def stable_bucket(experiment_key: str, unit_type: str, unit_id: int | str, *, salt: str = "") -> int: seed = f"{experiment_key}:{unit_type}:{unit_id}:{salt}" digest = hashlib.sha256(seed.encode("utf-8")).hexdigest() @@ -424,6 +582,20 @@ def get_experiment_assignments(experiment_key: str, limit: int = 1000, offset: i (experiment_key,), ) metrics = [dict(row) for row in cursor.fetchall()] + behavior_window_start_at = _behavior_window_since() + behavior_metrics = _rows_by_variant( + _experiment_behavior_metrics(cursor, experiment_key, since=behavior_window_start_at) + ) + read_diagnostics = _rows_by_variant(_experiment_read_diagnostics(cursor, experiment_key)) + for row in metrics: + variant_key = str(row.get("variant_key") or "") + row.update(behavior_metrics.get(variant_key, {})) + row.update(read_diagnostics.get(variant_key, {})) + row["primary_metric_family"] = EXPERIMENT_PRIMARY_METRIC_FAMILY + row["read_receipts_role"] = EXPERIMENT_READ_RECEIPTS_ROLE + row["message_read_state_required"] = False + row["behavior_window_hours"] = EXPERIMENT_BEHAVIOR_WINDOW_HOURS + row["behavior_window_start_at"] = behavior_window_start_at cursor.execute( """ SELECT ea.*, a.name AS agent_name @@ -441,6 +613,27 @@ def get_experiment_assignments(experiment_key: str, limit: int = 1000, offset: i "experiment": _serialize_experiment(experiment_row), "variant_counts": counts, "variant_metrics": metrics, + "metric_policy": { + "primary_metric_family": EXPERIMENT_PRIMARY_METRIC_FAMILY, + "read_receipts_role": EXPERIMENT_READ_RECEIPTS_ROLE, + "message_read_state_required": False, + "behavior_window_hours": EXPERIMENT_BEHAVIOR_WINDOW_HOURS, + "behavior_window_start_at": behavior_window_start_at, + "tracked_behaviors": [ + "agent_heartbeat", + "agent_tasks_read", + "signal_published", + "reply_created", + "reply_accepted", + "experiment_notice_exposed", + ], + "diagnostic_metrics": [ + "read_receipt_agent_count", + "read_receipt_message_count", + "unread_experiment_agent_count", + "unread_experiment_message_count", + ], + }, "assignments": assignments, "limit": limit, "offset": offset,
service/server/permissions.py+108 −0 added@@ -0,0 +1,108 @@ +"""Agent authorization helpers.""" + +from __future__ import annotations + +import os +from typing import Iterable + +from fastapi import HTTPException + +from services import _get_agent_by_token +from utils import _extract_token + + +EXPERIMENT_ADMIN_CAPABILITY = "experiment_admin" +RESEARCH_EXPORTS_CAPABILITY = "research_exports" +TEAM_MISSION_ADMIN_CAPABILITY = "team_mission_admin" + +ALL_CAPABILITIES = ( + EXPERIMENT_ADMIN_CAPABILITY, + RESEARCH_EXPORTS_CAPABILITY, + TEAM_MISSION_ADMIN_CAPABILITY, +) + +ROLE_CAPABILITIES = { + "admin": set(ALL_CAPABILITIES), + "experiment_admin": {EXPERIMENT_ADMIN_CAPABILITY}, + "researcher": {RESEARCH_EXPORTS_CAPABILITY}, + "research": {RESEARCH_EXPORTS_CAPABILITY}, + "team_mission_admin": {TEAM_MISSION_ADMIN_CAPABILITY}, + "team_admin": {TEAM_MISSION_ADMIN_CAPABILITY}, +} + +CAPABILITY_ENV_VARS = { + EXPERIMENT_ADMIN_CAPABILITY: "AI_TRADER_EXPERIMENT_ADMIN_AGENTS", + RESEARCH_EXPORTS_CAPABILITY: "AI_TRADER_RESEARCH_AGENTS", + TEAM_MISSION_ADMIN_CAPABILITY: "AI_TRADER_TEAM_MISSION_ADMIN_AGENTS", +} + + +def _split_values(value: str | None) -> set[str]: + if not value: + return set() + return {item.strip().lower() for item in value.split(",") if item.strip()} + + +def _agent_matches_env(agent: dict, env_var: str) -> bool: + values = _split_values(os.getenv(env_var)) + if not values: + return False + agent_id = str(agent.get("id", "")).strip().lower() + agent_name = str(agent.get("name", "")).strip().lower() + return agent_id in values or agent_name in values + + +def agent_role(agent: dict | None) -> str: + if not agent: + return "agent" + if _agent_matches_env(agent, "AI_TRADER_ADMIN_AGENTS"): + return "admin" + return str(agent.get("role") or "agent").strip().lower() or "agent" + + +def agent_capability_set(agent: dict | None) -> set[str]: + if not agent: + return set() + + capabilities: set[str] = set() + role_tokens = _split_values(agent.get("role")) + if not role_tokens: + role_tokens = {"agent"} + for role in role_tokens: + capabilities.update(ROLE_CAPABILITIES.get(role, set())) + + if _agent_matches_env(agent, "AI_TRADER_ADMIN_AGENTS"): + capabilities.update(ALL_CAPABILITIES) + for capability, env_var in CAPABILITY_ENV_VARS.items(): + if _agent_matches_env(agent, env_var): + capabilities.add(capability) + + return capabilities + + +def agent_permissions(agent: dict | None) -> dict[str, bool]: + capabilities = agent_capability_set(agent) + return {capability: capability in capabilities for capability in ALL_CAPABILITIES} + + +def require_agent(authorization: str | None) -> dict: + token = _extract_token(authorization) + agent = _get_agent_by_token(token) + if not agent: + raise HTTPException(status_code=401, detail="Invalid token") + return agent + + +def require_capability(authorization: str | None, capability: str) -> dict: + agent = require_agent(authorization) + if capability not in agent_capability_set(agent): + raise HTTPException(status_code=403, detail="Insufficient permissions") + return agent + + +def require_any_capability(authorization: str | None, capabilities: Iterable[str]) -> dict: + agent = require_agent(authorization) + agent_capabilities = agent_capability_set(agent) + if not any(capability in agent_capabilities for capability in capabilities): + raise HTTPException(status_code=403, detail="Insufficient permissions") + return agent
service/server/routes_agent.py+128 −8 modified@@ -6,7 +6,8 @@ from database import get_db_connection from experiment_events import record_event -from experiments import variant_for_agent +from experiments import agent_experiment_behavior_context, variant_for_agent +from permissions import agent_permissions, agent_role from routes_models import ( AgentTokenRecoveryConfirm, AgentTokenRecoveryRequest, @@ -24,11 +25,13 @@ PUBLIC_COUNT_CACHE_KEY_PREFIX, PUBLIC_COUNT_CACHE_TTL_SECONDS, RouteContext, + attach_experiment_unread_notice, get_short_cached_payload, invalidate_agent_message_caches, push_agent_message, set_short_cached_payload, utc_now_iso_z, + validate_market, ) from services import ( _get_agent_by_id, @@ -202,7 +205,13 @@ async def get_unread_message_summary(authorization: str = Header(None)): payload, AGENT_MESSAGE_SUMMARY_CACHE_TTL_SECONDS, ) - return payload + return attach_experiment_unread_notice( + dict(payload), + agent['id'], + surface='messages_unread_summary', + field='experiment_unread_notice', + ctx=ctx, + ) @app.get('/api/claw/messages/recent') async def get_recent_agent_messages( @@ -261,7 +270,9 @@ async def get_recent_agent_messages( pass messages.append(message) - return {'messages': messages} + payload = {'messages': messages} + surface = f'messages_recent_{category}' if category in category_types else 'messages_recent' + return attach_experiment_unread_notice(payload, agent['id'], surface=surface, ctx=ctx) @app.post('/api/claw/messages/mark-read') async def mark_agent_messages_read(data: AgentMessagesMarkReadRequest, authorization: str = Header(None)): @@ -297,6 +308,91 @@ async def mark_agent_messages_read(data: AgentMessagesMarkReadRequest, authoriza return {'success': True, 'updated': updated} + @app.post('/api/claw/messages/read-experiment') + async def read_experiment_messages(limit: int = 50, authorization: str = Header(None)): + token = _extract_token(authorization) + agent = _get_agent_by_token(token) + if not agent: + raise HTTPException(status_code=401, detail='Invalid token') + + limit = max(1, min(limit, 100)) + placeholders = ','.join('?' for _ in EXPERIMENT_NOTIFICATION_TYPES) + conn = get_db_connection() + cursor = conn.cursor() + cursor.execute( + f""" + SELECT COUNT(*) AS count + FROM agent_messages + WHERE agent_id = ? AND read = 0 AND type IN ({placeholders}) + """, + (agent['id'], *EXPERIMENT_NOTIFICATION_TYPES), + ) + unread_before = cursor.fetchone()['count'] + + cursor.execute( + f""" + SELECT * + FROM agent_messages + WHERE agent_id = ? AND read = 0 AND type IN ({placeholders}) + ORDER BY created_at DESC, id DESC + LIMIT ? + """, + (agent['id'], *EXPERIMENT_NOTIFICATION_TYPES, limit), + ) + rows = cursor.fetchall() + message_ids = [row['id'] for row in rows] + if message_ids: + id_placeholders = ','.join('?' for _ in message_ids) + cursor.execute( + f""" + UPDATE agent_messages + SET read = 1 + WHERE agent_id = ? AND id IN ({id_placeholders}) + """, + (agent['id'], *message_ids), + ) + + record_event( + 'experiment_messages_read', + actor_agent_id=agent['id'], + object_type='agent_message_batch', + object_id=','.join(str(message_id) for message_id in message_ids) if message_ids else None, + metadata={ + 'message_count': len(message_ids), + 'unread_before': unread_before, + 'remaining_unread_count': max(0, unread_before - len(message_ids)), + 'message_ids': message_ids, + 'read_method': 'read_experiment_endpoint', + }, + cursor=cursor, + ) + + conn.commit() + conn.close() + if message_ids: + invalidate_agent_message_caches(ctx, agent['id']) + + messages = [] + for row in rows: + message = dict(row) + message['read'] = 1 + if message.get('data'): + try: + message['data'] = json.loads(message['data']) + except Exception: + pass + messages.append(message) + + return { + 'success': True, + 'messages': messages, + 'message_count': len(messages), + 'updated': len(message_ids), + 'unread_before': unread_before, + 'remaining_unread_count': max(0, unread_before - len(message_ids)), + 'has_more_messages': unread_before > len(message_ids), + } + @app.post('/api/claw/tasks') async def create_agent_task(data: AgentTaskCreate, authorization: str = Header(None)): token = _extract_token(authorization) @@ -327,6 +423,10 @@ async def agent_heartbeat(authorization: str = Header(None)): raise HTTPException(status_code=401, detail='Invalid token') agent_id = agent['id'] + experiment_context = agent_experiment_behavior_context(agent_id) + experiment_assignment = (experiment_context.get('assignments') or [{}])[0] if experiment_context else {} + event_experiment_key = experiment_assignment.get('experiment_key') + event_variant_key = experiment_assignment.get('variant_key') conn = get_db_connection() cursor = conn.cursor() @@ -384,6 +484,8 @@ async def agent_heartbeat(authorization: str = Header(None)): actor_agent_id=agent_id, object_type='agent_task_batch', object_id=','.join(str(row['id']) for row in tasks), + experiment_key=event_experiment_key, + variant_key=event_variant_key, metadata={'task_count': len(tasks), 'message_count': len(messages)}, cursor=cursor, ) @@ -392,6 +494,8 @@ async def agent_heartbeat(authorization: str = Header(None)): actor_agent_id=agent_id, object_type='agent', object_id=agent_id, + experiment_key=event_experiment_key, + variant_key=event_variant_key, metadata={'unread_message_count': unread_message_count, 'pending_task_count': pending_task_count}, cursor=cursor, ) @@ -426,7 +530,7 @@ async def agent_heartbeat(authorization: str = Header(None)): pass parsed_tasks.append(task) - return { + payload = { 'agent_id': agent_id, 'server_time': utc_now_iso_z(), 'recommended_poll_interval_seconds': 30, @@ -440,6 +544,9 @@ async def agent_heartbeat(authorization: str = Header(None)): 'has_more_messages': unread_message_count > len(parsed_messages), 'has_more_tasks': pending_task_count > len(parsed_tasks), } + if experiment_context: + payload['experiment_context'] = experiment_context + return payload @app.post('/api/claw/agents/selfRegister') async def agent_self_register(data: AgentRegister): @@ -473,15 +580,21 @@ async def agent_self_register(data: AgentRegister): now = utc_now_iso_z() if data.positions: for pos in data.positions: + market = validate_market(pos.get('market', 'us-stock')) + symbol = str(pos.get('symbol') or '').strip() + if not symbol: + raise HTTPException(status_code=400, detail='Position symbol is required') + if market != 'polymarket': + symbol = symbol.upper() cursor.execute( """ INSERT INTO positions (agent_id, symbol, market, side, quantity, entry_price, opened_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( agent_id, - pos.get('symbol'), - pos.get('market', 'us-stock'), + symbol, + market, pos.get('side', 'long'), pos.get('quantity', 0), pos.get('entry_price', 0), @@ -676,16 +789,19 @@ async def get_agent_info(authorization: str = Header(None)): except Exception: experiment_assignments = [] - return { + payload = { 'id': agent['id'], 'name': agent['name'], 'token': token, + 'role': agent_role(agent), + 'permissions': agent_permissions(agent), 'wallet_address': agent.get('wallet_address'), 'points': agent.get('points', 0), 'cash': agent.get('cash', 100000.0), 'reputation_score': agent.get('reputation_score', 0), 'experiment_assignments': experiment_assignments, } + return attach_experiment_unread_notice(payload, agent['id'], surface='agents_me', ctx=ctx) @app.get('/api/claw/agents/me/points') async def get_agent_points(authorization: str = Header(None)): @@ -698,7 +814,7 @@ async def get_agent_points(authorization: str = Header(None)): return {'points': points} @app.get('/api/claw/agents/count') - async def get_agent_count(): + async def get_agent_count(authorization: str = Header(None)): redis_cache_key = f'{PUBLIC_COUNT_CACHE_KEY_PREFIX}:agents' payload = get_short_cached_payload( ctx, @@ -720,4 +836,8 @@ async def get_agent_count(): payload, PUBLIC_COUNT_CACHE_TTL_SECONDS, ) + token = _extract_token(authorization) + agent = _get_agent_by_token(token) + if agent: + return attach_experiment_unread_notice(dict(payload), agent['id'], surface='agents_count', ctx=ctx) return payload
service/server/routes_experiments.py+39 −19 modified@@ -4,6 +4,13 @@ from fastapi import FastAPI, Header, HTTPException +from permissions import ( + EXPERIMENT_ADMIN_CAPABILITY, + RESEARCH_EXPORTS_CAPABILITY, + require_agent, + require_any_capability, + require_capability, +) from experiment_notifications import ( ExperimentNotificationError, build_experiment_target_rule, @@ -12,6 +19,7 @@ resolve_experiment_notification_targets, resolve_recent_active_experiment_targets, resolve_team_mission_notification_targets, + resolve_unread_active_experiment_targets, send_agent_notifications, validate_notification_request, validate_task_request, @@ -33,16 +41,6 @@ ExperimentTaskRequest, ) from routes_shared import RouteContext -from services import _get_agent_by_token -from utils import _extract_token - - -def _require_agent(authorization: str | None) -> dict: - token = _extract_token(authorization) - agent = _get_agent_by_token(token) - if not agent: - raise HTTPException(status_code=401, detail="Invalid token") - return agent def _to_http_error(exc: Exception) -> HTTPException: @@ -82,6 +80,13 @@ def _resolve_targets_for_request(experiment_key: str, data: ExperimentNotificati agent_ids=data.agent_ids, limit=data.limit, ) + elif target in {"unread_active", "unread-active", "active_unread", "active-unread"}: + targets = resolve_unread_active_experiment_targets( + experiment_key, + variant_key=data.variant_key, + agent_ids=data.agent_ids, + limit=data.limit, + ) elif target in {"recent_active", "recent-active"}: targets = resolve_recent_active_experiment_targets( experiment_key, @@ -101,15 +106,24 @@ def _resolve_targets_for_request(experiment_key: str, data: ExperimentNotificati def register_experiment_routes(app: FastAPI, ctx: RouteContext) -> None: @app.get("/api/experiments") - async def api_list_experiments(status: str | None = None, limit: int = 100, offset: int = 0): + async def api_list_experiments( + status: str | None = None, + limit: int = 100, + offset: int = 0, + authorization: str = Header(None), + ): + require_any_capability( + authorization, + (EXPERIMENT_ADMIN_CAPABILITY, RESEARCH_EXPORTS_CAPABILITY), + ) try: return list_experiments(status=status, limit=limit, offset=offset) except Exception as exc: raise _to_http_error(exc) @app.post("/api/experiments") async def api_create_experiment(data: ExperimentCreateRequest, authorization: str = Header(None)): - _require_agent(authorization) + require_capability(authorization, EXPERIMENT_ADMIN_CAPABILITY) try: return create_experiment(data) except Exception as exc: @@ -121,14 +135,20 @@ async def api_update_experiment_status( data: ExperimentStatusRequest, authorization: str = Header(None), ): - _require_agent(authorization) + require_capability(authorization, EXPERIMENT_ADMIN_CAPABILITY) try: return update_experiment_status(experiment_key, data.status) except Exception as exc: raise _to_http_error(exc) @app.get("/api/experiments/{experiment_key}/assignments") - async def api_experiment_assignments(experiment_key: str, limit: int = 1000, offset: int = 0): + async def api_experiment_assignments( + experiment_key: str, + limit: int = 1000, + offset: int = 0, + authorization: str = Header(None), + ): + require_capability(authorization, EXPERIMENT_ADMIN_CAPABILITY) try: return get_experiment_assignments(experiment_key, limit=limit, offset=offset) except Exception as exc: @@ -140,7 +160,7 @@ async def api_notify_experiment( data: ExperimentNotificationRequest, authorization: str = Header(None), ): - actor = _require_agent(authorization) + actor = require_capability(authorization, EXPERIMENT_ADMIN_CAPABILITY) try: targets, target_rule = _resolve_targets_for_request(experiment_key, data) validate_notification_request(data.message_type, data.title, data.content) @@ -200,7 +220,7 @@ async def api_create_experiment_tasks( data: ExperimentTaskRequest, authorization: str = Header(None), ): - actor = _require_agent(authorization) + actor = require_capability(authorization, EXPERIMENT_ADMIN_CAPABILITY) try: targets, target_rule = _resolve_targets_for_request(experiment_key, data) return create_agent_tasks( @@ -221,23 +241,23 @@ async def api_create_experiment_tasks( @app.get("/api/agents/me/experiments") async def api_my_experiments(authorization: str = Header(None)): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: return {"assignments": variant_for_agent(agent["id"])} except Exception as exc: raise _to_http_error(exc) @app.post("/api/experiments/{experiment_key}/assign") async def api_assign_me_to_experiment(experiment_key: str, authorization: str = Header(None)): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: return assign_unit_to_experiment(experiment_key, "agent", agent["id"], assignment_reason="api_request") except Exception as exc: raise _to_http_error(exc) @app.get("/api/agents/me/rewards") async def api_my_rewards(limit: int = 100, offset: int = 0, authorization: str = Header(None)): - agent = _require_agent(authorization) + agent = require_agent(authorization) return { "rewards": get_agent_reward_history(agent["id"], limit=limit, offset=offset), "limit": max(1, min(limit, 500)),
service/server/routes_market.py+37 −16 modified@@ -1,6 +1,6 @@ from typing import Optional -from fastapi import FastAPI +from fastapi import FastAPI, Header from market_intel import ( get_etf_flows_payload, @@ -15,13 +15,23 @@ MARKET_INTEL_CACHE_KEY_PREFIX, MARKET_INTEL_CACHE_TTL_SECONDS, RouteContext, + attach_experiment_unread_notice, get_short_cached_payload, set_short_cached_payload, utc_now_iso_z, ) +from services import _get_agent_by_token +from utils import _extract_token def register_market_routes(app: FastAPI, ctx: RouteContext) -> None: + def _attach_agent_notice(payload: dict, authorization: str | None, *, surface: str) -> dict: + token = _extract_token(authorization) + agent = _get_agent_by_token(token) + if not agent: + return payload + return attach_experiment_unread_notice(dict(payload), agent['id'], surface=surface, ctx=ctx) + def _cached_market_payload(cache_key: str, builder): redis_key = f'{MARKET_INTEL_CACHE_KEY_PREFIX}:{cache_key}' cached = get_short_cached_payload(ctx, ctx.market_intel_cache, redis_key, MARKET_INTEL_CACHE_TTL_SECONDS) @@ -41,47 +51,58 @@ async def health_check(): return {'status': 'ok', 'timestamp': utc_now_iso_z()} @app.get('/api/market-intel/overview') - async def market_intel_overview(): - return _cached_market_payload('overview', get_market_intel_overview) + async def market_intel_overview(authorization: str = Header(None)): + payload = _cached_market_payload('overview', get_market_intel_overview) + return _attach_agent_notice(payload, authorization, surface='market_intel_overview') @app.get('/api/market-intel/news') - async def market_intel_news(category: Optional[str] = None, limit: int = 5): + async def market_intel_news( + category: Optional[str] = None, + limit: int = 5, + authorization: str = Header(None), + ): safe_limit = max(1, min(limit, 12)) category_key = (category or 'all').strip() or 'all' - return _cached_market_payload( + payload = _cached_market_payload( f'news:category={category_key}:limit={safe_limit}', lambda: get_market_news_payload(category=category, limit=safe_limit), ) + return _attach_agent_notice(payload, authorization, surface='market_intel_news') @app.get('/api/market-intel/macro-signals') - async def market_intel_macro_signals(): - return _cached_market_payload('macro_signals', get_macro_signals_payload) + async def market_intel_macro_signals(authorization: str = Header(None)): + payload = _cached_market_payload('macro_signals', get_macro_signals_payload) + return _attach_agent_notice(payload, authorization, surface='market_intel_macro_signals') @app.get('/api/market-intel/etf-flows') - async def market_intel_etf_flows(): - return _cached_market_payload('etf_flows', get_etf_flows_payload) + async def market_intel_etf_flows(authorization: str = Header(None)): + payload = _cached_market_payload('etf_flows', get_etf_flows_payload) + return _attach_agent_notice(payload, authorization, surface='market_intel_etf_flows') @app.get('/api/market-intel/stocks/featured') - async def market_intel_featured_stocks(limit: int = 6): + async def market_intel_featured_stocks(limit: int = 6, authorization: str = Header(None)): safe_limit = max(1, min(limit, 12)) - return _cached_market_payload( + payload = _cached_market_payload( f'stocks_featured:limit={safe_limit}', lambda: get_featured_stock_analysis_payload(limit=safe_limit), ) + return _attach_agent_notice(payload, authorization, surface='market_intel_stocks_featured') @app.get('/api/market-intel/stocks/{symbol}/latest') - async def market_intel_stock_latest(symbol: str): + async def market_intel_stock_latest(symbol: str, authorization: str = Header(None)): normalized_symbol = (symbol or '').strip().upper() - return _cached_market_payload( + payload = _cached_market_payload( f'stock_latest:symbol={normalized_symbol}', lambda: get_stock_analysis_latest_payload(normalized_symbol), ) + return _attach_agent_notice(payload, authorization, surface='market_intel_stock_latest') @app.get('/api/market-intel/stocks/{symbol}/history') - async def market_intel_stock_history(symbol: str, limit: int = 10): + async def market_intel_stock_history(symbol: str, limit: int = 10, authorization: str = Header(None)): + safe_limit = max(1, min(limit, 50)) normalized_symbol = (symbol or '').strip().upper() - safe_limit = max(1, min(limit, 100)) - return _cached_market_payload( + payload = _cached_market_payload( f'stock_history:symbol={normalized_symbol}:limit={safe_limit}', lambda: get_stock_analysis_history_payload(normalized_symbol, limit=safe_limit), ) + return _attach_agent_notice(payload, authorization, surface='market_intel_stock_history')
service/server/routes_research.py+22 −7 modified@@ -5,8 +5,9 @@ import csv import io -from fastapi import FastAPI, Response +from fastapi import FastAPI, Header, Response +from permissions import RESEARCH_EXPORTS_CAPABILITY, require_capability from research_exports import ( RESEARCH_EXPORTS, fetch_research_export_rows, @@ -72,7 +73,8 @@ def _fetch( return filename, columns, rows @app.get("/api/research/datasets") - async def api_research_datasets(): + async def api_research_datasets(authorization: str = Header(None)): + require_capability(authorization, RESEARCH_EXPORTS_CAPABILITY) return {"datasets": get_research_dataset_names()} @app.get("/api/research/events") @@ -87,7 +89,9 @@ async def api_research_events( include_content: bool = True, limit: int = 1000, offset: int = 0, + authorization: str = Header(None), ): + require_capability(authorization, RESEARCH_EXPORTS_CAPABILITY) _filename, columns, rows = _fetch( "events", start_at=start_at, @@ -116,7 +120,9 @@ async def api_research_export_csv( include_content: bool = True, limit: int = 100000, offset: int = 0, + authorization: str = Header(None), ): + require_capability(authorization, RESEARCH_EXPORTS_CAPABILITY) try: filename, columns, rows = _fetch( dataset_name, @@ -148,7 +154,9 @@ async def api_research_export_json( include_content: bool = True, limit: int = 100000, offset: int = 0, + authorization: str = Header(None), ): + require_capability(authorization, RESEARCH_EXPORTS_CAPABILITY) try: filename, columns, rows = _fetch( dataset_name, @@ -168,7 +176,8 @@ async def api_research_export_json( return {"dataset": filename, "columns": columns, "rows": rows, "limit": max(1, min(limit, 100000)), "offset": max(0, offset)} @app.get("/api/research/schema/{dataset_name}") - async def api_research_schema(dataset_name: str): + async def api_research_schema(dataset_name: str, authorization: str = Header(None)): + require_capability(authorization, RESEARCH_EXPORTS_CAPABILITY) try: return research_schema_for_dataset(dataset_name) except ValueError as exc: @@ -183,7 +192,9 @@ async def _download( market: str | None, limit: int, offset: int, + authorization: str | None, ) -> Response: + require_capability(authorization, RESEARCH_EXPORTS_CAPABILITY) try: normalized, columns, rows = _fetch( filename, @@ -208,8 +219,9 @@ async def api_research_agents_csv( market: str | None = None, limit: int = 100000, offset: int = 0, + authorization: str = Header(None), ): - return await _download("agents.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset) + return await _download("agents.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset, authorization) @app.get("/api/research/events.csv") async def api_research_events_csv( @@ -220,8 +232,9 @@ async def api_research_events_csv( market: str | None = None, limit: int = 100000, offset: int = 0, + authorization: str = Header(None), ): - return await _download("events.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset) + return await _download("events.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset, authorization) @app.get("/api/research/signals.csv") async def api_research_signals_csv( @@ -232,8 +245,9 @@ async def api_research_signals_csv( market: str | None = None, limit: int = 100000, offset: int = 0, + authorization: str = Header(None), ): - return await _download("signals.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset) + return await _download("signals.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset, authorization) @app.get("/api/research/network_edges.csv") async def api_research_network_edges_csv( @@ -244,5 +258,6 @@ async def api_research_network_edges_csv( market: str | None = None, limit: int = 100000, offset: int = 0, + authorization: str = Header(None), ): - return await _download("network_edges.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset) + return await _download("network_edges.csv", start_at, end_at, experiment_key, variant_key, market, limit, offset, authorization)
service/server/routes_shared.py+242 −22 modified@@ -3,6 +3,7 @@ import os import re import time +import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Dict, Optional @@ -28,6 +29,8 @@ CONTENT_DUPLICATE_WINDOW_SECONDS = 1800 ACCEPT_REPLY_REWARD = 3 EXPERIMENT_UNREAD_PREVIEW_LIMIT = 3 +EXPERIMENT_NOTICE_EXPOSURE_EVENT_TTL_SECONDS = 1800 +EXPERIMENT_READ_ENDPOINT = '/api/claw/messages/read-experiment' EXPERIMENT_NOTIFICATION_TYPES = ( 'experiment_announcement', 'experiment_assignment', @@ -42,18 +45,21 @@ LEADERBOARD_CACHE_KEY_PREFIX = 'leaderboard:profit_history' GROUPED_SIGNALS_CACHE_KEY_PREFIX = 'signals:grouped' AGENT_SIGNALS_CACHE_KEY_PREFIX = 'signals:agent' -PRICE_CACHE_KEY_PREFIX = 'price:quote' SIGNAL_FEED_CACHE_KEY_PREFIX = 'signals:feed' -POSITIONS_CACHE_TTL_SECONDS = 10 -SIGNAL_FEED_CACHE_TTL_SECONDS = 10 -AGENT_MESSAGE_SUMMARY_CACHE_KEY_PREFIX = 'agent_messages:unread_summary' -AGENT_MESSAGE_SUMMARY_CACHE_TTL_SECONDS = 5 -PUBLIC_COUNT_CACHE_KEY_PREFIX = 'public_counts' -PUBLIC_COUNT_CACHE_TTL_SECONDS = 30 +PRICE_CACHE_KEY_PREFIX = 'price:quote' MARKET_INTEL_CACHE_KEY_PREFIX = 'market_intel' +PUBLIC_COUNT_CACHE_KEY_PREFIX = 'public_counts' +AGENT_MESSAGE_SUMMARY_CACHE_KEY_PREFIX = 'agent_messages:unread_summary' +EXPERIMENT_NOTICE_CACHE_KEY_PREFIX = 'experiment_notice' MARKET_INTEL_CACHE_TTL_SECONDS = 30 +PUBLIC_COUNT_CACHE_TTL_SECONDS = 30 +AGENT_MESSAGE_SUMMARY_CACHE_TTL_SECONDS = 5 +EXPERIMENT_NOTICE_CACHE_TTL_SECONDS = 5 +SIGNAL_FEED_CACHE_TTL_SECONDS = 10 +POSITIONS_CACHE_TTL_SECONDS = 10 MENTION_PATTERN = re.compile(r'@([A-Za-z0-9_\-]{2,64})') +_EXPERIMENT_NOTICE_EXPOSURE_EVENT_CACHE: dict[tuple[int, str, str], float] = {} SUPPORTED_MARKETS = {'us-stock', 'crypto', 'polymarket'} MARKET_ALIASES = { 'binance': 'crypto', @@ -66,13 +72,16 @@ 'hl': 'crypto', 'kraken': 'crypto', 'okx': 'crypto', + 'solusdt': 'crypto', 'stock': 'us-stock', 'stocks': 'us-stock', 'us': 'us-stock', 'us stock': 'us-stock', 'us stocks': 'us-stock', 'us_stock': 'us-stock', 'us_stocks': 'us-stock', + 'us-premarket': 'us-stock', + 'us-aftermarket': 'us-stock', 'usstock': 'us-stock', 'nasdaq': 'us-stock', 'sp500': 'us-stock', @@ -122,6 +131,7 @@ class RouteContext: market_intel_cache: dict[str, tuple[float, dict[str, Any]]] = field(default_factory=dict) public_count_cache: dict[str, tuple[float, dict[str, Any]]] = field(default_factory=dict) agent_message_summary_cache: dict[str, tuple[float, dict[str, Any]]] = field(default_factory=dict) + experiment_notice_cache: dict[int, tuple[float, Optional[dict[str, Any]]]] = field(default_factory=dict) content_rate_limit_state: dict[tuple[int, str], dict[str, Any]] = field(default_factory=dict) ws_connections: dict[int, WebSocket] = field(default_factory=dict) verification_codes: dict[str, dict[str, Any]] = field(default_factory=dict) @@ -232,14 +242,35 @@ def set_short_cached_payload( def invalidate_agent_message_caches(ctx: RouteContext, agent_id: int) -> None: from cache import delete - key = f'{AGENT_MESSAGE_SUMMARY_CACHE_KEY_PREFIX}:agent_id={agent_id}' - ctx.agent_message_summary_cache.pop(key, None) - delete(key) + ctx.agent_message_summary_cache.pop(f'{AGENT_MESSAGE_SUMMARY_CACHE_KEY_PREFIX}:agent_id={agent_id}', None) + ctx.experiment_notice_cache.pop(agent_id, None) + delete(f'{AGENT_MESSAGE_SUMMARY_CACHE_KEY_PREFIX}:agent_id={agent_id}') + delete(f'{EXPERIMENT_NOTICE_CACHE_KEY_PREFIX}:agent_id={agent_id}:limit={EXPERIMENT_UNREAD_PREVIEW_LIMIT}') -def experiment_unread_notice(agent_id: int, *, limit: int = EXPERIMENT_UNREAD_PREVIEW_LIMIT) -> Optional[dict[str, Any]]: +def experiment_unread_notice( + agent_id: int, + *, + limit: int = EXPERIMENT_UNREAD_PREVIEW_LIMIT, + ctx: RouteContext | None = None, +) -> Optional[dict[str, Any]]: """Return a small non-destructive unread experiment notice for API responses.""" limit = max(1, min(int(limit or EXPERIMENT_UNREAD_PREVIEW_LIMIT), 10)) + now_ts = time.time() + redis_cache_key = f'{EXPERIMENT_NOTICE_CACHE_KEY_PREFIX}:agent_id={agent_id}:limit={limit}' + if ctx is not None: + cached = ctx.experiment_notice_cache.get(agent_id) + if cached and now_ts - cached[0] < EXPERIMENT_NOTICE_CACHE_TTL_SECONDS: + return cached[1] + from cache import get_json + + cached_payload = get_json(redis_cache_key) + if isinstance(cached_payload, dict) or cached_payload is None: + if cached_payload is not None: + notice = cached_payload.get('notice') if isinstance(cached_payload, dict) else None + ctx.experiment_notice_cache[agent_id] = (now_ts, notice) + return notice + conn = get_db_connection() cursor = conn.cursor() try: @@ -254,6 +285,11 @@ def experiment_unread_notice(agent_id: int, *, limit: int = EXPERIMENT_UNREAD_PR ) total = cursor.fetchone()['count'] if not total: + if ctx is not None: + from cache import set_json + + ctx.experiment_notice_cache[agent_id] = (now_ts, None) + set_json(redis_cache_key, {'notice': None}, ttl_seconds=EXPERIMENT_NOTICE_CACHE_TTL_SECONDS) return None cursor.execute( @@ -275,23 +311,205 @@ def experiment_unread_notice(agent_id: int, *, limit: int = EXPERIMENT_UNREAD_PR except Exception: pass messages.append(message) - return { + message_ids = [message['id'] for message in messages if message.get('id') is not None] + notice = { 'unread_count': total, + 'requires_read': False, + 'read_receipts_role': 'diagnostic_only', + 'message_read_state_required': False, + 'reason': 'unread_experiment_messages', 'messages': messages, + 'message_ids': message_ids, + 'recommended_action': { + 'method': 'POST', + 'endpoint': EXPERIMENT_READ_ENDPOINT, + 'headers': {'Authorization': 'Bearer <agent_token>'}, + 'body': None, + 'marks_read': True, + 'description': 'Call this endpoint to receive unread experiment messages and mark them read in one step.', + }, + 'actions': [ + { + 'name': 'read_experiment_messages', + 'method': 'POST', + 'endpoint': EXPERIMENT_READ_ENDPOINT, + 'headers': {'Authorization': 'Bearer <agent_token>'}, + 'body': None, + 'marks_read': True, + }, + { + 'name': 'read_and_mark_via_heartbeat', + 'method': 'POST', + 'endpoint': '/api/claw/agents/heartbeat', + 'headers': {'Authorization': 'Bearer <agent_token>'}, + 'body': None, + 'marks_read': True, + }, + { + 'name': 'fetch_recent_experiment_messages', + 'method': 'GET', + 'endpoint': '/api/claw/messages/recent?category=experiment&limit=10', + 'headers': {'Authorization': 'Bearer <agent_token>'}, + 'body': None, + 'marks_read': False, + }, + { + 'name': 'mark_experiment_messages_read', + 'method': 'POST', + 'endpoint': '/api/claw/messages/mark-read', + 'headers': {'Authorization': 'Bearer <agent_token>'}, + 'body': {'categories': ['experiment']}, + 'marks_read': True, + }, + ], + 'read_endpoints': [ + f'POST {EXPERIMENT_READ_ENDPOINT}', + 'POST /api/claw/agents/heartbeat', + 'GET /api/claw/messages/recent?category=experiment&limit=10', + ], + 'mark_read_endpoint': { + 'method': 'POST', + 'endpoint': '/api/claw/messages/mark-read', + 'body': {'categories': ['experiment']}, + }, 'read_via': { + 'read_experiment': f'POST {EXPERIMENT_READ_ENDPOINT}', 'heartbeat': 'POST /api/claw/agents/heartbeat', 'recent': 'GET /api/claw/messages/recent?category=experiment&limit=10', + 'mark_read': 'POST /api/claw/messages/mark-read {"categories":["experiment"]}', }, - 'note': 'Unread experiment messages are attached here because this agent has not read them via heartbeat yet.', + 'note': f'Unread experiment messages are attached as diagnostics only. Experiment analysis now uses active behavior metrics; POST {EXPERIMENT_READ_ENDPOINT} remains available for explicit read receipts.', } + if ctx is not None: + from cache import set_json + + ctx.experiment_notice_cache[agent_id] = (now_ts, notice) + set_json(redis_cache_key, {'notice': notice}, ttl_seconds=EXPERIMENT_NOTICE_CACHE_TTL_SECONDS) + return notice + finally: + conn.close() + + +def _record_experiment_notice_exposed(agent_id: int, notice: dict[str, Any], *, surface: str) -> None: + messages = notice.get('messages') or [] + message_ids = [message.get('id') for message in messages if message.get('id') is not None] + message_types = sorted({message.get('type') for message in messages if message.get('type')}) + + campaign_ids: set[str] = set() + experiment_keys: set[str] = set() + variant_keys: set[str] = set() + for message in messages: + data = message.get('data') + if not isinstance(data, dict): + continue + if data.get('campaign_id'): + campaign_ids.add(str(data['campaign_id'])) + if data.get('experiment_key'): + experiment_keys.add(str(data['experiment_key'])) + if data.get('target_variant_key'): + variant_keys.add(str(data['target_variant_key'])) + + metadata = { + 'surface': surface, + 'unread_count': notice.get('unread_count'), + 'preview_count': len(messages), + 'message_ids': message_ids, + 'message_types': message_types, + 'campaign_ids': sorted(campaign_ids), + 'read_via': notice.get('read_via'), + } + experiment_key = next(iter(experiment_keys)) if len(experiment_keys) == 1 else None + variant_key = next(iter(variant_keys)) if len(variant_keys) == 1 else None + exposure_key = ( + agent_id, + surface, + ','.join(str(message_id) for message_id in message_ids) or ','.join(message_types), + ) + now_ts = time.time() + if now_ts - _EXPERIMENT_NOTICE_EXPOSURE_EVENT_CACHE.get(exposure_key, 0) < EXPERIMENT_NOTICE_EXPOSURE_EVENT_TTL_SECONDS: + return + _EXPERIMENT_NOTICE_EXPOSURE_EVENT_CACHE[exposure_key] = now_ts + if len(_EXPERIMENT_NOTICE_EXPOSURE_EVENT_CACHE) > 10000: + expired_before = now_ts - EXPERIMENT_NOTICE_EXPOSURE_EVENT_TTL_SECONDS + for key, last_seen_at in list(_EXPERIMENT_NOTICE_EXPOSURE_EVENT_CACHE.items()): + if last_seen_at < expired_before: + _EXPERIMENT_NOTICE_EXPOSURE_EVENT_CACHE.pop(key, None) + + conn = get_db_connection() + try: + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO experiment_events + (event_id, event_type, actor_agent_id, target_agent_id, object_type, object_id, + market, experiment_key, variant_key, metadata_json, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + str(uuid.uuid4()), + 'experiment_notice_exposed', + agent_id, + None, + 'agent', + str(agent_id), + None, + experiment_key, + variant_key, + json.dumps(metadata, ensure_ascii=False, sort_keys=True, default=str), + utc_now_iso_z(), + ), + ) + conn.commit() finally: conn.close() -def attach_experiment_unread_notice(payload: dict[str, Any], agent_id: int) -> dict[str, Any]: - notice = experiment_unread_notice(agent_id) +def attach_experiment_unread_notice( + payload: dict[str, Any], + agent_id: int, + *, + surface: str = 'signals_write_response', + field: str = 'experiment_unread', + ctx: RouteContext | None = None, +) -> dict[str, Any]: + notice = experiment_unread_notice(agent_id, ctx=ctx) if notice: - payload['experiment_unread'] = notice + must_call = { + 'method': 'POST', + 'endpoint': EXPERIMENT_READ_ENDPOINT, + 'headers': {'Authorization': 'Bearer <agent_token>'}, + 'body': None, + 'marks_read': True, + } + payload[field] = notice + payload['agent_notice'] = { + **(payload.get('agent_notice') if isinstance(payload.get('agent_notice'), dict) else {}), + 'experiment_unread': True, + 'must_call': EXPERIMENT_READ_ENDPOINT, + 'must_call_method': 'POST', + 'must_call_now': False, + 'must_call_action': must_call, + 'required_action': { + 'type': 'read_experiment_messages', + **must_call, + }, + 'read_receipts_role': 'diagnostic_only', + 'message_read_state_required': False, + 'primary_metric_family': 'active_agent_behavior', + 'reason': notice.get('reason'), + 'unread_count': notice.get('unread_count'), + 'message_ids': notice.get('message_ids', []), + 'messages': notice.get('messages', []), + 'recommended_action': notice.get('recommended_action'), + 'actions': notice.get('actions', []), + 'read_endpoints': notice.get('read_endpoints', []), + 'mark_read_endpoint': notice.get('mark_read_endpoint'), + 'read_via': notice.get('read_via', {}), + } + try: + _record_experiment_notice_exposed(agent_id, notice, surface=surface) + except Exception: + pass return payload @@ -406,18 +624,20 @@ def is_us_market_open() -> bool: def is_market_open(market: str) -> bool: - if market in ('crypto', 'polymarket'): + normalized_market = normalize_market(market) + if normalized_market in ('crypto', 'polymarket'): return True - if market == 'us-stock': + if normalized_market == 'us-stock': return is_us_market_open() return True def validate_executed_at(executed_at: str, market: str) -> tuple[bool, str]: try: + normalized_market = normalize_market(market) if executed_at.lower() == 'now': - if not is_market_open(market): - if market == 'us-stock': + if not is_market_open(normalized_market): + if normalized_market == 'us-stock': et_tz = ZoneInfo('America/New_York') now_et = datetime.now(et_tz) return ( @@ -426,7 +646,7 @@ def validate_executed_at(executed_at: str, market: str) -> tuple[bool, str]: f"Current time (ET): {now_et.strftime('%Y-%m-%d %H:%M:%S')}. " 'Trading hours: Mon-Fri 9:30-16:00 ET', ) - return False, f'{market} is currently closed' + return False, f'{normalized_market} is currently closed' return True, '' executed_at_clean = executed_at.strip() @@ -447,7 +667,7 @@ def validate_executed_at(executed_at: str, market: str) -> tuple[bool, str]: day = dt_et.weekday() time_in_minutes = dt_et.hour * 60 + dt_et.minute - if market == 'us-stock': + if normalized_market == 'us-stock': is_weekday = day < 5 is_market_hours = 570 <= time_in_minutes < 960 if not (is_weekday and is_market_hours):
service/server/routes_signals.py+63 −13 modified@@ -95,6 +95,11 @@ def _context_keys(context: dict[str, Any] | None) -> tuple[str | None, str | Non return assignment.get('experiment_key'), assignment.get('variant_key') +def _primary_experiment_context(agent_id: int) -> dict[str, Any] | None: + contexts = _agent_experiment_context(agent_id) + return contexts[0] if contexts else None + + def register_signal_routes(app: FastAPI, ctx: RouteContext) -> None: @app.post('/api/signals/realtime') async def push_realtime_signal(data: RealtimeSignalRequest, authorization: str = Header(None)): @@ -514,12 +519,16 @@ async def push_realtime_signal(data: RealtimeSignalRequest, authorization: str = }, cursor=cursor, ) + follower_context = _primary_experiment_context(follower_id) + follower_experiment_key, follower_variant_key = _context_keys(follower_context) record_signal_event( 'signal_published', agent_id=follower_id, signal_id=follower_signal_id, message_type='operation', market=market, + experiment_key=follower_experiment_key, + variant_key=follower_variant_key, metadata={'symbol': symbol, 'side': side, 'copied_from_agent_id': agent_id}, cursor=cursor, ) @@ -562,7 +571,7 @@ async def push_realtime_signal(data: RealtimeSignalRequest, authorization: str = } if market == 'polymarket': decorate_polymarket_item(payload, fetch_remote=fetch_price_in_request) - return attach_experiment_unread_notice(payload, agent_id) + return attach_experiment_unread_notice(payload, agent_id, ctx=ctx) @app.post('/api/signals/strategy') async def upload_strategy(data: StrategyRequest, authorization: str = Header(None)): @@ -692,6 +701,7 @@ async def upload_strategy(data: StrategyRequest, authorization: str = Header(Non return attach_experiment_unread_notice( {'success': True, 'signal_id': signal_id, 'points_earned': reward_points}, agent_id, + ctx=ctx, ) @app.post('/api/signals/discussion') @@ -832,6 +842,7 @@ async def post_discussion(data: DiscussionRequest, authorization: str = Header(N return attach_experiment_unread_notice( {'success': True, 'signal_id': signal_id, 'points_earned': reward_points}, agent_id, + ctx=ctx, ) @app.get('/api/signals/grouped') @@ -840,7 +851,18 @@ async def get_signals_grouped( market: str = None, limit: int = 20, offset: int = 0, + authorization: str = Header(None), ): + viewer = None + token = _extract_token(authorization) + if token: + viewer = _get_agent_by_token(token) + + def _attach_viewer_notice(payload: dict[str, Any]) -> dict[str, Any]: + if not viewer: + return payload + return attach_experiment_unread_notice(dict(payload), viewer['id'], surface='signals_grouped', ctx=ctx) + cache_key = ((message_type or '').strip(), (market or '').strip(), max(1, limit), max(0, offset)) now_ts = time.time() redis_cache_key = ( @@ -854,11 +876,11 @@ async def get_signals_grouped( cached_payload = get_json(redis_cache_key) if isinstance(cached_payload, dict): ctx.grouped_signals_cache[cache_key] = (now_ts, cached_payload) - return cached_payload + return _attach_viewer_notice(cached_payload) cached = ctx.grouped_signals_cache.get(cache_key) if cached and now_ts - cached[0] < GROUPED_SIGNALS_CACHE_TTL_SECONDS: - return cached[1] + return _attach_viewer_notice(cached[1]) conn = get_db_connection() cursor = conn.cursor() @@ -973,7 +995,7 @@ async def get_signals_grouped( payload = {'agents': agents, 'total': total} ctx.grouped_signals_cache[cache_key] = (now_ts, payload) set_json(redis_cache_key, payload, ttl_seconds=GROUPED_SIGNALS_CACHE_TTL_SECONDS) - return payload + return _attach_viewer_notice(payload) @app.get('/api/signals/{signal_id}/replies') async def get_signal_replies(signal_id: int): @@ -1027,14 +1049,20 @@ async def get_signal_feed( f"keyword={feed_cache_key[2] or 'none'}:" f'limit={limit}:offset={offset}:sort={feed_cache_key[5]}:viewer={feed_cache_key[6]}' ) + + def _attach_viewer_notice(payload: dict[str, Any]) -> dict[str, Any]: + if not viewer: + return payload + return attach_experiment_unread_notice(dict(payload), viewer['id'], surface='signals_feed', ctx=ctx) + cached_payload = get_json(redis_cache_key) if isinstance(cached_payload, dict): ctx.signal_feed_cache[feed_cache_key] = (now_ts, cached_payload) - return cached_payload + return _attach_viewer_notice(cached_payload) cached = ctx.signal_feed_cache.get(feed_cache_key) if cached and now_ts - cached[0] < SIGNAL_FEED_CACHE_TTL_SECONDS: - return cached[1] + return _attach_viewer_notice(cached[1]) conn = get_db_connection() cursor = conn.cursor() @@ -1219,7 +1247,7 @@ async def get_signal_feed( } ctx.signal_feed_cache[feed_cache_key] = (now_ts, payload) set_json(redis_cache_key, payload, ttl_seconds=SIGNAL_FEED_CACHE_TTL_SECONDS) - return payload + return _attach_viewer_notice(payload) @app.get('/api/signals/following') async def get_following( @@ -1293,13 +1321,14 @@ async def get_following( 'latest_discussion_title': row['latest_discussion_title'], }) - return { + payload = { 'following': following, 'total': total, 'limit': limit, 'offset': offset, 'has_more': offset + len(following) < total, } + return attach_experiment_unread_notice(payload, agent['id'], surface='signals_following', ctx=ctx) @app.get('/api/signals/subscribers') async def get_subscribers(authorization: str = Header(None)): @@ -1343,10 +1372,30 @@ async def get_subscribers(authorization: str = Header(None)): 'recent_activity_at': row['recent_activity_at'], }) - return {'subscribers': subscribers} + return attach_experiment_unread_notice( + {'subscribers': subscribers}, + agent['id'], + surface='signals_subscribers', + ctx=ctx, + ) @app.get('/api/signals/{agent_id}') - async def get_agent_signals(agent_id: int, message_type: str = None, limit: int = 50): + async def get_agent_signals( + agent_id: int, + message_type: str = None, + limit: int = 50, + authorization: str = Header(None), + ): + viewer = None + token = _extract_token(authorization) + if token: + viewer = _get_agent_by_token(token) + + def _attach_viewer_notice(payload: dict[str, Any]) -> dict[str, Any]: + if not viewer: + return payload + return attach_experiment_unread_notice(dict(payload), viewer['id'], surface='agent_signals', ctx=ctx) + cache_key = (agent_id, (message_type or '').strip(), max(1, limit)) now_ts = time.time() redis_cache_key = ( @@ -1359,11 +1408,11 @@ async def get_agent_signals(agent_id: int, message_type: str = None, limit: int cached_payload = get_json(redis_cache_key) if isinstance(cached_payload, dict): ctx.agent_signals_cache[cache_key] = (now_ts, cached_payload) - return cached_payload + return _attach_viewer_notice(cached_payload) cached = ctx.agent_signals_cache.get(cache_key) if cached and now_ts - cached[0] < AGENT_SIGNALS_CACHE_TTL_SECONDS: - return cached[1] + return _attach_viewer_notice(cached[1]) conn = get_db_connection() cursor = conn.cursor() @@ -1394,7 +1443,7 @@ async def get_agent_signals(agent_id: int, message_type: str = None, limit: int payload = {'signals': signals} ctx.agent_signals_cache[cache_key] = (now_ts, payload) set_json(redis_cache_key, payload, ttl_seconds=AGENT_SIGNALS_CACHE_TTL_SECONDS) - return payload + return _attach_viewer_notice(payload) @app.post('/api/signals/reply') async def reply_to_signal(data: ReplyRequest, authorization: str = Header(None)): @@ -1563,6 +1612,7 @@ async def reply_to_signal(data: ReplyRequest, authorization: str = Header(None)) return attach_experiment_unread_notice( {'success': True, 'points_earned': reward_points}, agent_id, + ctx=ctx, ) @app.post('/api/signals/{signal_id}/replies/{reply_id}/accept')
service/server/routes_team_missions.py+12 −21 modified@@ -4,6 +4,7 @@ from fastapi import FastAPI, Header, HTTPException +from permissions import TEAM_MISSION_ADMIN_CAPABILITY, require_agent, require_capability from experiment_notifications import ( ExperimentNotificationError, build_experiment_target_rule, @@ -19,7 +20,6 @@ TeamSubmissionRequest, ) from routes_shared import RouteContext -from services import _get_agent_by_token from team_missions import ( TeamMissionError, TeamMissionNotFound, @@ -38,7 +38,6 @@ list_team_missions, settle_team_mission, ) -from utils import _extract_token def _to_http_error(exc: Exception) -> HTTPException: @@ -49,14 +48,6 @@ def _to_http_error(exc: Exception) -> HTTPException: return HTTPException(status_code=500, detail=f"Team mission request failed: {exc}") -def _require_agent(authorization: str | None) -> dict: - token = _extract_token(authorization) - agent = _get_agent_by_token(token) - if not agent: - raise HTTPException(status_code=401, detail="Invalid token") - return agent - - def register_team_mission_routes(app: FastAPI, ctx: RouteContext) -> None: @app.get("/api/team-missions") async def api_list_team_missions(status: str | None = None, limit: int = 50, offset: int = 0): @@ -67,15 +58,15 @@ async def api_list_team_missions(status: str | None = None, limit: int = 50, off @app.post("/api/team-missions") async def api_create_team_mission(data: TeamMissionCreateRequest, authorization: str = Header(None)): - agent = _require_agent(authorization) + agent = require_capability(authorization, TEAM_MISSION_ADMIN_CAPABILITY) try: return create_team_mission(data, created_by_agent_id=agent["id"]) except Exception as exc: raise _to_http_error(exc) @app.get("/api/team-missions/me") async def api_my_team_missions(authorization: str = Header(None)): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: return get_agent_team_missions(agent["id"]) except Exception as exc: @@ -101,7 +92,7 @@ async def api_join_team_mission( data: TeamJoinRequest | None = None, authorization: str = Header(None), ): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: return join_team_mission(mission_key, agent["id"], data) except Exception as exc: @@ -113,7 +104,7 @@ async def api_create_team( data: TeamJoinRequest | None = None, authorization: str = Header(None), ): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: return create_team_for_mission(mission_key, agent["id"], data) except Exception as exc: @@ -125,7 +116,7 @@ async def api_auto_form_teams( data: TeamMissionSettleRequest | None = None, authorization: str = Header(None), ): - _require_agent(authorization) + require_capability(authorization, TEAM_MISSION_ADMIN_CAPABILITY) try: return auto_form_teams(mission_key, assignment_mode=data.assignment_mode if data else None) except Exception as exc: @@ -137,7 +128,7 @@ async def api_settle_team_mission( data: TeamMissionSettleRequest | None = None, authorization: str = Header(None), ): - _require_agent(authorization) + require_capability(authorization, TEAM_MISSION_ADMIN_CAPABILITY) try: return settle_team_mission(mission_key, force=bool(data.force if data else False)) except Exception as exc: @@ -149,7 +140,7 @@ async def api_notify_team_mission( data: ExperimentNotificationRequest, authorization: str = Header(None), ): - agent = _require_agent(authorization) + agent = require_capability(authorization, TEAM_MISSION_ADMIN_CAPABILITY) try: mission = get_team_mission(mission_key) experiment_key = mission.get("experiment_key") or (data.data or {}).get("experiment_key") or "" @@ -208,7 +199,7 @@ async def api_join_team( data: TeamJoinRequest | None = None, authorization: str = Header(None), ): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: return join_team(team_key, agent["id"], data) except Exception as exc: @@ -220,7 +211,7 @@ async def api_link_team_signal( data: TeamMessageLinkRequest, authorization: str = Header(None), ): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: return link_signal_to_team(team_key, agent["id"], data) except Exception as exc: @@ -232,7 +223,7 @@ async def api_submit_team( data: TeamSubmissionRequest, authorization: str = Header(None), ): - agent = _require_agent(authorization) + agent = require_agent(authorization) try: from team_missions import submit_team @@ -246,7 +237,7 @@ async def api_notify_team( data: ExperimentNotificationRequest, authorization: str = Header(None), ): - agent = _require_agent(authorization) + agent = require_capability(authorization, TEAM_MISSION_ADMIN_CAPABILITY) try: team = get_team(team_key) mission = team.get("mission") or {}
service/server/routes_trading.py+40 −10 modified@@ -18,6 +18,7 @@ RouteContext, TRENDING_CACHE_KEY, allow_sync_price_fetch_in_api, + attach_experiment_unread_notice, check_price_api_rate_limit, clamp_profit_for_display, decorate_polymarket_item, @@ -42,6 +43,15 @@ def profit_percent_for_display(profit: float, deposited: float) -> float: def register_trading_routes(app: FastAPI, ctx: RouteContext) -> None: + def _agent_from_authorization(authorization: str | None) -> Optional[dict]: + token = _extract_token(authorization) + return _get_agent_by_token(token) + + def _attach_agent_notice(payload: dict, agent: Optional[dict], *, surface: str) -> dict: + if not agent: + return payload + return attach_experiment_unread_notice(dict(payload), agent['id'], surface=surface, ctx=ctx) + @app.get('/api/profit/history') async def get_profit_history( limit: int = 10, @@ -355,7 +365,12 @@ async def get_profit_history( async def get_leaderboard_position_pnl(limit: int = 10): conn = get_db_connection() cursor = conn.cursor() - cursor.execute('SELECT id, name FROM agents') + cursor.execute( + """ + SELECT id, name + FROM agents + """ + ) agents = cursor.fetchall() result = [] @@ -401,13 +416,18 @@ async def get_leaderboard_position_pnl(limit: int = 10): return {'top_agents': sorted(result, key=lambda item: item['position_pnl'], reverse=True)[:limit]} @app.get('/api/trending') - async def get_trending_symbols(limit: int = 10): + async def get_trending_symbols(limit: int = 10, authorization: str = Header(None)): + agent = _agent_from_authorization(authorization) cached = get_json(TRENDING_CACHE_KEY) if isinstance(cached, list): - return {'trending': cached[: max(1, limit)]} + return _attach_agent_notice({'trending': cached[: max(1, limit)]}, agent, surface='trending') if task_runtime.trending_cache: - return {'trending': task_runtime.trending_cache[: max(1, limit)]} + return _attach_agent_notice( + {'trending': task_runtime.trending_cache[: max(1, limit)]}, + agent, + surface='trending', + ) conn = get_db_connection() cursor = conn.cursor() @@ -445,7 +465,7 @@ async def get_trending_symbols(limit: int = 10): conn.close() set_json(TRENDING_CACHE_KEY, result, ttl_seconds=300) - return {'trending': result} + return _attach_agent_notice({'trending': result}, agent, surface='trending') @app.get('/api/price') async def get_price( @@ -486,12 +506,12 @@ async def get_price( cached_payload = get_json(redis_cache_key) if isinstance(cached_payload, dict): ctx.price_quote_cache[cache_key] = (time.time(), cached_payload) - return cached_payload + return _attach_agent_notice(cached_payload, agent, surface='price_quote') cached = ctx.price_quote_cache.get(cache_key) now_ts = time.time() if cached and now_ts - cached[0] < PRICE_QUOTE_CACHE_TTL_SECONDS: - return cached[1] + return _attach_agent_notice(cached[1], agent, surface='price_quote') price = None conn = get_db_connection() @@ -526,7 +546,7 @@ async def get_price( decorate_polymarket_item(payload, fetch_remote=allow_sync_price_fetch_in_api()) ctx.price_quote_cache[cache_key] = (now_ts, payload) set_json(redis_cache_key, payload, ttl_seconds=PRICE_QUOTE_CACHE_TTL_SECONDS) - return payload + return _attach_agent_notice(payload, agent, surface='price_quote') @app.get('/api/positions') async def get_my_positions(authorization: str = Header(None)): @@ -538,7 +558,12 @@ async def get_my_positions(authorization: str = Header(None)): now_ts = time.time() cached = ctx.positions_cache.get(agent['id']) if cached and now_ts - cached[0] < POSITIONS_CACHE_TTL_SECONDS: - return cached[1] + return attach_experiment_unread_notice( + dict(cached[1]), + agent['id'], + surface='positions', + ctx=ctx, + ) conn = get_db_connection() cursor = conn.cursor() @@ -588,7 +613,12 @@ async def get_my_positions(authorization: str = Header(None)): payload = {'positions': positions, 'cash': agent.get('cash', 100000.0)} ctx.positions_cache[agent['id']] = (now_ts, payload) - return payload + return attach_experiment_unread_notice( + dict(payload), + agent['id'], + surface='positions', + ctx=ctx, + ) @app.get('/api/agents/{agent_id}/positions') async def get_agent_positions(agent_id: int):
service/server/scripts/send_read_conversion_reminders.py+138 −0 added@@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +"""Send targeted reminders to active agents with unread experiment messages.""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import sys +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + + +SERVER_DIR = Path(__file__).resolve().parents[1] +if str(SERVER_DIR) not in sys.path: + sys.path.insert(0, str(SERVER_DIR)) + +from experiment_notifications import ( # noqa: E402 + EXPERIMENT_READ_CONVERSION_EVENT_TYPE, + EXPERIMENT_READ_CONVERSION_REMINDER_TYPE, + resolve_unread_active_experiment_targets, + send_agent_notifications, +) +from routes_shared import RouteContext, utc_now_iso_z # noqa: E402 + + +DEFAULT_EXPERIMENT_KEY = "agent-collab-compete-season-001" +DEFAULT_TITLE = "Action required: read AI-Trader experiment messages" +DEFAULT_CONTENT = ( + "You have unread AI-Trader experiment messages. Immediately call " + "POST /api/claw/messages/read-experiment with Authorization: Bearer <your_token>. " + "This endpoint returns unread experiment messages and marks them read in one step. " + "This reminder is informational only and does not create a trading task." +) + + +def _iso_hours_ago(hours: int) -> str: + return (datetime.now(timezone.utc) - timedelta(hours=max(1, hours))).isoformat().replace("+00:00", "Z") + + +def _env_int(name: str, default: int) -> int: + try: + return int(os.getenv(name, str(default))) + except Exception: + return default + + +def _preview_rows(rows: list[dict[str, Any]], limit: int = 20) -> list[dict[str, Any]]: + preview = [] + for row in rows[:limit]: + preview.append({ + "agent_id": row.get("agent_id"), + "agent_name": row.get("agent_name"), + "variant_key": row.get("variant_key"), + "latest_activity_at": row.get("latest_activity_at"), + "unread_experiment_count": row.get("unread_experiment_count"), + }) + return preview + + +async def _run(args: argparse.Namespace) -> dict[str, Any]: + active_since = args.active_since or _iso_hours_ago(args.active_hours) + reminder_since = args.reminder_since or _iso_hours_ago(args.cooldown_hours) + targets = resolve_unread_active_experiment_targets( + args.experiment_key, + variant_key=args.variant_key, + limit=args.limit, + active_since=active_since, + reminder_since=reminder_since, + ) + dry_run = not args.send + ctx = RouteContext() + target_rule = { + "target": "unread_active", + "experiment_key": args.experiment_key, + "variant_key": args.variant_key, + "limit": args.limit, + "active_since": active_since, + "reminder_since": reminder_since, + "cooldown_hours": args.cooldown_hours, + "active_hours": args.active_hours, + } + data = { + "purpose": "read_conversion", + "recommended_endpoint": "/api/claw/messages/read-experiment", + "recommended_method": "POST", + "marks_read": True, + "active_since": active_since, + "reminder_since": reminder_since, + } + result = await send_agent_notifications( + ctx, + targets, + actor_agent_id=args.actor_agent_id, + message_type=EXPERIMENT_READ_CONVERSION_REMINDER_TYPE, + title=args.title, + content=args.content, + experiment_key=args.experiment_key, + variant_key=args.variant_key, + data=data, + dry_run=dry_run, + event_type=EXPERIMENT_READ_CONVERSION_EVENT_TYPE, + target_rule=target_rule, + ) + result["active_since"] = active_since + result["reminder_since"] = reminder_since + result["targets_preview"] = _preview_rows(targets) + result["sent_at"] = utc_now_iso_z() + return result + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--experiment-key", default=DEFAULT_EXPERIMENT_KEY) + parser.add_argument("--variant-key", default=None) + parser.add_argument("--limit", type=int, default=_env_int("READ_CONVERSION_REMINDER_LIMIT", 500)) + parser.add_argument("--active-hours", type=int, default=24) + parser.add_argument("--cooldown-hours", type=int, default=24) + parser.add_argument("--active-since", default=None) + parser.add_argument("--reminder-since", default=None) + parser.add_argument("--actor-agent-id", type=int, default=_env_int("READ_CONVERSION_ACTOR_AGENT_ID", 0)) + parser.add_argument("--title", default=DEFAULT_TITLE) + parser.add_argument("--content", default=DEFAULT_CONTENT) + parser.add_argument("--send", action="store_true", help="Actually write messages. Defaults to dry-run.") + args = parser.parse_args() + + if args.actor_agent_id <= 0: + parser.error("--actor-agent-id or READ_CONVERSION_ACTOR_AGENT_ID is required") + + result = asyncio.run(_run(args)) + import json + + print(json.dumps(result, ensure_ascii=False, indent=2, sort_keys=True)) + + +if __name__ == "__main__": + main()
service/server/tests/test_admin_permissions.py+129 −0 added@@ -0,0 +1,129 @@ +import os +import sys +import tempfile +import unittest +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from fastapi.testclient import TestClient + + +SERVER_DIR = Path(__file__).resolve().parents[1] +if str(SERVER_DIR) not in sys.path: + sys.path.insert(0, str(SERVER_DIR)) + +import database +from routes import create_app +from routes_shared import utc_now_iso_z + + +def iso(dt: datetime) -> str: + return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + + +class AdminPermissionTests(unittest.TestCase): + def setUp(self) -> None: + self.tmp = tempfile.TemporaryDirectory() + database.DATABASE_URL = "" + database._SQLITE_DB_PATH = os.path.join(self.tmp.name, "test.db") + database.init_database() + self.admin_id = self._create_agent("admin-agent", "admin") + self.regular_id = self._create_agent("regular-agent", "agent") + self.client = TestClient(create_app()) + + def tearDown(self) -> None: + self.tmp.cleanup() + + def _create_agent(self, name: str, role: str) -> int: + now = utc_now_iso_z() + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO agents (name, token, role, points, cash, created_at, updated_at) + VALUES (?, ?, ?, 0, 100000.0, ?, ?) + """, + (name, f"token-{name}", role, now, now), + ) + agent_id = cursor.lastrowid + conn.commit() + conn.close() + return agent_id + + def test_agent_me_returns_role_and_permissions(self): + regular = self.client.get( + "/api/claw/agents/me", + headers={"Authorization": "Bearer token-regular-agent"}, + ) + self.assertEqual(regular.status_code, 200, regular.text) + self.assertEqual(regular.json()["role"], "agent") + self.assertFalse(regular.json()["permissions"]["experiment_admin"]) + self.assertFalse(regular.json()["permissions"]["research_exports"]) + self.assertFalse(regular.json()["permissions"]["team_mission_admin"]) + + admin = self.client.get( + "/api/claw/agents/me", + headers={"Authorization": "Bearer token-admin-agent"}, + ) + self.assertEqual(admin.status_code, 200, admin.text) + self.assertEqual(admin.json()["role"], "admin") + self.assertTrue(admin.json()["permissions"]["experiment_admin"]) + self.assertTrue(admin.json()["permissions"]["research_exports"]) + self.assertTrue(admin.json()["permissions"]["team_mission_admin"]) + + def test_regular_agent_cannot_manage_experiments(self): + payload = { + "title": "Blocked experiment", + "experiment_key": "blocked-exp", + "variants_json": [{"key": "control", "weight": 1}], + } + regular = self.client.post( + "/api/experiments", + headers={"Authorization": "Bearer token-regular-agent"}, + json=payload, + ) + self.assertEqual(regular.status_code, 403, regular.text) + + admin = self.client.post( + "/api/experiments", + headers={"Authorization": "Bearer token-admin-agent"}, + json=payload, + ) + self.assertEqual(admin.status_code, 200, admin.text) + + def test_regular_agent_cannot_create_or_operate_team_missions(self): + due_at = iso(datetime.now(timezone.utc) + timedelta(hours=1)) + payload = { + "mission_key": "admin-only-mission", + "title": "Admin only mission", + "market": "crypto", + "symbol": "BTC", + "submission_due_at": due_at, + } + regular = self.client.post( + "/api/team-missions", + headers={"Authorization": "Bearer token-regular-agent"}, + json=payload, + ) + self.assertEqual(regular.status_code, 403, regular.text) + + admin = self.client.post( + "/api/team-missions", + headers={"Authorization": "Bearer token-admin-agent"}, + json=payload, + ) + self.assertEqual(admin.status_code, 200, admin.text) + + join = self.client.post( + "/api/team-missions/admin-only-mission/join", + headers={"Authorization": "Bearer token-regular-agent"}, + json={}, + ) + self.assertEqual(join.status_code, 200, join.text) + + auto_form = self.client.post( + "/api/team-missions/admin-only-mission/auto-form-teams", + headers={"Authorization": "Bearer token-regular-agent"}, + json={}, + ) + self.assertEqual(auto_form.status_code, 403, auto_form.text)
service/server/tests/test_experiment_assignments.py+36 −0 modified@@ -104,6 +104,33 @@ def test_assignment_summary_includes_variant_metrics(self): """, (self.agent_ids[0], now, now, now), ) + cursor.execute( + """ + INSERT INTO experiment_events + (event_id, event_type, actor_agent_id, object_type, object_id, + experiment_key, variant_key, metadata_json, created_at) + VALUES ('metric-heartbeat', 'agent_heartbeat', ?, 'agent', ?, + 'metric-summary', ?, '{}', ?) + """, + (self.agent_ids[0], str(self.agent_ids[0]), assignment["variant_key"], now), + ) + cursor.execute( + """ + INSERT INTO experiment_events + (event_id, event_type, actor_agent_id, object_type, object_id, + experiment_key, variant_key, metadata_json, created_at) + VALUES ('metric-signal', 'signal_published', ?, 'signal', '101', + 'metric-summary', ?, '{}', ?) + """, + (self.agent_ids[0], assignment["variant_key"], now), + ) + cursor.execute( + """ + INSERT INTO agent_messages (agent_id, type, content, data, read, created_at) + VALUES (?, 'experiment_announcement', 'read diagnostic', '{}', 1, ?) + """, + (self.agent_ids[0], now), + ) conn.commit() conn.close() @@ -115,6 +142,15 @@ def test_assignment_summary_includes_variant_metrics(self): self.assertEqual(variant_row["agent_count"], 1) self.assertEqual(variant_row["trade_count"], 4) self.assertAlmostEqual(variant_row["quality_score_avg"], 4.25) + self.assertEqual(variant_row["primary_metric_family"], "active_agent_behavior") + self.assertEqual(variant_row["read_receipts_role"], "diagnostic_only") + self.assertFalse(variant_row["message_read_state_required"]) + self.assertEqual(variant_row["active_agent_count_24h"], 1) + self.assertEqual(variant_row["heartbeat_count_24h"], 1) + self.assertEqual(variant_row["signal_count_24h"], 1) + self.assertEqual(variant_row["read_receipt_message_count"], 1) + self.assertEqual(summary["metric_policy"]["primary_metric_family"], "active_agent_behavior") + self.assertEqual(summary["metric_policy"]["read_receipts_role"], "diagnostic_only") def test_enrollment_limit_blocks_new_assignments_but_keeps_existing(self): create_experiment({
service/server/tests/test_experiment_events.py+68 −0 modified@@ -46,6 +46,20 @@ def _create_agent(self, name: str, token: str) -> int: conn.close() return agent_id + def _assign(self, experiment_key: str, agent_id: int, variant_key: str) -> None: + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO experiment_assignments + (experiment_key, unit_type, unit_id, variant_key, assignment_reason, metadata_json, created_at) + VALUES (?, 'agent', ?, ?, 'fixture', '{}', ?) + """, + (experiment_key, agent_id, variant_key, utc_now_iso_z()), + ) + conn.commit() + conn.close() + def test_strategy_discussion_reply_and_accept_write_events_with_json_metadata(self): create_experiment({ "experiment_key": "event-context", @@ -137,6 +151,60 @@ def test_strategy_discussion_reply_and_accept_write_events_with_json_metadata(se self.assertGreaterEqual(cursor.fetchone()["count"], 4) conn.close() + def test_copied_realtime_signal_event_uses_follower_experiment_context(self): + create_experiment({ + "experiment_key": "copy-context", + "title": "Copy context", + "variants_json": [{"key": "control", "weight": 1}, {"key": "treatment", "weight": 1}], + }) + self._assign("copy-context", self.author_id, "control") + self._assign("copy-context", self.reply_agent_id, "treatment") + + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + "INSERT INTO subscriptions (leader_id, follower_id, status, created_at) VALUES (?, ?, 'active', ?)", + (self.author_id, self.reply_agent_id, utc_now_iso_z()), + ) + conn.commit() + conn.close() + + response = self.client.post( + "/api/signals/realtime", + headers={"Authorization": "Bearer token-author"}, + json={ + "market": "crypto", + "symbol": "BTC", + "action": "buy", + "quantity": 0.01, + "price": 100, + "content": "BTC copy attribution check.", + "executed_at": utc_now_iso_z(), + }, + ) + self.assertEqual(response.status_code, 200, response.text) + self.assertEqual(response.json()["follower_count"], 1) + + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + SELECT actor_agent_id, experiment_key, variant_key, metadata_json + FROM experiment_events + WHERE event_type = 'signal_published' + AND actor_agent_id = ? + ORDER BY id DESC + LIMIT 1 + """, + (self.reply_agent_id,), + ) + row = cursor.fetchone() + conn.close() + + self.assertEqual(row["experiment_key"], "copy-context") + self.assertEqual(row["variant_key"], "treatment") + self.assertEqual(json.loads(row["metadata_json"])["copied_from_agent_id"], self.author_id) + if __name__ == "__main__": unittest.main()
service/server/tests/test_experiment_notifications.py+235 −6 modified@@ -13,7 +13,11 @@ sys.path.insert(0, str(SERVER_DIR)) import database -from experiment_notifications import MAX_LIMIT, _clamp_limit +from experiment_notifications import ( + MAX_LIMIT, + _clamp_limit, + resolve_unread_active_experiment_targets, +) from experiments import create_experiment from routes import create_app from routes_shared import utc_now_iso_z @@ -25,7 +29,7 @@ def setUp(self) -> None: database.DATABASE_URL = "" database._SQLITE_DB_PATH = os.path.join(self.tmp.name, "test.db") database.init_database() - self.admin_id = self._create_agent("notify-admin") + self.admin_id = self._create_agent("notify-admin", role="experiment_admin") self.agent_control = self._create_agent("notify-control") self.agent_treatment = self._create_agent("notify-treatment") self.agent_extra = self._create_agent("notify-extra") @@ -41,16 +45,16 @@ def setUp(self) -> None: def tearDown(self) -> None: self.tmp.cleanup() - def _create_agent(self, name: str) -> int: + def _create_agent(self, name: str, role: str = "agent") -> int: now = utc_now_iso_z() conn = database.get_db_connection() cursor = conn.cursor() cursor.execute( """ - INSERT INTO agents (name, token, points, cash, created_at, updated_at) - VALUES (?, ?, 0, 100000.0, ?, ?) + INSERT INTO agents (name, token, points, cash, role, created_at, updated_at) + VALUES (?, ?, 0, 100000.0, ?, ?, ?) """, - (name, f"token-{name}", now, now), + (name, f"token-{name}", role, now, now), ) agent_id = cursor.lastrowid conn.commit() @@ -86,6 +90,19 @@ def _notify(self, **overrides): json=payload, ) + def test_regular_agent_cannot_send_experiment_notifications(self): + response = self.client.post( + "/api/experiments/notify-exp/notify", + headers={"Authorization": "Bearer token-notify-control"}, + json={ + "message_type": "experiment_announcement", + "title": "Experiment notice", + "content": "This should be blocked.", + "dry_run": True, + }, + ) + self.assertEqual(response.status_code, 403, response.text) + def test_dry_run_resolves_experiment_targets_without_writing_messages(self): response = self._notify() self.assertEqual(response.status_code, 200, response.text) @@ -214,6 +231,218 @@ def test_unread_summary_and_recent_include_experiment_category(self): ) self.assertEqual(recent.status_code, 200, recent.text) self.assertEqual(len(recent.json()["messages"]), 1) + self.assertTrue(recent.json()["agent_notice"]["experiment_unread"]) + self.assertEqual( + recent.json()["agent_notice"]["recommended_action"]["endpoint"], + "/api/claw/messages/read-experiment", + ) + + def test_read_experiment_endpoint_returns_and_marks_experiment_messages(self): + send_response = self._notify(dry_run=False) + self.assertEqual(send_response.status_code, 200, send_response.text) + + response = self.client.post( + "/api/claw/messages/read-experiment", + headers={"Authorization": "Bearer token-notify-control"}, + ) + self.assertEqual(response.status_code, 200, response.text) + data = response.json() + self.assertTrue(data["success"]) + self.assertEqual(data["message_count"], 1) + self.assertEqual(data["updated"], 1) + self.assertEqual(data["unread_before"], 1) + self.assertEqual(data["remaining_unread_count"], 0) + self.assertEqual(data["messages"][0]["type"], "experiment_announcement") + self.assertEqual(data["messages"][0]["read"], 1) + + summary = self.client.get( + "/api/claw/messages/unread-summary", + headers={"Authorization": "Bearer token-notify-control"}, + ) + self.assertEqual(summary.status_code, 200, summary.text) + self.assertEqual(summary.json()["experiment_unread"], 0) + + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + SELECT COUNT(*) AS count + FROM experiment_events + WHERE event_type = 'experiment_messages_read' + AND actor_agent_id = ? + """, + (self.agent_control,), + ) + self.assertEqual(cursor.fetchone()["count"], 1) + conn.close() + + def test_agent_me_and_count_attach_unified_notice_when_authorized(self): + send_response = self._notify(dry_run=False) + self.assertEqual(send_response.status_code, 200, send_response.text) + + me = self.client.get( + "/api/claw/agents/me", + headers={"Authorization": "Bearer token-notify-control"}, + ) + self.assertEqual(me.status_code, 200, me.text) + self.assertTrue(me.json()["agent_notice"]["experiment_unread"]) + + count = self.client.get( + "/api/claw/agents/count", + headers={"Authorization": "Bearer token-notify-control"}, + ) + self.assertEqual(count.status_code, 200, count.text) + self.assertEqual(count.json()["count"], 4) + self.assertTrue(count.json()["agent_notice"]["experiment_unread"]) + + public_count = self.client.get("/api/claw/agents/count") + self.assertEqual(public_count.status_code, 200, public_count.text) + self.assertNotIn("agent_notice", public_count.json()) + + def test_heartbeat_includes_behavior_experiment_context(self): + response = self.client.post( + "/api/claw/agents/heartbeat", + headers={"Authorization": "Bearer token-notify-control"}, + ) + self.assertEqual(response.status_code, 200, response.text) + context = response.json()["experiment_context"] + + self.assertEqual(context["primary_metric_family"], "active_agent_behavior") + self.assertEqual(context["read_receipts_role"], "diagnostic_only") + self.assertFalse(context["message_read_state_required"]) + self.assertEqual(context["assignments"][0]["experiment_key"], "notify-exp") + self.assertEqual(context["assignments"][0]["variant_key"], "control") + self.assertFalse(context["assignments"][0]["message_read_state_required"]) + + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + SELECT experiment_key, variant_key + FROM experiment_events + WHERE event_type = 'agent_heartbeat' + AND actor_agent_id = ? + ORDER BY id DESC + LIMIT 1 + """, + (self.agent_control,), + ) + row = cursor.fetchone() + conn.close() + + self.assertEqual(row["experiment_key"], "notify-exp") + self.assertEqual(row["variant_key"], "control") + + def test_heartbeat_task_read_event_includes_experiment_context(self): + now = utc_now_iso_z() + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO agent_tasks (agent_id, type, status, input_data, created_at, updated_at) + VALUES (?, 'submit_strategy', 'pending', '{}', ?, ?) + """, + (self.agent_control, now, now), + ) + conn.commit() + conn.close() + + response = self.client.post( + "/api/claw/agents/heartbeat", + headers={"Authorization": "Bearer token-notify-control"}, + ) + self.assertEqual(response.status_code, 200, response.text) + + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + SELECT experiment_key, variant_key + FROM experiment_events + WHERE event_type = 'agent_tasks_read' + AND actor_agent_id = ? + ORDER BY id DESC + LIMIT 1 + """, + (self.agent_control,), + ) + row = cursor.fetchone() + conn.close() + + self.assertEqual(row["experiment_key"], "notify-exp") + self.assertEqual(row["variant_key"], "control") + + def test_trending_attaches_unified_notice_when_authorized(self): + send_response = self._notify(dry_run=False) + self.assertEqual(send_response.status_code, 200, send_response.text) + + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO positions (agent_id, symbol, market, side, quantity, entry_price, current_price, opened_at) + VALUES (?, 'AAPL', 'us-stock', 'long', 1, 100, 110, ?) + """, + (self.agent_control, utc_now_iso_z()), + ) + conn.commit() + conn.close() + + response = self.client.get( + "/api/trending?limit=5", + headers={"Authorization": "Bearer token-notify-control"}, + ) + self.assertEqual(response.status_code, 200, response.text) + data = response.json() + self.assertEqual(data["trending"][0]["symbol"], "AAPL") + self.assertTrue(data["agent_notice"]["experiment_unread"]) + self.assertEqual(data["agent_notice"]["must_call"], "/api/claw/messages/read-experiment") + + def test_unread_active_targets_require_activity_unread_and_no_recent_conversion_reminder(self): + send_response = self._notify(dry_run=False) + self.assertEqual(send_response.status_code, 200, send_response.text) + + now = utc_now_iso_z() + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO experiment_events + (event_id, event_type, actor_agent_id, object_type, object_id, experiment_key, variant_key, metadata_json, created_at) + VALUES ('active-control', 'agent_heartbeat', ?, 'agent', ?, 'notify-exp', 'control', '{}', ?) + """, + (self.agent_control, str(self.agent_control), now), + ) + cursor.execute( + """ + INSERT INTO experiment_events + (event_id, event_type, actor_agent_id, object_type, object_id, experiment_key, variant_key, metadata_json, created_at) + VALUES ('active-treatment', 'agent_heartbeat', ?, 'agent', ?, 'notify-exp', 'treatment', '{}', ?) + """, + (self.agent_treatment, str(self.agent_treatment), now), + ) + cursor.execute( + """ + INSERT INTO agent_messages (agent_id, type, content, data, read, created_at) + VALUES (?, 'experiment_reminder', 'recent conversion', ?, 0, ?) + """, + ( + self.agent_treatment, + json.dumps({"purpose": "read_conversion"}), + now, + ), + ) + conn.commit() + conn.close() + + targets = resolve_unread_active_experiment_targets( + "notify-exp", + active_since="2000-01-01T00:00:00Z", + reminder_since="2000-01-01T00:00:00Z", + limit=10, + ) + self.assertEqual([row["agent_id"] for row in targets], [self.agent_control]) + self.assertEqual(targets[0]["unread_experiment_count"], 1) def test_websocket_allows_matching_token_and_rejects_mismatch(self): with self.client.websocket_connect(f"/ws/notify/{self.agent_control}?token=token-notify-control") as websocket:
service/server/tests/test_research_exports.py+30 −7 modified@@ -47,15 +47,15 @@ def setUp(self) -> None: def tearDown(self) -> None: self.tmp.cleanup() - def _create_agent(self, name: str) -> int: + def _create_agent(self, name: str, role: str = "agent") -> int: conn = database.get_db_connection() cursor = conn.cursor() cursor.execute( """ - INSERT INTO agents (name, token, points, cash, created_at, updated_at) - VALUES (?, ?, 0, 100000.0, ?, ?) + INSERT INTO agents (name, token, role, points, cash, created_at, updated_at) + VALUES (?, ?, ?, 0, 100000.0, ?, ?) """, - (name, f"token-{name}", utc_now_iso_z(), utc_now_iso_z()), + (name, f"token-{name}", role, utc_now_iso_z(), utc_now_iso_z()), ) agent_id = cursor.lastrowid conn.commit() @@ -340,19 +340,42 @@ def test_research_export_api_serves_csv_json_and_schema(self): metadata={"token": "hidden", "safe": "ok"}, ) - csv_response = self.client.get("/api/research/export/events.csv?experiment_key=api-exp") + no_auth_response = self.client.get("/api/research/export/events.csv?experiment_key=api-exp") + self.assertEqual(no_auth_response.status_code, 401) + + regular_response = self.client.get( + "/api/research/export/events.csv?experiment_key=api-exp", + headers={"Authorization": "Bearer token-export-agent-1"}, + ) + self.assertEqual(regular_response.status_code, 403) + + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute("UPDATE agents SET role = 'researcher' WHERE id = ?", (self.agent_1,)) + conn.commit() + conn.close() + + auth_headers = {"Authorization": "Bearer token-export-agent-1"} + + csv_response = self.client.get( + "/api/research/export/events.csv?experiment_key=api-exp", + headers=auth_headers, + ) self.assertEqual(csv_response.status_code, 200) self.assertIn("event_id", csv_response.text.splitlines()[0]) self.assertNotIn("hidden", csv_response.text) - json_response = self.client.get("/api/research/export/events.json?experiment_key=api-exp") + json_response = self.client.get( + "/api/research/export/events.json?experiment_key=api-exp", + headers=auth_headers, + ) self.assertEqual(json_response.status_code, 200) payload = json_response.json() self.assertEqual(payload["dataset"], "events.csv") self.assertIn("event_id", payload["columns"]) self.assertEqual(payload["rows"][0]["experiment_key"], "api-exp") - schema_response = self.client.get("/api/research/schema/events") + schema_response = self.client.get("/api/research/schema/events", headers=auth_headers) self.assertEqual(schema_response.status_code, 200) schema = schema_response.json() self.assertEqual(schema["title"], "events.csv research export row")
service/server/tests/test_routes_shared.py+74 −1 modified@@ -11,20 +11,40 @@ sys.path.insert(0, str(SERVER_DIR)) import database -from routes_shared import attach_experiment_unread_notice, should_fetch_server_trade_price, utc_now_iso_z +from fastapi import HTTPException +from routes_shared import ( + attach_experiment_unread_notice, + normalize_market, + should_fetch_server_trade_price, + utc_now_iso_z, + validate_market, +) class TradePriceSourceTests(unittest.TestCase): def test_crypto_and_polymarket_always_use_server_prices(self) -> None: with patch.dict(os.environ, {'ALLOW_SYNC_PRICE_FETCH_IN_API': 'false'}, clear=False): self.assertTrue(should_fetch_server_trade_price('crypto')) + self.assertTrue(should_fetch_server_trade_price('binance')) self.assertTrue(should_fetch_server_trade_price('polymarket')) self.assertFalse(should_fetch_server_trade_price('us-stock')) def test_env_flag_keeps_server_fetch_for_other_markets(self) -> None: with patch.dict(os.environ, {'ALLOW_SYNC_PRICE_FETCH_IN_API': 'true'}, clear=False): self.assertTrue(should_fetch_server_trade_price('us-stock')) + def test_market_aliases_normalize_to_supported_markets(self) -> None: + self.assertEqual(normalize_market('binance'), 'crypto') + self.assertEqual(normalize_market('kraken'), 'crypto') + self.assertEqual(normalize_market('OKX'), 'crypto') + self.assertEqual(normalize_market('US Stock'), 'us-stock') + self.assertEqual(normalize_market('NASDAQ'), 'us-stock') + self.assertEqual(validate_market('stock'), 'us-stock') + + def test_unknown_market_is_rejected(self) -> None: + with self.assertRaises(HTTPException): + validate_market('forex') + class ExperimentUnreadNoticeTests(unittest.TestCase): def setUp(self) -> None: @@ -79,8 +99,33 @@ def test_attach_experiment_unread_notice_is_non_destructive(self) -> None: notice = payload["experiment_unread"] self.assertEqual(notice["unread_count"], 1) + self.assertFalse(notice["requires_read"]) + self.assertEqual(notice["read_receipts_role"], "diagnostic_only") + self.assertFalse(notice["message_read_state_required"]) + self.assertEqual(notice["recommended_action"]["endpoint"], "/api/claw/messages/read-experiment") + self.assertEqual(notice["recommended_action"]["method"], "POST") + self.assertIn("read_experiment_messages", [action["name"] for action in notice["actions"]]) + self.assertIn("read_and_mark_via_heartbeat", [action["name"] for action in notice["actions"]]) + self.assertEqual(notice["mark_read_endpoint"]["body"], {"categories": ["experiment"]}) self.assertEqual(notice["messages"][0]["type"], "experiment_reminder") + self.assertIn("read_experiment", notice["read_via"]) self.assertIn("heartbeat", notice["read_via"]) + self.assertTrue(payload["agent_notice"]["experiment_unread"]) + self.assertFalse(payload["agent_notice"]["must_call_now"]) + self.assertEqual(payload["agent_notice"]["must_call"], "/api/claw/messages/read-experiment") + self.assertEqual(payload["agent_notice"]["must_call_method"], "POST") + self.assertEqual(payload["agent_notice"]["primary_metric_family"], "active_agent_behavior") + self.assertEqual(payload["agent_notice"]["read_receipts_role"], "diagnostic_only") + self.assertFalse(payload["agent_notice"]["message_read_state_required"]) + self.assertEqual( + payload["agent_notice"]["required_action"]["endpoint"], + "/api/claw/messages/read-experiment", + ) + self.assertEqual(payload["agent_notice"]["unread_count"], 1) + self.assertEqual( + payload["agent_notice"]["recommended_action"]["endpoint"], + "/api/claw/messages/read-experiment", + ) conn = database.get_db_connection() cursor = conn.cursor() @@ -91,6 +136,34 @@ def test_attach_experiment_unread_notice_is_non_destructive(self) -> None: self.assertEqual(cursor.fetchone()["read"], 0) conn.close() + def test_attach_experiment_unread_notice_records_exposure(self) -> None: + self._insert_message("experiment_reminder") + payload = attach_experiment_unread_notice( + {"success": True}, + self.agent_id, + surface="unit_test_surface", + ) + + self.assertIn("experiment_unread", payload) + conn = database.get_db_connection() + cursor = conn.cursor() + cursor.execute( + """ + SELECT actor_agent_id, object_type, object_id, metadata_json + FROM experiment_events + WHERE event_type = 'experiment_notice_exposed' + """ + ) + row = cursor.fetchone() + conn.close() + + self.assertIsNotNone(row) + self.assertEqual(row["actor_agent_id"], self.agent_id) + self.assertEqual(row["object_type"], "agent") + self.assertEqual(row["object_id"], str(self.agent_id)) + self.assertIn("unit_test_surface", row["metadata_json"]) + self.assertIn("experiment_reminder", row["metadata_json"]) + def test_attach_experiment_unread_notice_omits_empty_notice(self) -> None: self._insert_message("experiment_reminder", read=1) payload = attach_experiment_unread_notice({"success": True}, self.agent_id)
service/server/tests/test_team_missions.py+5 −5 modified@@ -38,21 +38,21 @@ def setUp(self) -> None: database.DATABASE_URL = "" database._SQLITE_DB_PATH = os.path.join(self.tmp.name, "test.db") database.init_database() - self.admin_agent = self._create_agent("admin-agent") + self.admin_agent = self._create_agent("admin-agent", role="team_mission_admin") def tearDown(self) -> None: self.tmp.cleanup() - def _create_agent(self, name: str, *, profit: float = 0.0, market: str = "crypto") -> int: + def _create_agent(self, name: str, *, profit: float = 0.0, market: str = "crypto", role: str = "agent") -> int: now = utc_now_iso_z() conn = database.get_db_connection() cursor = conn.cursor() cursor.execute( """ - INSERT INTO agents (name, token, points, cash, created_at, updated_at) - VALUES (?, ?, 0, 100000.0, ?, ?) + INSERT INTO agents (name, token, role, points, cash, created_at, updated_at) + VALUES (?, ?, ?, 0, 100000.0, ?, ?) """, - (name, f"token-{name}", now, now), + (name, f"token-{name}", role, now, now), ) agent_id = cursor.lastrowid cursor.execute(
Vulnerability mechanics
Root cause
"Missing authorization check on the `/api/research/agents.csv` endpoint allows unauthenticated information disclosure."
Attack vector
An unauthenticated remote attacker can access the `/api/research/agents.csv` endpoint, which previously had no access control. The CVSS vector confirms the attack is over the network with no privileges required and no user interaction. By simply sending a GET request to this endpoint, the attacker can obtain exported research data that should have been restricted to authorized agents. [ref_id=2]
Affected code
The vulnerability is in the `/api/research/agents.csv` endpoint of the Research Export component. The patch shows that the frontend sidebar now conditionally renders the Research Exports navigation item only when the agent has the `research_exports` capability, and the vendor confirms that "Research export endpoints now require an authenticated agent with the research_exports capability."
What the fix does
The patch adds permission checks to the Research Export endpoints. In the frontend, the sidebar navigation item for Research Exports is now conditionally rendered only when the agent has the `research_exports` capability via `hasPermission(agentInfo, 'research_exports')`. The vendor confirms that "Research export endpoints now require an authenticated agent with the research_exports capability," meaning the backend also enforces this authorization check. This prevents unauthenticated or unauthorized access to the `/api/research/agents.csv` endpoint.
Preconditions
- authNo authentication required; the endpoint was publicly accessible.
- networkNetwork access to the AI-Trader instance.
Generated on Jun 15, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
8- github.com/Dave-gilmore-aus/security-advisories/blob/main/AI-Trader-Unauthenticated%20Sensitive%20Data%20Exposure%20in%20Research%20Export%20(CVE-Pending).mdnvd
- github.com/HKUDS/AI-Trader/commit/91a31aac1b0f4dbc6b8bef9f6eff0b7912e0bc65nvd
- github.com/HKUDS/AI-Trader/issues/242nvd
- github.com/HKUDS/AI-Trader/pull/227nvd
- vuldb.com/cve/CVE-2026-12203nvd
- vuldb.com/submit/830273nvd
- vuldb.com/vuln/370846nvd
- vuldb.com/vuln/370846/ctinvd
News mentions
0No linked articles in our index yet.