Count Me In: Viable Distributed Summary Statistics for Securing High-Speed Networks (pdf)

Johanna Amann, Seth Hall, Robin Sommer
Proc. Symposium on Research in Attacks, Intrusions and Defenses (RAID), September 2014
Links to scripts and data structures used in the paper:
Main sumstats implementation (bro git master)
scan, app-metrics, traceroute detection scripts (bro git master)
ssh brute-force detection (bro git master)
ftp brute-force detection (bro git master)
Top-k implementation (bro git master) & HyperLogLog implementation (bro git master) & Reservoir sampling plugin (bro git master)
Traffic Matrix measurement script (download)
# Traffic matrix measurement: logs summed byte counts per subnet key.
module Subnet;

export {
	# Identifier of this module's log stream.
	redef enum Log::ID += { LOG };

	# One log line per subnet key and finished SumStats epoch.
	# The field order below determines the column order of the log.
	type Info: record {
		# Timestamp of the record; falls back to network_time() when not set explicitly.
		ts: time &log &default=network_time();
		# The subnet key in string form (the $str of the normalized SumStats key).
		net: string &log;
		# Sum of the "inbytes" stream for this key.
		inbytes: double &log;
		# Sum of the "outbytes" stream for this key.
		outbytes: double &log;
	};

	# Event passed as $ev when the log stream is created.
	global log_subnet: event( rec: Info );
}



# Key normalizer used by the byte-count reducers: replaces a host-based key
# with a key whose $str is the string form of the host address masked to
# its top 24 bits.
function subn_norm(key: SumStats::Key): SumStats::Key
	{
	local prefix = mask_addr(key$host, 24);
	local normalized: SumStats::Key = [$str=cat(prefix)];
	return normalized;
	}

# Registers a 60-minute SumStats job that sums the "inbytes" and "outbytes"
# streams per normalized key, and creates the unbuffered Subnet log stream
# the results are written to.
event bro_init()
	{
	local in_reducer: SumStats::Reducer = [$stream="inbytes",
	                                       $apply=set(SumStats::SUM),
	                                       $normalize_key=subn_norm];
	local out_reducer: SumStats::Reducer = [$stream="outbytes",
	                                        $apply=set(SumStats::SUM),
	                                        $normalize_key=subn_norm];

	SumStats::create([$epoch=60mins,
		$reducers=set(in_reducer, out_reducer),
		$epoch_finished(data: SumStats::ResultTable) =
			{
			# One log line per key present in the epoch's result table.
			for ( net_key in data )
				{
				local result = data[net_key];
				local rec: Subnet::Info = [$inbytes=result["inbytes"]$sum,
				                           $outbytes=result["outbytes"]$sum,
				                           $net=net_key$str];
				Log::write(Subnet::LOG, rec);
				}
			}
		]);

	Log::create_stream(Subnet::LOG, [$columns=Info, $ev=log_subnet]);
	# Turn off write buffering for this stream.
	Log::set_buf(Subnet::LOG, F);
	}


# Feeds the byte counts of cleanly finished ("SF") TCP connections into the
# "inbytes" and "outbytes" SumStats streams, keyed by the local endpoint.
#
# Direction is relative to the local host: payload it received counts as
# "inbytes", payload it sent counts as "outbytes".  If the responder is local
# it is used as the key (even when the originator is local too); otherwise a
# local originator is used.  Connections with no local endpoint are ignored.
event connection_state_remove(c: connection) {
	# The conn record and its byte counters are optional; without them there
	# is nothing to account, and reading them would raise a runtime error.
	if ( ! c?$conn || ! c$conn?$orig_bytes || ! c$conn?$resp_bytes )
		return;

	if ( c$conn$proto == tcp && c$conn$conn_state == "SF" ) {

		if ( Site::is_local_addr(c$id$resp_h) ) {
			# Local responder: what the originator sent flows INTO the local
			# host, what the responder sent flows OUT of it.
			SumStats::observe("inbytes", [$host=c$id$resp_h], [$num=c$conn$orig_bytes]);
			SumStats::observe("outbytes", [$host=c$id$resp_h], [$num=c$conn$resp_bytes]);
		} else if ( Site::is_local_addr(c$id$orig_h) ) {
			# Local originator: its own payload flows out, the responder's
			# payload flows in.
			SumStats::observe("outbytes", [$host=c$id$orig_h], [$num=c$conn$orig_bytes]);
			SumStats::observe("inbytes", [$host=c$id$orig_h], [$num=c$conn$resp_bytes]);
		}
	}
}
Top-k measurement script (download)
# Feature switches tested by the @if directives further down in this script.
# They are declared before "module Topk;".

