Performance - How to increase a Socket's transfer speed in C#
I am developing a program using C# and Thrift, and I need a server, like IIS, that can send data between the client and the server quickly. Once it was done, I found a problem: when my server sends data to or receives data from the client, it is slower than IIS.
In my test case, both my program and IIS answer each client request with a blank page (content length zero); I then test and compare the concurrency level between them.
What I found is that my program's transfer speed only reaches 600 KB/s-1.1 MB/s, whereas IIS reaches 1.1 MB/s-1.8 MB/s.
My question is: is a socket in C# inherently slower than a native socket, or is there a way to speed up the socket's transfer speed?
The main code is listed below:
/// <summary>
/// http service context: owns one accepted socket and drives its
/// receive -> parse -> respond cycle through a single socketasynceventargs
/// whose buffer is shared by receiving and sending.
/// </summary>
/// <remarks>
/// Only requests whose request line has '/' as its fifth byte (i.e. "GET /...")
/// are accepted, and a request must arrive in a single receive smaller than
/// buffer_size; nothing is reassembled across receives.
/// NOTE(review): every identifier in this listing is lowercase, which does not
/// match the .NET API casing (e.g. SocketAsyncEventArgs, Buffer.BlockCopy);
/// presumably an artefact of how the listing was captured - confirm against
/// the original source before compiling.
/// NOTE(review): the log.debug.write calls build their message (including a
/// decoded copy of the buffer) on every send; unless log.debug is compiled out
/// or short-circuits elsewhere, that cost is paid per request - confirm.
/// </remarks>
public class httpservercontext : servercontext
{
    private const int buffer_size = 16384;

    private static readonly byte[] header_cache_control = frameworkconfig.encoding.getbytes("cache-control:");
    private static readonly byte[] header_connection_close = frameworkconfig.encoding.getbytes("connection:close\r\n");
    private static readonly byte[] header_connection_keep_alive = frameworkconfig.encoding.getbytes("connection:keep-alive\r\n");
    private static readonly byte[] header_content_encoding = frameworkconfig.encoding.getbytes("content-encoding:");
    private static readonly byte[] header_content_encoding_deflate = frameworkconfig.encoding.getbytes("content-encoding:deflate\r\n");
    private static readonly byte[] header_content_type_html = frameworkconfig.encoding.getbytes("content-type:text/html\r\n");
    private static readonly byte[] header_content_encoding_length = frameworkconfig.encoding.getbytes("content-length:");
    private static readonly byte[] header_content_encoding_length_0 = frameworkconfig.encoding.getbytes("content-length:0\r\n");
    private static readonly byte[] header_last_modified = frameworkconfig.encoding.getbytes("last-modified:");
    private static readonly byte[] header_expires = frameworkconfig.encoding.getbytes("expires:");
    private static readonly byte[] header_etag = frameworkconfig.encoding.getbytes("etag:");

    // The HTTP-version token of a status line is case-sensitive, so it is
    // spelled "HTTP/1.1"; header field names above are case-insensitive.
    private static readonly byte[] status_ok = frameworkconfig.encoding.getbytes("HTTP/1.1 200 ok\r\n");
    private static readonly byte[] status_error = frameworkconfig.encoding.getbytes("HTTP/1.1 500 internal service error\r\n");
    private static readonly byte[] status_not_modified = frameworkconfig.encoding.getbytes("HTTP/1.1 304 not modified\r\n");

    private static readonly byte[] header_server = frameworkconfig.encoding.getbytes("server:aria\r\n");
    private static readonly byte[] header_end = frameworkconfig.encoding.getbytes("\r\n");

    private readonly socketasynceventargs m_args;
    private readonly httpserver m_server;

    // Shared I/O buffer registered with m_args; used for both receive and send.
    private readonly byte[] m_buffer;

    private string m_ifmodifiedsince;
    private int m_length;

    // Read position inside the first block of m_output (non-zero only while a
    // block larger than the free buffer space is being sent in pieces).
    private int m_offset;

    // Pending response, as an ordered queue of byte blocks.
    private readonly linkedlist<byte[]> m_output = new linkedlist<byte[]>();

    private string m_querystring;
    private string m_requestetag;
    private bool m_shutdown;
    private socket m_socket;
    private string m_ticket;
    private string m_url;

    // First request ever received, kept (and printed once) as a debugging sample.
    private static string m_sample;

    /// <summary>
    /// Creates the context and wires its event args to the shared buffer.
    /// </summary>
    /// <param name="server">Owning server; also used for error reporting.</param>
    internal httpservercontext(httpserver server) : base(server)
    {
        this.m_server = server;
        this.m_args = new socketasynceventargs();
        m_buffer = new byte[buffer_size];
        this.m_args.setbuffer(m_buffer, 0, buffer_size);
        this.m_args.completed += args_completed;
    }

    /// <summary>
    /// Completion callback for receive and send operations. It is also called
    /// directly when a receive completes synchronously.
    /// </summary>
    private void args_completed(object sender, socketasynceventargs e)
    {
        if (e.socketerror != socketerror.success)
        {
            m_server.reporterror(e.socketerror);
            this.close();
            return;
        }
        if (m_socket == null)
        {
            return;
        }
        switch (e.lastoperation)
        {
            case socketasyncoperation.receive:
                this.processreceive();
                break;
            case socketasyncoperation.send:
                processsend();
                break;
        }
    }

