Browse Source

tools: Add wrk for benchmarking http servers

v0.9.11-release
isaacs 12 years ago
parent
commit
e850cbab1c
  1. 2
      .gitignore
  2. 4
      Makefile
  3. 2
      tools/wrk/.gitignore
  4. 177
      tools/wrk/LICENSE
  5. 35
      tools/wrk/Makefile
  6. 115
      tools/wrk/NOTICE
  7. 37
      tools/wrk/README
  8. 435
      tools/wrk/src/ae.c
  9. 118
      tools/wrk/src/ae.h
  10. 130
      tools/wrk/src/ae_epoll.c
  11. 315
      tools/wrk/src/ae_evport.c
  12. 132
      tools/wrk/src/ae_kqueue.c
  13. 99
      tools/wrk/src/ae_select.c
  14. 27
      tools/wrk/src/aprintf.c
  15. 6
      tools/wrk/src/aprintf.h
  16. 13
      tools/wrk/src/config.h
  17. 2058
      tools/wrk/src/http_parser.c
  18. 317
      tools/wrk/src/http_parser.h
  19. 73
      tools/wrk/src/stats.c
  20. 21
      tools/wrk/src/stats.h
  21. 129
      tools/wrk/src/tinymt64.c
  22. 210
      tools/wrk/src/tinymt64.h
  23. 96
      tools/wrk/src/units.c
  24. 10
      tools/wrk/src/units.h
  25. 482
      tools/wrk/src/wrk.c
  26. 75
      tools/wrk/src/wrk.h
  27. 287
      tools/wrk/src/zmalloc.c
  28. 83
      tools/wrk/src/zmalloc.h

2
.gitignore

@ -55,3 +55,5 @@ deps/openssl/openssl.xml
# build/release artifacts # build/release artifacts
/*.tar.gz /*.tar.gz
/SHASUMS.txt* /SHASUMS.txt*
/tools/wrk/wrk

4
Makefile

@ -313,6 +313,10 @@ dist-upload: $(TARBALL) $(PKG)
scp $(TARBALL) node@nodejs.org:~/web/nodejs.org/dist/$(VERSION)/$(TARBALL) scp $(TARBALL) node@nodejs.org:~/web/nodejs.org/dist/$(VERSION)/$(TARBALL)
scp $(PKG) node@nodejs.org:~/web/nodejs.org/dist/$(VERSION)/$(TARNAME).pkg scp $(PKG) node@nodejs.org:~/web/nodejs.org/dist/$(VERSION)/$(TARNAME).pkg
wrk: tools/wrk/wrk
tools/wrk/wrk:
$(MAKE) -C tools/wrk/
bench-net: all bench-net: all
@$(NODE) benchmark/common.js net @$(NODE) benchmark/common.js net

2
tools/wrk/.gitignore

@ -0,0 +1,2 @@
obj/*
/wrk

177
tools/wrk/LICENSE

@ -0,0 +1,177 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

35
tools/wrk/Makefile

@ -0,0 +1,35 @@
CFLAGS := -std=c99 -Wall -O2 -pthread
LDFLAGS := -pthread
LIBS := -lm
ifeq ($(shell uname),SunOS)
LIBS += -lnsl -lsocket -lresolv
endif
SRC := wrk.c aprintf.c stats.c units.c ae.c zmalloc.c http_parser.c tinymt64.c
BIN := wrk
ODIR := obj
OBJ := $(patsubst %.c,$(ODIR)/%.o,$(SRC))
all: $(BIN)
clean:
$(RM) $(BIN) obj/*
$(BIN): $(OBJ)
$(CC) $(LDFLAGS) -o $@ $^ $(LIBS)
$(OBJ): config.h Makefile | $(ODIR)
$(ODIR):
@mkdir $@
$(ODIR)/%.o : %.c
$(CC) $(CFLAGS) -c -o $@ $<
.PHONY: all clean
.SUFFIXES:
.SUFFIXES: .c .o
vpath %.c src
vpath %.h src

115
tools/wrk/NOTICE

@ -0,0 +1,115 @@
=========================================================================
== NOTICE file corresponding to section 4(d) of the Apache License, ==
== Version 2.0, in this case for the wrk distribution. ==
=========================================================================
wrk
Copyright 2012 Will Glozer, http://glozer.net
=========================================================================
== Redis Event Library Notice ==
=========================================================================
This product includes software developed by Salvatore Sanfilippo and
other contributors to the redis project.
Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
Copyright (C) 2009 Harish Mallipeddi - harish.mallipeddi@gmail.com
Copyright (c) 2006-2009, Salvatore Sanfilippo
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the *
distribution.
* Neither the name of Redis nor the names of its contributors may be
used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
=========================================================================
== HTTP Parser Notice ==
=========================================================================
This product includes software developed by Igor Sysoev, Joyent, Inc.,
and other Node contributors.
http_parser.c is based on src/http/ngx_http_parse.c from NGINX copyright
Igor Sysoev.
Additional changes are licensed under the same terms as NGINX and
copyright Joyent, Inc. and other Node contributors. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to permit
persons to whom the Software is furnished to do so, subject to the
following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.
=========================================================================
== Tiny Mersenne Twister (TinyMT) Notice ==
=========================================================================
Copyright (c) 2011 Mutsuo Saito, Makoto Matsumoto, Hiroshima University
and The University of Tokyo. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the
distribution.
* Neither the name of the Hiroshima University nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

37
tools/wrk/README

@ -0,0 +1,37 @@
wrk - a HTTP benchmarking tool
wrk is a modern HTTP benchmarking tool capable of generating significant
load when run on a single multi-core CPU. It combines a multithreaded
design with scalable event notification systems such as epoll and kqueue.
Basic Usage
wrk -t8 -c400 -r10m http://localhost:8080/index.html
This runs wrk with 8 threads, keeping 400 connections open, and making a
total of 10 million HTTP GET requests to http://localhost:8080/index.html
Output:
Making 10000000 requests to http://localhost:8080/index.html
8 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 439.75us 350.49us 7.60ms 92.88%
Req/Sec 61.13k 8.26k 72.00k 87.54%
10000088 requests in 19.87s, 3.42GB read
Requests/sec: 503396.23
Transfer/sec: 176.16MB
Benchmarking Tips
The machine running wrk must have a sufficient number of ephemeral ports
available and closed sockets should be recycled quickly. To handle the
initial connection burst the server's listen(2) backlog should be greater
than the number of concurrent connections being tested.
Acknowledgements
wrk contains code from a number of open source projects including the
'ae' event loop from redis, the nginx/joyent/node.js 'http-parser' and
the Tiny Mersenne Twister PRNG. Please consult the NOTICE file for
licensing details.

435
tools/wrk/src/ae.c

@ -0,0 +1,435 @@
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
*
* Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include "ae.h"
#include "zmalloc.h"
#include "config.h"
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
void aeStop(aeEventLoop *eventLoop) {
eventLoop->stop = 1;
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= eventLoop->setsize) return;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;
for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
aeApiDelEvent(eventLoop, fd, mask);
}
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
if (fd >= eventLoop->setsize) return 0;
aeFileEvent *fe = &eventLoop->events[fd];
return fe->mask;
}
static void aeGetTime(long *seconds, long *milliseconds)
{
struct timeval tv;
gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec/1000;
}
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
long cur_sec, cur_ms, when_sec, when_ms;
aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds/1000;
when_ms = cur_ms + milliseconds%1000;
if (when_ms >= 1000) {
when_sec ++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
aeTimeEvent *te, *prev = NULL;
te = eventLoop->timeEventHead;
while(te) {
if (te->id == id) {
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
return AE_OK;
}
prev = te;
te = te->next;
}
return AE_ERR; /* NO event with the specified ID found */
}
/* Search the first timer to fire.
* This operation is useful to know how many time the select can be
* put in sleep without to delay any event.
* If there are no timers NULL is returned.
*
* Note that's O(N) since time events are unsorted.
* Possible optimizations (not needed by Redis so far, but...):
* 1) Insert the event in order, so that the nearest is just the head.
* Much better but still insertion or deletion of timers is O(N).
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
*/
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
while(te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
/* Wait for milliseconds until the given file descriptor becomes
* writable/readable/exception */
int aeWait(int fd, int mask, long long milliseconds) {
struct pollfd pfd;
int retmask = 0, retval;
memset(&pfd, 0, sizeof(pfd));
pfd.fd = fd;
if (mask & AE_READABLE) pfd.events |= POLLIN;
if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
if ((retval = poll(&pfd, 1, milliseconds))== 1) {
if (pfd.revents & POLLIN) retmask |= AE_READABLE;
if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
return retval;
}
}
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
char *aeGetApiName(void) {
return aeApiName();
}
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}

