LCOV - code coverage report
Current view: top level - src - ChunkedIO.cc (source / functions) Hit Total Coverage
Test: app.info Lines: 117 592 19.8 %
Date: 2010-12-13 Functions: 16 52 30.8 %
Branches: 40 366 10.9 %

           Branch data     Line data    Source code
       1                 :            : // $Id: ChunkedIO.cc 6888 2009-08-20 18:23:11Z vern $
       2                 :            : 
       3                 :            : #include <unistd.h>
       4                 :            : #include <fcntl.h>
       5                 :            : #include <errno.h>
       6                 :            : #include <signal.h>
       7                 :            : #include <sys/time.h>
       8                 :            : #include <netinet/in.h>
       9                 :            : #include <assert.h>
      10                 :            : #include <openssl/ssl.h>
      11                 :            : 
      12                 :            : #include "config.h"
      13                 :            : #include "ChunkedIO.h"
      14                 :            : #include "NetVar.h"
      15                 :            : #include "RemoteSerializer.h"
      16                 :            : 
      17                 :          2 : ChunkedIO::ChunkedIO()
      18                 :            :         {
      19                 :          2 :         pure = false;
      20                 :          2 :         }
      21                 :            : 
      22                 :          0 : void ChunkedIO::Stats(char* buffer, int length)
      23                 :            :         {
      24                 :            :         safe_snprintf(buffer, length,
      25                 :            :                       "bytes=%luK/%luK chunks=%lu/%lu io=%lu/%lu bytes/io=%.2fK/%.2fK",
      26                 :            :                       stats.bytes_read / 1024, stats.bytes_written / 1024,
      27                 :            :                       stats.chunks_read, stats.chunks_written,
      28                 :            :                       stats.reads, stats.writes,
      29                 :            :                       stats.bytes_read / (1024.0 * stats.reads),
      30                 :          0 :                       stats.bytes_written / (1024.0 * stats.writes));
      31                 :          0 :         }
      32                 :            : 
      33                 :            : #ifdef DEBUG_COMMUNICATION
      34                 :            : 
      35                 :            : void ChunkedIO::AddToBuffer(uint32 len, char* data, bool is_read)
      36                 :            :         {
      37                 :            :         Chunk* copy = new Chunk;
      38                 :            :         copy->len = len;
      39                 :            :         copy->data = new char[len];
      40                 :            :         memcpy(copy->data, data, len);
      41                 :            : 
      42                 :            :         std::list<Chunk*>* l = is_read ? &data_read : &data_written;
      43                 :            :         l->push_back(copy);
      44                 :            : 
      45                 :            :         if ( l->size() > DEBUG_COMMUNICATION )
      46                 :            :                 {
      47                 :            :                 Chunk* old = l->front();
      48                 :            :                 l->pop_front();
      49                 :            :                 delete [] old->data;
      50                 :            :                 delete old;
      51                 :            :                 }
      52                 :            :         }
      53                 :            : 
      54                 :            : void ChunkedIO::AddToBuffer(Chunk* chunk, bool is_read)
      55                 :            :         {
      56                 :            :         AddToBuffer(chunk->len, chunk->data, is_read);
      57                 :            :         }
      58                 :            : 
      59                 :            : void ChunkedIO::DumpDebugData(const char* basefnname, bool want_reads)
      60                 :            :         {
      61                 :            :         std::list<Chunk*>* l = want_reads ? &data_read : &data_written;
      62                 :            : 
      63                 :            :         int count = 0;
      64                 :            : 
      65                 :            :         for ( std::list<Chunk*>::iterator i = l->begin(); i != l->end(); ++i )
      66                 :            :                 {
      67                 :            :                 static char buffer[128];
      68                 :            :                 snprintf(buffer, sizeof(buffer), "%s.%s.%d", basefnname,
      69                 :            :                                  want_reads ? "read" : "write", ++count);
      70                 :            :                 buffer[sizeof(buffer) - 1] = '\0';
      71                 :            : 
      72                 :            :                 int fd = open(buffer, O_WRONLY | O_CREAT | O_TRUNC, 0600);
      73                 :            :                 if ( fd < 0 )
      74                 :            :                         continue;
      75                 :            : 
      76                 :            :                 ChunkedIOFd io(fd, "dump-file");
      77                 :            :                 io.Write(*i);
      78                 :            :                 io.Flush();
      79                 :            :                 close(fd);
      80                 :            :                 }
      81                 :            : 
      82                 :            :         l->clear();
      83                 :            :         }
      84                 :            : 
      85                 :            : #endif
      86                 :            : 
      87                 :          2 : ChunkedIOFd::ChunkedIOFd(int arg_fd, const char* arg_tag, pid_t arg_pid)
      88                 :            :         {
      89                 :            :         int flags;
      90                 :            : 
      91                 :          2 :         tag = arg_tag;
      92                 :          2 :         fd = arg_fd;
      93                 :          2 :         eof = 0;
      94                 :          2 :         last_flush = current_time();
      95                 :          2 :         failed_reads = 0;
      96                 :            : 
      97   [ -  +  #  # ]:          2 :         if ( (flags = fcntl(fd, F_GETFL, 0)) < 0)
      98                 :            :                 {
      99                 :          0 :                 Log(fmt("can't obtain socket flags: %s", strerror(errno)));
     100                 :          0 :                 exit(1);
     101                 :            :                 }
     102                 :            : 
     103 [ -  + ][ #  # ]:          2 :         if ( fcntl(fd, F_SETFL, flags|O_NONBLOCK) < 0 )
     104                 :            :                 {
     105                 :            :                 Log(fmt("can't set fd to non-blocking: %s (%d)",
     106                 :          0 :                           strerror(errno), getpid()));
     107                 :          0 :                 exit(1);
     108                 :            :                 }
     109                 :            : 
     110                 :          2 :         read_buffer = new char[BUFFER_SIZE];
     111                 :          2 :         read_len = 0;
     112                 :          2 :         read_pos = 0;
     113                 :          2 :         partial = 0;
     114                 :          2 :         write_buffer = new char[BUFFER_SIZE];
     115                 :          2 :         write_len = 0;
     116                 :          2 :         write_pos = 0;
     117                 :            : 
     118                 :          2 :         pending_head = 0;
     119                 :          2 :         pending_tail = 0;
     120                 :            : 
     121                 :          2 :         pid = arg_pid;
     122                 :          2 :         }
     123                 :            : 
     124                 :          2 : ChunkedIOFd::~ChunkedIOFd()
     125                 :            :         {
     126                 :          2 :         Clear();
     127                 :            : 
     128   [ +  -  #  # ]:          2 :         delete [] read_buffer;
                 [ #  # ]
     129 [ +  - ][ #  # ]:          2 :         delete [] write_buffer;
                 [ #  # ]
     130                 :          2 :         close(fd);
     131                 :            : 
     132 [ -  +  #  #  # :          2 :         if ( partial )
                      # ]
     133                 :            :                 {
     134 [ #  # ][ #  # ]:          0 :                 delete [] partial->data;
                 [ #  # ]
     135                 :          0 :                 delete partial;
     136                 :            :                 }
     137 [ +  - ][ #  # ]:          2 :         }
                 [ #  # ]
     138                 :            : 
     139                 :          1 : bool ChunkedIOFd::Write(Chunk* chunk)
     140                 :            :         {
     141                 :            : #ifdef DEBUG
     142                 :          1 :         DBG_LOG(DBG_CHUNKEDIO, "write of size %d [%s]",
     143                 :            :                 chunk->len, fmt_bytes(chunk->data, min(20, chunk->len)));
     144                 :            : #endif
     145                 :            : 
     146                 :            :         // Reject if our queue of pending chunks is way too large. Otherwise,
     147                 :            :         // memory could fill up if the other side doesn't read.
     148         [ -  + ]:          1 :         if ( stats.pending > MAX_BUFFERED_CHUNKS )
     149                 :            :                 {
     150                 :          0 :                 DBG_LOG(DBG_CHUNKEDIO, "write queue full");
     151                 :            : 
     152                 :            : #ifdef DEBUG_COMMUNICATION
     153                 :            :                 AddToBuffer("<false:write-queue-full>", false);
     154                 :            : #endif
     155                 :            : 
     156                 :          0 :                 errno = ENOSPC;
     157                 :          0 :                 return false;
     158                 :            :                 }
     159                 :            : 
     160                 :            : #ifdef DEBUG_COMMUNICATION
     161                 :            :         AddToBuffer(chunk, false);
     162                 :            : #endif
     163                 :            : 
     164         [ +  - ]:          1 :         if ( chunk->len <= BUFFER_SIZE - sizeof(uint32) )
     165                 :          1 :                 return WriteChunk(chunk, false);
     166                 :            : 
     167                 :            :         // We have to split it up.
     168                 :          0 :         char* p = chunk->data;
     169                 :          0 :         unsigned long left = chunk->len;
     170                 :            : 
     171         [ #  # ]:          0 :         while ( left )
     172                 :            :                 {
     173                 :          0 :                 Chunk* part = new Chunk;
     174                 :            : 
     175                 :          0 :                 part->len = min(BUFFER_SIZE - sizeof(uint32), left);
     176                 :          0 :                 part->data = new char[part->len];
     177                 :          0 :                 memcpy(part->data, p, part->len);
     178                 :          0 :                 left -= part->len;
     179                 :          0 :                 p += part->len;
     180                 :            : 
     181         [ #  # ]:          0 :                 if ( ! WriteChunk(part, left != 0) )
     182                 :          0 :                         return false;
     183                 :            :                 }
     184                 :            : 
     185         [ #  # ]:          0 :         delete [] chunk->data;
     186                 :          0 :         delete chunk;
     187                 :            : 
     188                 :          1 :         return true;
     189                 :            :         }
     190                 :            : 
     191                 :          1 : bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial)
     192                 :            :         {
     193         [ -  + ]:          1 :         assert(chunk->len <= BUFFER_SIZE - sizeof(uint32) );
     194                 :            : 
     195         [ -  + ]:          1 :         if ( chunk->len == 0 )
     196                 :          0 :                 internal_error( "attempt to write 0 bytes chunk");
     197                 :            : 
     198         [ -  + ]:          1 :         if ( partial )
     199                 :          0 :                 chunk->len |= FLAG_PARTIAL;
     200                 :            : 
     201                 :          1 :         ++stats.chunks_written;
     202                 :            : 
     203                 :            :         // If it fits into the buffer, we're done (but keep care not
     204                 :            :         // to reorder chunks).
     205 [ +  - ][ +  - ]:          1 :         if ( ! pending_head && PutIntoWriteBuffer(chunk) )
                 [ +  - ]
     206                 :          1 :                 return true;
     207                 :            : 
     208                 :            :         // Otherwise queue it.
     209                 :          0 :         ++stats.pending;
     210                 :          0 :         ChunkQueue* q = new ChunkQueue;
     211                 :          0 :         q->chunk = chunk;
     212                 :          0 :         q->next = 0;
     213                 :            : 
     214         [ #  # ]:          0 :         if ( pending_tail )
     215                 :            :                 {
     216                 :          0 :                 pending_tail->next = q;
     217                 :          0 :                 pending_tail = q;
     218                 :            :                 }
     219                 :            :         else
     220                 :          0 :                 pending_head = pending_tail = q;
     221                 :            : 
     222                 :          1 :         return Flush();
     223                 :            :         }
     224                 :            : 
     225                 :            : 
     226                 :          1 : bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk)
     227                 :            :         {
     228                 :          1 :         uint32 len = chunk->len & ~FLAG_PARTIAL;
     229                 :            : 
     230 [ -  + ][ -  + ]:          1 :         if ( write_len + len + (IsPure() ? 0 : sizeof(len)) > BUFFER_SIZE )
     231                 :          0 :                 return false;
     232                 :            : 
     233         [ +  - ]:          1 :         if ( ! IsPure() )
     234                 :            :                 {
     235                 :          1 :                 uint32 nlen = htonl(chunk->len);
     236                 :          1 :                 memcpy(write_buffer + write_len, &nlen, sizeof(nlen));
     237                 :          1 :                 write_len += sizeof(nlen);
     238                 :            :                 }
     239                 :            : 
     240                 :          1 :         memcpy(write_buffer + write_len, chunk->data, len);
     241                 :          1 :         write_len += len;
     242                 :            : 
     243         [ +  - ]:          1 :         delete [] chunk->data;
     244                 :          1 :         delete chunk;
     245                 :            : 
     246         [ -  + ]:          1 :         if ( network_time - last_flush > 0.005 )
     247                 :          0 :                 FlushWriteBuffer();
     248                 :            : 
     249                 :          1 :         return true;
     250                 :            :         }
     251                 :            : 
     252                 :          4 : bool ChunkedIOFd::FlushWriteBuffer()
     253                 :            :         {
     254                 :          4 :         last_flush = network_time;
     255                 :            : 
     256         [ +  + ]:          4 :         while ( write_pos != write_len )
     257                 :            :                 {
     258                 :          1 :                 uint32 len = write_len - write_pos;
     259                 :            : 
     260                 :          1 :                 int written = write(fd, write_buffer + write_pos, len);
     261                 :            : 
     262         [ -  + ]:          1 :                 if ( written < 0 )
     263                 :            :                         {
     264         [ #  # ]:          0 :                         if ( errno == EPIPE )
     265                 :          0 :                                 eof = true;
     266                 :            : 
     267         [ #  # ]:          0 :                         if ( errno != EINTR )
     268                 :            :                                 // These errnos are equal on POSIX.
     269 [ #  # ][ #  # ]:          0 :                                 return errno == EWOULDBLOCK || errno == EAGAIN;
     270                 :            : 
     271                 :            :                         else
     272                 :          0 :                                 written = 0;
     273                 :            :                         }
     274                 :            : 
     275                 :          1 :                 stats.bytes_written += written;
     276         [ +  - ]:          1 :                 if ( written > 0 )
     277                 :          1 :                         ++stats.writes;
     278                 :            : 
     279         [ +  - ]:          1 :                 if ( unsigned(written) == len )
     280                 :            :                         {
     281                 :          1 :                         write_pos = write_len = 0;
     282                 :          1 :                         return true;
     283                 :            :                         }
     284                 :            : 
     285         [ #  # ]:          0 :                 if ( written == 0 )
     286                 :          0 :                         internal_error("written==0");
     287                 :            : 
     288                 :            :                 // Short write.
     289                 :          0 :                 write_pos += written;
     290                 :            :                 }
     291                 :            : 
     292                 :          4 :         return true;
     293                 :            :         }
     294                 :            : 
     295                 :          2 : bool ChunkedIOFd::OptionalFlush()
     296                 :            :         {
     297                 :            :         // This threshhold is quite arbitrary.
     298                 :            : //      if ( current_time() - last_flush > 0.01 )
     299                 :          2 :         return Flush();
     300                 :            :         }
     301                 :            : 
     302                 :          4 : bool ChunkedIOFd::Flush()
     303                 :            :         {
     304                 :            :         // Try to write data out.
     305         [ -  + ]:          4 :         while ( pending_head )
     306                 :            :                 {
     307         [ #  # ]:          0 :                 if ( ! FlushWriteBuffer() )
     308                 :          0 :                         return false;
     309                 :            : 
     310                 :            :                 // If we couldn't write the whole buffer, we stop here
     311                 :            :                 // and try again next time.
     312         [ #  # ]:          0 :                 if ( write_len > 0 )
     313                 :          0 :                         return true;
     314                 :            : 
     315                 :            :                 // Put as many pending chunks into the buffer as possible.
     316         [ #  # ]:          0 :                 while ( pending_head )
     317                 :            :                         {
     318         [ #  # ]:          0 :                         if ( ! PutIntoWriteBuffer(pending_head->chunk) )
     319                 :          0 :                                 break;
     320                 :            : 
     321                 :          0 :                         ChunkQueue* q = pending_head;
     322                 :          0 :                         pending_head = pending_head->next;
     323         [ #  # ]:          0 :                         if ( ! pending_head )
     324                 :          0 :                                 pending_tail = 0;
     325                 :            : 
     326                 :          0 :                         --stats.pending;
     327                 :          0 :                         delete q;
     328                 :            :                         }
     329                 :            :                 }
     330                 :            : 
     331                 :          4 :         return FlushWriteBuffer();
     332                 :            :         }
     333                 :            : 
     334                 :          2 : uint32 ChunkedIOFd::ChunkAvailable()
     335                 :            :         {
     336                 :          2 :         int bytes_left = read_len - read_pos;
     337                 :            : 
     338         [ +  - ]:          2 :         if ( bytes_left < int(sizeof(uint32)) )
     339                 :          2 :                 return 0;
     340                 :            : 
     341                 :          0 :         bytes_left -= sizeof(uint32);
     342                 :            : 
     343                 :            :         // We have to copy the value here as it may not be
     344                 :            :         // aligned correctly in the data.
     345                 :            :         uint32 len;
     346                 :          0 :         memcpy(&len, read_buffer + read_pos, sizeof(len));
     347                 :          0 :         len = ntohl(len);
     348                 :            : 
     349         [ #  # ]:          0 :         if ( uint32(bytes_left) < (len & ~FLAG_PARTIAL) )
     350                 :          0 :                 return 0;
     351                 :            : 
     352         [ #  # ]:          0 :         assert(len & ~FLAG_PARTIAL);
     353                 :            : 
     354                 :          2 :         return len;
     355                 :            :         }
     356                 :            : 
     357                 :          2 : ChunkedIO::Chunk* ChunkedIOFd::ExtractChunk()
     358                 :            :         {
     359                 :          2 :         uint32 len = ChunkAvailable();
     360                 :          2 :         uint32 real_len = len & ~FLAG_PARTIAL;
     361         [ +  - ]:          2 :         if ( ! real_len )
     362                 :          2 :                 return 0;
     363                 :            : 
     364                 :          0 :         read_pos += sizeof(uint32);
     365                 :            : 
     366                 :          0 :         Chunk* chunk = new Chunk;
     367                 :          0 :         chunk->len = len;
     368                 :          0 :         chunk->data = new char[real_len];
     369                 :          0 :         memcpy(chunk->data, read_buffer + read_pos, real_len);
     370                 :          0 :         read_pos += real_len;
     371                 :            : 
     372                 :          0 :         ++stats.chunks_read;
     373                 :            : 
     374                 :          2 :         return chunk;
     375                 :            :         }
     376                 :            : 
     377                 :          0 : ChunkedIO::Chunk* ChunkedIOFd::ConcatChunks(Chunk* c1, Chunk* c2)
     378                 :            :         {
     379                 :          0 :         Chunk* c = new Chunk;
     380                 :            : 
     381                 :          0 :         c->len = c1->len + c2->len;
     382                 :          0 :         c->data = new char[c->len];
     383                 :            : 
     384                 :          0 :         memcpy(c->data, c1->data, c1->len);
     385                 :          0 :         memcpy(c->data + c1->len, c2->data, c2->len);
     386                 :            : 
     387         [ #  # ]:          0 :         delete [] c1->data;
     388                 :          0 :         delete c1;
     389         [ #  # ]:          0 :         delete [] c2->data;
     390                 :          0 :         delete c2;
     391                 :            : 
     392                 :          0 :         return c;
     393                 :            :         }
     394                 :            : 
     395                 :          0 : void ChunkedIO::Log(const char* str)
     396                 :            :         {
     397                 :          0 :         RemoteSerializer::Log(RemoteSerializer::LogError, str);
     398                 :          0 :         }
     399                 :            : 
     400                 :          1 : bool ChunkedIOFd::Read(Chunk** chunk, bool may_block)
     401                 :            :         {
     402                 :          1 :         *chunk = 0;
     403                 :            : 
     404                 :            :         // We will be called regularly. So take the opportunity
     405                 :            :         // to flush the write buffer once in a while.
     406                 :          1 :         OptionalFlush();
     407                 :            : 
     408         [ +  - ]:          1 :         if ( ! ReadChunk(chunk, may_block) )
     409                 :            :                 {
     410                 :            : #ifdef DEBUG_COMMUNICATION
     411                 :            :                 AddToBuffer("<false:read-chunk>", true);
     412                 :            : #endif
     413                 :          1 :                 return false;
     414                 :            :                 }
     415                 :            : 
     416         [ #  # ]:          0 :         if ( ! *chunk )
     417                 :            :                 {
     418                 :            : #ifdef DEBUG_COMMUNICATION
     419                 :            :                 AddToBuffer("<null:no-data>", true);
     420                 :            : #endif
     421                 :          0 :                 return true;
     422                 :            :                 }
     423                 :            : 
     424                 :            : #ifdef DEBUG
     425         [ #  # ]:          0 :         if ( *chunk )
     426         [ #  # ]:          0 :                 DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]",
     427                 :            :                                 (*chunk)->len & ~FLAG_PARTIAL,
     428                 :            :                                 (*chunk)->len & FLAG_PARTIAL ? "(P) " : "",
     429                 :            :                                 fmt_bytes((*chunk)->data,
     430                 :            :                                                 min(20, (*chunk)->len)));
     431                 :            : #endif
     432                 :            : 
     433         [ #  # ]:          0 :         if ( ! ((*chunk)->len & FLAG_PARTIAL) )
     434                 :            :                 {
     435         [ #  # ]:          0 :                 if ( ! partial )
     436                 :            :                         {
     437                 :            : #ifdef DEBUG_COMMUNICATION
     438                 :            :                         AddToBuffer(*chunk, true);
     439                 :            : #endif
     440                 :          0 :                         return true;
     441                 :            :                         }
     442                 :            :                 else
     443                 :            :                         {
     444                 :            :                         // This is the last chunk of an oversized one.
     445                 :          0 :                         *chunk = ConcatChunks(partial, *chunk);
     446                 :          0 :                         partial = 0;
     447                 :            : 
     448                 :            : #ifdef DEBUG
     449         [ #  # ]:          0 :                         if ( *chunk )
     450                 :          0 :                                 DBG_LOG(DBG_CHUNKEDIO,
     451                 :            :                                         "built virtual chunk of size %d [%s]",
     452                 :            :                                         (*chunk)->len,
     453                 :            :                                         fmt_bytes((*chunk)->data, 20));
     454                 :            : #endif
     455                 :            : 
     456                 :            : #ifdef DEBUG_COMMUNICATION
     457                 :            :                         AddToBuffer(*chunk, true);
     458                 :            : #endif
     459                 :          0 :                         return true;
     460                 :            :                         }
     461                 :            :                 }
     462                 :            : 
     463                 :            :         // This chunk is the non-last part of an oversized.
     464                 :          0 :         (*chunk)->len &= ~FLAG_PARTIAL;
     465                 :            : 
     466         [ #  # ]:          0 :         if ( ! partial )
     467                 :            :                 // First part of oversized chunk.
     468                 :          0 :                 partial = *chunk;
     469                 :            :         else
     470                 :          0 :                 partial = ConcatChunks(partial, *chunk);
     471                 :            : 
     472                 :            : #ifdef DEBUG_COMMUNICATION
     473                 :            :         AddToBuffer("<null:partial>", true);
     474                 :            : #endif
     475                 :            : 
     476                 :          0 :         *chunk = 0;
     477                 :          1 :         return true; // Read following part next time.
     478                 :            :         }
     479                 :            : 
     480                 :          1 : bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block)
     481                 :            :         {
     482                 :            :         // We will be called regularly. So take the opportunity
     483                 :            :         // to flush the write buffer once in a while.
     484                 :          1 :         OptionalFlush();
     485                 :            : 
     486                 :          1 :         *chunk = ExtractChunk();
     487         [ -  + ]:          1 :         if ( *chunk )
     488                 :          0 :                 return true;
     489                 :            : 
     490                 :          1 :         int bytes_left = read_len - read_pos;
     491                 :            : 
     492                 :            :         // If we have a partial chunk left, move this to the head of
     493                 :            :         // the buffer.
     494         [ -  + ]:          1 :         if ( bytes_left )
     495                 :          0 :                 memmove(read_buffer, read_buffer + read_pos, bytes_left);
     496                 :            : 
     497                 :          1 :         read_pos = 0;
     498                 :          1 :         read_len = bytes_left;
     499                 :            : 
     500                 :            :         // If allowed, wait a bit for something to read.
     501         [ -  + ]:          1 :         if ( may_block )
     502                 :            :                 {
     503                 :            :                 fd_set fd_read, fd_write, fd_except;
     504                 :            : 
     505                 :          0 :                 FD_ZERO(&fd_read);
     506                 :          0 :                 FD_ZERO(&fd_write);
     507                 :          0 :                 FD_ZERO(&fd_except);
     508                 :          0 :                 FD_SET(fd, &fd_read);
     509                 :            : 
     510                 :            :                 struct timeval small_timeout;
     511                 :          0 :                 small_timeout.tv_sec = 0;
     512                 :          0 :                 small_timeout.tv_usec = 50;
     513                 :            : 
     514                 :          0 :                 select(fd + 1, &fd_read, &fd_write, &fd_except, &small_timeout);
     515                 :            :                 }
     516                 :            : 
     517                 :            :         // Make sure the process is still runnning
     518                 :            :         // (only checking for EPIPE after a read doesn't
     519                 :            :         // seem to be sufficient).
     520 [ -  + ][ #  # ]:          1 :         if ( pid && kill(pid, 0) < 0 && errno != EPERM )
         [ #  # ][ -  + ]
     521                 :            :                 {
     522                 :          0 :                 eof = true;
     523                 :          0 :                 errno = EPIPE;
     524                 :          0 :                 return false;
     525                 :            :                 }
     526                 :            : 
     527                 :            :         // Try to fill the buffer.
     528                 :          0 :         while ( true )
     529                 :            :                 {
     530                 :          1 :                 int len = BUFFER_SIZE - read_len;
     531                 :          1 :                 int read = ::read(fd, read_buffer + read_len, len);
     532                 :            : 
     533         [ -  + ]:          1 :                 if ( read < 0 )
     534                 :            :                         {
     535         [ #  # ]:          0 :                         if ( errno != EINTR )
     536                 :            :                                 {
     537                 :            :                                 // These errnos are equal on POSIX.
     538 [ #  # ][ #  # ]:          0 :                                 if ( errno == EWOULDBLOCK || errno == EAGAIN )
                 [ #  # ]
     539                 :            :                                         {
     540                 :            :                                         // Let's see if we have a chunk now --
     541                 :            :                                         // even if we time out, we may have read
     542                 :            :                                         // just enough in previous iterations!
     543                 :          0 :                                         *chunk = ExtractChunk();
     544                 :          0 :                                         ++failed_reads;
     545                 :          0 :                                         return true;
     546                 :            :                                         }
     547                 :            : 
     548         [ #  # ]:          0 :                                 if ( errno == EPIPE )
     549                 :          0 :                                         eof = true;
     550                 :            : 
     551                 :          0 :                                 return false;
     552                 :            :                                 }
     553                 :            : 
     554                 :            :                         else
     555                 :          0 :                                 read = 0;
     556                 :            :                         }
     557                 :            : 
     558                 :          1 :                 failed_reads = 0;
     559                 :            : 
     560 [ +  - ][ +  - ]:          1 :                 if ( read == 0 && len != 0 )
     561                 :            :                         {
     562                 :          1 :                         *chunk = ExtractChunk();
     563         [ -  + ]:          1 :                         if ( *chunk )
     564                 :          0 :                                 return true;
     565                 :            : 
     566                 :          1 :                         eof = true;
     567                 :          1 :                         return false;
     568                 :            :                         }
     569                 :            : 
     570                 :          0 :                 read_len += read;
     571                 :            : 
     572                 :          0 :                 ++stats.reads;
     573                 :          0 :                 stats.bytes_read += read;
     574                 :            : 
     575         [ #  # ]:          0 :                 if ( read == len )
     576                 :          0 :                         break;
     577                 :            :                 }
     578                 :            : 
     579                 :            :         // Let's see if we have a chunk now.
     580                 :          0 :         *chunk = ExtractChunk();
     581                 :            : 
     582                 :          1 :         return true;
     583                 :            :         }
     584                 :            : 
     585                 :          0 : bool ChunkedIOFd::CanRead()
     586                 :            :         {
     587                 :            :         // We will be called regularly. So take the opportunity
     588                 :            :         // to flush the write buffer once in a while.
     589                 :          0 :         OptionalFlush();
     590                 :            : 
     591         [ #  # ]:          0 :         if ( ChunkAvailable() )
     592                 :          0 :                 return true;
     593                 :            : 
     594                 :            :         fd_set fd_read;
     595                 :          0 :         FD_ZERO(&fd_read);
     596                 :          0 :         FD_SET(fd, &fd_read);
     597                 :            : 
     598                 :            :         struct timeval no_timeout;
     599                 :          0 :         no_timeout.tv_sec = 0;
     600                 :          0 :         no_timeout.tv_usec = 0;
     601                 :            : 
     602                 :          0 :         return select(fd + 1, &fd_read, 0, 0, &no_timeout) > 0;
     603                 :            :         }
     604                 :            : 
     605                 :          0 : bool ChunkedIOFd::CanWrite()
     606                 :            :         {
     607                 :          0 :         return pending_head != 0;
     608                 :            :         }
     609                 :            : 
     610                 :          0 : bool ChunkedIOFd::IsIdle()
     611                 :            :         {
     612 [ #  # ][ #  # ]:          0 :         if ( pending_head || ChunkAvailable() )
                 [ #  # ]
     613                 :          0 :                 return false;
     614                 :            : 
     615         [ #  # ]:          0 :         if ( failed_reads > 0 )
     616                 :          0 :                 return true;
     617                 :            : 
     618                 :          0 :         return false;
     619                 :            :         }
     620                 :            : 
     621                 :          0 : bool ChunkedIOFd::IsFillingUp()
     622                 :            :         {
     623                 :          0 :         return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
     624                 :            :         }
     625                 :            : 
     626                 :          2 : void ChunkedIOFd::Clear()
     627                 :            :         {
     628         [ -  + ]:          2 :         while ( pending_head )
     629                 :            :                 {
     630                 :          0 :                 ChunkQueue* next = pending_head->next;
     631         [ #  # ]:          0 :                 delete [] pending_head->chunk->data;
     632                 :          0 :                 delete pending_head->chunk;
     633                 :          0 :                 delete pending_head;
     634                 :          0 :                 pending_head = next;
     635                 :            :                 }
     636                 :            : 
     637                 :          2 :         pending_head = pending_tail = 0;
     638                 :          2 :         }
     639                 :            : 
     640                 :          0 : const char* ChunkedIOFd::Error()
     641                 :            :         {
     642                 :            :         static char buffer[1024];
     643                 :          0 :         safe_snprintf(buffer, sizeof(buffer), "%s [%d]", strerror(errno), errno);
     644                 :            : 
     645                 :          0 :         return buffer;
     646                 :            :         }
     647                 :            : 
     648                 :          0 : void ChunkedIOFd::Stats(char* buffer, int length)
     649                 :            :         {
     650                 :          0 :         int i = safe_snprintf(buffer, length, "pending=%d ", stats.pending);
     651                 :          0 :         ChunkedIO::Stats(buffer + i, length - i);
     652                 :          0 :         }
     653                 :            : 
     654                 :            : SSL_CTX* ChunkedIOSSL::ctx;
     655                 :            : 
     656                 :          0 : ChunkedIOSSL::ChunkedIOSSL(int arg_socket, bool arg_server)
     657                 :            :         {
     658                 :          0 :         socket = arg_socket;
     659                 :          0 :         eof = false;
     660                 :          0 :         setup = false;
     661                 :          0 :         server = arg_server;
     662                 :          0 :         ssl = 0;
     663                 :            : 
     664                 :          0 :         write_state = LEN;
     665                 :          0 :         write_head = 0;
     666                 :          0 :         write_tail = 0;
     667                 :            : 
     668                 :          0 :         read_state = LEN;
     669                 :          0 :         read_chunk = 0;
     670                 :          0 :         read_ptr = 0;
     671                 :          0 :         }
     672                 :            : 
     673                 :          0 : ChunkedIOSSL::~ChunkedIOSSL()
     674                 :            :         {
     675 [ #  # ][ #  # ]:          0 :         if ( setup )
                 [ #  # ]
     676                 :            :                 {
     677                 :          0 :                 SSL_shutdown(ssl);
     678                 :            : 
     679                 :            :                 // We don't care if the other side closes properly.
     680                 :          0 :                 setup = false;
     681                 :            :                 }
     682                 :            : 
     683 [ #  # ][ #  # ]:          0 :         if ( ssl )
                 [ #  # ]
     684                 :            :                 {
     685                 :          0 :                 SSL_free(ssl);
     686                 :          0 :                 ssl = 0;
     687                 :            :                 }
     688                 :            : 
     689                 :          0 :         close(socket);
     690 [ #  # ][ #  # ]:          0 :         }
                 [ #  # ]
     691                 :            : 
     692                 :            : 
     693                 :          0 : static int pem_passwd_cb(char* buf, int size, int rwflag, void* passphrase)
     694                 :            :         {
     695                 :          0 :         safe_strncpy(buf, (char*) passphrase, size);
     696                 :          0 :         buf[size - 1] = '\0';
     697                 :          0 :         return strlen(buf);
     698                 :            :         }
     699                 :            : 
     700                 :          0 : bool ChunkedIOSSL::Init()
     701                 :            :         {
     702                 :            :         // If the handshake doesn't succeed immediately we will
     703                 :            :         // be called multiple times.
     704         [ #  # ]:          0 :         if ( ! ctx )
     705                 :            :                 {
     706                 :          0 :                 SSL_load_error_strings();
     707                 :            : 
     708                 :          0 :                 ctx = SSL_CTX_new(SSLv3_method());
     709         [ #  # ]:          0 :                 if ( ! ctx )
     710                 :            :                         {
     711                 :          0 :                         Log("can't create SSL context");
     712                 :          0 :                         return false;
     713                 :            :                         }
     714                 :            : 
     715                 :            :                 // We access global variables here. But as they are
     716                 :            :                 // declared const and we don't modify them this should
     717                 :            :                 // be fine.
     718                 :          0 :                 const char* key = ssl_private_key->AsString()->CheckString();
     719                 :            : 
     720   [ #  #  #  # ]:          0 :                 if ( ! (key && *key &&
         [ #  # ][ #  # ]
     721                 :            :                         SSL_CTX_use_certificate_chain_file(ctx, key)) )
     722                 :            :                         {
     723                 :          0 :                         Log(fmt("can't read certificate from file %s", key));
     724                 :          0 :                         return false;
     725                 :            :                         }
     726                 :            : 
     727                 :            :                 const char* passphrase =
     728                 :          0 :                         ssl_passphrase->AsString()->CheckString();
     729                 :            : 
     730   [ #  #  #  # ]:          0 :                 if ( passphrase && ! streq(passphrase, "<undefined>") )
                 [ #  # ]
     731                 :            :                         {
     732                 :          0 :                         SSL_CTX_set_default_passwd_cb(ctx, pem_passwd_cb);
     733                 :            :                         SSL_CTX_set_default_passwd_cb_userdata(ctx,
     734                 :          0 :                                                         (void*) passphrase);
     735                 :            :                         }
     736                 :            : 
     737 [ #  # ][ #  # ]:          0 :                 if ( ! (key && *key &&
         [ #  # ][ #  # ]
     738                 :            :                         SSL_CTX_use_PrivateKey_file(ctx, key, SSL_FILETYPE_PEM)) )
     739                 :            :                         {
     740                 :          0 :                         Log(fmt("can't read private key from file %s", key));
     741                 :          0 :                         return false;
     742                 :            :                         }
     743                 :            : 
     744                 :          0 :                 const char* ca = ssl_ca_certificate->AsString()->CheckString();
     745   [ #  #  #  # ]:          0 :                 if ( ! (ca && *ca && SSL_CTX_load_verify_locations(ctx, ca, 0)) )
         [ #  # ][ #  # ]
     746                 :            :                         {
     747                 :          0 :                         Log(fmt("can't read CA certificate from file %s", ca));
     748                 :          0 :                         return false;
     749                 :            :                         }
     750                 :            : 
     751                 :            :                 // Only use real ciphers.
     752         [ #  # ]:          0 :                 if ( ! SSL_CTX_set_cipher_list(ctx, "HIGH") )
     753                 :            :                         {
     754                 :          0 :                         Log("can't set cipher list");
     755                 :          0 :                         return false;
     756                 :            :                         }
     757                 :            : 
     758                 :            :                 // Require client certificate.
     759                 :            :                 SSL_CTX_set_verify(ctx,
     760                 :          0 :                         SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0);
     761                 :            :                 }
     762                 :            : 
     763                 :            :         int flags;
     764                 :            : 
     765         [ #  # ]:          0 :         if ( (flags = fcntl(socket, F_GETFL, 0)) < 0)
     766                 :            :                 {
     767                 :          0 :                 Log(fmt("can't obtain socket flags: %s", strerror(errno)));
     768                 :          0 :                 return false;
     769                 :            :                 }
     770                 :            : 
     771         [ #  # ]:          0 :         if ( fcntl(socket, F_SETFL, flags|O_NONBLOCK) < 0 )
     772                 :            :                 {
     773                 :            :                 Log(fmt("can't set socket to non-blocking: %s",
     774                 :          0 :                           strerror(errno)));
     775                 :          0 :                 return false;
     776                 :            :                 }
     777                 :            : 
     778         [ #  # ]:          0 :         if ( ! ssl )
     779                 :            :                 {
     780                 :          0 :                 ssl = SSL_new(ctx);
     781         [ #  # ]:          0 :                 if ( ! ssl )
     782                 :            :                         {
     783                 :          0 :                         Log("can't create SSL object");
     784                 :          0 :                         return false;
     785                 :            :                         }
     786                 :            : 
     787                 :          0 :                 BIO* bio = BIO_new_socket(socket, BIO_NOCLOSE);
     788                 :          0 :                 BIO_set_nbio(bio, 1);
     789                 :          0 :                 SSL_set_bio(ssl, bio, bio);
     790                 :            :                 }
     791                 :            : 
     792                 :            :         int success;
     793         [ #  # ]:          0 :         if ( server )
     794                 :          0 :                 success = last_ret = SSL_accept(ssl);
     795                 :            :         else
     796                 :          0 :                 success = last_ret = SSL_connect(ssl);
     797                 :            : 
     798         [ #  # ]:          0 :         if ( success > 0 )
     799                 :            :                 { // handshake done
     800                 :          0 :                 setup = true;
     801                 :          0 :                 return true;
     802                 :            :                 }
     803                 :            : 
     804                 :          0 :         int error = SSL_get_error(ssl, success);
     805                 :            : 
     806   [ #  #  #  # ]:          0 :         if ( success <= 0 &&
                 [ #  # ]
     807                 :            :              (error == SSL_ERROR_WANT_WRITE || error == SSL_ERROR_WANT_READ) )
     808                 :            :                 // Handshake not finished yet, but that's ok for now.
     809                 :          0 :                 return true;
     810                 :            : 
     811                 :            :         // Some error.
     812                 :          0 :         eof = true;
     813                 :          0 :         return false;
     814                 :            :         }
     815                 :            : 
     816                 :          0 : bool ChunkedIOSSL::Write(Chunk* chunk)
     817                 :            :         {
     818                 :            : #ifdef DEBUG
     819                 :          0 :         DBG_LOG(DBG_CHUNKEDIO, "ssl write of size %d [%s]",
     820                 :            :                 chunk->len, fmt_bytes(chunk->data, 20));
     821                 :            : #endif
     822                 :            : 
     823                 :            :         // Reject if our queue of pending chunks is way too large. Otherwise,
     824                 :            :         // memory could fill up if the other side doesn't read.
     825         [ #  # ]:          0 :         if ( stats.pending > MAX_BUFFERED_CHUNKS )
     826                 :            :                 {
     827                 :          0 :                 DBG_LOG(DBG_CHUNKEDIO, "write queue full");
     828                 :          0 :                 errno = ENOSPC;
     829                 :          0 :                 return false;
     830                 :            :                 }
     831                 :            : 
     832                 :            :         // Queue it.
     833                 :          0 :         ++stats.pending;
     834                 :          0 :         Queue* q = new Queue;
     835                 :          0 :         q->chunk = chunk;
     836                 :          0 :         q->next = 0;
     837                 :            : 
     838                 :            :         // Temporarily convert len into network byte order.
     839                 :          0 :         chunk->len = htonl(chunk->len);
     840                 :            : 
     841         [ #  # ]:          0 :         if ( write_tail )
     842                 :            :                 {
     843                 :          0 :                 write_tail->next = q;
     844                 :          0 :                 write_tail = q;
     845                 :            :                 }
     846                 :            :         else
     847                 :          0 :                 write_head = write_tail = q;
     848                 :            : 
     849                 :          0 :         Flush();
     850                 :          0 :         return true;
     851                 :            :         }
     852                 :            : 
     853                 :          0 : bool ChunkedIOSSL::WriteData(char* p, uint32 len, bool* error)
     854                 :            :         {
     855                 :          0 :         *error = false;
     856                 :            : 
     857                 :          0 :         double t = current_time();
     858                 :            : 
     859                 :          0 :         int written = last_ret = SSL_write(ssl, p, len);
     860                 :            : 
     861   [ #  #  #  #  :          0 :         switch ( SSL_get_error(ssl, written) ) {
                      # ]
     862                 :            :                 case SSL_ERROR_NONE:
     863                 :            :                         // SSL guarantees us that all bytes have been written.
     864                 :            :                         // That's nice. :-)
     865                 :          0 :                         return true;
     866                 :            : 
     867                 :            :                 case SSL_ERROR_WANT_READ:
     868                 :            :                 case SSL_ERROR_WANT_WRITE:
     869                 :            :                         // Would block.
     870                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO,
     871                 :            :                                 "SSL_write: SSL_ERROR_WANT_READ [%d,%d]",
     872                 :            :                                 written, SSL_get_error(ssl, written));
     873                 :          0 :                         *error = false;
     874                 :          0 :                         return false;
     875                 :            : 
     876                 :            :                 case SSL_ERROR_ZERO_RETURN:
     877                 :            :                         // Regular remote connection shutdown.
     878                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO,
     879                 :            :                                 "SSL_write: SSL_ZERO_RETURN [%d,%d]",
     880                 :            :                                 written, SSL_get_error(ssl, written));
     881                 :          0 :                         *error = eof = true;
     882                 :          0 :                         return false;
     883                 :            : 
     884                 :            :                 case SSL_ERROR_SYSCALL:
     885                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO,
     886                 :            :                                 "SSL_write: SSL_SYS_CALL [%d,%d]",
     887                 :            :                                 written, SSL_get_error(ssl, written));
     888                 :            : 
     889         [ #  # ]:          0 :                         if ( written == 0 )
     890                 :            :                                 {
     891                 :            :                                 // Socket connection closed.
     892                 :          0 :                                 *error = eof = true;
     893                 :          0 :                                 return false;
     894                 :            :                                 }
     895                 :            : 
     896                 :            :                         // Fall through.
     897                 :            : 
     898                 :            :                 default:
     899                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO,
     900                 :            :                                 "SSL_write: fatal error [%d,%d]",
     901                 :            :                                 written, SSL_get_error(ssl, written));
     902                 :            :                         // Fatal SSL error.
     903                 :          0 :                         *error = true;
     904                 :          0 :                         return false;
     905                 :            :         }
     906                 :            : 
     907                 :            :         internal_error("can't be reached");
     908                 :            :         return false;
     909                 :            :         }
     910                 :            : 
     911                 :          0 : bool ChunkedIOSSL::Flush()
     912                 :            :         {
     913         [ #  # ]:          0 :         if ( ! setup )
     914                 :            :                 {
     915                 :            :                 // We may need to finish the handshake.
     916         [ #  # ]:          0 :                 if ( ! Init() )
     917                 :          0 :                         return false;
     918         [ #  # ]:          0 :                 if ( ! setup )
     919                 :          0 :                         return true;
     920                 :            :                 }
     921                 :            : 
     922         [ #  # ]:          0 :         while ( write_head )
     923                 :            :                 {
     924                 :            :                 bool error;
     925                 :            : 
     926                 :          0 :                 Chunk* c = write_head->chunk;
     927                 :            : 
     928         [ #  # ]:          0 :                 if ( write_state == LEN )
     929                 :            :                         {
     930         [ #  # ]:          0 :                         if ( ! WriteData((char*)&c->len, sizeof(c->len), &error) )
     931                 :          0 :                                 return ! error;
     932                 :          0 :                         write_state = DATA;
     933                 :            : 
     934                 :            :                         // Convert back from network byte order.
     935                 :          0 :                         c->len = ntohl(c->len);
     936                 :            :                         }
     937                 :            : 
     938         [ #  # ]:          0 :                 if ( ! WriteData(c->data, c->len, &error) )
     939                 :          0 :                         return ! error;
     940                 :            : 
     941                 :            :                 // Chunk written, throw away.
     942                 :          0 :                 Queue* q = write_head;
     943                 :          0 :                 write_head = write_head->next;
     944         [ #  # ]:          0 :                 if ( ! write_head )
     945                 :          0 :                         write_tail = 0;
     946                 :          0 :                 --stats.pending;
     947                 :          0 :                 delete q;
     948                 :            : 
     949         [ #  # ]:          0 :                 delete [] c->data;
     950                 :          0 :                 delete c;
     951                 :            : 
     952                 :          0 :                 write_state = LEN;
     953                 :            :                 }
     954                 :            : 
     955                 :          0 :         return true;
     956                 :            :         }
     957                 :            : 
     958                 :          0 : bool ChunkedIOSSL::ReadData(char* p, uint32 len, bool* error)
     959                 :            :         {
     960         [ #  # ]:          0 :         if ( ! read_ptr )
     961                 :          0 :                 read_ptr = p;
     962                 :            : 
     963                 :          0 :         while ( true )
     964                 :            :                 {
     965                 :          0 :                 double t = current_time();
     966                 :            : 
     967                 :            :                 int read = last_ret =
     968                 :          0 :                         SSL_read(ssl, read_ptr, len - (read_ptr - p));
     969                 :            : 
     970   [ #  #  #  #  :          0 :                 switch ( SSL_get_error(ssl, read) ) {
                      # ]
     971                 :            :                 case SSL_ERROR_NONE:
     972                 :            :                         // We're fine.
     973                 :          0 :                         read_ptr += read;
     974                 :            : 
     975         [ #  # ]:          0 :                         if ( unsigned(read_ptr - p) == len )
     976                 :            :                                 {
     977                 :            :                                 // We have read as much as requested..
     978                 :          0 :                                 read_ptr = 0;
     979                 :          0 :                                 *error = false;
     980                 :          0 :                                 return true;
     981                 :            :                                 }
     982                 :            : 
     983                 :            :                         break;
     984                 :            : 
     985                 :            :                 case SSL_ERROR_WANT_READ:
     986                 :            :                 case SSL_ERROR_WANT_WRITE:
     987                 :            :                         // Would block.
     988                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO,
     989                 :            :                                 "SSL_read: SSL_ERROR_WANT_READ [%d,%d]",
     990                 :            :                                 read, SSL_get_error(ssl, read));
     991                 :          0 :                         *error = false;
     992                 :          0 :                         return false;
     993                 :            : 
     994                 :            :                 case SSL_ERROR_ZERO_RETURN:
     995                 :            :                         // Regular remote connection shutdown.
     996                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO,
     997                 :            :                                 "SSL_read: SSL_ZERO_RETURN [%d,%d]",
     998                 :            :                                 read, SSL_get_error(ssl, read));
     999                 :          0 :                         *error = eof = true;
    1000                 :          0 :                         return false;
    1001                 :            : 
    1002                 :            :                 case SSL_ERROR_SYSCALL:
    1003                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO, "SSL_read: SSL_SYS_CALL [%d,%d]",
    1004                 :            :                                 read, SSL_get_error(ssl, read));
    1005                 :            : 
    1006         [ #  # ]:          0 :                         if ( read == 0 )
    1007                 :            :                                 {
    1008                 :            :                                 // Socket connection closed.
    1009                 :          0 :                                 *error = eof = true;
    1010                 :          0 :                                 return false;
    1011                 :            :                                 }
    1012                 :            : 
    1013                 :            :                         // Fall through.
    1014                 :            : 
    1015                 :            :                 default:
    1016                 :          0 :                         DBG_LOG(DBG_CHUNKEDIO,
    1017                 :            :                                 "SSL_read: fatal error [%d,%d]",
    1018                 :            :                                 read, SSL_get_error(ssl, read));
    1019                 :            : 
    1020                 :            :                         // Fatal SSL error.
    1021                 :          0 :                         *error = true;
    1022                 :          0 :                         return false;
    1023                 :            :                 }
    1024                 :            :                 }
    1025                 :            : 
    1026                 :            :         // Can't be reached.
    1027                 :            :         internal_error("can't be reached");
    1028                 :            :         return false;
    1029                 :            :         }
    1030                 :            : 
    1031                 :          0 : bool ChunkedIOSSL::Read(Chunk** chunk, bool mayblock)
    1032                 :            :         {
    1033                 :          0 :         *chunk = 0;
    1034                 :            : 
    1035         [ #  # ]:          0 :         if ( ! setup )
    1036                 :            :                 {
    1037                 :            :                 // We may need to finish the handshake.
    1038         [ #  # ]:          0 :                 if ( ! Init() )
    1039                 :          0 :                         return false;
    1040         [ #  # ]:          0 :                 if ( ! setup )
    1041                 :          0 :                         return true;
    1042                 :            :                 }
    1043                 :            : 
    1044                 :            :         bool error;
    1045                 :            : 
    1046                 :          0 :         Flush();
    1047                 :            : 
    1048         [ #  # ]:          0 :         if ( read_state == LEN )
    1049                 :            :                 {
    1050         [ #  # ]:          0 :                 if ( ! read_chunk )
    1051                 :            :                         {
    1052                 :          0 :                         read_chunk = new Chunk;
    1053                 :          0 :                         read_chunk->data = 0;
    1054                 :            :                         }
    1055                 :            : 
    1056         [ #  # ]:          0 :                 if ( ! ReadData((char*)&read_chunk->len,
    1057                 :            :                                 sizeof(read_chunk->len),
    1058                 :            :                                 &error) )
    1059                 :          0 :                         return ! error;
    1060                 :            : 
    1061                 :          0 :                 read_state = DATA;
    1062                 :          0 :                 read_chunk->len = ntohl(read_chunk->len);
    1063                 :            :                 }
    1064                 :            : 
    1065         [ #  # ]:          0 :         if ( ! read_chunk->data )
    1066                 :          0 :                 read_chunk->data = new char[read_chunk->len];
    1067                 :            : 
    1068         [ #  # ]:          0 :         if ( ! ReadData(read_chunk->data, read_chunk->len, &error) )
    1069                 :          0 :                 return ! error;
    1070                 :            : 
    1071                 :            :         // Chunk fully read. Pass it on.
    1072                 :          0 :         *chunk = read_chunk;
    1073                 :          0 :         read_chunk = 0;
    1074                 :          0 :         read_state = LEN;
    1075                 :            : 
    1076                 :            : #ifdef DEBUG
    1077         [ #  # ]:          0 :         if ( *chunk )
    1078                 :          0 :                 DBG_LOG(DBG_CHUNKEDIO, "ssl read of size %d [%s]",
    1079                 :            :                         (*chunk)->len, fmt_bytes((*chunk)->data, 20));
    1080                 :            : #endif
    1081                 :            : 
    1082                 :          0 :         return true;
    1083                 :            :         }
    1084                 :            : 
    1085                 :          0 : bool ChunkedIOSSL::CanRead()
    1086                 :            :         {
    1087                 :            :         // We will be called regularly. So take the opportunity
    1088                 :            :         // to flush the write buffer.
    1089                 :          0 :         Flush();
    1090                 :            : 
    1091         [ #  # ]:          0 :         if ( SSL_pending(ssl) )
    1092                 :          0 :                 return true;
    1093                 :            : 
    1094                 :            :         fd_set fd_read;
    1095                 :          0 :         FD_ZERO(&fd_read);
    1096                 :          0 :         FD_SET(socket, &fd_read);
    1097                 :            : 
    1098                 :            :         struct timeval notimeout;
    1099                 :          0 :         notimeout.tv_sec = 0;
    1100                 :          0 :         notimeout.tv_usec = 0;
    1101                 :            : 
    1102                 :          0 :         return select(socket + 1, &fd_read, NULL, NULL, &notimeout) > 0;
    1103                 :            :         }
    1104                 :            : 
    1105                 :          0 : bool ChunkedIOSSL::CanWrite()
    1106                 :            :         {
    1107                 :          0 :         return write_head != 0;
    1108                 :            :         }
    1109                 :            : 
    1110                 :          0 : bool ChunkedIOSSL::IsIdle()
    1111                 :            :         {
    1112 [ #  # ][ #  # ]:          0 :         return ! (CanRead() || CanWrite());
    1113                 :            :         }
    1114                 :            : 
    1115                 :          0 : bool ChunkedIOSSL::IsFillingUp()
    1116                 :            :         {
    1117                 :            :         // We don't really need this at the moment (since SSL is only used for
    1118                 :            :         // peer-to-peer communication). Thus, we always return false for now.
    1119                 :          0 :         return false;
    1120                 :            :         }
    1121                 :            : 
    1122                 :          0 : void ChunkedIOSSL::Clear()
    1123                 :            :         {
    1124         [ #  # ]:          0 :         while ( write_head )
    1125                 :            :                 {
    1126                 :          0 :                 Queue* next = write_head->next;
    1127         [ #  # ]:          0 :                 delete [] write_head->chunk->data;
    1128                 :          0 :                 delete write_head->chunk;
    1129                 :          0 :                 delete write_head;
    1130                 :          0 :                 write_head = next;
    1131                 :            :                 }
    1132                 :          0 :         write_head = write_tail = 0;
    1133                 :          0 :         }
    1134                 :            : 
    1135                 :          0 : const char* ChunkedIOSSL::Error()
    1136                 :            :         {
    1137                 :          0 :         const int BUFLEN = 512;
    1138                 :            :         static char buffer[BUFLEN];
    1139                 :            : 
    1140                 :          0 :         int sslcode = SSL_get_error(ssl, last_ret);
    1141                 :          0 :         int errcode = ERR_get_error();
    1142                 :            : 
    1143                 :            :         int count = safe_snprintf(buffer, BUFLEN, "[%d,%d,%d] SSL error: ",
    1144                 :          0 :                                         errcode, sslcode, last_ret);
    1145                 :            : 
    1146         [ #  # ]:          0 :         if ( errcode )
    1147                 :          0 :                 ERR_error_string_n(errcode, buffer + count, BUFLEN - count);
    1148                 :            : 
    1149         [ #  # ]:          0 :         else if ( sslcode == SSL_ERROR_SYSCALL )
    1150                 :            :                 {
    1151         [ #  # ]:          0 :                 if ( last_ret )
    1152                 :            :                         // Look at errno.
    1153                 :            :                         safe_snprintf(buffer + count, BUFLEN - count,
    1154                 :          0 :                                         "syscall: %s", strerror(errno));
    1155                 :            :                 else
    1156                 :            :                         // Errno is not valid in this case.
    1157                 :            :                         safe_strncpy(buffer + count,
    1158                 :            :                                         "syscall: unexpected end-of-file",
    1159                 :          0 :                                         BUFLEN - count);
    1160                 :            :                 }
    1161                 :            :         else
    1162                 :          0 :                 safe_strncpy(buffer + count, "unknown error", BUFLEN - count);
    1163                 :            : 
    1164                 :          0 :         return buffer;
    1165                 :            :         }
    1166                 :            : 
    1167                 :          0 : void ChunkedIOSSL::Stats(char* buffer, int length)
    1168                 :            :         {
    1169                 :          0 :         int i = safe_snprintf(buffer, length, "pending=%ld ", stats.pending);
    1170                 :          0 :         ChunkedIO::Stats(buffer + i, length - i);
    1171                 :          0 :         }
    1172                 :            : 
    1173                 :            : #ifdef HAVE_LIBZ
    1174                 :            : 
    1175                 :          0 : bool CompressedChunkedIO::Init()
    1176                 :            :         {
    1177                 :          0 :         zin.zalloc = 0;
    1178                 :          0 :         zin.zfree = 0;
    1179                 :          0 :         zin.opaque = 0;
    1180                 :            : 
    1181                 :          0 :         zout.zalloc = 0;
    1182                 :          0 :         zout.zfree = 0;
    1183                 :          0 :         zout.opaque = 0;
    1184                 :            : 
    1185                 :          0 :         compress = uncompress = false;
    1186                 :          0 :         error = 0;
    1187                 :          0 :         uncompressed_bytes_read = 0;
    1188                 :          0 :         uncompressed_bytes_written = 0;
    1189                 :            : 
    1190                 :          0 :         return true;
    1191                 :            :         }
    1192                 :            : 
    1193                 :          0 : bool CompressedChunkedIO::Read(Chunk** chunk, bool may_block)
    1194                 :            :         {
    1195         [ #  # ]:          0 :         if ( ! io->Read(chunk, may_block) )
    1196                 :          0 :                 return false;
    1197                 :            : 
    1198         [ #  # ]:          0 :         if ( ! uncompress )
    1199                 :          0 :                 return true;
    1200                 :            : 
    1201         [ #  # ]:          0 :         if ( ! *chunk )
    1202                 :          0 :                 return true;
    1203                 :            : 
    1204                 :            :         uint32 uncompressed_len =
    1205                 :          0 :                 *(uint32*)((*chunk)->data + (*chunk)->len - sizeof(uint32));
    1206                 :            : 
    1207         [ #  # ]:          0 :         if ( uncompressed_len == 0 )
    1208                 :            :                 {
    1209                 :            :                 // Not compressed.
    1210                 :          0 :                 DBG_LOG(DBG_CHUNKEDIO, "zlib read pass-through: size=%d",
    1211                 :            :                         (*chunk)->len);
    1212                 :          0 :                 return true;
    1213                 :            :                 }
    1214                 :            : 
    1215                 :          0 :         char* uncompressed = new char[uncompressed_len];
    1216                 :            : 
    1217                 :          0 :         DBG_LOG(DBG_CHUNKEDIO, "zlib read: size=%d uncompressed=%d",
    1218                 :            :                 (*chunk)->len, uncompressed_len);
    1219                 :            : 
    1220                 :          0 :         zin.next_in = (Bytef*) (*chunk)->data;
    1221                 :          0 :         zin.avail_in = (*chunk)->len - sizeof(uint32);
    1222                 :          0 :         zin.next_out = (Bytef*) uncompressed;
    1223                 :          0 :         zin.avail_out = uncompressed_len;
    1224                 :            : 
    1225         [ #  # ]:          0 :         if ( inflate(&zin, Z_SYNC_FLUSH) != Z_OK )
    1226                 :            :                 {
    1227                 :          0 :                 error = zin.msg;
    1228                 :          0 :                 return false;
    1229                 :            :                 }
    1230                 :            : 
    1231         [ #  # ]:          0 :         if ( zin.avail_in > 0 )
    1232                 :            :                 {
    1233                 :          0 :                 error = "compressed data longer than expected";
    1234                 :          0 :                 return false;
    1235                 :            :                 }
    1236                 :            : 
    1237         [ #  # ]:          0 :         delete [] (*chunk)->data;
    1238                 :            : 
    1239                 :          0 :         uncompressed_bytes_read += uncompressed_len;
    1240                 :            : 
    1241                 :          0 :         (*chunk)->len = uncompressed_len;
    1242                 :          0 :         (*chunk)->data = uncompressed;
    1243                 :            : 
    1244                 :          0 :         return true;
    1245                 :            :         }
    1246                 :            : 
    1247                 :          0 : bool CompressedChunkedIO::Write(Chunk* chunk)
    1248                 :            :         {
    1249 [ #  # ][ #  # ]:          0 :         if ( (! compress) || IsPure() )
                 [ #  # ]
    1250                 :            :                 // No compression.
    1251                 :          0 :                 return io->Write(chunk);
    1252                 :            : 
    1253                 :            :         // We compress block-wise (rather than stream-wise) because:
    1254                 :            :         //
    1255                 :            :         // (1) it's significantly easier to implement due to our block-oriented
    1256                 :            :         // communication model (with a stream compression, we'd need to chop
    1257                 :            :         // the stream into blocks during decompression which would require
    1258                 :            :         // additional buffering and copying).
    1259                 :            :         //
    1260                 :            :         // (2) it ensures that we do not introduce any additional latencies (a
    1261                 :            :         // stream compression may decide to wait for the next chunk of data
    1262                 :            :         // before writing anything out).
    1263                 :            :         //
    1264                 :            :         // The block-wise compression comes at the cost of a smaller compression
    1265                 :            :         // factor.
    1266                 :            :         //
    1267                 :            :         // A compressed chunk's data looks like this:
    1268                 :            :         //   char[] compressed data
    1269                 :            :         //   uint32 uncompressed_length
    1270                 :            :         //
    1271                 :            :         // By including uncompressed_length, we again trade easier
    1272                 :            :         // decompression for a smaller reduction factor. If uncompressed_length
    1273                 :            :         // is zero, the data is *not* compressed.
    1274                 :            : 
    1275                 :          0 :         uncompressed_bytes_written += chunk->len;
    1276                 :          0 :         uint32 original_size = chunk->len;
    1277                 :            : 
    1278                 :          0 :         char* compressed = new char[chunk->len + sizeof(uint32)];
    1279                 :            : 
    1280         [ #  # ]:          0 :         if ( chunk->len < MIN_COMPRESS_SIZE )
    1281                 :            :                 {
    1282                 :            :                 // Too small; not worth any compression.
    1283                 :          0 :                 memcpy(compressed, chunk->data, chunk->len);
    1284                 :          0 :                 *(uint32*) (compressed + chunk->len) = 0; // uncompressed_length
    1285                 :            : 
    1286         [ #  # ]:          0 :                 delete [] chunk->data;
    1287                 :          0 :                 chunk->data = compressed;
    1288                 :          0 :                 chunk->len += 4;
    1289                 :            : 
    1290                 :          0 :                 DBG_LOG(DBG_CHUNKEDIO, "zlib write pass-through: size=%d", chunk->len);
    1291                 :            :                 }
    1292                 :            :         else
    1293                 :            :                 {
    1294                 :          0 :                 zout.next_in = (Bytef*) chunk->data;
    1295                 :          0 :                 zout.avail_in = chunk->len;
    1296                 :          0 :                 zout.next_out = (Bytef*) compressed;
    1297                 :          0 :                 zout.avail_out = chunk->len;
    1298                 :            : 
    1299         [ #  # ]:          0 :                 if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
    1300                 :            :                         {
    1301                 :          0 :                         error = zout.msg;
    1302                 :          0 :                         return false;
    1303                 :            :                         }
    1304                 :            : 
    1305         [ #  # ]:          0 :                 while ( zout.avail_out == 0 )
    1306                 :            :                         {
    1307                 :            :                         // D'oh! Not enough space, i.e., it hasn't got smaller.
    1308                 :          0 :                         char* old = compressed;
    1309                 :          0 :                         int old_size = (char*) zout.next_out - compressed;
    1310                 :          0 :                         int new_size = old_size * 2 + sizeof(uint32);
    1311                 :            : 
    1312                 :          0 :                         compressed = new char[new_size];
    1313                 :          0 :                         memcpy(compressed, old, old_size);
    1314         [ #  # ]:          0 :                         delete [] old;
    1315                 :            : 
    1316                 :          0 :                         zout.next_out = (Bytef*) (compressed + old_size);
    1317                 :          0 :                         zout.avail_out = old_size; // Sic! We doubled.
    1318                 :            : 
    1319         [ #  # ]:          0 :                         if ( deflate(&zout, Z_SYNC_FLUSH) != Z_OK )
    1320                 :            :                                 {
    1321                 :          0 :                                 error = zout.msg;
    1322                 :          0 :                                 return false;
    1323                 :            :                                 }
    1324                 :            :                         }
    1325                 :            : 
    1326                 :          0 :                 *(uint32*) zout.next_out = original_size; // uncompressed_length
    1327                 :            : 
    1328         [ #  # ]:          0 :                 delete [] chunk->data;
    1329                 :          0 :                 chunk->data = compressed;
    1330                 :            :                 chunk->len =
    1331                 :          0 :                         ((char*) zout.next_out - compressed) + sizeof(uint32);
    1332                 :            : 
    1333                 :          0 :                 DBG_LOG(DBG_CHUNKEDIO, "zlib write: size=%d compressed=%d",
    1334                 :            :                         original_size, chunk->len);
    1335                 :            :                 }
    1336                 :            : 
    1337                 :          0 :         return io->Write(chunk);
    1338                 :            :         }
    1339                 :            : 
    1340                 :          0 : void CompressedChunkedIO::Stats(char* buffer, int length)
    1341                 :            :         {
    1342                 :          0 :         const Statistics* stats = io->Stats();
    1343                 :            : 
    1344                 :            :         int i = snprintf(buffer, length, "compression=%.2f/%.2f ",
    1345                 :            :                         uncompressed_bytes_read ? double(stats->bytes_read) / uncompressed_bytes_read : -1,
    1346 [ #  # ][ #  # ]:          0 :                         uncompressed_bytes_written ? double(stats->bytes_written) / uncompressed_bytes_written : -1 );
    1347                 :            : 
    1348                 :          0 :         io->Stats(buffer + i, length - i);
    1349                 :          0 :         buffer[length-1] = '\0';
    1350 [ +  - ][ +  - ]:          6 :         }
    1351                 :          3 : 
    1352                 :            : #endif  /* HAVE_LIBZ */

Generated by: LCOV version 1.8