    /// <summary>
    /// Sends everything queued in m_output, one buffer-full at a time, then
    /// either closes the connection or starts waiting for the next request.
    /// </summary>
    /// <remarks>
    /// When sendasync reports a pending operation this method returns at once
    /// and is re-entered from args_completed. The shared buffer is never
    /// touched while a send is in flight.
    /// </remarks>
    private void processsend()
    {
        while (true)
        {
            int filled = filloutputbuffer();
            if (filled == 0)
            {
                break;
            }

            bool last = m_output.count == 0;
            log.debug.write((last ? "send final data of request:" : "send chunk:") + m_socket.remoteendpoint + "\r\n" + frameworkconfig.encoding.getstring(m_buffer, 0, filled));

            m_args.setbuffer(0, filled);
            if (m_socket.sendasync(this.m_args))
            {
                // Pending: the completed event continues the work.
                return;
            }

            // Completed synchronously: no event is raised, so the result has
            // to be inspected here.
            if (m_args.socketerror != socketerror.success)
            {
                m_server.reporterror(m_args.socketerror);
                this.close();
                return;
            }
            if (last)
            {
                log.debug.write("done sync:" + m_socket.remoteendpoint + "," + m_args.bytestransferred);
            }
        }

        if (m_socket == null)
        {
            m_server.reporterror("socket closed");
            this.close();
        }
        else if (m_shutdown)
        {
            m_server.reporterror("shutdown");
            this.close();
        }
        else
        {
            log.debug.write("next request/response workflow:" + m_socket.remoteendpoint);
            this.nextworkflow();
        }
    }

    /// <summary>
    /// Moves as many queued bytes as fit from m_output into m_buffer,
    /// coalescing small blocks and splitting large ones.
    /// </summary>
    /// <returns>Number of bytes now at the start of m_buffer (0 when nothing is queued).</returns>
    private int filloutputbuffer()
    {
        int filled = 0;
        while (filled < buffer_size && m_output.count > 0)
        {
            linkedlistnode<byte[]> node = m_output.first;
            byte[] block = node.value;

            int remaining = block.length - m_offset;
            int room = buffer_size - filled;
            int count = remaining < room ? remaining : room;

            buffer.blockcopy(block, m_offset, m_buffer, filled, count);
            filled += count;
            m_offset += count;

            if (m_offset == block.length)
            {
                // Block fully consumed (also covers zero-length blocks).
                m_offset = 0;
                m_output.remove(node);
            }
        }
        return filled;
    }

    /// <summary>
    /// Parses the bytes of a completed receive as an http request head and,
    /// if a url was found, executes the request.
    /// </summary>
    private void processreceive()
    {
        int bytes = m_args.bytestransferred;
        if (bytes == 0)
        {
            // A zero-byte receive on a stream socket means the peer closed the
            // connection; receiving again can only yield zero bytes again.
            m_server.reporterror("zero data");
            this.close();
            return;
        }
        if (bytes == buffer_size)
        {
            m_server.reporterror("request large");
            this.close();
            return;
        }
        if (m_sample == null)
        {
            m_sample = frameworkconfig.encoding.getstring(m_buffer, 0, bytes);
            console.writeline(m_sample);
        }

        int linestart = 0;      // start of the not-yet-consumed part of the current line
        int cursor = 0;
        string header = null;   // name of the header whose value is being scanned
        bool headersdone = false;

        while (cursor < bytes && !headersdone)
        {
            byte c = m_buffer[cursor];
            if (c == 0x0d || c == 0x0a) // \r or \n
            {
                int count = cursor - linestart;
                if (count == 0 && header == null)
                {
                    // Blank line: end of the request head. A header with an
                    // empty value also has count == 0 but header != null.
                    headersdone = true;
                    continue;
                }
                if (!processrequest(linestart, count, ref header))
                {
                    m_server.reporterror("parse request error");
                    this.close();
                    return;
                }
                cursor++;
                if (c == 0x0d)
                {
                    moveforwardif(ref cursor, '\n', bytes);
                }
                linestart = cursor;
            }
            else if (c == 0x3a && header == null && m_url != null) // ':'
            {
                // Only header lines are split at ':'; the request line
                // (m_url still null) may legitimately contain colons.
                header = frameworkconfig.encoding.getstring(m_buffer, linestart, cursor - linestart);
                cursor++;
                moveforwardif(ref cursor, ' ', bytes);
                linestart = cursor;
            }
            else
            {
                cursor++;
            }
        }

        if (m_url != null)
        {
            if (m_socket != null)
            {
                this.processexecute();
            }
        }
        else
        {
            m_server.reporterror("parse url error");
            this.close();
        }
    }

    /// <summary>
    /// Advances cursor past every consecutive occurrence of ch, stopping at len.
    /// </summary>
    private void moveforwardif(ref int cursor, char ch, int len)
    {
        while (cursor < len)
        {
            if (m_buffer[cursor] == ch)
            {
                cursor++;
            }
            else
            {
                break;
            }
        }
    }

