Providing Dynamic Control to Passive Network Security Monitoring (pdf)

Johanna Amann, Robin Sommer
Proc. Symposium on Research in Attacks, Intrusions and Defenses (RAID), November 2015
Links to scripts and software used in the paper:
Installation instructions

Bro NetControl connector scripts

The bro-netcontrol repository contains scripts that can be used to connect the Bro NetControl framework to systems outside of Bro and, e.g., send out switch commands via OpenFlow.

Please note that the NetControl framework and scripts are still under active development; the API is not completely fixed yet and the scripts have not seen thorough testing.

Installation Instructions

To use the connector scripts, you need to install the topic/johanna/netcontrol branch of Bro with commands similar to this:

git clone --recursive -b topic/johanna/netcontrol git://git.bro.org/bro
cd bro
./configure --prefix=[install prefix] --with-libcaf=[libcaf location]
make install

You also need an installation of the Bro Communication Library broker with enabled python bindings. Installation will be similar to this:

git clone --recursive git://git.bro.org/broker
cd broker
./configure --prefix=[install prefix] --with-libcaf=[libcaf location]
make install

To allow python to find the installed python broker bindings, it might be necessary to adjust the PYTHONPATH variable similar to this:

export PYTHONPATH=[install prefix]/lib/python

After that, you should be able to launch the provided scripts.

OpenFlow connector

The openflow directory contains the source for a Ryu OpenFlow controller that can be used to interface the Bro NetControl framework with an OpenFlow-capable switch. To use the controller, you first need to install the Ryu SDN framework.

After installation, you can run the openflow controller by executing

ryu-manager --verbose openflow/controller.py

or similar. After that, OpenFlow switches should be able to connect to port 6633; Broker connections can be made to port 9999. An example script that shunts all connection traffic to a switch after an SSL, SSH or GridFTP session has been established is provided in example.bro.

Command-line connector

The command-line directory contains a script that can be used to interface the NetControl framework to command-line invocations. commands.yaml shows an example that can be used to invoke iptables.

Acld connector

The acld directory contains the source for a connector to acld (more information). An example script that simply blocks all connections is provided in example.bro.

Implementation of the OpenFlow, acld and command-line connectors
Main netcontrol implementation (bro git branch) and testcases
OpenFlow module implementation (bro git branch) and testcases
Bro patch used to measure how much data is transferred after a specified time (download)
diff --git a/src/analyzer/protocol/conn-size/ConnSize.cc b/src/analyzer/protocol/conn-size/ConnSize.cc
index 183ee1e..cdee22e 100644
--- a/src/analyzer/protocol/conn-size/ConnSize.cc
+++ b/src/analyzer/protocol/conn-size/ConnSize.cc
@@ -34,6 +34,10 @@ void ConnSize_Analyzer::Init()
 	orig_pkts_thresh = 0;
 	resp_bytes_thresh = 0;
 	resp_pkts_thresh = 0;
+
+	thresh = 0;
+	thresh_kind = 0;
+	num_bytes = 0;
 	}

 void ConnSize_Analyzer::Done()
@@ -85,10 +89,35 @@ void ConnSize_Analyzer::CheckSizes(bool is_orig)
 		}
 	}