118
tools/wrk/src/ae.h

@ -0,0 +1,118 @@
/* A simple event-driven programming library. Originally I wrote this code
* for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
* it in form of a library for easy reuse.
*
* Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __AE_H__
#define __AE_H__
#define AE_OK 0
#define AE_ERR -1
#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2
#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4
#define AE_NOMORE -1
/* Macros */
#define AE_NOTUSED(V) ((void) V)
struct aeEventLoop;
/* Types and data structures */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
/* Prototypes */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
#endif

130
tools/wrk/src/ae_epoll.c

@ -0,0 +1,130 @@
/* Linux epoll(2) based ae.c module
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/epoll.h>
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
close(state->epfd);
zfree(state->events);
zfree(state);
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
static char *aeApiName(void) {
return "epoll";
}

315
tools/wrk/src/ae_evport.c

@ -0,0 +1,315 @@
/* ae.c module for illumos event ports.
*
* Copyright (c) 2012, Joyent, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <assert.h>
#include <errno.h>
#include <port.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/time.h>
#include <stdio.h>
static int evport_debug = 0;
/*
* This file implements the ae API using event ports, present on Solaris-based
* systems since Solaris 10. Using the event port interface, we associate file
* descriptors with the port. Each association also includes the set of poll(2)
* events that the consumer is interested in (e.g., POLLIN and POLLOUT).
*
* There's one tricky piece to this implementation: when we return events via
* aeApiPoll, the corresponding file descriptors become dissociated from the
* port. This is necessary because poll events are level-triggered, so if the
* fd didn't become dissociated, it would immediately fire another event since
* the underlying state hasn't changed yet. We must re-associate the file
* descriptor, but only after we know that our caller has actually read from it.
* The ae API does not tell us exactly when that happens, but we do know that
* it must happen by the time aeApiPoll is called again. Our solution is to
* keep track of the last fds returned by aeApiPoll and re-associate them next
* time aeApiPoll is invoked.
*
* To summarize, in this module, each fd association is EITHER (a) represented
* only via the in-kernel association OR (b) represented by pending_fds and
* pending_masks. (b) is only true for the last fds we returned from aeApiPoll,
* and only until we enter aeApiPoll again (at which point we restore the
* in-kernel association).
*/
#define MAX_EVENT_BATCHSZ 512
typedef struct aeApiState {
int portfd; /* event port */
int npending; /* # of pending fds */
int pending_fds[MAX_EVENT_BATCHSZ]; /* pending fds */
int pending_masks[MAX_EVENT_BATCHSZ]; /* pending fds' masks */
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
int i;
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->portfd = port_create();
if (state->portfd == -1) {
zfree(state);
return -1;
}
state->npending = 0;
for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
state->pending_fds[i] = -1;
state->pending_masks[i] = AE_NONE;
}
eventLoop->apidata = state;
return 0;
}
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
close(state->portfd);
zfree(state);
}
static int aeApiLookupPending(aeApiState *state, int fd) {
int i;
for (i = 0; i < state->npending; i++) {
if (state->pending_fds[i] == fd)
return (i);
}
return (-1);
}
/*
* Helper function to invoke port_associate for the given fd and mask.
*/
static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
int events = 0;
int rv, err;
if (mask & AE_READABLE)
events |= POLLIN;
if (mask & AE_WRITABLE)
events |= POLLOUT;
if (evport_debug)
fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
(void *)(uintptr_t)mask);
err = errno;
if (evport_debug)
fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));
if (rv == -1) {
fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));
if (err == EAGAIN)
fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
}
return rv;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
int fullmask, pfd;
if (evport_debug)
fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
/*
* Since port_associate's "events" argument replaces any existing events, we
* must be sure to include whatever events are already associated when
* we call port_associate() again.
*/
fullmask = mask | eventLoop->events[fd].mask;
pfd = aeApiLookupPending(state, fd);
if (pfd != -1) {
/*
* This fd was recently returned from aeApiPoll. It should be safe to
* assume that the consumer has processed that poll event, but we play
* it safer by simply updating pending_mask. The fd will be
* re-associated as usual when aeApiPoll is called again.
*/
if (evport_debug)
fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
state->pending_masks[pfd] |= fullmask;
return 0;
}
return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
int fullmask, pfd;
if (evport_debug)
fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
pfd = aeApiLookupPending(state, fd);
if (pfd != -1) {
if (evport_debug)
fprintf(stderr, "deleting event from pending fd %d\n", fd);
/*
* This fd was just returned from aeApiPoll, so it's not currently
* associated with the port. All we need to do is update
* pending_mask appropriately.
*/
state->pending_masks[pfd] &= ~mask;
if (state->pending_masks[pfd] == AE_NONE)
state->pending_fds[pfd] = -1;
return;
}
/*
* The fd is currently associated with the port. Like with the add case
* above, we must look at the full mask for the file descriptor before
* updating that association. We don't have a good way of knowing what the
* events are without looking into the eventLoop state directly. We rely on
* the fact that our caller has already updated the mask in the eventLoop.
*/
fullmask = eventLoop->events[fd].mask;
if (fullmask == AE_NONE) {
/*
* We're removing *all* events, so use port_dissociate to remove the
* association completely. Failure here indicates a bug.
*/
if (evport_debug)
fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);
if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
perror("aeApiDelEvent: port_dissociate");
abort(); /* will not return */
}
} else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
fullmask) != 0) {
/*
* ENOMEM is a potentially transient condition, but the kernel won't
* generally return it unless things are really bad. EAGAIN indicates
* we've reached an resource limit, for which it doesn't make sense to
* retry (counter-intuitively). All other errors indicate a bug. In any
* of these cases, the best we can do is to abort.
*/
abort(); /* will not return */
}
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
struct timespec timeout, *tsp;
int mask, i;
uint_t nevents;
port_event_t event[MAX_EVENT_BATCHSZ];
/*
* If we've returned fd events before, we must re-associate them with the
* port now, before calling port_get(). See the block comment at the top of
* this file for an explanation of why.
*/
for (i = 0; i < state->npending; i++) {
if (state->pending_fds[i] == -1)
/* This fd has since been deleted. */
continue;
if (aeApiAssociate("aeApiPoll", state->portfd,
state->pending_fds[i], state->pending_masks[i]) != 0) {
/* See aeApiDelEvent for why this case is fatal. */
abort();
}
state->pending_masks[i] = AE_NONE;
state->pending_fds[i] = -1;
}
state->npending = 0;
if (tvp != NULL) {
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
tsp = &timeout;
} else {
tsp = NULL;
}
/*
* port_getn can return with errno == ETIME having returned some events (!).
* So if we get ETIME, we check nevents, too.
*/
nevents = 1;
if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
tsp) == -1 && (errno != ETIME || nevents == 0)) {
if (errno == ETIME || errno == EINTR)
return 0;
/* Any other error indicates a bug. */
perror("aeApiPoll: port_get");
abort();
}
state->npending = nevents;
for (i = 0; i < nevents; i++) {
mask = 0;
if (event[i].portev_events & POLLIN)
mask |= AE_READABLE;
if (event[i].portev_events & POLLOUT)
mask |= AE_WRITABLE;
eventLoop->fired[i].fd = event[i].portev_object;
eventLoop->fired[i].mask = mask;
if (evport_debug)
fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
(int)event[i].portev_object, mask);
state->pending_fds[i] = event[i].portev_object;
state->pending_masks[i] = (uintptr_t)event[i].portev_user;
}
return nevents;
}
static char *aeApiName(void) {
return "evport";
}

132
tools/wrk/src/ae_kqueue.c

