From 7c394b9d58acff8c62247a6e2714a4b76c421f2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomas=20Hellstr=C3=B6m?= Date: Wed, 2 Dec 2020 04:02:53 +0100 Subject: [PATCH] x.websocket: docs and cleanup (#7078) --- vlib/x/websocket/events.v | 55 ++++---- vlib/x/websocket/handshake.v | 22 ++-- vlib/x/websocket/io.v | 28 ++-- vlib/x/websocket/message.v | 91 ++++++------- vlib/x/websocket/uri.v | 12 +- vlib/x/websocket/utils.v | 8 +- vlib/x/websocket/websocket_client.v | 170 ++++++++++--------------- vlib/x/websocket/websocket_nix.c.v | 3 +- vlib/x/websocket/websocket_server.v | 59 +++++---- vlib/x/websocket/websocket_windows.c.v | 3 +- 10 files changed, 217 insertions(+), 234 deletions(-) diff --git a/vlib/x/websocket/events.v b/vlib/x/websocket/events.v index 0052d7b485..3994309962 100644 --- a/vlib/x/websocket/events.v +++ b/vlib/x/websocket/events.v @@ -1,27 +1,30 @@ module websocket -// All this plumbing will go awauy when we can do EventHandler properly +// Represents a callback on a new message struct MessageEventHandler { - handler SocketMessageFn - handler2 SocketMessageFn2 - is_ref bool - ref voidptr + handler SocketMessageFn // Callback function + handler2 SocketMessageFn2 // Callback function with reference + is_ref bool // Has a reference object + ref voidptr // The referenced object } +// Represents a callback on error struct ErrorEventHandler { - handler SocketErrorFn - handler2 SocketErrorFn2 - is_ref bool - ref voidptr + handler SocketErrorFn // Callback function + handler2 SocketErrorFn2 // Callback function with reference + is_ref bool // Has a reference object + ref voidptr // The referenced object } +// Represents a callback when connection is opened struct OpenEventHandler { - handler SocketOpenFn - handler2 SocketOpenFn2 - is_ref bool - ref voidptr + handler SocketOpenFn // Callback function + handler2 SocketOpenFn2 // Callback function with reference + is_ref bool // Has a reference object + ref voidptr // The referenced object } +// Represents a callback on a closing event struct CloseEventHandler { handler SocketCloseFn handler2 SocketCloseFn2 @@ -47,6 +50,7 @@ pub type SocketCloseFn = fn (mut c Client, code int, reason string) ? pub type SocketCloseFn2 = fn (mut c Client, code int, reason string, v voidptr) ? +// on_connect register callback when client connects to the server pub fn (mut s Server) on_connect(fun AcceptClientFn) ? { if s.accept_client_callbacks.len > 0 { return error('only one callback can be registered for accept client') @@ -54,16 +58,6 @@ pub fn (mut s Server) on_connect(fun AcceptClientFn) ? { s.accept_client_callbacks << fun } -fn (mut s Server) send_connect_event(mut c ServerClient) ?bool { - if s.accept_client_callbacks.len == 0 { - // If no callback all client will be accepted - return true - } - fun := s.accept_client_callbacks[0] - res := fun(mut c)? - return res -} - // on_message, register a callback on new messages pub fn (mut s Server) on_message(fun SocketMessageFn) { s.message_callbacks << MessageEventHandler{ @@ -160,6 +154,18 @@ pub fn (mut ws Client) on_close_ref(fun SocketCloseFn2, ref voidptr) { } } +// send_connect_event, invokes the on_connect callback +fn (mut s Server) send_connect_event(mut c ServerClient) ?bool { + if s.accept_client_callbacks.len == 0 { + // If no callback all client will be accepted + return true + } + fun := s.accept_client_callbacks[0] + res := fun(mut c) ? + return res +} + +// send_message_event invokes the on_message callback fn (mut ws Client) send_message_event(msg &Message) { ws.debug_log('sending on_message event') for ev_handler in ws.message_callbacks { @@ -171,6 +177,7 @@ fn (mut ws Client) send_message_event(msg &Message) { } } +// send_error_event invokes the on_error callback fn (mut ws Client) send_error_event(err string) { ws.debug_log('sending on_error event') for ev_handler in ws.error_callbacks { @@ -182,6 +189,7 @@ fn (mut ws Client) send_error_event(err string) { } } +// send_close_event invokes the on_close callback fn (mut ws Client) send_close_event(code int, reason string) { ws.debug_log('sending on_close event') for ev_handler in ws.close_callbacks { @@ -193,6 +201,7 @@ fn (mut ws Client) send_close_event(code int, reason string) { } } +// send_open_event invokes the on_open callback fn (mut ws Client) send_open_event() { ws.debug_log('sending on_open event') for ev_handler in ws.open_callbacks { diff --git a/vlib/x/websocket/handshake.v b/vlib/x/websocket/handshake.v index f3a7fe7077..f9ac605850 100644 --- a/vlib/x/websocket/handshake.v +++ b/vlib/x/websocket/handshake.v @@ -3,7 +3,7 @@ module websocket import encoding.base64 import strings -// handshake manage the handshake part of connecting +// handshake manage the websocket handshake process fn (mut ws Client) handshake() ? { nonce := get_nonce(ws.nonce_size) seckey := base64.encode(nonce) @@ -24,7 +24,6 @@ fn (mut ws Client) handshake() ? { sb.write('Sec-WebSocket-Key: ') sb.write(seckey) sb.write('\r\nSec-WebSocket-Version: 13\r\n\r\n') - // handshake := 'GET $ws.uri.resource$ws.uri.querystring HTTP/1.1\r\nHost: $ws.uri.hostname:$ws.uri.port\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: $seckey\r\nSec-WebSocket-Version: 13\r\n\r\n' handshake := sb.str() defer { handshake.free() @@ -36,7 +35,7 @@ fn (mut ws Client) handshake() ? { unsafe {handshake_bytes.free()} } -// handshake manage the handshake part of connecting +// handle_server_handshake manage websocket server handshake fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient) { msg := c.read_handshake_str() ? handshake_response, client := s.parse_client_handshake(msg, mut c) ? @@ -44,6 +43,7 @@ fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient) return handshake_response, client } +// parse_client_handshake parses handshake result fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) ?(string, &ServerClient) { s.logger.debug('server-> client handshake:\n$client_handshake') lines := client_handshake.split_into_lines() @@ -52,12 +52,13 @@ fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) return error_with_code('unexpected get operation, $get_tokens', 1) } if get_tokens[0].trim_space() != 'GET' { - return error_with_code("unexpected request '${get_tokens[0]}', expected 'GET'", 2) + return error_with_code("unexpected request '${get_tokens[0]}', expected 'GET'", + 2) } if get_tokens[2].trim_space() != 'HTTP/1.1' { - return error_with_code("unexpected request $get_tokens, expected 'HTTP/1.1'", 3) + return error_with_code("unexpected request $get_tokens, expected 'HTTP/1.1'", + 3) } - // path := get_tokens[1].trim_space() mut seckey := '' mut flags := []Flag{} mut key := '' @@ -106,6 +107,7 @@ fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) return server_handshake, server_client } +// / read_handshake_str returns the handshake response fn (mut ws Client) read_handshake_str() ?string { mut total_bytes_read := 0 mut msg := [1024]byte{} @@ -133,12 +135,15 @@ fn (mut ws Client) read_handshake(seckey string) ? { unsafe {msg.free()} } +// check_handshake_response checks the response from handshake and returns +// the response and secure key fn (mut ws Client) check_handshake_response(handshake_response string, seckey string) ? { ws.debug_log('handshake response:\n$handshake_response') lines := handshake_response.split_into_lines() header := lines[0] if !header.starts_with('HTTP/1.1 101') && !header.starts_with('HTTP/1.0 101') { - return error_with_code('handshake_handler: invalid HTTP status response code, $header', 6) + return error_with_code('handshake_handler: invalid HTTP status response code, $header', + 6) } for i in 1 .. lines.len { if lines[i].len <= 0 || lines[i] == '\r\n' { @@ -157,7 +162,8 @@ fn (mut ws Client) check_handshake_response(handshake_response string, seckey st challenge := create_key_challenge_response(seckey) ? ws.debug_log('challenge: $challenge, response: ${keys[1]}') if keys[1].trim_space() != challenge { - return error_with_code('handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.', 7) + return error_with_code('handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.', + 7) } ws.flags << .has_accept unsafe {challenge.free()} diff --git a/vlib/x/websocket/io.v b/vlib/x/websocket/io.v index 456cf9b721..ca81099762 100644 --- a/vlib/x/websocket/io.v +++ b/vlib/x/websocket/io.v @@ -2,11 +2,7 @@ module websocket import net import time - -interface WebsocketIO { - socket_read(mut buffer []byte) ?int - socket_write(bytes []byte) ? -} +import sync // socket_read reads into the provided buffer with its length fn (mut ws Client) socket_read(mut buffer []byte) ?int { @@ -15,7 +11,7 @@ fn (mut ws Client) socket_read(mut buffer []byte) ?int { return error('socket_read: trying to read a closed socket') } if ws.is_ssl { - r := ws.ssl_conn.read_into(mut buffer)? + r := ws.ssl_conn.read_into(mut buffer) ? return r } else { for { @@ -31,14 +27,14 @@ fn (mut ws Client) socket_read(mut buffer []byte) ?int { } } +// socket_read reads into the provided byte pointer and length fn (mut ws Client) socket_read_ptr(buf_ptr byteptr, len int) ?int { lock { if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { return error('socket_read_ptr: trying to read a closed socket') - } - + } if ws.is_ssl { - r := ws.ssl_conn.socket_read_into_ptr(buf_ptr, len)? + r := ws.ssl_conn.socket_read_into_ptr(buf_ptr, len) ? return r } else { for { @@ -62,7 +58,7 @@ fn (mut ws Client) socket_write(bytes []byte) ? { return error('socket_write: trying to write on a closed socket') } if ws.is_ssl { - ws.ssl_conn.write(bytes)? + ws.ssl_conn.write(bytes) ? } else { for { ws.conn.write(bytes) or { @@ -77,26 +73,26 @@ fn (mut ws Client) socket_write(bytes []byte) ? { } } -// shutdown_socket, proper shutdown make PR in Emeliy repo +// shutdown_socket, shut down socket properly closing the connection fn (mut ws Client) shutdown_socket() ? { ws.debug_log('shutting down socket') if ws.is_ssl { - ws.ssl_conn.shutdown()? + ws.ssl_conn.shutdown() ? } else { - ws.conn.close()? + ws.conn.close() ? } return none } // dial_socket, setup socket communication, options and timeouts fn (mut ws Client) dial_socket() ?net.TcpConn { - mut t := net.dial_tcp('$ws.uri.hostname:$ws.uri.port')? + mut t := net.dial_tcp('$ws.uri.hostname:$ws.uri.port') ? optval := int(1) - t.sock.set_option_int(.keep_alive, optval)? + t.sock.set_option_int(.keep_alive, optval) ? t.set_read_timeout(10 * time.millisecond) t.set_write_timeout(10 * time.millisecond) if ws.is_ssl { - ws.ssl_conn.connect(mut t)? + ws.ssl_conn.connect(mut t) ? } return t } diff --git a/vlib/x/websocket/message.v b/vlib/x/websocket/message.v index e91bc0cc42..b3badffa6d 100644 --- a/vlib/x/websocket/message.v +++ b/vlib/x/websocket/message.v @@ -3,33 +3,37 @@ module websocket import encoding.utf8 const ( - header_len_offset = 2 - buffer_size = 256 - extended_payload16_end_byte = 4 - extended_payload64_end_byte = 10 + header_len_offset = 2 // Offset for lenghtpart of websocket header + buffer_size = 256 // Default buffer size + extended_payload16_end_byte = 4 // Offset for extended lenght 16 bit of websocket header + extended_payload64_end_byte = 10 // Offset for extended lenght 64 bit of websocket header ) +// A websocket data fragment struct Fragment { - data []byte - opcode OPCode + data []byte // The included data payload data in a fragment + opcode OPCode // Defines the interpretation of the payload data } +// Represents a data frame header struct Frame { mut: header_len int = 2 + // Lenght of the websocket header part frame_size int = 2 - fin bool - rsv1 bool - rsv2 bool - rsv3 bool - opcode OPCode - has_mask bool - payload_len int - masking_key [4]byte + // Size of the total frame + fin bool // Indicates if final fragment of message + rsv1 bool // Reserved for future use in websocket RFC + rsv2 bool // Reserved for future use in websocket RFC + rsv3 bool // Reserved for future use in websocket RFC + opcode OPCode // Defines the interpretation of the payload data + has_mask bool // Defines whether the payload data is masked. + payload_len int // Payload lenght + masking_key [4]byte // All frames from client to server is masked with this key } const ( - invalid_close_codes = [999, 1004, 1005, 1006, 1014, 1015, 1016, 1100, 2000, 2999, 5000, 65536] + invalid_close_codes = [999, 1004, 1005, 1006, 1014, 1015, 1016, 1100, 2000, 2999, 5000, 65536] // List of invalid close codes ) // validate_client, validate client frame rules from RFC6455 @@ -40,53 +44,52 @@ pub fn (mut ws Client) validate_frame(frame &Frame) ? { } if (int(frame.opcode) >= 3 && int(frame.opcode) <= 7) || (int(frame.opcode) >= 11 && int(frame.opcode) <= 15) { - ws.close(1002, 'use of reserved opcode')? + ws.close(1002, 'use of reserved opcode') ? return error('use of reserved opcode') } if frame.has_mask && !ws.is_server { // Server should never send masked frames // to client, close connection - ws.close(1002, 'client got masked frame')? + ws.close(1002, 'client got masked frame') ? return error('client sent masked frame') } if is_control_frame(frame.opcode) { if !frame.fin { - ws.close(1002, 'control message must not be fragmented')? + ws.close(1002, 'control message must not be fragmented') ? return error('unexpected control frame with no fin') } if frame.payload_len > 125 { - ws.close(1002, 'control frames must not exceed 125 bytes')? + ws.close(1002, 'control frames must not exceed 125 bytes') ? return error('unexpected control frame payload length') } } if frame.fin == false && ws.fragments.len == 0 && frame.opcode == .continuation { err_msg := 'unexecpected continuation, there are no frames to continue, $frame' - ws.close(1002, err_msg)? + ws.close(1002, err_msg) ? return error(err_msg) } } -[inline] +// is_control_frame returns true if the fram is a control frame fn is_control_frame(opcode OPCode) bool { return opcode !in [.text_frame, .binary_frame, .continuation] } -[inline] +// is_data_frame returns true if the fram is a control frame fn is_data_frame(opcode OPCode) bool { return opcode in [.text_frame, .binary_frame] } -// read_payload, reads the payload from socket +// read_payload, reads the payload from the socket fn (mut ws Client) read_payload(frame &Frame) ?[]byte { if frame.payload_len == 0 { return []byte{} } - // TODO: make a dynamic reusable memory pool here mut buffer := []byte{cap: frame.payload_len} mut read_buf := [1]byte{} mut bytes_read := 0 for bytes_read < frame.payload_len { - len := ws.socket_read_ptr(byteptr(&read_buf), 1)? + len := ws.socket_read_ptr(byteptr(&read_buf), 1) ? if len != 1 { return error('expected read all message, got zero') } @@ -104,8 +107,8 @@ fn (mut ws Client) read_payload(frame &Frame) ?[]byte { return buffer } -// validate_utf_8, validates payload for valid utf encoding -// todo: support fail fast utf errors for strict autobahn conformance +// validate_utf_8, validates payload for valid utf8 encoding +// - Future implementation needs to support fail fast utf errors for strict autobahn conformance fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? { if opcode in [.text_frame, .close] && !utf8.validate(payload.data, payload.len) { ws.logger.error('malformed utf8 payload, payload len: ($payload.len)') @@ -118,11 +121,11 @@ fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? { // read_next_message reads 1 to n frames to compose a message pub fn (mut ws Client) read_next_message() ?Message { for { - frame := ws.parse_frame_header()? + frame := ws.parse_frame_header() ? // This debug message leaks so remove if needed // ws.debug_log('read_next_message: frame\n$frame') - ws.validate_frame(&frame)? - frame_payload := ws.read_payload(&frame)? + ws.validate_frame(&frame) ? + frame_payload := ws.read_payload(&frame) ? if is_control_frame(frame.opcode) { // Control frames can interject other frames // and need to be returned immediately @@ -130,9 +133,7 @@ pub fn (mut ws Client) read_next_message() ?Message { opcode: OPCode(frame.opcode) payload: frame_payload.clone() } - unsafe { - frame_payload.free() - } + unsafe {frame_payload.free()} return msg } // If the message is fragmented we just put it on fragments @@ -142,9 +143,7 @@ pub fn (mut ws Client) read_next_message() ?Message { data: frame_payload.clone() opcode: frame.opcode } - unsafe { - frame_payload.free() - } + unsafe {frame_payload.free()} continue } if ws.fragments.len == 0 { @@ -157,21 +156,19 @@ pub fn (mut ws Client) read_next_message() ?Message { opcode: OPCode(frame.opcode) payload: frame_payload.clone() } - unsafe { - frame_payload.free() - } + unsafe {frame_payload.free()} return msg } defer { ws.fragments = [] } if is_data_frame(frame.opcode) { - ws.close(0, '')? + ws.close(0, '') ? return error('Unexpected frame opcode') } - payload := ws.payload_from_fragments(frame_payload)? + payload := ws.payload_from_fragments(frame_payload) ? opcode := ws.opcode_from_fragments() - ws.validate_utf_8(opcode, payload)? + ws.validate_utf_8(opcode, payload) ? msg := Message{ opcode: opcode payload: payload.clone() @@ -206,24 +203,20 @@ fn (ws Client) payload_from_fragments(fin_payload []byte) ?[]byte { return total_buffer } -// opcode_from_fragments, returns the opcode for message from the first fragment sent +// opcode_from_fragments returns the opcode for message from the first fragment sent fn (ws Client) opcode_from_fragments() OPCode { return OPCode(ws.fragments[0].opcode) } // parse_frame_header parses next message by decoding the incoming frames pub fn (mut ws Client) parse_frame_header() ?Frame { - // TODO: make a dynamic reusable memory pool here - // mut buffer := []byte{cap: buffer_size} mut buffer := [256]byte{} mut bytes_read := 0 mut frame := Frame{} mut rbuff := [1]byte{} mut mask_end_byte := 0 for ws.state == .open { - // Todo: different error scenarios to make sure we close correctly on error - // reader.read_into(mut rbuff) ? - read_bytes := ws.socket_read_ptr(byteptr(rbuff), 1)? + read_bytes := ws.socket_read_ptr(byteptr(rbuff), 1) ? if read_bytes == 0 { // This is probably a timeout or close continue @@ -268,7 +261,7 @@ pub fn (mut ws Client) parse_frame_header() ?Frame { } } if frame.payload_len == 127 && bytes_read == u64(extended_payload64_end_byte) { - frame.header_len += 8 // TODO Not sure... + frame.header_len += 8 frame.payload_len = 0 frame.payload_len |= buffer[2] << 56 frame.payload_len |= buffer[3] << 48 diff --git a/vlib/x/websocket/uri.v b/vlib/x/websocket/uri.v index 5a7cc746d0..992009395b 100644 --- a/vlib/x/websocket/uri.v +++ b/vlib/x/websocket/uri.v @@ -1,14 +1,16 @@ module websocket +// Represents an Uri for websocket connections struct Uri { mut: - url string - hostname string - port string - resource string - querystring string + url string // The url to the websocket endpoint + hostname string // The hostname to the websocket endpoint + port string // The port to the websocket endpoint + resource string // The resource used on the websocket endpoint + querystring string // The query string on the websocket endpoint } +// str returns the string representation of the Uri pub fn (u Uri) str() string { return u.url } diff --git a/vlib/x/websocket/utils.v b/vlib/x/websocket/utils.v index e134e2b4d3..439cce03dc 100644 --- a/vlib/x/websocket/utils.v +++ b/vlib/x/websocket/utils.v @@ -4,6 +4,7 @@ import rand import crypto.sha1 import encoding.base64 +// htonl64 converts payload lenght to header bits fn htonl64(payload_len u64) []byte { mut ret := []byte{len: 8} ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff) @@ -17,15 +18,15 @@ fn htonl64(payload_len u64) []byte { return ret } +// create_masking_key returs a new masking key byte array fn create_masking_key() []byte { mask_bit := byte(rand.intn(255)) buf := []byte{len: 4, init: `0`} - unsafe { - C.memcpy(buf.data, &mask_bit, 4) - } + unsafe {C.memcpy(buf.data, &mask_bit, 4)} return buf } +// create_key_challenge_response creates a key challange response from security key fn create_key_challenge_response(seckey string) ?string { if seckey.len == 0 { return error('unexpected seckey lengt zero') @@ -42,6 +43,7 @@ fn create_key_challenge_response(seckey string) ?string { return b64 } +// get_nonce, returns a randomized array used in handshake process fn get_nonce(nonce_size int) string { mut nonce := []byte{len: nonce_size, cap: nonce_size} alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz' diff --git a/vlib/x/websocket/websocket_client.v b/vlib/x/websocket/websocket_client.v index f06147f5bc..e6c26363e7 100644 --- a/vlib/x/websocket/websocket_client.v +++ b/vlib/x/websocket/websocket_client.v @@ -23,27 +23,29 @@ const ( pub struct Client { is_server bool mut: - ssl_conn &openssl.SSLConn - flags []Flag - fragments []Fragment - message_callbacks []MessageEventHandler - error_callbacks []ErrorEventHandler - open_callbacks []OpenEventHandler - close_callbacks []CloseEventHandler + ssl_conn &openssl.SSLConn // Secure connection used when wss is used + flags []Flag // Flags + fragments []Fragment // Current fragments + message_callbacks []MessageEventHandler // All callbacks on_message + error_callbacks []ErrorEventHandler // All callbacks on_error + open_callbacks []OpenEventHandler // All callbacks on_open + close_callbacks []CloseEventHandler // All callbacks on_close pub: - is_ssl bool - uri Uri - id string + is_ssl bool // True if secure socket is used + uri Uri // Uri of current connection + id string // Unique id of client pub mut: - conn net.TcpConn - nonce_size int = 16 // you can try 18 too - panic_on_callback bool - state State - logger &log.Log - resource_name string - last_pong_ut u64 + conn net.TcpConn // Underlying TCP connection + nonce_size int = 16 + // you can try 18 too + panic_on_callback bool // Set to true of callbacks can panic + state State // Current state of connection + logger &log.Log // Logger used to log messages + resource_name string // Name of current resource + last_pong_ut u64 // Last time in unix time we got a pong message } +// Flag of websocket handshake enum Flag { has_accept has_connection @@ -78,7 +80,7 @@ pub enum OPCode { // new_client, instance a new websocket client pub fn new_client(address string) ?&Client { - uri := parse_uri(address)? + uri := parse_uri(address) ? return &Client{ is_server: false ssl_conn: openssl.new_ssl_conn() @@ -94,14 +96,14 @@ pub fn new_client(address string) ?&Client { // connect, connects and do handshake procedure with remote server pub fn (mut ws Client) connect() ? { - ws.assert_not_connected()? + ws.assert_not_connected() ? ws.set_state(.connecting) ws.logger.info('connecting to host $ws.uri') - ws.conn = ws.dial_socket()? + ws.conn = ws.dial_socket() ? // Todo: make setting configurable ws.conn.set_read_timeout(time.second * 30) ws.conn.set_write_timeout(time.second * 30) - ws.handshake()? + ws.handshake() ? ws.set_state(.open) ws.logger.info('successfully connected to host $ws.uri') ws.send_open_event() @@ -126,23 +128,19 @@ pub fn (mut ws Client) listen() ? { return error(err) } if ws.state in [.closed, .closing] { - return + return } ws.debug_log('got message: $msg.opcode') // , payload: $msg.payload') leaks match msg.opcode { .text_frame { ws.debug_log('read: text') ws.send_message_event(msg) - unsafe { - msg.free() - } + unsafe {msg.free()} } .binary_frame { ws.debug_log('read: binary') ws.send_message_event(msg) - unsafe { - msg.free() - } + unsafe {msg.free()} } .ping { ws.debug_log('read: ping, sending pong') @@ -155,9 +153,7 @@ pub fn (mut ws Client) listen() ? { continue } if msg.payload.len > 0 { - unsafe { - msg.free() - } + unsafe {msg.free()} } } .pong { @@ -165,9 +161,7 @@ pub fn (mut ws Client) listen() ? { ws.last_pong_ut = time.now().unix ws.send_message_event(msg) if msg.payload.len > 0 { - unsafe { - msg.free() - } + unsafe {msg.free()} } } .close { @@ -177,50 +171,46 @@ pub fn (mut ws Client) listen() ? { } if msg.payload.len > 0 { if msg.payload.len == 1 { - ws.close(1002, 'close payload cannot be 1 byte')? + ws.close(1002, 'close payload cannot be 1 byte') ? return error('close payload cannot be 1 byte') } code := (int(msg.payload[0]) << 8) + int(msg.payload[1]) if code in invalid_close_codes { - ws.close(1002, 'invalid close code: $code')? + ws.close(1002, 'invalid close code: $code') ? return error('invalid close code: $code') } reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} } if reason.len > 0 { - ws.validate_utf_8(.close, reason)? + ws.validate_utf_8(.close, reason) ? } if ws.state !in [.closing, .closed] { // sending close back according to spec ws.debug_log('close with reason, code: $code, reason: $reason') r := reason.bytestr() - ws.close(code, r)? - } - unsafe { - msg.free() + ws.close(code, r) ? } + unsafe {msg.free()} } else { if ws.state !in [.closing, .closed] { ws.debug_log('close with reason, no code') // sending close back according to spec - ws.close(1000, 'normal')? - } - unsafe { - msg.free() + ws.close(1000, 'normal') ? } + unsafe {msg.free()} } return } .continuation { ws.logger.error('unexpected opcode continuation, nothing to continue') ws.send_error_event('unexpected opcode continuation, nothing to continue') - ws.close(1002, 'nothing to continue')? + ws.close(1002, 'nothing to continue') ? return error('unexpected opcode continuation, nothing to continue') } } } } -// this function was needed for defer +// manage_clean_close closes connection in a clean way fn (mut ws Client) manage_clean_close() { ws.send_close_event(1000, 'closed by client') } @@ -228,13 +218,13 @@ fn (mut ws Client) manage_clean_close() { // ping, sends ping message to server, // ping response will be pushed to message callback pub fn (mut ws Client) ping() ? { - ws.send_control_frame(.ping, 'PING', [])? + ws.send_control_frame(.ping, 'PING', []) ? } -// pong, sends pog message to server, +// pong, sends pong message to server, // pongs are normally automatically sent back to server pub fn (mut ws Client) pong() ? { - ws.send_control_frame(.pong, 'PONG', [])? + ws.send_control_frame(.pong, 'PONG', []) ? } // write_ptr, writes len bytes provided a byteptr with a websocket messagetype @@ -265,18 +255,11 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ? } else if payload_len > 125 && payload_len <= 0xffff { len16 := C.htons(payload_len) header[1] = 126 - // 0x80 - // todo: fix v style copy instead - unsafe { - C.memcpy(&header[2], &len16, 2) - } + unsafe {C.memcpy(&header[2], &len16, 2)} } else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { len_bytes := htonl64(u64(payload_len)) header[1] = 127 // 0x80 - // todo: fix v style copy instead - unsafe { - C.memcpy(&header[2], len_bytes.data, 8) - } + unsafe {C.memcpy(&header[2], len_bytes.data, 8)} } } else { if payload_len <= 125 { @@ -288,10 +271,7 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ? } else if payload_len > 125 && payload_len <= 0xffff { len16 := C.htons(payload_len) header[1] = (126 | 0x80) - // todo: fix v style copy instead - unsafe { - C.memcpy(&header[2], &len16, 2) - } + unsafe {C.memcpy(&header[2], &len16, 2)} header[4] = masking_key[0] header[5] = masking_key[1] header[6] = masking_key[2] @@ -299,17 +279,13 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ? } else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { // 65535 && 18446744073709551615 len64 := htonl64(u64(payload_len)) header[1] = (127 | 0x80) - // todo: fix v style copy instead - unsafe { - C.memcpy(&header[2], len64.data, 8) - } + unsafe {C.memcpy(&header[2], len64.data, 8)} header[10] = masking_key[0] header[11] = masking_key[1] header[12] = masking_key[2] header[13] = masking_key[3] } else { - // l.c('write: frame too large') - ws.close(1009, 'frame too large')? + ws.close(1009, 'frame too large') ? return error('frame too large') } } @@ -326,7 +302,7 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ? frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff } } - ws.socket_write(frame_buf)? + ws.socket_write(frame_buf) ? // Temporary hack until memory management is done unsafe { frame_buf.free() @@ -337,9 +313,10 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ? // write, writes a byte array with a websocket messagetype pub fn (mut ws Client) write(bytes []byte, code OPCode) ? { - ws.write_ptr(byteptr(bytes.data), bytes.len, code)? + ws.write_ptr(byteptr(bytes.data), bytes.len, code) ? } +// write_str, writes a string with a websocket texttype pub fn (mut ws Client) write_str(str string) ? { ws.write_ptr(str.str, str.len, .text_frame) } @@ -362,20 +339,18 @@ pub fn (mut ws Client) close(code int, message string) ? { if code > 0 { code_ := C.htons(code) message_len := message.len + 2 - mut close_frame := []byte{len: message_len} // [`0`].repeat(message_len) + mut close_frame := []byte{len: message_len} close_frame[0] = byte(code_ & 0xFF) close_frame[1] = byte(code_ >> 8) code32 = (close_frame[0] << 8) + close_frame[1] for i in 0 .. message.len { close_frame[i + 2] = message[i] } - ws.send_control_frame(.close, 'CLOSE', close_frame)? + ws.send_control_frame(.close, 'CLOSE', close_frame) ? ws.send_close_event(code, message) - unsafe { - close_frame.free() - } + unsafe {close_frame.free()} } else { - ws.send_control_frame(.close, 'CLOSE', [])? + ws.send_control_frame(.close, 'CLOSE', []) ? ws.send_close_event(code, '') } ws.fragments = [] @@ -413,20 +388,14 @@ fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []b if payload.len >= 2 { if !ws.is_server { mut parsed_payload := []byte{len: payload.len + 1} - unsafe { - C.memcpy(parsed_payload.data, &payload[0], payload.len) - } + unsafe {C.memcpy(parsed_payload.data, &payload[0], payload.len)} parsed_payload[payload.len] = `\0` for i in 0 .. payload.len { control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff } - unsafe { - parsed_payload.free() - } + unsafe {parsed_payload.free()} } else { - unsafe { - C.memcpy(&control_frame[2], &payload[0], payload.len) - } + unsafe {C.memcpy(&control_frame[2], &payload[0], payload.len)} } } } else { @@ -438,9 +407,7 @@ fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []b } } else { if payload.len > 0 { - unsafe { - C.memcpy(&control_frame[2], &payload[0], payload.len) - } + unsafe {C.memcpy(&control_frame[2], &payload[0], payload.len)} } } } @@ -451,17 +418,17 @@ fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []b // parse_uri, parses the url string to it's components fn parse_uri(url string) ?&Uri { - u := urllib.parse(url)? + u := urllib.parse(url) ? v := u.request_uri().split('?') mut port := u.port() if port == '' { port = if u.str().starts_with('ws://') { - '80' - } else if u.str().starts_with('wss://') { - '443' - } else { - u.port() - } + '80' + } else if u.str().starts_with('wss://') { + '443' + } else { + u.port() + } } querystring := if v.len > 1 { '?' + v[1] } else { '' } return &Uri{ @@ -480,7 +447,7 @@ fn (mut ws Client) set_state(state State) { } } -[inline] +// assert_not_connected returns error if the connection is not connected fn (ws Client) assert_not_connected() ? { match ws.state { .connecting { return error('connect: websocket is connecting') } @@ -500,6 +467,7 @@ fn (mut ws Client) reset_state() { } } +// debug_log makes debug logging output different depending if client or server fn (mut ws Client) debug_log(text string) { if ws.is_server { ws.logger.debug('server-> $text') @@ -508,14 +476,12 @@ fn (mut ws Client) debug_log(text string) { } } -[unsafe] +// free, manual free memory of Message struct pub fn (m &Message) free() { - unsafe { - m.payload.free() - } + unsafe {m.payload.free()} } -[unsafe] +// free, manual free memory of Client struct pub fn (c &Client) free() { unsafe { c.flags.free() diff --git a/vlib/x/websocket/websocket_nix.c.v b/vlib/x/websocket/websocket_nix.c.v index e030375e66..f986b98f0b 100644 --- a/vlib/x/websocket/websocket_nix.c.v +++ b/vlib/x/websocket/websocket_nix.c.v @@ -1,9 +1,10 @@ module websocket +// error_code returns the error code fn error_code() int { return C.errno } const ( - error_ewouldblock = C.EWOULDBLOCK + error_ewouldblock = C.EWOULDBLOCK // blocking error code ) diff --git a/vlib/x/websocket/websocket_server.v b/vlib/x/websocket/websocket_server.v index 70f147dc11..5c11cf54e3 100644 --- a/vlib/x/websocket/websocket_server.v +++ b/vlib/x/websocket/websocket_server.v @@ -8,31 +8,35 @@ import sync import time import rand +// Server holds state of websocket server connection pub struct Server { mut: - clients map[string]&ServerClient - logger &log.Log - ls net.TcpListener - accept_client_callbacks []AcceptClientFn - message_callbacks []MessageEventHandler - close_callbacks []CloseEventHandler + clients map[string]&ServerClient // Clients connected to this server + logger &log.Log // Logger used to log + ls net.TcpListener // TCpLister used to get incoming connection to socket + accept_client_callbacks []AcceptClientFn // Accept client callback functions + message_callbacks []MessageEventHandler // New message callback functions + close_callbacks []CloseEventHandler // Close message callback functions pub: - port int - is_ssl bool + port int // Port used as listen to incoming connections + is_ssl bool // True if secure connection (not supported yet on server) pub mut: - ping_interval int = 30 // in seconds - state State + ping_interval int = 30 + // Interval for automatic sending ping to connected clients in seconds + state State // Current state of connection } +// ServerClient has state of connected clients struct ServerClient { pub: - resource_name string - client_key string + resource_name string // The resource that the client access + client_key string // Unique key of client pub mut: - server &Server - client &Client + server &Server // The server instance + client &Client // The client instance } +// new_server instance new websocket server on port and route pub fn new_server(port int, route string) &Server { return &Server{ port: port @@ -43,13 +47,15 @@ pub fn new_server(port int, route string) &Server { } } +// set_ping_interval sets the interval that the server will send ping messages to clients pub fn (mut s Server) set_ping_interval(seconds int) { s.ping_interval = seconds } +// listen, start listen to incoming connections pub fn (mut s Server) listen() ? { s.logger.info('websocket server: start listen on port $s.port') - s.ls = net.listen_tcp(s.port)? + s.ls = net.listen_tcp(s.port) ? s.set_state(.open) go s.handle_ping() for { @@ -61,10 +67,11 @@ pub fn (mut s Server) listen() ? { s.logger.info('websocket server: end listen on port $s.port') } +// Close server (not implemented) fn (mut s Server) close() { } -// Todo: make thread safe +// handle_ping sends ping to all clients every set interval fn (mut s Server) handle_ping() { mut clients_to_remove := []string{} for s.state == .open { @@ -74,8 +81,7 @@ fn (mut s Server) handle_ping() { if c.client.state == .open { c.client.ping() or { s.logger.debug('server-> error sending ping to client') - // todo fix better close message, search the standard - c.client.close(1002, 'Clossing connection: ping send error') or { + c.client.close(1002, 'Closing connection: ping send error') or { // we want to continue even if error continue } @@ -99,20 +105,21 @@ fn (mut s Server) handle_ping() { } } +// serve_client accepts incoming connection and setup the websocket handshake fn (mut s Server) serve_client(mut c Client) ? { c.logger.debug('server-> Start serve client ($c.id)') defer { c.logger.debug('server-> End serve client ($c.id)') } - mut handshake_response, mut server_client := s.handle_server_handshake(mut c)? - accept := s.send_connect_event(mut server_client)? + mut handshake_response, mut server_client := s.handle_server_handshake(mut c) ? + accept := s.send_connect_event(mut server_client) ? if !accept { s.logger.debug('server-> client not accepted') - c.shutdown_socket()? + c.shutdown_socket() ? return } // The client is accepted - c.socket_write(handshake_response.bytes())? + c.socket_write(handshake_response.bytes()) ? lock { s.clients[server_client.client.id] = server_client } @@ -123,6 +130,7 @@ fn (mut s Server) serve_client(mut c Client) ? { } } +// setup_callbacks initialize all callback functions fn (mut s Server) setup_callbacks(mut sc ServerClient) { if s.message_callbacks.len > 0 { for cb in s.message_callbacks { @@ -151,8 +159,9 @@ fn (mut s Server) setup_callbacks(mut sc ServerClient) { }, sc) } +// accept_new_client creates a new client instance for client connects to socket fn (mut s Server) accept_new_client() ?&Client { - mut new_conn := s.ls.accept()? + mut new_conn := s.ls.accept() ? c := &Client{ is_server: true conn: new_conn @@ -166,18 +175,16 @@ fn (mut s Server) accept_new_client() ?&Client { } // set_state sets current state in a thread safe way -[inline] fn (mut s Server) set_state(state State) { lock { s.state = state } } +// free, manual free memory of Server instance pub fn (mut s Server) free() { unsafe { s.clients.free() - // s.logger.free() - // s.ls.free() s.accept_client_callbacks.free() s.message_callbacks.free() s.close_callbacks.free() diff --git a/vlib/x/websocket/websocket_windows.c.v b/vlib/x/websocket/websocket_windows.c.v index e630ac25c7..e9f4fc3d73 100644 --- a/vlib/x/websocket/websocket_windows.c.v +++ b/vlib/x/websocket/websocket_windows.c.v @@ -2,10 +2,11 @@ module websocket import net +// error_code returns the error code fn error_code() int { return C.WSAGetLastError() } const ( - error_ewouldblock = net.WsaError.wsaewouldblock + error_ewouldblock = net.WsaError.wsaewouldblock // blocking error code )