+
+void ConnSize_Analyzer::CheckSize(double st)
+	{
+	if ( thresh_kind != 0 && thresh < st && network_time - start_time >= st )
+		{
+		thresh = st;
+		printf("%.6f %s timing %.6f %llu %llu %.6f %.6f\n", network_time, Conn()->GetUID().Base62().c_str(), st, num_bytes, resp_bytes, network_time - start_time, network_time - Conn()->StartTime());
+		}
+	}
+
+
 void ConnSize_Analyzer::DeliverPacket(int len, const u_char* data, bool is_orig, uint64 seq, const IP_Hdr* ip, int caplen)
 	{
 	Analyzer::DeliverPacket(len, data, is_orig, seq, ip, caplen);

+	if ( ! is_orig ) {
+			CheckSize(0.008512);
+			CheckSize(0.011470);
+			CheckSize(0.041560);
+			CheckSize(0.08270);
+			CheckSize(0.089430);
+			CheckSize(0.09307);
+			CheckSize(0.1);
+			CheckSize(0.2);
+
+		if ( thresh_kind != 0 )
+			num_bytes += ip->PayloadLen() - 20; // 20 = minimum tcp header size
+	}
+
 	if ( is_orig )
 		{
 		orig_bytes += ip->TotalLen();
@@ -181,3 +210,13 @@ void ConnSize_Analyzer::FlipRoles()
 	resp_pkts = tmp;
 	}

+void ConnSize_Analyzer::EnableTiming()
+	{
+	if ( thresh_kind != 0 )
+		return;
+	printf("%.6f %s request %llu %.6f \n", network_time, Conn()->GetUID().Base62().c_str(), resp_bytes, network_time - Conn()->StartTime());
+
+	thresh_kind = 1;
+	start_time = network_time;
+	}
+
diff --git a/src/analyzer/protocol/conn-size/ConnSize.h b/src/analyzer/protocol/conn-size/ConnSize.h
index d8dff57..e065eb4 100644
--- a/src/analyzer/protocol/conn-size/ConnSize.h
+++ b/src/analyzer/protocol/conn-size/ConnSize.h
@@ -21,6 +21,8 @@ public:
 	virtual void UpdateConnVal(RecordVal *conn_val);
 	virtual void FlipRoles();

+	void EnableTiming();
+
 	void SetThreshold(uint64_t threshold, bool bytes, bool orig);
 	uint64 GetThreshold(bool bytes, bool orig);

@@ -31,6 +33,7 @@ protected:
 	virtual void DeliverPacket(int len, const u_char* data, bool is_orig,
 					uint64 seq, const IP_Hdr* ip, int caplen);
 	void CheckSizes(bool is_orig);
+	void CheckSize(double st);

 	void ThresholdEvent(EventHandlerPtr f, uint64 threshold, bool is_orig);

@@ -39,6 +42,11 @@ protected:
 	uint64_t orig_pkts;
 	uint64_t resp_pkts;

+	uint64_t num_bytes;
+	double thresh;
+	unsigned int thresh_kind;
+	double start_time;
+
 	uint64_t orig_bytes_thresh;
 	uint64_t resp_bytes_thresh;
 	uint64_t orig_pkts_thresh;
diff --git a/src/analyzer/protocol/http/HTTP.cc b/src/analyzer/protocol/http/HTTP.cc
index f60d137..e4aac54 100644
--- a/src/analyzer/protocol/http/HTTP.cc
+++ b/src/analyzer/protocol/http/HTTP.cc
@@ -12,6 +12,7 @@
 #include "HTTP.h"
 #include "Event.h"
 #include "analyzer/protocol/mime/MIME.h"
+#include "analyzer/protocol/conn-size/ConnSize.h"
 #include "file_analysis/Manager.h"

 #include "events.bif.h"
@@ -1372,6 +1373,14 @@ void HTTP_Analyzer::HTTP_Request()
 		vl->append(new StringVal(fmt("%.1f", request_version)));
 		// DEBUG_MSG("%.6f http_request\n", network_time);
 		ConnectionEvent(http_request, vl);
+		//printf("%.6f %s request \n", network_time, Conn()->GetUID().Base62().c_str());
+		// Start the timing measurement on this connection's ConnSize analyzer.
+		// Only dereference the lookup result when it is non-NULL.
+		analyzer::Analyzer* csa = Conn()->FindAnalyzer("CONNSIZE");
+		if ( csa )
+			static_cast<analyzer::conn_size::ConnSize_Analyzer*>(csa)->EnableTiming();
+		else
+			printf("No connsize analyzer?");
 		}
 	}

diff --git a/src/analyzer/protocol/tcp/TCP.cc b/src/analyzer/protocol/tcp/TCP.cc
index 72cad8a..c5f5dd0 100644
--- a/src/analyzer/protocol/tcp/TCP.cc
+++ b/src/analyzer/protocol/tcp/TCP.cc
@@ -1777,7 +1777,10 @@ void TCP_Analyzer::ConnectionFinished(int half_finished)
 	if ( half_finished )
 		Event(connection_half_finished);
 	else
+		{
+		printf("%.6f %s finished \n", network_time, Conn()->GetUID().Base62().c_str());
 		Event(connection_finished);
+		}

 	is_active = 0;
 	}
Bro patch introducing features and functions needed for NetControl (download)
diff --git a/scripts/base/init-bare.bro b/scripts/base/init-bare.bro
index 23f4fd4..2a8a2f7 100644
--- a/scripts/base/init-bare.bro
+++ b/scripts/base/init-bare.bro
@@ -120,6 +120,18 @@ type conn_id: record {
 	resp_p: port;	##< The responder's port number.
 } &log;