@ -0,0 +1,132 @@
/* Kqueue(2)-based ae.c module
*
* Copyright (C) 2009 Harish Mallipeddi - harish.mallipeddi@gmail.com
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
typedef struct aeApiState {
int kqfd;
struct kevent *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->kqfd = kqueue();
if (state->kqfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
close(state->kqfd);
zfree(state->events);
zfree(state);
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct kevent ke;
if (mask & AE_READABLE) {
EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
}
if (mask & AE_WRITABLE) {
EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
}
return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct kevent ke;
if (mask & AE_READABLE) {
EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
}
if (mask & AE_WRITABLE) {
EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
}
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
if (tvp != NULL) {
struct timespec timeout;
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
&timeout);
} else {
retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
NULL);
}
if (retval > 0) {
int j;
numevents = retval;
for(j = 0; j < numevents; j++) {
int mask = 0;
struct kevent *e = state->events+j;
if (e->filter == EVFILT_READ) mask |= AE_READABLE;
if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->ident;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
static char *aeApiName(void) {
return "kqueue";
}

99
tools/wrk/src/ae_select.c

@ -0,0 +1,99 @@
/* Select()-based ae.c module.
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <string.h>
typedef struct aeApiState {
fd_set rfds, wfds;
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */
fd_set _rfds, _wfds;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
FD_ZERO(&state->rfds);
FD_ZERO(&state->wfds);
eventLoop->apidata = state;
return 0;
}
static void aeApiFree(aeEventLoop *eventLoop) {
zfree(eventLoop->apidata);
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
static char *aeApiName(void) {
return "select";
}

27
tools/wrk/src/aprintf.c

@ -0,0 +1,27 @@
// Copyright (C) 2012 - Will Glozer. All rights reserved.
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
char *aprintf(char **s, const char *fmt, ...) {
char *c = NULL;
int n, len;
va_list ap;
va_start(ap, fmt);
n = vsnprintf(NULL, 0, fmt, ap) + 1;
va_end(ap);
len = *s ? strlen(*s) : 0;
if ((*s = realloc(*s, (len + n) * sizeof(char)))) {
c = *s + len;
va_start(ap, fmt);
vsnprintf(c, n, fmt, ap);
va_end(ap);
}
return c;
}

6
tools/wrk/src/aprintf.h

@ -0,0 +1,6 @@
#ifndef APRINTF_H
#define APRINTF_H
char *aprintf(char **, const char *, ...);
#endif /* APRINTF_H */

13
tools/wrk/src/config.h

@ -0,0 +1,13 @@
#ifndef CONFIG_H
#define CONFIG_H
#if defined(__FreeBSD__) || defined(__APPLE__)
#define HAVE_KQUEUE
#elif defined(__sun)
#define HAVE_EVPORT
#elif defined(__linux__)
#define HAVE_EPOLL
#define _POSIX_C_SOURCE 200809L
#endif
#endif /* CONFIG_H */

2058
tools/wrk/src/http_parser.c

File diff suppressed because it is too large

317
tools/wrk/src/http_parser.h

@ -0,0 +1,317 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#ifndef http_parser_h
#define http_parser_h
#ifdef __cplusplus
extern "C" {
#endif
#define HTTP_PARSER_VERSION_MAJOR 1
#define HTTP_PARSER_VERSION_MINOR 0
#include <sys/types.h>
#if defined(_WIN32) && !defined(__MINGW32__) && (!defined(_MSC_VER) || _MSC_VER<1600)
typedef __int8 int8_t;
typedef unsigned __int8 uint8_t;
typedef __int16 int16_t;
typedef unsigned __int16 uint16_t;
typedef __int32 int32_t;
typedef unsigned __int32 uint32_t;
typedef __int64 int64_t;
typedef unsigned __int64 uint64_t;
typedef unsigned int size_t;
typedef int ssize_t;
#else
#include <stdint.h>
#endif
/* Compile with -DHTTP_PARSER_STRICT=0 to make less checks, but run
* faster
*/
#ifndef HTTP_PARSER_STRICT
# define HTTP_PARSER_STRICT 1
#endif
/* Compile with -DHTTP_PARSER_DEBUG=1 to add extra debugging information to
* the error reporting facility.
*/
#ifndef HTTP_PARSER_DEBUG
# define HTTP_PARSER_DEBUG 0
#endif
/* Maximium header size allowed */
#define HTTP_MAX_HEADER_SIZE (80*1024)
typedef struct http_parser http_parser;
typedef struct http_parser_settings http_parser_settings;
/* Callbacks should return non-zero to indicate an error. The parser will
* then halt execution.
*
* The one exception is on_headers_complete. In a HTTP_RESPONSE parser
* returning '1' from on_headers_complete will tell the parser that it
* should not expect a body. This is used when receiving a response to a
* HEAD request which may contain 'Content-Length' or 'Transfer-Encoding:
* chunked' headers that indicate the presence of a body.
*
* http_data_cb does not return data chunks. It will be call arbitrarally
* many times for each string. E.G. you might get 10 callbacks for "on_path"
* each providing just a few characters more data.
*/
typedef int (*http_data_cb) (http_parser*, const char *at, size_t length);
typedef int (*http_cb) (http_parser*);
/* Request Methods */
#define HTTP_METHOD_MAP(XX) \
XX(0, DELETE) \
XX(1, GET) \
XX(2, HEAD) \
XX(3, POST) \
XX(4, PUT) \
/* pathological */ \
XX(5, CONNECT) \
XX(6, OPTIONS) \
XX(7, TRACE) \
/* webdav */ \
XX(8, COPY) \
XX(9, LOCK) \
XX(10, MKCOL) \
XX(11, MOVE) \
XX(12, PROPFIND) \
XX(13, PROPPATCH) \
XX(14, UNLOCK) \
/* subversion */ \
XX(15, REPORT) \
XX(16, MKACTIVITY) \
XX(17, CHECKOUT) \
XX(18, MERGE) \
/* upnp */ \
XX(19, MSEARCH) \
XX(20, NOTIFY) \
XX(21, SUBSCRIBE) \
XX(22, UNSUBSCRIBE) \
/* RFC-5789 */ \
XX(23, PATCH) \
XX(24, PURGE) \
enum http_method
{
#define XX(num, name) HTTP_##name = num,
HTTP_METHOD_MAP(XX)
#undef X
};
enum http_parser_type { HTTP_REQUEST, HTTP_RESPONSE, HTTP_BOTH };
/* Flag values for http_parser.flags field */
enum flags
{ F_CHUNKED = 1 << 0
, F_CONNECTION_KEEP_ALIVE = 1 << 1
, F_CONNECTION_CLOSE = 1 << 2
, F_TRAILING = 1 << 3
, F_UPGRADE = 1 << 4
, F_SKIPBODY = 1 << 5
};
/* Map for errno-related constants
*
* The provided argument should be a macro that takes 2 arguments.
*/
#define HTTP_ERRNO_MAP(XX) \
/* No error */ \
XX(OK, "success") \
\
/* Callback-related errors */ \
XX(CB_message_begin, "the on_message_begin callback failed") \
XX(CB_url, "the on_url callback failed") \
XX(CB_header_field, "the on_header_field callback failed") \
XX(CB_header_value, "the on_header_value callback failed") \
XX(CB_headers_complete, "the on_headers_complete callback failed") \
XX(CB_body, "the on_body callback failed") \
XX(CB_message_complete, "the on_message_complete callback failed") \
\
/* Parsing-related errors */ \
XX(INVALID_EOF_STATE, "stream ended at an unexpected time") \
XX(HEADER_OVERFLOW, \
"too many header bytes seen; overflow detected") \
XX(CLOSED_CONNECTION, \
"data received after completed connection: close message") \
XX(INVALID_VERSION, "invalid HTTP version") \
XX(INVALID_STATUS, "invalid HTTP status code") \
XX(INVALID_METHOD, "invalid HTTP method") \
XX(INVALID_URL, "invalid URL") \
XX(INVALID_HOST, "invalid host") \
XX(INVALID_PORT, "invalid port") \
XX(INVALID_PATH, "invalid path") \
XX(INVALID_QUERY_STRING, "invalid query string") \
XX(INVALID_FRAGMENT, "invalid fragment") \
XX(LF_EXPECTED, "LF character expected") \
XX(INVALID_HEADER_TOKEN, "invalid character in header") \
XX(INVALID_CONTENT_LENGTH, \
"invalid character in content-length header") \
XX(INVALID_CHUNK_SIZE, \
"invalid character in chunk size header") \
XX(INVALID_CONSTANT, "invalid constant string") \
XX(INVALID_INTERNAL_STATE, "encountered unexpected internal state")\
XX(STRICT, "strict mode assertion failed") \
XX(PAUSED, "parser is paused") \
XX(UNKNOWN, "an unknown error occurred")
/* Define HPE_* values for each errno value above */
#define HTTP_ERRNO_GEN(n, s) HPE_##n,
enum http_errno {
HTTP_ERRNO_MAP(HTTP_ERRNO_GEN)
};
#undef HTTP_ERRNO_GEN
/* Get an http_errno value from an http_parser */
#define HTTP_PARSER_ERRNO(p) ((enum http_errno) (p)->http_errno)
/* Get the line number that generated the current error */
#if HTTP_PARSER_DEBUG
#define HTTP_PARSER_ERRNO_LINE(p) ((p)->error_lineno)
#else
#define HTTP_PARSER_ERRNO_LINE(p) 0
#endif
struct http_parser {
/** PRIVATE **/
unsigned char type : 2; /* enum http_parser_type */
unsigned char flags : 6; /* F_* values from 'flags' enum; semi-public */
unsigned char state; /* enum state from http_parser.c */
unsigned char header_state; /* enum header_state from http_parser.c */
unsigned char index; /* index into current matcher */
uint32_t nread; /* # bytes read in various scenarios */
uint64_t content_length; /* # bytes in body (0 if no Content-Length header) */
/** READ-ONLY **/
unsigned short http_major;
unsigned short http_minor;
unsigned short status_code; /* responses only */
unsigned char method; /* requests only */
unsigned char http_errno : 7;
/* 1 = Upgrade header was present and the parser has exited because of that.
* 0 = No upgrade header present.
* Should be checked when http_parser_execute() returns in addition to
* error checking.
*/
unsigned char upgrade : 1;
#if HTTP_PARSER_DEBUG
uint32_t error_lineno;
#endif
/** PUBLIC **/
void *data; /* A pointer to get hook to the "connection" or "socket" object */
};
struct http_parser_settings {
http_cb on_message_begin;
http_data_cb on_url;
http_data_cb on_header_field;
http_data_cb on_header_value;
http_cb on_headers_complete;
http_data_cb on_body;
http_cb on_message_complete;
};
enum http_parser_url_fields
{ UF_SCHEMA = 0
, UF_HOST = 1
, UF_PORT = 2
, UF_PATH = 3
, UF_QUERY = 4
, UF_FRAGMENT = 5
, UF_MAX = 6
};
/* Result structure for http_parser_parse_url().
*
* Callers should index into field_data[] with UF_* values iff field_set
* has the relevant (1 << UF_*) bit set. As a courtesy to clients (and
* because we probably have padding left over), we convert any port to
* a uint16_t.
*/
struct http_parser_url {
uint16_t field_set; /* Bitmask of (1 << UF_*) values */
uint16_t port; /* Converted UF_PORT string */
struct {
uint16_t off; /* Offset into buffer in which field starts */
uint16_t len; /* Length of run in buffer */
} field_data[UF_MAX];
};
void http_parser_init(http_parser *parser, enum http_parser_type type);
size_t http_parser_execute(http_parser *parser,
const http_parser_settings *settings,
const char *data,
size_t len);
/* If http_should_keep_alive() in the on_headers_complete or
* on_message_complete callback returns true, then this will be should be
* the last message on the connection.
* If you are the server, respond with the "Connection: close" header.
* If you are the client, close the connection.
*/
int http_should_keep_alive(http_parser *parser);
/* Returns a string version of the HTTP method. */
const char *http_method_str(enum http_method m);
/* Return a string name of the given error */
const char *http_errno_name(enum http_errno err);
/* Return a string description of the given error */
const char *http_errno_description(enum http_errno err);
/* Parse a URL; return nonzero on failure */
int http_parser_parse_url(const char *buf, size_t buflen,
int is_connect,
struct http_parser_url *u);
/* Pause or un-pause the parser; a nonzero value pauses */
void http_parser_pause(http_parser *parser, int paused);
#ifdef __cplusplus
}
#endif
#endif

73
tools/wrk/src/stats.c

@ -0,0 +1,73 @@
// Copyright (C) 2012 - Will Glozer. All rights reserved.
#include <inttypes.h>
#include <stdlib.h>
#include <math.h>
#include "stats.h"
#include "zmalloc.h"
stats *stats_alloc(uint64_t samples) {
stats *stats = zcalloc(sizeof(stats) + sizeof(uint64_t) * samples);
stats->samples = samples;
return stats;
}
void stats_free(stats *stats) {
zfree(stats);
}
void stats_record(stats *stats, uint64_t x) {
stats->data[stats->index++] = x;
if (stats->limit < stats->samples) stats->limit++;
if (stats->index == stats->samples) stats->index = 0;
}
uint64_t stats_min(stats *stats) {
uint64_t min = 0;
for (uint64_t i = 0; i < stats->limit; i++) {
uint64_t x = stats->data[i];
if (x < min || min == 0) min = x;
}
return min;
}
uint64_t stats_max(stats *stats) {
uint64_t max = 0;
for (uint64_t i = 0; i < stats->limit; i++) {
uint64_t x = stats->data[i];
if (x > max || max == 0) max = x;
}
return max;
}
long double stats_mean(stats *stats) {
uint64_t sum = 0;
if (stats->limit == 0) return 0.0;
for (uint64_t i = 0; i < stats->limit; i++) {
sum += stats->data[i];
}
return sum / (long double) stats->limit;
}
long double stats_stdev(stats *stats, long double mean) {
long double sum = 0.0;
if (stats->limit < 2) return 0.0;
for (uint64_t i = 0; i < stats->limit; i++) {
sum += powl(stats->data[i] - mean, 2);
}
return sqrtl(sum / (stats->limit - 1));
}
long double stats_within_stdev(stats *stats, long double mean, long double stdev, uint64_t n) {
long double upper = mean + (stdev * n);
long double lower = mean - (stdev * n);
uint64_t sum = 0;
for (uint64_t i = 0; i < stats->limit; i++) {
uint64_t x = stats->data[i];
if (x >= lower && x <= upper) sum++;
}
return (sum / (long double) stats->limit) * 100;
}

21
tools/wrk/src/stats.h

@ -0,0 +1,21 @@
#ifndef STATS_H
#define STATS_H
typedef struct {
uint64_t samples;
uint64_t index;
uint64_t limit;
uint64_t data[];
} stats;
stats *stats_alloc(uint64_t);
void stats_free(stats *);
void stats_record(stats *, uint64_t);
uint64_t stats_min(stats *);
uint64_t stats_max(stats *);
long double stats_mean(stats *);
long double stats_stdev(stats *stats, long double);
long double stats_within_stdev(stats *, long double, long double, uint64_t);
#endif /* STATS_H */

129
tools/wrk/src/tinymt64.c

@ -0,0 +1,129 @@
/**
* @file tinymt64.c
*
* @brief 64-bit Tiny Mersenne Twister only 127 bit internal state
*
* @author Mutsuo Saito (Hiroshima University)
* @author Makoto Matsumoto (The University of Tokyo)
*
* Copyright (C) 2011 Mutsuo Saito, Makoto Matsumoto,
* Hiroshima University and The University of Tokyo.
* All rights reserved.
*
* The 3-clause BSD License is applied to this software, see
* LICENSE.txt
*/
#include "tinymt64.h"
#define MIN_LOOP 8
/**
* This function represents a function used in the initialization
* by init_by_array
* @param[in] x 64-bit integer
* @return 64-bit integer
*/
static uint64_t ini_func1(uint64_t x) {
return (x ^ (x >> 59)) * UINT64_C(2173292883993);
}
/**
* This function represents a function used in the initialization
* by init_by_array
* @param[in] x 64-bit integer
* @return 64-bit integer
*/
static uint64_t ini_func2(uint64_t x) {
return (x ^ (x >> 59)) * UINT64_C(58885565329898161);
}
/**
* This function certificate the period of 2^127-1.
* @param random tinymt state vector.
*/
static void period_certification(tinymt64_t * random) {
if ((random->status[0] & TINYMT64_MASK) == 0 &&
random->status[1] == 0) {
random->status[0] = 'T';
random->status[1] = 'M';
}
}
/**
* This function initializes the internal state array with a 64-bit
* unsigned integer seed.
* @param random tinymt state vector.
* @param seed a 64-bit unsigned integer used as a seed.
*/
void tinymt64_init(tinymt64_t * random, uint64_t seed) {
random->status[0] = seed ^ ((uint64_t)random->mat1 << 32);
random->status[1] = random->mat2 ^ random->tmat;
for (int i = 1; i < MIN_LOOP; i++) {
random->status[i & 1] ^= i + UINT64_C(6364136223846793005)
* (random->status[(i - 1) & 1]
^ (random->status[(i - 1) & 1] >> 62));
}
period_certification(random);
}
/**
* This function initializes the internal state array,
* with an array of 64-bit unsigned integers used as seeds
* @param random tinymt state vector.
* @param init_key the array of 64-bit integers, used as a seed.
* @param key_length the length of init_key.
*/
void tinymt64_init_by_array(tinymt64_t * random, const uint64_t init_key[],
int key_length) {
const int lag = 1;
const int mid = 1;
const int size = 4;
int i, j;
int count;
uint64_t r;
uint64_t st[4];
st[0] = 0;
st[1] = random->mat1;
st[2] = random->mat2;
st[3] = random->tmat;
if (key_length + 1 > MIN_LOOP) {
count = key_length + 1;
} else {
count = MIN_LOOP;
}
r = ini_func1(st[0] ^ st[mid % size]
^ st[(size - 1) % size]);
st[mid % size] += r;
r += key_length;
st[(mid + lag) % size] += r;
st[0] = r;
count--;
for (i = 1, j = 0; (j < count) && (j < key_length); j++) {
r = ini_func1(st[i] ^ st[(i + mid) % size] ^ st[(i + size - 1) % size]);
st[(i + mid) % size] += r;
r += init_key[j] + i;
st[(i + mid + lag) % size] += r;
st[i] = r;
i = (i + 1) % size;
}
for (; j < count; j++) {
r = ini_func1(st[i] ^ st[(i + mid) % size] ^ st[(i + size - 1) % size]);
st[(i + mid) % size] += r;
r += i;
st[(i + mid + lag) % size] += r;
st[i] = r;
i = (i + 1) % size;
}
for (j = 0; j < size; j++) {
r = ini_func2(st[i] + st[(i + mid) % size] + st[(i + size - 1) % size]);
st[(i + mid) % size] ^= r;
r -= i;
st[(i + mid + lag) % size] ^= r;
st[i] = r;
i = (i + 1) % size;
}
random->status[0] = st[0] ^ st[1];
random->status[1] = st[2] ^ st[3];
period_certification(random);
}

210
tools/wrk/src/tinymt64.h

@ -0,0 +1,210 @@
#ifndef TINYMT64_H
#define TINYMT64_H
/**
* @file tinymt64.h
*
* @brief Tiny Mersenne Twister only 127 bit internal state
*
* @author Mutsuo Saito (Hiroshima University)
* @author Makoto Matsumoto (The University of Tokyo)
*
* Copyright (C) 2011 Mutsuo Saito, Makoto Matsumoto,
* Hiroshima University and The University of Tokyo.
* All rights reserved.
*
* The 3-clause BSD License is applied to this software, see
* LICENSE.txt
*/
#include <stdint.h>
#include <inttypes.h>
#define TINYMT64_MEXP 127
#define TINYMT64_SH0 12
#define TINYMT64_SH1 11
#define TINYMT64_SH8 8
#define TINYMT64_MASK UINT64_C(0x7fffffffffffffff)
#define TINYMT64_MUL (1.0 / 18446744073709551616.0)
/*
* tinymt64 internal state vector and parameters
*/
struct TINYMT64_T {
uint64_t status[2];
uint32_t mat1;
uint32_t mat2;
uint64_t tmat;
};
typedef struct TINYMT64_T tinymt64_t;
void tinymt64_init(tinymt64_t * random, uint64_t seed);
void tinymt64_init_by_array(tinymt64_t * random, const uint64_t init_key[],
int key_length);
#if defined(__GNUC__)
/**
* This function always returns 127
* @param random not used
* @return always 127
*/
inline static int tinymt64_get_mexp(
tinymt64_t * random __attribute__((unused))) {
return TINYMT64_MEXP;
}
#else
inline static int tinymt64_get_mexp(tinymt64_t * random) {
return TINYMT64_MEXP;
}
#endif
/**
* This function changes internal state of tinymt64.
* Users should not call this function directly.
* @param random tinymt internal status
*/
inline static void tinymt64_next_state(tinymt64_t * random) {
uint64_t x;
random->status[0] &= TINYMT64_MASK;
x = random->status[0] ^ random->status[1];
x ^= x << TINYMT64_SH0;
x ^= x >> 32;
x ^= x << 32;
x ^= x << TINYMT64_SH1;
random->status[0] = random->status[1];
random->status[1] = x;
random->status[0] ^= -((int64_t)(x & 1)) & random->mat1;
random->status[1] ^= -((int64_t)(x & 1)) & (((uint64_t)random->mat2) << 32);
}
/**
* This function outputs 64-bit unsigned integer from internal state.
* Users should not call this function directly.
* @param random tinymt internal status
* @return 64-bit unsigned pseudorandom number
*/
inline static uint64_t tinymt64_temper(tinymt64_t * random) {
uint64_t x;
#if defined(LINEARITY_CHECK)
x = random->status[0] ^ random->status[1];
#else
x = random->status[0] + random->status[1];
#endif
x ^= random->status[0] >> TINYMT64_SH8;
x ^= -((int64_t)(x & 1)) & random->tmat;
return x;
}
/**
* This function outputs floating point number from internal state.
* Users should not call this function directly.
* @param random tinymt internal status
* @return floating point number r (1.0 <= r < 2.0)
*/
inline static double tinymt64_temper_conv(tinymt64_t * random) {
uint64_t x;
union {
uint64_t u;
double d;
} conv;
#if defined(LINEARITY_CHECK)
x = random->status[0] ^ random->status[1];
#else
x = random->status[0] + random->status[1];
#endif
x ^= random->status[0] >> TINYMT64_SH8;
conv.u = ((x ^ (-((int64_t)(x & 1)) & random->tmat)) >> 12)
| UINT64_C(0x3ff0000000000000);
return conv.d;
}
/**
* This function outputs floating point number from internal state.
* Users should not call this function directly.
* @param random tinymt internal status
* @return floating point number r (1.0 < r < 2.0)
*/
inline static double tinymt64_temper_conv_open(tinymt64_t * random) {
uint64_t x;
union {
uint64_t u;
double d;
} conv;
#if defined(LINEARITY_CHECK)
x = random->status[0] ^ random->status[1];
#else
x = random->status[0] + random->status[1];
#endif
x ^= random->status[0] >> TINYMT64_SH8;
conv.u = ((x ^ (-((int64_t)(x & 1)) & random->tmat)) >> 12)
| UINT64_C(0x3ff0000000000001);
return conv.d;
}
/**
* This function outputs 64-bit unsigned integer from internal state.
* @param random tinymt internal status
* @return 64-bit unsigned integer r (0 <= r < 2^64)
*/
inline static uint64_t tinymt64_generate_uint64(tinymt64_t * random) {
tinymt64_next_state(random);
return tinymt64_temper(random);
}
/**
* This function outputs floating point number from internal state.
* This function is implemented using multiplying by 1 / 2^64.
* @param random tinymt internal status
* @return floating point number r (0.0 <= r < 1.0)
*/
inline static double tinymt64_generate_double(tinymt64_t * random) {
tinymt64_next_state(random);
return tinymt64_temper(random) * TINYMT64_MUL;
}
/**
* This function outputs floating point number from internal state.
* This function is implemented using union trick.
* @param random tinymt internal status
* @return floating point number r (0.0 <= r < 1.0)
*/
inline static double tinymt64_generate_double01(tinymt64_t * random) {
tinymt64_next_state(random);
return tinymt64_temper_conv(random) - 1.0;
}
/**
* This function outputs floating point number from internal state.
* This function is implemented using union trick.
* @param random tinymt internal status
* @return floating point number r (1.0 <= r < 2.0)
*/
inline static double tinymt64_generate_double12(tinymt64_t * random) {
tinymt64_next_state(random);
return tinymt64_temper_conv(random);
}
/**
* This function outputs floating point number from internal state.
* This function is implemented using union trick.
* @param random tinymt internal status
* @return floating point number r (0.0 < r <= 1.0)
*/
inline static double tinymt64_generate_doubleOC(tinymt64_t * random) {
tinymt64_next_state(random);
return 2.0 - tinymt64_temper_conv(random);
}
/**
* This function outputs floating point number from internal state.
* This function is implemented using union trick.
* @param random tinymt internal status
* @return floating point number r (0.0 < r < 1.0)
*/
inline static double tinymt64_generate_doubleOO(tinymt64_t * random) {
tinymt64_next_state(random);
return tinymt64_temper_conv_open(random) - 1.0;
}
#endif

96
tools/wrk/src/units.c

@ -0,0 +1,96 @@
// Copyright (C) 2012 - Will Glozer. All rights reserved.
#include <stdlib.h>
#include <stdio.h>
#include <strings.h>
#include <inttypes.h>
#include "units.h"
#include "aprintf.h"
typedef struct {
int scale;
char *base;
char *units[];
} units;
units time_units_us = {
.scale = 1000,
.base = "us",
.units = { "ms", "s", NULL }
};
units time_units_s = {
.scale = 60,
.base = "s",
.units = { "m", "h", NULL }
};
units binary_units = {
.scale = 1024,
.base = "",
.units = { "K", "M", "G", "T", "P", NULL }
};
units metric_units = {
.scale = 1000,
.base = "",
.units = { "k", "M", "G", "T", "P", NULL }
};
static char *format_units(long double n, units *m, int p) {
long double amt = n, scale;
char *unit = m->base;
char *msg = NULL;
scale = m->scale * 0.85;
for (int i = 0; m->units[i+1] && amt >= scale; i++) {
amt /= m->scale;
unit = m->units[i];
}
aprintf(&msg, "%.*Lf%s", p, amt, unit);
return msg;
}
static int scan_units(char *s, uint64_t *n, units *m) {
uint64_t base, scale = 1;
char unit[3] = { 0, 0, 0 };
int i, c;
if ((c = sscanf(s, "%"SCNu64"%2s", &base, unit)) < 1) return -1;
if (c == 2) {
for (i = 0; m->units[i] != NULL; i++) {
scale *= m->scale;
if (!strncasecmp(unit, m->units[i], sizeof(unit))) break;
}
if (m->units[i] == NULL) return -1;
}
*n = base * scale;
return 0;
}
char *format_binary(long double n) {
return format_units(n, &binary_units, 2);
}
char *format_metric(long double n) {
return format_units(n, &metric_units, 2);
}
char *format_time_us(long double n) {
units *units = &time_units_us;
if (n >= 1000000.0) {
n /= 1000000.0;
units = &time_units_s;
}
return format_units(n, units, 2);
}
int scan_metric(char *s, uint64_t *n) {
return scan_units(s, n, &metric_units);
}

10
tools/wrk/src/units.h

@ -0,0 +1,10 @@
#ifndef UNITS_H
#define UNITS_H
char *format_binary(long double);
char *format_metric(long double);
char *format_time_us(long double);
int scan_metric(char *, uint64_t *);
#endif /* UNITS_H */

482
tools/wrk/src/wrk.c

@ -0,0 +1,482 @@
// Copyright (C) 2012 - Will Glozer. All rights reserved.
#include "wrk.h"
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <math.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/uio.h>
#include "aprintf.h"
#include "stats.h"
#include "units.h"
#include "zmalloc.h"
#include "tinymt64.h"
extern char *optarg;
extern int optind, opterr;
static struct config {
struct addrinfo addr;
uint64_t threads;
uint64_t connections;
uint64_t requests;
uint64_t timeout;
} cfg;
static struct {
size_t size;
char *buf;
} request;
static struct {
stats *latency;
stats *requests;
pthread_mutex_t mutex;
} statistics;
static const struct http_parser_settings parser_settings = {
.on_message_complete = request_complete
};
static void usage() {
printf("Usage: wrk <options> <url> \n"
" Options: \n"
" -c, --connections <n> Connections to keep open \n"
" -r, --requests <n> Total requests to make \n"
" -t, --threads <n> Number of threads to use \n"
" \n"
" -H, --header <h> Add header to request \n"
" -v, --version Print version details \n"
" \n"
" Numeric arguments may include a SI unit (2k, 2M, 2G)\n");
}
int main(int argc, char **argv) {
struct addrinfo *addrs, *addr;
struct http_parser_url parser_url;
char *url, **headers;
int rc;
headers = zmalloc((argc / 2) * sizeof(char *));
if (parse_args(&cfg, &url, headers, argc, argv)) {
usage();
exit(1);
}
if (http_parser_parse_url(url, strlen(url), 0, &parser_url)) {
fprintf(stderr, "invalid URL: %s\n", url);
exit(1);
}
char *host = extract_url_part(url, &parser_url, UF_HOST);
char *port = extract_url_part(url, &parser_url, UF_PORT);
char *service = port ? port : extract_url_part(url, &parser_url, UF_SCHEMA);
char *path = &url[parser_url.field_data[UF_PATH].off];
struct addrinfo hints = {
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM
};
if ((rc = getaddrinfo(host, service, &hints, &addrs)) != 0) {
const char *msg = gai_strerror(rc);
fprintf(stderr, "unable to resolve %s:%s %s\n", host, service, msg);
exit(1);
}
for (addr = addrs; addr != NULL; addr = addr->ai_next) {
int fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (fd == -1) continue;
if (connect(fd, addr->ai_addr, addr->ai_addrlen) == -1) {
if (errno == EHOSTUNREACH || errno == ECONNREFUSED) {
close(fd);
continue;
}
}
close(fd);
break;
}
if (addr == NULL) {
char *msg = strerror(errno);
fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
exit(1);
}
cfg.addr = *addr;
request.buf = format_request(host, port, path, headers);
request.size = strlen(request.buf);
pthread_mutex_init(&statistics.mutex, NULL);
statistics.latency = stats_alloc(SAMPLES);
statistics.requests = stats_alloc(SAMPLES);
thread *threads = zcalloc(cfg.threads * sizeof(thread));
uint64_t connections = cfg.connections / cfg.threads;
uint64_t requests = cfg.requests / cfg.threads;
for (uint64_t i = 0; i < cfg.threads; i++) {
thread *t = &threads[i];
t->connections = connections;
t->requests = requests;
if (pthread_create(&t->thread, NULL, &thread_main, t)) {
char *msg = strerror(errno);
fprintf(stderr, "unable to create thread %zu %s\n", i, msg);
exit(2);
}
}
printf("Making %"PRIu64" requests to %s\n", cfg.requests, url);
printf(" %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);
uint64_t start = time_us();
uint64_t complete = 0;
uint64_t bytes = 0;
errors errors = { 0 };
for (uint64_t i = 0; i < cfg.threads; i++) {
thread *t = &threads[i];
pthread_join(t->thread, NULL);
complete += t->complete;
bytes += t->bytes;
errors.connect += t->errors.connect;
errors.read += t->errors.read;
errors.write += t->errors.write;
errors.timeout += t->errors.timeout;
errors.status += t->errors.status;
}
uint64_t runtime_us = time_us() - start;
long double runtime_s = runtime_us / 1000000.0;
long double req_per_s = complete / runtime_s;
long double bytes_per_s = bytes / runtime_s;
print_stats_header();
print_stats("Latency", statistics.latency, format_time_us);
print_stats("Req/Sec", statistics.requests, format_metric);
char *runtime_msg = format_time_us(runtime_us);
printf(" %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
if (errors.connect || errors.read || errors.write || errors.timeout) {
printf(" Socket errors: connect %d, read %d, write %d, timeout %d\n",
errors.connect, errors.read, errors.write, errors.timeout);
}
if (errors.status) {
printf(" Non-2xx or 3xx responses: %d\n", errors.status);
}
printf("Requests/sec: %9.2Lf\n", req_per_s);
printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));
return 0;
}
void *thread_main(void *arg) {
thread *thread = arg;
aeEventLoop *loop = aeCreateEventLoop(10 + cfg.connections * 3);
thread->cs = zmalloc(thread->connections * sizeof(connection));
thread->loop = loop;
tinymt64_init(&thread->rand, time_us());
connection *c = thread->cs;
for (uint64_t i = 0; i < thread->connections; i++, c++) {
c->thread = thread;
c->latency = 0;
connect_socket(thread, c);
}
aeCreateTimeEvent(loop, SAMPLE_INTERVAL_MS, sample_rate, thread, NULL);
aeCreateTimeEvent(loop, TIMEOUT_INTERVAL_MS, check_timeouts, thread, NULL);
thread->start = time_us();
aeMain(loop);
aeDeleteEventLoop(loop);
zfree(thread->cs);
return NULL;
}
static int connect_socket(thread *thread, connection *c) {
struct addrinfo addr = cfg.addr;
struct aeEventLoop *loop = thread->loop;
int fd, flags;
fd = socket(addr.ai_family, addr.ai_socktype, addr.ai_protocol);
flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (connect(fd, addr.ai_addr, addr.ai_addrlen) == -1) {
if (errno != EINPROGRESS) {
thread->errors.connect++;
goto error;
}
}
flags = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
if (aeCreateFileEvent(loop, fd, AE_WRITABLE, socket_writeable, c) != AE_OK) {
goto error;
}
http_parser_init(&c->parser, HTTP_RESPONSE);
c->parser.data = c;
c->fd = fd;
return fd;
error:
close(fd);
return -1;
}
static int reconnect_socket(thread *thread, connection *c) {
aeDeleteFileEvent(thread->loop, c->fd, AE_WRITABLE | AE_READABLE);
close(c->fd);
return connect_socket(thread, c);
}
static int sample_rate(aeEventLoop *loop, long long id, void *data) {
thread *thread = data;
uint64_t n = rand64(&thread->rand, thread->connections);
uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
connection *c = thread->cs + n;
uint64_t requests = (thread->complete / elapsed_ms) * 1000;
pthread_mutex_lock(&statistics.mutex);
stats_record(statistics.latency, c->latency);
stats_record(statistics.requests, requests);
pthread_mutex_unlock(&statistics.mutex);
return SAMPLE_INTERVAL_MS + rand64(&thread->rand, SAMPLE_INTERVAL_MS);
}
static int request_complete(http_parser *parser) {
connection *c = parser->data;
thread *thread = c->thread;
if (parser->status_code > 399) {
thread->errors.status++;
}
if (++thread->complete >= thread->requests) {
aeStop(thread->loop);
goto done;
}
c->latency = time_us() - c->start;
if (!http_should_keep_alive(parser)) goto reconnect;
http_parser_init(parser, HTTP_RESPONSE);
aeDeleteFileEvent(thread->loop, c->fd, AE_READABLE);
aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
goto done;
reconnect:
reconnect_socket(thread, c);
done:
return 0;
}
static int check_timeouts(aeEventLoop *loop, long long id, void *data) {
thread *thread = data;
connection *c = thread->cs;
uint64_t maxAge = time_us() - (cfg.timeout * 1000);
for (uint64_t i = 0; i < thread->connections; i++, c++) {
if (maxAge > c->start) {
thread->errors.timeout++;
}
}
return TIMEOUT_INTERVAL_MS;
}
static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
connection *c = data;
if (write(fd, request.buf, request.size) < request.size) goto error;
c->start = time_us();
aeDeleteFileEvent(loop, fd, AE_WRITABLE);
aeCreateFileEvent(loop, fd, AE_READABLE, socket_readable, c);
return;
error:
c->thread->errors.write++;
reconnect_socket(c->thread, c);
}
static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
connection *c = data;
ssize_t n;
if ((n = read(fd, c->buf, sizeof(c->buf))) == -1) goto error;
if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
c->thread->bytes += n;
return;
error:
c->thread->errors.read++;
reconnect_socket(c->thread, c);
}
static uint64_t time_us() {
struct timeval t;
gettimeofday(&t, NULL);
return (t.tv_sec * 1000000) + t.tv_usec;
}
static uint64_t rand64(tinymt64_t *state, uint64_t n) {
uint64_t x, max = ~UINT64_C(0);
max -= max % n;
do {
x = tinymt64_generate_uint64(state);
} while (x >= max);
return x % n;
}
static char *extract_url_part(char *url, struct http_parser_url *parser_url, enum http_parser_url_fields field) {
char *part = NULL;
if (parser_url->field_set & (1 << field)) {
uint16_t off = parser_url->field_data[field].off;
uint16_t len = parser_url->field_data[field].len;
part = zcalloc(len + 1 * sizeof(char));
memcpy(part, &url[off], len);
}
return part;
}
static char *format_request(char *host, char *port, char *path, char **headers) {
char *req = NULL;
aprintf(&req, "GET %s HTTP/1.1\r\n", path);
aprintf(&req, "Host: %s", host);
if (port) aprintf(&req, ":%s", port);
aprintf(&req, "\r\n");
for (char **h = headers; *h != NULL; h++) {
aprintf(&req, "%s\r\n", *h);
}
aprintf(&req, "\r\n");
return req;
}
static struct option longopts[] = {
{ "connections", required_argument, NULL, 'c' },
{ "requests", required_argument, NULL, 'r' },
{ "threads", required_argument, NULL, 't' },
{ "header", required_argument, NULL, 'H' },
{ "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'v' },
{ NULL, 0, NULL, 0 }
};
static int parse_args(struct config *cfg, char **url, char **headers, int argc, char **argv) {
char c, **header = headers;
memset(cfg, 0, sizeof(struct config));
cfg->threads = 2;
cfg->connections = 10;
cfg->requests = 100;
cfg->timeout = SOCKET_TIMEOUT_MS;
while ((c = getopt_long(argc, argv, "t:c:r:H:v?", longopts, NULL)) != -1) {
switch (c) {
case 't':
if (scan_metric(optarg, &cfg->threads)) return -1;
break;
case 'c':
if (scan_metric(optarg, &cfg->connections)) return -1;
break;
case 'r':
if (scan_metric(optarg, &cfg->requests)) return -1;
break;
case 'H':
*header++ = optarg;
break;
case 'v':
printf("wrk %s [%s] ", VERSION, aeGetApiName());
printf("Copyright (C) 2012 Will Glozer\n");
break;
case 'h':
case '?':
case ':':
default:
return -1;
}
}
if (optind == argc || !cfg->threads || !cfg->requests) return -1;
if (!cfg->connections || cfg->connections < cfg->threads) {
fprintf(stderr, "number of connections must be >= threads\n");
return -1;
}
*url = argv[optind];
*header = NULL;
return 0;
}
static void print_stats_header() {
printf(" Thread Stats%6s%11s%8s%12s\n", "Avg", "Stdev", "Max", "+/- Stdev");
}
static void print_units(long double n, char *(*fmt)(long double), int width) {
char *msg = fmt(n);
int len = strlen(msg), pad = 2;
if (isalpha(msg[len-1])) pad--;
if (isalpha(msg[len-2])) pad--;
width -= pad;
printf("%*.*s%.*s", width, width, msg, pad, " ");
free(msg);
}
static void print_stats(char *name, stats *stats, char *(*fmt)(long double)) {
long double mean = stats_mean(stats);
long double max = stats_max(stats);
long double stdev = stats_stdev(stats, mean);
printf(" %-10s", name);
print_units(mean, fmt, 8);
print_units(stdev, fmt, 10);
print_units(max, fmt, 9);
printf("%8.2Lf%%\n", stats_within_stdev(stats, mean, stdev, 1));
}

75
tools/wrk/src/wrk.h

@ -0,0 +1,75 @@
#ifndef WRK_H
#define WRK_H
#include "config.h"
#include <pthread.h>
#include <inttypes.h>
#include <sys/types.h>
#include "stats.h"
#include "ae.h"
#include "http_parser.h"
#include "tinymt64.h"
#define VERSION "1.0.0"
#define RECVBUF 8192
#define SAMPLES 100000
#define SOCKET_TIMEOUT_MS 2000
#define SAMPLE_INTERVAL_MS 100
#define TIMEOUT_INTERVAL_MS 2000
typedef struct {
uint32_t connect;
uint32_t read;
uint32_t write;
uint32_t status;
uint32_t timeout;
} errors;
typedef struct {
pthread_t thread;
aeEventLoop *loop;
uint64_t connections;
uint64_t requests;
uint64_t complete;
uint64_t bytes;
uint64_t start;
tinymt64_t rand;
errors errors;
struct connection *cs;
} thread;
typedef struct connection {
thread *thread;
http_parser parser;
int fd;
uint64_t start;
uint64_t latency;
char buf[RECVBUF];
} connection;
struct config;
static void *thread_main(void *);
static int connect_socket(thread *, connection *);
static int reconnect_socket(thread *, connection *);
static int sample_rate(aeEventLoop *, long long, void *);
static int check_timeouts(aeEventLoop *, long long, void *);
static void socket_writeable(aeEventLoop *, int, void *, int);
static void socket_readable(aeEventLoop *, int, void *, int);
static int request_complete(http_parser *);
static uint64_t time_us();
static uint64_t rand64(tinymt64_t *, uint64_t);
static char *extract_url_part(char *, struct http_parser_url *, enum http_parser_url_fields);
static char *format_request(char *, char *, char *, char **);
static int parse_args(struct config *, char **, char **, int, char **);
static void print_stats_header();
static void print_stats(char *, stats *, char *(*)(long double));
#endif /* WRK_H */

287
tools/wrk/src/zmalloc.c

@ -0,0 +1,287 @@
/* zmalloc - total amount of allocated memory aware version of malloc()
*
* Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "config.h"
#include "zmalloc.h"
#ifdef HAVE_MALLOC_SIZE
#define PREFIX_SIZE (0)
#else
#if defined(__sun) || defined(__sparc) || defined(__sparc__)
#define PREFIX_SIZE (sizeof(long long))
#else
#define PREFIX_SIZE (sizeof(size_t))
#endif
#endif
/* Explicitly override malloc/free etc when using tcmalloc. */
#if defined(USE_TCMALLOC)
#define malloc(size) tc_malloc(size)
#define calloc(count,size) tc_calloc(count,size)
#define realloc(ptr,size) tc_realloc(ptr,size)
#define free(ptr) tc_free(ptr)
#elif defined(USE_JEMALLOC)
#define malloc(size) je_malloc(size)
#define calloc(count,size) je_calloc(count,size)
#define realloc(ptr,size) je_realloc(ptr,size)
#define free(ptr) je_free(ptr)
#endif
#define update_zmalloc_stat_alloc(__n,__size) do { \
size_t _n = (__n); \
if (_n&(sizeof(long)-1)) _n += sizeof(long)-(_n&(sizeof(long)-1)); \
if (zmalloc_thread_safe) { \
pthread_mutex_lock(&used_memory_mutex); \
used_memory += _n; \
pthread_mutex_unlock(&used_memory_mutex); \
} else { \
used_memory += _n; \
} \
} while(0)
#define update_zmalloc_stat_free(__n) do { \
size_t _n = (__n); \
if (_n&(sizeof(long)-1)) _n += sizeof(long)-(_n&(sizeof(long)-1)); \
if (zmalloc_thread_safe) { \
pthread_mutex_lock(&used_memory_mutex); \
used_memory -= _n; \
pthread_mutex_unlock(&used_memory_mutex); \
} else { \
used_memory -= _n; \
} \
} while(0)
static size_t used_memory = 0;
static int zmalloc_thread_safe = 0;
pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
static void zmalloc_oom(size_t size) {
fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",
size);
fflush(stderr);
abort();
}
void *zmalloc(size_t size) {
void *ptr = malloc(size+PREFIX_SIZE);
if (!ptr) zmalloc_oom(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
return ptr;
#else
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE,size);
return (char*)ptr+PREFIX_SIZE;
#endif
}
void *zcalloc(size_t size) {
void *ptr = calloc(1, size+PREFIX_SIZE);
if (!ptr) zmalloc_oom(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
return ptr;
#else
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE,size);
return (char*)ptr+PREFIX_SIZE;
#endif
}
void *zrealloc(void *ptr, size_t size) {
#ifndef HAVE_MALLOC_SIZE
void *realptr;
#endif
size_t oldsize;
void *newptr;
if (ptr == NULL) return zmalloc(size);
#ifdef HAVE_MALLOC_SIZE
oldsize = zmalloc_size(ptr);
newptr = realloc(ptr,size);
if (!newptr) zmalloc_oom(size);
update_zmalloc_stat_free(oldsize);
update_zmalloc_stat_alloc(zmalloc_size(newptr),size);
return newptr;
#else
realptr = (char*)ptr-PREFIX_SIZE;
oldsize = *((size_t*)realptr);
newptr = realloc(realptr,size+PREFIX_SIZE);
if (!newptr) zmalloc_oom(size);
*((size_t*)newptr) = size;
update_zmalloc_stat_free(oldsize);
update_zmalloc_stat_alloc(size,size);
return (char*)newptr+PREFIX_SIZE;
#endif
}
/* Provide zmalloc_size() for systems where this function is not provided by
* malloc itself, given that in that case we store an header with this
* information as the first bytes of every allocation. */
#ifndef HAVE_MALLOC_SIZE
size_t zmalloc_size(void *ptr) {
void *realptr = (char*)ptr-PREFIX_SIZE;
size_t size = *((size_t*)realptr);
/* Assume at least that all the allocations are padded at sizeof(long) by
* the underlying allocator. */
if (size&(sizeof(long)-1)) size += sizeof(long)-(size&(sizeof(long)-1));
return size+PREFIX_SIZE;
}
#endif
void zfree(void *ptr) {
#ifndef HAVE_MALLOC_SIZE
void *realptr;
size_t oldsize;
#endif
if (ptr == NULL) return;
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_free(zmalloc_size(ptr));
free(ptr);
#else
realptr = (char*)ptr-PREFIX_SIZE;
oldsize = *((size_t*)realptr);
update_zmalloc_stat_free(oldsize+PREFIX_SIZE);
free(realptr);
#endif
}
char *zstrdup(const char *s) {
size_t l = strlen(s)+1;
char *p = zmalloc(l);
memcpy(p,s,l);
return p;
}
size_t zmalloc_used_memory(void) {
size_t um;
if (zmalloc_thread_safe) pthread_mutex_lock(&used_memory_mutex);
um = used_memory;
if (zmalloc_thread_safe) pthread_mutex_unlock(&used_memory_mutex);
return um;
}
void zmalloc_enable_thread_safeness(void) {
zmalloc_thread_safe = 1;
}
/* Get the RSS information in an OS-specific way.
*
* WARNING: the function zmalloc_get_rss() is not designed to be fast
* and may not be called in the busy loops where Redis tries to release
* memory expiring or swapping out objects.
*
* For this kind of "fast RSS reporting" usages use instead the
* function RedisEstimateRSS() that is a much faster (and less precise)
* version of the funciton. */
#if defined(HAVE_PROCFS)
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
size_t zmalloc_get_rss(void) {
int page = sysconf(_SC_PAGESIZE);
size_t rss;
char buf[4096];
char filename[256];
int fd, count;
char *p, *x;
snprintf(filename,256,"/proc/%d/stat",getpid());
if ((fd = open(filename,O_RDONLY)) == -1) return 0;
if (read(fd,buf,4096) <= 0) {
close(fd);
return 0;
}
close(fd);
p = buf;
count = 23; /* RSS is the 24th field in /proc/<pid>/stat */
while(p && count--) {
p = strchr(p,' ');
if (p) p++;
}
if (!p) return 0;
x = strchr(p,' ');
if (!x) return 0;
*x = '\0';
rss = strtoll(p,NULL,10);
rss *= page;
return rss;
}
#elif defined(HAVE_TASKINFO)
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/sysctl.h>
#include <mach/task.h>
#include <mach/mach_init.h>
size_t zmalloc_get_rss(void) {
task_t task = MACH_PORT_NULL;
struct task_basic_info t_info;
mach_msg_type_number_t t_info_count = TASK_BASIC_INFO_COUNT;
if (task_for_pid(current_task(), getpid(), &task) != KERN_SUCCESS)
return 0;
task_info(task, TASK_BASIC_INFO, (task_info_t)&t_info, &t_info_count);
return t_info.resident_size;
}
#else
size_t zmalloc_get_rss(void) {
/* If we can't get the RSS in an OS-specific way for this system just
* return the memory usage we estimated in zmalloc()..
*
* Fragmentation will appear to be always 1 (no fragmentation)
* of course... */
return zmalloc_used_memory();
}
#endif
/* Fragmentation = RSS / allocated-bytes */
float zmalloc_get_fragmentation_ratio(void) {
return (float)zmalloc_get_rss()/zmalloc_used_memory();
}