    /// <summary>
    /// Consumes one line of the request head: the request line while m_url is
    /// still null, otherwise the value of the header named by header.
    /// </summary>
    /// <param name="offset">Start of the line (or header value) in m_buffer.</param>
    /// <param name="count">Number of bytes in it.</param>
    /// <param name="header">Current header name; reset to null once its value is consumed.</param>
    /// <returns>false when the line cannot be parsed.</returns>
    private bool processrequest(int offset, int count, ref string header)
    {
        try
        {
            if (m_url == null)
            {
                return parserequestline(offset, count);
            }
            else if (header == null)
            {
                return false;
            }
            switch (header.tolower())
            {
                case "if-match":
                case "if-none-match":
                    m_requestetag = frameworkconfig.encoding.getstring(m_buffer, offset, count);
                    break;
                case "if-unmodified-since":
                case "if-modified-since":
                    m_ifmodifiedsince = frameworkconfig.encoding.getstring(m_buffer, offset, count);
                    break;
                case "connection":
                    string keepalive = frameworkconfig.encoding.getstring(m_buffer, offset, count);
                    m_server.reporterror("keepalive:" + keepalive);
                    break;
                case "ticket":
                    m_ticket = frameworkconfig.encoding.getstring(m_buffer, offset, count);
                    break;
            }
            header = null;
            return true;
        }
        catch (exception e)
        {
            log.critical.writeerror(e);
            return false;
        }
    }

    /// <summary>
    /// Parses the query string and url out of a request line of the form
    /// "GET /path?query HTTP/x.y" and stores them in m_url / m_querystring.
    /// </summary>
    /// <returns>false when the line is too short or its fifth byte is not '/'.</returns>
    private bool parserequestline(int offset, int count)
    {
        if (count < 6)
        {
            return false;
        }
        int slash = offset + 4;
        if (m_buffer[slash] != '/')
        {
            return false; //post not supported yet
        }

        int pathstart = slash + 1;
        int targetend = offset + count; // end of line unless a space ends the target sooner
        int question = -1;
        for (int i = pathstart; i < targetend; i++)
        {
            byte c = m_buffer[i];
            if (c == ' ')
            {
                targetend = i;
                break;
            }
            if (c == '?' && question == -1)
            {
                // The query starts at the first '?'; later ones belong to it.
                question = i;
            }
        }

        if (question == -1)
        {
            m_url = frameworkconfig.encoding.getstring(m_buffer, pathstart, targetend - pathstart);
            m_querystring = string.empty;
        }
        else
        {
            m_url = frameworkconfig.encoding.getstring(m_buffer, pathstart, question - pathstart);
            m_querystring = frameworkconfig.encoding.getstring(m_buffer, question + 1, targetend - question - 1);
        }
        return true;
    }

    /// <summary>
    /// Queues the response for the parsed request and starts sending it.
    /// Currently always an empty 200 response; on failure an empty 500
    /// response is queued and the connection is marked for shutdown.
    /// </summary>
    private void processexecute()
    {
        try
        {
            m_output.addlast(status_ok);
            m_output.addlast(header_content_type_html);
            m_output.addlast(header_connection_keep_alive);
            m_output.addlast(header_content_encoding_deflate);
            m_output.addlast(header_content_encoding_length_0);
            m_output.addlast(header_server);
            m_output.addlast(header_end);
            //todo execute real service....
        }
        catch (exception e)
        {
            log.critical.writeerror(e);
            m_server.reporterror("execute error");
            m_output.clear();
            m_length = 0;
            m_output.addlast(status_error);
            m_output.addlast(header_content_type_html);
            m_output.addlast(header_connection_close);
            m_output.addlast(header_content_encoding_deflate);
            m_output.addlast(header_content_encoding_length_0);
            m_output.addlast(header_server);
            m_output.addlast(header_end);
            m_shutdown = true;
        }
        this.processsend();
    }

    /// <summary>
    /// Entry point of the workflow: the server hands over an accepted socket
    /// and this context starts processing its requests.
    /// </summary>
    /// <param name="socket">The accepted client socket.</param>
    internal void start(socket socket)
    {
        this.m_socket = socket;
        this.m_args.acceptsocket = socket;
        m_shutdown = false;
        this.nextworkflow();
    }

    /// <summary>
    /// Resets all per-request state and posts the receive for the next request.
    /// </summary>
    private void nextworkflow()
    {
        m_url = null;
        m_ticket = null;
        m_ifmodifiedsince = null;
        m_querystring = null;
        m_requestetag = null;
        m_output.clear();
        m_offset = 0;

        // A previous send may have narrowed the buffer window (or been
        // aborted mid-way); always receive into the full buffer.
        m_args.setbuffer(0, buffer_size);

        log.debug.write("receive next request:" + m_socket.remoteendpoint);
        if (!m_socket.receiveasync(this.m_args))
        {
            args_completed(null, m_args);
        }
    }
}
Comments
Post a Comment