+## The identifying 4-tuple of a uni-directional flow.
+##
+## .. note:: It's actually a 5-tuple: the transport-layer protocol is stored as
+##    part of the port values, `src_p` and `dst_p`, and can be extracted from
+##    them with :bro:id:`get_port_transport_proto`.
+type flow_id : record {
+	src_h: addr;	##< The source IP address.
+	src_p: port;	##< The source port number.
+	dst_h: addr;	##< The destination IP address.
+	dst_p: port;	##< The destination port number.
+};
+
 ## Specifics about an ICMP conversation. ICMP events typically pass this in
 ## addition to :bro:type:`conn_id`.
 ##
diff --git a/scripts/base/init-default.bro b/scripts/base/init-default.bro
index 473d94f..0fee22a 100644
--- a/scripts/base/init-default.bro
+++ b/scripts/base/init-default.bro
@@ -37,6 +37,8 @@
 @load base/frameworks/reporter
 @load base/frameworks/sumstats
 @load base/frameworks/tunnels
+@load base/frameworks/openflow
+@load base/frameworks/pacf

 @load base/protocols/conn
 @load base/protocols/dhcp
diff --git a/scripts/base/utils/json.bro b/scripts/base/utils/json.bro
new file mode 100644
index 0000000..b6d0093
--- /dev/null
+++ b/scripts/base/utils/json.bro
@@ -0,0 +1,105 @@
+##! Functions to assist with generating JSON data from Bro data structures.
+# We might want to implement this in core sometime, this looks... hacky at best.
+
+@load base/utils/strings
+
+## A function to convert arbitrary Bro data into a JSON string.
+##
+## v: The value to convert to JSON.  Typically a record.
+##
+## only_loggable: If the v value is a record this will only cause
+##                fields with the &log attribute to be included in the JSON.
+##
+## returns: a JSON formatted string.
+function to_json(v: any, only_loggable: bool &default=F, field_escape_pattern: pattern &default=/^_/): string
+	{
+	local tn = type_name(v);
+	switch ( tn )
+		{
+		case "type":
+		return "";
+
+		case "string":
+		return cat("\"", gsub(gsub(clean(v), /\\/, "\\\\"), /\"/, "\\\""), "\"");
+
+		case "port":
+		return cat(port_to_count(to_port(cat(v))));
+
+		case "addr":
+		fallthrough;
+		case "subnet":
+		return cat("\"", v, "\"");
+
+		case "int":
+		fallthrough;
+		case "count":
+		fallthrough;
+		case "time":
+		fallthrough;
+		case "double":
+		fallthrough;
+		case "bool":
+		fallthrough;
+		case "enum":
+		return cat(v);
+
+		default:
+		break;
+		}
+
+	if ( /^record/ in tn )
+		{
+		local rec_parts: string_vec = vector();
+
+		local ft = record_fields(v);
+		for ( field in ft )
+			{
+			local field_desc = ft[field];
+			# replace the escape pattern in the field.
+			if( field_escape_pattern in field )
+				field = cat(sub(field, field_escape_pattern, ""));
+			if ( field_desc?$value && (!only_loggable || field_desc$log) )
+				{
+				local onepart = cat("\"", field, "\": ", to_json(field_desc$value, only_loggable));
+				rec_parts[|rec_parts|] = onepart;
+				}
+			}
+			return cat("{", join_string_vec(rec_parts, ", "), "}");
+		}
+
+	# None of the following are supported.
+	else if ( /^set/ in tn )
+		{
+		local set_parts: string_vec = vector();
+		local sa: set[bool] = v;
+		for ( sv in sa )
+			{
+			set_parts[|set_parts|] = to_json(sv, only_loggable);
+			}
+		return cat("[", join_string_vec(set_parts, ", "), "]");
+		}
+	else if ( /^table/ in tn )
+		{
+		local tab_parts: vector of string = vector();
+		local ta: table[bool] of any = v;
+		for ( ti in ta )
+			{
+			local ts = to_json(ti);
+			local if_quotes = (ts[0] == "\"") ? "" : "\"";
+			tab_parts[|tab_parts|] = cat(if_quotes, ts, if_quotes, ": ", to_json(ta[ti], only_loggable));
+			}
+		return cat("{", join_string_vec(tab_parts, ", "), "}");
+		}
+	else if ( /^vector/ in tn )
+		{
+		local vec_parts: string_vec = vector();
+		local va: vector of any = v;
+		for ( vi in va )
+			{
+			vec_parts[|vec_parts|] = to_json(va[vi], only_loggable);
+			}
+		return cat("[", join_string_vec(vec_parts, ", "), "]");
+		}
+
+	return "\"\"";
+	}
diff --git a/src/Attr.cc b/src/Attr.cc
index dad51c6..c3d9a23 100644
--- a/src/Attr.cc
+++ b/src/Attr.cc
@@ -18,7 +18,7 @@ const char* attr_name(attr_tag t)
 		"&encrypt",
 		"&raw_output", "&mergeable", "&priority",
 		"&group", "&log", "&error_handler", "&type_column",
-		"(&tracked)", "&deprecated",
+		"(&tracked)", "&deprecated", "&weaken",
 	};

 	return attr_names[int(t)];
@@ -453,6 +453,11 @@ void Attributes::CheckAttr(Attr* a)
 			Error("&log applied to a type that cannot be logged");
 		break;

+	case ATTR_WEAKEN:
+		if ( ! in_record )
+			Error("&weaken applied outside of record");
+		break;
+
 	case ATTR_TYPE_COLUMN:
 		{
 		if ( type->Tag() != TYPE_PORT )
diff --git a/src/Attr.h b/src/Attr.h
index 63f2524..f89fb9f 100644
--- a/src/Attr.h
+++ b/src/Attr.h
@@ -35,7 +35,8 @@ typedef enum {
 	ATTR_TYPE_COLUMN,	// for input framework
 	ATTR_TRACKED,	// hidden attribute, tracked by NotifierRegistry
 	ATTR_DEPRECATED,
-#define NUM_ATTRS (int(ATTR_DEPRECATED) + 1)
+	ATTR_WEAKEN,
+#define NUM_ATTRS (int(ATTR_WEAKEN) + 1)
 } attr_tag;

 class Attr : public BroObj {
diff --git a/src/Type.cc b/src/Type.cc
index 7fab056..8aa32f6 100644
--- a/src/Type.cc
+++ b/src/Type.cc
@@ -1129,7 +1129,10 @@ void RecordType::DescribeFields(ODesc* d) const
 			const TypeDecl* td = FieldDecl(i);
 			d->Add(td->id);
 			d->Add(":");
-			td->type->Describe(d);
+			if ( td->FindAttr(ATTR_WEAKEN) )
+				d->Add("<weakened>");
+			else
+				td->type->Describe(d);
 			d->Add(";");
 			}
 		}
@@ -1170,7 +1173,10 @@ void RecordType::DescribeFieldsReST(ODesc* d, bool func_args) const
 			}

 		const TypeDecl* td = FieldDecl(i);
-		td->DescribeReST(d);
+		if ( td->FindAttr(ATTR_WEAKEN) )
+			d->Add("<weakened>");
+		else
+			td->DescribeReST(d);

 		if ( func_args )
 			continue;
diff --git a/src/bro.bif b/src/bro.bif
index 037b236..6571826 100644
--- a/src/bro.bif
+++ b/src/bro.bif
@@ -2077,6 +2077,33 @@ function is_v6_addr%(a: addr%): bool
 		return new Val(0, TYPE_BOOL);
 	%}

+## Returns whether a subnet specification is IPv4 or not.
+##
+## s: the subnet to check.
+##
+## Returns: true if *s* is an IPv4 subnet, else false.
+function is_v4_subnet%(s: subnet%): bool
+	%{
+	if ( s->AsSubNet().Prefix().GetFamily() == IPv4 )
+		return new Val(1, TYPE_BOOL);
+	else
+		return new Val(0, TYPE_BOOL);
+	%}
+
+## Returns whether a subnet specification is IPv6 or not.
+##
+## s: the subnet to check.
+##
+## Returns: true if *s* is an IPv6 subnet, else false.
+function is_v6_subnet%(s: subnet%): bool
+	%{
+	if ( s->AsSubNet().Prefix().GetFamily() == IPv6 )
+		return new Val(1, TYPE_BOOL);
+	else
+		return new Val(0, TYPE_BOOL);
+	%}
+
+
 # ===========================================================================
 #
 #                                 Conversion
