Credential Access (TA0006) (external, opens in a new tab or window)
text code block:
// Potential Okta Credential Stuffing (Single Source)
//
// Flags a single client IP that produces failed Okta sign-ins (invalid
// credentials or lockouts) spread thinly across many distinct accounts.
// The pipeline aggregates twice: first per (IP, user), then per IP, so the
// per-user attempt distribution can be measured for each source address.
//
// NOTE: every `//` comment must stay on its own line. A line comment runs to
// end of line, so collapsing this query onto one line comments out every
// stage that follows the first `//`.
FROM logs-okta.system-* METADATA _id, _version, _index

// Keep only failed authentication / session-start events that name a user.
| WHERE
    event.dataset == "okta.system"
    AND (
        event.action LIKE "user.authentication.*"
        OR event.action == "user.session.start"
    )
    AND okta.outcome.reason IN ("INVALID_CREDENTIALS", "LOCKED_OUT")
    AND okta.actor.alternate_id IS NOT NULL

// Build user-source context as JSON for enrichment.
// Missing IP / user agent fall back to "unknown" via COALESCE.
// NOTE(review): values are concatenated without JSON escaping, so a user
// agent containing a double quote would yield malformed JSON - confirm
// downstream consumers tolerate that.
| EVAL Esql.user_source_info = CONCAT(
    "{\"user\":\"", okta.actor.alternate_id,
    "\",\"ip\":\"", COALESCE(okta.client.ip::STRING, "unknown"),
    "\",\"user_agent\":\"", COALESCE(okta.client.user_agent.raw_user_agent, "unknown"),
    "\"}"
  )

// FIRST STATS: Aggregate by (IP, user) to get per-user attempt counts.
// This prevents skew from outlier users with many attempts.
// Esql.user_dt_hashes is not carried through the second STATS below, so it
// does not appear in the final output.
| STATS
    Esql.user_attempts = COUNT(*),
    Esql.user_dt_hashes = COUNT_DISTINCT(okta.debug_context.debug_data.dt_hash),
    Esql.user_source_info = VALUES(Esql.user_source_info),
    Esql.user_agents_per_user = VALUES(okta.client.user_agent.raw_user_agent),
    Esql.devices_per_user = VALUES(okta.client.device),
    Esql.is_proxy = VALUES(okta.security_context.is_proxy),
    Esql.geo_country = VALUES(client.geo.country_name),
    Esql.geo_city = VALUES(client.geo.city_name),
    Esql.asn_number = VALUES(source.as.number),
    Esql.asn_org = VALUES(source.as.organization.name),
    Esql.threat_suspected = VALUES(okta.debug_context.debug_data.threat_suspected),
    Esql.risk_level = VALUES(okta.debug_context.debug_data.risk_level),
    Esql.risk_reasons = VALUES(okta.debug_context.debug_data.risk_reasons),
    Esql.event_actions = VALUES(event.action),
    Esql.first_seen_user = MIN(@timestamp),
    Esql.last_seen_user = MAX(@timestamp)
  BY okta.client.ip, okta.actor.alternate_id

// SECOND STATS: Aggregate by IP to detect the credential stuffing pattern.
// Each input row is one (IP, user) pair, so COUNT(*) is the number of
// distinct users targeted from that IP and SUM(Esql.user_attempts) is the
// total number of failed attempts from that IP.
| STATS
    Esql.unique_users = COUNT(*),
    Esql.total_attempts = SUM(Esql.user_attempts),
    Esql.max_attempts_per_user = MAX(Esql.user_attempts),
    Esql.min_attempts_per_user = MIN(Esql.user_attempts),
    Esql.avg_attempts_per_user = AVG(Esql.user_attempts),
    Esql.users_with_single_attempt = SUM(CASE(Esql.user_attempts == 1, 1, 0)),
    Esql.users_with_few_attempts = SUM(CASE(Esql.user_attempts <= 2, 1, 0)),
    Esql.first_seen = MIN(Esql.first_seen_user),
    Esql.last_seen = MAX(Esql.last_seen_user),
    Esql.target_users = VALUES(okta.actor.alternate_id),
    Esql.user_source_mapping = VALUES(Esql.user_source_info),
    Esql.event_action_values = VALUES(Esql.event_actions),
    Esql.user_agent_values = VALUES(Esql.user_agents_per_user),
    Esql.device_values = VALUES(Esql.devices_per_user),
    Esql.is_proxy_values = VALUES(Esql.is_proxy),
    Esql.geo_country_values = VALUES(Esql.geo_country),
    Esql.geo_city_values = VALUES(Esql.geo_city),
    Esql.source_asn_values = VALUES(Esql.asn_number),
    Esql.source_asn_org_values = VALUES(Esql.asn_org),
    Esql.threat_suspected_values = VALUES(Esql.threat_suspected),
    Esql.risk_level_values = VALUES(Esql.risk_level),
    Esql.risk_reasons_values = VALUES(Esql.risk_reasons)
  BY okta.client.ip

// Calculate stuffing signature: most users should have very few attempts.
// Multiplying by 100.0 yields a percentage of users with <= 2 attempts.
| EVAL Esql.pct_users_few_attempts = Esql.users_with_few_attempts * 100.0 / Esql.unique_users

// Credential stuffing: many users, most with 1-2 attempts each, low max per user.
// Stacked stats gives us accurate per-user distribution instead of skewed averages.
// While the max-per-user cap is <= 2, every user counts as "few attempts", so
// the percentage is always 100 and the >= 80.0 check only becomes selective
// if that cap is relaxed.
| WHERE
    Esql.total_attempts >= 25
    AND Esql.unique_users >= 15
    AND Esql.max_attempts_per_user <= 2
    AND Esql.pct_users_few_attempts >= 80.0

// Surface the broadest sprays first and keep only computed fields plus the source IP.
| SORT Esql.unique_users DESC
| KEEP Esql.*, okta.client.ip
Install detection rules in Elastic Security
Detect Potential Okta Credential Stuffing (Single Source) in the Elastic Security detection engine by installing this rule into your Elastic Stack.
To set up this rule, check out the installation guide for Prebuilt Security Detection Rules (external, opens in a new tab or window).