diff --git a/.github/workflows/websockets.yml b/.github/workflows/websockets.yml index e807039550..410c1327b3 100644 --- a/.github/workflows/websockets.yml +++ b/.github/workflows/websockets.yml @@ -27,25 +27,25 @@ jobs: - name: v doctor run: ./v doctor - name: Run websockets tests - run: ./v -g test vlib/x/websocket/ + run: ./v -g test vlib/net/websocket/ ## Autobahn integrations tests - name: Run autobahn services - run: docker-compose -f ${{github.workspace}}/vlib/x/websocket/tests/autobahn/docker-compose.yml up -d + run: docker-compose -f ${{github.workspace}}/vlib/net/websocket/tests/autobahn/docker-compose.yml up -d - name: Wait for the service to start run: sleep 10s - name: Build client test - run: docker exec autobahn_client "/src/v" "/src/vlib/x/websocket/tests/autobahn/autobahn_client.v" + run: docker exec autobahn_client "/src/v" "/src/vlib/net/websocket/tests/autobahn/autobahn_client.v" - name: Run client test - run: docker exec autobahn_client "/src/vlib/x/websocket/tests/autobahn/autobahn_client" + run: docker exec autobahn_client "/src/vlib/net/websocket/tests/autobahn/autobahn_client" - name: Build client wss test - run: docker exec autobahn_client "/src/v" "/src/vlib/x/websocket/tests/autobahn/autobahn_client_wss.v" + run: docker exec autobahn_client "/src/v" "/src/vlib/net/websocket/tests/autobahn/autobahn_client_wss.v" - name: Run client wss test - run: docker exec autobahn_client "/src/vlib/x/websocket/tests/autobahn/autobahn_client_wss" + run: docker exec autobahn_client "/src/vlib/net/websocket/tests/autobahn/autobahn_client_wss" - name: Run server test run: docker exec autobahn_server "wstest" "-m" "fuzzingclient" "-s" "/config/fuzzingclient.json" diff --git a/cmd/tools/vtest-self.v b/cmd/tools/vtest-self.v index 112d0ae9c3..3e2fb8c585 100644 --- a/cmd/tools/vtest-self.v +++ b/cmd/tools/vtest-self.v @@ -44,11 +44,11 @@ const ( 'vlib/vweb/request_test.v', 'vlib/net/http/request_test.v', 'vlib/vweb/route_test.v', - 'vlib/x/websocket/websocket_test.v', + 'vlib/net/websocket/websocket_test.v', 'vlib/crypto/rand/crypto_rand_read_test.v', ] skip_with_fsanitize_address = [ - 'vlib/x/websocket/websocket_test.v', + 'vlib/net/websocket/websocket_test.v', ] skip_with_fsanitize_undefined = [ 'do_not_remove', @@ -83,7 +83,7 @@ const ( 'vlib/vweb/request_test.v', 'vlib/net/http/request_test.v', 'vlib/vweb/route_test.v', - 'vlib/x/websocket/websocket_test.v', + 'vlib/net/websocket/websocket_test.v', 'vlib/net/http/http_httpbin_test.v', 'vlib/net/http/header_test.v', ] @@ -98,7 +98,7 @@ const ( 'vlib/v/tests/orm_sub_struct_test.v', 'vlib/net/websocket/ws_test.v', 'vlib/net/unix/unix_test.v', - 'vlib/x/websocket/websocket_test.v', + 'vlib/net/websocket/websocket_test.v', 'vlib/vweb/tests/vweb_test.v', 'vlib/vweb/request_test.v', 'vlib/net/http/request_test.v', diff --git a/examples/websocket/client-server/client.v b/examples/websocket/client-server/client.v index c6d475906c..e72e53a16f 100644 --- a/examples/websocket/client-server/client.v +++ b/examples/websocket/client-server/client.v @@ -1,7 +1,7 @@ module main import os -import x.websocket +import net.websocket import term // This client should be compiled an run in different konsoles diff --git a/examples/websocket/client-server/server.v b/examples/websocket/client-server/server.v index 200dc662e3..db419131db 100644 --- a/examples/websocket/client-server/server.v +++ b/examples/websocket/client-server/server.v @@ -1,6 +1,6 @@ module main -import x.websocket +import net.websocket import term // this server accepts client connections and broadcast all messages to other connected clients diff --git a/examples/websocket/ping.v b/examples/websocket/ping.v index 0dcdf01bef..3599ec37c3 100644 --- a/examples/websocket/ping.v +++ b/examples/websocket/ping.v @@ -2,7 +2,7 @@ module main import time import os -import x.websocket +import net.websocket fn main() { println('press enter to quit...\n') @@ -12,6 +12,8 @@ fn main() { os.get_line() } +// start_server starts the websocket server, it receives messages +// and send it back to the client that sent it fn start_server() ? { mut s := websocket.new_server(.ip6, 30000, '') // Make that in execution test time give time to execute at least one time @@ -36,6 +38,8 @@ fn start_server() ? { } } +// start_client starts the websocket client, it writes a message to +// the server and prints all the messages received fn start_client() ? { mut ws := websocket.new_client('ws://localhost:30000') ? // mut ws := websocket.new_client('wss://echo.websocket.org:443')? diff --git a/vlib/x/openssl/openssl.v b/vlib/net/openssl/ssl_connection.v similarity index 91% rename from vlib/x/openssl/openssl.v rename to vlib/net/openssl/ssl_connection.v index 2bab696143..58f47f6187 100644 --- a/vlib/x/openssl/openssl.v +++ b/vlib/net/openssl/ssl_connection.v @@ -1,9 +1,33 @@ module openssl import net -import net.openssl as nssl import time +// SSLConn is the current connection +pub struct SSLConn { +mut: + sslctx &C.SSL_CTX + ssl &C.SSL + handle int + duration time.Duration +} + +// new_ssl_conn instance an new SSLCon struct +pub fn new_ssl_conn() &SSLConn { + return &SSLConn{ + sslctx: 0 + ssl: 0 + handle: 0 + } +} + +// Select operation +enum Select { + read + write + except +} + // shutdown closes the ssl connection and do clean up pub fn (mut s SSLConn) shutdown() ? { if s.ssl != 0 { @@ -11,7 +35,7 @@ pub fn (mut s SSLConn) shutdown() ? { for { res = C.SSL_shutdown(voidptr(s.ssl)) if res < 0 { - err_res := nssl.ssl_error(res, s.ssl) or { + err_res := ssl_error(res, s.ssl) or { break // We break to free rest of resources } if err_res == .ssl_error_want_read { @@ -99,7 +123,7 @@ pub fn (mut s SSLConn) connect(mut tcp_conn net.TcpConn, hostname string) ? { for { res = C.SSL_connect(voidptr(s.ssl)) if res != 1 { - err_res := nssl.ssl_error(res, s.ssl) ? + err_res := ssl_error(res, s.ssl) ? if err_res == .ssl_error_want_read { for { ready := @select(s.handle, .read, s.duration) ? @@ -128,7 +152,7 @@ pub fn (mut s SSLConn) socket_read_into_ptr(buf_ptr &byte, len int) ?int { for { res = C.SSL_read(voidptr(s.ssl), buf_ptr, len) if res < 0 { - err_res := nssl.ssl_error(res, s.ssl) ? + err_res := ssl_error(res, s.ssl) ? if err_res == .ssl_error_want_read { for { ready := @select(s.handle, .read, s.duration) ? @@ -170,7 +194,7 @@ pub fn (mut s SSLConn) write(bytes []byte) ?int { remaining := bytes.len - total_sent mut sent := C.SSL_write(voidptr(s.ssl), ptr, remaining) if sent <= 0 { - err_res := nssl.ssl_error(sent, s.ssl) ? + err_res := ssl_error(sent, s.ssl) ? if err_res == .ssl_error_want_read { for { ready := @select(s.handle, .read, s.duration) ? @@ -202,9 +226,9 @@ This is basically a copy of Emily socket implementation of select. This have to be consolidated into common net lib features when merging this to V */ -[typedef] -pub struct C.fd_set { -} +// [typedef] +// pub struct C.fd_set { +// } // Select waits for an io operation (specified by parameter `test`) to be available fn @select(handle int, test Select, timeout time.Duration) ?bool { diff --git a/vlib/net/websocket/events.v b/vlib/net/websocket/events.v new file mode 100644 index 0000000000..a442dafc7b --- /dev/null +++ b/vlib/net/websocket/events.v @@ -0,0 +1,227 @@ +module websocket + +// MessageEventHandler represents a callback on a new message +struct MessageEventHandler { + handler SocketMessageFn // callback function + handler2 SocketMessageFn2 // callback function with reference + is_ref bool // true if has a reference object + ref voidptr // referenced object +} + +// ErrorEventHandler represents a callback on error +struct ErrorEventHandler { + handler SocketErrorFn // callback function + handler2 SocketErrorFn2 // callback function with reference + is_ref bool // true if has a reference object + ref voidptr // referenced object +} + +// OpenEventHandler represents a callback when connection is opened +struct OpenEventHandler { + handler SocketOpenFn // callback function + handler2 SocketOpenFn2 // callback function with reference + is_ref bool // true if has a reference object + ref voidptr // referenced object +} + +// CloseEventHandler represents a callback on a closing event +struct CloseEventHandler { + handler SocketCloseFn // callback function + handler2 SocketCloseFn2 // callback function with reference + is_ref bool // true if has a reference object + ref voidptr // referenced object +} + +pub type AcceptClientFn = fn (mut c ServerClient) ?bool + +pub type SocketMessageFn = fn (mut c Client, msg &Message) ? + +pub type SocketMessageFn2 = fn (mut c Client, msg &Message, v voidptr) ? + +pub type SocketErrorFn = fn (mut c Client, err string) ? + +pub type SocketErrorFn2 = fn (mut c Client, err string, v voidptr) ? + +pub type SocketOpenFn = fn (mut c Client) ? + +pub type SocketOpenFn2 = fn (mut c Client, v voidptr) ? + +pub type SocketCloseFn = fn (mut c Client, code int, reason string) ? + +pub type SocketCloseFn2 = fn (mut c Client, code int, reason string, v voidptr) ? + +// on_connect registers a callback when client connects to the server +pub fn (mut s Server) on_connect(fun AcceptClientFn) ? { + if s.accept_client_callbacks.len > 0 { + return error('only one callback can be registered for accept client') + } + s.accept_client_callbacks << fun +} + +// on_message registers a callback on new messages +pub fn (mut s Server) on_message(fun SocketMessageFn) { + s.message_callbacks << MessageEventHandler{ + handler: fun + } +} + +// on_message_ref registers a callback on new messages and provides a reference object +pub fn (mut s Server) on_message_ref(fun SocketMessageFn2, ref voidptr) { + s.message_callbacks << MessageEventHandler{ + handler2: fun + ref: ref + is_ref: true + } +} + +// on_close registers a callback on closed socket +pub fn (mut s Server) on_close(fun SocketCloseFn) { + s.close_callbacks << CloseEventHandler{ + handler: fun + } +} + +// on_close_ref registers a callback on closed socket and provides a reference object +pub fn (mut s Server) on_close_ref(fun SocketCloseFn2, ref voidptr) { + s.close_callbacks << CloseEventHandler{ + handler2: fun + ref: ref + is_ref: true + } +} + +// on_message registers a callback on new messages +pub fn (mut ws Client) on_message(fun SocketMessageFn) { + ws.message_callbacks << MessageEventHandler{ + handler: fun + } +} + +// on_message_ref registers a callback on new messages and provides a reference object +pub fn (mut ws Client) on_message_ref(fun SocketMessageFn2, ref voidptr) { + ws.message_callbacks << MessageEventHandler{ + handler2: fun + ref: ref + is_ref: true + } +} + +// on_error registers a callback on errors +pub fn (mut ws Client) on_error(fun SocketErrorFn) { + ws.error_callbacks << ErrorEventHandler{ + handler: fun + } +} + +// on_error_ref registers a callback on errors and provides a reference object +pub fn (mut ws Client) on_error_ref(fun SocketErrorFn2, ref voidptr) { + ws.error_callbacks << ErrorEventHandler{ + handler2: fun + ref: ref + is_ref: true + } +} + +// on_open registers a callback on successful opening the websocket +pub fn (mut ws Client) on_open(fun SocketOpenFn) { + ws.open_callbacks << OpenEventHandler{ + handler: fun + } +} + +// on_open_ref registers a callback on successful opening the websocket +// and provides a reference object +pub fn (mut ws Client) on_open_ref(fun SocketOpenFn2, ref voidptr) { + ws.open_callbacks << OpenEventHandler{ + handler2: fun + ref: ref + is_ref: true + } +} + +// on_close registers a callback on closed socket +pub fn (mut ws Client) on_close(fun SocketCloseFn) { + ws.close_callbacks << CloseEventHandler{ + handler: fun + } +} + +// on_close_ref registers a callback on closed socket and provides a reference object +pub fn (mut ws Client) on_close_ref(fun SocketCloseFn2, ref voidptr) { + ws.close_callbacks << CloseEventHandler{ + handler2: fun + ref: ref + is_ref: true + } +} + +// send_connect_event invokes the on_connect callback +fn (mut s Server) send_connect_event(mut c ServerClient) ?bool { + if s.accept_client_callbacks.len == 0 { + // If no callback all client will be accepted + return true + } + fun := s.accept_client_callbacks[0] + res := fun(mut c) ? + return res +} + +// send_message_event invokes the on_message callback +fn (mut ws Client) send_message_event(msg &Message) { + ws.debug_log('sending on_message event') + for ev_handler in ws.message_callbacks { + if !ev_handler.is_ref { + ev_handler.handler(ws, msg) or { ws.logger.error('send_message_event error: $err') } + } else { + ev_handler.handler2(ws, msg, ev_handler.ref) or { + ws.logger.error('send_message_event error: $err') + } + } + } +} + +// send_error_event invokes the on_error callback +fn (mut ws Client) send_error_event(error string) { + ws.debug_log('sending on_error event') + for ev_handler in ws.error_callbacks { + if !ev_handler.is_ref { + ev_handler.handler(mut ws, error) or { + ws.logger.error('send_error_event error: $error, err: $err') + } + } else { + ev_handler.handler2(mut ws, error, ev_handler.ref) or { + ws.logger.error('send_error_event error: $error, err: $err') + } + } + } +} + +// send_close_event invokes the on_close callback +fn (mut ws Client) send_close_event(code int, reason string) { + ws.debug_log('sending on_close event') + for ev_handler in ws.close_callbacks { + if !ev_handler.is_ref { + ev_handler.handler(mut ws, code, reason) or { + ws.logger.error('send_close_event error: $err') + } + } else { + ev_handler.handler2(mut ws, code, reason, ev_handler.ref) or { + ws.logger.error('send_close_event error: $err') + } + } + } +} + +// send_open_event invokes the on_open callback +fn (mut ws Client) send_open_event() { + ws.debug_log('sending on_open event') + for ev_handler in ws.open_callbacks { + if !ev_handler.is_ref { + ev_handler.handler(mut ws) or { ws.logger.error('send_open_event error: $err') } + } else { + ev_handler.handler2(mut ws, ev_handler.ref) or { + ws.logger.error('send_open_event error: $err') + } + } + } +} diff --git a/vlib/net/websocket/handshake.v b/vlib/net/websocket/handshake.v new file mode 100644 index 0000000000..9f3ab000e0 --- /dev/null +++ b/vlib/net/websocket/handshake.v @@ -0,0 +1,185 @@ +[manualfree] +module websocket + +import encoding.base64 +import strings + +// handshake manages the websocket handshake process +fn (mut ws Client) handshake() ? { + nonce := get_nonce(ws.nonce_size) + seckey := base64.encode_str(nonce) + mut sb := strings.new_builder(1024) + defer { + unsafe { sb.free() } + } + sb.write_string('GET ') + sb.write_string(ws.uri.resource) + sb.write_string(ws.uri.querystring) + sb.write_string(' HTTP/1.1\r\nHost: ') + sb.write_string(ws.uri.hostname) + sb.write_string(':') + sb.write_string(ws.uri.port) + sb.write_string('\r\nUpgrade: websocket\r\nConnection: Upgrade\r\n') + sb.write_string('Sec-WebSocket-Key: ') + sb.write_string(seckey) + sb.write_string('\r\nSec-WebSocket-Version: 13') + for key in ws.header.keys() { + val := ws.header.custom_values(key).join(',') + sb.write_string('\r\n$key:$val') + } + sb.write_string('\r\n\r\n') + handshake := sb.str() + defer { + unsafe { handshake.free() } + } + handshake_bytes := handshake.bytes() + ws.debug_log('sending handshake: $handshake') + ws.socket_write(handshake_bytes) ? + ws.read_handshake(seckey) ? + unsafe { handshake_bytes.free() } +} + +// handle_server_handshake manages websocket server handshake process +fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient) { + msg := c.read_handshake_str() ? + handshake_response, client := s.parse_client_handshake(msg, mut c) ? + unsafe { msg.free() } + return handshake_response, client +} + +// parse_client_handshake parses result from handshake process +fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) ?(string, &ServerClient) { + s.logger.debug('server-> client handshake:\n$client_handshake') + lines := client_handshake.split_into_lines() + get_tokens := lines[0].split(' ') + if get_tokens.len < 3 { + return error_with_code('unexpected get operation, $get_tokens', 1) + } + if get_tokens[0].trim_space() != 'GET' { + return error_with_code("unexpected request '${get_tokens[0]}', expected 'GET'", + 2) + } + if get_tokens[2].trim_space() != 'HTTP/1.1' { + return error_with_code("unexpected request $get_tokens, expected 'HTTP/1.1'", + 3) + } + mut seckey := '' + mut flags := []Flag{} + mut key := '' + for i in 1 .. lines.len { + if lines[i].len <= 0 || lines[i] == '\r\n' { + continue + } + keys := lines[i].split(':') + match keys[0] { + 'Upgrade', 'upgrade' { + flags << .has_upgrade + } + 'Connection', 'connection' { + flags << .has_connection + } + 'Sec-WebSocket-Key', 'sec-websocket-key' { + key = keys[1].trim_space() + s.logger.debug('server-> got key: $key') + seckey = create_key_challenge_response(key) ? + s.logger.debug('server-> challenge: $seckey, response: ${keys[1]}') + flags << .has_accept + } + else { + // we ignore other headers like protocol for now + } + } + unsafe { keys.free() } + } + if flags.len < 3 { + return error_with_code('invalid client handshake, $client_handshake', 4) + } + server_handshake := 'HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: $seckey\r\n\r\n' + server_client := &ServerClient{ + resource_name: get_tokens[1] + client_key: key + client: unsafe { c } + server: unsafe { s } + } + unsafe { + lines.free() + flags.free() + get_tokens.free() + seckey.free() + key.free() + } + return server_handshake, server_client +} + +// read_handshake_str returns the handshake response +fn (mut ws Client) read_handshake_str() ?string { + mut total_bytes_read := 0 + mut msg := [1024]byte{} + mut buffer := [1]byte{} + for total_bytes_read < 1024 { + bytes_read := ws.socket_read_ptr(&buffer[0], 1) ? + if bytes_read == 0 { + return error_with_code('unexpected no response from handshake', 5) + } + msg[total_bytes_read] = buffer[0] + total_bytes_read++ + if total_bytes_read > 5 && msg[total_bytes_read - 1] == `\n` + && msg[total_bytes_read - 2] == `\r` && msg[total_bytes_read - 3] == `\n` + && msg[total_bytes_read - 4] == `\r` { + break + } + } + res := msg[..total_bytes_read].bytestr() + return res +} + +// read_handshake reads the handshake result and check if valid +fn (mut ws Client) read_handshake(seckey string) ? { + mut msg := ws.read_handshake_str() ? + ws.check_handshake_response(msg, seckey) ? + unsafe { msg.free() } +} + +// check_handshake_response checks the response from handshake and returns +// the response and secure key provided by the websocket client +fn (mut ws Client) check_handshake_response(handshake_response string, seckey string) ? { + ws.debug_log('handshake response:\n$handshake_response') + lines := handshake_response.split_into_lines() + header := lines[0] + if !header.starts_with('HTTP/1.1 101') && !header.starts_with('HTTP/1.0 101') { + return error_with_code('handshake_handler: invalid HTTP status response code, $header', + 6) + } + for i in 1 .. lines.len { + if lines[i].len <= 0 || lines[i] == '\r\n' { + continue + } + keys := lines[i].split(':') + match keys[0] { + 'Upgrade', 'upgrade' { + ws.flags << .has_upgrade + } + 'Connection', 'connection' { + ws.flags << .has_connection + } + 'Sec-WebSocket-Accept', 'sec-websocket-accept' { + ws.debug_log('seckey: $seckey') + challenge := create_key_challenge_response(seckey) ? + ws.debug_log('challenge: $challenge, response: ${keys[1]}') + if keys[1].trim_space() != challenge { + return error_with_code('handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.', + 7) + } + ws.flags << .has_accept + unsafe { challenge.free() } + } + else {} + } + unsafe { keys.free() } + } + unsafe { lines.free() } + if ws.flags.len < 3 { + ws.close(1002, 'invalid websocket HTTP headers') ? + return error_with_code('invalid websocket HTTP headers', 8) + } +} diff --git a/vlib/net/websocket/io.v b/vlib/net/websocket/io.v new file mode 100644 index 0000000000..5408a4ed55 --- /dev/null +++ b/vlib/net/websocket/io.v @@ -0,0 +1,100 @@ +module websocket + +import net +import time + +// socket_read reads from socket into the provided buffer +fn (mut ws Client) socket_read(mut buffer []byte) ?int { + lock { + if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { + return error('socket_read: trying to read a closed socket') + } + if ws.is_ssl { + r := ws.ssl_conn.read_into(mut buffer) ? + return r + } else { + for { + r := ws.conn.read(mut buffer) or { + if err.code == net.err_timed_out_code { + continue + } + return err + } + return r + } + } + } + return none +} + +// socket_read reads from socket into the provided byte pointer and length +fn (mut ws Client) socket_read_ptr(buf_ptr &byte, len int) ?int { + lock { + if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { + return error('socket_read_ptr: trying to read a closed socket') + } + if ws.is_ssl { + r := ws.ssl_conn.socket_read_into_ptr(buf_ptr, len) ? + return r + } else { + for { + r := ws.conn.read_ptr(buf_ptr, len) or { + if err.code == net.err_timed_out_code { + continue + } + return err + } + return r + } + } + } + return none +} + +// socket_write writes the provided byte array to the socket +fn (mut ws Client) socket_write(bytes []byte) ?int { + lock { + if ws.state == .closed || ws.conn.sock.handle <= 1 { + ws.debug_log('socket_write: Socket allready closed') + return error('socket_write: trying to write on a closed socket') + } + if ws.is_ssl { + return ws.ssl_conn.write(bytes) + } else { + for { + n := ws.conn.write(bytes) or { + if err.code == net.err_timed_out_code { + continue + } + return err + } + return n + } + panic('reached unreachable code') + } + } +} + +// shutdown_socket shuts down the socket properly when connection is closed +fn (mut ws Client) shutdown_socket() ? { + ws.debug_log('shutting down socket') + if ws.is_ssl { + ws.ssl_conn.shutdown() ? + } else { + ws.conn.close() ? + } +} + +// dial_socket connects tcp socket and initializes default configurations +fn (mut ws Client) dial_socket() ?&net.TcpConn { + tcp_address := '$ws.uri.hostname:$ws.uri.port' + mut t := net.dial_tcp(tcp_address) ? + optval := int(1) + t.sock.set_option_int(.keep_alive, optval) ? + t.set_read_timeout(30 * time.second) + t.set_write_timeout(30 * time.second) + if ws.is_ssl { + ws.ssl_conn.connect(mut t, ws.uri.hostname) ? + } + return t +} diff --git a/vlib/net/websocket/message.v b/vlib/net/websocket/message.v new file mode 100644 index 0000000000..4c57232e92 --- /dev/null +++ b/vlib/net/websocket/message.v @@ -0,0 +1,295 @@ +module websocket + +import encoding.utf8 + +const ( + header_len_offset = 2 // offset for lengthpart of websocket header + buffer_size = 256 // default buffer size + extended_payload16_end_byte = 4 // header length with 16-bit extended payload + extended_payload64_end_byte = 10 // header length with 64-bit extended payload +) + +// Fragment represents a websocket data fragment +struct Fragment { + data []byte // included data payload data in a fragment + opcode OPCode // interpretation of the payload data +} + +// Frame represents a data frame header +struct Frame { +mut: + // length of the websocket header part + header_len int = 2 + // size of total frame + frame_size int = 2 + fin bool // true if final fragment of message + rsv1 bool // reserved for future use in websocket RFC + rsv2 bool // reserved for future use in websocket RFC + rsv3 bool // reserved for future use in websocket RFC + opcode OPCode // interpretation of the payload data + has_mask bool // true if the payload data is masked + payload_len int // payload length + masking_key [4]byte // all frames from client to server is masked with this key +} + +const ( + invalid_close_codes = [999, 1004, 1005, 1006, 1014, 1015, 1016, 1100, 2000, 2999, 5000, 65536] +) + +// validate_client validates client frame rules from RFC6455 +pub fn (mut ws Client) validate_frame(frame &Frame) ? { + if frame.rsv1 || frame.rsv2 || frame.rsv3 { + ws.close(1002, 'rsv cannot be other than 0, not negotiated') ? + return error('rsv cannot be other than 0, not negotiated') + } + if (int(frame.opcode) >= 3 && int(frame.opcode) <= 7) + || (int(frame.opcode) >= 11 && int(frame.opcode) <= 15) { + ws.close(1002, 'use of reserved opcode') ? + return error('use of reserved opcode') + } + if frame.has_mask && !ws.is_server { + // server should never send masked frames + // to client, close connection + ws.close(1002, 'client got masked frame') ? + return error('client sent masked frame') + } + if is_control_frame(frame.opcode) { + if !frame.fin { + ws.close(1002, 'control message must not be fragmented') ? + return error('unexpected control frame with no fin') + } + if frame.payload_len > 125 { + ws.close(1002, 'control frames must not exceed 125 bytes') ? + return error('unexpected control frame payload length') + } + } + if frame.fin == false && ws.fragments.len == 0 && frame.opcode == .continuation { + err_msg := 'unexecpected continuation, there are no frames to continue, $frame' + ws.close(1002, err_msg) ? + return error(err_msg) + } +} + +// is_control_frame returns true if the frame is a control frame +fn is_control_frame(opcode OPCode) bool { + return opcode !in [.text_frame, .binary_frame, .continuation] +} + +// is_data_frame returns true if the frame is a control frame +fn is_data_frame(opcode OPCode) bool { + return opcode in [.text_frame, .binary_frame] +} + +// read_payload reads the message payload from the socket +fn (mut ws Client) read_payload(frame &Frame) ?[]byte { + if frame.payload_len == 0 { + return []byte{} + } + mut buffer := []byte{cap: frame.payload_len} + mut read_buf := [1]byte{} + mut bytes_read := 0 + for bytes_read < frame.payload_len { + len := ws.socket_read_ptr(&read_buf[0], 1) ? + if len != 1 { + return error('expected read all message, got zero') + } + bytes_read += len + buffer << read_buf[0] + } + if bytes_read != frame.payload_len { + return error('failed to read payload') + } + if frame.has_mask { + for i in 0 .. frame.payload_len { + buffer[i] ^= frame.masking_key[i % 4] & 0xff + } + } + return buffer +} + +// validate_utf_8 validates payload for valid utf8 encoding +// - Future implementation needs to support fail fast utf errors for strict autobahn conformance +fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? { + if opcode in [.text_frame, .close] && !utf8.validate(payload.data, payload.len) { + ws.logger.error('malformed utf8 payload, payload len: ($payload.len)') + ws.send_error_event('Recieved malformed utf8.') + ws.close(1007, 'malformed utf8 payload') ? + return error('malformed utf8 payload') + } +} + +// read_next_message reads 1 to n frames to compose a message +pub fn (mut ws Client) read_next_message() ?Message { + for { + frame := ws.parse_frame_header() ? + ws.validate_frame(&frame) ? + frame_payload := ws.read_payload(&frame) ? + if is_control_frame(frame.opcode) { + // Control frames can interject other frames + // and need to be returned immediately + msg := Message{ + opcode: OPCode(frame.opcode) + payload: frame_payload.clone() + } + unsafe { frame_payload.free() } + return msg + } + // if the message is fragmented we just put it on fragments + // a fragment is allowed to have zero size payload + if !frame.fin { + ws.fragments << &Fragment{ + data: frame_payload.clone() + opcode: frame.opcode + } + unsafe { frame_payload.free() } + continue + } + if ws.fragments.len == 0 { + ws.validate_utf_8(frame.opcode, frame_payload) or { + ws.logger.error('UTF8 validation error: $err, len of payload($frame_payload.len)') + ws.send_error_event('UTF8 validation error: $err, len of payload($frame_payload.len)') + return err + } + msg := Message{ + opcode: OPCode(frame.opcode) + payload: frame_payload.clone() + } + unsafe { frame_payload.free() } + return msg + } + defer { + ws.fragments = [] + } + if is_data_frame(frame.opcode) { + ws.close(0, '') ? + return error('Unexpected frame opcode') + } + payload := ws.payload_from_fragments(frame_payload) ? + opcode := ws.opcode_from_fragments() + ws.validate_utf_8(opcode, payload) ? + msg := Message{ + opcode: opcode + payload: payload.clone() + } + unsafe { + frame_payload.free() + payload.free() + } + return msg + } + return none +} + +// payload_from_fragments returs the whole paylaod from fragmented message +fn (ws Client) payload_from_fragments(fin_payload []byte) ?[]byte { + mut total_size := 0 + for f in ws.fragments { + if f.data.len > 0 { + total_size += f.data.len + } + } + total_size += fin_payload.len + if total_size == 0 { + return []byte{} + } + mut total_buffer := []byte{cap: total_size} + for f in ws.fragments { + if f.data.len > 0 { + total_buffer << f.data + } + } + total_buffer << fin_payload + return total_buffer +} + +// opcode_from_fragments returns the opcode for message from the first fragment sent +fn (ws Client) opcode_from_fragments() OPCode { + return OPCode(ws.fragments[0].opcode) +} + +// parse_frame_header parses next message by decoding the incoming frames +pub fn (mut ws Client) parse_frame_header() ?Frame { + mut buffer := [256]byte{} + mut bytes_read := 0 + mut frame := Frame{} + mut rbuff := [1]byte{} + mut mask_end_byte := 0 + for ws.state == .open { + read_bytes := ws.socket_read_ptr(&rbuff[0], 1) ? + if read_bytes == 0 { + // this is probably a timeout or close + continue + } + buffer[bytes_read] = rbuff[0] + bytes_read++ + // parses the first two header bytes to get basic frame information + if bytes_read == u64(websocket.header_len_offset) { + frame.fin = (buffer[0] & 0x80) == 0x80 + frame.rsv1 = (buffer[0] & 0x40) == 0x40 + frame.rsv2 = (buffer[0] & 0x20) == 0x20 + frame.rsv3 = (buffer[0] & 0x10) == 0x10 + frame.opcode = OPCode(int(buffer[0] & 0x7F)) + frame.has_mask = (buffer[1] & 0x80) == 0x80 + frame.payload_len = buffer[1] & 0x7F + // if has mask set the byte postition where mask ends + if frame.has_mask { + mask_end_byte = if frame.payload_len < 126 { + websocket.header_len_offset + 4 + } else if frame.payload_len == 126 { + websocket.header_len_offset + 6 + } else if frame.payload_len == 127 { + websocket.header_len_offset + 12 + } else { + 0 + } // impossible + } + frame.payload_len = frame.payload_len + frame.frame_size = frame.header_len + frame.payload_len + if !frame.has_mask && frame.payload_len < 126 { + break + } + } + if frame.payload_len == 126 && bytes_read == u64(websocket.extended_payload16_end_byte) { + frame.header_len += 2 + frame.payload_len = 0 + frame.payload_len |= buffer[2] << 8 + frame.payload_len |= buffer[3] + frame.frame_size = frame.header_len + frame.payload_len + if !frame.has_mask { + break + } + } + if frame.payload_len == 127 && bytes_read == u64(websocket.extended_payload64_end_byte) { + frame.header_len += 8 + // these shift operators needs 64 bit on clang with -prod flag + mut payload_len := u64(0) + payload_len |= u64(buffer[2]) << 56 + payload_len |= u64(buffer[3]) << 48 + payload_len |= u64(buffer[4]) << 40 + payload_len |= u64(buffer[5]) << 32 + payload_len |= u64(buffer[6]) << 24 + payload_len |= u64(buffer[7]) << 16 + payload_len |= u64(buffer[8]) << 8 + payload_len |= u64(buffer[9]) + frame.payload_len = int(payload_len) + if !frame.has_mask { + break + } + } + if frame.has_mask && bytes_read == mask_end_byte { + frame.masking_key[0] = buffer[mask_end_byte - 4] + frame.masking_key[1] = buffer[mask_end_byte - 3] + frame.masking_key[2] = buffer[mask_end_byte - 2] + frame.masking_key[3] = buffer[mask_end_byte - 1] + break + } + } + return frame +} + +// unmask_sequence unmask any given sequence +fn (f Frame) unmask_sequence(mut buffer []byte) { + for i in 0 .. buffer.len { + buffer[i] ^= f.masking_key[i % 4] & 0xff + } +} diff --git a/vlib/x/websocket/tests/autobahn/README.md b/vlib/net/websocket/tests/autobahn/README.md similarity index 100% rename from vlib/x/websocket/tests/autobahn/README.md rename to vlib/net/websocket/tests/autobahn/README.md diff --git a/vlib/x/websocket/tests/autobahn/autobahn_client.v b/vlib/net/websocket/tests/autobahn/autobahn_client.v similarity index 97% rename from vlib/x/websocket/tests/autobahn/autobahn_client.v rename to vlib/net/websocket/tests/autobahn/autobahn_client.v index 6b94e7fd67..c65fdab7be 100644 --- a/vlib/x/websocket/tests/autobahn/autobahn_client.v +++ b/vlib/net/websocket/tests/autobahn/autobahn_client.v @@ -1,7 +1,7 @@ // use this test to test the websocket client in the autobahn test module main -import x.websocket +import net.websocket fn main() { for i in 1 .. 304 { diff --git a/vlib/x/websocket/tests/autobahn/local_run/autobahn_client_wss.v b/vlib/net/websocket/tests/autobahn/autobahn_client_wss.v similarity index 97% rename from vlib/x/websocket/tests/autobahn/local_run/autobahn_client_wss.v rename to vlib/net/websocket/tests/autobahn/autobahn_client_wss.v index 7d015ea9e8..c7a3c25aee 100644 --- a/vlib/x/websocket/tests/autobahn/local_run/autobahn_client_wss.v +++ b/vlib/net/websocket/tests/autobahn/autobahn_client_wss.v @@ -1,7 +1,7 @@ // use this test to test the websocket client in the autobahn test module main -import x.websocket +import net.websocket fn main() { for i in 1 .. 304 { diff --git a/vlib/x/websocket/tests/autobahn/autobahn_server.v b/vlib/net/websocket/tests/autobahn/autobahn_server.v similarity index 96% rename from vlib/x/websocket/tests/autobahn/autobahn_server.v rename to vlib/net/websocket/tests/autobahn/autobahn_server.v index 3847889b9c..0493ca99e6 100644 --- a/vlib/x/websocket/tests/autobahn/autobahn_server.v +++ b/vlib/net/websocket/tests/autobahn/autobahn_server.v @@ -1,7 +1,7 @@ // use this to test websocket server to the autobahn test module main -import x.websocket +import net.websocket fn main() { mut s := websocket.new_server(.ip6, 9002, '/') diff --git a/vlib/x/websocket/tests/autobahn/docker-compose.yml b/vlib/net/websocket/tests/autobahn/docker-compose.yml similarity index 83% rename from vlib/x/websocket/tests/autobahn/docker-compose.yml rename to vlib/net/websocket/tests/autobahn/docker-compose.yml index 52885d05ef..30b58ec6ec 100644 --- a/vlib/x/websocket/tests/autobahn/docker-compose.yml +++ b/vlib/net/websocket/tests/autobahn/docker-compose.yml @@ -17,5 +17,5 @@ services: client: container_name: autobahn_client build: - dockerfile: vlib/x/websocket/tests/autobahn/ws_test/Dockerfile + dockerfile: vlib/net/websocket/tests/autobahn/ws_test/Dockerfile context: ../../../../../ diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server/Dockerfile b/vlib/net/websocket/tests/autobahn/fuzzing_server/Dockerfile similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server/Dockerfile rename to vlib/net/websocket/tests/autobahn/fuzzing_server/Dockerfile diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server/check_results.py b/vlib/net/websocket/tests/autobahn/fuzzing_server/check_results.py similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server/check_results.py rename to vlib/net/websocket/tests/autobahn/fuzzing_server/check_results.py diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server/config/fuzzingclient.json b/vlib/net/websocket/tests/autobahn/fuzzing_server/config/fuzzingclient.json similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server/config/fuzzingclient.json rename to vlib/net/websocket/tests/autobahn/fuzzing_server/config/fuzzingclient.json diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server/config/fuzzingserver.json b/vlib/net/websocket/tests/autobahn/fuzzing_server/config/fuzzingserver.json similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server/config/fuzzingserver.json rename to vlib/net/websocket/tests/autobahn/fuzzing_server/config/fuzzingserver.json diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server_wss/Dockerfile b/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/Dockerfile similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server_wss/Dockerfile rename to vlib/net/websocket/tests/autobahn/fuzzing_server_wss/Dockerfile diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server_wss/check_results.py b/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/check_results.py similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server_wss/check_results.py rename to vlib/net/websocket/tests/autobahn/fuzzing_server_wss/check_results.py diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/fuzzingserver.json b/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/fuzzingserver.json similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/fuzzingserver.json rename to vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/fuzzingserver.json diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.crt b/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.crt similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.crt rename to vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.crt diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.csr b/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.csr similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.csr rename to vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.csr diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.key b/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.key similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.key rename to vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.key diff --git a/vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.pem b/vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.pem similarity index 100% rename from vlib/x/websocket/tests/autobahn/fuzzing_server_wss/config/server.pem rename to vlib/net/websocket/tests/autobahn/fuzzing_server_wss/config/server.pem diff --git a/vlib/x/websocket/tests/autobahn/local_run/Dockerfile b/vlib/net/websocket/tests/autobahn/local_run/Dockerfile similarity index 100% rename from vlib/x/websocket/tests/autobahn/local_run/Dockerfile rename to vlib/net/websocket/tests/autobahn/local_run/Dockerfile diff --git a/vlib/x/websocket/tests/autobahn/local_run/autobahn_client.v b/vlib/net/websocket/tests/autobahn/local_run/autobahn_client.v similarity index 97% rename from vlib/x/websocket/tests/autobahn/local_run/autobahn_client.v rename to vlib/net/websocket/tests/autobahn/local_run/autobahn_client.v index 1d29e27b07..ef5b281ab2 100644 --- a/vlib/x/websocket/tests/autobahn/local_run/autobahn_client.v +++ b/vlib/net/websocket/tests/autobahn/local_run/autobahn_client.v @@ -1,7 +1,7 @@ // use this test to test the websocket client in the autobahn test module main -import x.websocket +import net.websocket fn main() { for i in 1 .. 304 { diff --git a/vlib/x/websocket/tests/autobahn/autobahn_client_wss.v b/vlib/net/websocket/tests/autobahn/local_run/autobahn_client_wss.v similarity index 97% rename from vlib/x/websocket/tests/autobahn/autobahn_client_wss.v rename to vlib/net/websocket/tests/autobahn/local_run/autobahn_client_wss.v index 7d015ea9e8..c7a3c25aee 100644 --- a/vlib/x/websocket/tests/autobahn/autobahn_client_wss.v +++ b/vlib/net/websocket/tests/autobahn/local_run/autobahn_client_wss.v @@ -1,7 +1,7 @@ // use this test to test the websocket client in the autobahn test module main -import x.websocket +import net.websocket fn main() { for i in 1 .. 304 { diff --git a/vlib/net/websocket/tests/autobahn/ws_test/Dockerfile b/vlib/net/websocket/tests/autobahn/ws_test/Dockerfile new file mode 100644 index 0000000000..b57cffd850 --- /dev/null +++ b/vlib/net/websocket/tests/autobahn/ws_test/Dockerfile @@ -0,0 +1,12 @@ +FROM thevlang/vlang:buster-build + + +COPY ./ /src/ + +WORKDIR /src + +RUN make CC=clang + +RUN /src/v /src/vlib/net/websocket/tests/autobahn/autobahn_server.v +RUN chmod +x /src/vlib/net/websocket/tests/autobahn/autobahn_server +ENTRYPOINT [ "/src/vlib/net/websocket/tests/autobahn/autobahn_server" ] diff --git a/vlib/net/websocket/uri.v b/vlib/net/websocket/uri.v new file mode 100644 index 0000000000..7d388e17d5 --- /dev/null +++ b/vlib/net/websocket/uri.v @@ -0,0 +1,16 @@ +module websocket + +// Uri represents an Uri for websocket connections +struct Uri { +mut: + url string // url to the websocket endpoint + hostname string // hostname of the websocket endpoint + port string // port of the websocket endpoint + resource string // resource of the websocket endpoint + querystring string // query string of the websocket endpoint +} + +// str returns the string representation of the Uri +pub fn (u Uri) str() string { + return u.url +} diff --git a/vlib/net/websocket/utils.v b/vlib/net/websocket/utils.v new file mode 100644 index 0000000000..4e48359b06 --- /dev/null +++ b/vlib/net/websocket/utils.v @@ -0,0 +1,54 @@ +module websocket + +import rand +import crypto.sha1 +import encoding.base64 + +// htonl64 converts payload length to header bits +fn htonl64(payload_len u64) []byte { + mut ret := []byte{len: 8} + ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff) + ret[1] = byte(((payload_len & (u64(0xff) << 48)) >> 48) & 0xff) + ret[2] = byte(((payload_len & (u64(0xff) << 40)) >> 40) & 0xff) + ret[3] = byte(((payload_len & (u64(0xff) << 32)) >> 32) & 0xff) + ret[4] = byte(((payload_len & (u64(0xff) << 24)) >> 24) & 0xff) + ret[5] = byte(((payload_len & (u64(0xff) << 16)) >> 16) & 0xff) + ret[6] = byte(((payload_len & (u64(0xff) << 8)) >> 8) & 0xff) + ret[7] = byte(((payload_len & (u64(0xff) << 0)) >> 0) & 0xff) + return ret +} + +// create_masking_key returs a new masking key to use when masking websocket messages +fn create_masking_key() []byte { + mask_bit := byte(rand.intn(255)) + buf := []byte{len: 4, init: `0`} + unsafe { C.memcpy(buf.data, &mask_bit, 4) } + return buf +} + +// create_key_challenge_response creates a key challange response from security key +fn create_key_challenge_response(seckey string) ?string { + if seckey.len == 0 { + return error('unexpected seckey lengt zero') + } + guid := '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' + sha1buf := seckey + guid + shabytes := sha1buf.bytes() + hash := sha1.sum(shabytes) + b64 := base64.encode(hash) + unsafe { + hash.free() + shabytes.free() + } + return b64 +} + +// get_nonce creates a randomized array used in handshake process +fn get_nonce(nonce_size int) string { + mut nonce := []byte{len: nonce_size, cap: nonce_size} + alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz' + for i in 0 .. nonce_size { + nonce[i] = alphanum[rand.intn(alphanum.len)] + } + return unsafe { tos(nonce.data, nonce.len) }.clone() +} diff --git a/vlib/net/websocket/websocket_client.v b/vlib/net/websocket/websocket_client.v new file mode 100644 index 0000000000..96699477f9 --- /dev/null +++ b/vlib/net/websocket/websocket_client.v @@ -0,0 +1,494 @@ +// websocket module implements websocket client and a websocket server +// attribution: @thecoderr the author of original websocket client +[manualfree] +module websocket + +import net +import net.http +import net.openssl +import net.urllib +import time +import log +import rand + +const ( + empty_bytearr = []byte{} // used as empty response to avoid allocation +) + +// Client represents websocket client +pub struct Client { + is_server bool +mut: + ssl_conn &openssl.SSLConn // secure connection used when wss is used + flags []Flag // flags used in handshake + fragments []Fragment // current fragments + message_callbacks []MessageEventHandler // all callbacks on_message + error_callbacks []ErrorEventHandler // all callbacks on_error + open_callbacks []OpenEventHandler // all callbacks on_open + close_callbacks []CloseEventHandler // all callbacks on_close +pub: + is_ssl bool // true if secure socket is used + uri Uri // uri of current connection + id string // unique id of client +pub mut: + header http.Header // headers that will be passed when connecting + conn &net.TcpConn // underlying TCP socket connection + nonce_size int = 16 // size of nounce used for masking + panic_on_callback bool // set to true of callbacks can panic + state State // current state of connection + logger &log.Log // logger used to log messages + resource_name string // name of current resource + last_pong_ut u64 // last time in unix time we got a pong message +} + +// Flag represents different types of headers in websocket handshake +enum Flag { + has_accept // Webs + has_connection + has_upgrade +} + +// State represents the state of the websocket connection. +pub enum State { + connecting = 0 + open + closing + closed +} + +// Message represents a whole message combined from 1 to n frames +pub struct Message { +pub: + opcode OPCode // websocket frame type of this message + payload []byte // payload of the message +} + +// OPCode represents the supported websocket frame types +pub enum OPCode { + continuation = 0x00 + text_frame = 0x01 + binary_frame = 0x02 + close = 0x08 + ping = 0x09 + pong = 0x0A +} + +// new_client instance a new websocket client +pub fn new_client(address string) ?&Client { + uri := parse_uri(address) ? + return &Client{ + conn: 0 + is_server: false + ssl_conn: openssl.new_ssl_conn() + is_ssl: address.starts_with('wss') + logger: &log.Log{ + level: .info + } + uri: uri + state: .closed + id: rand.uuid_v4() + header: http.new_header() + } +} + +// connect connects to remote websocket server +pub fn (mut ws Client) connect() ? { + ws.assert_not_connected() ? + ws.set_state(.connecting) + ws.logger.info('connecting to host $ws.uri') + ws.conn = ws.dial_socket() ? + // Todo: make setting configurable + ws.conn.set_read_timeout(time.second * 30) + ws.conn.set_write_timeout(time.second * 30) + ws.handshake() ? + ws.set_state(.open) + ws.logger.info('successfully connected to host $ws.uri') + ws.send_open_event() +} + +// listen listens and processes incoming messages +pub fn (mut ws Client) listen() ? { + mut log := 'Starting client listener, server($ws.is_server)...' + ws.logger.info(log) + unsafe { log.free() } + defer { + ws.logger.info('Quit client listener, server($ws.is_server)...') + if ws.state == .open { + ws.close(1000, 'closed by client') or {} + } + } + for ws.state == .open { + msg := ws.read_next_message() or { + if ws.state in [.closed, .closing] { + return + } + ws.debug_log('failed to read next message: $err') + ws.send_error_event('failed to read next message: $err') + return err + } + if ws.state in [.closed, .closing] { + return + } + ws.debug_log('got message: $msg.opcode') + match msg.opcode { + .text_frame { + log = 'read: text' + ws.debug_log(log) + unsafe { log.free() } + ws.send_message_event(msg) + unsafe { msg.free() } + } + .binary_frame { + ws.debug_log('read: binary') + ws.send_message_event(msg) + unsafe { msg.free() } + } + .ping { + ws.debug_log('read: ping, sending pong') + ws.send_control_frame(.pong, 'PONG', msg.payload) or { + ws.logger.error('error in message callback sending PONG: $err') + ws.send_error_event('error in message callback sending PONG: $err') + if ws.panic_on_callback { + panic(err) + } + continue + } + if msg.payload.len > 0 { + unsafe { msg.free() } + } + } + .pong { + ws.debug_log('read: pong') + ws.last_pong_ut = time.now().unix + ws.send_message_event(msg) + if msg.payload.len > 0 { + unsafe { msg.free() } + } + } + .close { + log = 'read: close' + ws.debug_log(log) + unsafe { log.free() } + defer { + ws.manage_clean_close() + } + if msg.payload.len > 0 { + if msg.payload.len == 1 { + ws.close(1002, 'close payload cannot be 1 byte') ? + return error('close payload cannot be 1 byte') + } + code := (int(msg.payload[0]) << 8) + int(msg.payload[1]) + if code in invalid_close_codes { + ws.close(1002, 'invalid close code: $code') ? + return error('invalid close code: $code') + } + reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} } + if reason.len > 0 { + ws.validate_utf_8(.close, reason) ? + } + if ws.state !in [.closing, .closed] { + // sending close back according to spec + ws.debug_log('close with reason, code: $code, reason: $reason') + r := reason.bytestr() + ws.close(code, r) ? + } + unsafe { msg.free() } + } else { + if ws.state !in [.closing, .closed] { + ws.debug_log('close with reason, no code') + // sending close back according to spec + ws.close(1000, 'normal') ? + } + unsafe { msg.free() } + } + return + } + .continuation { + ws.logger.error('unexpected opcode continuation, nothing to continue') + ws.send_error_event('unexpected opcode continuation, nothing to continue') + ws.close(1002, 'nothing to continue') ? + return error('unexpected opcode continuation, nothing to continue') + } + } + } +} + +// manage_clean_close closes connection in a clean websocket way +fn (mut ws Client) manage_clean_close() { + ws.send_close_event(1000, 'closed by client') +} + +// ping sends ping message to server +pub fn (mut ws Client) ping() ? { + ws.send_control_frame(.ping, 'PING', []) ? +} + +// pong sends pong message to server, +pub fn (mut ws Client) pong() ? { + ws.send_control_frame(.pong, 'PONG', []) ? +} + +// write_ptr writes len bytes provided a byteptr with a websocket messagetype +pub fn (mut ws Client) write_ptr(bytes &byte, payload_len int, code OPCode) ?int { + // ws.debug_log('write_ptr code: $code') + if ws.state != .open || ws.conn.sock.handle < 1 { + // todo: send error here later + return error('trying to write on a closed socket!') + } + mut header_len := 2 + if payload_len > 125 { 2 } else { 0 } + + if payload_len > 0xffff { 6 } else { 0 } + if !ws.is_server { + header_len += 4 + } + mut header := []byte{len: header_len, init: `0`} // [`0`].repeat(header_len) + header[0] = byte(int(code)) | 0x80 + masking_key := create_masking_key() + if ws.is_server { + if payload_len <= 125 { + header[1] = byte(payload_len) + } else if payload_len > 125 && payload_len <= 0xffff { + len16 := C.htons(payload_len) + header[1] = 126 + unsafe { C.memcpy(&header[2], &len16, 2) } + } else if payload_len > 0xffff && payload_len <= 0x7fffffff { + len_bytes := htonl64(u64(payload_len)) + header[1] = 127 + unsafe { C.memcpy(&header[2], len_bytes.data, 8) } + } + } else { + if payload_len <= 125 { + header[1] = byte(payload_len | 0x80) + header[2] = masking_key[0] + header[3] = masking_key[1] + header[4] = masking_key[2] + header[5] = masking_key[3] + } else if payload_len > 125 && payload_len <= 0xffff { + len16 := C.htons(payload_len) + header[1] = (126 | 0x80) + unsafe { C.memcpy(&header[2], &len16, 2) } + header[4] = masking_key[0] + header[5] = masking_key[1] + header[6] = masking_key[2] + header[7] = masking_key[3] + } else if payload_len > 0xffff && payload_len <= 0x7fffffff { + len64 := htonl64(u64(payload_len)) + header[1] = (127 | 0x80) + unsafe { C.memcpy(&header[2], len64.data, 8) } + header[10] = masking_key[0] + header[11] = masking_key[1] + header[12] = masking_key[2] + header[13] = masking_key[3] + } else { + ws.close(1009, 'frame too large') ? + return error('frame too large') + } + } + len := header.len + payload_len + mut frame_buf := []byte{len: len} + unsafe { + C.memcpy(&frame_buf[0], &byte(header.data), header.len) + if payload_len > 0 { + C.memcpy(&frame_buf[header.len], bytes, payload_len) + } + } + if !ws.is_server { + for i in 0 .. payload_len { + frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff + } + } + written_len := ws.socket_write(frame_buf) ? + unsafe { + frame_buf.free() + masking_key.free() + header.free() + } + return written_len +} + +// write writes a byte array with a websocket messagetype to socket +pub fn (mut ws Client) write(bytes []byte, code OPCode) ?int { + return ws.write_ptr(&byte(bytes.data), bytes.len, code) +} + +// write_string, writes a string with a websocket texttype to socket +[deprecated: 'use Client.write_string() instead'] +pub fn (mut ws Client) write_str(str string) ?int { + return ws.write_string(str) +} + +// write_str, writes a string with a websocket texttype to socket +pub fn (mut ws Client) write_string(str string) ?int { + return ws.write_ptr(str.str, str.len, .text_frame) +} + +// close closes the websocket connection +pub fn (mut ws Client) close(code int, message string) ? { + ws.debug_log('sending close, $code, $message') + if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 { + ws.debug_log('close: Websocket allready closed ($ws.state), $message, $code handle($ws.conn.sock.handle)') + err_msg := 'Socket allready closed: $code' + return error(err_msg) + } + defer { + ws.shutdown_socket() or {} + ws.reset_state() + } + ws.set_state(.closing) + // mut code32 := 0 + if code > 0 { + code_ := C.htons(code) + message_len := message.len + 2 + mut close_frame := []byte{len: message_len} + close_frame[0] = byte(code_ & 0xFF) + close_frame[1] = byte(code_ >> 8) + // code32 = (close_frame[0] << 8) + close_frame[1] + for i in 0 .. message.len { + close_frame[i + 2] = message[i] + } + ws.send_control_frame(.close, 'CLOSE', close_frame) ? + unsafe { close_frame.free() } + } else { + ws.send_control_frame(.close, 'CLOSE', []) ? + } + ws.fragments = [] +} + +// send_control_frame sends a control frame to the server +fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []byte) ? { + ws.debug_log('send control frame $code, frame_type: $frame_typ') + if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 { + return error('socket is not connected') + } + header_len := if ws.is_server { 2 } else { 6 } + frame_len := header_len + payload.len + mut control_frame := []byte{len: frame_len} + mut masking_key := if !ws.is_server { create_masking_key() } else { websocket.empty_bytearr } + defer { + unsafe { + control_frame.free() + if masking_key.len > 0 { + masking_key.free() + } + } + } + control_frame[0] = byte(int(code) | 0x80) + if !ws.is_server { + control_frame[1] = byte(payload.len | 0x80) + control_frame[2] = masking_key[0] + control_frame[3] = masking_key[1] + control_frame[4] = masking_key[2] + control_frame[5] = masking_key[3] + } else { + control_frame[1] = byte(payload.len) + } + if code == .close { + if payload.len >= 2 { + if !ws.is_server { + mut parsed_payload := []byte{len: payload.len + 1} + unsafe { C.memcpy(parsed_payload.data, &payload[0], payload.len) } + parsed_payload[payload.len] = `\0` + for i in 0 .. payload.len { + control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff + } + unsafe { parsed_payload.free() } + } else { + unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) } + } + } + } else { + if !ws.is_server { + if payload.len > 0 { + for i in 0 .. payload.len { + control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff + } + } + } else { + if payload.len > 0 { + unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) } + } + } + } + ws.socket_write(control_frame) or { + return error('send_control_frame: error sending $frame_typ control frame.') + } +} + +// parse_uri parses the url to a Uri +fn parse_uri(url string) ?&Uri { + u := urllib.parse(url) ? + request_uri := u.request_uri() + v := request_uri.split('?') + mut port := u.port() + uri := u.str() + if port == '' { + port = if uri.starts_with('ws://') { + '80' + } else if uri.starts_with('wss://') { + '443' + } else { + u.port() + } + } + querystring := if v.len > 1 { '?' + v[1] } else { '' } + return &Uri{ + url: url + hostname: u.hostname() + port: port + resource: v[0] + querystring: querystring + } +} + +// set_state sets current state of the websocket connection +fn (mut ws Client) set_state(state State) { + lock { + ws.state = state + } +} + +// assert_not_connected returns error if the connection is not connected +fn (ws Client) assert_not_connected() ? { + match ws.state { + .connecting { return error('connect: websocket is connecting') } + .open { return error('connect: websocket already open') } + .closing { return error('connect: reconnect on closing websocket not supported, please use new client') } + else {} + } +} + +// reset_state resets the websocket and initialize default settings +fn (mut ws Client) reset_state() { + lock { + ws.state = .closed + ws.ssl_conn = openssl.new_ssl_conn() + ws.flags = [] + ws.fragments = [] + } +} + +// debug_log handles debug logging output for client and server +fn (mut ws Client) debug_log(text string) { + if ws.is_server { + ws.logger.debug('server-> $text') + } else { + ws.logger.debug('client-> $text') + } +} + +// free handles manual free memory of Message struct +pub fn (m &Message) free() { + unsafe { m.payload.free() } +} + +// free handles manual free memory of Client struct +pub fn (c &Client) free() { + unsafe { + c.flags.free() + c.fragments.free() + c.message_callbacks.free() + c.error_callbacks.free() + c.open_callbacks.free() + c.close_callbacks.free() + c.header.free() + } +} diff --git a/vlib/net/websocket/websocket_nix.c.v b/vlib/net/websocket/websocket_nix.c.v new file mode 100644 index 0000000000..f986b98f0b --- /dev/null +++ b/vlib/net/websocket/websocket_nix.c.v @@ -0,0 +1,10 @@ +module websocket + +// error_code returns the error code +fn error_code() int { + return C.errno +} + +const ( + error_ewouldblock = C.EWOULDBLOCK // blocking error code +) diff --git a/vlib/net/websocket/websocket_server.v b/vlib/net/websocket/websocket_server.v new file mode 100644 index 0000000000..99af3e0ebf --- /dev/null +++ b/vlib/net/websocket/websocket_server.v @@ -0,0 +1,189 @@ +module websocket + +import net +import net.openssl +import log +import time +import rand + +// Server represents a websocket server connection +pub struct Server { +mut: + logger &log.Log // logger used to log + ls &net.TcpListener // listener used to get incoming connection to socket + accept_client_callbacks []AcceptClientFn // accept client callback functions + message_callbacks []MessageEventHandler // new message callback functions + close_callbacks []CloseEventHandler // close message callback functions +pub: + family net.AddrFamily = .ip + port int // port used as listen to incoming connections + is_ssl bool // true if secure connection (not supported yet on server) +pub mut: + clients map[string]&ServerClient // clients connected to this server + ping_interval int = 30 // interval for sending ping to clients (seconds) + state State // current state of connection +} + +// ServerClient represents a connected client +struct ServerClient { +pub: + resource_name string // resource that the client access + client_key string // unique key of client +pub mut: + server &Server + client &Client +} + +// new_server instance a new websocket server on provided port and route +pub fn new_server(family net.AddrFamily, port int, route string) &Server { + return &Server{ + ls: 0 + family: family + port: port + logger: &log.Log{ + level: .info + } + state: .closed + } +} + +// set_ping_interval sets the interval that the server will send ping messages to clients +pub fn (mut s Server) set_ping_interval(seconds int) { + s.ping_interval = seconds +} + +// listen start listen and process to incoming connections from websocket clients +pub fn (mut s Server) listen() ? { + s.logger.info('websocket server: start listen on port $s.port') + s.ls = net.listen_tcp(s.family, ':$s.port') ? + s.set_state(.open) + go s.handle_ping() + for { + mut c := s.accept_new_client() or { continue } + go s.serve_client(mut c) + } + s.logger.info('websocket server: end listen on port $s.port') +} + +// Close closes server (not implemented yet) +fn (mut s Server) close() { + // TODO: implement close when moving to net from x.net +} + +// handle_ping sends ping to all clients every set interval +fn (mut s Server) handle_ping() { + mut clients_to_remove := []string{} + for s.state == .open { + time.sleep(s.ping_interval * time.second) + for i, _ in s.clients { + mut c := s.clients[i] + if c.client.state == .open { + c.client.ping() or { + s.logger.debug('server-> error sending ping to client') + c.client.close(1002, 'Closing connection: ping send error') or { + // we want to continue even if error + continue + } + clients_to_remove << c.client.id + } + if (time.now().unix - c.client.last_pong_ut) > s.ping_interval * 2 { + clients_to_remove << c.client.id + c.client.close(1000, 'no pong received') or { continue } + } + } + } + // TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges + for client in clients_to_remove { + lock { + s.clients.delete(client) + } + } + clients_to_remove.clear() + } +} + +// serve_client accepts incoming connection and sets up the callbacks +fn (mut s Server) serve_client(mut c Client) ? { + c.logger.debug('server-> Start serve client ($c.id)') + defer { + c.logger.debug('server-> End serve client ($c.id)') + } + mut handshake_response, mut server_client := s.handle_server_handshake(mut c) ? + accept := s.send_connect_event(mut server_client) ? + if !accept { + s.logger.debug('server-> client not accepted') + c.shutdown_socket() ? + return + } + // the client is accepted + c.socket_write(handshake_response.bytes()) ? + lock { + s.clients[server_client.client.id] = server_client + } + s.setup_callbacks(mut server_client) + c.listen() or { + s.logger.error(err.msg) + return err + } +} + +// setup_callbacks initialize all callback functions +fn (mut s Server) setup_callbacks(mut sc ServerClient) { + if s.message_callbacks.len > 0 { + for cb in s.message_callbacks { + if cb.is_ref { + sc.client.on_message_ref(cb.handler2, cb.ref) + } else { + sc.client.on_message(cb.handler) + } + } + } + if s.close_callbacks.len > 0 { + for cb in s.close_callbacks { + if cb.is_ref { + sc.client.on_close_ref(cb.handler2, cb.ref) + } else { + sc.client.on_close(cb.handler) + } + } + } + // set standard close so we can remove client if closed + sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ? { + c.logger.debug('server-> Delete client') + lock { + sc.server.clients.delete(sc.client.id) + } + }, sc) +} + +// accept_new_client creates a new client instance for client that connects to the socket +fn (mut s Server) accept_new_client() ?&Client { + mut new_conn := s.ls.accept() ? + c := &Client{ + is_server: true + conn: new_conn + ssl_conn: openssl.new_ssl_conn() + logger: s.logger + state: .open + last_pong_ut: time.now().unix + id: rand.uuid_v4() + } + return c +} + +// set_state sets current state in a thread safe way +fn (mut s Server) set_state(state State) { + lock { + s.state = state + } +} + +// free manages manual free of memory for Server instance +pub fn (mut s Server) free() { + unsafe { + s.clients.free() + s.accept_client_callbacks.free() + s.message_callbacks.free() + s.close_callbacks.free() + } +} diff --git a/vlib/x/websocket/websocket_test.v b/vlib/net/websocket/websocket_test.v similarity index 99% rename from vlib/x/websocket/websocket_test.v rename to vlib/net/websocket/websocket_test.v index 999252998a..35e15d3ffc 100644 --- a/vlib/x/websocket/websocket_test.v +++ b/vlib/net/websocket/websocket_test.v @@ -1,6 +1,6 @@ import os import net -import x.websocket +import net.websocket import time import rand diff --git a/vlib/net/websocket/websocket_windows.c.v b/vlib/net/websocket/websocket_windows.c.v new file mode 100644 index 0000000000..e9f4fc3d73 --- /dev/null +++ b/vlib/net/websocket/websocket_windows.c.v @@ -0,0 +1,12 @@ +module websocket + +import net + +// error_code returns the error code +fn error_code() int { + return C.WSAGetLastError() +} + +const ( + error_ewouldblock = net.WsaError.wsaewouldblock // blocking error code +) diff --git a/vlib/x/openssl/declrarations.v b/vlib/x/openssl/declrarations.v deleted file mode 100644 index b2d7d4f2c6..0000000000 --- a/vlib/x/openssl/declrarations.v +++ /dev/null @@ -1,25 +0,0 @@ -module openssl - -import time - -enum Select { - read - write - except -} - -pub struct SSLConn { -mut: - sslctx &C.SSL_CTX - ssl &C.SSL - handle int - duration time.Duration -} - -pub fn new_ssl_conn() &SSLConn { - return &SSLConn{ - sslctx: 0 - ssl: 0 - handle: 0 - } -} diff --git a/vlib/x/websocket/tests/autobahn/ws_test/Dockerfile b/vlib/x/websocket/tests/autobahn/ws_test/Dockerfile deleted file mode 100644 index a53f112c75..0000000000 --- a/vlib/x/websocket/tests/autobahn/ws_test/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -FROM thevlang/vlang:buster-build - - -COPY ./ /src/ - -WORKDIR /src - -RUN make CC=clang - -RUN /src/v /src/vlib/x/websocket/tests/autobahn/autobahn_server.v -RUN chmod +x /src/vlib/x/websocket/tests/autobahn/autobahn_server -ENTRYPOINT [ "/src/vlib/x/websocket/tests/autobahn/autobahn_server" ] diff --git a/vlib/x/websocket/websocket_client.v b/vlib/x/websocket/websocket_client.v index 518946250f..cd0a7c1167 100644 --- a/vlib/x/websocket/websocket_client.v +++ b/vlib/x/websocket/websocket_client.v @@ -1,11 +1,13 @@ // websocket module implements websocket client and a websocket server // attribution: @thecoderr the author of original websocket client +// advice that the implementation is deprecated and moved to the net.websocket module! +// it will be removed in later versions [manualfree] module websocket import net import net.http -import x.openssl +import net.openssl import net.urllib import time import log @@ -74,6 +76,7 @@ pub enum OPCode { } // new_client instance a new websocket client +[deprecated: 'use net.websocket module instead'] pub fn new_client(address string) ?&Client { uri := parse_uri(address) ? return &Client{ diff --git a/vlib/x/websocket/websocket_server.v b/vlib/x/websocket/websocket_server.v index 7fba8868fb..61903d1f63 100644 --- a/vlib/x/websocket/websocket_server.v +++ b/vlib/x/websocket/websocket_server.v @@ -1,7 +1,7 @@ module websocket import net -import x.openssl +import net.openssl import log import time import rand @@ -35,6 +35,7 @@ pub mut: } // new_server instance a new websocket server on provided port and route +[deprecated: 'use net.websocket module instead'] pub fn new_server(family net.AddrFamily, port int, route string) &Server { return &Server{ ls: 0