83
tools/wrk/src/zmalloc.h

@ -0,0 +1,83 @@
/* zmalloc - total amount of allocated memory aware version of malloc()
*
* Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __ZMALLOC_H
#define __ZMALLOC_H
/* Double expansion needed for stringification of macro values. */
#define __xstr(s) __str(s)
#define __str(s) #s
#if defined(USE_TCMALLOC)
#define ZMALLOC_LIB ("tcmalloc-" __xstr(TC_VERSION_MAJOR) "." __xstr(TC_VERSION_MINOR))
#include <google/tcmalloc.h>
#if TC_VERSION_MAJOR >= 1 && TC_VERSION_MINOR >= 6
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) tc_malloc_size(p)
#else
#error "Newer version of tcmalloc required"
#endif
#elif defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#define JEMALLOC_MANGLE
#include <jemalloc/jemalloc.h>
#if JEMALLOC_VERSION_MAJOR >= 2 && JEMALLOC_VERSION_MINOR >= 1
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) JEMALLOC_P(malloc_usable_size)(p)
#else
#error "Newer version of jemalloc required"
#endif
#elif defined(__APPLE__)
#include <malloc/malloc.h>
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) malloc_size(p)
#endif
#ifndef ZMALLOC_LIB
#define ZMALLOC_LIB "libc"
#endif
void *zmalloc(size_t size);
void *zcalloc(size_t size);
void *zrealloc(void *ptr, size_t size);
void zfree(void *ptr);
char *zstrdup(const char *s);
size_t zmalloc_used_memory(void);
void zmalloc_enable_thread_safeness(void);
float zmalloc_get_fragmentation_ratio(void);
size_t zmalloc_get_rss(void);
#ifndef HAVE_MALLOC_SIZE
size_t zmalloc_size(void *ptr);
#endif
#endif /* __ZMALLOC_H */
Loading…
Cancel
Save