@@ -2367,6 +2394,44 @@ function to_subnet%(sn: string%): subnet
 	return ret;
 	%}

+## Converts a :bro:type:`addr` to a :bro:type:`subnet`.
+##
+## a: The address to convert.
+##
+## Returns: The *a* address as a :bro:type:`subnet`.
+##
+## .. bro:see:: to_subnet
+function addr_to_subnet%(a: addr%): subnet
+	%{
+	int width = (a->AsAddr().GetFamily() == IPv4 ? 32 : 128);
+	return new SubNetVal(a->AsAddr(), width);
+	%}
+
+## Converts a :bro:type:`subnet` to a :bro:type:`addr` by
+## extracting the prefix.
+##
+## sn: The subnet to convert.
+##
+## Returns: The *sn* subnet as a :bro:type:`addr`.
+##
+## .. bro:see:: to_subnet
+function subnet_to_addr%(sn: subnet%): addr
+	%{
+	return new AddrVal(sn->Prefix());
+	%}
+
+## Returns the width of a :bro:type:`subnet`.
+##
+## sn: The subnet whose width is returned.
+##
+## Returns: The width of the subnet.
+##
+## .. bro:see:: to_subnet
+function subnet_width%(sn: subnet%): count
+	%{
+	return new Val(sn->Width(), TYPE_COUNT);
+	%}
+
 ## Converts a :bro:type:`string` to a :bro:type:`double`.
 ##
 ## str: The :bro:type:`string` to convert.
diff --git a/src/parse.y b/src/parse.y
index c677328..ad275e0 100644
--- a/src/parse.y
+++ b/src/parse.y
@@ -2,7 +2,7 @@
 // See the file "COPYING" in the main distribution directory for copyright.
 %}

-%expect 78
+%expect 81

 %token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY
 %token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF
@@ -25,7 +25,7 @@
 %token TOK_ATTR_PERSISTENT TOK_ATTR_SYNCHRONIZED
 %token TOK_ATTR_RAW_OUTPUT TOK_ATTR_MERGEABLE
 %token TOK_ATTR_PRIORITY TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER
-%token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED
+%token TOK_ATTR_TYPE_COLUMN TOK_ATTR_DEPRECATED TOK_ATTR_WEAKEN

 %token TOK_DEBUG

@@ -1285,6 +1285,8 @@ attr:
 			{ $$ = new Attr(ATTR_ERROR_HANDLER); }
 	|	TOK_ATTR_DEPRECATED
 			{ $$ = new Attr(ATTR_DEPRECATED); }
+	|	TOK_ATTR_WEAKEN
+			{ $$ = new Attr(ATTR_WEAKEN); }
 	;

 stmt:
diff --git a/src/scan.l b/src/scan.l
index a6e37a6..8103201 100644
--- a/src/scan.l
+++ b/src/scan.l
@@ -276,6 +276,7 @@ when	return TOK_WHEN;
 &type_column	return TOK_ATTR_TYPE_COLUMN;
 &read_expire	return TOK_ATTR_EXPIRE_READ;
 &redef		return TOK_ATTR_REDEF;
