Home | History | Annotate | Line # | Download | only in win
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include <assert.h>
     23 #include <io.h>
     24 #include <stdio.h>
     25 #include <stdlib.h>
     26 #include <string.h>
     27 
     28 #include "handle-inl.h"
     29 #include "internal.h"
     30 #include "req-inl.h"
     31 #include "stream-inl.h"
     32 #include "uv-common.h"
     33 #include "uv.h"
     34 
     35 #include <aclapi.h>
     36 #include <accctrl.h>
     37 
     38 /* A zero-size buffer for use by uv_pipe_read */
     39 static char uv_zero_[] = "";
     40 
     41 /* Null uv_buf_t */
     42 static const uv_buf_t uv_null_buf_ = { 0, NULL };
     43 
     44 /* The timeout that the pipe will wait for the remote end to write data when
     45  * the local ends wants to shut it down. */
     46 static const int64_t eof_timeout = 50; /* ms */
     47 
     48 static const int default_pending_pipe_instances = 4;
     49 
     50 /* Pipe prefix */
     51 static char pipe_prefix[] = "\\\\?\\pipe";
     52 static const size_t pipe_prefix_len = sizeof(pipe_prefix) - 1;
     53 
     54 /* IPC incoming xfer queue item. */
     55 typedef struct {
     56   uv__ipc_socket_xfer_type_t xfer_type;
     57   uv__ipc_socket_xfer_info_t xfer_info;
     58   struct uv__queue member;
     59 } uv__ipc_xfer_queue_item_t;
     60 
     61 /* IPC frame header flags. */
     62 /* clang-format off */
     63 enum {
     64   UV__IPC_FRAME_HAS_DATA                = 0x01,
     65   UV__IPC_FRAME_HAS_SOCKET_XFER         = 0x02,
     66   UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 0x04,
     67   /* These are combinations of the flags above. */
     68   UV__IPC_FRAME_XFER_FLAGS              = 0x06,
     69   UV__IPC_FRAME_VALID_FLAGS             = 0x07
     70 };
     71 /* clang-format on */
     72 
     73 /* IPC frame header. */
     74 typedef struct {
     75   uint32_t flags;
     76   uint32_t reserved1;   /* Ignored. */
     77   uint32_t data_length; /* Must be zero if there is no data. */
     78   uint32_t reserved2;   /* Must be zero. */
     79 } uv__ipc_frame_header_t;
     80 
     81 /* To implement the IPC protocol correctly, these structures must have exactly
     82  * the right size. */
     83 STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
     84 STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
     85 
     86 /* Coalesced write request. */
     87 typedef struct {
     88   uv_write_t req;       /* Internal heap-allocated write request. */
     89   uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
     90 } uv__coalesced_write_t;
     91 
     92 
     93 static void eof_timer_init(uv_pipe_t* pipe);
     94 static void eof_timer_start(uv_pipe_t* pipe);
     95 static void eof_timer_stop(uv_pipe_t* pipe);
     96 static void eof_timer_cb(uv_timer_t* timer);
     97 static void eof_timer_destroy(uv_pipe_t* pipe);
     98 static void eof_timer_close_cb(uv_handle_t* handle);
     99 
    100 
    101 /* Does the file path contain embedded nul bytes? */
    102 static int includes_nul(const char *s, size_t n) {
    103   if (n == 0)
    104     return 0;
    105   return NULL != memchr(s, '\0', n);
    106 }
    107 
    108 
    109 static void uv__unique_pipe_name(unsigned long long ptr, char* name, size_t size) {
    110   snprintf(name, size, "\\\\?\\pipe\\uv\\%llu-%lu", ptr, GetCurrentProcessId());
    111 }
    112 
    113 
    114 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
    115   uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
    116 
    117   handle->reqs_pending = 0;
    118   handle->handle = INVALID_HANDLE_VALUE;
    119   handle->name = NULL;
    120   handle->pipe.conn.ipc_remote_pid = 0;
    121   handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
    122   uv__queue_init(&handle->pipe.conn.ipc_xfer_queue);
    123   handle->pipe.conn.ipc_xfer_queue_length = 0;
    124   handle->ipc = ipc;
    125   handle->pipe.conn.non_overlapped_writes_tail = NULL;
    126 
    127   return 0;
    128 }
    129 
    130 
    131 static void uv__pipe_connection_init(uv_pipe_t* handle) {
    132   assert(!(handle->flags & UV_HANDLE_PIPESERVER));
    133   uv__connection_init((uv_stream_t*) handle);
    134   handle->read_req.data = handle;
    135   handle->pipe.conn.eof_timer = NULL;
    136 }
    137 
    138 
    139 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
    140   HANDLE pipeHandle;
    141 
    142   /*
    143    * Assume that we have a duplex pipe first, so attempt to
    144    * connect with GENERIC_READ | GENERIC_WRITE.
    145    */
    146   pipeHandle = CreateFileW(name,
    147                            GENERIC_READ | GENERIC_WRITE,
    148                            0,
    149                            NULL,
    150                            OPEN_EXISTING,
    151                            FILE_FLAG_OVERLAPPED,
    152                            NULL);
    153   if (pipeHandle != INVALID_HANDLE_VALUE) {
    154     *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
    155     return pipeHandle;
    156   }
    157 
    158   /*
    159    * If the pipe is not duplex CreateFileW fails with
    160    * ERROR_ACCESS_DENIED.  In that case try to connect
    161    * as a read-only or write-only.
    162    */
    163   if (GetLastError() == ERROR_ACCESS_DENIED) {
    164     pipeHandle = CreateFileW(name,
    165                              GENERIC_READ | FILE_WRITE_ATTRIBUTES,
    166                              0,
    167                              NULL,
    168                              OPEN_EXISTING,
    169                              FILE_FLAG_OVERLAPPED,
    170                              NULL);
    171 
    172     if (pipeHandle != INVALID_HANDLE_VALUE) {
    173       *duplex_flags = UV_HANDLE_READABLE;
    174       return pipeHandle;
    175     }
    176   }
    177 
    178   if (GetLastError() == ERROR_ACCESS_DENIED) {
    179     pipeHandle = CreateFileW(name,
    180                              GENERIC_WRITE | FILE_READ_ATTRIBUTES,
    181                              0,
    182                              NULL,
    183                              OPEN_EXISTING,
    184                              FILE_FLAG_OVERLAPPED,
    185                              NULL);
    186 
    187     if (pipeHandle != INVALID_HANDLE_VALUE) {
    188       *duplex_flags = UV_HANDLE_WRITABLE;
    189       return pipeHandle;
    190     }
    191   }
    192 
    193   return INVALID_HANDLE_VALUE;
    194 }
    195 
    196 
    197 static void close_pipe(uv_pipe_t* pipe) {
    198   assert(pipe->u.fd == -1 || pipe->u.fd > 2);
    199   if (pipe->u.fd == -1)
    200     CloseHandle(pipe->handle);
    201   else
    202     _close(pipe->u.fd);
    203 
    204   pipe->u.fd = -1;
    205   pipe->handle = INVALID_HANDLE_VALUE;
    206 }
    207 
    208 
    209 static int uv__pipe_server(
    210     HANDLE* pipeHandle_ptr, DWORD access,
    211     char* name, size_t nameSize, unsigned long long random) {
    212   HANDLE pipeHandle;
    213   int err;
    214 
    215   for (;;) {
    216     uv__unique_pipe_name(random, name, nameSize);
    217 
    218     pipeHandle = CreateNamedPipeA(name,
    219       access | FILE_FLAG_FIRST_PIPE_INSTANCE,
    220       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
    221       NULL);
    222 
    223     if (pipeHandle != INVALID_HANDLE_VALUE) {
    224       /* No name collisions.  We're done. */
    225       break;
    226     }
    227 
    228     err = GetLastError();
    229     if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
    230       goto error;
    231     }
    232 
    233     /* Pipe name collision.  Increment the random number and try again. */
    234     random++;
    235   }
    236 
    237   *pipeHandle_ptr = pipeHandle;
    238 
    239   return 0;
    240 
    241  error:
    242   if (pipeHandle != INVALID_HANDLE_VALUE)
    243     CloseHandle(pipeHandle);
    244 
    245   return err;
    246 }
    247 
    248 
    249 static int uv__create_pipe_pair(
    250     HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
    251     unsigned int server_flags, unsigned int client_flags,
    252     int inherit_client, unsigned long long random) {
    253   /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
    254   char pipe_name[64];
    255   SECURITY_ATTRIBUTES sa;
    256   DWORD server_access;
    257   DWORD client_access;
    258   HANDLE server_pipe;
    259   HANDLE client_pipe;
    260   int err;
    261 
    262   server_pipe = INVALID_HANDLE_VALUE;
    263   client_pipe = INVALID_HANDLE_VALUE;
    264 
    265   server_access = 0;
    266   if (server_flags & UV_READABLE_PIPE)
    267     server_access |= PIPE_ACCESS_INBOUND;
    268   if (server_flags & UV_WRITABLE_PIPE)
    269     server_access |= PIPE_ACCESS_OUTBOUND;
    270   if (server_flags & UV_NONBLOCK_PIPE)
    271     server_access |= FILE_FLAG_OVERLAPPED;
    272   server_access |= WRITE_DAC;
    273 
    274   client_access = 0;
    275   if (client_flags & UV_READABLE_PIPE)
    276     client_access |= GENERIC_READ;
    277   else
    278     client_access |= FILE_READ_ATTRIBUTES;
    279   if (client_flags & UV_WRITABLE_PIPE)
    280     client_access |= GENERIC_WRITE;
    281   else
    282     client_access |= FILE_WRITE_ATTRIBUTES;
    283   client_access |= WRITE_DAC;
    284 
    285   /* Create server pipe handle. */
    286   err = uv__pipe_server(&server_pipe,
    287                         server_access,
    288                         pipe_name,
    289                         sizeof(pipe_name),
    290                         random);
    291   if (err)
    292     goto error;
    293 
    294   /* Create client pipe handle. */
    295   sa.nLength = sizeof sa;
    296   sa.lpSecurityDescriptor = NULL;
    297   sa.bInheritHandle = inherit_client;
    298 
    299   client_pipe = CreateFileA(pipe_name,
    300                             client_access,
    301                             0,
    302                             &sa,
    303                             OPEN_EXISTING,
    304                             (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
    305                             NULL);
    306   if (client_pipe == INVALID_HANDLE_VALUE) {
    307     err = GetLastError();
    308     goto error;
    309   }
    310 
    311 #ifndef NDEBUG
    312   /* Validate that the pipe was opened in the right mode. */
    313   {
    314     DWORD mode;
    315     BOOL r;
    316     r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
    317     if (r == TRUE) {
    318       assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
    319     } else {
    320       fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
    321     }
    322   }
    323 #endif
    324 
    325   /* Do a blocking ConnectNamedPipe.  This should not block because we have
    326    * both ends of the pipe created. */
    327   if (!ConnectNamedPipe(server_pipe, NULL)) {
    328     if (GetLastError() != ERROR_PIPE_CONNECTED) {
    329       err = GetLastError();
    330       goto error;
    331     }
    332   }
    333 
    334   *client_pipe_ptr = client_pipe;
    335   *server_pipe_ptr = server_pipe;
    336   return 0;
    337 
    338  error:
    339   if (server_pipe != INVALID_HANDLE_VALUE)
    340     CloseHandle(server_pipe);
    341 
    342   if (client_pipe != INVALID_HANDLE_VALUE)
    343     CloseHandle(client_pipe);
    344 
    345   return err;
    346 }
    347 
    348 
    349 int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
    350   uv_file temp[2];
    351   int err;
    352   HANDLE readh;
    353   HANDLE writeh;
    354 
    355   /* Make the server side the inbound (read) end, */
    356   /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
    357   /* TODO: better source of local randomness than &fds? */
    358   read_flags |= UV_READABLE_PIPE;
    359   write_flags |= UV_WRITABLE_PIPE;
    360   err = uv__create_pipe_pair(&readh,
    361                              &writeh,
    362                              read_flags,
    363                              write_flags,
    364                              0,
    365                              (uintptr_t) &fds[0]);
    366   if (err != 0)
    367     return err;
    368   temp[0] = _open_osfhandle((intptr_t) readh, 0);
    369   if (temp[0] == -1) {
    370     if (errno == UV_EMFILE)
    371       err = UV_EMFILE;
    372     else
    373       err = UV_UNKNOWN;
    374     CloseHandle(readh);
    375     CloseHandle(writeh);
    376     return err;
    377   }
    378   temp[1] = _open_osfhandle((intptr_t) writeh, 0);
    379   if (temp[1] == -1) {
    380     if (errno == UV_EMFILE)
    381       err = UV_EMFILE;
    382     else
    383       err = UV_UNKNOWN;
    384     _close(temp[0]);
    385     CloseHandle(writeh);
    386     return err;
    387   }
    388   fds[0] = temp[0];
    389   fds[1] = temp[1];
    390   return 0;
    391 }
    392 
    393 
    394 int uv__create_stdio_pipe_pair(uv_loop_t* loop,
    395     uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
    396   /* The parent_pipe is always the server_pipe and kept by libuv.
    397    * The child_pipe is always the client_pipe and is passed to the child.
    398    * The flags are specified with respect to their usage in the child. */
    399   HANDLE server_pipe;
    400   HANDLE client_pipe;
    401   unsigned int server_flags;
    402   unsigned int client_flags;
    403   int err;
    404 
    405   uv__pipe_connection_init(parent_pipe);
    406 
    407   server_pipe = INVALID_HANDLE_VALUE;
    408   client_pipe = INVALID_HANDLE_VALUE;
    409 
    410   server_flags = 0;
    411   client_flags = 0;
    412   if (flags & UV_READABLE_PIPE) {
    413     /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
    414      * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
    415      * the state of the write buffer when we're trying to shutdown the pipe. */
    416     server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
    417     client_flags |= UV_READABLE_PIPE;
    418   }
    419   if (flags & UV_WRITABLE_PIPE) {
    420     server_flags |= UV_READABLE_PIPE;
    421     client_flags |= UV_WRITABLE_PIPE;
    422   }
    423   server_flags |= UV_NONBLOCK_PIPE;
    424   if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
    425     client_flags |= UV_NONBLOCK_PIPE;
    426   }
    427 
    428   err = uv__create_pipe_pair(&server_pipe, &client_pipe,
    429           server_flags, client_flags, 1, (uintptr_t) server_pipe);
    430   if (err)
    431     goto error;
    432 
    433   if (CreateIoCompletionPort(server_pipe,
    434                              loop->iocp,
    435                              (ULONG_PTR) parent_pipe,
    436                              0) == NULL) {
    437     err = GetLastError();
    438     goto error;
    439   }
    440 
    441   parent_pipe->handle = server_pipe;
    442   *child_pipe_ptr = client_pipe;
    443 
    444   /* The server end is now readable and/or writable. */
    445   if (flags & UV_READABLE_PIPE)
    446     parent_pipe->flags |= UV_HANDLE_WRITABLE;
    447   if (flags & UV_WRITABLE_PIPE)
    448     parent_pipe->flags |= UV_HANDLE_READABLE;
    449 
    450   return 0;
    451 
    452  error:
    453   if (server_pipe != INVALID_HANDLE_VALUE)
    454     CloseHandle(server_pipe);
    455 
    456   if (client_pipe != INVALID_HANDLE_VALUE)
    457     CloseHandle(client_pipe);
    458 
    459   return err;
    460 }
    461 
    462 
    463 static int uv__set_pipe_handle(uv_loop_t* loop,
    464                                uv_pipe_t* handle,
    465                                HANDLE pipeHandle,
    466                                int fd,
    467                                DWORD duplex_flags) {
    468   NTSTATUS nt_status;
    469   IO_STATUS_BLOCK io_status;
    470   FILE_MODE_INFORMATION mode_info;
    471   DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
    472   DWORD current_mode = 0;
    473   DWORD err = 0;
    474 
    475   assert(handle->flags & UV_HANDLE_CONNECTION);
    476   assert(!(handle->flags & UV_HANDLE_PIPESERVER));
    477   if (handle->flags & UV_HANDLE_CLOSING)
    478     return UV_EINVAL;
    479   if (handle->handle != INVALID_HANDLE_VALUE)
    480     return UV_EBUSY;
    481 
    482   if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    483     err = GetLastError();
    484     if (err == ERROR_ACCESS_DENIED) {
    485       /*
    486        * SetNamedPipeHandleState can fail if the handle doesn't have either
    487        * GENERIC_WRITE  or FILE_WRITE_ATTRIBUTES.
    488        * But if the handle already has the desired wait and blocking modes
    489        * we can continue.
    490        */
    491       if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
    492                                    NULL, NULL, 0)) {
    493         return uv_translate_sys_error(GetLastError());
    494       } else if (current_mode & PIPE_NOWAIT) {
    495         return UV_EACCES;
    496       }
    497     } else {
    498       /* If this returns ERROR_INVALID_PARAMETER we probably opened
    499        * something that is not a pipe. */
    500       if (err == ERROR_INVALID_PARAMETER) {
    501         return UV_ENOTSOCK;
    502       }
    503       return uv_translate_sys_error(err);
    504     }
    505   }
    506 
    507   /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
    508   nt_status = pNtQueryInformationFile(pipeHandle,
    509                                       &io_status,
    510                                       &mode_info,
    511                                       sizeof(mode_info),
    512                                       FileModeInformation);
    513   if (nt_status != STATUS_SUCCESS) {
    514     return uv_translate_sys_error(err);
    515   }
    516 
    517   if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
    518       mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
    519     /* Non-overlapped pipe. */
    520     handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
    521     handle->pipe.conn.readfile_thread_handle = NULL;
    522     InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
    523   } else {
    524     /* Overlapped pipe.  Try to associate with IOCP. */
    525     if (CreateIoCompletionPort(pipeHandle,
    526                                loop->iocp,
    527                                (ULONG_PTR) handle,
    528                                0) == NULL) {
    529       handle->flags |= UV_HANDLE_EMULATE_IOCP;
    530     }
    531   }
    532 
    533   handle->handle = pipeHandle;
    534   handle->u.fd = fd;
    535   handle->flags |= duplex_flags;
    536 
    537   return 0;
    538 }
    539 
    540 
    541 static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
    542                              uv_pipe_accept_t* req, BOOL firstInstance) {
    543   assert(req->pipeHandle == INVALID_HANDLE_VALUE);
    544 
    545   req->pipeHandle =
    546       CreateNamedPipeW(handle->name,
    547                        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
    548                          (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
    549                        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
    550                        PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
    551 
    552   if (req->pipeHandle == INVALID_HANDLE_VALUE) {
    553     return 0;
    554   }
    555 
    556   /* Associate it with IOCP so we can get events. */
    557   if (CreateIoCompletionPort(req->pipeHandle,
    558                              loop->iocp,
    559                              (ULONG_PTR) handle,
    560                              0) == NULL) {
    561     uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
    562   }
    563 
    564   /* Stash a handle in the server object for use from places such as
    565    * getsockname and chmod. As we transfer ownership of these to client
    566    * objects, we'll allocate new ones here. */
    567   handle->handle = req->pipeHandle;
    568 
    569   return 1;
    570 }
    571 
    572 
    573 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
    574   uv_loop_t* loop;
    575   uv_pipe_t* handle;
    576   uv_shutdown_t* req;
    577 
    578   req = (uv_shutdown_t*) parameter;
    579   assert(req);
    580   handle = (uv_pipe_t*) req->handle;
    581   assert(handle);
    582   loop = handle->loop;
    583   assert(loop);
    584 
    585   FlushFileBuffers(handle->handle);
    586 
    587   /* Post completed */
    588   POST_COMPLETION_FOR_REQ(loop, req);
    589 
    590   return 0;
    591 }
    592 
    593 
    594 void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
    595   DWORD result;
    596   NTSTATUS nt_status;
    597   IO_STATUS_BLOCK io_status;
    598   FILE_PIPE_LOCAL_INFORMATION pipe_info;
    599 
    600   assert(handle->flags & UV_HANDLE_CONNECTION);
    601   assert(req != NULL);
    602   assert(handle->stream.conn.write_reqs_pending == 0);
    603   SET_REQ_SUCCESS(req);
    604 
    605   if (handle->flags & UV_HANDLE_CLOSING) {
    606     uv__insert_pending_req(loop, (uv_req_t*) req);
    607     return;
    608   }
    609 
    610   /* Try to avoid flushing the pipe buffer in the thread pool. */
    611   nt_status = pNtQueryInformationFile(handle->handle,
    612                                       &io_status,
    613                                       &pipe_info,
    614                                       sizeof pipe_info,
    615                                       FilePipeLocalInformation);
    616 
    617   if (nt_status != STATUS_SUCCESS) {
    618     SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
    619     handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
    620     uv__insert_pending_req(loop, (uv_req_t*) req);
    621     return;
    622   }
    623 
    624   if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
    625     /* Short-circuit, no need to call FlushFileBuffers:
    626      * all writes have been read. */
    627     uv__insert_pending_req(loop, (uv_req_t*) req);
    628     return;
    629   }
    630 
    631   /* Run FlushFileBuffers in the thread pool. */
    632   result = QueueUserWorkItem(pipe_shutdown_thread_proc,
    633                              req,
    634                              WT_EXECUTELONGFUNCTION);
    635   if (!result) {
    636     SET_REQ_ERROR(req, GetLastError());
    637     handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
    638     uv__insert_pending_req(loop, (uv_req_t*) req);
    639     return;
    640   }
    641 }
    642 
    643 
    644 void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
    645   uv__ipc_xfer_queue_item_t* xfer_queue_item;
    646 
    647   assert(handle->reqs_pending == 0);
    648   assert(handle->flags & UV_HANDLE_CLOSING);
    649   assert(!(handle->flags & UV_HANDLE_CLOSED));
    650 
    651   if (handle->flags & UV_HANDLE_CONNECTION) {
    652     /* Free pending sockets */
    653     while (!uv__queue_empty(&handle->pipe.conn.ipc_xfer_queue)) {
    654       struct uv__queue* q;
    655       SOCKET socket;
    656 
    657       q = uv__queue_head(&handle->pipe.conn.ipc_xfer_queue);
    658       uv__queue_remove(q);
    659       xfer_queue_item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);
    660 
    661       /* Materialize socket and close it */
    662       socket = WSASocketW(FROM_PROTOCOL_INFO,
    663                           FROM_PROTOCOL_INFO,
    664                           FROM_PROTOCOL_INFO,
    665                           &xfer_queue_item->xfer_info.socket_info,
    666                           0,
    667                           WSA_FLAG_OVERLAPPED);
    668       uv__free(xfer_queue_item);
    669 
    670       if (socket != INVALID_SOCKET)
    671         closesocket(socket);
    672     }
    673     handle->pipe.conn.ipc_xfer_queue_length = 0;
    674 
    675     assert(handle->read_req.wait_handle == INVALID_HANDLE_VALUE);
    676     if (handle->read_req.event_handle != NULL) {
    677       CloseHandle(handle->read_req.event_handle);
    678       handle->read_req.event_handle = NULL;
    679     }
    680 
    681     if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
    682       DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
    683   }
    684 
    685   if (handle->flags & UV_HANDLE_PIPESERVER) {
    686     assert(handle->pipe.serv.accept_reqs);
    687     uv__free(handle->pipe.serv.accept_reqs);
    688     handle->pipe.serv.accept_reqs = NULL;
    689   }
    690 
    691   uv__handle_close(handle);
    692 }
    693 
    694 
    695 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
    696   if (handle->flags & UV_HANDLE_BOUND)
    697     return;
    698   handle->pipe.serv.pending_instances = count;
    699   handle->flags |= UV_HANDLE_PIPESERVER;
    700 }
    701 
    702 
    703 /* Creates a pipe server. */
    704 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
    705   return uv_pipe_bind2(handle, name, strlen(name), 0);
    706 }
    707 
    708 
    709 int uv_pipe_bind2(uv_pipe_t* handle,
    710                   const char* name,
    711                   size_t namelen,
    712                   unsigned int flags) {
    713   uv_loop_t* loop = handle->loop;
    714   int i, err;
    715   uv_pipe_accept_t* req;
    716   char* name_copy;
    717 
    718   if (flags & ~UV_PIPE_NO_TRUNCATE) {
    719     return UV_EINVAL;
    720   }
    721 
    722   if (name == NULL) {
    723     return UV_EINVAL;
    724   }
    725 
    726   if (namelen == 0) {
    727     return UV_EINVAL;
    728   }
    729 
    730   if (includes_nul(name, namelen)) {
    731     return UV_EINVAL;
    732   }
    733 
    734   if (handle->flags & UV_HANDLE_BOUND) {
    735     return UV_EINVAL;
    736   }
    737 
    738   if (uv__is_closing(handle)) {
    739     return UV_EINVAL;
    740   }
    741 
    742   name_copy = uv__malloc(namelen + 1);
    743   if (name_copy == NULL) {
    744     return UV_ENOMEM;
    745   }
    746 
    747   memcpy(name_copy, name, namelen);
    748   name_copy[namelen] = '\0';
    749 
    750   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    751     handle->pipe.serv.pending_instances = default_pending_pipe_instances;
    752   }
    753 
    754   err = UV_ENOMEM;
    755   handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
    756     uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
    757   if (handle->pipe.serv.accept_reqs == NULL) {
    758     goto error;
    759   }
    760 
    761   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
    762     req = &handle->pipe.serv.accept_reqs[i];
    763     UV_REQ_INIT(req, UV_ACCEPT);
    764     req->data = handle;
    765     req->pipeHandle = INVALID_HANDLE_VALUE;
    766     req->next_pending = NULL;
    767   }
    768 
    769   /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
    770   err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
    771   uv__free(name_copy);
    772   name_copy = NULL;
    773 
    774   if (err) {
    775     goto error;
    776   }
    777 
    778   /*
    779    * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
    780    * If this fails then there's already a pipe server for the given pipe name.
    781    */
    782   if (!pipe_alloc_accept(loop,
    783                          handle,
    784                          &handle->pipe.serv.accept_reqs[0],
    785                          TRUE)) {
    786     err = GetLastError();
    787     if (err == ERROR_ACCESS_DENIED) {
    788       err = UV_EADDRINUSE;
    789     } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
    790       err = UV_EACCES;
    791     } else {
    792       err = uv_translate_sys_error(err);
    793     }
    794     goto error;
    795   }
    796 
    797   handle->pipe.serv.pending_accepts = NULL;
    798   handle->flags |= UV_HANDLE_PIPESERVER;
    799   handle->flags |= UV_HANDLE_BOUND;
    800 
    801   return 0;
    802 
    803 error:
    804   uv__free(handle->pipe.serv.accept_reqs);
    805   uv__free(handle->name);
    806   uv__free(name_copy);
    807   handle->pipe.serv.accept_reqs = NULL;
    808   handle->name = NULL;
    809 
    810   return err;
    811 }
    812 
    813 
    814 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
    815   uv_loop_t* loop;
    816   uv_pipe_t* handle;
    817   uv_connect_t* req;
    818   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
    819   DWORD duplex_flags;
    820 
    821   req = (uv_connect_t*) parameter;
    822   assert(req);
    823   handle = (uv_pipe_t*) req->handle;
    824   assert(handle);
    825   loop = handle->loop;
    826   assert(loop);
    827 
    828   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
    829    * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
    830   while (WaitNamedPipeW(req->u.connect.name, 30000)) {
    831     /* The pipe is now available, try to connect. */
    832     pipeHandle = open_named_pipe(req->u.connect.name, &duplex_flags);
    833     if (pipeHandle != INVALID_HANDLE_VALUE)
    834       break;
    835 
    836     SwitchToThread();
    837   }
    838 
    839   uv__free(req->u.connect.name);
    840   req->u.connect.name = NULL;
    841   if (pipeHandle != INVALID_HANDLE_VALUE) {
    842     SET_REQ_SUCCESS(req);
    843     req->u.connect.pipeHandle = pipeHandle;
    844     req->u.connect.duplex_flags = duplex_flags;
    845   } else {
    846     SET_REQ_ERROR(req, GetLastError());
    847   }
    848 
    849   /* Post completed */
    850   POST_COMPLETION_FOR_REQ(loop, req);
    851 
    852   return 0;
    853 }
    854 
    855 
    856 void uv_pipe_connect(uv_connect_t* req,
    857                     uv_pipe_t* handle,
    858                     const char* name,
    859                     uv_connect_cb cb) {
    860   uv_loop_t* loop;
    861   int err;
    862 
    863   err = uv_pipe_connect2(req, handle, name, strlen(name), 0, cb);
    864 
    865   if (err) {
    866     loop = handle->loop;
    867     /* Make this req pending reporting an error. */
    868     SET_REQ_ERROR(req, err);
    869     uv__insert_pending_req(loop, (uv_req_t*) req);
    870     handle->reqs_pending++;
    871     REGISTER_HANDLE_REQ(loop, handle);
    872   }
    873 }
    874 
    875 
    876 int uv_pipe_connect2(uv_connect_t* req,
    877                      uv_pipe_t* handle,
    878                      const char* name,
    879                      size_t namelen,
    880                      unsigned int flags,
    881                      uv_connect_cb cb) {
    882   uv_loop_t* loop;
    883   int err;
    884   size_t nameSize;
    885   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
    886   DWORD duplex_flags;
    887   char* name_copy;
    888 
    889   loop = handle->loop;
    890   UV_REQ_INIT(req, UV_CONNECT);
    891   req->handle = (uv_stream_t*) handle;
    892   req->cb = cb;
    893   req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
    894   req->u.connect.duplex_flags = 0;
    895   req->u.connect.name = NULL;
    896 
    897   if (flags & ~UV_PIPE_NO_TRUNCATE) {
    898     return UV_EINVAL;
    899   }
    900 
    901   if (name == NULL) {
    902     return UV_EINVAL;
    903   }
    904 
    905   if (namelen == 0) {
    906     return UV_EINVAL;
    907   }
    908 
    909   if (includes_nul(name, namelen)) {
    910     return UV_EINVAL;
    911   }
    912 
    913   name_copy = uv__malloc(namelen + 1);
    914   if (name_copy == NULL) {
    915     return UV_ENOMEM;
    916   }
    917 
    918   memcpy(name_copy, name, namelen);
    919   name_copy[namelen] = '\0';
    920 
    921   if (handle->flags & UV_HANDLE_PIPESERVER) {
    922     err = ERROR_INVALID_PARAMETER;
    923     goto error;
    924   }
    925   if (handle->flags & UV_HANDLE_CONNECTION) {
    926     err = ERROR_PIPE_BUSY;
    927     goto error;
    928   }
    929   uv__pipe_connection_init(handle);
    930 
    931   /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
    932   err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
    933   uv__free(name_copy);
    934   name_copy = NULL;
    935 
    936   if (err) {
    937     err = ERROR_NO_UNICODE_TRANSLATION;
    938     goto error;
    939   }
    940 
    941   pipeHandle = open_named_pipe(handle->name, &duplex_flags);
    942   if (pipeHandle == INVALID_HANDLE_VALUE) {
    943     if (GetLastError() == ERROR_PIPE_BUSY) {
    944       nameSize = (wcslen(handle->name) + 1) * sizeof(WCHAR);
    945       req->u.connect.name = uv__malloc(nameSize);
    946       if (!req->u.connect.name) {
    947         uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
    948       }
    949 
    950       memcpy(req->u.connect.name, handle->name, nameSize);
    951 
    952       /* Wait for the server to make a pipe instance available. */
    953       if (!QueueUserWorkItem(&pipe_connect_thread_proc,
    954                              req,
    955                              WT_EXECUTELONGFUNCTION)) {
    956         uv__free(req->u.connect.name);
    957         req->u.connect.name = NULL;
    958         err = GetLastError();
    959         goto error;
    960       }
    961 
    962       REGISTER_HANDLE_REQ(loop, handle);
    963       handle->reqs_pending++;
    964 
    965       return 0;
    966     }
    967 
    968     err = GetLastError();
    969     goto error;
    970   }
    971 
    972   req->u.connect.pipeHandle = pipeHandle;
    973   req->u.connect.duplex_flags = duplex_flags;
    974   SET_REQ_SUCCESS(req);
    975   uv__insert_pending_req(loop, (uv_req_t*) req);
    976   handle->reqs_pending++;
    977   REGISTER_HANDLE_REQ(loop, handle);
    978   return 0;
    979 
    980 error:
    981   uv__free(name_copy);
    982 
    983   if (handle->name) {
    984     uv__free(handle->name);
    985     handle->name = NULL;
    986   }
    987 
    988   if (pipeHandle != INVALID_HANDLE_VALUE)
    989     CloseHandle(pipeHandle);
    990 
    991   /* Make this req pending reporting an error. */
    992   SET_REQ_ERROR(req, err);
    993   uv__insert_pending_req(loop, (uv_req_t*) req);
    994   handle->reqs_pending++;
    995   REGISTER_HANDLE_REQ(loop, handle);
    996   return 0;
    997 }
    998 
    999 
   1000 void uv__pipe_interrupt_read(uv_pipe_t* handle) {
   1001   BOOL r;
   1002 
   1003   if (!(handle->flags & UV_HANDLE_READ_PENDING))
   1004     return; /* No pending reads. */
   1005   if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
   1006     return; /* Already cancelled. */
   1007   if (handle->handle == INVALID_HANDLE_VALUE)
   1008     return; /* Pipe handle closed. */
   1009 
   1010   if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
   1011     /* Cancel asynchronous read. */
   1012     r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
   1013     assert(r || GetLastError() == ERROR_NOT_FOUND);
   1014     (void) r;
   1015   } else {
   1016     /* Cancel synchronous read (which is happening in the thread pool). */
   1017     HANDLE thread;
   1018     volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
   1019 
   1020     EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
   1021 
   1022     thread = *thread_ptr;
   1023     if (thread == NULL) {
   1024       /* The thread pool thread has not yet reached the point of blocking, we
   1025        * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
   1026       *thread_ptr = INVALID_HANDLE_VALUE;
   1027 
   1028     } else {
   1029       /* Spin until the thread has acknowledged (by setting the thread to
   1030        * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
   1031       while (thread != INVALID_HANDLE_VALUE) {
   1032         r = CancelSynchronousIo(thread);
   1033         assert(r || GetLastError() == ERROR_NOT_FOUND);
   1034         SwitchToThread(); /* Yield thread. */
   1035         thread = *thread_ptr;
   1036       }
   1037     }
   1038 
   1039     LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
   1040   }
   1041 
   1042   /* Set flag to indicate that read has been cancelled. */
   1043   handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
   1044 }
   1045 
   1046 
   1047 void uv__pipe_read_stop(uv_pipe_t* handle) {
   1048   handle->flags &= ~UV_HANDLE_READING;
   1049   DECREASE_ACTIVE_COUNT(handle->loop, handle);
   1050   uv__pipe_interrupt_read(handle);
   1051 }
   1052 
   1053 
   1054 /* Cleans up uv_pipe_t (server or connection) and all resources associated with
   1055  * it. */
   1056 void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
   1057   int i;
   1058   HANDLE pipeHandle;
   1059 
   1060   if (handle->flags & UV_HANDLE_READING) {
   1061     handle->flags &= ~UV_HANDLE_READING;
   1062     DECREASE_ACTIVE_COUNT(loop, handle);
   1063   }
   1064 
   1065   if (handle->flags & UV_HANDLE_LISTENING) {
   1066     handle->flags &= ~UV_HANDLE_LISTENING;
   1067     DECREASE_ACTIVE_COUNT(loop, handle);
   1068   }
   1069 
   1070   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
   1071 
   1072   uv__handle_closing(handle);
   1073 
   1074   uv__pipe_interrupt_read(handle);
   1075 
   1076   if (handle->name) {
   1077     uv__free(handle->name);
   1078     handle->name = NULL;
   1079   }
   1080 
   1081   if (handle->flags & UV_HANDLE_PIPESERVER) {
   1082     for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
   1083       pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
   1084       if (pipeHandle != INVALID_HANDLE_VALUE) {
   1085         CloseHandle(pipeHandle);
   1086         handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
   1087       }
   1088     }
   1089     handle->handle = INVALID_HANDLE_VALUE;
   1090   }
   1091 
   1092   if (handle->flags & UV_HANDLE_CONNECTION) {
   1093     eof_timer_destroy(handle);
   1094   }
   1095 
   1096   if ((handle->flags & UV_HANDLE_CONNECTION)
   1097       && handle->handle != INVALID_HANDLE_VALUE) {
   1098     /* This will eventually destroy the write queue for us too. */
   1099     close_pipe(handle);
   1100   }
   1101 
   1102   if (handle->reqs_pending == 0)
   1103     uv__want_endgame(loop, (uv_handle_t*) handle);
   1104 }
   1105 
   1106 
   1107 static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
   1108     uv_pipe_accept_t* req, BOOL firstInstance) {
   1109   assert(handle->flags & UV_HANDLE_LISTENING);
   1110 
   1111   if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
   1112     SET_REQ_ERROR(req, GetLastError());
   1113     uv__insert_pending_req(loop, (uv_req_t*) req);
   1114     handle->reqs_pending++;
   1115     return;
   1116   }
   1117 
   1118   assert(req->pipeHandle != INVALID_HANDLE_VALUE);
   1119 
   1120   /* Prepare the overlapped structure. */
   1121   memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
   1122 
   1123   if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
   1124       GetLastError() != ERROR_IO_PENDING) {
   1125     if (GetLastError() == ERROR_PIPE_CONNECTED) {
   1126       SET_REQ_SUCCESS(req);
   1127     } else {
   1128       CloseHandle(req->pipeHandle);
   1129       req->pipeHandle = INVALID_HANDLE_VALUE;
   1130       /* Make this req pending reporting an error. */
   1131       SET_REQ_ERROR(req, GetLastError());
   1132     }
   1133     uv__insert_pending_req(loop, (uv_req_t*) req);
   1134     handle->reqs_pending++;
   1135     return;
   1136   }
   1137 
   1138   /* Wait for completion via IOCP */
   1139   handle->reqs_pending++;
   1140 }
   1141 
   1142 
   1143 int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
   1144   uv_loop_t* loop = server->loop;
   1145   uv_pipe_t* pipe_client;
   1146   uv_pipe_accept_t* req;
   1147   struct uv__queue* q;
   1148   uv__ipc_xfer_queue_item_t* item;
   1149   int err;
   1150 
   1151   if (server->ipc) {
   1152     if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) {
   1153       /* No valid pending sockets. */
   1154       return WSAEWOULDBLOCK;
   1155     }
   1156 
   1157     q = uv__queue_head(&server->pipe.conn.ipc_xfer_queue);
   1158     uv__queue_remove(q);
   1159     server->pipe.conn.ipc_xfer_queue_length--;
   1160     item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);
   1161 
   1162     err = uv__tcp_xfer_import(
   1163         (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
   1164 
   1165     uv__free(item);
   1166 
   1167     if (err != 0)
   1168       return err;
   1169 
   1170   } else {
   1171     pipe_client = (uv_pipe_t*) client;
   1172     uv__pipe_connection_init(pipe_client);
   1173 
   1174     /* Find a connection instance that has been connected, but not yet
   1175      * accepted. */
   1176     req = server->pipe.serv.pending_accepts;
   1177 
   1178     if (!req) {
   1179       /* No valid connections found, so we error out. */
   1180       return WSAEWOULDBLOCK;
   1181     }
   1182 
   1183     /* Initialize the client handle and copy the pipeHandle to the client */
   1184     pipe_client->handle = req->pipeHandle;
   1185     pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
   1186 
   1187     /* Prepare the req to pick up a new connection */
   1188     server->pipe.serv.pending_accepts = req->next_pending;
   1189     req->next_pending = NULL;
   1190     req->pipeHandle = INVALID_HANDLE_VALUE;
   1191 
   1192     server->handle = INVALID_HANDLE_VALUE;
   1193     if (!(server->flags & UV_HANDLE_CLOSING)) {
   1194       uv__pipe_queue_accept(loop, server, req, FALSE);
   1195     }
   1196   }
   1197 
   1198   return 0;
   1199 }
   1200 
   1201 
   1202 /* Starts listening for connections for the given pipe. */
   1203 int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
   1204   uv_loop_t* loop = handle->loop;
   1205   int i;
   1206 
   1207   if (handle->flags & UV_HANDLE_LISTENING) {
   1208     handle->stream.serv.connection_cb = cb;
   1209   }
   1210 
   1211   if (!(handle->flags & UV_HANDLE_BOUND)) {
   1212     return WSAEINVAL;
   1213   }
   1214 
   1215   if (handle->flags & UV_HANDLE_READING) {
   1216     return WSAEISCONN;
   1217   }
   1218 
   1219   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
   1220     return ERROR_NOT_SUPPORTED;
   1221   }
   1222 
   1223   if (handle->ipc) {
   1224     return WSAEINVAL;
   1225   }
   1226 
   1227   handle->flags |= UV_HANDLE_LISTENING;
   1228   INCREASE_ACTIVE_COUNT(loop, handle);
   1229   handle->stream.serv.connection_cb = cb;
   1230 
   1231   /* First pipe handle should have already been created in uv_pipe_bind */
   1232   assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
   1233 
   1234   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
   1235     uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
   1236   }
   1237 
   1238   return 0;
   1239 }
   1240 
   1241 
   1242 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
   1243   uv_read_t* req = (uv_read_t*) arg;
   1244   uv_pipe_t* handle = (uv_pipe_t*) req->data;
   1245   uv_loop_t* loop = handle->loop;
   1246   volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
   1247   CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
   1248   HANDLE thread;
   1249   DWORD bytes;
   1250   DWORD err;
   1251 
   1252   assert(req->type == UV_READ);
   1253   assert(handle->type == UV_NAMED_PIPE);
   1254 
   1255   err = 0;
   1256 
   1257   /* Create a handle to the current thread. */
   1258   if (!DuplicateHandle(GetCurrentProcess(),
   1259                        GetCurrentThread(),
   1260                        GetCurrentProcess(),
   1261                        &thread,
   1262                        0,
   1263                        FALSE,
   1264                        DUPLICATE_SAME_ACCESS)) {
   1265     err = GetLastError();
   1266     goto out1;
   1267   }
   1268 
   1269   /* The lock needs to be held when thread handle is modified. */
   1270   EnterCriticalSection(lock);
   1271   if (*thread_ptr == INVALID_HANDLE_VALUE) {
   1272     /* uv__pipe_interrupt_read() cancelled reading before we got here. */
   1273     err = ERROR_OPERATION_ABORTED;
   1274   } else {
   1275     /* Let main thread know which worker thread is doing the blocking read. */
   1276     assert(*thread_ptr == NULL);
   1277     *thread_ptr = thread;
   1278   }
   1279   LeaveCriticalSection(lock);
   1280 
   1281   if (err)
   1282     goto out2;
   1283 
   1284   /* Block the thread until data is available on the pipe, or the read is
   1285    * cancelled. */
   1286   if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
   1287     err = GetLastError();
   1288 
   1289   /* Let the main thread know the worker is past the point of blocking. */
   1290   assert(thread == *thread_ptr);
   1291   *thread_ptr = INVALID_HANDLE_VALUE;
   1292 
   1293   /* Briefly acquire the mutex. Since the main thread holds the lock while it
   1294    * is spinning trying to cancel this thread's I/O, we will block here until
   1295    * it stops doing that. */
   1296   EnterCriticalSection(lock);
   1297   LeaveCriticalSection(lock);
   1298 
   1299 out2:
   1300   /* Close the handle to the current thread. */
   1301   CloseHandle(thread);
   1302 
   1303 out1:
   1304   /* Set request status and post a completion record to the IOCP. */
   1305   if (err)
   1306     SET_REQ_ERROR(req, err);
   1307   else
   1308     SET_REQ_SUCCESS(req);
   1309   POST_COMPLETION_FOR_REQ(loop, req);
   1310 
   1311   return 0;
   1312 }
   1313 
   1314 
   1315 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
   1316   int result;
   1317   DWORD bytes;
   1318   uv_write_t* req = (uv_write_t*) parameter;
   1319   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
   1320   uv_loop_t* loop = handle->loop;
   1321 
   1322   assert(req != NULL);
   1323   assert(req->type == UV_WRITE);
   1324   assert(handle->type == UV_NAMED_PIPE);
   1325 
   1326   result = WriteFile(handle->handle,
   1327                      req->write_buffer.base,
   1328                      req->write_buffer.len,
   1329                      &bytes,
   1330                      NULL);
   1331 
   1332   if (!result) {
   1333     SET_REQ_ERROR(req, GetLastError());
   1334   }
   1335 
   1336   POST_COMPLETION_FOR_REQ(loop, req);
   1337   return 0;
   1338 }
   1339 
   1340 
   1341 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
   1342   uv_read_t* req;
   1343   uv_tcp_t* handle;
   1344 
   1345   req = (uv_read_t*) context;
   1346   assert(req != NULL);
   1347   handle = (uv_tcp_t*)req->data;
   1348   assert(handle != NULL);
   1349   assert(!timed_out);
   1350 
   1351   if (!PostQueuedCompletionStatus(handle->loop->iocp,
   1352                                   req->u.io.overlapped.InternalHigh,
   1353                                   0,
   1354                                   &req->u.io.overlapped)) {
   1355     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
   1356   }
   1357 }
   1358 
   1359 
   1360 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
   1361   uv_write_t* req;
   1362   uv_tcp_t* handle;
   1363 
   1364   req = (uv_write_t*) context;
   1365   assert(req != NULL);
   1366   handle = (uv_tcp_t*)req->handle;
   1367   assert(handle != NULL);
   1368   assert(!timed_out);
   1369 
   1370   if (!PostQueuedCompletionStatus(handle->loop->iocp,
   1371                                   req->u.io.overlapped.InternalHigh,
   1372                                   0,
   1373                                   &req->u.io.overlapped)) {
   1374     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
   1375   }
   1376 }
   1377 
   1378 
   1379 static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
   1380   uv_read_t* req;
   1381   int result;
   1382 
   1383   assert(handle->flags & UV_HANDLE_READING);
   1384   assert(!(handle->flags & UV_HANDLE_READ_PENDING));
   1385 
   1386   assert(handle->handle != INVALID_HANDLE_VALUE);
   1387 
   1388   req = &handle->read_req;
   1389 
   1390   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
   1391     handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
   1392     if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
   1393                            req,
   1394                            WT_EXECUTELONGFUNCTION)) {
   1395       /* Make this req pending reporting an error. */
   1396       SET_REQ_ERROR(req, GetLastError());
   1397       goto error;
   1398     }
   1399   } else {
   1400     memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
   1401     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
   1402       assert(req->event_handle != NULL);
   1403       req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
   1404     }
   1405 
   1406     /* Do 0-read */
   1407     result = ReadFile(handle->handle,
   1408                       &uv_zero_,
   1409                       0,
   1410                       NULL,
   1411                       &req->u.io.overlapped);
   1412 
   1413     if (!result && GetLastError() != ERROR_IO_PENDING) {
   1414       /* Make this req pending reporting an error. */
   1415       SET_REQ_ERROR(req, GetLastError());
   1416       goto error;
   1417     }
   1418 
   1419     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
   1420       assert(req->wait_handle == INVALID_HANDLE_VALUE);
   1421       if (!RegisterWaitForSingleObject(&req->wait_handle,
   1422           req->event_handle, post_completion_read_wait, (void*) req,
   1423           INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
   1424         SET_REQ_ERROR(req, GetLastError());
   1425         goto error;
   1426       }
   1427     }
   1428   }
   1429 
   1430   /* Start the eof timer if there is one */
   1431   eof_timer_start(handle);
   1432   handle->flags |= UV_HANDLE_READ_PENDING;
   1433   handle->reqs_pending++;
   1434   return;
   1435 
   1436 error:
   1437   uv__insert_pending_req(loop, (uv_req_t*)req);
   1438   handle->flags |= UV_HANDLE_READ_PENDING;
   1439   handle->reqs_pending++;
   1440 }
   1441 
   1442 
   1443 int uv__pipe_read_start(uv_pipe_t* handle,
   1444                         uv_alloc_cb alloc_cb,
   1445                         uv_read_cb read_cb) {
   1446   uv_loop_t* loop = handle->loop;
   1447 
   1448   handle->flags |= UV_HANDLE_READING;
   1449   INCREASE_ACTIVE_COUNT(loop, handle);
   1450   handle->read_cb = read_cb;
   1451   handle->alloc_cb = alloc_cb;
   1452 
   1453   if (handle->read_req.event_handle == NULL) {
   1454     handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
   1455     if (handle->read_req.event_handle == NULL) {
   1456       uv_fatal_error(GetLastError(), "CreateEvent");
   1457     }
   1458   }
   1459 
   1460   /* If reading was stopped and then started again, there could still be a read
   1461    * request pending. */
   1462   if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
   1463     uv__pipe_queue_read(loop, handle);
   1464   }
   1465 
   1466   return 0;
   1467 }
   1468 
   1469 
   1470 static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
   1471     uv_write_t* req) {
   1472   req->next_req = NULL;
   1473   if (handle->pipe.conn.non_overlapped_writes_tail) {
   1474     req->next_req =
   1475       handle->pipe.conn.non_overlapped_writes_tail->next_req;
   1476     handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
   1477     handle->pipe.conn.non_overlapped_writes_tail = req;
   1478   } else {
   1479     req->next_req = (uv_req_t*)req;
   1480     handle->pipe.conn.non_overlapped_writes_tail = req;
   1481   }
   1482 }
   1483 
   1484 
   1485 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
   1486   uv_write_t* req;
   1487 
   1488   if (handle->pipe.conn.non_overlapped_writes_tail) {
   1489     req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
   1490 
   1491     if (req == handle->pipe.conn.non_overlapped_writes_tail) {
   1492       handle->pipe.conn.non_overlapped_writes_tail = NULL;
   1493     } else {
   1494       handle->pipe.conn.non_overlapped_writes_tail->next_req =
   1495         req->next_req;
   1496     }
   1497 
   1498     return req;
   1499   } else {
   1500     /* queue empty */
   1501     return NULL;
   1502   }
   1503 }
   1504 
   1505 
   1506 static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
   1507   uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
   1508   if (req) {
   1509     if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
   1510                            req,
   1511                            WT_EXECUTELONGFUNCTION)) {
   1512       uv_fatal_error(GetLastError(), "QueueUserWorkItem");
   1513     }
   1514   }
   1515 }
   1516 
   1517 
   1518 static int uv__build_coalesced_write_req(uv_write_t* user_req,
   1519                                          const uv_buf_t bufs[],
   1520                                          size_t nbufs,
   1521                                          uv_write_t** req_out,
   1522                                          uv_buf_t* write_buf_out) {
   1523   /* Pack into a single heap-allocated buffer:
   1524    *   (a) a uv_write_t structure where libuv stores the actual state.
   1525    *   (b) a pointer to the original uv_write_t.
   1526    *   (c) data from all `bufs` entries.
   1527    */
   1528   char* heap_buffer;
   1529   size_t heap_buffer_length, heap_buffer_offset;
   1530   uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
   1531   char* data_start;                           /* (c) */
   1532   size_t data_length;
   1533   unsigned int i;
   1534 
   1535   /* Compute combined size of all combined buffers from `bufs`. */
   1536   data_length = 0;
   1537   for (i = 0; i < nbufs; i++)
   1538     data_length += bufs[i].len;
   1539 
   1540   /* The total combined size of data buffers should not exceed UINT32_MAX,
   1541    * because WriteFile() won't accept buffers larger than that. */
   1542   if (data_length > UINT32_MAX)
   1543     return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
   1544 
   1545   /* Compute heap buffer size. */
   1546   heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
   1547                        data_length;                  /* (c) */
   1548 
   1549   /* Allocate buffer. */
   1550   heap_buffer = uv__malloc(heap_buffer_length);
   1551   if (heap_buffer == NULL)
   1552     return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
   1553 
   1554   /* Copy uv_write_t information to the buffer. */
   1555   coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
   1556   coalesced_write_req->req = *user_req; /* copy (a) */
   1557   coalesced_write_req->req.coalesced = 1;
   1558   coalesced_write_req->user_req = user_req;         /* copy (b) */
   1559   heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
   1560 
   1561   /* Copy data buffers to the heap buffer. */
   1562   data_start = &heap_buffer[heap_buffer_offset];
   1563   for (i = 0; i < nbufs; i++) {
   1564     memcpy(&heap_buffer[heap_buffer_offset],
   1565            bufs[i].base,
   1566            bufs[i].len);               /* copy (c) */
   1567     heap_buffer_offset += bufs[i].len; /* offset (c) */
   1568   }
   1569   assert(heap_buffer_offset == heap_buffer_length);
   1570 
   1571   /* Set out arguments and return. */
   1572   *req_out = &coalesced_write_req->req;
   1573   *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
   1574   return 0;
   1575 }
   1576 
   1577 
   1578 static int uv__pipe_write_data(uv_loop_t* loop,
   1579                                uv_write_t* req,
   1580                                uv_pipe_t* handle,
   1581                                const uv_buf_t bufs[],
   1582                                size_t nbufs,
   1583                                uv_write_cb cb,
   1584                                int copy_always) {
   1585   int err;
   1586   int result;
   1587   uv_buf_t write_buf;
   1588 
   1589   assert(handle->handle != INVALID_HANDLE_VALUE);
   1590 
   1591   UV_REQ_INIT(req, UV_WRITE);
   1592   req->handle = (uv_stream_t*) handle;
   1593   req->send_handle = NULL;
   1594   req->cb = cb;
   1595   /* Private fields. */
   1596   req->coalesced = 0;
   1597   req->event_handle = NULL;
   1598   req->wait_handle = INVALID_HANDLE_VALUE;
   1599 
   1600   /* Prepare the overlapped structure. */
   1601   memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
   1602   if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
   1603     req->event_handle = CreateEvent(NULL, 0, 0, NULL);
   1604     if (req->event_handle == NULL) {
   1605       uv_fatal_error(GetLastError(), "CreateEvent");
   1606     }
   1607     req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
   1608   }
   1609   req->write_buffer = uv_null_buf_;
   1610 
   1611   if (nbufs == 0) {
   1612     /* Write empty buffer. */
   1613     write_buf = uv_null_buf_;
   1614   } else if (nbufs == 1 && !copy_always) {
   1615     /* Write directly from bufs[0]. */
   1616     write_buf = bufs[0];
   1617   } else {
   1618     /* Coalesce all `bufs` into one big buffer. This also creates a new
   1619      * write-request structure that replaces the old one. */
   1620     err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
   1621     if (err != 0)
   1622       return err;
   1623   }
   1624 
   1625   if ((handle->flags &
   1626       (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
   1627       (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
   1628     DWORD bytes;
   1629     result =
   1630         WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
   1631 
   1632     if (!result) {
   1633       err = GetLastError();
   1634       return err;
   1635     } else {
   1636       /* Request completed immediately. */
   1637       req->u.io.queued_bytes = 0;
   1638     }
   1639 
   1640     REGISTER_HANDLE_REQ(loop, handle);
   1641     handle->reqs_pending++;
   1642     handle->stream.conn.write_reqs_pending++;
   1643     POST_COMPLETION_FOR_REQ(loop, req);
   1644     return 0;
   1645   } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
   1646     req->write_buffer = write_buf;
   1647     uv__insert_non_overlapped_write_req(handle, req);
   1648     if (handle->stream.conn.write_reqs_pending == 0) {
   1649       uv__queue_non_overlapped_write(handle);
   1650     }
   1651 
   1652     /* Request queued by the kernel. */
   1653     req->u.io.queued_bytes = write_buf.len;
   1654     handle->write_queue_size += req->u.io.queued_bytes;
   1655   } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
   1656     /* Using overlapped IO, but wait for completion before returning */
   1657     result = WriteFile(handle->handle,
   1658                        write_buf.base,
   1659                        write_buf.len,
   1660                        NULL,
   1661                        &req->u.io.overlapped);
   1662 
   1663     if (!result && GetLastError() != ERROR_IO_PENDING) {
   1664       err = GetLastError();
   1665       CloseHandle(req->event_handle);
   1666       req->event_handle = NULL;
   1667       return err;
   1668     }
   1669 
   1670     if (result) {
   1671       /* Request completed immediately. */
   1672       req->u.io.queued_bytes = 0;
   1673     } else {
   1674       /* Request queued by the kernel. */
   1675       req->u.io.queued_bytes = write_buf.len;
   1676       handle->write_queue_size += req->u.io.queued_bytes;
   1677       if (WaitForSingleObject(req->event_handle, INFINITE) !=
   1678           WAIT_OBJECT_0) {
   1679         err = GetLastError();
   1680         CloseHandle(req->event_handle);
   1681         req->event_handle = NULL;
   1682         return err;
   1683       }
   1684     }
   1685     CloseHandle(req->event_handle);
   1686     req->event_handle = NULL;
   1687 
   1688     REGISTER_HANDLE_REQ(loop, handle);
   1689     handle->reqs_pending++;
   1690     handle->stream.conn.write_reqs_pending++;
   1691     return 0;
   1692   } else {
   1693     result = WriteFile(handle->handle,
   1694                        write_buf.base,
   1695                        write_buf.len,
   1696                        NULL,
   1697                        &req->u.io.overlapped);
   1698 
   1699     if (!result && GetLastError() != ERROR_IO_PENDING) {
   1700       return GetLastError();
   1701     }
   1702 
   1703     if (result) {
   1704       /* Request completed immediately. */
   1705       req->u.io.queued_bytes = 0;
   1706     } else {
   1707       /* Request queued by the kernel. */
   1708       req->u.io.queued_bytes = write_buf.len;
   1709       handle->write_queue_size += req->u.io.queued_bytes;
   1710     }
   1711 
   1712     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
   1713       if (!RegisterWaitForSingleObject(&req->wait_handle,
   1714           req->event_handle, post_completion_write_wait, (void*) req,
   1715           INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
   1716         return GetLastError();
   1717       }
   1718     }
   1719   }
   1720 
   1721   REGISTER_HANDLE_REQ(loop, handle);
   1722   handle->reqs_pending++;
   1723   handle->stream.conn.write_reqs_pending++;
   1724 
   1725   return 0;
   1726 }
   1727 
   1728 
   1729 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
   1730   DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
   1731 
   1732   /* If the both ends of the IPC pipe are owned by the same process,
   1733    * the remote end pid may not yet be set. If so, do it here.
   1734    * TODO: this is weird; it'd probably better to use a handshake. */
   1735   if (*pid == 0) {
   1736     GetNamedPipeClientProcessId(handle->handle, pid);
   1737     if (*pid == GetCurrentProcessId()) {
   1738       GetNamedPipeServerProcessId(handle->handle, pid);
   1739     }
   1740   }
   1741 
   1742   return *pid;
   1743 }
   1744 
   1745 
   1746 int uv__pipe_write_ipc(uv_loop_t* loop,
   1747                        uv_write_t* req,
   1748                        uv_pipe_t* handle,
   1749                        const uv_buf_t data_bufs[],
   1750                        size_t data_buf_count,
   1751                        uv_stream_t* send_handle,
   1752                        uv_write_cb cb) {
   1753   uv_buf_t stack_bufs[6];
   1754   uv_buf_t* bufs;
   1755   size_t buf_count, buf_index;
   1756   uv__ipc_frame_header_t frame_header;
   1757   uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
   1758   uv__ipc_socket_xfer_info_t xfer_info;
   1759   uint64_t data_length;
   1760   size_t i;
   1761   int err;
   1762 
   1763   /* Compute the combined size of data buffers. */
   1764   data_length = 0;
   1765   for (i = 0; i < data_buf_count; i++)
   1766     data_length += data_bufs[i].len;
   1767   if (data_length > UINT32_MAX)
   1768     return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
   1769 
   1770   /* Prepare the frame's socket xfer payload. */
   1771   if (send_handle != NULL) {
   1772     uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
   1773 
   1774     /* Verify that `send_handle` it is indeed a tcp handle. */
   1775     if (send_tcp_handle->type != UV_TCP)
   1776       return ERROR_NOT_SUPPORTED;
   1777 
   1778     /* Export the tcp handle. */
   1779     err = uv__tcp_xfer_export(send_tcp_handle,
   1780                               uv__pipe_get_ipc_remote_pid(handle),
   1781                               &xfer_type,
   1782                               &xfer_info);
   1783     if (err != 0)
   1784       return err;
   1785   }
   1786 
   1787   /* Compute the number of uv_buf_t's required. */
   1788   buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
   1789   if (send_handle != NULL)
   1790     buf_count += 1; /* One extra for the socket xfer information. */
   1791 
   1792   /* Use the on-stack buffer array if it is big enough; otherwise allocate
   1793    * space for it on the heap. */
   1794   if (buf_count < ARRAY_SIZE(stack_bufs)) {
   1795     /* Use on-stack buffer array. */
   1796     bufs = stack_bufs;
   1797   } else {
   1798     /* Use heap-allocated buffer array. */
   1799     bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
   1800     if (bufs == NULL)
   1801       return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
   1802   }
   1803   buf_index = 0;
   1804 
   1805   /* Initialize frame header and add it to the buffers list. */
   1806   memset(&frame_header, 0, sizeof frame_header);
   1807   bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
   1808 
   1809   if (send_handle != NULL) {
   1810     /* Add frame header flags. */
   1811     switch (xfer_type) {
   1812       case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
   1813         frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
   1814                               UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
   1815         break;
   1816       case UV__IPC_SOCKET_XFER_TCP_SERVER:
   1817         frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
   1818         break;
   1819       default:
   1820         assert(0);  /* Unreachable. */
   1821     }
   1822     /* Add xfer info buffer. */
   1823     bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
   1824   }
   1825 
   1826   if (data_length > 0) {
   1827     /* Update frame header. */
   1828     frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
   1829     frame_header.data_length = (uint32_t) data_length;
   1830     /* Add data buffers to buffers list. */
   1831     for (i = 0; i < data_buf_count; i++)
   1832       bufs[buf_index++] = data_bufs[i];
   1833   }
   1834 
   1835   /* Write buffers. We set the `always_copy` flag, so it is not a problem that
   1836    * some of the written data lives on the stack. */
   1837   err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
   1838 
   1839   /* If we had to heap-allocate the bufs array, free it now. */
   1840   if (bufs != stack_bufs) {
   1841     uv__free(bufs);
   1842   }
   1843 
   1844   return err;
   1845 }
   1846 
   1847 
   1848 int uv__pipe_write(uv_loop_t* loop,
   1849                    uv_write_t* req,
   1850                    uv_pipe_t* handle,
   1851                    const uv_buf_t bufs[],
   1852                    size_t nbufs,
   1853                    uv_stream_t* send_handle,
   1854                    uv_write_cb cb) {
   1855   if (handle->ipc) {
   1856     /* IPC pipe write: use framing protocol. */
   1857     return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
   1858   } else {
   1859     /* Non-IPC pipe write: put data on the wire directly. */
   1860     assert(send_handle == NULL);
   1861     return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
   1862   }
   1863 }
   1864 
   1865 
   1866 static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
   1867     uv_buf_t buf) {
   1868   /* If there is an eof timer running, we don't need it any more, so discard
   1869    * it. */
   1870   eof_timer_destroy(handle);
   1871 
   1872   uv_read_stop((uv_stream_t*) handle);
   1873 
   1874   handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
   1875 }
   1876 
   1877 
   1878 static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
   1879     uv_buf_t buf) {
   1880   /* If there is an eof timer running, we don't need it any more, so discard
   1881    * it. */
   1882   eof_timer_destroy(handle);
   1883 
   1884   uv_read_stop((uv_stream_t*) handle);
   1885 
   1886   handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
   1887 }
   1888 
   1889 
   1890 static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
   1891     DWORD error, uv_buf_t buf) {
   1892   if (error == ERROR_BROKEN_PIPE) {
   1893     uv__pipe_read_eof(loop, handle, buf);
   1894   } else {
   1895     uv__pipe_read_error(loop, handle, error, buf);
   1896   }
   1897 }
   1898 
   1899 
   1900 static void uv__pipe_queue_ipc_xfer_info(
   1901     uv_pipe_t* handle,
   1902     uv__ipc_socket_xfer_type_t xfer_type,
   1903     uv__ipc_socket_xfer_info_t* xfer_info) {
   1904   uv__ipc_xfer_queue_item_t* item;
   1905 
   1906   item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
   1907   if (item == NULL)
   1908     uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
   1909 
   1910   item->xfer_type = xfer_type;
   1911   item->xfer_info = *xfer_info;
   1912 
   1913   uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &item->member);
   1914   handle->pipe.conn.ipc_xfer_queue_length++;
   1915 }
   1916 
   1917 
   1918 /* Read an exact number of bytes from a pipe. If an error or end-of-file is
   1919  * encountered before the requested number of bytes are read, an error is
   1920  * returned. */
   1921 static DWORD uv__pipe_read_exactly(uv_pipe_t* handle, void* buffer, DWORD count) {
   1922   uv_read_t* req;
   1923   DWORD bytes_read;
   1924   DWORD bytes_read_now;
   1925 
   1926   bytes_read = 0;
   1927   while (bytes_read < count) {
   1928     req = &handle->read_req;
   1929     memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
   1930     req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
   1931     if (!ReadFile(handle->handle,
   1932                   (char*) buffer + bytes_read,
   1933                   count - bytes_read,
   1934                   &bytes_read_now,
   1935                   &req->u.io.overlapped)) {
   1936       if (GetLastError() != ERROR_IO_PENDING)
   1937         return GetLastError();
   1938       if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, &bytes_read_now, TRUE))
   1939         return GetLastError();
   1940     }
   1941 
   1942     bytes_read += bytes_read_now;
   1943   }
   1944 
   1945   assert(bytes_read == count);
   1946   return 0;
   1947 }
   1948 
   1949 
   1950 static int uv__pipe_read_data(uv_loop_t* loop,
   1951                               uv_pipe_t* handle,
   1952                               DWORD* bytes_read, /* inout argument */
   1953                               DWORD max_bytes) {
   1954   uv_buf_t buf;
   1955   uv_read_t* req;
   1956   DWORD r;
   1957   DWORD bytes_available;
   1958   int more;
   1959 
   1960   /* Ask the user for a buffer to read data into. */
   1961   buf = uv_buf_init(NULL, 0);
   1962   handle->alloc_cb((uv_handle_t*) handle, *bytes_read, &buf);
   1963   if (buf.base == NULL || buf.len == 0) {
   1964     handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
   1965     return 0; /* Break out of read loop. */
   1966   }
   1967 
   1968   /* Ensure we read at most the smaller of:
   1969    *   (a) the length of the user-allocated buffer.
   1970    *   (b) the maximum data length as specified by the `max_bytes` argument.
   1971    *   (c) the amount of data that can be read non-blocking
   1972    */
   1973   if (max_bytes > buf.len)
   1974     max_bytes = buf.len;
   1975 
   1976   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
   1977     /* The user failed to supply a pipe that can be used non-blocking or with
   1978      * threads. Try to estimate the amount of data that is safe to read without
   1979      * blocking, in a race-y way however. */
   1980     bytes_available = 0;
   1981     if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &bytes_available, NULL)) {
   1982       r = GetLastError();
   1983     } else {
   1984       if (max_bytes > bytes_available)
   1985         max_bytes = bytes_available;
   1986       *bytes_read = 0;
   1987       if (max_bytes == 0 || ReadFile(handle->handle, buf.base, max_bytes, bytes_read, NULL))
   1988         r = ERROR_SUCCESS;
   1989       else
   1990         r = GetLastError();
   1991     }
   1992     more = max_bytes < bytes_available;
   1993   } else {
   1994     /* Read into the user buffer.
   1995      * Prepare an Event so that we can cancel if it doesn't complete immediately.
   1996      */
   1997     req = &handle->read_req;
   1998     memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
   1999     req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
   2000     if (ReadFile(handle->handle, buf.base, max_bytes, bytes_read, &req->u.io.overlapped)) {
   2001       r = ERROR_SUCCESS;
   2002     } else {
   2003       r = GetLastError();
   2004       *bytes_read = 0;
   2005       if (r == ERROR_IO_PENDING) {
   2006         r = CancelIoEx(handle->handle, &req->u.io.overlapped);
   2007         assert(r || GetLastError() == ERROR_NOT_FOUND);
   2008         if (GetOverlappedResult(handle->handle, &req->u.io.overlapped, bytes_read, TRUE)) {
   2009           r = ERROR_SUCCESS;
   2010         } else {
   2011           r = GetLastError();
   2012           *bytes_read = 0;
   2013         }
   2014       }
   2015     }
   2016     more = *bytes_read == max_bytes;
   2017   }
   2018 
   2019   /* Call the read callback. */
   2020   if (r == ERROR_SUCCESS || r == ERROR_OPERATION_ABORTED)
   2021     handle->read_cb((uv_stream_t*) handle, *bytes_read, &buf);
   2022   else
   2023     uv__pipe_read_error_or_eof(loop, handle, r, buf);
   2024 
   2025   return more;
   2026 }
   2027 
   2028 
   2029 static int uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
   2030   uint32_t* data_remaining;
   2031   DWORD err;
   2032   DWORD more;
   2033   DWORD bytes_read;
   2034 
   2035   data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
   2036 
   2037   if (*data_remaining > 0) {
   2038     /* Read frame data payload. */
   2039     bytes_read = *data_remaining;
   2040     more = uv__pipe_read_data(loop, handle, &bytes_read, bytes_read);
   2041     *data_remaining -= bytes_read;
   2042 
   2043   } else {
   2044     /* Start of a new IPC frame. */
   2045     uv__ipc_frame_header_t frame_header;
   2046     uint32_t xfer_flags;
   2047     uv__ipc_socket_xfer_type_t xfer_type;
   2048     uv__ipc_socket_xfer_info_t xfer_info;
   2049 
   2050     /* Read the IPC frame header. */
   2051     err = uv__pipe_read_exactly(
   2052         handle, &frame_header, sizeof frame_header);
   2053     if (err)
   2054       goto error;
   2055 
   2056     /* Validate that flags are valid. */
   2057     if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
   2058       goto invalid;
   2059     /* Validate that reserved2 is zero. */
   2060     if (frame_header.reserved2 != 0)
   2061       goto invalid;
   2062 
   2063     /* Parse xfer flags. */
   2064     xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
   2065     if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
   2066       /* Socket coming -- determine the type. */
   2067       xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
   2068                       ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
   2069                       : UV__IPC_SOCKET_XFER_TCP_SERVER;
   2070     } else if (xfer_flags == 0) {
   2071       /* No socket. */
   2072       xfer_type = UV__IPC_SOCKET_XFER_NONE;
   2073     } else {
   2074       /* Invalid flags. */
   2075       goto invalid;
   2076     }
   2077 
   2078     /* Parse data frame information. */
   2079     if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
   2080       *data_remaining = frame_header.data_length;
   2081     } else if (frame_header.data_length != 0) {
   2082       /* Data length greater than zero but data flag not set -- invalid. */
   2083       goto invalid;
   2084     }
   2085 
   2086     /* If no socket xfer info follows, return here. Data will be read in a
   2087      * subsequent invocation of uv__pipe_read_ipc(). */
   2088     if (xfer_type != UV__IPC_SOCKET_XFER_NONE) {
   2089       /* Read transferred socket information. */
   2090       err = uv__pipe_read_exactly(handle, &xfer_info, sizeof xfer_info);
   2091       if (err)
   2092         goto error;
   2093 
   2094       /* Store the pending socket info. */
   2095       uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
   2096     }
   2097   }
   2098 
   2099   /* Return whether the caller should immediately try another read call to get
   2100    * more data. Calling uv__pipe_read_exactly will hang if there isn't data
   2101    * available, so we cannot do this unless we are guaranteed not to reach that.
   2102    */
   2103   more = *data_remaining > 0;
   2104   return more;
   2105 
   2106 invalid:
   2107   /* Invalid frame. */
   2108   err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
   2109 
   2110 error:
   2111   uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
   2112   return 0; /* Break out of read loop. */
   2113 }
   2114 
   2115 
   2116 void uv__process_pipe_read_req(uv_loop_t* loop,
   2117                                uv_pipe_t* handle,
   2118                                uv_req_t* req) {
   2119   DWORD err;
   2120   DWORD more;
   2121   DWORD bytes_requested;
   2122   assert(handle->type == UV_NAMED_PIPE);
   2123 
   2124   handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
   2125   DECREASE_PENDING_REQ_COUNT(handle);
   2126   eof_timer_stop(handle);
   2127 
   2128   if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
   2129     UnregisterWait(handle->read_req.wait_handle);
   2130     handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
   2131   }
   2132 
   2133   /* At this point, we're done with bookkeeping. If the user has stopped
   2134    * reading the pipe in the meantime, there is nothing left to do, since there
   2135    * is no callback that we can call. */
   2136   if (!(handle->flags & UV_HANDLE_READING))
   2137     return;
   2138 
   2139   if (!REQ_SUCCESS(req)) {
   2140     /* An error occurred doing the zero-read. */
   2141     err = GET_REQ_ERROR(req);
   2142 
   2143     /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
   2144      * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
   2145      * the user; we'll start a new zero-read at the end of this function. */
   2146     if (err != ERROR_OPERATION_ABORTED)
   2147       uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
   2148 
   2149   } else {
   2150     /* The zero-read completed without error, indicating there is data
   2151      * available in the kernel buffer. */
   2152     while (handle->flags & UV_HANDLE_READING) {
   2153       bytes_requested = 65536;
   2154       /* Depending on the type of pipe, read either IPC frames or raw data. */
   2155       if (handle->ipc)
   2156           more = uv__pipe_read_ipc(loop, handle);
   2157       else
   2158           more = uv__pipe_read_data(loop, handle, &bytes_requested, INT32_MAX);
   2159 
   2160       /* If no bytes were read, treat this as an indication that an error
   2161        * occurred, and break out of the read loop. */
   2162       if (more == 0)
   2163         break;
   2164     }
   2165   }
   2166 
   2167   /* Start another zero-read request if necessary. */
   2168   if ((handle->flags & UV_HANDLE_READING) &&
   2169       !(handle->flags & UV_HANDLE_READ_PENDING)) {
   2170     uv__pipe_queue_read(loop, handle);
   2171   }
   2172 }
   2173 
   2174 
   2175 void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
   2176     uv_write_t* req) {
   2177   int err;
   2178 
   2179   assert(handle->type == UV_NAMED_PIPE);
   2180 
   2181   assert(handle->write_queue_size >= req->u.io.queued_bytes);
   2182   handle->write_queue_size -= req->u.io.queued_bytes;
   2183 
   2184   UNREGISTER_HANDLE_REQ(loop, handle);
   2185 
   2186   if (req->wait_handle != INVALID_HANDLE_VALUE) {
   2187     UnregisterWait(req->wait_handle);
   2188     req->wait_handle = INVALID_HANDLE_VALUE;
   2189   }
   2190   if (req->event_handle) {
   2191     CloseHandle(req->event_handle);
   2192     req->event_handle = NULL;
   2193   }
   2194 
   2195   err = GET_REQ_ERROR(req);
   2196 
   2197   /* If this was a coalesced write, extract pointer to the user_provided
   2198    * uv_write_t structure so we can pass the expected pointer to the callback,
   2199    * then free the heap-allocated write req. */
   2200   if (req->coalesced) {
   2201     uv__coalesced_write_t* coalesced_write =
   2202         container_of(req, uv__coalesced_write_t, req);
   2203     req = coalesced_write->user_req;
   2204     uv__free(coalesced_write);
   2205   }
   2206   if (req->cb) {
   2207     req->cb(req, uv_translate_sys_error(err));
   2208   }
   2209 
   2210   handle->stream.conn.write_reqs_pending--;
   2211 
   2212   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
   2213       handle->pipe.conn.non_overlapped_writes_tail) {
   2214     assert(handle->stream.conn.write_reqs_pending > 0);
   2215     uv__queue_non_overlapped_write(handle);
   2216   }
   2217 
   2218   if (handle->stream.conn.write_reqs_pending == 0 &&
   2219       uv__is_stream_shutting(handle))
   2220     uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
   2221 
   2222   DECREASE_PENDING_REQ_COUNT(handle);
   2223 }
   2224 
   2225 
   2226 void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
   2227     uv_req_t* raw_req) {
   2228   uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
   2229 
   2230   assert(handle->type == UV_NAMED_PIPE);
   2231 
   2232   if (handle->flags & UV_HANDLE_CLOSING) {
   2233     /* The req->pipeHandle should be freed already in uv__pipe_close(). */
   2234     assert(req->pipeHandle == INVALID_HANDLE_VALUE);
   2235     DECREASE_PENDING_REQ_COUNT(handle);
   2236     return;
   2237   }
   2238 
   2239   if (REQ_SUCCESS(req)) {
   2240     assert(req->pipeHandle != INVALID_HANDLE_VALUE);
   2241     req->next_pending = handle->pipe.serv.pending_accepts;
   2242     handle->pipe.serv.pending_accepts = req;
   2243 
   2244     if (handle->stream.serv.connection_cb) {
   2245       handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
   2246     }
   2247   } else {
   2248     if (req->pipeHandle != INVALID_HANDLE_VALUE) {
   2249       CloseHandle(req->pipeHandle);
   2250       req->pipeHandle = INVALID_HANDLE_VALUE;
   2251     }
   2252     if (!(handle->flags & UV_HANDLE_CLOSING)) {
   2253       uv__pipe_queue_accept(loop, handle, req, FALSE);
   2254     }
   2255   }
   2256 
   2257   DECREASE_PENDING_REQ_COUNT(handle);
   2258 }
   2259 
   2260 
   2261 void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
   2262     uv_connect_t* req) {
   2263   HANDLE pipeHandle;
   2264   DWORD duplex_flags;
   2265   int err;
   2266 
   2267   assert(handle->type == UV_NAMED_PIPE);
   2268 
   2269   UNREGISTER_HANDLE_REQ(loop, handle);
   2270 
   2271   err = 0;
   2272   if (REQ_SUCCESS(req)) {
   2273     pipeHandle = req->u.connect.pipeHandle;
   2274     duplex_flags = req->u.connect.duplex_flags;
   2275     if (handle->flags & UV_HANDLE_CLOSING)
   2276       err = UV_ECANCELED;
   2277     else
   2278       err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
   2279     if (err)
   2280       CloseHandle(pipeHandle);
   2281   } else {
   2282     err = uv_translate_sys_error(GET_REQ_ERROR(req));
   2283   }
   2284 
   2285   if (req->cb)
   2286     req->cb(req, err);
   2287 
   2288   DECREASE_PENDING_REQ_COUNT(handle);
   2289 }
   2290 
   2291 
   2292 
   2293 void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
   2294     uv_shutdown_t* req) {
   2295   int err;
   2296 
   2297   assert(handle->type == UV_NAMED_PIPE);
   2298 
   2299   /* Clear the shutdown_req field so we don't go here again. */
   2300   handle->stream.conn.shutdown_req = NULL;
   2301   UNREGISTER_HANDLE_REQ(loop, handle);
   2302 
   2303   if (handle->flags & UV_HANDLE_CLOSING) {
   2304     /* Already closing. Cancel the shutdown. */
   2305     err = UV_ECANCELED;
   2306   } else if (!REQ_SUCCESS(req)) {
   2307     /* An error occurred in trying to shutdown gracefully. */
   2308     err = uv_translate_sys_error(GET_REQ_ERROR(req));
   2309   } else {
   2310     if (handle->flags & UV_HANDLE_READABLE) {
   2311       /* Initialize and optionally start the eof timer. Only do this if the pipe
   2312        * is readable and we haven't seen EOF come in ourselves. */
   2313       eof_timer_init(handle);
   2314 
   2315       /* If reading start the timer right now. Otherwise uv__pipe_queue_read will
   2316        * start it. */
   2317       if (handle->flags & UV_HANDLE_READ_PENDING) {
   2318         eof_timer_start(handle);
   2319       }
   2320 
   2321     } else {
   2322       /* This pipe is not readable. We can just close it to let the other end
   2323        * know that we're done writing. */
   2324       close_pipe(handle);
   2325     }
   2326     err = 0;
   2327   }
   2328 
   2329   if (req->cb)
   2330     req->cb(req, err);
   2331 
   2332   DECREASE_PENDING_REQ_COUNT(handle);
   2333 }
   2334 
   2335 
   2336 static void eof_timer_init(uv_pipe_t* pipe) {
   2337   int r;
   2338 
   2339   assert(pipe->pipe.conn.eof_timer == NULL);
   2340   assert(pipe->flags & UV_HANDLE_CONNECTION);
   2341 
   2342   pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
   2343 
   2344   r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
   2345   assert(r == 0);  /* timers can't fail */
   2346   (void) r;
   2347   pipe->pipe.conn.eof_timer->data = pipe;
   2348   uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
   2349 }
   2350 
   2351 
   2352 static void eof_timer_start(uv_pipe_t* pipe) {
   2353   assert(pipe->flags & UV_HANDLE_CONNECTION);
   2354 
   2355   if (pipe->pipe.conn.eof_timer != NULL) {
   2356     uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
   2357   }
   2358 }
   2359 
   2360 
   2361 static void eof_timer_stop(uv_pipe_t* pipe) {
   2362   assert(pipe->flags & UV_HANDLE_CONNECTION);
   2363 
   2364   if (pipe->pipe.conn.eof_timer != NULL) {
   2365     uv_timer_stop(pipe->pipe.conn.eof_timer);
   2366   }
   2367 }
   2368 
   2369 
   2370 static void eof_timer_cb(uv_timer_t* timer) {
   2371   uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
   2372   uv_loop_t* loop = timer->loop;
   2373 
   2374   assert(pipe->type == UV_NAMED_PIPE);
   2375 
   2376   /* This should always be true, since we start the timer only in
   2377    * uv__pipe_queue_read after successfully calling ReadFile, or in
   2378    * uv__process_pipe_shutdown_req if a read is pending, and we always
   2379    * immediately stop the timer in uv__process_pipe_read_req. */
   2380   assert(pipe->flags & UV_HANDLE_READ_PENDING);
   2381 
   2382   /* If there are many packets coming off the iocp then the timer callback may
   2383    * be called before the read request is coming off the queue. Therefore we
   2384    * check here if the read request has completed but will be processed later.
   2385    */
   2386   if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
   2387       HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
   2388     return;
   2389   }
   2390 
   2391   /* Force both ends off the pipe. */
   2392   close_pipe(pipe);
   2393 
   2394   /* Stop reading, so the pending read that is going to fail will not be
   2395    * reported to the user. */
   2396   uv_read_stop((uv_stream_t*) pipe);
   2397 
   2398   /* Report the eof and update flags. This will get reported even if the user
   2399    * stopped reading in the meantime. TODO: is that okay? */
   2400   uv__pipe_read_eof(loop, pipe, uv_null_buf_);
   2401 }
   2402 
   2403 
   2404 static void eof_timer_destroy(uv_pipe_t* pipe) {
   2405   assert(pipe->flags & UV_HANDLE_CONNECTION);
   2406 
   2407   if (pipe->pipe.conn.eof_timer) {
   2408     uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
   2409     pipe->pipe.conn.eof_timer = NULL;
   2410   }
   2411 }
   2412 
   2413 
   2414 static void eof_timer_close_cb(uv_handle_t* handle) {
   2415   assert(handle->type == UV_TIMER);
   2416   uv__free(handle);
   2417 }
   2418 
   2419 
   2420 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
   2421   HANDLE os_handle = uv__get_osfhandle(file);
   2422   NTSTATUS nt_status;
   2423   IO_STATUS_BLOCK io_status;
   2424   FILE_ACCESS_INFORMATION access;
   2425   DWORD duplex_flags = 0;
   2426   int err;
   2427 
   2428   if (os_handle == INVALID_HANDLE_VALUE)
   2429     return UV_EBADF;
   2430   if (pipe->flags & UV_HANDLE_PIPESERVER)
   2431     return UV_EINVAL;
   2432   if (pipe->flags & UV_HANDLE_CONNECTION)
   2433     return UV_EBUSY;
   2434 
   2435   uv__pipe_connection_init(pipe);
   2436   uv__once_init();
   2437   /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
   2438    * underlying OS handle and forget about the original fd.
   2439    * We could also opt to use the original OS handle and just never close it,
   2440    * but then there would be no reliable way to cancel pending read operations
   2441    * upon close.
   2442    */
   2443   if (file <= 2) {
   2444     if (!DuplicateHandle(INVALID_HANDLE_VALUE,
   2445                          os_handle,
   2446                          INVALID_HANDLE_VALUE,
   2447                          &os_handle,
   2448                          0,
   2449                          FALSE,
   2450                          DUPLICATE_SAME_ACCESS))
   2451       return uv_translate_sys_error(GetLastError());
   2452     assert(os_handle != INVALID_HANDLE_VALUE);
   2453     file = -1;
   2454   }
   2455 
   2456   /* Determine what kind of permissions we have on this handle.
   2457    * Cygwin opens the pipe in message mode, but we can support it,
   2458    * just query the access flags and set the stream flags accordingly.
   2459    */
   2460   nt_status = pNtQueryInformationFile(os_handle,
   2461                                       &io_status,
   2462                                       &access,
   2463                                       sizeof(access),
   2464                                       FileAccessInformation);
   2465   if (nt_status != STATUS_SUCCESS)
   2466     return UV_EINVAL;
   2467 
   2468   if (pipe->ipc) {
   2469     if (!(access.AccessFlags & FILE_WRITE_DATA) ||
   2470         !(access.AccessFlags & FILE_READ_DATA)) {
   2471       return UV_EINVAL;
   2472     }
   2473   }
   2474 
   2475   if (access.AccessFlags & FILE_WRITE_DATA)
   2476     duplex_flags |= UV_HANDLE_WRITABLE;
   2477   if (access.AccessFlags & FILE_READ_DATA)
   2478     duplex_flags |= UV_HANDLE_READABLE;
   2479 
   2480   err = uv__set_pipe_handle(pipe->loop,
   2481                             pipe,
   2482                             os_handle,
   2483                             file,
   2484                             duplex_flags);
   2485   if (err) {
   2486     if (file == -1)
   2487       CloseHandle(os_handle);
   2488     return err;
   2489   }
   2490 
   2491   if (pipe->ipc) {
   2492     assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
   2493     GetNamedPipeClientProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
   2494     if (pipe->pipe.conn.ipc_remote_pid == GetCurrentProcessId()) {
   2495       GetNamedPipeServerProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
   2496     }
   2497     assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
   2498   }
   2499   return 0;
   2500 }
   2501 
   2502 
   2503 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
   2504   NTSTATUS nt_status;
   2505   IO_STATUS_BLOCK io_status;
   2506   FILE_NAME_INFORMATION tmp_name_info;
   2507   FILE_NAME_INFORMATION* name_info;
   2508   WCHAR* name_buf;
   2509   unsigned int name_size;
   2510   unsigned int name_len;
   2511   int err;
   2512 
   2513   uv__once_init();
   2514   name_info = NULL;
   2515 
   2516   if (handle->name != NULL) {
   2517     /* The user might try to query the name before we are connected,
   2518      * and this is just easier to return the cached value if we have it. */
   2519     return uv__copy_utf16_to_utf8(handle->name, -1, buffer, size);
   2520   }
   2521 
   2522   if (handle->handle == INVALID_HANDLE_VALUE) {
   2523     *size = 0;
   2524     return UV_EINVAL;
   2525   }
   2526 
   2527   /* NtQueryInformationFile will block if another thread is performing a
   2528    * blocking operation on the queried handle. If the pipe handle is
   2529    * synchronous, there may be a worker thread currently calling ReadFile() on
   2530    * the pipe handle, which could cause a deadlock. To avoid this, interrupt
   2531    * the read. */
   2532   if (handle->flags & UV_HANDLE_CONNECTION &&
   2533       handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
   2534     uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
   2535   }
   2536 
   2537   nt_status = pNtQueryInformationFile(handle->handle,
   2538                                       &io_status,
   2539                                       &tmp_name_info,
   2540                                       sizeof tmp_name_info,
   2541                                       FileNameInformation);
   2542   if (nt_status == STATUS_BUFFER_OVERFLOW) {
   2543     name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
   2544     name_info = uv__malloc(name_size);
   2545     if (!name_info) {
   2546       *size = 0;
   2547       return UV_ENOMEM;
   2548     }
   2549 
   2550     nt_status = pNtQueryInformationFile(handle->handle,
   2551                                         &io_status,
   2552                                         name_info,
   2553                                         name_size,
   2554                                         FileNameInformation);
   2555   }
   2556 
   2557   if (nt_status != STATUS_SUCCESS) {
   2558     *size = 0;
   2559     err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
   2560     goto error;
   2561   }
   2562 
   2563   if (!name_info) {
   2564     /* the struct on stack was used */
   2565     name_buf = tmp_name_info.FileName;
   2566     name_len = tmp_name_info.FileNameLength;
   2567   } else {
   2568     name_buf = name_info->FileName;
   2569     name_len = name_info->FileNameLength;
   2570   }
   2571 
   2572   if (name_len == 0) {
   2573     *size = 0;
   2574     err = 0;
   2575     goto error;
   2576   }
   2577 
   2578   name_len /= sizeof(WCHAR);
   2579 
   2580   /* "\\\\.\\pipe" + name */
   2581   if (*size < pipe_prefix_len) {
   2582     *size = 0;
   2583   }
   2584   else {
   2585     memcpy(buffer, pipe_prefix, pipe_prefix_len);
   2586     *size -= pipe_prefix_len;
   2587   }
   2588   err = uv__copy_utf16_to_utf8(name_buf, name_len, buffer+pipe_prefix_len, size);
   2589   *size += pipe_prefix_len;
   2590 
   2591 error:
   2592   uv__free(name_info);
   2593   return err;
   2594 }
   2595 
   2596 
   2597 int uv_pipe_pending_count(uv_pipe_t* handle) {
   2598   if (!handle->ipc)
   2599     return 0;
   2600   return handle->pipe.conn.ipc_xfer_queue_length;
   2601 }
   2602 
   2603 
   2604 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
   2605   if (buffer == NULL || size == NULL || *size == 0)
   2606     return UV_EINVAL;
   2607 
   2608   if (handle->flags & UV_HANDLE_BOUND)
   2609     return uv__pipe_getname(handle, buffer, size);
   2610 
   2611   if (handle->flags & UV_HANDLE_CONNECTION ||
   2612       handle->handle != INVALID_HANDLE_VALUE) {
   2613     *size = 0;
   2614     return 0;
   2615   }
   2616 
   2617   return UV_EBADF;
   2618 }
   2619 
   2620 
   2621 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
   2622   if (buffer == NULL || size == NULL || *size == 0)
   2623     return UV_EINVAL;
   2624 
   2625   /* emulate unix behaviour */
   2626   if (handle->flags & UV_HANDLE_BOUND)
   2627     return UV_ENOTCONN;
   2628 
   2629   if (handle->handle != INVALID_HANDLE_VALUE)
   2630     return uv__pipe_getname(handle, buffer, size);
   2631 
   2632   if (handle->flags & UV_HANDLE_CONNECTION) {
   2633     if (handle->name != NULL)
   2634       return uv__pipe_getname(handle, buffer, size);
   2635   }
   2636 
   2637   return UV_EBADF;
   2638 }
   2639 
   2640 
   2641 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
   2642   if (!handle->ipc)
   2643     return UV_UNKNOWN_HANDLE;
   2644   if (handle->pipe.conn.ipc_xfer_queue_length == 0)
   2645     return UV_UNKNOWN_HANDLE;
   2646   else
   2647     return UV_TCP;
   2648 }
   2649 
   2650 int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
   2651   SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
   2652   PACL old_dacl, new_dacl;
   2653   PSECURITY_DESCRIPTOR sd;
   2654   EXPLICIT_ACCESS ea;
   2655   PSID everyone;
   2656   int error;
   2657 
   2658   if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
   2659     return UV_EBADF;
   2660 
   2661   if (mode != UV_READABLE &&
   2662       mode != UV_WRITABLE &&
   2663       mode != (UV_WRITABLE | UV_READABLE))
   2664     return UV_EINVAL;
   2665 
   2666   if (!AllocateAndInitializeSid(&sid_world,
   2667                                 1,
   2668                                 SECURITY_WORLD_RID,
   2669                                 0, 0, 0, 0, 0, 0, 0,
   2670                                 &everyone)) {
   2671     error = GetLastError();
   2672     goto done;
   2673   }
   2674 
   2675   if (GetSecurityInfo(handle->handle,
   2676                       SE_KERNEL_OBJECT,
   2677                       DACL_SECURITY_INFORMATION,
   2678                       NULL,
   2679                       NULL,
   2680                       &old_dacl,
   2681                       NULL,
   2682                       &sd)) {
   2683     error = GetLastError();
   2684     goto clean_sid;
   2685   }
   2686 
   2687   memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
   2688   if (mode & UV_READABLE)
   2689     ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
   2690   if (mode & UV_WRITABLE)
   2691     ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
   2692   ea.grfAccessPermissions |= SYNCHRONIZE;
   2693   ea.grfAccessMode = SET_ACCESS;
   2694   ea.grfInheritance = NO_INHERITANCE;
   2695   ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
   2696   ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
   2697   ea.Trustee.ptstrName = (LPTSTR)everyone;
   2698 
   2699   if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
   2700     error = GetLastError();
   2701     goto clean_sd;
   2702   }
   2703 
   2704   if (SetSecurityInfo(handle->handle,
   2705                       SE_KERNEL_OBJECT,
   2706                       DACL_SECURITY_INFORMATION,
   2707                       NULL,
   2708                       NULL,
   2709                       new_dacl,
   2710                       NULL)) {
   2711     error = GetLastError();
   2712     goto clean_dacl;
   2713   }
   2714 
   2715   error = 0;
   2716 
   2717 clean_dacl:
   2718   LocalFree((HLOCAL) new_dacl);
   2719 clean_sd:
   2720   LocalFree((HLOCAL) sd);
   2721 clean_sid:
   2722   FreeSid(everyone);
   2723 done:
   2724   return uv_translate_sys_error(error);
   2725 }
   2726