x.websocket: docs and cleanup (#7078)
parent
953a51bec8
commit
7c394b9d58
|
@ -1,27 +1,30 @@
|
||||||
module websocket
|
module websocket
|
||||||
|
|
||||||
// All this plumbing will go awauy when we can do EventHandler<T> properly
|
// Represents a callback on a new message
|
||||||
struct MessageEventHandler {
|
struct MessageEventHandler {
|
||||||
handler SocketMessageFn
|
handler SocketMessageFn // Callback function
|
||||||
handler2 SocketMessageFn2
|
handler2 SocketMessageFn2 // Callback function with reference
|
||||||
is_ref bool
|
is_ref bool // Has a reference object
|
||||||
ref voidptr
|
ref voidptr // The referenced object
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Represents a callback on error
|
||||||
struct ErrorEventHandler {
|
struct ErrorEventHandler {
|
||||||
handler SocketErrorFn
|
handler SocketErrorFn // Callback function
|
||||||
handler2 SocketErrorFn2
|
handler2 SocketErrorFn2 // Callback function with reference
|
||||||
is_ref bool
|
is_ref bool // Has a reference object
|
||||||
ref voidptr
|
ref voidptr // The referenced object
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Represents a callback when connection is opened
|
||||||
struct OpenEventHandler {
|
struct OpenEventHandler {
|
||||||
handler SocketOpenFn
|
handler SocketOpenFn // Callback function
|
||||||
handler2 SocketOpenFn2
|
handler2 SocketOpenFn2 // Callback function with reference
|
||||||
is_ref bool
|
is_ref bool // Has a reference object
|
||||||
ref voidptr
|
ref voidptr // The referenced object
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Represents a callback on a closing event
|
||||||
struct CloseEventHandler {
|
struct CloseEventHandler {
|
||||||
handler SocketCloseFn
|
handler SocketCloseFn
|
||||||
handler2 SocketCloseFn2
|
handler2 SocketCloseFn2
|
||||||
|
@ -47,6 +50,7 @@ pub type SocketCloseFn = fn (mut c Client, code int, reason string) ?
|
||||||
|
|
||||||
pub type SocketCloseFn2 = fn (mut c Client, code int, reason string, v voidptr) ?
|
pub type SocketCloseFn2 = fn (mut c Client, code int, reason string, v voidptr) ?
|
||||||
|
|
||||||
|
// on_connect register callback when client connects to the server
|
||||||
pub fn (mut s Server) on_connect(fun AcceptClientFn) ? {
|
pub fn (mut s Server) on_connect(fun AcceptClientFn) ? {
|
||||||
if s.accept_client_callbacks.len > 0 {
|
if s.accept_client_callbacks.len > 0 {
|
||||||
return error('only one callback can be registered for accept client')
|
return error('only one callback can be registered for accept client')
|
||||||
|
@ -54,16 +58,6 @@ pub fn (mut s Server) on_connect(fun AcceptClientFn) ? {
|
||||||
s.accept_client_callbacks << fun
|
s.accept_client_callbacks << fun
|
||||||
}
|
}
|
||||||
|
|
||||||
fn (mut s Server) send_connect_event(mut c ServerClient) ?bool {
|
|
||||||
if s.accept_client_callbacks.len == 0 {
|
|
||||||
// If no callback all client will be accepted
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
fun := s.accept_client_callbacks[0]
|
|
||||||
res := fun(mut c)?
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
// on_message, register a callback on new messages
|
// on_message, register a callback on new messages
|
||||||
pub fn (mut s Server) on_message(fun SocketMessageFn) {
|
pub fn (mut s Server) on_message(fun SocketMessageFn) {
|
||||||
s.message_callbacks << MessageEventHandler{
|
s.message_callbacks << MessageEventHandler{
|
||||||
|
@ -160,6 +154,18 @@ pub fn (mut ws Client) on_close_ref(fun SocketCloseFn2, ref voidptr) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send_connect_event, invokes the on_connect callback
|
||||||
|
fn (mut s Server) send_connect_event(mut c ServerClient) ?bool {
|
||||||
|
if s.accept_client_callbacks.len == 0 {
|
||||||
|
// If no callback all client will be accepted
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
fun := s.accept_client_callbacks[0]
|
||||||
|
res := fun(mut c) ?
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// send_message_event invokes the on_message callback
|
||||||
fn (mut ws Client) send_message_event(msg &Message) {
|
fn (mut ws Client) send_message_event(msg &Message) {
|
||||||
ws.debug_log('sending on_message event')
|
ws.debug_log('sending on_message event')
|
||||||
for ev_handler in ws.message_callbacks {
|
for ev_handler in ws.message_callbacks {
|
||||||
|
@ -171,6 +177,7 @@ fn (mut ws Client) send_message_event(msg &Message) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send_error_event invokes the on_error callback
|
||||||
fn (mut ws Client) send_error_event(err string) {
|
fn (mut ws Client) send_error_event(err string) {
|
||||||
ws.debug_log('sending on_error event')
|
ws.debug_log('sending on_error event')
|
||||||
for ev_handler in ws.error_callbacks {
|
for ev_handler in ws.error_callbacks {
|
||||||
|
@ -182,6 +189,7 @@ fn (mut ws Client) send_error_event(err string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send_close_event invokes the on_close callback
|
||||||
fn (mut ws Client) send_close_event(code int, reason string) {
|
fn (mut ws Client) send_close_event(code int, reason string) {
|
||||||
ws.debug_log('sending on_close event')
|
ws.debug_log('sending on_close event')
|
||||||
for ev_handler in ws.close_callbacks {
|
for ev_handler in ws.close_callbacks {
|
||||||
|
@ -193,6 +201,7 @@ fn (mut ws Client) send_close_event(code int, reason string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send_open_event invokes the on_open callback
|
||||||
fn (mut ws Client) send_open_event() {
|
fn (mut ws Client) send_open_event() {
|
||||||
ws.debug_log('sending on_open event')
|
ws.debug_log('sending on_open event')
|
||||||
for ev_handler in ws.open_callbacks {
|
for ev_handler in ws.open_callbacks {
|
||||||
|
|
|
@ -3,7 +3,7 @@ module websocket
|
||||||
import encoding.base64
|
import encoding.base64
|
||||||
import strings
|
import strings
|
||||||
|
|
||||||
// handshake manage the handshake part of connecting
|
// handshake manage the websocket handshake process
|
||||||
fn (mut ws Client) handshake() ? {
|
fn (mut ws Client) handshake() ? {
|
||||||
nonce := get_nonce(ws.nonce_size)
|
nonce := get_nonce(ws.nonce_size)
|
||||||
seckey := base64.encode(nonce)
|
seckey := base64.encode(nonce)
|
||||||
|
@ -24,7 +24,6 @@ fn (mut ws Client) handshake() ? {
|
||||||
sb.write('Sec-WebSocket-Key: ')
|
sb.write('Sec-WebSocket-Key: ')
|
||||||
sb.write(seckey)
|
sb.write(seckey)
|
||||||
sb.write('\r\nSec-WebSocket-Version: 13\r\n\r\n')
|
sb.write('\r\nSec-WebSocket-Version: 13\r\n\r\n')
|
||||||
// handshake := 'GET $ws.uri.resource$ws.uri.querystring HTTP/1.1\r\nHost: $ws.uri.hostname:$ws.uri.port\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: $seckey\r\nSec-WebSocket-Version: 13\r\n\r\n'
|
|
||||||
handshake := sb.str()
|
handshake := sb.str()
|
||||||
defer {
|
defer {
|
||||||
handshake.free()
|
handshake.free()
|
||||||
|
@ -36,7 +35,7 @@ fn (mut ws Client) handshake() ? {
|
||||||
unsafe {handshake_bytes.free()}
|
unsafe {handshake_bytes.free()}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handshake manage the handshake part of connecting
|
// handle_server_handshake manage websocket server handshake
|
||||||
fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient) {
|
fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient) {
|
||||||
msg := c.read_handshake_str() ?
|
msg := c.read_handshake_str() ?
|
||||||
handshake_response, client := s.parse_client_handshake(msg, mut c) ?
|
handshake_response, client := s.parse_client_handshake(msg, mut c) ?
|
||||||
|
@ -44,6 +43,7 @@ fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient)
|
||||||
return handshake_response, client
|
return handshake_response, client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parse_client_handshake parses handshake result
|
||||||
fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) ?(string, &ServerClient) {
|
fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) ?(string, &ServerClient) {
|
||||||
s.logger.debug('server-> client handshake:\n$client_handshake')
|
s.logger.debug('server-> client handshake:\n$client_handshake')
|
||||||
lines := client_handshake.split_into_lines()
|
lines := client_handshake.split_into_lines()
|
||||||
|
@ -52,12 +52,13 @@ fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client)
|
||||||
return error_with_code('unexpected get operation, $get_tokens', 1)
|
return error_with_code('unexpected get operation, $get_tokens', 1)
|
||||||
}
|
}
|
||||||
if get_tokens[0].trim_space() != 'GET' {
|
if get_tokens[0].trim_space() != 'GET' {
|
||||||
return error_with_code("unexpected request '${get_tokens[0]}', expected 'GET'", 2)
|
return error_with_code("unexpected request '${get_tokens[0]}', expected 'GET'",
|
||||||
|
2)
|
||||||
}
|
}
|
||||||
if get_tokens[2].trim_space() != 'HTTP/1.1' {
|
if get_tokens[2].trim_space() != 'HTTP/1.1' {
|
||||||
return error_with_code("unexpected request $get_tokens, expected 'HTTP/1.1'", 3)
|
return error_with_code("unexpected request $get_tokens, expected 'HTTP/1.1'",
|
||||||
|
3)
|
||||||
}
|
}
|
||||||
// path := get_tokens[1].trim_space()
|
|
||||||
mut seckey := ''
|
mut seckey := ''
|
||||||
mut flags := []Flag{}
|
mut flags := []Flag{}
|
||||||
mut key := ''
|
mut key := ''
|
||||||
|
@ -106,6 +107,7 @@ fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client)
|
||||||
return server_handshake, server_client
|
return server_handshake, server_client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// / read_handshake_str returns the handshake response
|
||||||
fn (mut ws Client) read_handshake_str() ?string {
|
fn (mut ws Client) read_handshake_str() ?string {
|
||||||
mut total_bytes_read := 0
|
mut total_bytes_read := 0
|
||||||
mut msg := [1024]byte{}
|
mut msg := [1024]byte{}
|
||||||
|
@ -133,12 +135,15 @@ fn (mut ws Client) read_handshake(seckey string) ? {
|
||||||
unsafe {msg.free()}
|
unsafe {msg.free()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check_handshake_response checks the response from handshake and returns
|
||||||
|
// the response and secure key
|
||||||
fn (mut ws Client) check_handshake_response(handshake_response string, seckey string) ? {
|
fn (mut ws Client) check_handshake_response(handshake_response string, seckey string) ? {
|
||||||
ws.debug_log('handshake response:\n$handshake_response')
|
ws.debug_log('handshake response:\n$handshake_response')
|
||||||
lines := handshake_response.split_into_lines()
|
lines := handshake_response.split_into_lines()
|
||||||
header := lines[0]
|
header := lines[0]
|
||||||
if !header.starts_with('HTTP/1.1 101') && !header.starts_with('HTTP/1.0 101') {
|
if !header.starts_with('HTTP/1.1 101') && !header.starts_with('HTTP/1.0 101') {
|
||||||
return error_with_code('handshake_handler: invalid HTTP status response code, $header', 6)
|
return error_with_code('handshake_handler: invalid HTTP status response code, $header',
|
||||||
|
6)
|
||||||
}
|
}
|
||||||
for i in 1 .. lines.len {
|
for i in 1 .. lines.len {
|
||||||
if lines[i].len <= 0 || lines[i] == '\r\n' {
|
if lines[i].len <= 0 || lines[i] == '\r\n' {
|
||||||
|
@ -157,7 +162,8 @@ fn (mut ws Client) check_handshake_response(handshake_response string, seckey st
|
||||||
challenge := create_key_challenge_response(seckey) ?
|
challenge := create_key_challenge_response(seckey) ?
|
||||||
ws.debug_log('challenge: $challenge, response: ${keys[1]}')
|
ws.debug_log('challenge: $challenge, response: ${keys[1]}')
|
||||||
if keys[1].trim_space() != challenge {
|
if keys[1].trim_space() != challenge {
|
||||||
return error_with_code('handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.', 7)
|
return error_with_code('handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.',
|
||||||
|
7)
|
||||||
}
|
}
|
||||||
ws.flags << .has_accept
|
ws.flags << .has_accept
|
||||||
unsafe {challenge.free()}
|
unsafe {challenge.free()}
|
||||||
|
|
|
@ -2,11 +2,7 @@ module websocket
|
||||||
|
|
||||||
import net
|
import net
|
||||||
import time
|
import time
|
||||||
|
import sync
|
||||||
interface WebsocketIO {
|
|
||||||
socket_read(mut buffer []byte) ?int
|
|
||||||
socket_write(bytes []byte) ?
|
|
||||||
}
|
|
||||||
|
|
||||||
// socket_read reads into the provided buffer with its length
|
// socket_read reads into the provided buffer with its length
|
||||||
fn (mut ws Client) socket_read(mut buffer []byte) ?int {
|
fn (mut ws Client) socket_read(mut buffer []byte) ?int {
|
||||||
|
@ -15,7 +11,7 @@ fn (mut ws Client) socket_read(mut buffer []byte) ?int {
|
||||||
return error('socket_read: trying to read a closed socket')
|
return error('socket_read: trying to read a closed socket')
|
||||||
}
|
}
|
||||||
if ws.is_ssl {
|
if ws.is_ssl {
|
||||||
r := ws.ssl_conn.read_into(mut buffer)?
|
r := ws.ssl_conn.read_into(mut buffer) ?
|
||||||
return r
|
return r
|
||||||
} else {
|
} else {
|
||||||
for {
|
for {
|
||||||
|
@ -31,14 +27,14 @@ fn (mut ws Client) socket_read(mut buffer []byte) ?int {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// socket_read reads into the provided byte pointer and length
|
||||||
fn (mut ws Client) socket_read_ptr(buf_ptr byteptr, len int) ?int {
|
fn (mut ws Client) socket_read_ptr(buf_ptr byteptr, len int) ?int {
|
||||||
lock {
|
lock {
|
||||||
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
|
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
|
||||||
return error('socket_read_ptr: trying to read a closed socket')
|
return error('socket_read_ptr: trying to read a closed socket')
|
||||||
}
|
}
|
||||||
|
|
||||||
if ws.is_ssl {
|
if ws.is_ssl {
|
||||||
r := ws.ssl_conn.socket_read_into_ptr(buf_ptr, len)?
|
r := ws.ssl_conn.socket_read_into_ptr(buf_ptr, len) ?
|
||||||
return r
|
return r
|
||||||
} else {
|
} else {
|
||||||
for {
|
for {
|
||||||
|
@ -62,7 +58,7 @@ fn (mut ws Client) socket_write(bytes []byte) ? {
|
||||||
return error('socket_write: trying to write on a closed socket')
|
return error('socket_write: trying to write on a closed socket')
|
||||||
}
|
}
|
||||||
if ws.is_ssl {
|
if ws.is_ssl {
|
||||||
ws.ssl_conn.write(bytes)?
|
ws.ssl_conn.write(bytes) ?
|
||||||
} else {
|
} else {
|
||||||
for {
|
for {
|
||||||
ws.conn.write(bytes) or {
|
ws.conn.write(bytes) or {
|
||||||
|
@ -77,26 +73,26 @@ fn (mut ws Client) socket_write(bytes []byte) ? {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// shutdown_socket, proper shutdown make PR in Emeliy repo
|
// shutdown_socket, shut down socket properly closing the connection
|
||||||
fn (mut ws Client) shutdown_socket() ? {
|
fn (mut ws Client) shutdown_socket() ? {
|
||||||
ws.debug_log('shutting down socket')
|
ws.debug_log('shutting down socket')
|
||||||
if ws.is_ssl {
|
if ws.is_ssl {
|
||||||
ws.ssl_conn.shutdown()?
|
ws.ssl_conn.shutdown() ?
|
||||||
} else {
|
} else {
|
||||||
ws.conn.close()?
|
ws.conn.close() ?
|
||||||
}
|
}
|
||||||
return none
|
return none
|
||||||
}
|
}
|
||||||
|
|
||||||
// dial_socket, setup socket communication, options and timeouts
|
// dial_socket, setup socket communication, options and timeouts
|
||||||
fn (mut ws Client) dial_socket() ?net.TcpConn {
|
fn (mut ws Client) dial_socket() ?net.TcpConn {
|
||||||
mut t := net.dial_tcp('$ws.uri.hostname:$ws.uri.port')?
|
mut t := net.dial_tcp('$ws.uri.hostname:$ws.uri.port') ?
|
||||||
optval := int(1)
|
optval := int(1)
|
||||||
t.sock.set_option_int(.keep_alive, optval)?
|
t.sock.set_option_int(.keep_alive, optval) ?
|
||||||
t.set_read_timeout(10 * time.millisecond)
|
t.set_read_timeout(10 * time.millisecond)
|
||||||
t.set_write_timeout(10 * time.millisecond)
|
t.set_write_timeout(10 * time.millisecond)
|
||||||
if ws.is_ssl {
|
if ws.is_ssl {
|
||||||
ws.ssl_conn.connect(mut t)?
|
ws.ssl_conn.connect(mut t) ?
|
||||||
}
|
}
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,33 +3,37 @@ module websocket
|
||||||
import encoding.utf8
|
import encoding.utf8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
header_len_offset = 2
|
header_len_offset = 2 // Offset for lenghtpart of websocket header
|
||||||
buffer_size = 256
|
buffer_size = 256 // Default buffer size
|
||||||
extended_payload16_end_byte = 4
|
extended_payload16_end_byte = 4 // Offset for extended lenght 16 bit of websocket header
|
||||||
extended_payload64_end_byte = 10
|
extended_payload64_end_byte = 10 // Offset for extended lenght 64 bit of websocket header
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// A websocket data fragment
|
||||||
struct Fragment {
|
struct Fragment {
|
||||||
data []byte
|
data []byte // The included data payload data in a fragment
|
||||||
opcode OPCode
|
opcode OPCode // Defines the interpretation of the payload data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Represents a data frame header
|
||||||
struct Frame {
|
struct Frame {
|
||||||
mut:
|
mut:
|
||||||
header_len int = 2
|
header_len int = 2
|
||||||
|
// Lenght of the websocket header part
|
||||||
frame_size int = 2
|
frame_size int = 2
|
||||||
fin bool
|
// Size of the total frame
|
||||||
rsv1 bool
|
fin bool // Indicates if final fragment of message
|
||||||
rsv2 bool
|
rsv1 bool // Reserved for future use in websocket RFC
|
||||||
rsv3 bool
|
rsv2 bool // Reserved for future use in websocket RFC
|
||||||
opcode OPCode
|
rsv3 bool // Reserved for future use in websocket RFC
|
||||||
has_mask bool
|
opcode OPCode // Defines the interpretation of the payload data
|
||||||
payload_len int
|
has_mask bool // Defines whether the payload data is masked.
|
||||||
masking_key [4]byte
|
payload_len int // Payload lenght
|
||||||
|
masking_key [4]byte // All frames from client to server is masked with this key
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
invalid_close_codes = [999, 1004, 1005, 1006, 1014, 1015, 1016, 1100, 2000, 2999, 5000, 65536]
|
invalid_close_codes = [999, 1004, 1005, 1006, 1014, 1015, 1016, 1100, 2000, 2999, 5000, 65536] // List of invalid close codes
|
||||||
)
|
)
|
||||||
|
|
||||||
// validate_client, validate client frame rules from RFC6455
|
// validate_client, validate client frame rules from RFC6455
|
||||||
|
@ -40,53 +44,52 @@ pub fn (mut ws Client) validate_frame(frame &Frame) ? {
|
||||||
}
|
}
|
||||||
if (int(frame.opcode) >= 3 && int(frame.opcode) <= 7) ||
|
if (int(frame.opcode) >= 3 && int(frame.opcode) <= 7) ||
|
||||||
(int(frame.opcode) >= 11 && int(frame.opcode) <= 15) {
|
(int(frame.opcode) >= 11 && int(frame.opcode) <= 15) {
|
||||||
ws.close(1002, 'use of reserved opcode')?
|
ws.close(1002, 'use of reserved opcode') ?
|
||||||
return error('use of reserved opcode')
|
return error('use of reserved opcode')
|
||||||
}
|
}
|
||||||
if frame.has_mask && !ws.is_server {
|
if frame.has_mask && !ws.is_server {
|
||||||
// Server should never send masked frames
|
// Server should never send masked frames
|
||||||
// to client, close connection
|
// to client, close connection
|
||||||
ws.close(1002, 'client got masked frame')?
|
ws.close(1002, 'client got masked frame') ?
|
||||||
return error('client sent masked frame')
|
return error('client sent masked frame')
|
||||||
}
|
}
|
||||||
if is_control_frame(frame.opcode) {
|
if is_control_frame(frame.opcode) {
|
||||||
if !frame.fin {
|
if !frame.fin {
|
||||||
ws.close(1002, 'control message must not be fragmented')?
|
ws.close(1002, 'control message must not be fragmented') ?
|
||||||
return error('unexpected control frame with no fin')
|
return error('unexpected control frame with no fin')
|
||||||
}
|
}
|
||||||
if frame.payload_len > 125 {
|
if frame.payload_len > 125 {
|
||||||
ws.close(1002, 'control frames must not exceed 125 bytes')?
|
ws.close(1002, 'control frames must not exceed 125 bytes') ?
|
||||||
return error('unexpected control frame payload length')
|
return error('unexpected control frame payload length')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if frame.fin == false && ws.fragments.len == 0 && frame.opcode == .continuation {
|
if frame.fin == false && ws.fragments.len == 0 && frame.opcode == .continuation {
|
||||||
err_msg := 'unexecpected continuation, there are no frames to continue, $frame'
|
err_msg := 'unexecpected continuation, there are no frames to continue, $frame'
|
||||||
ws.close(1002, err_msg)?
|
ws.close(1002, err_msg) ?
|
||||||
return error(err_msg)
|
return error(err_msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[inline]
|
// is_control_frame returns true if the fram is a control frame
|
||||||
fn is_control_frame(opcode OPCode) bool {
|
fn is_control_frame(opcode OPCode) bool {
|
||||||
return opcode !in [.text_frame, .binary_frame, .continuation]
|
return opcode !in [.text_frame, .binary_frame, .continuation]
|
||||||
}
|
}
|
||||||
|
|
||||||
[inline]
|
// is_data_frame returns true if the fram is a control frame
|
||||||
fn is_data_frame(opcode OPCode) bool {
|
fn is_data_frame(opcode OPCode) bool {
|
||||||
return opcode in [.text_frame, .binary_frame]
|
return opcode in [.text_frame, .binary_frame]
|
||||||
}
|
}
|
||||||
|
|
||||||
// read_payload, reads the payload from socket
|
// read_payload, reads the payload from the socket
|
||||||
fn (mut ws Client) read_payload(frame &Frame) ?[]byte {
|
fn (mut ws Client) read_payload(frame &Frame) ?[]byte {
|
||||||
if frame.payload_len == 0 {
|
if frame.payload_len == 0 {
|
||||||
return []byte{}
|
return []byte{}
|
||||||
}
|
}
|
||||||
// TODO: make a dynamic reusable memory pool here
|
|
||||||
mut buffer := []byte{cap: frame.payload_len}
|
mut buffer := []byte{cap: frame.payload_len}
|
||||||
mut read_buf := [1]byte{}
|
mut read_buf := [1]byte{}
|
||||||
mut bytes_read := 0
|
mut bytes_read := 0
|
||||||
for bytes_read < frame.payload_len {
|
for bytes_read < frame.payload_len {
|
||||||
len := ws.socket_read_ptr(byteptr(&read_buf), 1)?
|
len := ws.socket_read_ptr(byteptr(&read_buf), 1) ?
|
||||||
if len != 1 {
|
if len != 1 {
|
||||||
return error('expected read all message, got zero')
|
return error('expected read all message, got zero')
|
||||||
}
|
}
|
||||||
|
@ -104,8 +107,8 @@ fn (mut ws Client) read_payload(frame &Frame) ?[]byte {
|
||||||
return buffer
|
return buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate_utf_8, validates payload for valid utf encoding
|
// validate_utf_8, validates payload for valid utf8 encoding
|
||||||
// todo: support fail fast utf errors for strict autobahn conformance
|
// - Future implementation needs to support fail fast utf errors for strict autobahn conformance
|
||||||
fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? {
|
fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? {
|
||||||
if opcode in [.text_frame, .close] && !utf8.validate(payload.data, payload.len) {
|
if opcode in [.text_frame, .close] && !utf8.validate(payload.data, payload.len) {
|
||||||
ws.logger.error('malformed utf8 payload, payload len: ($payload.len)')
|
ws.logger.error('malformed utf8 payload, payload len: ($payload.len)')
|
||||||
|
@ -118,11 +121,11 @@ fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? {
|
||||||
// read_next_message reads 1 to n frames to compose a message
|
// read_next_message reads 1 to n frames to compose a message
|
||||||
pub fn (mut ws Client) read_next_message() ?Message {
|
pub fn (mut ws Client) read_next_message() ?Message {
|
||||||
for {
|
for {
|
||||||
frame := ws.parse_frame_header()?
|
frame := ws.parse_frame_header() ?
|
||||||
// This debug message leaks so remove if needed
|
// This debug message leaks so remove if needed
|
||||||
// ws.debug_log('read_next_message: frame\n$frame')
|
// ws.debug_log('read_next_message: frame\n$frame')
|
||||||
ws.validate_frame(&frame)?
|
ws.validate_frame(&frame) ?
|
||||||
frame_payload := ws.read_payload(&frame)?
|
frame_payload := ws.read_payload(&frame) ?
|
||||||
if is_control_frame(frame.opcode) {
|
if is_control_frame(frame.opcode) {
|
||||||
// Control frames can interject other frames
|
// Control frames can interject other frames
|
||||||
// and need to be returned immediately
|
// and need to be returned immediately
|
||||||
|
@ -130,9 +133,7 @@ pub fn (mut ws Client) read_next_message() ?Message {
|
||||||
opcode: OPCode(frame.opcode)
|
opcode: OPCode(frame.opcode)
|
||||||
payload: frame_payload.clone()
|
payload: frame_payload.clone()
|
||||||
}
|
}
|
||||||
unsafe {
|
unsafe {frame_payload.free()}
|
||||||
frame_payload.free()
|
|
||||||
}
|
|
||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
// If the message is fragmented we just put it on fragments
|
// If the message is fragmented we just put it on fragments
|
||||||
|
@ -142,9 +143,7 @@ pub fn (mut ws Client) read_next_message() ?Message {
|
||||||
data: frame_payload.clone()
|
data: frame_payload.clone()
|
||||||
opcode: frame.opcode
|
opcode: frame.opcode
|
||||||
}
|
}
|
||||||
unsafe {
|
unsafe {frame_payload.free()}
|
||||||
frame_payload.free()
|
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ws.fragments.len == 0 {
|
if ws.fragments.len == 0 {
|
||||||
|
@ -157,21 +156,19 @@ pub fn (mut ws Client) read_next_message() ?Message {
|
||||||
opcode: OPCode(frame.opcode)
|
opcode: OPCode(frame.opcode)
|
||||||
payload: frame_payload.clone()
|
payload: frame_payload.clone()
|
||||||
}
|
}
|
||||||
unsafe {
|
unsafe {frame_payload.free()}
|
||||||
frame_payload.free()
|
|
||||||
}
|
|
||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
defer {
|
defer {
|
||||||
ws.fragments = []
|
ws.fragments = []
|
||||||
}
|
}
|
||||||
if is_data_frame(frame.opcode) {
|
if is_data_frame(frame.opcode) {
|
||||||
ws.close(0, '')?
|
ws.close(0, '') ?
|
||||||
return error('Unexpected frame opcode')
|
return error('Unexpected frame opcode')
|
||||||
}
|
}
|
||||||
payload := ws.payload_from_fragments(frame_payload)?
|
payload := ws.payload_from_fragments(frame_payload) ?
|
||||||
opcode := ws.opcode_from_fragments()
|
opcode := ws.opcode_from_fragments()
|
||||||
ws.validate_utf_8(opcode, payload)?
|
ws.validate_utf_8(opcode, payload) ?
|
||||||
msg := Message{
|
msg := Message{
|
||||||
opcode: opcode
|
opcode: opcode
|
||||||
payload: payload.clone()
|
payload: payload.clone()
|
||||||
|
@ -206,24 +203,20 @@ fn (ws Client) payload_from_fragments(fin_payload []byte) ?[]byte {
|
||||||
return total_buffer
|
return total_buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
// opcode_from_fragments, returns the opcode for message from the first fragment sent
|
// opcode_from_fragments returns the opcode for message from the first fragment sent
|
||||||
fn (ws Client) opcode_from_fragments() OPCode {
|
fn (ws Client) opcode_from_fragments() OPCode {
|
||||||
return OPCode(ws.fragments[0].opcode)
|
return OPCode(ws.fragments[0].opcode)
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse_frame_header parses next message by decoding the incoming frames
|
// parse_frame_header parses next message by decoding the incoming frames
|
||||||
pub fn (mut ws Client) parse_frame_header() ?Frame {
|
pub fn (mut ws Client) parse_frame_header() ?Frame {
|
||||||
// TODO: make a dynamic reusable memory pool here
|
|
||||||
// mut buffer := []byte{cap: buffer_size}
|
|
||||||
mut buffer := [256]byte{}
|
mut buffer := [256]byte{}
|
||||||
mut bytes_read := 0
|
mut bytes_read := 0
|
||||||
mut frame := Frame{}
|
mut frame := Frame{}
|
||||||
mut rbuff := [1]byte{}
|
mut rbuff := [1]byte{}
|
||||||
mut mask_end_byte := 0
|
mut mask_end_byte := 0
|
||||||
for ws.state == .open {
|
for ws.state == .open {
|
||||||
// Todo: different error scenarios to make sure we close correctly on error
|
read_bytes := ws.socket_read_ptr(byteptr(rbuff), 1) ?
|
||||||
// reader.read_into(mut rbuff) ?
|
|
||||||
read_bytes := ws.socket_read_ptr(byteptr(rbuff), 1)?
|
|
||||||
if read_bytes == 0 {
|
if read_bytes == 0 {
|
||||||
// This is probably a timeout or close
|
// This is probably a timeout or close
|
||||||
continue
|
continue
|
||||||
|
@ -268,7 +261,7 @@ pub fn (mut ws Client) parse_frame_header() ?Frame {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if frame.payload_len == 127 && bytes_read == u64(extended_payload64_end_byte) {
|
if frame.payload_len == 127 && bytes_read == u64(extended_payload64_end_byte) {
|
||||||
frame.header_len += 8 // TODO Not sure...
|
frame.header_len += 8
|
||||||
frame.payload_len = 0
|
frame.payload_len = 0
|
||||||
frame.payload_len |= buffer[2] << 56
|
frame.payload_len |= buffer[2] << 56
|
||||||
frame.payload_len |= buffer[3] << 48
|
frame.payload_len |= buffer[3] << 48
|
||||||
|
|
|
@ -1,14 +1,16 @@
|
||||||
module websocket
|
module websocket
|
||||||
|
|
||||||
|
// Represents an Uri for websocket connections
|
||||||
struct Uri {
|
struct Uri {
|
||||||
mut:
|
mut:
|
||||||
url string
|
url string // The url to the websocket endpoint
|
||||||
hostname string
|
hostname string // The hostname to the websocket endpoint
|
||||||
port string
|
port string // The port to the websocket endpoint
|
||||||
resource string
|
resource string // The resource used on the websocket endpoint
|
||||||
querystring string
|
querystring string // The query string on the websocket endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// str returns the string representation of the Uri
|
||||||
pub fn (u Uri) str() string {
|
pub fn (u Uri) str() string {
|
||||||
return u.url
|
return u.url
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import rand
|
||||||
import crypto.sha1
|
import crypto.sha1
|
||||||
import encoding.base64
|
import encoding.base64
|
||||||
|
|
||||||
|
// htonl64 converts payload lenght to header bits
|
||||||
fn htonl64(payload_len u64) []byte {
|
fn htonl64(payload_len u64) []byte {
|
||||||
mut ret := []byte{len: 8}
|
mut ret := []byte{len: 8}
|
||||||
ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff)
|
ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff)
|
||||||
|
@ -17,15 +18,15 @@ fn htonl64(payload_len u64) []byte {
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create_masking_key returs a new masking key byte array
|
||||||
fn create_masking_key() []byte {
|
fn create_masking_key() []byte {
|
||||||
mask_bit := byte(rand.intn(255))
|
mask_bit := byte(rand.intn(255))
|
||||||
buf := []byte{len: 4, init: `0`}
|
buf := []byte{len: 4, init: `0`}
|
||||||
unsafe {
|
unsafe {C.memcpy(buf.data, &mask_bit, 4)}
|
||||||
C.memcpy(buf.data, &mask_bit, 4)
|
|
||||||
}
|
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create_key_challenge_response creates a key challange response from security key
|
||||||
fn create_key_challenge_response(seckey string) ?string {
|
fn create_key_challenge_response(seckey string) ?string {
|
||||||
if seckey.len == 0 {
|
if seckey.len == 0 {
|
||||||
return error('unexpected seckey lengt zero')
|
return error('unexpected seckey lengt zero')
|
||||||
|
@ -42,6 +43,7 @@ fn create_key_challenge_response(seckey string) ?string {
|
||||||
return b64
|
return b64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get_nonce, returns a randomized array used in handshake process
|
||||||
fn get_nonce(nonce_size int) string {
|
fn get_nonce(nonce_size int) string {
|
||||||
mut nonce := []byte{len: nonce_size, cap: nonce_size}
|
mut nonce := []byte{len: nonce_size, cap: nonce_size}
|
||||||
alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz'
|
alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz'
|
||||||
|
|
|
@ -23,27 +23,29 @@ const (
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
is_server bool
|
is_server bool
|
||||||
mut:
|
mut:
|
||||||
ssl_conn &openssl.SSLConn
|
ssl_conn &openssl.SSLConn // Secure connection used when wss is used
|
||||||
flags []Flag
|
flags []Flag // Flags
|
||||||
fragments []Fragment
|
fragments []Fragment // Current fragments
|
||||||
message_callbacks []MessageEventHandler
|
message_callbacks []MessageEventHandler // All callbacks on_message
|
||||||
error_callbacks []ErrorEventHandler
|
error_callbacks []ErrorEventHandler // All callbacks on_error
|
||||||
open_callbacks []OpenEventHandler
|
open_callbacks []OpenEventHandler // All callbacks on_open
|
||||||
close_callbacks []CloseEventHandler
|
close_callbacks []CloseEventHandler // All callbacks on_close
|
||||||
pub:
|
pub:
|
||||||
is_ssl bool
|
is_ssl bool // True if secure socket is used
|
||||||
uri Uri
|
uri Uri // Uri of current connection
|
||||||
id string
|
id string // Unique id of client
|
||||||
pub mut:
|
pub mut:
|
||||||
conn net.TcpConn
|
conn net.TcpConn // Underlying TCP connection
|
||||||
nonce_size int = 16 // you can try 18 too
|
nonce_size int = 16
|
||||||
panic_on_callback bool
|
// you can try 18 too
|
||||||
state State
|
panic_on_callback bool // Set to true of callbacks can panic
|
||||||
logger &log.Log
|
state State // Current state of connection
|
||||||
resource_name string
|
logger &log.Log // Logger used to log messages
|
||||||
last_pong_ut u64
|
resource_name string // Name of current resource
|
||||||
|
last_pong_ut u64 // Last time in unix time we got a pong message
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Flag of websocket handshake
|
||||||
enum Flag {
|
enum Flag {
|
||||||
has_accept
|
has_accept
|
||||||
has_connection
|
has_connection
|
||||||
|
@ -78,7 +80,7 @@ pub enum OPCode {
|
||||||
|
|
||||||
// new_client, instance a new websocket client
|
// new_client, instance a new websocket client
|
||||||
pub fn new_client(address string) ?&Client {
|
pub fn new_client(address string) ?&Client {
|
||||||
uri := parse_uri(address)?
|
uri := parse_uri(address) ?
|
||||||
return &Client{
|
return &Client{
|
||||||
is_server: false
|
is_server: false
|
||||||
ssl_conn: openssl.new_ssl_conn()
|
ssl_conn: openssl.new_ssl_conn()
|
||||||
|
@ -94,14 +96,14 @@ pub fn new_client(address string) ?&Client {
|
||||||
|
|
||||||
// connect, connects and do handshake procedure with remote server
|
// connect, connects and do handshake procedure with remote server
|
||||||
pub fn (mut ws Client) connect() ? {
|
pub fn (mut ws Client) connect() ? {
|
||||||
ws.assert_not_connected()?
|
ws.assert_not_connected() ?
|
||||||
ws.set_state(.connecting)
|
ws.set_state(.connecting)
|
||||||
ws.logger.info('connecting to host $ws.uri')
|
ws.logger.info('connecting to host $ws.uri')
|
||||||
ws.conn = ws.dial_socket()?
|
ws.conn = ws.dial_socket() ?
|
||||||
// Todo: make setting configurable
|
// Todo: make setting configurable
|
||||||
ws.conn.set_read_timeout(time.second * 30)
|
ws.conn.set_read_timeout(time.second * 30)
|
||||||
ws.conn.set_write_timeout(time.second * 30)
|
ws.conn.set_write_timeout(time.second * 30)
|
||||||
ws.handshake()?
|
ws.handshake() ?
|
||||||
ws.set_state(.open)
|
ws.set_state(.open)
|
||||||
ws.logger.info('successfully connected to host $ws.uri')
|
ws.logger.info('successfully connected to host $ws.uri')
|
||||||
ws.send_open_event()
|
ws.send_open_event()
|
||||||
|
@ -133,16 +135,12 @@ pub fn (mut ws Client) listen() ? {
|
||||||
.text_frame {
|
.text_frame {
|
||||||
ws.debug_log('read: text')
|
ws.debug_log('read: text')
|
||||||
ws.send_message_event(msg)
|
ws.send_message_event(msg)
|
||||||
unsafe {
|
unsafe {msg.free()}
|
||||||
msg.free()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
.binary_frame {
|
.binary_frame {
|
||||||
ws.debug_log('read: binary')
|
ws.debug_log('read: binary')
|
||||||
ws.send_message_event(msg)
|
ws.send_message_event(msg)
|
||||||
unsafe {
|
unsafe {msg.free()}
|
||||||
msg.free()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
.ping {
|
.ping {
|
||||||
ws.debug_log('read: ping, sending pong')
|
ws.debug_log('read: ping, sending pong')
|
||||||
|
@ -155,9 +153,7 @@ pub fn (mut ws Client) listen() ? {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if msg.payload.len > 0 {
|
if msg.payload.len > 0 {
|
||||||
unsafe {
|
unsafe {msg.free()}
|
||||||
msg.free()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.pong {
|
.pong {
|
||||||
|
@ -165,9 +161,7 @@ pub fn (mut ws Client) listen() ? {
|
||||||
ws.last_pong_ut = time.now().unix
|
ws.last_pong_ut = time.now().unix
|
||||||
ws.send_message_event(msg)
|
ws.send_message_event(msg)
|
||||||
if msg.payload.len > 0 {
|
if msg.payload.len > 0 {
|
||||||
unsafe {
|
unsafe {msg.free()}
|
||||||
msg.free()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.close {
|
.close {
|
||||||
|
@ -177,50 +171,46 @@ pub fn (mut ws Client) listen() ? {
|
||||||
}
|
}
|
||||||
if msg.payload.len > 0 {
|
if msg.payload.len > 0 {
|
||||||
if msg.payload.len == 1 {
|
if msg.payload.len == 1 {
|
||||||
ws.close(1002, 'close payload cannot be 1 byte')?
|
ws.close(1002, 'close payload cannot be 1 byte') ?
|
||||||
return error('close payload cannot be 1 byte')
|
return error('close payload cannot be 1 byte')
|
||||||
}
|
}
|
||||||
code := (int(msg.payload[0]) << 8) + int(msg.payload[1])
|
code := (int(msg.payload[0]) << 8) + int(msg.payload[1])
|
||||||
if code in invalid_close_codes {
|
if code in invalid_close_codes {
|
||||||
ws.close(1002, 'invalid close code: $code')?
|
ws.close(1002, 'invalid close code: $code') ?
|
||||||
return error('invalid close code: $code')
|
return error('invalid close code: $code')
|
||||||
}
|
}
|
||||||
reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} }
|
reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} }
|
||||||
if reason.len > 0 {
|
if reason.len > 0 {
|
||||||
ws.validate_utf_8(.close, reason)?
|
ws.validate_utf_8(.close, reason) ?
|
||||||
}
|
}
|
||||||
if ws.state !in [.closing, .closed] {
|
if ws.state !in [.closing, .closed] {
|
||||||
// sending close back according to spec
|
// sending close back according to spec
|
||||||
ws.debug_log('close with reason, code: $code, reason: $reason')
|
ws.debug_log('close with reason, code: $code, reason: $reason')
|
||||||
r := reason.bytestr()
|
r := reason.bytestr()
|
||||||
ws.close(code, r)?
|
ws.close(code, r) ?
|
||||||
}
|
|
||||||
unsafe {
|
|
||||||
msg.free()
|
|
||||||
}
|
}
|
||||||
|
unsafe {msg.free()}
|
||||||
} else {
|
} else {
|
||||||
if ws.state !in [.closing, .closed] {
|
if ws.state !in [.closing, .closed] {
|
||||||
ws.debug_log('close with reason, no code')
|
ws.debug_log('close with reason, no code')
|
||||||
// sending close back according to spec
|
// sending close back according to spec
|
||||||
ws.close(1000, 'normal')?
|
ws.close(1000, 'normal') ?
|
||||||
}
|
|
||||||
unsafe {
|
|
||||||
msg.free()
|
|
||||||
}
|
}
|
||||||
|
unsafe {msg.free()}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
.continuation {
|
.continuation {
|
||||||
ws.logger.error('unexpected opcode continuation, nothing to continue')
|
ws.logger.error('unexpected opcode continuation, nothing to continue')
|
||||||
ws.send_error_event('unexpected opcode continuation, nothing to continue')
|
ws.send_error_event('unexpected opcode continuation, nothing to continue')
|
||||||
ws.close(1002, 'nothing to continue')?
|
ws.close(1002, 'nothing to continue') ?
|
||||||
return error('unexpected opcode continuation, nothing to continue')
|
return error('unexpected opcode continuation, nothing to continue')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// this function was needed for defer
|
// manage_clean_close closes connection in a clean way
|
||||||
fn (mut ws Client) manage_clean_close() {
|
fn (mut ws Client) manage_clean_close() {
|
||||||
ws.send_close_event(1000, 'closed by client')
|
ws.send_close_event(1000, 'closed by client')
|
||||||
}
|
}
|
||||||
|
@ -228,13 +218,13 @@ fn (mut ws Client) manage_clean_close() {
|
||||||
// ping, sends ping message to server,
|
// ping, sends ping message to server,
|
||||||
// ping response will be pushed to message callback
|
// ping response will be pushed to message callback
|
||||||
pub fn (mut ws Client) ping() ? {
|
pub fn (mut ws Client) ping() ? {
|
||||||
ws.send_control_frame(.ping, 'PING', [])?
|
ws.send_control_frame(.ping, 'PING', []) ?
|
||||||
}
|
}
|
||||||
|
|
||||||
// pong, sends pog message to server,
|
// pong, sends pong message to server,
|
||||||
// pongs are normally automatically sent back to server
|
// pongs are normally automatically sent back to server
|
||||||
pub fn (mut ws Client) pong() ? {
|
pub fn (mut ws Client) pong() ? {
|
||||||
ws.send_control_frame(.pong, 'PONG', [])?
|
ws.send_control_frame(.pong, 'PONG', []) ?
|
||||||
}
|
}
|
||||||
|
|
||||||
// write_ptr, writes len bytes provided a byteptr with a websocket messagetype
|
// write_ptr, writes len bytes provided a byteptr with a websocket messagetype
|
||||||
|
@ -265,18 +255,11 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ?
|
||||||
} else if payload_len > 125 && payload_len <= 0xffff {
|
} else if payload_len > 125 && payload_len <= 0xffff {
|
||||||
len16 := C.htons(payload_len)
|
len16 := C.htons(payload_len)
|
||||||
header[1] = 126
|
header[1] = 126
|
||||||
// 0x80
|
unsafe {C.memcpy(&header[2], &len16, 2)}
|
||||||
// todo: fix v style copy instead
|
|
||||||
unsafe {
|
|
||||||
C.memcpy(&header[2], &len16, 2)
|
|
||||||
}
|
|
||||||
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff {
|
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff {
|
||||||
len_bytes := htonl64(u64(payload_len))
|
len_bytes := htonl64(u64(payload_len))
|
||||||
header[1] = 127 // 0x80
|
header[1] = 127 // 0x80
|
||||||
// todo: fix v style copy instead
|
unsafe {C.memcpy(&header[2], len_bytes.data, 8)}
|
||||||
unsafe {
|
|
||||||
C.memcpy(&header[2], len_bytes.data, 8)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if payload_len <= 125 {
|
if payload_len <= 125 {
|
||||||
|
@ -288,10 +271,7 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ?
|
||||||
} else if payload_len > 125 && payload_len <= 0xffff {
|
} else if payload_len > 125 && payload_len <= 0xffff {
|
||||||
len16 := C.htons(payload_len)
|
len16 := C.htons(payload_len)
|
||||||
header[1] = (126 | 0x80)
|
header[1] = (126 | 0x80)
|
||||||
// todo: fix v style copy instead
|
unsafe {C.memcpy(&header[2], &len16, 2)}
|
||||||
unsafe {
|
|
||||||
C.memcpy(&header[2], &len16, 2)
|
|
||||||
}
|
|
||||||
header[4] = masking_key[0]
|
header[4] = masking_key[0]
|
||||||
header[5] = masking_key[1]
|
header[5] = masking_key[1]
|
||||||
header[6] = masking_key[2]
|
header[6] = masking_key[2]
|
||||||
|
@ -299,17 +279,13 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ?
|
||||||
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { // 65535 && 18446744073709551615
|
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { // 65535 && 18446744073709551615
|
||||||
len64 := htonl64(u64(payload_len))
|
len64 := htonl64(u64(payload_len))
|
||||||
header[1] = (127 | 0x80)
|
header[1] = (127 | 0x80)
|
||||||
// todo: fix v style copy instead
|
unsafe {C.memcpy(&header[2], len64.data, 8)}
|
||||||
unsafe {
|
|
||||||
C.memcpy(&header[2], len64.data, 8)
|
|
||||||
}
|
|
||||||
header[10] = masking_key[0]
|
header[10] = masking_key[0]
|
||||||
header[11] = masking_key[1]
|
header[11] = masking_key[1]
|
||||||
header[12] = masking_key[2]
|
header[12] = masking_key[2]
|
||||||
header[13] = masking_key[3]
|
header[13] = masking_key[3]
|
||||||
} else {
|
} else {
|
||||||
// l.c('write: frame too large')
|
ws.close(1009, 'frame too large') ?
|
||||||
ws.close(1009, 'frame too large')?
|
|
||||||
return error('frame too large')
|
return error('frame too large')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -326,7 +302,7 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ?
|
||||||
frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff
|
frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ws.socket_write(frame_buf)?
|
ws.socket_write(frame_buf) ?
|
||||||
// Temporary hack until memory management is done
|
// Temporary hack until memory management is done
|
||||||
unsafe {
|
unsafe {
|
||||||
frame_buf.free()
|
frame_buf.free()
|
||||||
|
@ -337,9 +313,10 @@ pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ?
|
||||||
|
|
||||||
// write, writes a byte array with a websocket messagetype
|
// write, writes a byte array with a websocket messagetype
|
||||||
pub fn (mut ws Client) write(bytes []byte, code OPCode) ? {
|
pub fn (mut ws Client) write(bytes []byte, code OPCode) ? {
|
||||||
ws.write_ptr(byteptr(bytes.data), bytes.len, code)?
|
ws.write_ptr(byteptr(bytes.data), bytes.len, code) ?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// write_str, writes a string with a websocket texttype
|
||||||
pub fn (mut ws Client) write_str(str string) ? {
|
pub fn (mut ws Client) write_str(str string) ? {
|
||||||
ws.write_ptr(str.str, str.len, .text_frame)
|
ws.write_ptr(str.str, str.len, .text_frame)
|
||||||
}
|
}
|
||||||
|
@ -362,20 +339,18 @@ pub fn (mut ws Client) close(code int, message string) ? {
|
||||||
if code > 0 {
|
if code > 0 {
|
||||||
code_ := C.htons(code)
|
code_ := C.htons(code)
|
||||||
message_len := message.len + 2
|
message_len := message.len + 2
|
||||||
mut close_frame := []byte{len: message_len} // [`0`].repeat(message_len)
|
mut close_frame := []byte{len: message_len}
|
||||||
close_frame[0] = byte(code_ & 0xFF)
|
close_frame[0] = byte(code_ & 0xFF)
|
||||||
close_frame[1] = byte(code_ >> 8)
|
close_frame[1] = byte(code_ >> 8)
|
||||||
code32 = (close_frame[0] << 8) + close_frame[1]
|
code32 = (close_frame[0] << 8) + close_frame[1]
|
||||||
for i in 0 .. message.len {
|
for i in 0 .. message.len {
|
||||||
close_frame[i + 2] = message[i]
|
close_frame[i + 2] = message[i]
|
||||||
}
|
}
|
||||||
ws.send_control_frame(.close, 'CLOSE', close_frame)?
|
ws.send_control_frame(.close, 'CLOSE', close_frame) ?
|
||||||
ws.send_close_event(code, message)
|
ws.send_close_event(code, message)
|
||||||
unsafe {
|
unsafe {close_frame.free()}
|
||||||
close_frame.free()
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ws.send_control_frame(.close, 'CLOSE', [])?
|
ws.send_control_frame(.close, 'CLOSE', []) ?
|
||||||
ws.send_close_event(code, '')
|
ws.send_close_event(code, '')
|
||||||
}
|
}
|
||||||
ws.fragments = []
|
ws.fragments = []
|
||||||
|
@ -413,20 +388,14 @@ fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []b
|
||||||
if payload.len >= 2 {
|
if payload.len >= 2 {
|
||||||
if !ws.is_server {
|
if !ws.is_server {
|
||||||
mut parsed_payload := []byte{len: payload.len + 1}
|
mut parsed_payload := []byte{len: payload.len + 1}
|
||||||
unsafe {
|
unsafe {C.memcpy(parsed_payload.data, &payload[0], payload.len)}
|
||||||
C.memcpy(parsed_payload.data, &payload[0], payload.len)
|
|
||||||
}
|
|
||||||
parsed_payload[payload.len] = `\0`
|
parsed_payload[payload.len] = `\0`
|
||||||
for i in 0 .. payload.len {
|
for i in 0 .. payload.len {
|
||||||
control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff
|
control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff
|
||||||
}
|
}
|
||||||
unsafe {
|
unsafe {parsed_payload.free()}
|
||||||
parsed_payload.free()
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
unsafe {
|
unsafe {C.memcpy(&control_frame[2], &payload[0], payload.len)}
|
||||||
C.memcpy(&control_frame[2], &payload[0], payload.len)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -438,9 +407,7 @@ fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []b
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if payload.len > 0 {
|
if payload.len > 0 {
|
||||||
unsafe {
|
unsafe {C.memcpy(&control_frame[2], &payload[0], payload.len)}
|
||||||
C.memcpy(&control_frame[2], &payload[0], payload.len)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -451,7 +418,7 @@ fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []b
|
||||||
|
|
||||||
// parse_uri, parses the url string to it's components
|
// parse_uri, parses the url string to it's components
|
||||||
fn parse_uri(url string) ?&Uri {
|
fn parse_uri(url string) ?&Uri {
|
||||||
u := urllib.parse(url)?
|
u := urllib.parse(url) ?
|
||||||
v := u.request_uri().split('?')
|
v := u.request_uri().split('?')
|
||||||
mut port := u.port()
|
mut port := u.port()
|
||||||
if port == '' {
|
if port == '' {
|
||||||
|
@ -480,7 +447,7 @@ fn (mut ws Client) set_state(state State) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[inline]
|
// assert_not_connected returns error if the connection is not connected
|
||||||
fn (ws Client) assert_not_connected() ? {
|
fn (ws Client) assert_not_connected() ? {
|
||||||
match ws.state {
|
match ws.state {
|
||||||
.connecting { return error('connect: websocket is connecting') }
|
.connecting { return error('connect: websocket is connecting') }
|
||||||
|
@ -500,6 +467,7 @@ fn (mut ws Client) reset_state() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// debug_log makes debug logging output different depending if client or server
|
||||||
fn (mut ws Client) debug_log(text string) {
|
fn (mut ws Client) debug_log(text string) {
|
||||||
if ws.is_server {
|
if ws.is_server {
|
||||||
ws.logger.debug('server-> $text')
|
ws.logger.debug('server-> $text')
|
||||||
|
@ -508,14 +476,12 @@ fn (mut ws Client) debug_log(text string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[unsafe]
|
// free, manual free memory of Message struct
|
||||||
pub fn (m &Message) free() {
|
pub fn (m &Message) free() {
|
||||||
unsafe {
|
unsafe {m.payload.free()}
|
||||||
m.payload.free()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[unsafe]
|
// free, manual free memory of Client struct
|
||||||
pub fn (c &Client) free() {
|
pub fn (c &Client) free() {
|
||||||
unsafe {
|
unsafe {
|
||||||
c.flags.free()
|
c.flags.free()
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
module websocket
|
module websocket
|
||||||
|
|
||||||
|
// error_code returns the error code
|
||||||
fn error_code() int {
|
fn error_code() int {
|
||||||
return C.errno
|
return C.errno
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
error_ewouldblock = C.EWOULDBLOCK
|
error_ewouldblock = C.EWOULDBLOCK // blocking error code
|
||||||
)
|
)
|
||||||
|
|
|
@ -8,31 +8,35 @@ import sync
|
||||||
import time
|
import time
|
||||||
import rand
|
import rand
|
||||||
|
|
||||||
|
// Server holds state of websocket server connection
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
mut:
|
mut:
|
||||||
clients map[string]&ServerClient
|
clients map[string]&ServerClient // Clients connected to this server
|
||||||
logger &log.Log
|
logger &log.Log // Logger used to log
|
||||||
ls net.TcpListener
|
ls net.TcpListener // TCpLister used to get incoming connection to socket
|
||||||
accept_client_callbacks []AcceptClientFn
|
accept_client_callbacks []AcceptClientFn // Accept client callback functions
|
||||||
message_callbacks []MessageEventHandler
|
message_callbacks []MessageEventHandler // New message callback functions
|
||||||
close_callbacks []CloseEventHandler
|
close_callbacks []CloseEventHandler // Close message callback functions
|
||||||
pub:
|
pub:
|
||||||
port int
|
port int // Port used as listen to incoming connections
|
||||||
is_ssl bool
|
is_ssl bool // True if secure connection (not supported yet on server)
|
||||||
pub mut:
|
pub mut:
|
||||||
ping_interval int = 30 // in seconds
|
ping_interval int = 30
|
||||||
state State
|
// Interval for automatic sending ping to connected clients in seconds
|
||||||
|
state State // Current state of connection
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServerClient has state of connected clients
|
||||||
struct ServerClient {
|
struct ServerClient {
|
||||||
pub:
|
pub:
|
||||||
resource_name string
|
resource_name string // The resource that the client access
|
||||||
client_key string
|
client_key string // Unique key of client
|
||||||
pub mut:
|
pub mut:
|
||||||
server &Server
|
server &Server // The server instance
|
||||||
client &Client
|
client &Client // The client instance
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// new_server instance new websocket server on port and route
|
||||||
pub fn new_server(port int, route string) &Server {
|
pub fn new_server(port int, route string) &Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
port: port
|
port: port
|
||||||
|
@ -43,13 +47,15 @@ pub fn new_server(port int, route string) &Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set_ping_interval sets the interval that the server will send ping messages to clients
|
||||||
pub fn (mut s Server) set_ping_interval(seconds int) {
|
pub fn (mut s Server) set_ping_interval(seconds int) {
|
||||||
s.ping_interval = seconds
|
s.ping_interval = seconds
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// listen, start listen to incoming connections
|
||||||
pub fn (mut s Server) listen() ? {
|
pub fn (mut s Server) listen() ? {
|
||||||
s.logger.info('websocket server: start listen on port $s.port')
|
s.logger.info('websocket server: start listen on port $s.port')
|
||||||
s.ls = net.listen_tcp(s.port)?
|
s.ls = net.listen_tcp(s.port) ?
|
||||||
s.set_state(.open)
|
s.set_state(.open)
|
||||||
go s.handle_ping()
|
go s.handle_ping()
|
||||||
for {
|
for {
|
||||||
|
@ -61,10 +67,11 @@ pub fn (mut s Server) listen() ? {
|
||||||
s.logger.info('websocket server: end listen on port $s.port')
|
s.logger.info('websocket server: end listen on port $s.port')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close server (not implemented)
|
||||||
fn (mut s Server) close() {
|
fn (mut s Server) close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Todo: make thread safe
|
// handle_ping sends ping to all clients every set interval
|
||||||
fn (mut s Server) handle_ping() {
|
fn (mut s Server) handle_ping() {
|
||||||
mut clients_to_remove := []string{}
|
mut clients_to_remove := []string{}
|
||||||
for s.state == .open {
|
for s.state == .open {
|
||||||
|
@ -74,8 +81,7 @@ fn (mut s Server) handle_ping() {
|
||||||
if c.client.state == .open {
|
if c.client.state == .open {
|
||||||
c.client.ping() or {
|
c.client.ping() or {
|
||||||
s.logger.debug('server-> error sending ping to client')
|
s.logger.debug('server-> error sending ping to client')
|
||||||
// todo fix better close message, search the standard
|
c.client.close(1002, 'Closing connection: ping send error') or {
|
||||||
c.client.close(1002, 'Clossing connection: ping send error') or {
|
|
||||||
// we want to continue even if error
|
// we want to continue even if error
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -99,20 +105,21 @@ fn (mut s Server) handle_ping() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serve_client accepts incoming connection and setup the websocket handshake
|
||||||
fn (mut s Server) serve_client(mut c Client) ? {
|
fn (mut s Server) serve_client(mut c Client) ? {
|
||||||
c.logger.debug('server-> Start serve client ($c.id)')
|
c.logger.debug('server-> Start serve client ($c.id)')
|
||||||
defer {
|
defer {
|
||||||
c.logger.debug('server-> End serve client ($c.id)')
|
c.logger.debug('server-> End serve client ($c.id)')
|
||||||
}
|
}
|
||||||
mut handshake_response, mut server_client := s.handle_server_handshake(mut c)?
|
mut handshake_response, mut server_client := s.handle_server_handshake(mut c) ?
|
||||||
accept := s.send_connect_event(mut server_client)?
|
accept := s.send_connect_event(mut server_client) ?
|
||||||
if !accept {
|
if !accept {
|
||||||
s.logger.debug('server-> client not accepted')
|
s.logger.debug('server-> client not accepted')
|
||||||
c.shutdown_socket()?
|
c.shutdown_socket() ?
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// The client is accepted
|
// The client is accepted
|
||||||
c.socket_write(handshake_response.bytes())?
|
c.socket_write(handshake_response.bytes()) ?
|
||||||
lock {
|
lock {
|
||||||
s.clients[server_client.client.id] = server_client
|
s.clients[server_client.client.id] = server_client
|
||||||
}
|
}
|
||||||
|
@ -123,6 +130,7 @@ fn (mut s Server) serve_client(mut c Client) ? {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setup_callbacks initialize all callback functions
|
||||||
fn (mut s Server) setup_callbacks(mut sc ServerClient) {
|
fn (mut s Server) setup_callbacks(mut sc ServerClient) {
|
||||||
if s.message_callbacks.len > 0 {
|
if s.message_callbacks.len > 0 {
|
||||||
for cb in s.message_callbacks {
|
for cb in s.message_callbacks {
|
||||||
|
@ -151,8 +159,9 @@ fn (mut s Server) setup_callbacks(mut sc ServerClient) {
|
||||||
}, sc)
|
}, sc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// accept_new_client creates a new client instance for client connects to socket
|
||||||
fn (mut s Server) accept_new_client() ?&Client {
|
fn (mut s Server) accept_new_client() ?&Client {
|
||||||
mut new_conn := s.ls.accept()?
|
mut new_conn := s.ls.accept() ?
|
||||||
c := &Client{
|
c := &Client{
|
||||||
is_server: true
|
is_server: true
|
||||||
conn: new_conn
|
conn: new_conn
|
||||||
|
@ -166,18 +175,16 @@ fn (mut s Server) accept_new_client() ?&Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set_state sets current state in a thread safe way
|
// set_state sets current state in a thread safe way
|
||||||
[inline]
|
|
||||||
fn (mut s Server) set_state(state State) {
|
fn (mut s Server) set_state(state State) {
|
||||||
lock {
|
lock {
|
||||||
s.state = state
|
s.state = state
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// free, manual free memory of Server instance
|
||||||
pub fn (mut s Server) free() {
|
pub fn (mut s Server) free() {
|
||||||
unsafe {
|
unsafe {
|
||||||
s.clients.free()
|
s.clients.free()
|
||||||
// s.logger.free()
|
|
||||||
// s.ls.free()
|
|
||||||
s.accept_client_callbacks.free()
|
s.accept_client_callbacks.free()
|
||||||
s.message_callbacks.free()
|
s.message_callbacks.free()
|
||||||
s.close_callbacks.free()
|
s.close_callbacks.free()
|
||||||
|
|
|
@ -2,10 +2,11 @@ module websocket
|
||||||
|
|
||||||
import net
|
import net
|
||||||
|
|
||||||
|
// error_code returns the error code
|
||||||
fn error_code() int {
|
fn error_code() int {
|
||||||
return C.WSAGetLastError()
|
return C.WSAGetLastError()
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
error_ewouldblock = net.WsaError.wsaewouldblock
|
error_ewouldblock = net.WsaError.wsaewouldblock // blocking error code
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue