x.websocket: move to net.websocket module (#10648)

pull/10650/head
Tomas Hellström 2021-07-03 01:56:00 +02:00 committed by GitHub
parent c44a47acb1
commit ec973f5c6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 1656 additions and 67 deletions

View File

@ -27,25 +27,25 @@ jobs:
- name: v doctor - name: v doctor
run: ./v doctor run: ./v doctor
- name: Run websockets tests - name: Run websockets tests
run: ./v -g test vlib/x/websocket/ run: ./v -g test vlib/net/websocket/
## Autobahn integrations tests ## Autobahn integrations tests
- name: Run autobahn services - name: Run autobahn services
run: docker-compose -f ${{github.workspace}}/vlib/x/websocket/tests/autobahn/docker-compose.yml up -d run: docker-compose -f ${{github.workspace}}/vlib/net/websocket/tests/autobahn/docker-compose.yml up -d
- name: Wait for the service to start - name: Wait for the service to start
run: sleep 10s run: sleep 10s
- name: Build client test - name: Build client test
run: docker exec autobahn_client "/src/v" "/src/vlib/x/websocket/tests/autobahn/autobahn_client.v" run: docker exec autobahn_client "/src/v" "/src/vlib/net/websocket/tests/autobahn/autobahn_client.v"
- name: Run client test - name: Run client test
run: docker exec autobahn_client "/src/vlib/x/websocket/tests/autobahn/autobahn_client" run: docker exec autobahn_client "/src/vlib/net/websocket/tests/autobahn/autobahn_client"
- name: Build client wss test - name: Build client wss test
run: docker exec autobahn_client "/src/v" "/src/vlib/x/websocket/tests/autobahn/autobahn_client_wss.v" run: docker exec autobahn_client "/src/v" "/src/vlib/net/websocket/tests/autobahn/autobahn_client_wss.v"
- name: Run client wss test - name: Run client wss test
run: docker exec autobahn_client "/src/vlib/x/websocket/tests/autobahn/autobahn_client_wss" run: docker exec autobahn_client "/src/vlib/net/websocket/tests/autobahn/autobahn_client_wss"
- name: Run server test - name: Run server test
run: docker exec autobahn_server "wstest" "-m" "fuzzingclient" "-s" "/config/fuzzingclient.json" run: docker exec autobahn_server "wstest" "-m" "fuzzingclient" "-s" "/config/fuzzingclient.json"

View File

@ -44,11 +44,11 @@ const (
'vlib/vweb/request_test.v', 'vlib/vweb/request_test.v',
'vlib/net/http/request_test.v', 'vlib/net/http/request_test.v',
'vlib/vweb/route_test.v', 'vlib/vweb/route_test.v',
'vlib/x/websocket/websocket_test.v', 'vlib/net/websocket/websocket_test.v',
'vlib/crypto/rand/crypto_rand_read_test.v', 'vlib/crypto/rand/crypto_rand_read_test.v',
] ]
skip_with_fsanitize_address = [ skip_with_fsanitize_address = [
'vlib/x/websocket/websocket_test.v', 'vlib/net/websocket/websocket_test.v',
] ]
skip_with_fsanitize_undefined = [ skip_with_fsanitize_undefined = [
'do_not_remove', 'do_not_remove',
@ -83,7 +83,7 @@ const (
'vlib/vweb/request_test.v', 'vlib/vweb/request_test.v',
'vlib/net/http/request_test.v', 'vlib/net/http/request_test.v',
'vlib/vweb/route_test.v', 'vlib/vweb/route_test.v',
'vlib/x/websocket/websocket_test.v', 'vlib/net/websocket/websocket_test.v',
'vlib/net/http/http_httpbin_test.v', 'vlib/net/http/http_httpbin_test.v',
'vlib/net/http/header_test.v', 'vlib/net/http/header_test.v',
] ]
@ -98,7 +98,7 @@ const (
'vlib/v/tests/orm_sub_struct_test.v', 'vlib/v/tests/orm_sub_struct_test.v',
'vlib/net/websocket/ws_test.v', 'vlib/net/websocket/ws_test.v',
'vlib/net/unix/unix_test.v', 'vlib/net/unix/unix_test.v',
'vlib/x/websocket/websocket_test.v', 'vlib/net/websocket/websocket_test.v',
'vlib/vweb/tests/vweb_test.v', 'vlib/vweb/tests/vweb_test.v',
'vlib/vweb/request_test.v', 'vlib/vweb/request_test.v',
'vlib/net/http/request_test.v', 'vlib/net/http/request_test.v',

View File

@ -1,7 +1,7 @@
module main module main
import os import os
import x.websocket import net.websocket
import term import term
// This client should be compiled an run in different konsoles // This client should be compiled an run in different konsoles

View File

@ -1,6 +1,6 @@
module main module main
import x.websocket import net.websocket
import term import term
// this server accepts client connections and broadcast all messages to other connected clients // this server accepts client connections and broadcast all messages to other connected clients

View File

@ -2,7 +2,7 @@ module main
import time import time
import os import os
import x.websocket import net.websocket
fn main() { fn main() {
println('press enter to quit...\n') println('press enter to quit...\n')
@ -12,6 +12,8 @@ fn main() {
os.get_line() os.get_line()
} }
// start_server starts the websocket server, it receives messages
// and send it back to the client that sent it
fn start_server() ? { fn start_server() ? {
mut s := websocket.new_server(.ip6, 30000, '') mut s := websocket.new_server(.ip6, 30000, '')
// Make that in execution test time give time to execute at least one time // Make that in execution test time give time to execute at least one time
@ -36,6 +38,8 @@ fn start_server() ? {
} }
} }
// start_client starts the websocket client, it writes a message to
// the server and prints all the messages received
fn start_client() ? { fn start_client() ? {
mut ws := websocket.new_client('ws://localhost:30000') ? mut ws := websocket.new_client('ws://localhost:30000') ?
// mut ws := websocket.new_client('wss://echo.websocket.org:443')? // mut ws := websocket.new_client('wss://echo.websocket.org:443')?

View File

@ -1,9 +1,33 @@
module openssl module openssl
import net import net
import net.openssl as nssl
import time import time
// SSLConn is the current connection
pub struct SSLConn {
mut:
sslctx &C.SSL_CTX
ssl &C.SSL
handle int
duration time.Duration
}
// new_ssl_conn instance an new SSLCon struct
pub fn new_ssl_conn() &SSLConn {
return &SSLConn{
sslctx: 0
ssl: 0
handle: 0
}
}
// Select operation
enum Select {
read
write
except
}
// shutdown closes the ssl connection and do clean up // shutdown closes the ssl connection and do clean up
pub fn (mut s SSLConn) shutdown() ? { pub fn (mut s SSLConn) shutdown() ? {
if s.ssl != 0 { if s.ssl != 0 {
@ -11,7 +35,7 @@ pub fn (mut s SSLConn) shutdown() ? {
for { for {
res = C.SSL_shutdown(voidptr(s.ssl)) res = C.SSL_shutdown(voidptr(s.ssl))
if res < 0 { if res < 0 {
err_res := nssl.ssl_error(res, s.ssl) or { err_res := ssl_error(res, s.ssl) or {
break // We break to free rest of resources break // We break to free rest of resources
} }
if err_res == .ssl_error_want_read { if err_res == .ssl_error_want_read {
@ -99,7 +123,7 @@ pub fn (mut s SSLConn) connect(mut tcp_conn net.TcpConn, hostname string) ? {
for { for {
res = C.SSL_connect(voidptr(s.ssl)) res = C.SSL_connect(voidptr(s.ssl))
if res != 1 { if res != 1 {
err_res := nssl.ssl_error(res, s.ssl) ? err_res := ssl_error(res, s.ssl) ?
if err_res == .ssl_error_want_read { if err_res == .ssl_error_want_read {
for { for {
ready := @select(s.handle, .read, s.duration) ? ready := @select(s.handle, .read, s.duration) ?
@ -128,7 +152,7 @@ pub fn (mut s SSLConn) socket_read_into_ptr(buf_ptr &byte, len int) ?int {
for { for {
res = C.SSL_read(voidptr(s.ssl), buf_ptr, len) res = C.SSL_read(voidptr(s.ssl), buf_ptr, len)
if res < 0 { if res < 0 {
err_res := nssl.ssl_error(res, s.ssl) ? err_res := ssl_error(res, s.ssl) ?
if err_res == .ssl_error_want_read { if err_res == .ssl_error_want_read {
for { for {
ready := @select(s.handle, .read, s.duration) ? ready := @select(s.handle, .read, s.duration) ?
@ -170,7 +194,7 @@ pub fn (mut s SSLConn) write(bytes []byte) ?int {
remaining := bytes.len - total_sent remaining := bytes.len - total_sent
mut sent := C.SSL_write(voidptr(s.ssl), ptr, remaining) mut sent := C.SSL_write(voidptr(s.ssl), ptr, remaining)
if sent <= 0 { if sent <= 0 {
err_res := nssl.ssl_error(sent, s.ssl) ? err_res := ssl_error(sent, s.ssl) ?
if err_res == .ssl_error_want_read { if err_res == .ssl_error_want_read {
for { for {
ready := @select(s.handle, .read, s.duration) ? ready := @select(s.handle, .read, s.duration) ?
@ -202,9 +226,9 @@ This is basically a copy of Emily socket implementation of select.
This have to be consolidated into common net lib features This have to be consolidated into common net lib features
when merging this to V when merging this to V
*/ */
[typedef] // [typedef]
pub struct C.fd_set { // pub struct C.fd_set {
} // }
// Select waits for an io operation (specified by parameter `test`) to be available // Select waits for an io operation (specified by parameter `test`) to be available
fn @select(handle int, test Select, timeout time.Duration) ?bool { fn @select(handle int, test Select, timeout time.Duration) ?bool {

View File

@ -0,0 +1,227 @@
module websocket
// MessageEventHandler represents a callback on a new message
struct MessageEventHandler {
handler SocketMessageFn // callback function
handler2 SocketMessageFn2 // callback function with reference
is_ref bool // true if has a reference object
ref voidptr // referenced object
}
// ErrorEventHandler represents a callback on error
struct ErrorEventHandler {
handler SocketErrorFn // callback function
handler2 SocketErrorFn2 // callback function with reference
is_ref bool // true if has a reference object
ref voidptr // referenced object
}
// OpenEventHandler represents a callback when connection is opened
struct OpenEventHandler {
handler SocketOpenFn // callback function
handler2 SocketOpenFn2 // callback function with reference
is_ref bool // true if has a reference object
ref voidptr // referenced object
}
// CloseEventHandler represents a callback on a closing event
struct CloseEventHandler {
handler SocketCloseFn // callback function
handler2 SocketCloseFn2 // callback function with reference
is_ref bool // true if has a reference object
ref voidptr // referenced object
}
pub type AcceptClientFn = fn (mut c ServerClient) ?bool
pub type SocketMessageFn = fn (mut c Client, msg &Message) ?
pub type SocketMessageFn2 = fn (mut c Client, msg &Message, v voidptr) ?
pub type SocketErrorFn = fn (mut c Client, err string) ?
pub type SocketErrorFn2 = fn (mut c Client, err string, v voidptr) ?
pub type SocketOpenFn = fn (mut c Client) ?
pub type SocketOpenFn2 = fn (mut c Client, v voidptr) ?
pub type SocketCloseFn = fn (mut c Client, code int, reason string) ?
pub type SocketCloseFn2 = fn (mut c Client, code int, reason string, v voidptr) ?
// on_connect registers a callback when client connects to the server
pub fn (mut s Server) on_connect(fun AcceptClientFn) ? {
if s.accept_client_callbacks.len > 0 {
return error('only one callback can be registered for accept client')
}
s.accept_client_callbacks << fun
}
// on_message registers a callback on new messages
pub fn (mut s Server) on_message(fun SocketMessageFn) {
s.message_callbacks << MessageEventHandler{
handler: fun
}
}
// on_message_ref registers a callback on new messages and provides a reference object
pub fn (mut s Server) on_message_ref(fun SocketMessageFn2, ref voidptr) {
s.message_callbacks << MessageEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_close registers a callback on closed socket
pub fn (mut s Server) on_close(fun SocketCloseFn) {
s.close_callbacks << CloseEventHandler{
handler: fun
}
}
// on_close_ref registers a callback on closed socket and provides a reference object
pub fn (mut s Server) on_close_ref(fun SocketCloseFn2, ref voidptr) {
s.close_callbacks << CloseEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_message registers a callback on new messages
pub fn (mut ws Client) on_message(fun SocketMessageFn) {
ws.message_callbacks << MessageEventHandler{
handler: fun
}
}
// on_message_ref registers a callback on new messages and provides a reference object
pub fn (mut ws Client) on_message_ref(fun SocketMessageFn2, ref voidptr) {
ws.message_callbacks << MessageEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_error registers a callback on errors
pub fn (mut ws Client) on_error(fun SocketErrorFn) {
ws.error_callbacks << ErrorEventHandler{
handler: fun
}
}
// on_error_ref registers a callback on errors and provides a reference object
pub fn (mut ws Client) on_error_ref(fun SocketErrorFn2, ref voidptr) {
ws.error_callbacks << ErrorEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_open registers a callback on successful opening the websocket
pub fn (mut ws Client) on_open(fun SocketOpenFn) {
ws.open_callbacks << OpenEventHandler{
handler: fun
}
}
// on_open_ref registers a callback on successful opening the websocket
// and provides a reference object
pub fn (mut ws Client) on_open_ref(fun SocketOpenFn2, ref voidptr) {
ws.open_callbacks << OpenEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_close registers a callback on closed socket
pub fn (mut ws Client) on_close(fun SocketCloseFn) {
ws.close_callbacks << CloseEventHandler{
handler: fun
}
}
// on_close_ref registers a callback on closed socket and provides a reference object
pub fn (mut ws Client) on_close_ref(fun SocketCloseFn2, ref voidptr) {
ws.close_callbacks << CloseEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// send_connect_event invokes the on_connect callback
fn (mut s Server) send_connect_event(mut c ServerClient) ?bool {
if s.accept_client_callbacks.len == 0 {
// If no callback all client will be accepted
return true
}
fun := s.accept_client_callbacks[0]
res := fun(mut c) ?
return res
}
// send_message_event invokes the on_message callback
fn (mut ws Client) send_message_event(msg &Message) {
ws.debug_log('sending on_message event')
for ev_handler in ws.message_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(ws, msg) or { ws.logger.error('send_message_event error: $err') }
} else {
ev_handler.handler2(ws, msg, ev_handler.ref) or {
ws.logger.error('send_message_event error: $err')
}
}
}
}
// send_error_event invokes the on_error callback
fn (mut ws Client) send_error_event(error string) {
ws.debug_log('sending on_error event')
for ev_handler in ws.error_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(mut ws, error) or {
ws.logger.error('send_error_event error: $error, err: $err')
}
} else {
ev_handler.handler2(mut ws, error, ev_handler.ref) or {
ws.logger.error('send_error_event error: $error, err: $err')
}
}
}
}
// send_close_event invokes the on_close callback
fn (mut ws Client) send_close_event(code int, reason string) {
ws.debug_log('sending on_close event')
for ev_handler in ws.close_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(mut ws, code, reason) or {
ws.logger.error('send_close_event error: $err')
}
} else {
ev_handler.handler2(mut ws, code, reason, ev_handler.ref) or {
ws.logger.error('send_close_event error: $err')
}
}
}
}
// send_open_event invokes the on_open callback
fn (mut ws Client) send_open_event() {
ws.debug_log('sending on_open event')
for ev_handler in ws.open_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(mut ws) or { ws.logger.error('send_open_event error: $err') }
} else {
ev_handler.handler2(mut ws, ev_handler.ref) or {
ws.logger.error('send_open_event error: $err')
}
}
}
}

View File

@ -0,0 +1,185 @@
[manualfree]
module websocket
import encoding.base64
import strings
// handshake manages the websocket handshake process
fn (mut ws Client) handshake() ? {
nonce := get_nonce(ws.nonce_size)
seckey := base64.encode_str(nonce)
mut sb := strings.new_builder(1024)
defer {
unsafe { sb.free() }
}
sb.write_string('GET ')
sb.write_string(ws.uri.resource)
sb.write_string(ws.uri.querystring)
sb.write_string(' HTTP/1.1\r\nHost: ')
sb.write_string(ws.uri.hostname)
sb.write_string(':')
sb.write_string(ws.uri.port)
sb.write_string('\r\nUpgrade: websocket\r\nConnection: Upgrade\r\n')
sb.write_string('Sec-WebSocket-Key: ')
sb.write_string(seckey)
sb.write_string('\r\nSec-WebSocket-Version: 13')
for key in ws.header.keys() {
val := ws.header.custom_values(key).join(',')
sb.write_string('\r\n$key:$val')
}
sb.write_string('\r\n\r\n')
handshake := sb.str()
defer {
unsafe { handshake.free() }
}
handshake_bytes := handshake.bytes()
ws.debug_log('sending handshake: $handshake')
ws.socket_write(handshake_bytes) ?
ws.read_handshake(seckey) ?
unsafe { handshake_bytes.free() }
}
// handle_server_handshake manages websocket server handshake process
fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient) {
msg := c.read_handshake_str() ?
handshake_response, client := s.parse_client_handshake(msg, mut c) ?
unsafe { msg.free() }
return handshake_response, client
}
// parse_client_handshake parses result from handshake process
fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) ?(string, &ServerClient) {
s.logger.debug('server-> client handshake:\n$client_handshake')
lines := client_handshake.split_into_lines()
get_tokens := lines[0].split(' ')
if get_tokens.len < 3 {
return error_with_code('unexpected get operation, $get_tokens', 1)
}
if get_tokens[0].trim_space() != 'GET' {
return error_with_code("unexpected request '${get_tokens[0]}', expected 'GET'",
2)
}
if get_tokens[2].trim_space() != 'HTTP/1.1' {
return error_with_code("unexpected request $get_tokens, expected 'HTTP/1.1'",
3)
}
mut seckey := ''
mut flags := []Flag{}
mut key := ''
for i in 1 .. lines.len {
if lines[i].len <= 0 || lines[i] == '\r\n' {
continue
}
keys := lines[i].split(':')
match keys[0] {
'Upgrade', 'upgrade' {
flags << .has_upgrade
}
'Connection', 'connection' {
flags << .has_connection
}
'Sec-WebSocket-Key', 'sec-websocket-key' {
key = keys[1].trim_space()
s.logger.debug('server-> got key: $key')
seckey = create_key_challenge_response(key) ?
s.logger.debug('server-> challenge: $seckey, response: ${keys[1]}')
flags << .has_accept
}
else {
// we ignore other headers like protocol for now
}
}
unsafe { keys.free() }
}
if flags.len < 3 {
return error_with_code('invalid client handshake, $client_handshake', 4)
}
server_handshake := 'HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: $seckey\r\n\r\n'
server_client := &ServerClient{
resource_name: get_tokens[1]
client_key: key
client: unsafe { c }
server: unsafe { s }
}
unsafe {
lines.free()
flags.free()
get_tokens.free()
seckey.free()
key.free()
}
return server_handshake, server_client
}
// read_handshake_str returns the handshake response
fn (mut ws Client) read_handshake_str() ?string {
mut total_bytes_read := 0
mut msg := [1024]byte{}
mut buffer := [1]byte{}
for total_bytes_read < 1024 {
bytes_read := ws.socket_read_ptr(&buffer[0], 1) ?
if bytes_read == 0 {
return error_with_code('unexpected no response from handshake', 5)
}
msg[total_bytes_read] = buffer[0]
total_bytes_read++
if total_bytes_read > 5 && msg[total_bytes_read - 1] == `\n`
&& msg[total_bytes_read - 2] == `\r` && msg[total_bytes_read - 3] == `\n`
&& msg[total_bytes_read - 4] == `\r` {
break
}
}
res := msg[..total_bytes_read].bytestr()
return res
}
// read_handshake reads the handshake result and check if valid
fn (mut ws Client) read_handshake(seckey string) ? {
mut msg := ws.read_handshake_str() ?
ws.check_handshake_response(msg, seckey) ?
unsafe { msg.free() }
}
// check_handshake_response checks the response from handshake and returns
// the response and secure key provided by the websocket client
fn (mut ws Client) check_handshake_response(handshake_response string, seckey string) ? {
ws.debug_log('handshake response:\n$handshake_response')
lines := handshake_response.split_into_lines()
header := lines[0]
if !header.starts_with('HTTP/1.1 101') && !header.starts_with('HTTP/1.0 101') {
return error_with_code('handshake_handler: invalid HTTP status response code, $header',
6)
}
for i in 1 .. lines.len {
if lines[i].len <= 0 || lines[i] == '\r\n' {
continue
}
keys := lines[i].split(':')
match keys[0] {
'Upgrade', 'upgrade' {
ws.flags << .has_upgrade
}
'Connection', 'connection' {
ws.flags << .has_connection
}
'Sec-WebSocket-Accept', 'sec-websocket-accept' {
ws.debug_log('seckey: $seckey')
challenge := create_key_challenge_response(seckey) ?
ws.debug_log('challenge: $challenge, response: ${keys[1]}')
if keys[1].trim_space() != challenge {
return error_with_code('handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.',
7)
}
ws.flags << .has_accept
unsafe { challenge.free() }
}
else {}
}
unsafe { keys.free() }
}
unsafe { lines.free() }
if ws.flags.len < 3 {
ws.close(1002, 'invalid websocket HTTP headers') ?
return error_with_code('invalid websocket HTTP headers', 8)
}
}

View File

@ -0,0 +1,100 @@
module websocket
import net
import time
// socket_read reads from socket into the provided buffer
fn (mut ws Client) socket_read(mut buffer []byte) ?int {
lock {
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
return error('socket_read: trying to read a closed socket')
}
if ws.is_ssl {
r := ws.ssl_conn.read_into(mut buffer) ?
return r
} else {
for {
r := ws.conn.read(mut buffer) or {
if err.code == net.err_timed_out_code {
continue
}
return err
}
return r
}
}
}
return none
}
// socket_read reads from socket into the provided byte pointer and length
fn (mut ws Client) socket_read_ptr(buf_ptr &byte, len int) ?int {
lock {
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
return error('socket_read_ptr: trying to read a closed socket')
}
if ws.is_ssl {
r := ws.ssl_conn.socket_read_into_ptr(buf_ptr, len) ?
return r
} else {
for {
r := ws.conn.read_ptr(buf_ptr, len) or {
if err.code == net.err_timed_out_code {
continue
}
return err
}
return r
}
}
}
return none
}
// socket_write writes the provided byte array to the socket
fn (mut ws Client) socket_write(bytes []byte) ?int {
lock {
if ws.state == .closed || ws.conn.sock.handle <= 1 {
ws.debug_log('socket_write: Socket allready closed')
return error('socket_write: trying to write on a closed socket')
}
if ws.is_ssl {
return ws.ssl_conn.write(bytes)
} else {
for {
n := ws.conn.write(bytes) or {
if err.code == net.err_timed_out_code {
continue
}
return err
}
return n
}
panic('reached unreachable code')
}
}
}
// shutdown_socket shuts down the socket properly when connection is closed
fn (mut ws Client) shutdown_socket() ? {
ws.debug_log('shutting down socket')
if ws.is_ssl {
ws.ssl_conn.shutdown() ?
} else {
ws.conn.close() ?
}
}
// dial_socket connects tcp socket and initializes default configurations
fn (mut ws Client) dial_socket() ?&net.TcpConn {
tcp_address := '$ws.uri.hostname:$ws.uri.port'
mut t := net.dial_tcp(tcp_address) ?
optval := int(1)
t.sock.set_option_int(.keep_alive, optval) ?
t.set_read_timeout(30 * time.second)
t.set_write_timeout(30 * time.second)
if ws.is_ssl {
ws.ssl_conn.connect(mut t, ws.uri.hostname) ?
}
return t
}

View File

@ -0,0 +1,295 @@
module websocket
import encoding.utf8
const (
header_len_offset = 2 // offset for lengthpart of websocket header
buffer_size = 256 // default buffer size
extended_payload16_end_byte = 4 // header length with 16-bit extended payload
extended_payload64_end_byte = 10 // header length with 64-bit extended payload
)
// Fragment represents a websocket data fragment
struct Fragment {
data []byte // included data payload data in a fragment
opcode OPCode // interpretation of the payload data
}
// Frame represents a data frame header
struct Frame {
mut:
// length of the websocket header part
header_len int = 2
// size of total frame
frame_size int = 2
fin bool // true if final fragment of message
rsv1 bool // reserved for future use in websocket RFC
rsv2 bool // reserved for future use in websocket RFC
rsv3 bool // reserved for future use in websocket RFC
opcode OPCode // interpretation of the payload data
has_mask bool // true if the payload data is masked
payload_len int // payload length
masking_key [4]byte // all frames from client to server is masked with this key
}
const (
invalid_close_codes = [999, 1004, 1005, 1006, 1014, 1015, 1016, 1100, 2000, 2999, 5000, 65536]
)
// validate_client validates client frame rules from RFC6455
pub fn (mut ws Client) validate_frame(frame &Frame) ? {
if frame.rsv1 || frame.rsv2 || frame.rsv3 {
ws.close(1002, 'rsv cannot be other than 0, not negotiated') ?
return error('rsv cannot be other than 0, not negotiated')
}
if (int(frame.opcode) >= 3 && int(frame.opcode) <= 7)
|| (int(frame.opcode) >= 11 && int(frame.opcode) <= 15) {
ws.close(1002, 'use of reserved opcode') ?
return error('use of reserved opcode')
}
if frame.has_mask && !ws.is_server {
// server should never send masked frames
// to client, close connection
ws.close(1002, 'client got masked frame') ?
return error('client sent masked frame')
}
if is_control_frame(frame.opcode) {
if !frame.fin {
ws.close(1002, 'control message must not be fragmented') ?
return error('unexpected control frame with no fin')
}
if frame.payload_len > 125 {
ws.close(1002, 'control frames must not exceed 125 bytes') ?
return error('unexpected control frame payload length')
}
}
if frame.fin == false && ws.fragments.len == 0 && frame.opcode == .continuation {
err_msg := 'unexecpected continuation, there are no frames to continue, $frame'
ws.close(1002, err_msg) ?
return error(err_msg)
}
}
// is_control_frame returns true if the frame is a control frame
fn is_control_frame(opcode OPCode) bool {
return opcode !in [.text_frame, .binary_frame, .continuation]
}
// is_data_frame returns true if the frame is a control frame
fn is_data_frame(opcode OPCode) bool {
return opcode in [.text_frame, .binary_frame]
}
// read_payload reads the message payload from the socket
fn (mut ws Client) read_payload(frame &Frame) ?[]byte {
if frame.payload_len == 0 {
return []byte{}
}
mut buffer := []byte{cap: frame.payload_len}
mut read_buf := [1]byte{}
mut bytes_read := 0
for bytes_read < frame.payload_len {
len := ws.socket_read_ptr(&read_buf[0], 1) ?
if len != 1 {
return error('expected read all message, got zero')
}
bytes_read += len
buffer << read_buf[0]
}
if bytes_read != frame.payload_len {
return error('failed to read payload')
}
if frame.has_mask {
for i in 0 .. frame.payload_len {
buffer[i] ^= frame.masking_key[i % 4] & 0xff
}
}
return buffer
}
// validate_utf_8 validates payload for valid utf8 encoding
// - Future implementation needs to support fail fast utf errors for strict autobahn conformance
fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? {
if opcode in [.text_frame, .close] && !utf8.validate(payload.data, payload.len) {
ws.logger.error('malformed utf8 payload, payload len: ($payload.len)')
ws.send_error_event('Recieved malformed utf8.')
ws.close(1007, 'malformed utf8 payload') ?
return error('malformed utf8 payload')
}
}
// read_next_message reads 1 to n frames to compose a message
pub fn (mut ws Client) read_next_message() ?Message {
for {
frame := ws.parse_frame_header() ?
ws.validate_frame(&frame) ?
frame_payload := ws.read_payload(&frame) ?
if is_control_frame(frame.opcode) {
// Control frames can interject other frames
// and need to be returned immediately
msg := Message{
opcode: OPCode(frame.opcode)
payload: frame_payload.clone()
}
unsafe { frame_payload.free() }
return msg
}
// if the message is fragmented we just put it on fragments
// a fragment is allowed to have zero size payload
if !frame.fin {
ws.fragments << &Fragment{
data: frame_payload.clone()
opcode: frame.opcode
}
unsafe { frame_payload.free() }
continue
}
if ws.fragments.len == 0 {
ws.validate_utf_8(frame.opcode, frame_payload) or {
ws.logger.error('UTF8 validation error: $err, len of payload($frame_payload.len)')
ws.send_error_event('UTF8 validation error: $err, len of payload($frame_payload.len)')
return err
}
msg := Message{
opcode: OPCode(frame.opcode)
payload: frame_payload.clone()
}
unsafe { frame_payload.free() }
return msg
}
defer {
ws.fragments = []
}
if is_data_frame(frame.opcode) {
ws.close(0, '') ?
return error('Unexpected frame opcode')
}
payload := ws.payload_from_fragments(frame_payload) ?
opcode := ws.opcode_from_fragments()
ws.validate_utf_8(opcode, payload) ?
msg := Message{
opcode: opcode
payload: payload.clone()
}
unsafe {
frame_payload.free()
payload.free()
}
return msg
}
return none
}
// payload_from_fragments returs the whole paylaod from fragmented message
fn (ws Client) payload_from_fragments(fin_payload []byte) ?[]byte {
mut total_size := 0
for f in ws.fragments {
if f.data.len > 0 {
total_size += f.data.len
}
}
total_size += fin_payload.len
if total_size == 0 {
return []byte{}
}
mut total_buffer := []byte{cap: total_size}
for f in ws.fragments {
if f.data.len > 0 {
total_buffer << f.data
}
}
total_buffer << fin_payload
return total_buffer
}
// opcode_from_fragments returns the opcode for message from the first fragment sent
fn (ws Client) opcode_from_fragments() OPCode {
return OPCode(ws.fragments[0].opcode)
}
// parse_frame_header parses next message by decoding the incoming frames
pub fn (mut ws Client) parse_frame_header() ?Frame {
mut buffer := [256]byte{}
mut bytes_read := 0
mut frame := Frame{}
mut rbuff := [1]byte{}
mut mask_end_byte := 0
for ws.state == .open {
read_bytes := ws.socket_read_ptr(&rbuff[0], 1) ?
if read_bytes == 0 {
// this is probably a timeout or close
continue
}
buffer[bytes_read] = rbuff[0]
bytes_read++
// parses the first two header bytes to get basic frame information
if bytes_read == u64(websocket.header_len_offset) {
frame.fin = (buffer[0] & 0x80) == 0x80
frame.rsv1 = (buffer[0] & 0x40) == 0x40
frame.rsv2 = (buffer[0] & 0x20) == 0x20
frame.rsv3 = (buffer[0] & 0x10) == 0x10
frame.opcode = OPCode(int(buffer[0] & 0x7F))
frame.has_mask = (buffer[1] & 0x80) == 0x80
frame.payload_len = buffer[1] & 0x7F
// if has mask set the byte postition where mask ends
if frame.has_mask {
mask_end_byte = if frame.payload_len < 126 {
websocket.header_len_offset + 4
} else if frame.payload_len == 126 {
websocket.header_len_offset + 6
} else if frame.payload_len == 127 {
websocket.header_len_offset + 12
} else {
0
} // impossible
}
frame.payload_len = frame.payload_len
frame.frame_size = frame.header_len + frame.payload_len
if !frame.has_mask && frame.payload_len < 126 {
break
}
}
if frame.payload_len == 126 && bytes_read == u64(websocket.extended_payload16_end_byte) {
frame.header_len += 2
frame.payload_len = 0
frame.payload_len |= buffer[2] << 8
frame.payload_len |= buffer[3]
frame.frame_size = frame.header_len + frame.payload_len
if !frame.has_mask {
break
}
}
if frame.payload_len == 127 && bytes_read == u64(websocket.extended_payload64_end_byte) {
frame.header_len += 8
// these shift operators needs 64 bit on clang with -prod flag
mut payload_len := u64(0)
payload_len |= u64(buffer[2]) << 56
payload_len |= u64(buffer[3]) << 48
payload_len |= u64(buffer[4]) << 40
payload_len |= u64(buffer[5]) << 32
payload_len |= u64(buffer[6]) << 24
payload_len |= u64(buffer[7]) << 16
payload_len |= u64(buffer[8]) << 8
payload_len |= u64(buffer[9])
frame.payload_len = int(payload_len)
if !frame.has_mask {
break
}
}
if frame.has_mask && bytes_read == mask_end_byte {
frame.masking_key[0] = buffer[mask_end_byte - 4]
frame.masking_key[1] = buffer[mask_end_byte - 3]
frame.masking_key[2] = buffer[mask_end_byte - 2]
frame.masking_key[3] = buffer[mask_end_byte - 1]
break
}
}
return frame
}
// unmask_sequence unmask any given sequence
fn (f Frame) unmask_sequence(mut buffer []byte) {
for i in 0 .. buffer.len {
buffer[i] ^= f.masking_key[i % 4] & 0xff
}
}

View File

@ -1,7 +1,7 @@
// use this test to test the websocket client in the autobahn test // use this test to test the websocket client in the autobahn test
module main module main
import x.websocket import net.websocket
fn main() { fn main() {
for i in 1 .. 304 { for i in 1 .. 304 {

View File

@ -1,7 +1,7 @@
// use this test to test the websocket client in the autobahn test // use this test to test the websocket client in the autobahn test
module main module main
import x.websocket import net.websocket
fn main() { fn main() {
for i in 1 .. 304 { for i in 1 .. 304 {

View File

@ -1,7 +1,7 @@
// use this to test websocket server to the autobahn test // use this to test websocket server to the autobahn test
module main module main
import x.websocket import net.websocket
fn main() { fn main() {
mut s := websocket.new_server(.ip6, 9002, '/') mut s := websocket.new_server(.ip6, 9002, '/')

View File

@ -17,5 +17,5 @@ services:
client: client:
container_name: autobahn_client container_name: autobahn_client
build: build:
dockerfile: vlib/x/websocket/tests/autobahn/ws_test/Dockerfile dockerfile: vlib/net/websocket/tests/autobahn/ws_test/Dockerfile
context: ../../../../../ context: ../../../../../

View File

@ -1,7 +1,7 @@
// use this test to test the websocket client in the autobahn test // use this test to test the websocket client in the autobahn test
module main module main
import x.websocket import net.websocket
fn main() { fn main() {
for i in 1 .. 304 { for i in 1 .. 304 {

View File

@ -1,7 +1,7 @@
// use this test to test the websocket client in the autobahn test // use this test to test the websocket client in the autobahn test
module main module main
import x.websocket import net.websocket
fn main() { fn main() {
for i in 1 .. 304 { for i in 1 .. 304 {

View File

@ -0,0 +1,12 @@
FROM thevlang/vlang:buster-build
COPY ./ /src/
WORKDIR /src
RUN make CC=clang
RUN /src/v /src/vlib/net/websocket/tests/autobahn/autobahn_server.v
RUN chmod +x /src/vlib/net/websocket/tests/autobahn/autobahn_server
ENTRYPOINT [ "/src/vlib/net/websocket/tests/autobahn/autobahn_server" ]

View File

@ -0,0 +1,16 @@
module websocket
// Uri represents an Uri for websocket connections
struct Uri {
mut:
url string // url to the websocket endpoint
hostname string // hostname of the websocket endpoint
port string // port of the websocket endpoint
resource string // resource of the websocket endpoint
querystring string // query string of the websocket endpoint
}
// str returns the string representation of the Uri
pub fn (u Uri) str() string {
return u.url
}

View File

@ -0,0 +1,54 @@
module websocket
import rand
import crypto.sha1
import encoding.base64
// htonl64 converts payload length to header bits
fn htonl64(payload_len u64) []byte {
mut ret := []byte{len: 8}
ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff)
ret[1] = byte(((payload_len & (u64(0xff) << 48)) >> 48) & 0xff)
ret[2] = byte(((payload_len & (u64(0xff) << 40)) >> 40) & 0xff)
ret[3] = byte(((payload_len & (u64(0xff) << 32)) >> 32) & 0xff)
ret[4] = byte(((payload_len & (u64(0xff) << 24)) >> 24) & 0xff)
ret[5] = byte(((payload_len & (u64(0xff) << 16)) >> 16) & 0xff)
ret[6] = byte(((payload_len & (u64(0xff) << 8)) >> 8) & 0xff)
ret[7] = byte(((payload_len & (u64(0xff) << 0)) >> 0) & 0xff)
return ret
}
// create_masking_key returs a new masking key to use when masking websocket messages
fn create_masking_key() []byte {
mask_bit := byte(rand.intn(255))
buf := []byte{len: 4, init: `0`}
unsafe { C.memcpy(buf.data, &mask_bit, 4) }
return buf
}
// create_key_challenge_response creates a key challange response from security key
fn create_key_challenge_response(seckey string) ?string {
if seckey.len == 0 {
return error('unexpected seckey lengt zero')
}
guid := '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
sha1buf := seckey + guid
shabytes := sha1buf.bytes()
hash := sha1.sum(shabytes)
b64 := base64.encode(hash)
unsafe {
hash.free()
shabytes.free()
}
return b64
}
// get_nonce creates a randomized array used in handshake process
fn get_nonce(nonce_size int) string {
mut nonce := []byte{len: nonce_size, cap: nonce_size}
alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz'
for i in 0 .. nonce_size {
nonce[i] = alphanum[rand.intn(alphanum.len)]
}
return unsafe { tos(nonce.data, nonce.len) }.clone()
}

View File

@ -0,0 +1,494 @@
// websocket module implements websocket client and a websocket server
// attribution: @thecoderr the author of original websocket client
[manualfree]
module websocket
import net
import net.http
import net.openssl
import net.urllib
import time
import log
import rand
const (
empty_bytearr = []byte{} // used as empty response to avoid allocation
)
// Client represents websocket client
pub struct Client {
is_server bool
mut:
ssl_conn &openssl.SSLConn // secure connection used when wss is used
flags []Flag // flags used in handshake
fragments []Fragment // current fragments
message_callbacks []MessageEventHandler // all callbacks on_message
error_callbacks []ErrorEventHandler // all callbacks on_error
open_callbacks []OpenEventHandler // all callbacks on_open
close_callbacks []CloseEventHandler // all callbacks on_close
pub:
is_ssl bool // true if secure socket is used
uri Uri // uri of current connection
id string // unique id of client
pub mut:
header http.Header // headers that will be passed when connecting
conn &net.TcpConn // underlying TCP socket connection
nonce_size int = 16 // size of nounce used for masking
panic_on_callback bool // set to true of callbacks can panic
state State // current state of connection
logger &log.Log // logger used to log messages
resource_name string // name of current resource
last_pong_ut u64 // last time in unix time we got a pong message
}
// Flag represents different types of headers in websocket handshake
enum Flag {
has_accept // Webs
has_connection
has_upgrade
}
// State represents the state of the websocket connection.
pub enum State {
connecting = 0
open
closing
closed
}
// Message represents a whole message combined from 1 to n frames
pub struct Message {
pub:
opcode OPCode // websocket frame type of this message
payload []byte // payload of the message
}
// OPCode represents the supported websocket frame types
pub enum OPCode {
continuation = 0x00
text_frame = 0x01
binary_frame = 0x02
close = 0x08
ping = 0x09
pong = 0x0A
}
// new_client instance a new websocket client
pub fn new_client(address string) ?&Client {
uri := parse_uri(address) ?
return &Client{
conn: 0
is_server: false
ssl_conn: openssl.new_ssl_conn()
is_ssl: address.starts_with('wss')
logger: &log.Log{
level: .info
}
uri: uri
state: .closed
id: rand.uuid_v4()
header: http.new_header()
}
}
// connect connects to remote websocket server
pub fn (mut ws Client) connect() ? {
ws.assert_not_connected() ?
ws.set_state(.connecting)
ws.logger.info('connecting to host $ws.uri')
ws.conn = ws.dial_socket() ?
// Todo: make setting configurable
ws.conn.set_read_timeout(time.second * 30)
ws.conn.set_write_timeout(time.second * 30)
ws.handshake() ?
ws.set_state(.open)
ws.logger.info('successfully connected to host $ws.uri')
ws.send_open_event()
}
// listen listens and processes incoming messages
pub fn (mut ws Client) listen() ? {
mut log := 'Starting client listener, server($ws.is_server)...'
ws.logger.info(log)
unsafe { log.free() }
defer {
ws.logger.info('Quit client listener, server($ws.is_server)...')
if ws.state == .open {
ws.close(1000, 'closed by client') or {}
}
}
for ws.state == .open {
msg := ws.read_next_message() or {
if ws.state in [.closed, .closing] {
return
}
ws.debug_log('failed to read next message: $err')
ws.send_error_event('failed to read next message: $err')
return err
}
if ws.state in [.closed, .closing] {
return
}
ws.debug_log('got message: $msg.opcode')
match msg.opcode {
.text_frame {
log = 'read: text'
ws.debug_log(log)
unsafe { log.free() }
ws.send_message_event(msg)
unsafe { msg.free() }
}
.binary_frame {
ws.debug_log('read: binary')
ws.send_message_event(msg)
unsafe { msg.free() }
}
.ping {
ws.debug_log('read: ping, sending pong')
ws.send_control_frame(.pong, 'PONG', msg.payload) or {
ws.logger.error('error in message callback sending PONG: $err')
ws.send_error_event('error in message callback sending PONG: $err')
if ws.panic_on_callback {
panic(err)
}
continue
}
if msg.payload.len > 0 {
unsafe { msg.free() }
}
}
.pong {
ws.debug_log('read: pong')
ws.last_pong_ut = time.now().unix
ws.send_message_event(msg)
if msg.payload.len > 0 {
unsafe { msg.free() }
}
}
.close {
log = 'read: close'
ws.debug_log(log)
unsafe { log.free() }
defer {
ws.manage_clean_close()
}
if msg.payload.len > 0 {
if msg.payload.len == 1 {
ws.close(1002, 'close payload cannot be 1 byte') ?
return error('close payload cannot be 1 byte')
}
code := (int(msg.payload[0]) << 8) + int(msg.payload[1])
if code in invalid_close_codes {
ws.close(1002, 'invalid close code: $code') ?
return error('invalid close code: $code')
}
reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} }
if reason.len > 0 {
ws.validate_utf_8(.close, reason) ?
}
if ws.state !in [.closing, .closed] {
// sending close back according to spec
ws.debug_log('close with reason, code: $code, reason: $reason')
r := reason.bytestr()
ws.close(code, r) ?
}
unsafe { msg.free() }
} else {
if ws.state !in [.closing, .closed] {
ws.debug_log('close with reason, no code')
// sending close back according to spec
ws.close(1000, 'normal') ?
}
unsafe { msg.free() }
}
return
}
.continuation {
ws.logger.error('unexpected opcode continuation, nothing to continue')
ws.send_error_event('unexpected opcode continuation, nothing to continue')
ws.close(1002, 'nothing to continue') ?
return error('unexpected opcode continuation, nothing to continue')
}
}
}
}
// manage_clean_close closes connection in a clean websocket way
fn (mut ws Client) manage_clean_close() {
ws.send_close_event(1000, 'closed by client')
}
// ping sends ping message to server
pub fn (mut ws Client) ping() ? {
ws.send_control_frame(.ping, 'PING', []) ?
}
// pong sends pong message to server,
pub fn (mut ws Client) pong() ? {
ws.send_control_frame(.pong, 'PONG', []) ?
}
// write_ptr writes len bytes provided a byteptr with a websocket messagetype
pub fn (mut ws Client) write_ptr(bytes &byte, payload_len int, code OPCode) ?int {
// ws.debug_log('write_ptr code: $code')
if ws.state != .open || ws.conn.sock.handle < 1 {
// todo: send error here later
return error('trying to write on a closed socket!')
}
mut header_len := 2 + if payload_len > 125 { 2 } else { 0 } +
if payload_len > 0xffff { 6 } else { 0 }
if !ws.is_server {
header_len += 4
}
mut header := []byte{len: header_len, init: `0`} // [`0`].repeat(header_len)
header[0] = byte(int(code)) | 0x80
masking_key := create_masking_key()
if ws.is_server {
if payload_len <= 125 {
header[1] = byte(payload_len)
} else if payload_len > 125 && payload_len <= 0xffff {
len16 := C.htons(payload_len)
header[1] = 126
unsafe { C.memcpy(&header[2], &len16, 2) }
} else if payload_len > 0xffff && payload_len <= 0x7fffffff {
len_bytes := htonl64(u64(payload_len))
header[1] = 127
unsafe { C.memcpy(&header[2], len_bytes.data, 8) }
}
} else {
if payload_len <= 125 {
header[1] = byte(payload_len | 0x80)
header[2] = masking_key[0]
header[3] = masking_key[1]
header[4] = masking_key[2]
header[5] = masking_key[3]
} else if payload_len > 125 && payload_len <= 0xffff {
len16 := C.htons(payload_len)
header[1] = (126 | 0x80)
unsafe { C.memcpy(&header[2], &len16, 2) }
header[4] = masking_key[0]
header[5] = masking_key[1]
header[6] = masking_key[2]
header[7] = masking_key[3]
} else if payload_len > 0xffff && payload_len <= 0x7fffffff {
len64 := htonl64(u64(payload_len))
header[1] = (127 | 0x80)
unsafe { C.memcpy(&header[2], len64.data, 8) }
header[10] = masking_key[0]
header[11] = masking_key[1]
header[12] = masking_key[2]
header[13] = masking_key[3]
} else {
ws.close(1009, 'frame too large') ?
return error('frame too large')
}
}
len := header.len + payload_len
mut frame_buf := []byte{len: len}
unsafe {
C.memcpy(&frame_buf[0], &byte(header.data), header.len)
if payload_len > 0 {
C.memcpy(&frame_buf[header.len], bytes, payload_len)
}
}
if !ws.is_server {
for i in 0 .. payload_len {
frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff
}
}
written_len := ws.socket_write(frame_buf) ?
unsafe {
frame_buf.free()
masking_key.free()
header.free()
}
return written_len
}
// write writes a byte array with a websocket messagetype to socket
pub fn (mut ws Client) write(bytes []byte, code OPCode) ?int {
return ws.write_ptr(&byte(bytes.data), bytes.len, code)
}
// write_string, writes a string with a websocket texttype to socket
[deprecated: 'use Client.write_string() instead']
pub fn (mut ws Client) write_str(str string) ?int {
return ws.write_string(str)
}
// write_str, writes a string with a websocket texttype to socket
pub fn (mut ws Client) write_string(str string) ?int {
return ws.write_ptr(str.str, str.len, .text_frame)
}
// close closes the websocket connection
pub fn (mut ws Client) close(code int, message string) ? {
ws.debug_log('sending close, $code, $message')
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
ws.debug_log('close: Websocket allready closed ($ws.state), $message, $code handle($ws.conn.sock.handle)')
err_msg := 'Socket allready closed: $code'
return error(err_msg)
}
defer {
ws.shutdown_socket() or {}
ws.reset_state()
}
ws.set_state(.closing)
// mut code32 := 0
if code > 0 {
code_ := C.htons(code)
message_len := message.len + 2
mut close_frame := []byte{len: message_len}
close_frame[0] = byte(code_ & 0xFF)
close_frame[1] = byte(code_ >> 8)
// code32 = (close_frame[0] << 8) + close_frame[1]
for i in 0 .. message.len {
close_frame[i + 2] = message[i]
}
ws.send_control_frame(.close, 'CLOSE', close_frame) ?
unsafe { close_frame.free() }
} else {
ws.send_control_frame(.close, 'CLOSE', []) ?
}
ws.fragments = []
}
// send_control_frame sends a control frame to the server
fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []byte) ? {
ws.debug_log('send control frame $code, frame_type: $frame_typ')
if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 {
return error('socket is not connected')
}
header_len := if ws.is_server { 2 } else { 6 }
frame_len := header_len + payload.len
mut control_frame := []byte{len: frame_len}
mut masking_key := if !ws.is_server { create_masking_key() } else { websocket.empty_bytearr }
defer {
unsafe {
control_frame.free()
if masking_key.len > 0 {
masking_key.free()
}
}
}
control_frame[0] = byte(int(code) | 0x80)
if !ws.is_server {
control_frame[1] = byte(payload.len | 0x80)
control_frame[2] = masking_key[0]
control_frame[3] = masking_key[1]
control_frame[4] = masking_key[2]
control_frame[5] = masking_key[3]
} else {
control_frame[1] = byte(payload.len)
}
if code == .close {
if payload.len >= 2 {
if !ws.is_server {
mut parsed_payload := []byte{len: payload.len + 1}
unsafe { C.memcpy(parsed_payload.data, &payload[0], payload.len) }
parsed_payload[payload.len] = `\0`
for i in 0 .. payload.len {
control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff
}
unsafe { parsed_payload.free() }
} else {
unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) }
}
}
} else {
if !ws.is_server {
if payload.len > 0 {
for i in 0 .. payload.len {
control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff
}
}
} else {
if payload.len > 0 {
unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) }
}
}
}
ws.socket_write(control_frame) or {
return error('send_control_frame: error sending $frame_typ control frame.')
}
}
// parse_uri parses the url to a Uri
fn parse_uri(url string) ?&Uri {
u := urllib.parse(url) ?
request_uri := u.request_uri()
v := request_uri.split('?')
mut port := u.port()
uri := u.str()
if port == '' {
port = if uri.starts_with('ws://') {
'80'
} else if uri.starts_with('wss://') {
'443'
} else {
u.port()
}
}
querystring := if v.len > 1 { '?' + v[1] } else { '' }
return &Uri{
url: url
hostname: u.hostname()
port: port
resource: v[0]
querystring: querystring
}
}
// set_state sets current state of the websocket connection
fn (mut ws Client) set_state(state State) {
lock {
ws.state = state
}
}
// assert_not_connected returns error if the connection is not connected
fn (ws Client) assert_not_connected() ? {
match ws.state {
.connecting { return error('connect: websocket is connecting') }
.open { return error('connect: websocket already open') }
.closing { return error('connect: reconnect on closing websocket not supported, please use new client') }
else {}
}
}
// reset_state resets the websocket and initialize default settings
fn (mut ws Client) reset_state() {
lock {
ws.state = .closed
ws.ssl_conn = openssl.new_ssl_conn()
ws.flags = []
ws.fragments = []
}
}
// debug_log handles debug logging output for client and server
fn (mut ws Client) debug_log(text string) {
if ws.is_server {
ws.logger.debug('server-> $text')
} else {
ws.logger.debug('client-> $text')
}
}
// free handles manual free memory of Message struct
pub fn (m &Message) free() {
unsafe { m.payload.free() }
}
// free handles manual free memory of Client struct
pub fn (c &Client) free() {
unsafe {
c.flags.free()
c.fragments.free()
c.message_callbacks.free()
c.error_callbacks.free()
c.open_callbacks.free()
c.close_callbacks.free()
c.header.free()
}
}

View File

@ -0,0 +1,10 @@
module websocket
// error_code returns the error code
fn error_code() int {
return C.errno
}
const (
error_ewouldblock = C.EWOULDBLOCK // blocking error code
)

View File

@ -0,0 +1,189 @@
module websocket
import net
import net.openssl
import log
import time
import rand
// Server represents a websocket server connection
pub struct Server {
mut:
logger &log.Log // logger used to log
ls &net.TcpListener // listener used to get incoming connection to socket
accept_client_callbacks []AcceptClientFn // accept client callback functions
message_callbacks []MessageEventHandler // new message callback functions
close_callbacks []CloseEventHandler // close message callback functions
pub:
family net.AddrFamily = .ip
port int // port used as listen to incoming connections
is_ssl bool // true if secure connection (not supported yet on server)
pub mut:
clients map[string]&ServerClient // clients connected to this server
ping_interval int = 30 // interval for sending ping to clients (seconds)
state State // current state of connection
}
// ServerClient represents a connected client
struct ServerClient {
pub:
resource_name string // resource that the client access
client_key string // unique key of client
pub mut:
server &Server
client &Client
}
// new_server instance a new websocket server on provided port and route
pub fn new_server(family net.AddrFamily, port int, route string) &Server {
return &Server{
ls: 0
family: family
port: port
logger: &log.Log{
level: .info
}
state: .closed
}
}
// set_ping_interval sets the interval that the server will send ping messages to clients
pub fn (mut s Server) set_ping_interval(seconds int) {
s.ping_interval = seconds
}
// listen start listen and process to incoming connections from websocket clients
pub fn (mut s Server) listen() ? {
s.logger.info('websocket server: start listen on port $s.port')
s.ls = net.listen_tcp(s.family, ':$s.port') ?
s.set_state(.open)
go s.handle_ping()
for {
mut c := s.accept_new_client() or { continue }
go s.serve_client(mut c)
}
s.logger.info('websocket server: end listen on port $s.port')
}
// Close closes server (not implemented yet)
fn (mut s Server) close() {
// TODO: implement close when moving to net from x.net
}
// handle_ping sends ping to all clients every set interval
fn (mut s Server) handle_ping() {
mut clients_to_remove := []string{}
for s.state == .open {
time.sleep(s.ping_interval * time.second)
for i, _ in s.clients {
mut c := s.clients[i]
if c.client.state == .open {
c.client.ping() or {
s.logger.debug('server-> error sending ping to client')
c.client.close(1002, 'Closing connection: ping send error') or {
// we want to continue even if error
continue
}
clients_to_remove << c.client.id
}
if (time.now().unix - c.client.last_pong_ut) > s.ping_interval * 2 {
clients_to_remove << c.client.id
c.client.close(1000, 'no pong received') or { continue }
}
}
}
// TODO: replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges
for client in clients_to_remove {
lock {
s.clients.delete(client)
}
}
clients_to_remove.clear()
}
}
// serve_client accepts incoming connection and sets up the callbacks
fn (mut s Server) serve_client(mut c Client) ? {
c.logger.debug('server-> Start serve client ($c.id)')
defer {
c.logger.debug('server-> End serve client ($c.id)')
}
mut handshake_response, mut server_client := s.handle_server_handshake(mut c) ?
accept := s.send_connect_event(mut server_client) ?
if !accept {
s.logger.debug('server-> client not accepted')
c.shutdown_socket() ?
return
}
// the client is accepted
c.socket_write(handshake_response.bytes()) ?
lock {
s.clients[server_client.client.id] = server_client
}
s.setup_callbacks(mut server_client)
c.listen() or {
s.logger.error(err.msg)
return err
}
}
// setup_callbacks initialize all callback functions
fn (mut s Server) setup_callbacks(mut sc ServerClient) {
if s.message_callbacks.len > 0 {
for cb in s.message_callbacks {
if cb.is_ref {
sc.client.on_message_ref(cb.handler2, cb.ref)
} else {
sc.client.on_message(cb.handler)
}
}
}
if s.close_callbacks.len > 0 {
for cb in s.close_callbacks {
if cb.is_ref {
sc.client.on_close_ref(cb.handler2, cb.ref)
} else {
sc.client.on_close(cb.handler)
}
}
}
// set standard close so we can remove client if closed
sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ? {
c.logger.debug('server-> Delete client')
lock {
sc.server.clients.delete(sc.client.id)
}
}, sc)
}
// accept_new_client creates a new client instance for client that connects to the socket
fn (mut s Server) accept_new_client() ?&Client {
mut new_conn := s.ls.accept() ?
c := &Client{
is_server: true
conn: new_conn
ssl_conn: openssl.new_ssl_conn()
logger: s.logger
state: .open
last_pong_ut: time.now().unix
id: rand.uuid_v4()
}
return c
}
// set_state sets current state in a thread safe way
fn (mut s Server) set_state(state State) {
lock {
s.state = state
}
}
// free manages manual free of memory for Server instance
pub fn (mut s Server) free() {
unsafe {
s.clients.free()
s.accept_client_callbacks.free()
s.message_callbacks.free()
s.close_callbacks.free()
}
}

View File

@ -1,6 +1,6 @@
import os import os
import net import net
import x.websocket import net.websocket
import time import time
import rand import rand

View File

@ -0,0 +1,12 @@
module websocket
import net
// error_code returns the error code
fn error_code() int {
return C.WSAGetLastError()
}
const (
error_ewouldblock = net.WsaError.wsaewouldblock // blocking error code
)

View File

@ -1,25 +0,0 @@
module openssl
import time
enum Select {
read
write
except
}
pub struct SSLConn {
mut:
sslctx &C.SSL_CTX
ssl &C.SSL
handle int
duration time.Duration
}
pub fn new_ssl_conn() &SSLConn {
return &SSLConn{
sslctx: 0
ssl: 0
handle: 0
}
}

View File

@ -1,12 +0,0 @@
FROM thevlang/vlang:buster-build
COPY ./ /src/
WORKDIR /src
RUN make CC=clang
RUN /src/v /src/vlib/x/websocket/tests/autobahn/autobahn_server.v
RUN chmod +x /src/vlib/x/websocket/tests/autobahn/autobahn_server
ENTRYPOINT [ "/src/vlib/x/websocket/tests/autobahn/autobahn_server" ]

View File

@ -1,11 +1,13 @@
// websocket module implements websocket client and a websocket server // websocket module implements websocket client and a websocket server
// attribution: @thecoderr the author of original websocket client // attribution: @thecoderr the author of original websocket client
// advice that the implementation is deprecated and moved to the net.websocket module!
// it will be removed in later versions
[manualfree] [manualfree]
module websocket module websocket
import net import net
import net.http import net.http
import x.openssl import net.openssl
import net.urllib import net.urllib
import time import time
import log import log
@ -74,6 +76,7 @@ pub enum OPCode {
} }
// new_client instance a new websocket client // new_client instance a new websocket client
[deprecated: 'use net.websocket module instead']
pub fn new_client(address string) ?&Client { pub fn new_client(address string) ?&Client {
uri := parse_uri(address) ? uri := parse_uri(address) ?
return &Client{ return &Client{

View File

@ -1,7 +1,7 @@
module websocket module websocket
import net import net
import x.openssl import net.openssl
import log import log
import time import time
import rand import rand
@ -35,6 +35,7 @@ pub mut:
} }
// new_server instance a new websocket server on provided port and route // new_server instance a new websocket server on provided port and route
[deprecated: 'use net.websocket module instead']
pub fn new_server(family net.AddrFamily, port int, route string) &Server { pub fn new_server(family net.AddrFamily, port int, route string) &Server {
return &Server{ return &Server{
ls: 0 ls: 0