# 1 compiles in the http_header handler that feeds the "http_host" stream.
const usehttp: count = 1 ;
# 1 compiles in the connection_state_remove handler that feeds the
# "conn_source" and "conn_dest" streams.
const useconnections: count = 1;
# Selects which dns_request handler is compiled in:
# 1 = split-based, 2 = regex-based (find_last), 3 = observe the raw query.
const usedns: count = 2;

# Top-k measurement: logs the most frequent connection sources, connection
# destinations, DNS request names and HTTP hosts per SumStats epoch.
module Topk;

export {
	# Two log streams: LOG is written by the 60-minute job, LOG10 by the
	# 10-minute job.
	redef enum Log::ID += { LOG, LOG10 };

	# One log line per finished epoch.  For each of the four categories the
	# three vectors are filled in parallel (entry i of each belongs to the
	# same element) and *_num holds the result of topk_sum for that category.
	# The field order below determines the column order of the log.
	type Info: record {
		# Network time at which the epoch result was processed.
		ts: time &log;
		# Epoch length of the job that produced this record.
		iv: interval &log;
		# Top connection originators (string form of the address).
		sources: vector of string &log;
		# topk_count value for each entry of sources.
		sources_counts: vector of count &log;
		# topk_epsilon value for each entry of sources.
		sources_epsilons: vector of count &log;
		# topk_sum over the "conn_source" stream.
		sources_num: count &log;
		# Top connection responders (string form of the address).
		destinations: vector of string &log;
		# topk_count value for each entry of destinations.
		destinations_counts: vector of count &log;
		# topk_epsilon value for each entry of destinations.
		destinations_epsilons: vector of count &log;
		# topk_sum over the "conn_dest" stream.
		destinations_num: count &log;
		# Top DNS request names as observed by the active dns_request handler.
		dns_requests: vector of string &log;
		# topk_count value for each entry of dns_requests.
		dns_requests_counts: vector of count &log;
		# topk_epsilon value for each entry of dns_requests.
		dns_requests_epsilons: vector of count &log;
		# topk_sum over the "dns_request" stream.
		dns_requests_num: count &log;
		# Top HTTP hosts.
		http_host: vector of string &log;
		# topk_count value for each entry of http_host.
		http_host_counts: vector of count &log;
		# topk_epsilon value for each entry of http_host.
		http_host_epsilons: vector of count &log;
		# topk_sum over the "http_host" stream.
		http_host_num: count &log;
	};

	# Event passed as $ev when either log stream is created.
	global log_topk: event( rec: Info );
}

# Number of top elements requested from topk_get_top for each category.
global topk_howmuch = 10;
# Value passed as $topk_size to every TOPK reducer.
global topk_size = 1000;


