1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include <assert.h> 23 #include <io.h> 24 #include <stdio.h> 25 #include <stdlib.h> 26 #include <string.h> 27 28 #include "handle-inl.h" 29 #include "internal.h" 30 #include "req-inl.h" 31 #include "stream-inl.h" 32 #include "uv-common.h" 33 #include "uv.h" 34 35 #include <aclapi.h> 36 #include <accctrl.h> 37 38 /* A zero-size buffer for use by uv_pipe_read */ 39 static char uv_zero_[] = ""; 40 41 /* Null uv_buf_t */ 42 static const uv_buf_t uv_null_buf_ = { 0, NULL }; 43 44 /* The timeout that the pipe will wait for the remote end to write data when 45 * the local ends wants to shut it down. */ 46 static const int64_t eof_timeout = 50; /* ms */ 47 48 static const int default_pending_pipe_instances = 4; 49 50 /* Pipe prefix */ 51 static char pipe_prefix[] = "\\\\?\\pipe"; 52 static const size_t pipe_prefix_len = sizeof(pipe_prefix) - 1; 53 54 /* IPC incoming xfer queue item. */ 55 typedef struct { 56 uv__ipc_socket_xfer_type_t xfer_type; 57 uv__ipc_socket_xfer_info_t xfer_info; 58 struct uv__queue member; 59 } uv__ipc_xfer_queue_item_t; 60 61 /* IPC frame header flags. */ 62 /* clang-format off */ 63 enum { 64 UV__IPC_FRAME_HAS_DATA = 0x01, 65 UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02, 66 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04, 67 /* These are combinations of the flags above. */ 68 UV__IPC_FRAME_XFER_FLAGS = 0x06, 69 UV__IPC_FRAME_VALID_FLAGS = 0x07 70 }; 71 /* clang-format on */ 72 73 /* IPC frame header. */ 74 typedef struct { 75 uint32_t flags; 76 uint32_t reserved1; /* Ignored. */ 77 uint32_t data_length; /* Must be zero if there is no data. */ 78 uint32_t reserved2; /* Must be zero. */ 79 } uv__ipc_frame_header_t; 80 81 /* To implement the IPC protocol correctly, these structures must have exactly 82 * the right size. */ 83 STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16); 84 STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632); 85 86 /* Coalesced write request. */ 87 typedef struct { 88 uv_write_t req; /* Internal heap-allocated write request. */ 89 uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */ 90 } uv__coalesced_write_t; 91 92 93 static void eof_timer_init(uv_pipe_t* pipe); 94 static void eof_timer_start(uv_pipe_t* pipe); 95 static void eof_timer_stop(uv_pipe_t* pipe); 96 static void eof_timer_cb(uv_timer_t* timer); 97 static void eof_timer_destroy(uv_pipe_t* pipe); 98 static void eof_timer_close_cb(uv_handle_t* handle); 99 100 101 /* Does the file path contain embedded nul bytes? */ 102 static int includes_nul(const char *s, size_t n) { 103 if (n == 0) 104 return 0; 105 return NULL != memchr(s, '\0', n); 106 } 107 108 109 static void uv__unique_pipe_name(unsigned long long ptr, char* name, size_t size) { 110 snprintf(name, size, "\\\\?\\pipe\\uv\\%llu-%lu", ptr, GetCurrentProcessId()); 111 } 112 113 114 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { 115 uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); 116 117 handle->reqs_pending = 0; 118 handle->handle = INVALID_HANDLE_VALUE; 119 handle->name = NULL; 120 handle->pipe.conn.ipc_remote_pid = 0; 121 handle->pipe.conn.ipc_data_frame.payload_remaining = 0; 122 uv__queue_init(&handle->pipe.conn.ipc_xfer_queue); 123 handle->pipe.conn.ipc_xfer_queue_length = 0; 124 handle->ipc = ipc; 125 handle->pipe.conn.non_overlapped_writes_tail = NULL; 126 127 return 0; 128 } 129 130 131 static void uv__pipe_connection_init(uv_pipe_t* handle) { 132 assert(!(handle->flags & UV_HANDLE_PIPESERVER)); 133 uv__connection_init((uv_stream_t*) handle); 134 handle->read_req.data = handle; 135 handle->pipe.conn.eof_timer = NULL; 136 } 137 138 139 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) { 140 HANDLE pipeHandle; 141 142 /* 143 * Assume that we have a duplex pipe first, so attempt to 144 * connect with GENERIC_READ | GENERIC_WRITE. 145 */ 146 pipeHandle = CreateFileW(name, 147 GENERIC_READ | GENERIC_WRITE, 148 0, 149 NULL, 150 OPEN_EXISTING, 151 FILE_FLAG_OVERLAPPED, 152 NULL); 153 if (pipeHandle != INVALID_HANDLE_VALUE) { 154 *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; 155 return pipeHandle; 156 } 157 158 /* 159 * If the pipe is not duplex CreateFileW fails with 160 * ERROR_ACCESS_DENIED. In that case try to connect 161 * as a read-only or write-only. 162 */ 163 if (GetLastError() == ERROR_ACCESS_DENIED) { 164 pipeHandle = CreateFileW(name, 165 GENERIC_READ | FILE_WRITE_ATTRIBUTES, 166 0, 167 NULL, 168 OPEN_EXISTING, 169 FILE_FLAG_OVERLAPPED, 170 NULL); 171 172 if (pipeHandle != INVALID_HANDLE_VALUE) { 173 *duplex_flags = UV_HANDLE_READABLE; 174 return pipeHandle; 175 } 176 } 177 178 if (GetLastError() == ERROR_ACCESS_DENIED) { 179 pipeHandle = CreateFileW(name, 180 GENERIC_WRITE | FILE_READ_ATTRIBUTES, 181 0, 182 NULL, 183 OPEN_EXISTING, 184 FILE_FLAG_OVERLAPPED, 185 NULL); 186 187 if (pipeHandle != INVALID_HANDLE_VALUE) { 188 *duplex_flags = UV_HANDLE_WRITABLE; 189 return pipeHandle; 190 } 191 } 192 193 return INVALID_HANDLE_VALUE; 194 } 195 196 197 static void close_pipe(uv_pipe_t* pipe) { 198 assert(pipe->u.fd == -1 || pipe->u.fd > 2); 199 if (pipe->u.fd == -1) 200 CloseHandle(pipe->handle); 201 else 202 _close(pipe->u.fd); 203 204 pipe->u.fd = -1; 205 pipe->handle = INVALID_HANDLE_VALUE; 206 } 207 208 209 static int uv__pipe_server( 210 HANDLE* pipeHandle_ptr, DWORD access, 211 char* name, size_t nameSize, unsigned long long random) { 212 HANDLE pipeHandle; 213 int err; 214 215 for (;;) { 216 uv__unique_pipe_name(random, name, nameSize); 217 218 pipeHandle = CreateNamedPipeA(name, 219 access | FILE_FLAG_FIRST_PIPE_INSTANCE, 220 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0, 221 NULL); 222 223 if (pipeHandle != INVALID_HANDLE_VALUE) { 224 /* No name collisions. We're done. */ 225 break; 226 } 227 228 err = GetLastError(); 229 if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) { 230 goto error; 231 } 232 233 /* Pipe name collision. Increment the random number and try again. */ 234 random++; 235 } 236 237 *pipeHandle_ptr = pipeHandle; 238 239 return 0; 240 241 error: 242 if (pipeHandle != INVALID_HANDLE_VALUE) 243 CloseHandle(pipeHandle); 244 245 return err; 246 } 247 248 249 static int uv__create_pipe_pair( 250 HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr, 251 unsigned int server_flags, unsigned int client_flags, 252 int inherit_client, unsigned long long random) { 253 /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */ 254 char pipe_name[64]; 255 SECURITY_ATTRIBUTES sa; 256 DWORD server_access; 257 DWORD client_access; 258 HANDLE server_pipe; 259 HANDLE client_pipe; 260 int err; 261 262 server_pipe = INVALID_HANDLE_VALUE; 263 client_pipe = INVALID_HANDLE_VALUE; 264 265 server_access = 0; 266 if (server_flags & UV_READABLE_PIPE) 267 server_access |= PIPE_ACCESS_INBOUND; 268 if (server_flags & UV_WRITABLE_PIPE) 269 server_access |= PIPE_ACCESS_OUTBOUND; 270 if (server_flags & UV_NONBLOCK_PIPE) 271 server_access |= FILE_FLAG_OVERLAPPED; 272 server_access |= WRITE_DAC; 273 274 client_access = 0; 275 if (client_flags & UV_READABLE_PIPE) 276 client_access |= GENERIC_READ; 277 else 278 client_access |= FILE_READ_ATTRIBUTES; 279 if (client_flags & UV_WRITABLE_PIPE) 280 client_access |= GENERIC_WRITE; 281 else 282 client_access |= FILE_WRITE_ATTRIBUTES; 283 client_access |= WRITE_DAC; 284 285 /* Create server pipe handle. */ 286 err = uv__pipe_server(&server_pipe, 287 server_access, 288 pipe_name, 289 sizeof(pipe_name), 290 random); 291 if (err) 292 goto error; 293 294 /* Create client pipe handle. */ 295 sa.nLength = sizeof sa; 296 sa.lpSecurityDescriptor = NULL; 297 sa.bInheritHandle = inherit_client; 298 299 client_pipe = CreateFileA(pipe_name, 300 client_access, 301 0, 302 &sa, 303 OPEN_EXISTING, 304 (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0, 305 NULL); 306 if (client_pipe == INVALID_HANDLE_VALUE) { 307 err = GetLastError(); 308 goto error; 309 } 310 311 #ifndef NDEBUG 312 /* Validate that the pipe was opened in the right mode. */ 313 { 314 DWORD mode; 315 BOOL r; 316 r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0); 317 if (r == TRUE) { 318 assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT)); 319 } else { 320 fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n"); 321 } 322 } 323 #endif 324 325 /* Do a blocking ConnectNamedPipe. This should not block because we have 326 * both ends of the pipe created. */ 327 if (!ConnectNamedPipe(server_pipe, NULL)) { 328 if (GetLastError() != ERROR_PIPE_CONNECTED) { 329 err = GetLastError(); 330 goto error; 331 } 332 } 333 334 *client_pipe_ptr = client_pipe; 335 *server_pipe_ptr = server_pipe; 336 return 0; 337 338 error: 339 if (server_pipe != INVALID_HANDLE_VALUE) 340 CloseHandle(server_pipe); 341 342 if (client_pipe != INVALID_HANDLE_VALUE) 343 CloseHandle(client_pipe); 344 345 return err; 346 } 347 348 349 int uv_pipe(uv_file fds[2], int read_flags, int write_flags) { 350 uv_file temp[2]; 351 int err; 352 HANDLE readh; 353 HANDLE writeh; 354 355 /* Make the server side the inbound (read) end, */ 356 /* so that both ends will have FILE_READ_ATTRIBUTES permission. */ 357 /* TODO: better source of local randomness than &fds? */ 358 read_flags |= UV_READABLE_PIPE; 359 write_flags |= UV_WRITABLE_PIPE; 360 err = uv__create_pipe_pair(&readh, 361 &writeh, 362 read_flags, 363 write_flags, 364 0, 365 (uintptr_t) &fds[0]); 366 if (err != 0) 367 return err; 368 temp[0] = _open_osfhandle((intptr_t) readh, 0); 369 if (temp[0] == -1) { 370 if (errno == UV_EMFILE) 371 err = UV_EMFILE; 372 else 373 err = UV_UNKNOWN; 374 CloseHandle(readh); 375 CloseHandle(writeh); 376 return err; 377 } 378 temp[1] = _open_osfhandle((intptr_t) writeh, 0); 379 if (temp[1] == -1) { 380 if (errno == UV_EMFILE) 381 err = UV_EMFILE; 382 else 383 err = UV_UNKNOWN; 384 _close(temp[0]); 385 CloseHandle(writeh); 386 return err; 387 } 388 fds[0] = temp[0]; 389 fds[1] = temp[1]; 390 return 0; 391 } 392 393 394 int uv__create_stdio_pipe_pair(uv_loop_t* loop, 395 uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) { 396 /* The parent_pipe is always the server_pipe and kept by libuv. 397 * The child_pipe is always the client_pipe and is passed to the child. 398 * The flags are specified with respect to their usage in the child. */ 399 HANDLE server_pipe; 400 HANDLE client_pipe; 401 unsigned int server_flags; 402 unsigned int client_flags; 403 int err; 404 405 uv__pipe_connection_init(parent_pipe); 406 407 server_pipe = INVALID_HANDLE_VALUE; 408 client_pipe = INVALID_HANDLE_VALUE; 409 410 server_flags = 0; 411 client_flags = 0; 412 if (flags & UV_READABLE_PIPE) { 413 /* The server needs inbound (read) access too, otherwise CreateNamedPipe() 414 * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe 415 * the state of the write buffer when we're trying to shutdown the pipe. */ 416 server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE; 417 client_flags |= UV_READABLE_PIPE; 418 } 419 if (flags & UV_WRITABLE_PIPE) { 420 server_flags |= UV_READABLE_PIPE; 421 client_flags |= UV_WRITABLE_PIPE; 422 } 423 server_flags |= UV_NONBLOCK_PIPE; 424 if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) { 425 client_flags |= UV_NONBLOCK_PIPE; 426 } 427 428 err = uv__create_pipe_pair(&server_pipe, &client_pipe, 429 server_flags, client_flags, 1, (uintptr_t) server_pipe); 430 if (err) 431 goto error; 432 433 if (CreateIoCompletionPort(server_pipe, 434 loop->iocp, 435 (ULONG_PTR) parent_pipe, 436 0) == NULL) { 437 err = GetLastError(); 438 goto error; 439 } 440 441 parent_pipe->handle = server_pipe; 442 *child_pipe_ptr = client_pipe; 443 444 /* The server end is now readable and/or writable. */ 445 if (flags & UV_READABLE_PIPE) 446 parent_pipe->flags |= UV_HANDLE_WRITABLE; 447 if (flags & UV_WRITABLE_PIPE) 448 parent_pipe->flags |= UV_HANDLE_READABLE; 449 450 return 0; 451 452 error: 453 if (server_pipe != INVALID_HANDLE_VALUE) 454 CloseHandle(server_pipe); 455 456 if (client_pipe != INVALID_HANDLE_VALUE) 457 CloseHandle(client_pipe); 458 459 return err; 460 } 461 462 463 static int uv__set_pipe_handle(uv_loop_t* loop, 464 uv_pipe_t* handle, 465 HANDLE pipeHandle, 466 int fd, 467 DWORD duplex_flags) { 468 NTSTATUS nt_status; 469 IO_STATUS_BLOCK io_status; 470 FILE_MODE_INFORMATION mode_info; 471 DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT; 472 DWORD current_mode = 0; 473 DWORD err = 0; 474 475 assert(handle->flags & UV_HANDLE_CONNECTION); 476 assert(!(handle->flags & UV_HANDLE_PIPESERVER)); 477 if (handle->flags & UV_HANDLE_CLOSING) 478 return UV_EINVAL; 479 if (handle->handle != INVALID_HANDLE_VALUE) 480 return UV_EBUSY; 481 482 if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { 483 err = GetLastError(); 484 if (err == ERROR_ACCESS_DENIED) { 485 /* 486 * SetNamedPipeHandleState can fail if the handle doesn't have either 487 * GENERIC_WRITE or FILE_WRITE_ATTRIBUTES. 488 * But if the handle already has the desired wait and blocking modes 489 * we can continue. 490 */ 491 if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL, 492 NULL, NULL, 0)) { 493 return uv_translate_sys_error(GetLastError()); 494 } else if (current_mode & PIPE_NOWAIT) { 495 return UV_EACCES; 496 } 497 } else { 498 /* If this returns ERROR_INVALID_PARAMETER we probably opened 499 * something that is not a pipe. */ 500 if (err == ERROR_INVALID_PARAMETER) { 501 return UV_ENOTSOCK; 502 } 503 return uv_translate_sys_error(err); 504 } 505 } 506 507 /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */ 508 nt_status = pNtQueryInformationFile(pipeHandle, 509 &io_status, 510 &mode_info, 511 sizeof(mode_info), 512 FileModeInformation); 513 if (nt_status != STATUS_SUCCESS) { 514 return uv_translate_sys_error(err); 515 } 516 517 if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT || 518 mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) { 519 /* Non-overlapped pipe. */ 520 handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE; 521 handle->pipe.conn.readfile_thread_handle = NULL; 522 InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock); 523 } else { 524 /* Overlapped pipe. Try to associate with IOCP. */ 525 if (CreateIoCompletionPort(pipeHandle, 526 loop->iocp, 527 (ULONG_PTR) handle, 528 0) == NULL) { 529 handle->flags |= UV_HANDLE_EMULATE_IOCP; 530 } 531 } 532 533 handle->handle = pipeHandle; 534 handle->u.fd = fd; 535 handle->flags |= duplex_flags; 536 537 return 0; 538 } 539 540 541 static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle, 542 uv_pipe_accept_t* req, BOOL firstInstance) { 543 assert(req->pipeHandle == INVALID_HANDLE_VALUE); 544 545 req->pipeHandle = 546 CreateNamedPipeW(handle->name, 547 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC | 548 (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0), 549 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 550 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL); 551 552 if (req->pipeHandle == INVALID_HANDLE_VALUE) { 553 return 0; 554 } 555 556 /* Associate it with IOCP so we can get events. */ 557 if (CreateIoCompletionPort(req->pipeHandle, 558 loop->iocp, 559 (ULONG_PTR) handle, 560 0) == NULL) { 561 uv_fatal_error(GetLastError(), "CreateIoCompletionPort"); 562 } 563 564 /* Stash a handle in the server object for use from places such as 565 * getsockname and chmod. As we transfer ownership of these to client 566 * objects, we'll allocate new ones here. */ 567 handle->handle = req->pipeHandle; 568 569 return 1; 570 } 571 572 573 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) { 574 uv_loop_t* loop; 575 uv_pipe_t* handle; 576 uv_shutdown_t* req; 577 578 req = (uv_shutdown_t*) parameter; 579 assert(req); 580 handle = (uv_pipe_t*) req->handle; 581 assert(handle); 582 loop = handle->loop; 583 assert(loop); 584 585 FlushFileBuffers(handle->handle); 586 587 /* Post completed */ 588 POST_COMPLETION_FOR_REQ(loop, req); 589 590 return 0; 591 } 592 593 594 void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) { 595 DWORD result; 596 NTSTATUS nt_status; 597 IO_STATUS_BLOCK io_status; 598 FILE_PIPE_LOCAL_INFORMATION pipe_info; 599 600 assert(handle->flags & UV_HANDLE_CONNECTION); 601 assert(req != NULL); 602 assert(handle->stream.conn.write_reqs_pending == 0); 603 SET_REQ_SUCCESS(req); 604 605 if (handle->flags & UV_HANDLE_CLOSING) { 606 uv__insert_pending_req(loop, (uv_req_t*) req); 607 return; 608 } 609 610 /* Try to avoid flushing the pipe buffer in the thread pool. */ 611 nt_status = pNtQueryInformationFile(handle->handle, 612 &io_status, 613 &pipe_info, 614 sizeof pipe_info, 615 FilePipeLocalInformation); 616 617 if (nt_status != STATUS_SUCCESS) { 618 SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status)); 619 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */ 620 uv__insert_pending_req(loop, (uv_req_t*) req); 621 return; 622 } 623 624 if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) { 625 /* Short-circuit, no need to call FlushFileBuffers: 626 * all writes have been read. */ 627 uv__insert_pending_req(loop, (uv_req_t*) req); 628 return; 629 } 630 631 /* Run FlushFileBuffers in the thread pool. */ 632 result = QueueUserWorkItem(pipe_shutdown_thread_proc, 633 req, 634 WT_EXECUTELONGFUNCTION); 635 if (!result) { 636 SET_REQ_ERROR(req, GetLastError()); 637 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */ 638 uv__insert_pending_req(loop, (uv_req_t*) req); 639 return; 640 } 641 } 642 643 644 void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { 645 uv__ipc_xfer_queue_item_t* xfer_queue_item; 646 647 assert(handle->reqs_pending == 0); 648 assert(handle->flags & UV_HANDLE_CLOSING); 649 assert(!(handle->flags & UV_HANDLE_CLOSED)); 650 651 if (handle->flags & UV_HANDLE_CONNECTION) { 652 /* Free pending sockets */ 653 while (!uv__queue_empty(&handle->pipe.conn.ipc_xfer_queue)) { 654 struct uv__queue* q; 655 SOCKET socket; 656 657 q = uv__queue_head(&handle->pipe.conn.ipc_xfer_queue); 658 uv__queue_remove(q); 659 xfer_queue_item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member); 660 661 /* Materialize socket and close it */ 662 socket = WSASocketW(FROM_PROTOCOL_INFO, 663 FROM_PROTOCOL_INFO, 664 FROM_PROTOCOL_INFO, 665 &xfer_queue_item->xfer_info.socket_info, 666 0, 667 WSA_FLAG_OVERLAPPED); 668 uv__free(xfer_queue_item); 669 670 if (socket != INVALID_SOCKET) 671 closesocket(socket); 672 } 673 handle->pipe.conn.ipc_xfer_queue_length = 0; 674 675 assert(handle->read_req.wait_handle == INVALID_HANDLE_VALUE); 676 if (handle->read_req.event_handle != NULL) { 677 CloseHandle(handle->read_req.event_handle); 678 handle->read_req.event_handle = NULL; 679 } 680 681 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) 682 DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock); 683 } 684 685 if (handle->flags & UV_HANDLE_PIPESERVER) { 686 assert(handle->pipe.serv.accept_reqs); 687 uv__free(handle->pipe.serv.accept_reqs); 688 handle->pipe.serv.accept_reqs = NULL; 689 } 690 691 uv__handle_close(handle); 692 } 693 694 695 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) { 696 if (handle->flags & UV_HANDLE_BOUND) 697 return; 698 handle->pipe.serv.pending_instances = count; 699 handle->flags |= UV_HANDLE_PIPESERVER; 700 } 701 702 703 /* Creates a pipe server. */ 704 int uv_pipe_bind(uv_pipe_t* handle, const char* name) { 705 return uv_pipe_bind2(handle, name, strlen(name), 0); 706 } 707 708 709 int uv_pipe_bind2(uv_pipe_t* handle, 710 const char* name, 711 size_t namelen, 712 unsigned int flags) { 713 uv_loop_t* loop = handle->loop; 714 int i, err; 715 uv_pipe_accept_t* req; 716 char* name_copy; 717 718 if (flags & ~UV_PIPE_NO_TRUNCATE) { 719 return UV_EINVAL; 720 } 721 722 if (name == NULL) { 723 return UV_EINVAL; 724 } 725 726 if (namelen == 0) { 727 return UV_EINVAL; 728 } 729 730 if (includes_nul(name, namelen)) { 731 return UV_EINVAL; 732 } 733 734 if (handle->flags & UV_HANDLE_BOUND) { 735 return UV_EINVAL; 736 } 737 738 if (uv__is_closing(handle)) { 739 return UV_EINVAL; 740 } 741 742 name_copy = uv__malloc(namelen + 1); 743 if (name_copy == NULL) { 744 return UV_ENOMEM; 745 } 746 747 memcpy(name_copy, name, namelen); 748 name_copy[namelen] = '\0'; 749 750 if (!(handle->flags & UV_HANDLE_PIPESERVER)) { 751 handle->pipe.serv.pending_instances = default_pending_pipe_instances; 752 } 753 754 err = UV_ENOMEM; 755 handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*) 756 uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances); 757 if (handle->pipe.serv.accept_reqs == NULL) { 758 goto error; 759 } 760 761 for (i = 0; i < handle->pipe.serv.pending_instances; i++) { 762 req = &handle->pipe.serv.accept_reqs[i]; 763 UV_REQ_INIT(req, UV_ACCEPT); 764 req->data = handle; 765 req->pipeHandle = INVALID_HANDLE_VALUE; 766 req->next_pending = NULL; 767 } 768 769 /* TODO(bnoordhuis) Add converters that take a |length| parameter. */ 770 err = uv__convert_utf8_to_utf16(name_copy, &handle->name); 771 uv__free(name_copy); 772 name_copy = NULL; 773 774 if (err) { 775 goto error; 776 } 777 778 /* 779 * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE. 780 * If this fails then there's already a pipe server for the given pipe name. 781 */ 782 if (!pipe_alloc_accept(loop, 783 handle, 784 &handle->pipe.serv.accept_reqs[0], 785 TRUE)) { 786 err = GetLastError(); 787 if (err == ERROR_ACCESS_DENIED) { 788 err = UV_EADDRINUSE; 789 } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) { 790 err = UV_EACCES; 791 } else { 792 err = uv_translate_sys_error(err); 793 } 794 goto error; 795 } 796 797 handle->pipe.serv.pending_accepts = NULL; 798 handle->flags |= UV_HANDLE_PIPESERVER; 799 handle->flags |= UV_HANDLE_BOUND; 800 801 return 0; 802 803 error: 804 uv__free(handle->pipe.serv.accept_reqs); 805 uv__free(handle->name); 806 uv__free(name_copy); 807 handle->pipe.serv.accept_reqs = NULL; 808 handle->name = NULL; 809 810 return err; 811 } 812 813 814 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { 815 uv_loop_t* loop; 816 uv_pipe_t* handle; 817 uv_connect_t* req; 818 HANDLE pipeHandle = INVALID_HANDLE_VALUE; 819 DWORD duplex_flags; 820 821 req = (uv_connect_t*) parameter; 822 assert(req); 823 handle = (uv_pipe_t*) req->handle; 824 assert(handle); 825 loop = handle->loop; 826 assert(loop); 827 828 /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait 829 * up to 30 seconds for the pipe to become available with WaitNamedPipe. */ 830 while (WaitNamedPipeW(req->u.connect.name, 30000)) { 831 /* The pipe is now available, try to connect. */ 832 pipeHandle = open_named_pipe(req->u.connect.name, &duplex_flags); 833 if (pipeHandle != INVALID_HANDLE_VALUE) 834 break; 835 836 SwitchToThread(); 837 } 838 839 uv__free(req->u.connect.name); 840 req->u.connect.name = NULL; 841 if (pipeHandle != INVALID_HANDLE_VALUE) { 842 SET_REQ_SUCCESS(req); 843 req->u.connect.pipeHandle = pipeHandle; 844 req->u.connect.duplex_flags = duplex_flags; 845 } else { 846 SET_REQ_ERROR(req, GetLastError()); 847 } 848 849 /* Post completed */ 850 POST_COMPLETION_FOR_REQ(loop, req); 851 852 return 0; 853 } 854 855 856 void uv_pipe_connect(uv_connect_t* req, 857 uv_pipe_t* handle, 858 const char* name, 859 uv_connect_cb cb) { 860 uv_loop_t* loop; 861 int err; 862 863 err = uv_pipe_connect2(req, handle, name, strlen(name), 0, cb); 864 865 if (err) { 866 loop = handle->loop; 867 /* Make this req pending reporting an error. */ 868 SET_REQ_ERROR(req, err); 869 uv__insert_pending_req(loop, (uv_req_t*) req); 870 handle->reqs_pending++; 871 REGISTER_HANDLE_REQ(loop, handle); 872 } 873 } 874 875 876 int uv_pipe_connect2(uv_connect_t* req, 877 uv_pipe_t* handle, 878 const char* name, 879 size_t namelen, 880 unsigned int flags, 881 uv_connect_cb cb) { 882 uv_loop_t* loop; 883 int err; 884 size_t nameSize; 885 HANDLE pipeHandle = INVALID_HANDLE_VALUE; 886 DWORD duplex_flags; 887 char* name_copy; 888 889 loop = handle->loop; 890 UV_REQ_INIT(req, UV_CONNECT); 891 req->handle = (uv_stream_t*) handle; 892 req->cb = cb; 893 req->u.connect.pipeHandle = INVALID_HANDLE_VALUE; 894 req->u.connect.duplex_flags = 0; 895 req->u.connect.name = NULL; 896 897 if (flags & ~UV_PIPE_NO_TRUNCATE) { 898 return UV_EINVAL; 899 } 900 901 if (name == NULL) { 902 return UV_EINVAL; 903 } 904 905 if (namelen == 0) { 906 return UV_EINVAL; 907 } 908 909 if (includes_nul(name, namelen)) { 910 return UV_EINVAL; 911 } 912 913 name_copy = uv__malloc(namelen + 1); 914 if (name_copy == NULL) { 915 return UV_ENOMEM; 916 } 917 918 memcpy(name_copy, name, namelen); 919 name_copy[namelen] = '\0'; 920 921 if (handle->flags & UV_HANDLE_PIPESERVER) { 922 err = ERROR_INVALID_PARAMETER; 923 goto error; 924 } 925 if (handle->flags & UV_HANDLE_CONNECTION) { 926 err = ERROR_PIPE_BUSY; 927 goto error; 928 } 929 uv__pipe_connection_init(handle); 930 931 /* TODO(bnoordhuis) Add converters that take a |length| parameter. */ 932 err = uv__convert_utf8_to_utf16(name_copy, &handle->name); 933 uv__free(name_copy); 934 name_copy = NULL; 935 936 if (err) { 937 err = ERROR_NO_UNICODE_TRANSLATION; 938 goto error; 939 } 940 941 pipeHandle = open_named_pipe(handle->name, &duplex_flags); 942 if (pipeHandle == INVALID_HANDLE_VALUE) { 943 if (GetLastError() == ERROR_PIPE_BUSY) { 944 nameSize = (wcslen(handle->name) + 1) * sizeof(WCHAR); 945 req->u.connect.name = uv__malloc(nameSize); 946 if (!req->u.connect.name) { 947 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); 948 } 949 950 memcpy(req->u.connect.name, handle->name, nameSize); 951 952 /* Wait for the server to make a pipe instance available. */ 953 if (!QueueUserWorkItem(&pipe_connect_thread_proc, 954 req, 955 WT_EXECUTELONGFUNCTION)) { 956 uv__free(req->u.connect.name); 957 req->u.connect.name = NULL; 958 err = GetLastError(); 959 goto error; 960 } 961 962 REGISTER_HANDLE_REQ(loop, handle); 963 handle->reqs_pending++; 964 965 return 0; 966 } 967 968 err = GetLastError(); 969 goto error; 970 } 971 972 req->u.connect.pipeHandle = pipeHandle; 973 req->u.connect.duplex_flags = duplex_flags; 974 SET_REQ_SUCCESS(req); 975 uv__insert_pending_req(loop, (uv_req_t*) req); 976 handle->reqs_pending++; 977 REGISTER_HANDLE_REQ(loop, handle); 978 return 0; 979 980 error: 981 uv__free(name_copy); 982 983 if (handle->name) { 984 uv__free(handle->name); 985 handle->name = NULL; 986 } 987 988 if (pipeHandle != INVALID_HANDLE_VALUE) 989 CloseHandle(pipeHandle); 990 991 /* Make this req pending reporting an error. */ 992 SET_REQ_ERROR(req, err); 993 uv__insert_pending_req(loop, (uv_req_t*) req); 994 handle->reqs_pending++; 995 REGISTER_HANDLE_REQ(loop, handle); 996 return 0; 997 } 998 999 1000 void uv__pipe_interrupt_read(uv_pipe_t* handle) { 1001 BOOL r; 1002 1003 if (!(handle->flags & UV_HANDLE_READ_PENDING)) 1004 return; /* No pending reads. */ 1005 if (handle->flags & UV_HANDLE_CANCELLATION_PENDING) 1006 return; /* Already cancelled. */ 1007 if (handle->handle == INVALID_HANDLE_VALUE) 1008 return; /* Pipe handle closed. */ 1009 1010 if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) { 1011 /* Cancel asynchronous read. */ 1012 r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped); 1013 assert(r || GetLastError() == ERROR_NOT_FOUND); 1014 (void) r; 1015 } else { 1016 /* Cancel synchronous read (which is happening in the thread pool). */ 1017 HANDLE thread; 1018 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle; 1019 1020 EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock); 1021 1022 thread = *thread_ptr; 1023 if (thread == NULL) { 1024 /* The thread pool thread has not yet reached the point of blocking, we 1025 * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */ 1026 *thread_ptr = INVALID_HANDLE_VALUE; 1027 1028 } else { 1029 /* Spin until the thread has acknowledged (by setting the thread to 1030 * INVALID_HANDLE_VALUE) that it is past the point of blocking. */ 1031 while (thread != INVALID_HANDLE_VALUE) { 1032 r = CancelSynchronousIo(thread); 1033 assert(r || GetLastError() == ERROR_NOT_FOUND); 1034 SwitchToThread(); /* Yield thread. */ 1035 thread = *thread_ptr; 1036 } 1037 } 1038 1039 LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock); 1040 } 1041 1042 /* Set flag to indicate that read has been cancelled. */ 1043 handle->flags |= UV_HANDLE_CANCELLATION_PENDING; 1044 } 1045 1046 1047 void uv__pipe_read_stop(uv_pipe_t* handle) { 1048 handle->flags &= ~UV_HANDLE_READING; 1049 DECREASE_ACTIVE_COUNT(handle->loop, handle); 1050 uv__pipe_interrupt_read(handle); 1051 } 1052 1053 1054 /* Cleans up uv_pipe_t (server or connection) and all resources associated with 1055 * it. */ 1056 void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) { 1057 int i; 1058 HANDLE pipeHandle; 1059 1060 if (handle->flags & UV_HANDLE_READING) { 1061 handle->flags &= ~UV_HANDLE_READING; 1062 DECREASE_ACTIVE_COUNT(loop, handle); 1063 } 1064 1065 if (handle->flags & UV_HANDLE_LISTENING) { 1066 handle->flags &= ~UV_HANDLE_LISTENING; 1067 DECREASE_ACTIVE_COUNT(loop, handle); 1068 } 1069 1070 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); 1071 1072 uv__handle_closing(handle); 1073 1074 uv__pipe_interrupt_read(handle); 1075 1076 if (handle->name) { 1077 uv__free(handle->name); 1078 handle->name = NULL; 1079 } 1080 1081 if (handle->flags & UV_HANDLE_PIPESERVER) { 1082 for (i = 0; i < handle->pipe.serv.pending_instances; i++) { 1083 pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle; 1084 if (pipeHandle != INVALID_HANDLE_VALUE) { 1085 CloseHandle(pipeHandle); 1086 handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE; 1087 } 1088 } 1089 handle->handle = INVALID_HANDLE_VALUE; 1090 } 1091 1092 if (handle->flags & UV_HANDLE_CONNECTION) { 1093 eof_timer_destroy(handle); 1094 } 1095 1096 if ((handle->flags & UV_HANDLE_CONNECTION) 1097 && handle->handle != INVALID_HANDLE_VALUE) { 1098 /* This will eventually destroy the write queue for us too. */ 1099 close_pipe(handle); 1100 } 1101 1102 if (handle->reqs_pending == 0) 1103 uv__want_endgame(loop, (uv_handle_t*) handle); 1104 } 1105 1106 1107 static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, 1108 uv_pipe_accept_t* req, BOOL firstInstance) { 1109 assert(handle->flags & UV_HANDLE_LISTENING); 1110 1111 if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) { 1112 SET_REQ_ERROR(req, GetLastError()); 1113 uv__insert_pending_req(loop, (uv_req_t*) req); 1114 handle->reqs_pending++; 1115 return; 1116 } 1117 1118 assert(req->pipeHandle != INVALID_HANDLE_VALUE); 1119 1120 /* Prepare the overlapped structure. */ 1121 memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped)); 1122 1123 if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) && 1124 GetLastError() != ERROR_IO_PENDING) { 1125 if (GetLastError() == ERROR_PIPE_CONNECTED) { 1126 SET_REQ_SUCCESS(req); 1127 } else { 1128 CloseHandle(req->pipeHandle); 1129 req->pipeHandle = INVALID_HANDLE_VALUE; 1130 /* Make this req pending reporting an error. */ 1131 SET_REQ_ERROR(req, GetLastError()); 1132 } 1133 uv__insert_pending_req(loop, (uv_req_t*) req); 1134 handle->reqs_pending++; 1135 return; 1136 } 1137 1138 /* Wait for completion via IOCP */ 1139 handle->reqs_pending++; 1140 } 1141 1142 1143 int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) { 1144 uv_loop_t* loop = server->loop; 1145 uv_pipe_t* pipe_client; 1146 uv_pipe_accept_t* req; 1147 struct uv__queue* q; 1148 uv__ipc_xfer_queue_item_t* item; 1149 int err; 1150 1151 if (server->ipc) { 1152 if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) { 1153 /* No valid pending sockets. */ 1154 return WSAEWOULDBLOCK; 1155 } 1156 1157 q = uv__queue_head(&server->pipe.conn.ipc_xfer_queue); 1158 uv__queue_remove(q); 1159 server->pipe.conn.ipc_xfer_queue_length--; 1160 item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member); 1161 1162 err = uv__tcp_xfer_import( 1163 (uv_tcp_t*) client, item->xfer_type, &item->xfer_info); 1164 1165 uv__free(item); 1166 1167 if (err != 0) 1168 return err; 1169 1170 } else { 1171 pipe_client = (uv_pipe_t*) client; 1172 uv__pipe_connection_init(pipe_client); 1173 1174 /* Find a connection instance that has been connected, but not yet 1175 * accepted. */ 1176 req = server->pipe.serv.pending_accepts; 1177 1178 if (!req) { 1179 /* No valid connections found, so we error out. */ 1180 return WSAEWOULDBLOCK; 1181 } 1182 1183 /* Initialize the client handle and copy the pipeHandle to the client */ 1184 pipe_client->handle = req->pipeHandle; 1185 pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; 1186 1187 /* Prepare the req to pick up a new connection */ 1188 server->pipe.serv.pending_accepts = req->next_pending; 1189 req->next_pending = NULL; 1190 req->pipeHandle = INVALID_HANDLE_VALUE; 1191 1192 server->handle = INVALID_HANDLE_VALUE; 1193 if (!(server->flags & UV_HANDLE_CLOSING)) { 1194 uv__pipe_queue_accept(loop, server, req, FALSE); 1195 } 1196 } 1197 1198 return 0; 1199 } 1200 1201 1202 /* Starts listening for connections for the given pipe. */ 1203 int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { 1204 uv_loop_t* loop = handle->loop; 1205 int i; 1206 1207 if (handle->flags & UV_HANDLE_LISTENING) { 1208 handle->stream.serv.connection_cb = cb; 1209 } 1210 1211 if (!(handle->flags & UV_HANDLE_BOUND)) { 1212 return WSAEINVAL; 1213 } 1214 1215 if (handle->flags & UV_HANDLE_READING) { 1216 return WSAEISCONN; 1217 } 1218 1219 if (!(handle->flags & UV_HANDLE_PIPESERVER)) { 1220 return ERROR_NOT_SUPPORTED; 1221 } 1222 1223 if (handle->ipc) { 1224 return WSAEINVAL; 1225 } 1226 1227 handle->flags |= UV_HANDLE_LISTENING; 1228 INCREASE_ACTIVE_COUNT(loop, handle); 1229 handle->stream.serv.connection_cb = cb; 1230 1231 /* First pipe handle should have already been created in uv_pipe_bind */ 1232 assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); 1233 1234 for (i = 0; i < handle->pipe.serv.pending_instances; i++) { 1235 uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0); 1236 } 1237 1238 return 0; 1239 } 1240 1241 1242 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) { 1243 uv_read_t* req = (uv_read_t*) arg; 1244 uv_pipe_t* handle = (uv_pipe_t*) req->data; 1245 uv_loop_t* loop = handle->loop; 1246 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle; 1247 CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock; 1248 HANDLE thread; 1249 DWORD bytes; 1250 DWORD err; 1251 1252 assert(req->type == UV_READ); 1253 assert(handle->type == UV_NAMED_PIPE); 1254 1255 err = 0; 1256 1257 /* Create a handle to the current thread. */ 1258 if (!DuplicateHandle(GetCurrentProcess(), 1259 GetCurrentThread(), 1260 GetCurrentProcess(), 1261 &thread, 1262 0, 1263 FALSE, 1264 DUPLICATE_SAME_ACCESS)) { 1265 err = GetLastError(); 1266 goto out1; 1267 } 1268 1269 /* The lock needs to be held when thread handle is modified. */ 1270 EnterCriticalSection(lock); 1271 if (*thread_ptr == INVALID_HANDLE_VALUE) { 1272 /* uv__pipe_interrupt_read() cancelled reading before we got here. */ 1273 err = ERROR_OPERATION_ABORTED; 1274 } else { 1275 /* Let main thread know which worker thread is doing the blocking read. */ 1276 assert(*thread_ptr == NULL); 1277 *thread_ptr = thread; 1278 } 1279 LeaveCriticalSection(lock); 1280 1281 if (err) 1282 goto out2; 1283 1284 /* Block the thread until data is available on the pipe, or the read is 1285 * cancelled. */ 1286 if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL)) 1287 err = GetLastError(); 1288 1289 /* Let the main thread know the worker is past the point of blocking. */ 1290 assert(thread == *thread_ptr); 1291 *thread_ptr = INVALID_HANDLE_VALUE; 1292 1293 /* Briefly acquire the mutex. Since the main thread holds the lock while it 1294 * is spinning trying to cancel this thread's I/O, we will block here until 1295 * it stops doing that. */ 1296 EnterCriticalSection(lock); 1297 LeaveCriticalSection(lock); 1298 1299 out2: 1300 /* Close the handle to the current thread. */ 1301 CloseHandle(thread); 1302 1303 out1: 1304 /* Set request status and post a completion record to the IOCP. */ 1305 if (err) 1306 SET_REQ_ERROR(req, err); 1307 else 1308 SET_REQ_SUCCESS(req); 1309 POST_COMPLETION_FOR_REQ(loop, req); 1310 1311 return 0; 1312 } 1313 1314 1315 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) { 1316 int result; 1317 DWORD bytes; 1318 uv_write_t* req = (uv_write_t*) parameter; 1319 uv_pipe_t* handle = (uv_pipe_t*) req->handle; 1320 uv_loop_t* loop = handle->loop; 1321 1322 assert(req != NULL); 1323 assert(req->type == UV_WRITE); 1324 assert(handle->type == UV_NAMED_PIPE); 1325 1326 result = WriteFile(handle->handle, 1327 req->write_buffer.base, 1328 req->write_buffer.len, 1329 &bytes, 1330 NULL); 1331 1332 if (!result) { 1333 SET_REQ_ERROR(req, GetLastError()); 1334 } 1335 1336 POST_COMPLETION_FOR_REQ(loop, req); 1337 return 0; 1338 } 1339 1340 1341 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) { 1342 uv_read_t* req; 1343 uv_tcp_t* handle; 1344 1345 req = (uv_read_t*) context; 1346 assert(req != NULL); 1347 handle = (uv_tcp_t*)req->data; 1348 assert(handle != NULL); 1349 assert(!timed_out); 1350 1351 if (!PostQueuedCompletionStatus(handle->loop->iocp, 1352 req->u.io.overlapped.InternalHigh, 1353 0, 1354 &req->u.io.overlapped)) { 1355 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); 1356 } 1357 } 1358 1359 1360 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) { 1361 uv_write_t* req; 1362 uv_tcp_t* handle; 1363 1364 req = (uv_write_t*) context; 1365 assert(req != NULL); 1366 handle = (uv_tcp_t*)req->handle; 1367 assert(handle != NULL); 1368 assert(!timed_out); 1369 1370 if (!PostQueuedCompletionStatus(handle->loop->iocp, 1371 req->u.io.overlapped.InternalHigh, 1372 0, 1373 &req->u.io.overlapped)) { 1374 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); 1375 } 1376 } 1377 1378 1379 static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { 1380 uv_read_t* req; 1381 int result; 1382 1383 assert(handle->flags & UV_HANDLE_READING); 1384 assert(!(handle->flags & UV_HANDLE_READ_PENDING)); 1385 1386 assert(handle->handle != INVALID_HANDLE_VALUE); 1387 1388 req = &handle->read_req; 1389 1390 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 1391 handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */ 1392 if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc, 1393 req, 1394 WT_EXECUTELONGFUNCTION)) { 1395 /* Make this req pending reporting an error. */ 1396 SET_REQ_ERROR(req, GetLastError()); 1397 goto error; 1398 } 1399 } else { 1400 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); 1401 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 1402 assert(req->event_handle != NULL); 1403 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); 1404 } 1405 1406 /* Do 0-read */ 1407 result = ReadFile(handle->handle, 1408 &uv_zero_, 1409 0, 1410 NULL, 1411 &req->u.io.overlapped); 1412 1413 if (!result && GetLastError() != ERROR_IO_PENDING) { 1414 /* Make this req pending reporting an error. */ 1415 SET_REQ_ERROR(req, GetLastError()); 1416 goto error; 1417 } 1418 1419 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 1420 assert(req->wait_handle == INVALID_HANDLE_VALUE); 1421 if (!RegisterWaitForSingleObject(&req->wait_handle, 1422 req->event_handle, post_completion_read_wait, (void*) req, 1423 INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) { 1424 SET_REQ_ERROR(req, GetLastError()); 1425 goto error; 1426 } 1427 } 1428 } 1429 1430 /* Start the eof timer if there is one */ 1431 eof_timer_start(handle); 1432 handle->flags |= UV_HANDLE_READ_PENDING; 1433 handle->reqs_pending++; 1434 return; 1435 1436 error: 1437 uv__insert_pending_req(loop, (uv_req_t*)req); 1438 handle->flags |= UV_HANDLE_READ_PENDING; 1439 handle->reqs_pending++; 1440 } 1441 1442 1443 int uv__pipe_read_start(uv_pipe_t* handle, 1444 uv_alloc_cb alloc_cb, 1445 uv_read_cb read_cb) { 1446 uv_loop_t* loop = handle->loop; 1447 1448 handle->flags |= UV_HANDLE_READING; 1449 INCREASE_ACTIVE_COUNT(loop, handle); 1450 handle->read_cb = read_cb; 1451 handle->alloc_cb = alloc_cb; 1452 1453 if (handle->read_req.event_handle == NULL) { 1454 handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL); 1455 if (handle->read_req.event_handle == NULL) { 1456 uv_fatal_error(GetLastError(), "CreateEvent"); 1457 } 1458 } 1459 1460 /* If reading was stopped and then started again, there could still be a read 1461 * request pending. */ 1462 if (!(handle->flags & UV_HANDLE_READ_PENDING)) { 1463 uv__pipe_queue_read(loop, handle); 1464 } 1465 1466 return 0; 1467 } 1468 1469 1470 static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle, 1471 uv_write_t* req) { 1472 req->next_req = NULL; 1473 if (handle->pipe.conn.non_overlapped_writes_tail) { 1474 req->next_req = 1475 handle->pipe.conn.non_overlapped_writes_tail->next_req; 1476 handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req; 1477 handle->pipe.conn.non_overlapped_writes_tail = req; 1478 } else { 1479 req->next_req = (uv_req_t*)req; 1480 handle->pipe.conn.non_overlapped_writes_tail = req; 1481 } 1482 } 1483 1484 1485 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) { 1486 uv_write_t* req; 1487 1488 if (handle->pipe.conn.non_overlapped_writes_tail) { 1489 req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req; 1490 1491 if (req == handle->pipe.conn.non_overlapped_writes_tail) { 1492 handle->pipe.conn.non_overlapped_writes_tail = NULL; 1493 } else { 1494 handle->pipe.conn.non_overlapped_writes_tail->next_req = 1495 req->next_req; 1496 } 1497 1498 return req; 1499 } else { 1500 /* queue empty */ 1501 return NULL; 1502 } 1503 } 1504 1505 1506 static void uv__queue_non_overlapped_write(uv_pipe_t* handle) { 1507 uv_write_t* req = uv_remove_non_overlapped_write_req(handle); 1508 if (req) { 1509 if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc, 1510 req, 1511 WT_EXECUTELONGFUNCTION)) { 1512 uv_fatal_error(GetLastError(), "QueueUserWorkItem"); 1513 } 1514 } 1515 } 1516 1517 1518 static int uv__build_coalesced_write_req(uv_write_t* user_req, 1519 const uv_buf_t bufs[], 1520 size_t nbufs, 1521 uv_write_t** req_out, 1522 uv_buf_t* write_buf_out) { 1523 /* Pack into a single heap-allocated buffer: 1524 * (a) a uv_write_t structure where libuv stores the actual state. 1525 * (b) a pointer to the original uv_write_t. 1526 * (c) data from all `bufs` entries. 1527 */ 1528 char* heap_buffer; 1529 size_t heap_buffer_length, heap_buffer_offset; 1530 uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */ 1531 char* data_start; /* (c) */ 1532 size_t data_length; 1533 unsigned int i; 1534 1535 /* Compute combined size of all combined buffers from `bufs`. */ 1536 data_length = 0; 1537 for (i = 0; i < nbufs; i++) 1538 data_length += bufs[i].len; 1539 1540 /* The total combined size of data buffers should not exceed UINT32_MAX, 1541 * because WriteFile() won't accept buffers larger than that. */ 1542 if (data_length > UINT32_MAX) 1543 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */ 1544 1545 /* Compute heap buffer size. */ 1546 heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */ 1547 data_length; /* (c) */ 1548 1549 /* Allocate buffer. */ 1550 heap_buffer = uv__malloc(heap_buffer_length); 1551 if (heap_buffer == NULL) 1552 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */ 1553 1554 /* Copy uv_write_t information to the buffer. */ 1555 coalesced_write_req = (uv__coalesced_write_t*) heap_buffer; 1556 coalesced_write_req->req = *user_req; /* copy (a) */ 1557 coalesced_write_req->req.coalesced = 1; 1558 coalesced_write_req->user_req = user_req; /* copy (b) */ 1559 heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */ 1560 1561 /* Copy data buffers to the heap buffer. */ 1562 data_start = &heap_buffer[heap_buffer_offset]; 1563 for (i = 0; i < nbufs; i++) { 1564 memcpy(&heap_buffer[heap_buffer_offset], 1565 bufs[i].base, 1566 bufs[i].len); /* copy (c) */ 1567 heap_buffer_offset += bufs[i].len; /* offset (c) */ 1568 } 1569 assert(heap_buffer_offset == heap_buffer_length); 1570 1571 /* Set out arguments and return. */ 1572 *req_out = &coalesced_write_req->req; 1573 *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length); 1574 return 0; 1575 } 1576 1577 1578 static int uv__pipe_write_data(uv_loop_t* loop, 1579 uv_write_t* req, 1580 uv_pipe_t* handle, 1581 const uv_buf_t bufs[], 1582 size_t nbufs, 1583 uv_write_cb cb, 1584 int copy_always) { 1585 int err; 1586 int result; 1587 uv_buf_t write_buf; 1588 1589 assert(handle->handle != INVALID_HANDLE_VALUE); 1590 1591 UV_REQ_INIT(req, UV_WRITE); 1592 req->handle = (uv_stream_t*) handle; 1593 req->send_handle = NULL; 1594 req->cb = cb; 1595 /* Private fields. */ 1596 req->coalesced = 0; 1597 req->event_handle = NULL; 1598 req->wait_handle = INVALID_HANDLE_VALUE; 1599 1600 /* Prepare the overlapped structure. */ 1601 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); 1602 if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) { 1603 req->event_handle = CreateEvent(NULL, 0, 0, NULL); 1604 if (req->event_handle == NULL) { 1605 uv_fatal_error(GetLastError(), "CreateEvent"); 1606 } 1607 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); 1608 } 1609 req->write_buffer = uv_null_buf_; 1610 1611 if (nbufs == 0) { 1612 /* Write empty buffer. */ 1613 write_buf = uv_null_buf_; 1614 } else if (nbufs == 1 && !copy_always) { 1615 /* Write directly from bufs[0]. */ 1616 write_buf = bufs[0]; 1617 } else { 1618 /* Coalesce all `bufs` into one big buffer. This also creates a new 1619 * write-request structure that replaces the old one. */ 1620 err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf); 1621 if (err != 0) 1622 return err; 1623 } 1624 1625 if ((handle->flags & 1626 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) == 1627 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) { 1628 DWORD bytes; 1629 result = 1630 WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL); 1631 1632 if (!result) { 1633 err = GetLastError(); 1634 return err; 1635 } else { 1636 /* Request completed immediately. */ 1637 req->u.io.queued_bytes = 0; 1638 } 1639 1640 REGISTER_HANDLE_REQ(loop, handle); 1641 handle->reqs_pending++; 1642 handle->stream.conn.write_reqs_pending++; 1643 POST_COMPLETION_FOR_REQ(loop, req); 1644 return 0; 1645 } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 1646 req->write_buffer = write_buf; 1647 uv__insert_non_overlapped_write_req(handle, req); 1648 if (handle->stream.conn.write_reqs_pending == 0) { 1649 uv__queue_non_overlapped_write(handle); 1650 } 1651 1652 /* Request queued by the kernel. */ 1653 req->u.io.queued_bytes = write_buf.len; 1654 handle->write_queue_size += req->u.io.queued_bytes; 1655 } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) { 1656 /* Using overlapped IO, but wait for completion before returning */ 1657 result = WriteFile(handle->handle, 1658 write_buf.base, 1659 write_buf.len, 1660 NULL, 1661 &req->u.io.overlapped); 1662 1663 if (!result && GetLastError() != ERROR_IO_PENDING) { 1664 err = GetLastError(); 1665 CloseHandle(req->event_handle); 1666 req->event_handle = NULL; 1667 return err; 1668 } 1669 1670 if (result) { 1671 /* Request completed immediately. */ 1672 req->u.io.queued_bytes = 0; 1673 } else { 1674 /* Request queued by the kernel. */ 1675 req->u.io.queued_bytes = write_buf.len; 1676 handle->write_queue_size += req->u.io.queued_bytes; 1677 if (WaitForSingleObject(req->event_handle, INFINITE) != 1678 WAIT_OBJECT_0) { 1679 err = GetLastError(); 1680 CloseHandle(req->event_handle); 1681 req->event_handle = NULL; 1682 return err; 1683 } 1684 } 1685 CloseHandle(req->event_handle); 1686 req->event_handle = NULL; 1687 1688 REGISTER_HANDLE_REQ(loop, handle); 1689 handle->reqs_pending++; 1690 handle->stream.conn.write_reqs_pending++; 1691 return 0; 1692 } else { 1693 result = WriteFile(handle->handle, 1694 write_buf.base, 1695 write_buf.len, 1696 NULL, 1697 &req->u.io.overlapped); 1698 1699 if (!result && GetLastError() != ERROR_IO_PENDING) { 1700 return GetLastError(); 1701 } 1702 1703 if (result) { 1704 /* Request completed immediately. */ 1705 req->u.io.queued_bytes = 0; 1706 } else { 1707 /* Request queued by the kernel. */ 1708 req->u.io.queued_bytes = write_buf.len; 1709 handle->write_queue_size += req->u.io.queued_bytes; 1710 } 1711 1712 if (handle->flags & UV_HANDLE_EMULATE_IOCP) { 1713 if (!RegisterWaitForSingleObject(&req->wait_handle, 1714 req->event_handle, post_completion_write_wait, (void*) req, 1715 INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) { 1716 return GetLastError(); 1717 } 1718 } 1719 } 1720 1721 REGISTER_HANDLE_REQ(loop, handle); 1722 handle->reqs_pending++; 1723 handle->stream.conn.write_reqs_pending++; 1724 1725 return 0; 1726 } 1727 1728 1729 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) { 1730 DWORD* pid = &handle->pipe.conn.ipc_remote_pid; 1731 1732 /* If the both ends of the IPC pipe are owned by the same process, 1733 * the remote end pid may not yet be set. If so, do it here. 1734 * TODO: this is weird; it'd probably better to use a handshake. */ 1735 if (*pid == 0) { 1736 GetNamedPipeClientProcessId(handle->handle, pid); 1737 if (*pid == GetCurrentProcessId()) { 1738 GetNamedPipeServerProcessId(handle->handle, pid); 1739 } 1740 } 1741 1742 return *pid; 1743 } 1744 1745 1746 int uv__pipe_write_ipc(uv_loop_t* loop, 1747 uv_write_t* req, 1748 uv_pipe_t* handle, 1749 const uv_buf_t data_bufs[], 1750 size_t data_buf_count, 1751 uv_stream_t* send_handle, 1752 uv_write_cb cb) { 1753 uv_buf_t stack_bufs[6]; 1754 uv_buf_t* bufs; 1755 size_t buf_count, buf_index; 1756 uv__ipc_frame_header_t frame_header; 1757 uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE; 1758 uv__ipc_socket_xfer_info_t xfer_info; 1759 uint64_t data_length; 1760 size_t i; 1761 int err; 1762 1763 /* Compute the combined size of data buffers. */ 1764 data_length = 0; 1765 for (i = 0; i < data_buf_count; i++) 1766 data_length += data_bufs[i].len; 1767 if (data_length > UINT32_MAX) 1768 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */ 1769 1770 /* Prepare the frame's socket xfer payload. */ 1771 if (send_handle != NULL) { 1772 uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle; 1773 1774 /* Verify that `send_handle` it is indeed a tcp handle. */ 1775 if (send_tcp_handle->type != UV_TCP) 1776 return ERROR_NOT_SUPPORTED; 1777 1778 /* Export the tcp handle. */ 1779 err = uv__tcp_xfer_export(send_tcp_handle, 1780 uv__pipe_get_ipc_remote_pid(handle), 1781 &xfer_type, 1782 &xfer_info); 1783 if (err != 0) 1784 return err; 1785 } 1786 1787 /* Compute the number of uv_buf_t's required. */ 1788 buf_count = 1 + data_buf_count; /* Frame header and data buffers. */ 1789 if (send_handle != NULL) 1790 buf_count += 1; /* One extra for the socket xfer information. */ 1791 1792 /* Use the on-stack buffer array if it is big enough; otherwise allocate 1793 * space for it on the heap. */ 1794 if (buf_count < ARRAY_SIZE(stack_bufs)) { 1795 /* Use on-stack buffer array. */ 1796 bufs = stack_bufs; 1797 } else { 1798 /* Use heap-allocated buffer array. */ 1799 bufs = uv__calloc(buf_count, sizeof(uv_buf_t)); 1800 if (bufs == NULL) 1801 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */ 1802 } 1803 buf_index = 0; 1804 1805 /* Initialize frame header and add it to the buffers list. */ 1806 memset(&frame_header, 0, sizeof frame_header); 1807 bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header); 1808 1809 if (send_handle != NULL) { 1810 /* Add frame header flags. */ 1811 switch (xfer_type) { 1812 case UV__IPC_SOCKET_XFER_TCP_CONNECTION: 1813 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER | 1814 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION; 1815 break; 1816 case UV__IPC_SOCKET_XFER_TCP_SERVER: 1817 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER; 1818 break; 1819 default: 1820 assert(0); /* Unreachable. */ 1821 } 1822 /* Add xfer info buffer. */ 1823 bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info); 1824 } 1825 1826 if (data_length > 0) { 1827 /* Update frame header. */ 1828 frame_header.flags |= UV__IPC_FRAME_HAS_DATA; 1829 frame_header.data_length = (uint32_t) data_length; 1830 /* Add data buffers to buffers list. */ 1831 for (i = 0; i < data_buf_count; i++) 1832 bufs[buf_index++] = data_bufs[i]; 1833 } 1834 1835 /* Write buffers. We set the `always_copy` flag, so it is not a problem that 1836 * some of the written data lives on the stack. */ 1837 err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1); 1838 1839 /* If we had to heap-allocate the bufs array, free it now. */ 1840 if (bufs != stack_bufs) { 1841 uv__free(bufs); 1842 } 1843 1844 return err; 1845 } 1846 1847 1848 int uv__pipe_write(uv_loop_t* loop, 1849 uv_write_t* req, 1850 uv_pipe_t* handle, 1851 const uv_buf_t bufs[], 1852 size_t nbufs, 1853 uv_stream_t* send_handle, 1854 uv_write_cb cb) { 1855 if (handle->ipc) { 1856 /* IPC pipe write: use framing protocol. */ 1857 return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb); 1858 } else { 1859 /* Non-IPC pipe write: put data on the wire directly. */ 1860 assert(send_handle == NULL); 1861 return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0); 1862 } 1863 } 1864 1865 1866 static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, 1867 uv_buf_t buf) { 1868 /* If there is an eof timer running, we don't need it any more, so discard 1869 * it. */ 1870 eof_timer_destroy(handle); 1871 1872 uv_read_stop((uv_stream_t*) handle); 1873 1874 handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf); 1875 } 1876 1877 1878 static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, 1879 uv_buf_t buf) { 1880 /* If there is an eof timer running, we don't need it any more, so discard 1881 * it. */ 1882 eof_timer_destroy(handle); 1883 1884 uv_read_stop((uv_stream_t*) handle); 1885 1886 handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf); 1887 } 1888 1889 1890 static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, 1891 DWORD error, uv_buf_t buf) { 1892 if (error == ERROR_BROKEN_PIPE) { 1893 uv__pipe_read_eof(loop, handle, buf); 1894 } else { 1895 uv__pipe_read_error(loop, handle, error, buf); 1896 } 1897 } 1898 1899 1900 static void uv__pipe_queue_ipc_xfer_info( 1901 uv_pipe_t* handle, 1902 uv__ipc_socket_xfer_type_t xfer_type, 1903 uv__ipc_socket_xfer_info_t* xfer_info) { 1904 uv__ipc_xfer_queue_item_t* item; 1905 1906 item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item)); 1907 if (item == NULL) 1908 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); 1909 1910 item->xfer_type = xfer_type; 1911 item->xfer_info = *xfer_info; 1912 1913 uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &item->member); 1914 handle->pipe.conn.ipc_xfer_queue_length++; 1915 } 1916 1917 1918 /* Read an exact number of bytes from a pipe. If an error or end-of-file is 1919 * encountered before the requested number of bytes are read, an error is 1920 * returned. */ 1921 static DWORD uv__pipe_read_exactly(uv_pipe_t* handle, void* buffer, DWORD count) { 1922 uv_read_t* req; 1923 DWORD bytes_read; 1924 DWORD bytes_read_now; 1925 1926 bytes_read = 0; 1927 while (bytes_read < count) { 1928 req = &handle->read_req; 1929 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); 1930 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); 1931 if (!ReadFile(handle->handle, 1932 (char*) buffer + bytes_read, 1933 count - bytes_read, 1934 &bytes_read_now, 1935 &req->u.io.overlapped)) { 1936 if (GetLastError() != ERROR_IO_PENDING) 1937 return GetLastError(); 1938 if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, &bytes_read_now, TRUE)) 1939 return GetLastError(); 1940 } 1941 1942 bytes_read += bytes_read_now; 1943 } 1944 1945 assert(bytes_read == count); 1946 return 0; 1947 } 1948 1949 1950 static int uv__pipe_read_data(uv_loop_t* loop, 1951 uv_pipe_t* handle, 1952 DWORD* bytes_read, /* inout argument */ 1953 DWORD max_bytes) { 1954 uv_buf_t buf; 1955 uv_read_t* req; 1956 DWORD r; 1957 DWORD bytes_available; 1958 int more; 1959 1960 /* Ask the user for a buffer to read data into. */ 1961 buf = uv_buf_init(NULL, 0); 1962 handle->alloc_cb((uv_handle_t*) handle, *bytes_read, &buf); 1963 if (buf.base == NULL || buf.len == 0) { 1964 handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf); 1965 return 0; /* Break out of read loop. */ 1966 } 1967 1968 /* Ensure we read at most the smaller of: 1969 * (a) the length of the user-allocated buffer. 1970 * (b) the maximum data length as specified by the `max_bytes` argument. 1971 * (c) the amount of data that can be read non-blocking 1972 */ 1973 if (max_bytes > buf.len) 1974 max_bytes = buf.len; 1975 1976 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 1977 /* The user failed to supply a pipe that can be used non-blocking or with 1978 * threads. Try to estimate the amount of data that is safe to read without 1979 * blocking, in a race-y way however. */ 1980 bytes_available = 0; 1981 if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &bytes_available, NULL)) { 1982 r = GetLastError(); 1983 } else { 1984 if (max_bytes > bytes_available) 1985 max_bytes = bytes_available; 1986 *bytes_read = 0; 1987 if (max_bytes == 0 || ReadFile(handle->handle, buf.base, max_bytes, bytes_read, NULL)) 1988 r = ERROR_SUCCESS; 1989 else 1990 r = GetLastError(); 1991 } 1992 more = max_bytes < bytes_available; 1993 } else { 1994 /* Read into the user buffer. 1995 * Prepare an Event so that we can cancel if it doesn't complete immediately. 1996 */ 1997 req = &handle->read_req; 1998 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); 1999 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); 2000 if (ReadFile(handle->handle, buf.base, max_bytes, bytes_read, &req->u.io.overlapped)) { 2001 r = ERROR_SUCCESS; 2002 } else { 2003 r = GetLastError(); 2004 *bytes_read = 0; 2005 if (r == ERROR_IO_PENDING) { 2006 r = CancelIoEx(handle->handle, &req->u.io.overlapped); 2007 assert(r || GetLastError() == ERROR_NOT_FOUND); 2008 if (GetOverlappedResult(handle->handle, &req->u.io.overlapped, bytes_read, TRUE)) { 2009 r = ERROR_SUCCESS; 2010 } else { 2011 r = GetLastError(); 2012 *bytes_read = 0; 2013 } 2014 } 2015 } 2016 more = *bytes_read == max_bytes; 2017 } 2018 2019 /* Call the read callback. */ 2020 if (r == ERROR_SUCCESS || r == ERROR_OPERATION_ABORTED) 2021 handle->read_cb((uv_stream_t*) handle, *bytes_read, &buf); 2022 else 2023 uv__pipe_read_error_or_eof(loop, handle, r, buf); 2024 2025 return more; 2026 } 2027 2028 2029 static int uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { 2030 uint32_t* data_remaining; 2031 DWORD err; 2032 DWORD more; 2033 DWORD bytes_read; 2034 2035 data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining; 2036 2037 if (*data_remaining > 0) { 2038 /* Read frame data payload. */ 2039 bytes_read = *data_remaining; 2040 more = uv__pipe_read_data(loop, handle, &bytes_read, bytes_read); 2041 *data_remaining -= bytes_read; 2042 2043 } else { 2044 /* Start of a new IPC frame. */ 2045 uv__ipc_frame_header_t frame_header; 2046 uint32_t xfer_flags; 2047 uv__ipc_socket_xfer_type_t xfer_type; 2048 uv__ipc_socket_xfer_info_t xfer_info; 2049 2050 /* Read the IPC frame header. */ 2051 err = uv__pipe_read_exactly( 2052 handle, &frame_header, sizeof frame_header); 2053 if (err) 2054 goto error; 2055 2056 /* Validate that flags are valid. */ 2057 if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0) 2058 goto invalid; 2059 /* Validate that reserved2 is zero. */ 2060 if (frame_header.reserved2 != 0) 2061 goto invalid; 2062 2063 /* Parse xfer flags. */ 2064 xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS; 2065 if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) { 2066 /* Socket coming -- determine the type. */ 2067 xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION 2068 ? UV__IPC_SOCKET_XFER_TCP_CONNECTION 2069 : UV__IPC_SOCKET_XFER_TCP_SERVER; 2070 } else if (xfer_flags == 0) { 2071 /* No socket. */ 2072 xfer_type = UV__IPC_SOCKET_XFER_NONE; 2073 } else { 2074 /* Invalid flags. */ 2075 goto invalid; 2076 } 2077 2078 /* Parse data frame information. */ 2079 if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) { 2080 *data_remaining = frame_header.data_length; 2081 } else if (frame_header.data_length != 0) { 2082 /* Data length greater than zero but data flag not set -- invalid. */ 2083 goto invalid; 2084 } 2085 2086 /* If no socket xfer info follows, return here. Data will be read in a 2087 * subsequent invocation of uv__pipe_read_ipc(). */ 2088 if (xfer_type != UV__IPC_SOCKET_XFER_NONE) { 2089 /* Read transferred socket information. */ 2090 err = uv__pipe_read_exactly(handle, &xfer_info, sizeof xfer_info); 2091 if (err) 2092 goto error; 2093 2094 /* Store the pending socket info. */ 2095 uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info); 2096 } 2097 } 2098 2099 /* Return whether the caller should immediately try another read call to get 2100 * more data. Calling uv__pipe_read_exactly will hang if there isn't data 2101 * available, so we cannot do this unless we are guaranteed not to reach that. 2102 */ 2103 more = *data_remaining > 0; 2104 return more; 2105 2106 invalid: 2107 /* Invalid frame. */ 2108 err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */ 2109 2110 error: 2111 uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); 2112 return 0; /* Break out of read loop. */ 2113 } 2114 2115 2116 void uv__process_pipe_read_req(uv_loop_t* loop, 2117 uv_pipe_t* handle, 2118 uv_req_t* req) { 2119 DWORD err; 2120 DWORD more; 2121 DWORD bytes_requested; 2122 assert(handle->type == UV_NAMED_PIPE); 2123 2124 handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING); 2125 DECREASE_PENDING_REQ_COUNT(handle); 2126 eof_timer_stop(handle); 2127 2128 if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { 2129 UnregisterWait(handle->read_req.wait_handle); 2130 handle->read_req.wait_handle = INVALID_HANDLE_VALUE; 2131 } 2132 2133 /* At this point, we're done with bookkeeping. If the user has stopped 2134 * reading the pipe in the meantime, there is nothing left to do, since there 2135 * is no callback that we can call. */ 2136 if (!(handle->flags & UV_HANDLE_READING)) 2137 return; 2138 2139 if (!REQ_SUCCESS(req)) { 2140 /* An error occurred doing the zero-read. */ 2141 err = GET_REQ_ERROR(req); 2142 2143 /* If the read was cancelled by uv__pipe_interrupt_read(), the request may 2144 * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to 2145 * the user; we'll start a new zero-read at the end of this function. */ 2146 if (err != ERROR_OPERATION_ABORTED) 2147 uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); 2148 2149 } else { 2150 /* The zero-read completed without error, indicating there is data 2151 * available in the kernel buffer. */ 2152 while (handle->flags & UV_HANDLE_READING) { 2153 bytes_requested = 65536; 2154 /* Depending on the type of pipe, read either IPC frames or raw data. */ 2155 if (handle->ipc) 2156 more = uv__pipe_read_ipc(loop, handle); 2157 else 2158 more = uv__pipe_read_data(loop, handle, &bytes_requested, INT32_MAX); 2159 2160 /* If no bytes were read, treat this as an indication that an error 2161 * occurred, and break out of the read loop. */ 2162 if (more == 0) 2163 break; 2164 } 2165 } 2166 2167 /* Start another zero-read request if necessary. */ 2168 if ((handle->flags & UV_HANDLE_READING) && 2169 !(handle->flags & UV_HANDLE_READ_PENDING)) { 2170 uv__pipe_queue_read(loop, handle); 2171 } 2172 } 2173 2174 2175 void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, 2176 uv_write_t* req) { 2177 int err; 2178 2179 assert(handle->type == UV_NAMED_PIPE); 2180 2181 assert(handle->write_queue_size >= req->u.io.queued_bytes); 2182 handle->write_queue_size -= req->u.io.queued_bytes; 2183 2184 UNREGISTER_HANDLE_REQ(loop, handle); 2185 2186 if (req->wait_handle != INVALID_HANDLE_VALUE) { 2187 UnregisterWait(req->wait_handle); 2188 req->wait_handle = INVALID_HANDLE_VALUE; 2189 } 2190 if (req->event_handle) { 2191 CloseHandle(req->event_handle); 2192 req->event_handle = NULL; 2193 } 2194 2195 err = GET_REQ_ERROR(req); 2196 2197 /* If this was a coalesced write, extract pointer to the user_provided 2198 * uv_write_t structure so we can pass the expected pointer to the callback, 2199 * then free the heap-allocated write req. */ 2200 if (req->coalesced) { 2201 uv__coalesced_write_t* coalesced_write = 2202 container_of(req, uv__coalesced_write_t, req); 2203 req = coalesced_write->user_req; 2204 uv__free(coalesced_write); 2205 } 2206 if (req->cb) { 2207 req->cb(req, uv_translate_sys_error(err)); 2208 } 2209 2210 handle->stream.conn.write_reqs_pending--; 2211 2212 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && 2213 handle->pipe.conn.non_overlapped_writes_tail) { 2214 assert(handle->stream.conn.write_reqs_pending > 0); 2215 uv__queue_non_overlapped_write(handle); 2216 } 2217 2218 if (handle->stream.conn.write_reqs_pending == 0 && 2219 uv__is_stream_shutting(handle)) 2220 uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req); 2221 2222 DECREASE_PENDING_REQ_COUNT(handle); 2223 } 2224 2225 2226 void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, 2227 uv_req_t* raw_req) { 2228 uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req; 2229 2230 assert(handle->type == UV_NAMED_PIPE); 2231 2232 if (handle->flags & UV_HANDLE_CLOSING) { 2233 /* The req->pipeHandle should be freed already in uv__pipe_close(). */ 2234 assert(req->pipeHandle == INVALID_HANDLE_VALUE); 2235 DECREASE_PENDING_REQ_COUNT(handle); 2236 return; 2237 } 2238 2239 if (REQ_SUCCESS(req)) { 2240 assert(req->pipeHandle != INVALID_HANDLE_VALUE); 2241 req->next_pending = handle->pipe.serv.pending_accepts; 2242 handle->pipe.serv.pending_accepts = req; 2243 2244 if (handle->stream.serv.connection_cb) { 2245 handle->stream.serv.connection_cb((uv_stream_t*)handle, 0); 2246 } 2247 } else { 2248 if (req->pipeHandle != INVALID_HANDLE_VALUE) { 2249 CloseHandle(req->pipeHandle); 2250 req->pipeHandle = INVALID_HANDLE_VALUE; 2251 } 2252 if (!(handle->flags & UV_HANDLE_CLOSING)) { 2253 uv__pipe_queue_accept(loop, handle, req, FALSE); 2254 } 2255 } 2256 2257 DECREASE_PENDING_REQ_COUNT(handle); 2258 } 2259 2260 2261 void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, 2262 uv_connect_t* req) { 2263 HANDLE pipeHandle; 2264 DWORD duplex_flags; 2265 int err; 2266 2267 assert(handle->type == UV_NAMED_PIPE); 2268 2269 UNREGISTER_HANDLE_REQ(loop, handle); 2270 2271 err = 0; 2272 if (REQ_SUCCESS(req)) { 2273 pipeHandle = req->u.connect.pipeHandle; 2274 duplex_flags = req->u.connect.duplex_flags; 2275 if (handle->flags & UV_HANDLE_CLOSING) 2276 err = UV_ECANCELED; 2277 else 2278 err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags); 2279 if (err) 2280 CloseHandle(pipeHandle); 2281 } else { 2282 err = uv_translate_sys_error(GET_REQ_ERROR(req)); 2283 } 2284 2285 if (req->cb) 2286 req->cb(req, err); 2287 2288 DECREASE_PENDING_REQ_COUNT(handle); 2289 } 2290 2291 2292 2293 void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle, 2294 uv_shutdown_t* req) { 2295 int err; 2296 2297 assert(handle->type == UV_NAMED_PIPE); 2298 2299 /* Clear the shutdown_req field so we don't go here again. */ 2300 handle->stream.conn.shutdown_req = NULL; 2301 UNREGISTER_HANDLE_REQ(loop, handle); 2302 2303 if (handle->flags & UV_HANDLE_CLOSING) { 2304 /* Already closing. Cancel the shutdown. */ 2305 err = UV_ECANCELED; 2306 } else if (!REQ_SUCCESS(req)) { 2307 /* An error occurred in trying to shutdown gracefully. */ 2308 err = uv_translate_sys_error(GET_REQ_ERROR(req)); 2309 } else { 2310 if (handle->flags & UV_HANDLE_READABLE) { 2311 /* Initialize and optionally start the eof timer. Only do this if the pipe 2312 * is readable and we haven't seen EOF come in ourselves. */ 2313 eof_timer_init(handle); 2314 2315 /* If reading start the timer right now. Otherwise uv__pipe_queue_read will 2316 * start it. */ 2317 if (handle->flags & UV_HANDLE_READ_PENDING) { 2318 eof_timer_start(handle); 2319 } 2320 2321 } else { 2322 /* This pipe is not readable. We can just close it to let the other end 2323 * know that we're done writing. */ 2324 close_pipe(handle); 2325 } 2326 err = 0; 2327 } 2328 2329 if (req->cb) 2330 req->cb(req, err); 2331 2332 DECREASE_PENDING_REQ_COUNT(handle); 2333 } 2334 2335 2336 static void eof_timer_init(uv_pipe_t* pipe) { 2337 int r; 2338 2339 assert(pipe->pipe.conn.eof_timer == NULL); 2340 assert(pipe->flags & UV_HANDLE_CONNECTION); 2341 2342 pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer); 2343 2344 r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer); 2345 assert(r == 0); /* timers can't fail */ 2346 (void) r; 2347 pipe->pipe.conn.eof_timer->data = pipe; 2348 uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer); 2349 } 2350 2351 2352 static void eof_timer_start(uv_pipe_t* pipe) { 2353 assert(pipe->flags & UV_HANDLE_CONNECTION); 2354 2355 if (pipe->pipe.conn.eof_timer != NULL) { 2356 uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0); 2357 } 2358 } 2359 2360 2361 static void eof_timer_stop(uv_pipe_t* pipe) { 2362 assert(pipe->flags & UV_HANDLE_CONNECTION); 2363 2364 if (pipe->pipe.conn.eof_timer != NULL) { 2365 uv_timer_stop(pipe->pipe.conn.eof_timer); 2366 } 2367 } 2368 2369 2370 static void eof_timer_cb(uv_timer_t* timer) { 2371 uv_pipe_t* pipe = (uv_pipe_t*) timer->data; 2372 uv_loop_t* loop = timer->loop; 2373 2374 assert(pipe->type == UV_NAMED_PIPE); 2375 2376 /* This should always be true, since we start the timer only in 2377 * uv__pipe_queue_read after successfully calling ReadFile, or in 2378 * uv__process_pipe_shutdown_req if a read is pending, and we always 2379 * immediately stop the timer in uv__process_pipe_read_req. */ 2380 assert(pipe->flags & UV_HANDLE_READ_PENDING); 2381 2382 /* If there are many packets coming off the iocp then the timer callback may 2383 * be called before the read request is coming off the queue. Therefore we 2384 * check here if the read request has completed but will be processed later. 2385 */ 2386 if ((pipe->flags & UV_HANDLE_READ_PENDING) && 2387 HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) { 2388 return; 2389 } 2390 2391 /* Force both ends off the pipe. */ 2392 close_pipe(pipe); 2393 2394 /* Stop reading, so the pending read that is going to fail will not be 2395 * reported to the user. */ 2396 uv_read_stop((uv_stream_t*) pipe); 2397 2398 /* Report the eof and update flags. This will get reported even if the user 2399 * stopped reading in the meantime. TODO: is that okay? */ 2400 uv__pipe_read_eof(loop, pipe, uv_null_buf_); 2401 } 2402 2403 2404 static void eof_timer_destroy(uv_pipe_t* pipe) { 2405 assert(pipe->flags & UV_HANDLE_CONNECTION); 2406 2407 if (pipe->pipe.conn.eof_timer) { 2408 uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb); 2409 pipe->pipe.conn.eof_timer = NULL; 2410 } 2411 } 2412 2413 2414 static void eof_timer_close_cb(uv_handle_t* handle) { 2415 assert(handle->type == UV_TIMER); 2416 uv__free(handle); 2417 } 2418 2419 2420 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { 2421 HANDLE os_handle = uv__get_osfhandle(file); 2422 NTSTATUS nt_status; 2423 IO_STATUS_BLOCK io_status; 2424 FILE_ACCESS_INFORMATION access; 2425 DWORD duplex_flags = 0; 2426 int err; 2427 2428 if (os_handle == INVALID_HANDLE_VALUE) 2429 return UV_EBADF; 2430 if (pipe->flags & UV_HANDLE_PIPESERVER) 2431 return UV_EINVAL; 2432 if (pipe->flags & UV_HANDLE_CONNECTION) 2433 return UV_EBUSY; 2434 2435 uv__pipe_connection_init(pipe); 2436 uv__once_init(); 2437 /* In order to avoid closing a stdio file descriptor 0-2, duplicate the 2438 * underlying OS handle and forget about the original fd. 2439 * We could also opt to use the original OS handle and just never close it, 2440 * but then there would be no reliable way to cancel pending read operations 2441 * upon close. 2442 */ 2443 if (file <= 2) { 2444 if (!DuplicateHandle(INVALID_HANDLE_VALUE, 2445 os_handle, 2446 INVALID_HANDLE_VALUE, 2447 &os_handle, 2448 0, 2449 FALSE, 2450 DUPLICATE_SAME_ACCESS)) 2451 return uv_translate_sys_error(GetLastError()); 2452 assert(os_handle != INVALID_HANDLE_VALUE); 2453 file = -1; 2454 } 2455 2456 /* Determine what kind of permissions we have on this handle. 2457 * Cygwin opens the pipe in message mode, but we can support it, 2458 * just query the access flags and set the stream flags accordingly. 2459 */ 2460 nt_status = pNtQueryInformationFile(os_handle, 2461 &io_status, 2462 &access, 2463 sizeof(access), 2464 FileAccessInformation); 2465 if (nt_status != STATUS_SUCCESS) 2466 return UV_EINVAL; 2467 2468 if (pipe->ipc) { 2469 if (!(access.AccessFlags & FILE_WRITE_DATA) || 2470 !(access.AccessFlags & FILE_READ_DATA)) { 2471 return UV_EINVAL; 2472 } 2473 } 2474 2475 if (access.AccessFlags & FILE_WRITE_DATA) 2476 duplex_flags |= UV_HANDLE_WRITABLE; 2477 if (access.AccessFlags & FILE_READ_DATA) 2478 duplex_flags |= UV_HANDLE_READABLE; 2479 2480 err = uv__set_pipe_handle(pipe->loop, 2481 pipe, 2482 os_handle, 2483 file, 2484 duplex_flags); 2485 if (err) { 2486 if (file == -1) 2487 CloseHandle(os_handle); 2488 return err; 2489 } 2490 2491 if (pipe->ipc) { 2492 assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); 2493 GetNamedPipeClientProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid); 2494 if (pipe->pipe.conn.ipc_remote_pid == GetCurrentProcessId()) { 2495 GetNamedPipeServerProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid); 2496 } 2497 assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1); 2498 } 2499 return 0; 2500 } 2501 2502 2503 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) { 2504 NTSTATUS nt_status; 2505 IO_STATUS_BLOCK io_status; 2506 FILE_NAME_INFORMATION tmp_name_info; 2507 FILE_NAME_INFORMATION* name_info; 2508 WCHAR* name_buf; 2509 unsigned int name_size; 2510 unsigned int name_len; 2511 int err; 2512 2513 uv__once_init(); 2514 name_info = NULL; 2515 2516 if (handle->name != NULL) { 2517 /* The user might try to query the name before we are connected, 2518 * and this is just easier to return the cached value if we have it. */ 2519 return uv__copy_utf16_to_utf8(handle->name, -1, buffer, size); 2520 } 2521 2522 if (handle->handle == INVALID_HANDLE_VALUE) { 2523 *size = 0; 2524 return UV_EINVAL; 2525 } 2526 2527 /* NtQueryInformationFile will block if another thread is performing a 2528 * blocking operation on the queried handle. If the pipe handle is 2529 * synchronous, there may be a worker thread currently calling ReadFile() on 2530 * the pipe handle, which could cause a deadlock. To avoid this, interrupt 2531 * the read. */ 2532 if (handle->flags & UV_HANDLE_CONNECTION && 2533 handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { 2534 uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */ 2535 } 2536 2537 nt_status = pNtQueryInformationFile(handle->handle, 2538 &io_status, 2539 &tmp_name_info, 2540 sizeof tmp_name_info, 2541 FileNameInformation); 2542 if (nt_status == STATUS_BUFFER_OVERFLOW) { 2543 name_size = sizeof(*name_info) + tmp_name_info.FileNameLength; 2544 name_info = uv__malloc(name_size); 2545 if (!name_info) { 2546 *size = 0; 2547 return UV_ENOMEM; 2548 } 2549 2550 nt_status = pNtQueryInformationFile(handle->handle, 2551 &io_status, 2552 name_info, 2553 name_size, 2554 FileNameInformation); 2555 } 2556 2557 if (nt_status != STATUS_SUCCESS) { 2558 *size = 0; 2559 err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status)); 2560 goto error; 2561 } 2562 2563 if (!name_info) { 2564 /* the struct on stack was used */ 2565 name_buf = tmp_name_info.FileName; 2566 name_len = tmp_name_info.FileNameLength; 2567 } else { 2568 name_buf = name_info->FileName; 2569 name_len = name_info->FileNameLength; 2570 } 2571 2572 if (name_len == 0) { 2573 *size = 0; 2574 err = 0; 2575 goto error; 2576 } 2577 2578 name_len /= sizeof(WCHAR); 2579 2580 /* "\\\\.\\pipe" + name */ 2581 if (*size < pipe_prefix_len) { 2582 *size = 0; 2583 } 2584 else { 2585 memcpy(buffer, pipe_prefix, pipe_prefix_len); 2586 *size -= pipe_prefix_len; 2587 } 2588 err = uv__copy_utf16_to_utf8(name_buf, name_len, buffer+pipe_prefix_len, size); 2589 *size += pipe_prefix_len; 2590 2591 error: 2592 uv__free(name_info); 2593 return err; 2594 } 2595 2596 2597 int uv_pipe_pending_count(uv_pipe_t* handle) { 2598 if (!handle->ipc) 2599 return 0; 2600 return handle->pipe.conn.ipc_xfer_queue_length; 2601 } 2602 2603 2604 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) { 2605 if (buffer == NULL || size == NULL || *size == 0) 2606 return UV_EINVAL; 2607 2608 if (handle->flags & UV_HANDLE_BOUND) 2609 return uv__pipe_getname(handle, buffer, size); 2610 2611 if (handle->flags & UV_HANDLE_CONNECTION || 2612 handle->handle != INVALID_HANDLE_VALUE) { 2613 *size = 0; 2614 return 0; 2615 } 2616 2617 return UV_EBADF; 2618 } 2619 2620 2621 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) { 2622 if (buffer == NULL || size == NULL || *size == 0) 2623 return UV_EINVAL; 2624 2625 /* emulate unix behaviour */ 2626 if (handle->flags & UV_HANDLE_BOUND) 2627 return UV_ENOTCONN; 2628 2629 if (handle->handle != INVALID_HANDLE_VALUE) 2630 return uv__pipe_getname(handle, buffer, size); 2631 2632 if (handle->flags & UV_HANDLE_CONNECTION) { 2633 if (handle->name != NULL) 2634 return uv__pipe_getname(handle, buffer, size); 2635 } 2636 2637 return UV_EBADF; 2638 } 2639 2640 2641 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) { 2642 if (!handle->ipc) 2643 return UV_UNKNOWN_HANDLE; 2644 if (handle->pipe.conn.ipc_xfer_queue_length == 0) 2645 return UV_UNKNOWN_HANDLE; 2646 else 2647 return UV_TCP; 2648 } 2649 2650 int uv_pipe_chmod(uv_pipe_t* handle, int mode) { 2651 SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY }; 2652 PACL old_dacl, new_dacl; 2653 PSECURITY_DESCRIPTOR sd; 2654 EXPLICIT_ACCESS ea; 2655 PSID everyone; 2656 int error; 2657 2658 if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE) 2659 return UV_EBADF; 2660 2661 if (mode != UV_READABLE && 2662 mode != UV_WRITABLE && 2663 mode != (UV_WRITABLE | UV_READABLE)) 2664 return UV_EINVAL; 2665 2666 if (!AllocateAndInitializeSid(&sid_world, 2667 1, 2668 SECURITY_WORLD_RID, 2669 0, 0, 0, 0, 0, 0, 0, 2670 &everyone)) { 2671 error = GetLastError(); 2672 goto done; 2673 } 2674 2675 if (GetSecurityInfo(handle->handle, 2676 SE_KERNEL_OBJECT, 2677 DACL_SECURITY_INFORMATION, 2678 NULL, 2679 NULL, 2680 &old_dacl, 2681 NULL, 2682 &sd)) { 2683 error = GetLastError(); 2684 goto clean_sid; 2685 } 2686 2687 memset(&ea, 0, sizeof(EXPLICIT_ACCESS)); 2688 if (mode & UV_READABLE) 2689 ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES; 2690 if (mode & UV_WRITABLE) 2691 ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES; 2692 ea.grfAccessPermissions |= SYNCHRONIZE; 2693 ea.grfAccessMode = SET_ACCESS; 2694 ea.grfInheritance = NO_INHERITANCE; 2695 ea.Trustee.TrusteeForm = TRUSTEE_IS_SID; 2696 ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP; 2697 ea.Trustee.ptstrName = (LPTSTR)everyone; 2698 2699 if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) { 2700 error = GetLastError(); 2701 goto clean_sd; 2702 } 2703 2704 if (SetSecurityInfo(handle->handle, 2705 SE_KERNEL_OBJECT, 2706 DACL_SECURITY_INFORMATION, 2707 NULL, 2708 NULL, 2709 new_dacl, 2710 NULL)) { 2711 error = GetLastError(); 2712 goto clean_dacl; 2713 } 2714 2715 error = 0; 2716 2717 clean_dacl: 2718 LocalFree((HLOCAL) new_dacl); 2719 clean_sd: 2720 LocalFree((HLOCAL) sd); 2721 clean_sid: 2722 FreeSid(everyone); 2723 done: 2724 return uv_translate_sys_error(error); 2725 } 2726