Support Upgrade in HTTP messages

This allows for web servers to be "hijacked" and used as Web Socket servers
(or other). You simply listen for requests as normal, but check if

  req.upgrade === true

If so, this will be the last request of the connection. It's your job now to
hijack req.connection and start reading from it. req.upgradeHead is a buffer
containing the first part of the new protocol communication (in the case it
arrived on the same packet).

This needs tests and documentation. API subject to change.
This commit is contained in:
Ryan Dahl 2010-04-14 03:52:15 -07:00
parent af49187e57
commit 760bba5518
2 changed files with 38 additions and 8 deletions

View file

@ -76,7 +76,13 @@ var parsers = new FreeList('parsers', 1000, function () {
parser.incoming.statusCode = info.statusCode; parser.incoming.statusCode = info.statusCode;
} }
parser.onIncoming(parser.incoming, info.shouldKeepAlive); parser.incoming.upgrade = info.upgrade;
if (!info.upgrade) {
// For upgraded connections, we'll emit this after parser.execute
// so that we can capture the first part of the new protocol
parser.onIncoming(parser.incoming, info.shouldKeepAlive);
}
}; };
parser.onBody = function (b, start, len) { parser.onBody = function (b, start, len) {
@ -91,7 +97,10 @@ var parsers = new FreeList('parsers', 1000, function () {
}; };
parser.onMessageComplete = function () { parser.onMessageComplete = function () {
parser.incoming.emit("end"); if (!parser.incoming.upgrade) {
// For upgraded connections, also emit this after parser.execute
parser.incoming.emit("end");
}
}; };
return parser; return parser;
@ -512,7 +521,16 @@ function connectionListener (socket) {
parser.socket = socket; parser.socket = socket;
socket.ondata = function (d, start, end) { socket.ondata = function (d, start, end) {
parser.execute(d, start, end - start); var bytesParsed = parser.execute(d, start, end - start);
if (parser.incoming && parser.incoming.upgrade) {
var upgradeHead = d.slice(start + bytesParsed, end - start);
parser.incoming.upgradeHead = upgradeHead;
socket.ondata = null;
socket.onend = null;
self.emit('request', parser.incoming, null);
parser.incoming.emit('end');
}
}; };
socket.onend = function () { socket.onend = function () {
@ -579,8 +597,16 @@ function Client ( ) {
requests.push(req); requests.push(req);
}; };
this.ondata = function (d, start, end) { self.ondata = function (d, start, end) {
parser.execute(d, start, end - start); var bytesParsed = parser.execute(d, start, end - start);
if (parser.incoming && parser.incoming.upgrade) {
var upgradeHead = d.slice(start + bytesParsed, end - start);
parser.incoming.upgradeHead = upgradeHead;
currentRequest.emit("response", parser.incoming);
parser.incoming.emit('end');
self.ondata = null;
self.onend = null
}
}; };
self.addListener("connect", function () { self.addListener("connect", function () {
@ -590,12 +616,12 @@ function Client ( ) {
currentRequest.flush(); currentRequest.flush();
}); });
self.addListener("end", function () { self.onend = function () {
parser.finish(); parser.finish();
debug("self got end closing. readyState = " + self.readyState); debug("self got end closing. readyState = " + self.readyState);
self.end(); self.end();
}); };
self.addListener("close", function (e) { self.addListener("close", function (e) {
if (e) { if (e) {

View file

@ -61,6 +61,7 @@ static Persistent<String> http_version_sym;
static Persistent<String> version_major_sym; static Persistent<String> version_major_sym;
static Persistent<String> version_minor_sym; static Persistent<String> version_minor_sym;
static Persistent<String> should_keep_alive_sym; static Persistent<String> should_keep_alive_sym;
static Persistent<String> upgrade_sym;
static struct http_parser_settings settings; static struct http_parser_settings settings;
@ -165,6 +166,8 @@ class Parser : public ObjectWrap {
message_info->Set(should_keep_alive_sym, message_info->Set(should_keep_alive_sym,
http_should_keep_alive(p) ? True() : False()); http_should_keep_alive(p) ? True() : False());
message_info->Set(upgrade_sym, p->upgrade ? True() : False());
Local<Value> argv[1] = { message_info }; Local<Value> argv[1] = { message_info };
Local<Value> ret = cb->Call(parser->handle_, 1, argv); Local<Value> ret = cb->Call(parser->handle_, 1, argv);
@ -243,7 +246,7 @@ class Parser : public ObjectWrap {
Local<Integer> nparsed_obj = Integer::New(nparsed); Local<Integer> nparsed_obj = Integer::New(nparsed);
// If there was a parse error in one of the callbacks // If there was a parse error in one of the callbacks
// TODO What if there is an error on EOF? // TODO What if there is an error on EOF?
if (nparsed != len) { if (!parser->parser_.upgrade && nparsed != len) {
Local<Value> e = Exception::Error(String::New("Parse Error")); Local<Value> e = Exception::Error(String::New("Parse Error"));
Local<Object> obj = e->ToObject(); Local<Object> obj = e->ToObject();
obj->Set(String::NewSymbol("bytesParsed"), nparsed_obj); obj->Set(String::NewSymbol("bytesParsed"), nparsed_obj);
@ -345,6 +348,7 @@ void InitHttpParser(Handle<Object> target) {
version_major_sym = NODE_PSYMBOL("versionMajor"); version_major_sym = NODE_PSYMBOL("versionMajor");
version_minor_sym = NODE_PSYMBOL("versionMinor"); version_minor_sym = NODE_PSYMBOL("versionMinor");
should_keep_alive_sym = NODE_PSYMBOL("shouldKeepAlive"); should_keep_alive_sym = NODE_PSYMBOL("shouldKeepAlive");
upgrade_sym = NODE_PSYMBOL("upgrade");
settings.on_message_begin = Parser::on_message_begin; settings.on_message_begin = Parser::on_message_begin;
settings.on_path = Parser::on_path; settings.on_path = Parser::on_path;