# Turns the result of one finished SumStats epoch into a single Topk::Info
# record and writes it to a log stream.
#
# data: result table of the finished epoch.  Every observation made by this
#       script uses the single key "nocat", so at most one entry is expected.
# iv:   epoch length, stored in the record's iv column.
# id:   log stream the record is written to.
function write_topk_result(data: SumStats::ResultTable, iv: interval, id: Log::ID) {
	# There really should be only the one "nocat" key.  The record below is
	# shared by all loop iterations, so a second key would be merged into the
	# same vectors; treat that as a fatal inconsistency instead.
	if ( |data| > 1 ) {
		print "Something went wrong here";
		terminate();
		return;
	}

	local out: Topk::Info;
	out$ts = network_time();
	out$iv = iv;

	for ( a in data ) {
		local obs = data[a];

		# Each category is only present if its stream saw observations in
		# this epoch (the feeding handlers can be compiled out via @if).
		if ( "conn_source" in obs ) {
			local sources_topk = obs["conn_source"]$topk;
			local top_sources_strs: vector of SumStats::Observation;
			top_sources_strs = topk_get_top(sources_topk, topk_howmuch);
			for ( idx in top_sources_strs ) {
				out$sources[|out$sources|] = top_sources_strs[idx]$str;
				out$sources_counts[|out$sources_counts|] = topk_count(sources_topk, top_sources_strs[idx]);
				out$sources_epsilons[|out$sources_epsilons|] = topk_epsilon(sources_topk, top_sources_strs[idx]);
			}
			out$sources_num = topk_sum(sources_topk);
		}

		if ( "conn_dest" in obs ) {
			local destinations_topk = obs["conn_dest"]$topk;
			local top_destinations_strs: vector of SumStats::Observation;
			top_destinations_strs = topk_get_top(destinations_topk, topk_howmuch);
			for ( idx in top_destinations_strs ) {
				out$destinations[|out$destinations|] = top_destinations_strs[idx]$str;
				out$destinations_counts[|out$destinations_counts|] = topk_count(destinations_topk, top_destinations_strs[idx]);
				out$destinations_epsilons[|out$destinations_epsilons|] = topk_epsilon(destinations_topk, top_destinations_strs[idx]);
			}
			out$destinations_num = topk_sum(destinations_topk);
		}

		if ( "dns_request" in obs ) {
			local dns_topk = obs["dns_request"]$topk;
			local top_dns_strs: vector of SumStats::Observation;
			top_dns_strs = topk_get_top(dns_topk, topk_howmuch);
			for ( idx in top_dns_strs ) {
				out$dns_requests[|out$dns_requests|] = top_dns_strs[idx]$str;
				out$dns_requests_counts[|out$dns_requests_counts|] = topk_count(dns_topk, top_dns_strs[idx]);
				out$dns_requests_epsilons[|out$dns_requests_epsilons|] = topk_epsilon(dns_topk, top_dns_strs[idx]);
			}
			out$dns_requests_num = topk_sum(dns_topk);
		}

		if ( "http_host" in obs ) {
			local hosts_topk = obs["http_host"]$topk;
			local top_hosts_strs: vector of SumStats::Observation;
			top_hosts_strs = topk_get_top(hosts_topk, topk_howmuch);
			for ( idx in top_hosts_strs ) {
				out$http_host[|out$http_host|] = top_hosts_strs[idx]$str;
				out$http_host_counts[|out$http_host_counts|] = topk_count(hosts_topk, top_hosts_strs[idx]);
				out$http_host_epsilons[|out$http_host_epsilons|] = topk_epsilon(hosts_topk, top_hosts_strs[idx]);
			}
			out$http_host_num = topk_sum(hosts_topk);
		}

		Log::write(id, out);
	}
}

# Registers two SumStats jobs over the same four TOPK streams -- one with a
# 60-minute epoch writing to Topk::LOG, one with a 10-minute epoch writing to
# Topk::LOG10 -- and creates both (unbuffered) log streams.
event bro_init() {

	# Reducers of the 60-minute job.
	local r1_conn_source: SumStats::Reducer = [$stream="conn_source", $apply=set(SumStats::TOPK), $topk_size=topk_size];
	local r1_conn_dest: SumStats::Reducer = [$stream="conn_dest", $apply=set(SumStats::TOPK), $topk_size=topk_size];
	local r1_dns_request: SumStats::Reducer = [$stream="dns_request", $apply=set(SumStats::TOPK), $topk_size=topk_size];
	local r1_http_host: SumStats::Reducer = [$stream="http_host", $apply=set(SumStats::TOPK), $topk_size=topk_size];

	# Reducers of the 10-minute job.
	local r2_conn_source: SumStats::Reducer = [$stream="conn_source", $apply=set(SumStats::TOPK), $topk_size=topk_size];
	local r2_conn_dest: SumStats::Reducer = [$stream="conn_dest", $apply=set(SumStats::TOPK), $topk_size=topk_size];
	local r2_dns_request: SumStats::Reducer = [$stream="dns_request", $apply=set(SumStats::TOPK), $topk_size=topk_size];
	local r2_http_host: SumStats::Reducer = [$stream="http_host", $apply=set(SumStats::TOPK), $topk_size=topk_size];

	SumStats::create([$epoch=60mins,
		$reducers=set(r1_conn_source, r1_conn_dest, r1_dns_request, r1_http_host),
		$epoch_finished(data:SumStats::ResultTable) = {
			write_topk_result(data, 60 mins, Topk::LOG);
		}]);

	SumStats::create([$epoch=10mins,
		$reducers=set(r2_conn_source, r2_conn_dest, r2_dns_request, r2_http_host),
		$epoch_finished(data:SumStats::ResultTable) = {
			write_topk_result(data, 10 mins, Topk::LOG10);
		}]);

	Log::create_stream(Topk::LOG, [$columns=Info, $ev=log_topk]);
	Log::set_buf(Topk::LOG, F);
	Log::create_stream(Topk::LOG10, [$columns=Info, $ev=log_topk]);
	Log::set_buf(Topk::LOG10, F);
}

