xcb_in.c revision 21298544
1/* Copyright (C) 2001-2004 Bart Massey and Jamey Sharp. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a 4 * copy of this software and associated documentation files (the "Software"), 5 * to deal in the Software without restriction, including without limitation 6 * the rights to use, copy, modify, merge, publish, distribute, sublicense, 7 * and/or sell copies of the Software, and to permit persons to whom the 8 * Software is furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 17 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 18 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 19 * 20 * Except as contained in this notice, the names of the authors or their 21 * institutions shall not be used in advertising or otherwise to promote the 22 * sale, use or other dealings in this Software without prior written 23 * authorization from the authors. 24 */ 25 26/* Stuff that reads stuff from the server. */ 27 28#ifdef HAVE_CONFIG_H 29#include "config.h" 30#endif 31 32#include <assert.h> 33#include <string.h> 34#include <stdlib.h> 35#include <unistd.h> 36#include <stdio.h> 37#include <errno.h> 38 39#include "xcb.h" 40#include "xcbext.h" 41#include "xcbint.h" 42#if USE_POLL 43#include <poll.h> 44#endif 45#ifndef _WIN32 46#include <sys/select.h> 47#include <sys/socket.h> 48#endif 49 50#ifdef _WIN32 51#include "xcb_windefs.h" 52#endif /* _WIN32 */ 53 54#define XCB_ERROR 0 55#define XCB_REPLY 1 56#define XCB_XGE_EVENT 35 57 58struct event_list { 59 xcb_generic_event_t *event; 60 struct event_list *next; 61}; 62 63struct reply_list { 64 void *reply; 65 struct reply_list *next; 66}; 67 68typedef struct pending_reply { 69 uint64_t first_request; 70 uint64_t last_request; 71 enum workarounds workaround; 72 int flags; 73 struct pending_reply *next; 74} pending_reply; 75 76typedef struct reader_list { 77 uint64_t request; 78 pthread_cond_t *data; 79 struct reader_list *next; 80} reader_list; 81 82static void remove_finished_readers(reader_list **prev_reader, uint64_t completed) 83{ 84 while(*prev_reader && XCB_SEQUENCE_COMPARE((*prev_reader)->request, <=, completed)) 85 { 86 /* If you don't have what you're looking for now, you never 87 * will. Wake up and leave me alone. */ 88 pthread_cond_signal((*prev_reader)->data); 89 *prev_reader = (*prev_reader)->next; 90 } 91} 92 93static int read_packet(xcb_connection_t *c) 94{ 95 xcb_generic_reply_t genrep; 96 int length = 32; 97 int eventlength = 0; /* length after first 32 bytes for GenericEvents */ 98 void *buf; 99 pending_reply *pend = 0; 100 struct event_list *event; 101 102 /* Wait for there to be enough data for us to read a whole packet */ 103 if(c->in.queue_len < length) 104 return 0; 105 106 /* Get the response type, length, and sequence number. */ 107 memcpy(&genrep, c->in.queue, sizeof(genrep)); 108 109 /* Compute 32-bit sequence number of this packet. */ 110 if((genrep.response_type & 0x7f) != XCB_KEYMAP_NOTIFY) 111 { 112 uint64_t lastread = c->in.request_read; 113 c->in.request_read = (lastread & UINT64_C(0xffffffffffff0000)) | genrep.sequence; 114 if(XCB_SEQUENCE_COMPARE(c->in.request_read, <, lastread)) 115 c->in.request_read += 0x10000; 116 if(XCB_SEQUENCE_COMPARE(c->in.request_read, >, c->in.request_expected)) 117 c->in.request_expected = c->in.request_read; 118 119 if(c->in.request_read != lastread) 120 { 121 if(c->in.current_reply) 122 { 123 _xcb_map_put(c->in.replies, lastread, c->in.current_reply); 124 c->in.current_reply = 0; 125 c->in.current_reply_tail = &c->in.current_reply; 126 } 127 c->in.request_completed = c->in.request_read - 1; 128 } 129 130 while(c->in.pending_replies && 131 c->in.pending_replies->workaround != WORKAROUND_EXTERNAL_SOCKET_OWNER && 132 XCB_SEQUENCE_COMPARE (c->in.pending_replies->last_request, <=, c->in.request_completed)) 133 { 134 pending_reply *oldpend = c->in.pending_replies; 135 c->in.pending_replies = oldpend->next; 136 if(!oldpend->next) 137 c->in.pending_replies_tail = &c->in.pending_replies; 138 free(oldpend); 139 } 140 141 if(genrep.response_type == XCB_ERROR) 142 c->in.request_completed = c->in.request_read; 143 144 remove_finished_readers(&c->in.readers, c->in.request_completed); 145 } 146 147 if(genrep.response_type == XCB_ERROR || genrep.response_type == XCB_REPLY) 148 { 149 pend = c->in.pending_replies; 150 if(pend && 151 !(XCB_SEQUENCE_COMPARE(pend->first_request, <=, c->in.request_read) && 152 (pend->workaround == WORKAROUND_EXTERNAL_SOCKET_OWNER || 153 XCB_SEQUENCE_COMPARE(c->in.request_read, <=, pend->last_request)))) 154 pend = 0; 155 } 156 157 /* For reply packets, check that the entire packet is available. */ 158 if(genrep.response_type == XCB_REPLY) 159 { 160 if(pend && pend->workaround == WORKAROUND_GLX_GET_FB_CONFIGS_BUG) 161 { 162 uint32_t *p = (uint32_t *) c->in.queue; 163 genrep.length = p[2] * p[3] * 2; 164 } 165 length += genrep.length * 4; 166 } 167 168 /* XGE events may have sizes > 32 */ 169 if ((genrep.response_type & 0x7f) == XCB_XGE_EVENT) 170 eventlength = genrep.length * 4; 171 172 buf = malloc(length + eventlength + 173 (genrep.response_type == XCB_REPLY ? 0 : sizeof(uint32_t))); 174 if(!buf) 175 { 176 _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT); 177 return 0; 178 } 179 180 if(_xcb_in_read_block(c, buf, length) <= 0) 181 { 182 free(buf); 183 return 0; 184 } 185 186 /* pull in XGE event data if available, append after event struct */ 187 if (eventlength) 188 { 189 if(_xcb_in_read_block(c, &((xcb_generic_event_t*)buf)[1], eventlength) <= 0) 190 { 191 free(buf); 192 return 0; 193 } 194 } 195 196 if(pend && (pend->flags & XCB_REQUEST_DISCARD_REPLY)) 197 { 198 free(buf); 199 return 1; 200 } 201 202 if(genrep.response_type != XCB_REPLY) 203 ((xcb_generic_event_t *) buf)->full_sequence = c->in.request_read; 204 205 /* reply, or checked error */ 206 if( genrep.response_type == XCB_REPLY || 207 (genrep.response_type == XCB_ERROR && pend && (pend->flags & XCB_REQUEST_CHECKED))) 208 { 209 struct reply_list *cur = malloc(sizeof(struct reply_list)); 210 if(!cur) 211 { 212 _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT); 213 free(buf); 214 return 0; 215 } 216 cur->reply = buf; 217 cur->next = 0; 218 *c->in.current_reply_tail = cur; 219 c->in.current_reply_tail = &cur->next; 220 if(c->in.readers && c->in.readers->request == c->in.request_read) 221 pthread_cond_signal(c->in.readers->data); 222 return 1; 223 } 224 225 /* event, or unchecked error */ 226 event = malloc(sizeof(struct event_list)); 227 if(!event) 228 { 229 _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT); 230 free(buf); 231 return 0; 232 } 233 event->event = buf; 234 event->next = 0; 235 *c->in.events_tail = event; 236 c->in.events_tail = &event->next; 237 pthread_cond_signal(&c->in.event_cond); 238 return 1; /* I have something for you... */ 239} 240 241static xcb_generic_event_t *get_event(xcb_connection_t *c) 242{ 243 struct event_list *cur = c->in.events; 244 xcb_generic_event_t *ret; 245 if(!c->in.events) 246 return 0; 247 ret = cur->event; 248 c->in.events = cur->next; 249 if(!cur->next) 250 c->in.events_tail = &c->in.events; 251 free(cur); 252 return ret; 253} 254 255static void free_reply_list(struct reply_list *head) 256{ 257 while(head) 258 { 259 struct reply_list *cur = head; 260 head = cur->next; 261 free(cur->reply); 262 free(cur); 263 } 264} 265 266static int read_block(const int fd, void *buf, const ssize_t len) 267{ 268 int done = 0; 269 while(done < len) 270 { 271 int ret = recv(fd, ((char *) buf) + done, len - done, 0); 272 if(ret > 0) 273 done += ret; 274#ifndef _WIN32 275 if(ret < 0 && errno == EAGAIN) 276#else 277 if(ret == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK) 278#endif /* !_Win32 */ 279 { 280#if USE_POLL 281 struct pollfd pfd; 282 pfd.fd = fd; 283 pfd.events = POLLIN; 284 pfd.revents = 0; 285 do { 286 ret = poll(&pfd, 1, -1); 287 } while (ret == -1 && errno == EINTR); 288#else 289 fd_set fds; 290 FD_ZERO(&fds); 291 FD_SET(fd, &fds); 292 293 /* Initializing errno here makes sure that for Win32 this loop will execute only once */ 294 errno = 0; 295 do { 296 ret = select(fd + 1, &fds, 0, 0, 0); 297 } while (ret == -1 && errno == EINTR); 298#endif /* USE_POLL */ 299 } 300 if(ret <= 0) 301 return ret; 302 } 303 return len; 304} 305 306static int poll_for_reply(xcb_connection_t *c, uint64_t request, void **reply, xcb_generic_error_t **error) 307{ 308 struct reply_list *head; 309 310 /* If an error occurred when issuing the request, fail immediately. */ 311 if(!request) 312 head = 0; 313 /* We've read requests past the one we want, so if it has replies we have 314 * them all and they're in the replies map. */ 315 else if(XCB_SEQUENCE_COMPARE(request, <, c->in.request_read)) 316 { 317 head = _xcb_map_remove(c->in.replies, request); 318 if(head && head->next) 319 _xcb_map_put(c->in.replies, request, head->next); 320 } 321 /* We're currently processing the responses to the request we want, and we 322 * have a reply ready to return. So just return it without blocking. */ 323 else if(request == c->in.request_read && c->in.current_reply) 324 { 325 head = c->in.current_reply; 326 c->in.current_reply = head->next; 327 if(!head->next) 328 c->in.current_reply_tail = &c->in.current_reply; 329 } 330 /* We know this request can't have any more replies, and we've already 331 * established it doesn't have a reply now. Don't bother blocking. */ 332 else if(request == c->in.request_completed) 333 head = 0; 334 /* We may have more replies on the way for this request: block until we're 335 * sure. */ 336 else 337 return 0; 338 339 if(error) 340 *error = 0; 341 *reply = 0; 342 343 if(head) 344 { 345 if(((xcb_generic_reply_t *) head->reply)->response_type == XCB_ERROR) 346 { 347 if(error) 348 *error = head->reply; 349 else 350 free(head->reply); 351 } 352 else 353 *reply = head->reply; 354 355 free(head); 356 } 357 358 return 1; 359} 360 361static void insert_reader(reader_list **prev_reader, reader_list *reader, uint64_t request, pthread_cond_t *cond) 362{ 363 while(*prev_reader && XCB_SEQUENCE_COMPARE((*prev_reader)->request, <=, request)) 364 prev_reader = &(*prev_reader)->next; 365 reader->request = request; 366 reader->data = cond; 367 reader->next = *prev_reader; 368 *prev_reader = reader; 369} 370 371static void remove_reader(reader_list **prev_reader, reader_list *reader) 372{ 373 while(*prev_reader && XCB_SEQUENCE_COMPARE((*prev_reader)->request, <=, reader->request)) 374 if(*prev_reader == reader) 375 { 376 *prev_reader = (*prev_reader)->next; 377 break; 378 } 379} 380 381static void *wait_for_reply(xcb_connection_t *c, uint64_t request, xcb_generic_error_t **e) 382{ 383 void *ret = 0; 384 385 /* If this request has not been written yet, write it. */ 386 if(c->out.return_socket || _xcb_out_flush_to(c, request)) 387 { 388 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 389 reader_list reader; 390 391 insert_reader(&c->in.readers, &reader, request, &cond); 392 393 while(!poll_for_reply(c, request, &ret, e)) 394 if(!_xcb_conn_wait(c, &cond, 0, 0)) 395 break; 396 397 remove_reader(&c->in.readers, &reader); 398 pthread_cond_destroy(&cond); 399 } 400 401 _xcb_in_wake_up_next_reader(c); 402 return ret; 403} 404 405static uint64_t widen(xcb_connection_t *c, unsigned int request) 406{ 407 uint64_t widened_request = (c->out.request & UINT64_C(0xffffffff00000000)) | request; 408 if(widened_request > c->out.request) 409 widened_request -= UINT64_C(1) << 32; 410 return widened_request; 411} 412 413/* Public interface */ 414 415void *xcb_wait_for_reply(xcb_connection_t *c, unsigned int request, xcb_generic_error_t **e) 416{ 417 void *ret; 418 if(e) 419 *e = 0; 420 if(c->has_error) 421 return 0; 422 423 pthread_mutex_lock(&c->iolock); 424 ret = wait_for_reply(c, widen(c, request), e); 425 pthread_mutex_unlock(&c->iolock); 426 return ret; 427} 428 429static void insert_pending_discard(xcb_connection_t *c, pending_reply **prev_next, uint64_t seq) 430{ 431 pending_reply *pend; 432 pend = malloc(sizeof(*pend)); 433 if(!pend) 434 { 435 _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT); 436 return; 437 } 438 439 pend->first_request = seq; 440 pend->last_request = seq; 441 pend->workaround = 0; 442 pend->flags = XCB_REQUEST_DISCARD_REPLY; 443 pend->next = *prev_next; 444 *prev_next = pend; 445 446 if(!pend->next) 447 c->in.pending_replies_tail = &pend->next; 448} 449 450static void discard_reply(xcb_connection_t *c, uint64_t request) 451{ 452 void *reply; 453 pending_reply **prev_pend; 454 455 /* Free any replies or errors that we've already read. Stop if 456 * xcb_wait_for_reply would block or we've run out of replies. */ 457 while(poll_for_reply(c, request, &reply, 0) && reply) 458 free(reply); 459 460 /* If we've proven there are no more responses coming, we're done. */ 461 if(XCB_SEQUENCE_COMPARE(request, <=, c->in.request_completed)) 462 return; 463 464 /* Walk the list of pending requests. Mark the first match for deletion. */ 465 for(prev_pend = &c->in.pending_replies; *prev_pend; prev_pend = &(*prev_pend)->next) 466 { 467 if(XCB_SEQUENCE_COMPARE((*prev_pend)->first_request, >, request)) 468 break; 469 470 if((*prev_pend)->first_request == request) 471 { 472 /* Pending reply found. Mark for discard: */ 473 (*prev_pend)->flags |= XCB_REQUEST_DISCARD_REPLY; 474 return; 475 } 476 } 477 478 /* Pending reply not found (likely due to _unchecked request). Create one: */ 479 insert_pending_discard(c, prev_pend, request); 480} 481 482void xcb_discard_reply(xcb_connection_t *c, unsigned int sequence) 483{ 484 if(c->has_error) 485 return; 486 487 /* If an error occurred when issuing the request, fail immediately. */ 488 if(!sequence) 489 return; 490 491 pthread_mutex_lock(&c->iolock); 492 discard_reply(c, widen(c, sequence)); 493 pthread_mutex_unlock(&c->iolock); 494} 495 496int xcb_poll_for_reply(xcb_connection_t *c, unsigned int request, void **reply, xcb_generic_error_t **error) 497{ 498 int ret; 499 if(c->has_error) 500 { 501 *reply = 0; 502 if(error) 503 *error = 0; 504 return 1; /* would not block */ 505 } 506 assert(reply != 0); 507 pthread_mutex_lock(&c->iolock); 508 ret = poll_for_reply(c, widen(c, request), reply, error); 509 pthread_mutex_unlock(&c->iolock); 510 return ret; 511} 512 513xcb_generic_event_t *xcb_wait_for_event(xcb_connection_t *c) 514{ 515 xcb_generic_event_t *ret; 516 if(c->has_error) 517 return 0; 518 pthread_mutex_lock(&c->iolock); 519 /* get_event returns 0 on empty list. */ 520 while(!(ret = get_event(c))) 521 if(!_xcb_conn_wait(c, &c->in.event_cond, 0, 0)) 522 break; 523 524 _xcb_in_wake_up_next_reader(c); 525 pthread_mutex_unlock(&c->iolock); 526 return ret; 527} 528 529static xcb_generic_event_t *poll_for_next_event(xcb_connection_t *c, int queued) 530{ 531 xcb_generic_event_t *ret = 0; 532 if(!c->has_error) 533 { 534 pthread_mutex_lock(&c->iolock); 535 /* FIXME: follow X meets Z architecture changes. */ 536 ret = get_event(c); 537 if(!ret && !queued && c->in.reading == 0 && _xcb_in_read(c)) /* _xcb_in_read shuts down the connection on error */ 538 ret = get_event(c); 539 pthread_mutex_unlock(&c->iolock); 540 } 541 return ret; 542} 543 544xcb_generic_event_t *xcb_poll_for_event(xcb_connection_t *c) 545{ 546 return poll_for_next_event(c, 0); 547} 548 549xcb_generic_event_t *xcb_poll_for_queued_event(xcb_connection_t *c) 550{ 551 return poll_for_next_event(c, 1); 552} 553 554xcb_generic_error_t *xcb_request_check(xcb_connection_t *c, xcb_void_cookie_t cookie) 555{ 556 uint64_t request; 557 xcb_generic_error_t *ret = 0; 558 void *reply; 559 if(c->has_error) 560 return 0; 561 pthread_mutex_lock(&c->iolock); 562 request = widen(c, cookie.sequence); 563 if(XCB_SEQUENCE_COMPARE(request, >=, c->in.request_expected) 564 && XCB_SEQUENCE_COMPARE(request, >, c->in.request_completed)) 565 { 566 _xcb_out_send_sync(c); 567 _xcb_out_flush_to(c, c->out.request); 568 } 569 reply = wait_for_reply(c, request, &ret); 570 assert(!reply); 571 pthread_mutex_unlock(&c->iolock); 572 return ret; 573} 574 575/* Private interface */ 576 577int _xcb_in_init(_xcb_in *in) 578{ 579 if(pthread_cond_init(&in->event_cond, 0)) 580 return 0; 581 in->reading = 0; 582 583 in->queue_len = 0; 584 585 in->request_read = 0; 586 in->request_completed = 0; 587 588 in->replies = _xcb_map_new(); 589 if(!in->replies) 590 return 0; 591 592 in->current_reply_tail = &in->current_reply; 593 in->events_tail = &in->events; 594 in->pending_replies_tail = &in->pending_replies; 595 596 return 1; 597} 598 599void _xcb_in_destroy(_xcb_in *in) 600{ 601 pthread_cond_destroy(&in->event_cond); 602 free_reply_list(in->current_reply); 603 _xcb_map_delete(in->replies, (void (*)(void *)) free_reply_list); 604 while(in->events) 605 { 606 struct event_list *e = in->events; 607 in->events = e->next; 608 free(e->event); 609 free(e); 610 } 611 while(in->pending_replies) 612 { 613 pending_reply *pend = in->pending_replies; 614 in->pending_replies = pend->next; 615 free(pend); 616 } 617} 618 619void _xcb_in_wake_up_next_reader(xcb_connection_t *c) 620{ 621 int pthreadret; 622 if(c->in.readers) 623 pthreadret = pthread_cond_signal(c->in.readers->data); 624 else 625 pthreadret = pthread_cond_signal(&c->in.event_cond); 626 assert(pthreadret == 0); 627} 628 629int _xcb_in_expect_reply(xcb_connection_t *c, uint64_t request, enum workarounds workaround, int flags) 630{ 631 pending_reply *pend = malloc(sizeof(pending_reply)); 632 assert(workaround != WORKAROUND_NONE || flags != 0); 633 if(!pend) 634 { 635 _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT); 636 return 0; 637 } 638 pend->first_request = pend->last_request = request; 639 pend->workaround = workaround; 640 pend->flags = flags; 641 pend->next = 0; 642 *c->in.pending_replies_tail = pend; 643 c->in.pending_replies_tail = &pend->next; 644 return 1; 645} 646 647void _xcb_in_replies_done(xcb_connection_t *c) 648{ 649 struct pending_reply *pend; 650 if (c->in.pending_replies_tail != &c->in.pending_replies) 651 { 652 pend = container_of(c->in.pending_replies_tail, struct pending_reply, next); 653 if(pend->workaround == WORKAROUND_EXTERNAL_SOCKET_OWNER) 654 { 655 pend->last_request = c->out.request; 656 pend->workaround = WORKAROUND_NONE; 657 } 658 } 659} 660 661int _xcb_in_read(xcb_connection_t *c) 662{ 663 int n = recv(c->fd, c->in.queue + c->in.queue_len, sizeof(c->in.queue) - c->in.queue_len, 0); 664 if(n > 0) 665 c->in.queue_len += n; 666 while(read_packet(c)) 667 /* empty */; 668#ifndef _WIN32 669 if((n > 0) || (n < 0 && errno == EAGAIN)) 670#else 671 if((n > 0) || (n < 0 && WSAGetLastError() == WSAEWOULDBLOCK)) 672#endif /* !_WIN32 */ 673 return 1; 674 _xcb_conn_shutdown(c, XCB_CONN_ERROR); 675 return 0; 676} 677 678int _xcb_in_read_block(xcb_connection_t *c, void *buf, int len) 679{ 680 int done = c->in.queue_len; 681 if(len < done) 682 done = len; 683 684 memcpy(buf, c->in.queue, done); 685 c->in.queue_len -= done; 686 memmove(c->in.queue, c->in.queue + done, c->in.queue_len); 687 688 if(len > done) 689 { 690 int ret = read_block(c->fd, (char *) buf + done, len - done); 691 if(ret <= 0) 692 { 693 _xcb_conn_shutdown(c, XCB_CONN_ERROR); 694 return ret; 695 } 696 } 697 698 return len; 699} 700