+&weaken	return TOK_ATTR_WEAKEN;
 &write_expire	return TOK_ATTR_EXPIRE_WRITE;

 &encrypt {
Example script using OpenFlow and NetControl to shunt GridFTP traffic (download)
@load base/protocols/conn
@load base/frameworks/openflow
@load base/frameworks/netcontrol

# Port handed to OpenFlow::broker_new() in bro_init (redef-able).
const broker_port: port = 9999/tcp &redef;
# OpenFlow controller handle; assigned in bro_init and used by the
# OpenFlow::flow_clear / OpenFlow::flow_mod calls in this script.
global of_controller: OpenFlow::Controller;

# Switch datapath ID
const switch_dpid: count = 12 &redef;
# port on which Bro is listening - we install a rule to the switch to mirror traffic here...
const switch_bro_port: count = 19 &redef;


event bro_init() &priority=2
	{
	# Create the Broker-backed OpenFlow controller handle for the configured
	# switch and keep it in the global so later handlers can use it.
	of_controller = OpenFlow::broker_new("of", 127.0.0.1, broker_port, "bro/event/openflow", switch_dpid);

	# Build a NetControl OpenFlow plugin on top of that controller and
	# activate it.
	# NOTE(review): the second argument to activate() is presumably the
	# plugin priority - confirm against the NetControl API.
	local of_config = NetControl::OfConfig($monitor=T, $forward=F, $priority_offset=+5);
	local of_plugin = NetControl::create_openflow(of_controller, of_config);
	NetControl::activate(of_plugin, 0);
	}

event BrokerComm::outgoing_connection_established(peer_address: string,
                                            peer_port: port,
                                            peer_name: string)
	{
	print "BrokerComm::outgoing_connection_established", peer_address, peer_port;

	# Clear the controller's flows first, then add one rule with an empty
	# match whose only action is to output to switch_bro_port.
	OpenFlow::flow_clear(of_controller);
	OpenFlow::flow_mod(
		of_controller,
		[],
		[$cookie=OpenFlow::generate_cookie(1337),
		 $priority=2,
		 $command=OpenFlow::OFPFC_ADD,
		 $actions=[$out_ports=vector(switch_bro_port)]]);
	}

# Prints the ID of the rule carried by each NetControl::rule_added event.
event NetControl::rule_added(r: NetControl::Rule, p: NetControl::PluginState, msg: string)
	{
	print "Rule added successfully", r$id;
	}

# Prints the rule ID and the accompanying message for each
# NetControl::rule_error event.
event NetControl::rule_error(r: NetControl::Rule, p: NetControl::PluginState, msg: string)
	{
	print "Rule error", r$id, msg;
	}

# Prints the rule ID and the FlowInfo record delivered with each
# NetControl::rule_timeout event.
event NetControl::rule_timeout(r: NetControl::Rule, i: NetControl::FlowInfo, p: NetControl::PluginState)
	{
	print "Rule timeout", r$id, i;
	}

# Intentionally a no-op: the success print is commented out and can be
# re-enabled for debugging.
event OpenFlow::flow_mod_success(match: OpenFlow::ofp_match, flow_mod: OpenFlow::ofp_flow_mod, msg: string)
	{
	#print "Flow mod success";
	}

# Prints the cookie of the failed flow modification together with the
# message delivered by the event.
event OpenFlow::flow_mod_failure(match: OpenFlow::ofp_match, flow_mod: OpenFlow::ofp_flow_mod, msg: string)
	{
	print "Flow mod failure", flow_mod$cookie, msg;
	}

# Prints the match of each removed flow; the remaining event arguments
# (cookie, priority, reason, timers, counters) are not used here.
event OpenFlow::flow_removed(match: OpenFlow::ofp_match, cookie: count, priority: count, reason: count, duration_sec: count, idle_timeout: count, packet_count: count, byte_count: count)
	{
	print "Flow removed", match;
	}

# Shunt all ssl, gridftp and ssh connections after we cannot get any data from them anymore

event ssl_established(c: connection)
	{
	# Ask NetControl to shunt the originator->responder flow of this
	# connection, passing a 30 second interval.
	NetControl::shunt_flow(
		[$src_h=c$id$orig_h, $src_p=c$id$orig_p,
		 $dst_h=c$id$resp_h, $dst_p=c$id$resp_p],
		30sec);
	}

event GridFTP::data_channel_detected(c: connection)
	{
	# Ask NetControl to shunt the originator->responder flow of the detected
	# GridFTP data channel, passing a 30 second interval.
	NetControl::shunt_flow(
		[$src_h=c$id$orig_h, $src_p=c$id$orig_p,
		 $dst_h=c$id$resp_h, $dst_p=c$id$resp_p],
		30sec);
	}

# Shunts an SSH connection once authentication is recorded as successful.
#
# c: The SSH connection.
# auth_method_none: Unused here.
event ssh_auth_successful(c: connection, auth_method_none: bool)
	{
	# Test for presence with ?$ before reading: accessing an unset record
	# field is a runtime error. In every such case we simply do not shunt.
	# NOTE(review): assumes c$ssh and auth_success are optional fields -
	# confirm against the SSH::Info definition in use.
	if ( ! c?$ssh || ! c$ssh?$auth_success || ! c$ssh$auth_success )
		return;

	local id = c$id;
	NetControl::shunt_flow([$src_h=id$orig_h, $src_p=id$orig_p, $dst_h=id$resp_h, $dst_p=id$resp_p], 5sec);
	# Print the wall-clock time at which the shunt was requested.
	print current_time();
	}