@if ( useconnections == 1 )

# Records originator and responder of every cleanly finished ("SF") TCP
# connection in the "conn_source" / "conn_dest" top-k streams.  Connections
# whose responder is a private or local address are skipped.
event connection_state_remove(c: connection)
	{
	local responder = c$id$resp_h;
	local external = ! Site::is_private_addr(responder) && ! Site::is_local_addr(responder);

	if ( ! external )
		return;

	if ( c$conn$proto != tcp || c$conn$conn_state != "SF" )
		return;

	# All observations share the single key "nocat".
	SumStats::observe("conn_source", [$str="nocat"], [$str=cat(c$id$orig_h)]);
	SumStats::observe("conn_dest", [$str="nocat"], [$str=cat(responder)]);
	}

@endif

@if ( usedns == 1 )

# First implementation: reduces the query to its last two labels by splitting
# on "." and records it in the "dns_request" top-k stream.  Requests whose
# responder is a private or local address are skipped, as are reverse
# lookups under in-addr.arpa.
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count) {
	if ( Site::is_private_addr(c$id$resp_h) || Site::is_local_addr(c$id$resp_h) ) {
		return;
	}

	# split() yields a table indexed from 1, so the last label sits at
	# index |splitstring|.
	local splitstring = split(query, /\./);
	local res: string;
	if ( |splitstring| <= 2 ) {
		res = query;
	} else {
		res = cat(splitstring[|splitstring|-1], ".", splitstring[|splitstring|]);
	}

	# The name built above carries no leading dot, so the reverse-lookup
	# zone has to be matched as "in-addr.arpa" here.
	if ( res != "in-addr.arpa" ) {
		SumStats::observe("dns_request", [$str="nocat"], [$str=res]);
	}
}

@endif

@if ( usedns == 2 )

# Second implementation, without split: takes the trailing
# ".<label>.<label>" of the query via a regular expression (the match keeps
# its leading dot) and falls back to the whole query when nothing matches.
# Requests whose responder is a private or local address are skipped, as is
# the name ".in-addr.arpa".
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
	{
	local server = c$id$resp_h;
	if ( Site::is_private_addr(server) || Site::is_local_addr(server) )
		return;

	local suffix = find_last(query, /\.[^\.]+\.[^\.]+$/);
	local name = ( suffix == "" ) ? query : suffix;

	if ( name == ".in-addr.arpa" )
		return;

	SumStats::observe("dns_request", [$str="nocat"], [$str=name]);
	}

@endif

@if ( usedns == 3 )

# Third implementation: records every query string unmodified in the
# "dns_request" top-k stream, unless the responder is a private or local
# address.
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count)
	{
	local server = c$id$resp_h;

	if ( ! Site::is_private_addr(server) && ! Site::is_local_addr(server) )
		SumStats::observe("dns_request", [$str="nocat"], [$str=query]);
	}

@endif

@if ( usehttp == 1 )

# Records the host stored in c$http$host in the "http_host" top-k stream
# whenever a client-side "HOST" header is seen.  Connections whose responder
# is a private or local address are skipped.
event http_header(c: connection, is_orig: bool, name: string, value: string)
	{
	local server = c$id$resp_h;
	if ( Site::is_private_addr(server) || Site::is_local_addr(server) )
		return;

	# Only the originator's HOST header is of interest.
	if ( ! is_orig || name != "HOST" )
		return;

	if ( c$http?$host )
		SumStats::observe("http_host", [$str="nocat"], [$str=c$http$host]);
	}

@endif