LCOV - code coverage report
Current view: top level - src - RemoteSerializer.h (source / functions) Hit Total Coverage
Test: app.info Lines: 7 27 25.9 %
Date: 2010-12-13 Functions: 9 36 25.0 %
Branches: 1 2 50.0 %

           Branch data     Line data    Source code
       1                 :            : // $Id: RemoteSerializer.h 6951 2009-12-04 22:23:28Z vern $
       2                 :            : //
       3                 :            : // Communication between two Bro's.
       4                 :            : 
       5                 :            : #ifndef REMOTE_SERIALIZER
       6                 :            : #define REMOTE_SERIALIZER
       7                 :            : 
       8                 :            : #include "Dict.h"
       9                 :            : #include "List.h"
      10                 :            : #include "Serializer.h"
      11                 :            : #include "IOSource.h"
      12                 :            : #include "Stats.h"
      13                 :            : #include "File.h"
      14                 :            : 
      15                 :            : // All IP arguments are in host byte-order.
      16                 :            : // FIXME: Change this to network byte order
      17                 :            : 
      18                 :            : class IncrementalSendTimer;
      19                 :            : 
      20                 :            : // This class handles the communication done in Bro's main loop.
      21                 :            : class RemoteSerializer : public Serializer, public IOSource {
      22                 :            : public:
      23                 :            :         RemoteSerializer();
      24                 :            :         virtual ~RemoteSerializer();
      25                 :            : 
      26                 :            :         // Initialize the remote serializer (calling this will fork).
      27                 :            :         void Init();
      28                 :            : 
      29                 :            :         // FIXME: Use SourceID directly (or rename everything to Peer*).
      30                 :            :         typedef SourceID PeerID;
      31                 :            :         static const PeerID PEER_LOCAL = SOURCE_LOCAL;
      32                 :            :         static const PeerID PEER_NONE = SOURCE_LOCAL;
      33                 :            : 
      34                 :            :         // Connect to host (returns PEER_NONE on error).
      35                 :            :         PeerID Connect(addr_type ip, uint16 port, const char* our_class, double retry, bool use_ssl);
      36                 :            : 
      37                 :            :         // Request all events matching pattern from remote side.
      38                 :            :         bool RequestEvents(PeerID peer, RE_Matcher* pattern);
      39                 :            : 
      40                 :            :         // Request synchronization of IDs with remote side.  If auth is true,
      41                 :            :         // we consider our current state to authoritative and send it to
      42                 :            :         // the peer right after the handshake.
      43                 :            :         bool RequestSync(PeerID peer, bool auth);
      44                 :            : 
      45                 :            :         // Sets flag whether we're accepting state from this peer
      46                 :            :         // (default: yes).
      47                 :            :         bool SetAcceptState(PeerID peer, bool accept);
      48                 :            : 
      49                 :            :         // Sets compression level (0-9, 0 is defaults and means no compression)
      50                 :            :         bool SetCompressionLevel(PeerID peer, int level);
      51                 :            : 
      52                 :            :         // Signal the other side that we have finished our part of
      53                 :            :         // the initial handshake.
      54                 :            :         bool CompleteHandshake(PeerID peer);
      55                 :            : 
      56                 :            :         // Start to listen.
      57                 :            :         bool Listen(addr_type ip, uint16 port, bool expect_ssl);
      58                 :            : 
      59                 :            :         // Stop it.
      60                 :            :         bool StopListening();
      61                 :            : 
      62                 :            :         // Broadcast the event/function call.
      63                 :            :         bool SendCall(SerialInfo* info, const char* name, val_list* vl);
      64                 :            : 
      65                 :            :         // Send the event/function call (only if handshake completed).
      66                 :            :         bool SendCall(SerialInfo* info, PeerID peer, const char* name, val_list* vl);
      67                 :            : 
      68                 :            :         // Broadcasts the access (only if handshake completed).
      69                 :            :         bool SendAccess(SerialInfo* info, const StateAccess& access);
      70                 :            : 
      71                 :            :         // Send the access.
      72                 :            :         bool SendAccess(SerialInfo* info, PeerID pid, const StateAccess& access);
      73                 :            : 
      74                 :            :         // Sends ID.
      75                 :            :         bool SendID(SerialInfo* info, PeerID peer, const ID& id);
      76                 :            : 
      77                 :            :         // Sends the internal connection state.
      78                 :            :         bool SendConnection(SerialInfo* info, PeerID peer, const Connection& c);
      79                 :            : 
      80                 :            :         // Send capture filter.
      81                 :            :         bool SendCaptureFilter(PeerID peer, const char* filter);
      82                 :            : 
      83                 :            :         // Send packet.
      84                 :            :         bool SendPacket(SerialInfo* info, PeerID peer, const Packet& p);
      85                 :            : 
      86                 :            :         // Broadcast packet.
      87                 :            :         bool SendPacket(SerialInfo* info, const Packet& p);
      88                 :            : 
      89                 :            :         // Broadcast ping.
      90                 :            :         bool SendPing(PeerID peer, uint32 seq);
      91                 :            : 
      92                 :            :         // Broadcast remote print.
      93                 :            :         bool SendPrintHookEvent(BroFile* f, const char* txt);
      94                 :            : 
      95                 :            :         // Synchronzizes time with all connected peers. Returns number of
      96                 :            :         // current sync-point, or -1 on error.
      97                 :            :         uint32 SendSyncPoint();
      98                 :            :         void SendFinalSyncPoint();
      99                 :            : 
     100                 :            :         // Registers the ID to be &synchronized.
     101                 :            :         void Register(ID* id);
     102                 :            :         void Unregister(ID* id);
     103                 :            : 
     104                 :            :         // Stop/restart propagating state updates.
     105                 :          0 :         void SuspendStateUpdates()      { --propagate_accesses; }
     106                 :          0 :         void ResumeStateUpdates()       { ++propagate_accesses; }
     107                 :            : 
     108                 :            :         // Check for incoming events and queue them.
     109                 :            :         bool Poll(bool may_block);
     110                 :            : 
     111                 :            :         // Returns the corresponding record (already ref'ed).
     112                 :            :         RecordVal* GetPeerVal(PeerID id);
     113                 :            : 
     114                 :            :         // Log some statistics.
     115                 :            :         void LogStats();
     116                 :            : 
     117                 :            :         // Return a 0-terminated array of built-in functions which,
     118                 :            :         // when referenced, trigger the remote serializer's initialization.
     119                 :            :         const char* const* GetBuiltins() const;
     120                 :            : 
     121                 :            :         // Tries to sent out all remaining data.
     122                 :            :         // FIXME: Do we still need this?
     123                 :            :         void Finish();
     124                 :            : 
     125                 :            :         // Overidden from IOSource:
     126                 :            :         virtual void GetFds(int* read, int* write, int* except);
     127                 :            :         virtual double NextTimestamp(double* local_network_time);
     128                 :            :         virtual void Process();
     129                 :            :         virtual TimerMgr::Tag* GetCurrentTag();
     130                 :          0 :         virtual const char* Tag()       { return "RemoteSerializer"; }
     131                 :            : 
     132                 :            :         // Gracefully finishes communication by first making sure that all
     133                 :            :         // remaining data (parent & child) has been sent out.
     134                 :            :         virtual bool Terminate();
     135                 :            : 
     136                 :            : #ifdef DEBUG_COMMUNICATION
     137                 :            :         // Dump data recently read/written into files.
     138                 :            :         void DumpDebugData();
     139                 :            : 
     140                 :            :         // Read dump file and interpret as message block.
     141                 :            :         void ReadDumpAsMessageType(const char* file);
     142                 :            : 
     143                 :            :         // Read dump file and interpret as serialization.
     144                 :            :         void ReadDumpAsSerialization(const char* file);
     145                 :            : #endif
     146                 :            : 
     147                 :            :         enum LogLevel { LogInfo = 1, LogError = 2, };
     148                 :            :         static void Log(LogLevel level, const char* msg);
     149                 :            : 
     150                 :            : protected:
     151                 :            :         friend class PersistenceSerializer;
     152                 :            :         friend class IncrementalSendTimer;
     153                 :            : 
     154                 :            :         // Maximum size of serialization caches.
     155                 :            :         static const unsigned int MAX_CACHE_SIZE = 3000;
     156                 :            : 
     157                 :            :         // When syncing traces in pseudo-realtime mode, we wait this many
     158                 :            :         // seconds after the final sync-point to make sure that all
     159                 :            :         // remaining I/O gets propagated.
     160                 :            :         static const unsigned int FINAL_SYNC_POINT_DELAY = 5;
     161                 :            : 
     162                 :          0 :         declare(PList, EventHandler);
     163                 :            :         typedef PList(EventHandler) handler_list;
     164                 :            : 
     165                 :          0 :         struct Peer {
     166                 :            :                 PeerID id; // Unique ID (non-zero) per peer.
     167                 :            : 
     168                 :            :                 // ### Fix: currently, we only work for IPv4.
     169                 :            :                 // addr_type ip;
     170                 :            :                 uint32 ip;
     171                 :            : 
     172                 :            :                 uint16 port;
     173                 :            :                 handler_list handlers;
     174                 :            :                 RecordVal* val;         // Record of type event_source.
     175                 :            :                 SerializationCache* cache_in;   // One cache for each direction.
     176                 :            :                 SerializationCache* cache_out;
     177                 :            : 
     178                 :            :                 // TCP-level state of the connection to the peer.
     179                 :            :                 // State of the connection to the peer.
     180                 :            :                 enum { INIT, PENDING, CONNECTED, CLOSING, CLOSED } state;
     181                 :            : 
     182                 :            :                 // Current protocol phase of the connection (see RemoteSerializer.cc)
     183                 :            :                 enum { UNKNOWN, SETUP, HANDSHAKE, SYNC, RUNNING } phase;
     184                 :            : 
     185                 :            :                 // Capabilities.
     186                 :            :                 static const int COMPRESSION = 1;
     187                 :            :                 static const int NO_CACHING = 2;
     188                 :            :                 static const int PID_64BIT = 4;
     189                 :            :                 static const int NEW_CACHE_STRATEGY = 8;
     190                 :            : 
     191                 :            :                 // Constants to remember to who did something.
     192                 :            :                 static const int NONE = 0;
     193                 :            :                 static const int WE = 1;
     194                 :            :                 static const int PEER = 2;
     195                 :            :                 static const int BOTH = WE | PEER;
     196                 :            : 
     197                 :            :                 static const int AUTH_WE = 4;
     198                 :            :                 static const int AUTH_PEER = 8;
     199                 :            : 
     200                 :            :                 int sent_version;       // Who has sent the VERSION.
     201                 :            :                 int handshake_done;     // Who finished its handshake phase.
     202                 :            :                 int sync_requested;     // Who requested sync'ed state.
     203                 :            : 
     204                 :            :                 bool orig;      // True if we connected to the peer.
     205                 :            :                 bool accept_state;      // True if we accept state from peer.
     206                 :            :                 bool send_state; // True if we're supposed to initially sent our state.
     207                 :            :                 int comp_level; // Compression level.
     208                 :            : 
     209                 :            :                 // True if this peer triggered a net_suspend_processing().
     210                 :            :                 bool suspended_processing;
     211                 :            : 
     212                 :            :                 uint32 caps;    // Capabilities announced by peer.
     213                 :            :                 int runtime;    // Runtime we got from the peer.
     214                 :            :                 int our_runtime;        // Our runtime as we told it to this peer.
     215                 :            :                 string peer_class;      // Class from peer ("" = no class).
     216                 :            :                 string our_class;       // Class we send the peer.
     217                 :            :                 uint32 sync_point;      // Highest sync-point received so far
     218                 :            :                 char* print_buffer;     // Buffer for remote print or null.
     219                 :            :                 int print_buffer_used;  // Number of bytes used in buffer.
     220                 :            :         };
     221                 :            : 
     222                 :            :         // Shuts down remote serializer.
     223                 :            :         void FatalError(const char* msg);
     224                 :            : 
     225                 :            :         enum LogSrc { LogChild = 1, LogParent = 2, LogScript = 3, };
     226                 :            : 
     227                 :            :         static void Log(LogLevel level, const char* msg, Peer* peer, LogSrc src = LogParent);
     228                 :            : 
     229                 :            :         virtual void ReportError(const char* msg);
     230                 :            : 
     231                 :            :         virtual void GotEvent(const char* name, double time,
     232                 :            :                                 EventHandlerPtr event, val_list* args);
     233                 :            :         virtual void GotFunctionCall(const char* name, double time,
     234                 :            :                                 Func* func, val_list* args);
     235                 :            :         virtual void GotID(ID* id, Val* val);
     236                 :            :         virtual void GotStateAccess(StateAccess* s);
     237                 :            :         virtual void GotTimer(Timer* t);
     238                 :            :         virtual void GotConnection(Connection* c);
     239                 :            :         virtual void GotPacket(Packet* packet);
     240                 :            : 
     241                 :            :         void Fork();
     242                 :            : 
     243                 :            :         bool DoMessage();
     244                 :            :         bool ProcessConnected();
     245                 :            :         bool ProcessSerialization();
     246                 :            :         bool ProcessRequestEventsMsg();
     247                 :            :         bool ProcessRequestSyncMsg();
     248                 :            :         bool ProcessVersionMsg();
     249                 :            :         bool ProcessLogMsg(bool is_error);
     250                 :            :         bool ProcessStatsMsg();
     251                 :            :         bool ProcessCaptureFilterMsg();
     252                 :            :         bool ProcessPhaseDone();
     253                 :            :         bool ProcessPingMsg();
     254                 :            :         bool ProcessPongMsg();
     255                 :            :         bool ProcessCapsMsg();
     256                 :            :         bool ProcessSyncPointMsg();
     257                 :            :         bool ProcessRemotePrint();
     258                 :            : 
     259                 :            :         Peer* AddPeer(uint32 ip, uint16 port, PeerID id = PEER_NONE);
     260                 :            :         Peer* LookupPeer(PeerID id, bool only_if_connected);
     261                 :            :         void RemovePeer(Peer* peer);
     262                 :            :         bool IsConnectedPeer(PeerID id);
     263                 :            :         void PeerDisconnected(Peer* peer);
     264                 :            :         void PeerConnected(Peer* peer);
     265                 :            :         RecordVal* MakePeerVal(Peer* peer);
     266                 :            :         bool HandshakeDone(Peer* peer);
     267                 :            :         bool IsActive();
     268                 :            :         void SetupSerialInfo(SerialInfo* info, Peer* peer);
     269                 :            :         bool CheckSyncPoints();
     270                 :            :         void SendSyncPoint(uint32 syncpoint);
     271                 :          2 :         bool PropagateAccesses()
     272                 :            :                 {
     273                 :            :                 return ignore_accesses ?
     274         [ -  + ]:          2 :                         propagate_accesses > 1 : propagate_accesses > 0;
     275                 :            :                 }
     276                 :            : 
     277                 :            :         bool CloseConnection(Peer* peer);
     278                 :            : 
     279                 :            :         bool SendAllSynchronized(Peer* peer, SerialInfo* info);
     280                 :            :         bool SendCall(SerialInfo* info, Peer* peer, const char* name, val_list* vl);
     281                 :            :         bool SendAccess(SerialInfo* info, Peer* peer, const StateAccess& access);
     282                 :            :         bool SendID(SerialInfo* info, Peer* peer, const ID& id);
     283                 :            :         bool SendCapabilities(Peer* peer);
     284                 :            :         bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p);
     285                 :            : 
     286                 :            :         void UnregisterHandlers(Peer* peer);
     287                 :            :         void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0);
     288                 :            :         bool EnterPhaseRunning(Peer* peer);
     289                 :            :         bool FlushPrintBuffer(Peer* p);
     290                 :            : 
     291                 :            :         void ChildDied();
     292                 :            :         void InternalCommError(const char* msg);
     293                 :            : 
     294                 :            :         // Communication helpers
     295                 :            :         bool SendCMsgToChild(char msg_type, Peer* peer);
     296                 :            :         bool SendToChild(char type, Peer* peer, char* str, int len = -1);
     297                 :            :         bool SendToChild(char type, Peer* peer, int nargs, ...); // can send uints32 only
     298                 :            :         bool SendToChild(ChunkedIO::Chunk* c);
     299                 :            : 
     300                 :            : private:
     301                 :            :         enum { TYPE, ARGS } msgstate;   // current state of reading comm.
     302                 :            :         Peer* current_peer;
     303                 :            :         PeerID current_id;
     304                 :            :         char current_msgtype;
     305                 :            :         ChunkedIO::Chunk* current_args;
     306                 :            : 
     307                 :            :         id_list sync_ids;
     308                 :            : 
     309                 :            :         // FIXME: Check which of these are necessary...
     310                 :            :         bool initialized;
     311                 :            :         bool listening;
     312                 :            :         int propagate_accesses;
     313                 :            :         bool ignore_accesses;
     314                 :            :         bool terminating;
     315                 :            :         Peer* source_peer;
     316                 :            :         PeerID id_counter;      // Keeps track of assigned IDs.
     317                 :            :         uint32 current_sync_point;
     318                 :            :         bool syncing_times;
     319                 :            : 
     320                 :          8 :         declare(PList, Peer);
     321                 :            :         typedef PList(Peer) peer_list;
     322                 :            :         peer_list peers;
     323                 :            : 
     324                 :            :         Peer* in_sync; // Peer we're currently syncing state with.
     325                 :            :         peer_list sync_pending; // List of peers waiting to sync state.
     326                 :            : 
     327                 :            :         // Event buffer
     328                 :          0 :         struct BufferedEvent {
     329                 :            :                 time_t time;
     330                 :            :                 PeerID src;
     331                 :            :                 EventHandlerPtr handler;
     332                 :            :                 val_list* args;
     333                 :            :         };
     334                 :            : 
     335                 :          4 :         declare(PList, BufferedEvent);
     336                 :            :         typedef PList(BufferedEvent) EventQueue;
     337                 :            :         EventQueue events;
     338                 :            : 
     339                 :            :         // Packet buffer
     340                 :            :         struct BufferedPacket {
     341                 :            :                 time_t time;
     342                 :            :                 Packet* p;
     343                 :            :         };
     344                 :            : 
     345                 :          4 :         declare(PList, BufferedPacket);
     346                 :            :         typedef PList(BufferedPacket) PacketQueue;
     347                 :            :         PacketQueue packets;
     348                 :            : 
     349                 :            :         // Some stats
     350                 :          3 :         struct Statistics {
     351                 :            :                 struct Pair {
     352                 :         15 :                 Pair() : in(0), out(0)  {}
     353                 :            :                         unsigned long in;
     354                 :            :                         unsigned long out;
     355                 :            :                         };
     356                 :            : 
     357                 :            :                 Pair events; // actually events and function calls
     358                 :            :                 Pair accesses;
     359                 :            :                 Pair conns;
     360                 :            :                 Pair packets;
     361                 :            :                 Pair ids;
     362                 :            :         } stats;
     363                 :            : 
     364                 :            : };
     365                 :            : 
     366                 :            : // This class handles the communication done in the forked child.
     367                 :            : class SocketComm {
     368                 :            : public:
     369                 :            :         SocketComm();
     370                 :            :         ~SocketComm();
     371                 :            : 
     372                 :          0 :         void SetParentIO(ChunkedIO* arg_io)     { io = arg_io; }
     373                 :            : 
     374                 :            :         void Run();     // does not return
     375                 :            : 
     376                 :            :         // Log some statistics (via pipe to parent).
     377                 :            :         bool LogStats();
     378                 :            : 
     379                 :            :         // Log CPU usage (again via pipe to parent).
     380                 :            :         bool LogProf();
     381                 :            : 
     382                 :            : protected:
     383                 :            :         struct Peer {
     384                 :          0 :                 Peer()
     385                 :            :                         {
     386                 :          0 :                         id = 0;
     387                 :          0 :                         io = 0;
     388                 :          0 :                         ip = 0;
     389                 :          0 :                         port = 0;
     390                 :          0 :                         state = 0;
     391                 :          0 :                         connected = false;
     392                 :          0 :                         ssl = false;
     393                 :          0 :                         retry = 0;
     394                 :          0 :                         next_try = 0;
     395                 :          0 :                         compressor = false;
     396                 :          0 :                         }
     397                 :            : 
     398                 :            :                 RemoteSerializer::PeerID id;
     399                 :            :                 ChunkedIO* io;
     400                 :            :                 uint32 ip;
     401                 :            :                 uint16 port;
     402                 :            :                 char state;
     403                 :            :                 bool connected;
     404                 :            :                 bool ssl;
     405                 :            :                 // If we get disconnected, reconnect after this many seconds.
     406                 :            :                 int retry;
     407                 :            :                 // Time of next connection attempt (0 if none).
     408                 :            :                 time_t next_try;
     409                 :            :                 // True if io is a CompressedChunkedIO.
     410                 :            :                 bool compressor;
     411                 :            :         };
     412                 :            : 
     413                 :            :         bool Listen(uint32 ip, uint16 port, bool expect_ssl);
     414                 :            :         bool AcceptConnection(int listen_fd);
     415                 :            :         bool Connect(Peer* peer);
     416                 :            :         bool CloseConnection(Peer* peer, bool reconnect);
     417                 :            : 
     418                 :            :         Peer* LookupPeer(RemoteSerializer::PeerID id, bool only_if_connected);
     419                 :            : 
     420                 :            :         bool ProcessRemoteMessage(Peer* peer);
     421                 :            :         bool ProcessParentMessage();
     422                 :            :         bool DoParentMessage();
     423                 :            : 
     424                 :            :         bool ProcessListen();
     425                 :            :         bool ProcessConnectTo();
     426                 :            :         bool ProcessCompress();
     427                 :            : 
     428                 :            :         void Log(const char* msg, Peer* peer = 0);
     429                 :            : 
     430                 :            :         // The connection to the peer will be closed.
     431                 :            :         bool Error(const char* msg, Peer* peer);
     432                 :            : 
     433                 :            :         // If kill is true, this is a fatal error and we kill ourselves.
     434                 :            :         void Error(const char* msg, bool kill = false);
     435                 :            : 
     436                 :            :         // Kill the current process.
     437                 :            :         void Kill();
     438                 :            : 
     439                 :            :         // Check whether everything has been sent out.
     440                 :            :         void CheckFinished();
     441                 :            : 
     442                 :            :         // Communication helpers.
     443                 :            :         bool SendToParent(char type, Peer* peer, const char* str, int len = -1);
     444                 :            :         bool SendToParent(char type, Peer* peer, int nargs, ...); // can send uints32 only
     445                 :            :         bool SendToParent(ChunkedIO::Chunk* c);
     446                 :            :         bool SendToPeer(Peer* peer, char type, const char* str, int len = -1);
     447                 :            :         bool SendToPeer(Peer* peer, char type, int nargs, ...); // can send uints32 only
     448                 :            :         bool SendToPeer(Peer* peer, ChunkedIO::Chunk* c);
     449                 :            :         bool ProcessParentCompress();
     450                 :            :         bool ProcessPeerCompress(Peer* peer);
     451                 :            :         bool ForwardChunkToParent(Peer* p, ChunkedIO::Chunk* c);
     452                 :            :         bool ForwardChunkToPeer();
     453                 :            :         const char* MakeLogString(const char* msg, Peer *peer);
     454                 :            : 
     455                 :            :         // Peers we are communicating with:
     456                 :          0 :         declare(PList, Peer);
     457                 :            :         typedef PList(Peer) peer_list;
     458                 :            : 
     459                 :            :         RemoteSerializer::PeerID id_counter;
     460                 :            :         peer_list peers;
     461                 :            : 
     462                 :            :         ChunkedIO* io;  // I/O to parent
     463                 :            : 
     464                 :            :         // Current state of reading from parent.
     465                 :            :         enum { TYPE, ARGS } parent_msgstate;
     466                 :            :         Peer* parent_peer;
     467                 :            :         RemoteSerializer::PeerID parent_id;
     468                 :            :         char parent_msgtype;
     469                 :            :         ChunkedIO::Chunk* parent_args;
     470                 :            : 
     471                 :            :         int listen_fd_clear;
     472                 :            :         int listen_fd_ssl;
     473                 :            : 
     474                 :            :         // If the port we're trying to bind to is already in use, we will retry
     475                 :            :         // it regularly.
     476                 :            :         uint32 listen_if;       // Fix: only supports IPv4
     477                 :            :         uint16 listen_port;
     478                 :            :         bool listen_ssl;
     479                 :            :         time_t listen_next_try;
     480                 :            :         bool shutting_conns_down;
     481                 :            :         bool terminating;
     482                 :            :         bool killing;
     483                 :            : };
     484                 :            : 
     485                 :            : extern RemoteSerializer* remote_serializer;
     486                 :            : 
     487                 :            : #endif

Generated by: LCOV version 1.8