LCOV - code coverage report
Current view: top level - src - ChunkedIO.h (source / functions) Hit Total Coverage
Test: app.info Lines: 11 35 31.4 %
Date: 2010-12-13 Functions: 3 27 11.1 %
Branches: 0 16 0.0 %

           Branch data     Line data    Source code
       1                 :            : // $Id: ChunkedIO.h 6888 2009-08-20 18:23:11Z vern $
       2                 :            : //
       3                 :            : // Implements non-blocking chunk-wise I/O.
       4                 :            : 
       5                 :            : #ifndef CHUNKEDIO_H
       6                 :            : #define CHUNKEDIO_H
       7                 :            : 
       8                 :            : #include "config.h"
       9                 :            : #include "List.h"
      10                 :            : #include "util.h"
      11                 :            : 
      12                 :            : #include <list>
      13                 :            : 
      14                 :            : #ifdef NEED_KRB5_H
      15                 :            : # include <krb5.h>
      16                 :            : #endif 
      17                 :            : 
      18                 :            : #include <openssl/ssl.h>
      19                 :            : #include <openssl/err.h>
      20                 :            : 
      21                 :            : class CompressedChunkedIO;
      22                 :            : 
      23                 :            : // #define DEBUG_COMMUNICATION 10
      24                 :            : 
      25                 :            : // Abstract base class.
      26                 :            : class ChunkedIO {
      27                 :            : public:
      28                 :            :         ChunkedIO();
      29 [ #  # ][ #  # ]:          0 :         virtual ~ChunkedIO()    { }
                 [ #  # ]
      30                 :            : 
      31                 :            :         typedef struct {
      32                 :            :                 char* data;
      33                 :            :                 uint32 len;
      34                 :            :         } Chunk;
      35                 :            : 
      36                 :            :         // Initialization before any I/O operation is performed. Returns false
      37                 :            :         // on any form of error.
      38                 :          0 :         virtual bool Init()     { return true; }
      39                 :            : 
      40                 :            :         // Tries to read the next chunk of data. If it can be read completely,
      41                 :            :         // a pointer to it is returned in 'chunk' (ownership of chunk is
      42                 :            :         // passed).  If not, 'chunk' is set to nil.  Returns false if any
      43                 :            :         // I/O error occurred (use Eof() to see if it's an end-of-file).
      44                 :            :         // If 'may_block' is true, we explicitly allow blocking.
      45                 :            :         virtual bool Read(Chunk** chunk, bool may_block = false) = 0;
      46                 :            : 
      47                 :            :         // Puts the chunk into the write queue and writes as much data
      48                 :            :         // as possible (takes ownership of chunk).
      49                 :            :         // Returns false on any I/O error.
      50                 :            :         virtual bool Write(Chunk* chunk) = 0;
      51                 :            : 
      52                 :            :         // Tries to write as much as currently possible.
      53                 :            :         // Returns false on any I/O error.
      54                 :            :         virtual bool Flush() = 0;
      55                 :            : 
      56                 :            :         // If an I/O error has been encountered, returns a string describing it.
      57                 :            :         virtual const char* Error() = 0;
      58                 :            : 
      59                 :            :         // Return true if there is currently at least one chunk available
      60                 :            :         // for reading.
      61                 :            :         virtual bool CanRead() = 0;
      62                 :            : 
      63                 :            :         // Return true if there is currently at least one chunk waiting to be
      64                 :            :         // written.
      65                 :            :         virtual bool CanWrite() = 0;
      66                 :            : 
      67                 :            :         // Returns true if source believes that there won't be much data soon.
      68                 :            :         virtual bool IsIdle() = 0;
      69                 :            : 
      70                 :            :         // Returns true if internal write buffers are about to fill up.
      71                 :            :         virtual bool IsFillingUp() = 0;
      72                 :            : 
      73                 :            :         // Throws away buffered data.
      74                 :            :         virtual void Clear() = 0;
      75                 :            : 
      76                 :            :         // Returns true,if end-of-file has been reached.
      77                 :            :         virtual bool Eof() = 0;
      78                 :            : 
      79                 :            :         // Returns underlying fd if available, -1 otherwise.
      80                 :          0 :         virtual int Fd()        { return -1; }
      81                 :            : 
      82                 :            :         // Makes sure that no additional protocol data is written into
      83                 :            :         // the output stream.  If this is activated, the output cannot
      84                 :            :         // be read again by any of these classes!
      85                 :          0 :         void MakePure() { pure = true; }
      86                 :          3 :         bool IsPure()   { return pure; }
      87                 :            : 
      88                 :            :         // Writes a log message to the error_fd.
      89                 :            :         void Log(const char* str);
      90                 :            : 
      91                 :            :         struct Statistics {
      92                 :          2 :                 Statistics()
      93                 :            :                         {
      94                 :          2 :                         bytes_read = 0;
      95                 :          2 :                         bytes_written = 0;
      96                 :          2 :                         chunks_read = 0;
      97                 :          2 :                         chunks_written = 0;
      98                 :          2 :                         reads = 0;
      99                 :          2 :                         writes = 0;
     100                 :          2 :                         pending = 0;
     101                 :          2 :                         }
     102                 :            : 
     103                 :            :                 unsigned long bytes_read;
     104                 :            :                 unsigned long bytes_written;
     105                 :            :                 unsigned long chunks_read;
     106                 :            :                 unsigned long chunks_written;
     107                 :            :                 unsigned long reads;    // # calls which transferred > 0 bytes
     108                 :            :                 unsigned long writes;
     109                 :            :                 unsigned long pending;
     110                 :            :                 };
     111                 :            : 
     112                 :            :         // Returns raw statistics.
     113                 :          0 :         const Statistics* Stats() const         { return &stats; }
     114                 :            : 
     115                 :            :         // Puts a formatted string containing statistics into buffer.
     116                 :            :         virtual void Stats(char* buffer, int length);
     117                 :            : 
     118                 :            : #ifdef DEBUG_COMMUNICATION
     119                 :            :         void DumpDebugData(const char* basefnname, bool want_reads);
     120                 :            : #endif
     121                 :            : 
     122                 :            : protected:
     123                 :            :         Statistics stats;
     124                 :            :         const char* tag;
     125                 :            : 
     126                 :            : #ifdef DEBUG_COMMUNICATION
     127                 :            :         void AddToBuffer(char* data, bool is_read)
     128                 :            :                 { AddToBuffer(strlen(data), data, is_read); }
     129                 :            :         void AddToBuffer(uint32 len, char* data, bool is_read);
     130                 :            :         void AddToBuffer(Chunk* chunk, bool is_read);
     131                 :            :         std::list<Chunk*> data_read;
     132                 :            :         std::list<Chunk*> data_written;
     133                 :            : #endif
     134                 :            : 
     135                 :            : private:
     136                 :            :         bool pure;
     137                 :            : };
     138                 :            : 
     139                 :            : // Chunked I/O using a file descriptor.
     140                 :            : class ChunkedIOFd : public ChunkedIO {
     141                 :            : public:
     142                 :            :         // fd is an open bidirectional file descriptor, tag is used in error
     143                 :            :         // messages, and pid gives a pid to monitor (if the process dies, we
     144                 :            :         // return EOF).
     145                 :            :         ChunkedIOFd(int fd, const char* tag, pid_t pid = 0);
     146                 :            :         virtual ~ChunkedIOFd();
     147                 :            : 
     148                 :            :         virtual bool Read(Chunk** chunk, bool may_block = false);
     149                 :            :         virtual bool Write(Chunk* chunk);
     150                 :            :         virtual bool Flush();
     151                 :            :         virtual const char* Error();
     152                 :            :         virtual bool CanRead();
     153                 :            :         virtual bool CanWrite();
     154                 :            :         virtual bool IsIdle();
     155                 :            :         virtual bool IsFillingUp();
     156                 :            :         virtual void Clear();
     157                 :          1 :         virtual bool Eof()      { return eof; }
     158                 :          0 :         virtual int Fd()        { return fd; }
     159                 :            :         virtual void Stats(char* buffer, int length);
     160                 :            : 
     161                 :            : private:
     162                 :            : 
     163                 :            :         bool PutIntoWriteBuffer(Chunk* chunk);
     164                 :            :         bool FlushWriteBuffer();
     165                 :            :         Chunk* ExtractChunk();
     166                 :            : 
     167                 :            :         // Returns size of next chunk in buffer or 0 if none.
     168                 :            :         uint32 ChunkAvailable();
     169                 :            : 
     170                 :            :         // Flushes if it thinks it is time to.
     171                 :            :         bool OptionalFlush();
     172                 :            : 
     173                 :            :         // Concatenates the the data of the two chunks forming a new one.
     174                 :            :         // The old chunkds are deleted.
     175                 :            :         Chunk* ConcatChunks(Chunk* c1, Chunk* c2);
     176                 :            : 
     177                 :            :         // Reads/writes on chunk of upto BUFFER_SIZE bytes.
     178                 :            :         bool WriteChunk(Chunk* chunk, bool partial);
     179                 :            :         bool ReadChunk(Chunk** chunk, bool may_block);
     180                 :            : 
     181                 :            :         int fd;
     182                 :            :         bool eof;
     183                 :            :         double last_flush;
     184                 :            :         int failed_reads;
     185                 :            : 
     186                 :            :         // Optimally, this should match the file descriptor's
     187                 :            :         // buffer size (for sockets, it may be helpful to
     188                 :            :         // increase the send/receive buffers).
     189                 :            :         static const unsigned int BUFFER_SIZE = 1024 * 1024 * 1;
     190                 :            : 
     191                 :            :         // We 'or' this to the length of a data chunk to mark
     192                 :            :         // that it's part of a larger one. This has to be larger
     193                 :            :         // than BUFFER_SIZE.
     194                 :            :         static const uint32 FLAG_PARTIAL = 0x80000000;
     195                 :            : 
     196                 :            :         // We report that we're filling up when there are more than this number
     197                 :            :         // of pending chunks.
     198                 :            :         static const uint32 MAX_BUFFERED_CHUNKS_SOFT = 400000;
     199                 :            : 
     200                 :            :         // Maximum number of chunks we store in memory before rejecting writes.
     201                 :            :         static const uint32 MAX_BUFFERED_CHUNKS = 500000;
     202                 :            : 
     203                 :            :         char* read_buffer;
     204                 :            :         uint32 read_len;
     205                 :            :         uint32 read_pos;
     206                 :            :         Chunk* partial; // when we read an oversized chunk, we store it here
     207                 :            : 
     208                 :            :         char* write_buffer;
     209                 :            :         uint32 write_len;
     210                 :            :         uint32 write_pos;
     211                 :            : 
     212                 :            :         struct ChunkQueue {
     213                 :            :                 Chunk* chunk;
     214                 :            :                 ChunkQueue* next;
     215                 :            :         };
     216                 :            : 
     217                 :            :         // Chunks that don't fit into our write buffer.
     218                 :            :         ChunkQueue* pending_head;
     219                 :            :         ChunkQueue* pending_tail;
     220                 :            : 
     221                 :            :         pid_t pid;
     222                 :            : };
     223                 :            : 
     224                 :            : // Chunked I/O using an SSL connection.
     225                 :            : class ChunkedIOSSL : public ChunkedIO {
     226                 :            : public:
     227                 :            :         // Argument is an open socket and a flag indicating whether we are the
     228                 :            :         // server side of the connection.
     229                 :            :         ChunkedIOSSL(int socket, bool server);
     230                 :            :         virtual ~ChunkedIOSSL();
     231                 :            : 
     232                 :            :         virtual bool Init();
     233                 :            :         virtual bool Read(Chunk** chunk, bool mayblock = false);
     234                 :            :         virtual bool Write(Chunk* chunk);
     235                 :            :         virtual bool Flush();
     236                 :            :         virtual const char* Error();
     237                 :            :         virtual bool CanRead();
     238                 :            :         virtual bool CanWrite();
     239                 :            :         virtual bool IsIdle();
     240                 :            :         virtual bool IsFillingUp();
     241                 :            :         virtual void Clear();
     242                 :          0 :         virtual bool Eof()      { return eof; }
     243                 :          0 :         virtual int Fd()        { return socket; }
     244                 :            :         virtual void Stats(char* buffer, int length);
     245                 :            : 
     246                 :            : private:
     247                 :            :         // Maximum number of chunks we store in memory before rejecting writes.
     248                 :            :         static const uint32 MAX_BUFFERED_CHUNKS = 500000;
     249                 :            : 
     250                 :            :         // Only returns true if all data has been read. If not, call
     251                 :            :         // it again with the same parameters as long as error is not
     252                 :            :         // set to true.
     253                 :            :         bool ReadData(char* p, uint32 len, bool* error);
     254                 :            :         // Same for writing.
     255                 :            :         bool WriteData(char* p, uint32 len, bool* error);
     256                 :            : 
     257                 :            :         int socket;
     258                 :            :         int last_ret;   // last error code
     259                 :            :         bool eof;
     260                 :            : 
     261                 :            :         bool server;    // are we the server?
     262                 :            :         bool setup;     // has the connection been setup successfully?
     263                 :            : 
     264                 :            :         SSL* ssl;
     265                 :            : 
     266                 :            :         // Write queue.
     267                 :            :         struct Queue {
     268                 :            :                 Chunk* chunk;
     269                 :            :                 Queue* next;
     270                 :            :         };
     271                 :            : 
     272                 :            :         // The chunk part we are reading/writing
     273                 :            :         enum State { LEN, DATA };
     274                 :            : 
     275                 :            :         State write_state;
     276                 :            :         Queue* write_head;
     277                 :            :         Queue* write_tail;
     278                 :            : 
     279                 :            :         State read_state;
     280                 :            :         Chunk* read_chunk;
     281                 :            :         char* read_ptr;
     282                 :            : 
     283                 :            :         // One SSL for all connections.
     284                 :            :         static SSL_CTX* ctx;
     285                 :            : };
     286                 :            : 
     287                 :            : #ifdef HAVE_LIBZ
     288                 :            : 
     289                 :            : #include <zlib.h>
     290                 :            : 
     291                 :            : // Wrapper class around a another ChunkedIO which the (un-)compresses data.
     292                 :            : class CompressedChunkedIO : public ChunkedIO {
     293                 :            : public:
     294                 :          0 :         CompressedChunkedIO(ChunkedIO* arg_io)
     295                 :          0 :                 : io(arg_io) {} // takes ownership
     296 [ #  # ][ #  # ]:          0 :         virtual ~CompressedChunkedIO()  { delete io; }
         [ #  # ][ #  # ]
     297                 :            : 
     298                 :            :         virtual bool Init(); // does *not* call arg_io->Init()
     299                 :            :         virtual bool Read(Chunk** chunk, bool may_block = false);
     300                 :            :         virtual bool Write(Chunk* chunk);
     301                 :          0 :         virtual bool Flush()    { return io->Flush(); }
     302         [ #  # ]:          0 :         virtual const char* Error()     { return error ? error : io->Error(); }
     303                 :          0 :         virtual bool CanRead()  { return io->CanRead(); }
     304                 :          0 :         virtual bool CanWrite() { return io->CanWrite(); }
     305                 :          0 :         virtual bool IsIdle()   { return io->IsIdle(); }
     306                 :          0 :         virtual bool IsFillingUp()      { return io->IsFillingUp(); }
     307                 :          0 :         virtual void Clear()    { return io->Clear(); }
     308                 :            : 
     309                 :          0 :         virtual bool Eof()      { return io->Eof(); }
     310                 :          0 :         virtual int Fd()        { return io->Fd(); }
     311                 :            :         virtual void Stats(char* buffer, int length);
     312                 :            : 
     313                 :          0 :         void EnableCompression(int level)
     314                 :          0 :                 { deflateInit(&zout, level); compress = true; }
     315                 :          0 :         void EnableDecompression()
     316                 :          0 :                 { inflateInit(&zin); uncompress = true; }
     317                 :            : 
     318                 :            : protected:
     319                 :            :         // Only compress block with size >= this.
     320                 :            :         static const unsigned int MIN_COMPRESS_SIZE = 30;
     321                 :            : 
     322                 :            :         ChunkedIO* io;
     323                 :            :         z_stream zin;
     324                 :            :         z_stream zout;
     325                 :            :         const char* error;
     326                 :            : 
     327                 :            :         bool compress;
     328                 :            :         bool uncompress;
     329                 :            : 
     330                 :            :         // Keep some statistics.
     331                 :            :         unsigned long uncompressed_bytes_read;
     332                 :            :         unsigned long uncompressed_bytes_written;
     333                 :            : };
     334                 :            : 
     335                 :            : #endif  /* HAVE_LIBZ */
     336                 :            : 
     337                 :            : #endif

Generated by: LCOV version 1.8