2020-08-21 22:50:38 +00:00
// The websocket client implements the websocket capabilities
// it is a refactor of the original V-websocket client class
// from @thecoderr.
// There are quite a few manual memory management free() going on
// int the code. This will be refactored once the memory management
// is done. For now there are no leaks on message levels. Please
// check with valgrind if you do any changes in the free calls
module websocket
2020-11-15 20:54:47 +00:00
import net
2020-08-21 22:50:38 +00:00
import x.openssl
import net.urllib
import time
import log
import sync
import rand
const (
empty_bytearr = []byte{}
// Client represents websocket client state
pub struct Client {
2020-09-09 13:34:41 +00:00
is_server bool
2020-08-21 22:50:38 +00:00
ssl_conn &openssl.SSLConn
flags []Flag
fragments []Fragment
message_callbacks []MessageEventHandler
error_callbacks []ErrorEventHandler
open_callbacks []OpenEventHandler
close_callbacks []CloseEventHandler
is_ssl bool
uri Uri
id string
pub mut:
conn net.TcpConn
nonce_size int = 16 // you can try 18 too
2020-09-09 13:34:41 +00:00
panic_on_callback bool
2020-08-21 22:50:38 +00:00
state State
2020-11-18 17:22:44 +00:00
logger &log.Log
2020-08-21 22:50:38 +00:00
resource_name string
last_pong_ut u64
enum Flag {
// State of the websocket connection.
// Messages should be sent only on state .open
enum State {
connecting = 0
// Message, represents a whole message conbined from 1 to n frames
pub struct Message {
opcode OPCode
payload []byte
// OPCode, the supported websocket frame types
pub enum OPCode {
continuation = 0x00
text_frame = 0x01
binary_frame = 0x02
close = 0x08
ping = 0x09
pong = 0x0A
// new_client, instance a new websocket client
pub fn new_client(address string) ?&Client {
uri := parse_uri(address)?
return &Client{
is_server: false
ssl_conn: openssl.new_ssl_conn()
is_ssl: address.starts_with('wss')
logger: &log.Log{
level: .info
uri: uri
state: .closed
id: rand.uuid_v4()
// connect, connects and do handshake procedure with remote server
pub fn (mut ws Client) connect() ? {
ws.logger.info('connecting to host $ws.uri')
ws.conn = ws.dial_socket()?
2020-11-15 20:54:47 +00:00
2020-08-21 22:50:38 +00:00
ws.logger.info('successfully connected to host $ws.uri')
// listen, listens to incoming messages and handles them
pub fn (mut ws Client) listen() ? {
ws.logger.info('Starting client listener, server($ws.is_server)...')
defer {
ws.logger.info('Quit client listener, server($ws.is_server)...')
2020-11-08 18:36:42 +00:00
ws.close(1000, 'closed by client')
2020-08-21 22:50:38 +00:00
for ws.state == .open {
msg := ws.read_next_message() or {
if ws.state in [.closed, .closing] {
ws.debug_log('failed to read next message: $err')
ws.send_error_event('failed to read next message: $err')
return error(err)
ws.debug_log('got message: $msg.opcode') // , payload: $msg.payload') leaks
match msg.opcode {
.text_frame {
ws.debug_log('read: text')
unsafe {
.binary_frame {
ws.debug_log('read: binary')
unsafe {
.ping {
ws.debug_log('read: ping, sending pong')
ws.send_control_frame(.pong, 'PONG', msg.payload) or {
ws.logger.error('error in message callback sending PONG: $err')
ws.send_error_event('error in message callback sending PONG: $err')
if ws.panic_on_callback {
if msg.payload.len > 0 {
unsafe {
.pong {
ws.debug_log('read: pong')
ws.last_pong_ut = time.now().unix
if msg.payload.len > 0 {
unsafe {
.close {
ws.debug_log('read: close')
defer {
if msg.payload.len > 0 {
if msg.payload.len == 1 {
ws.close(1002, 'close payload cannot be 1 byte')?
return error('close payload cannot be 1 byte')
code := (int(msg.payload[0]) << 8) + int(msg.payload[1])
if code in invalid_close_codes {
ws.close(1002, 'invalid close code: $code')?
return error('invalid close code: $code')
reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} }
if reason.len > 0 {
ws.validate_utf_8(.close, reason)?
if ws.state !in [.closing, .closed] {
// sending close back according to spec
ws.debug_log('close with reason, code: $code, reason: $reason')
2020-08-22 10:29:15 +00:00
r := reason.bytestr()
2020-08-21 22:50:38 +00:00
ws.close(code, r)?
unsafe {
} else {
if ws.state !in [.closing, .closed] {
ws.debug_log('close with reason, no code')
// sending close back according to spec
ws.close(1000, 'normal')?
unsafe {
.continuation {
ws.logger.error('unexpected opcode continuation, nothing to continue')
ws.send_error_event('unexpected opcode continuation, nothing to continue')
ws.close(1002, 'nothing to continue')?
return error('unexpected opcode continuation, nothing to continue')
// this function was needed for defer
fn (mut ws Client) manage_clean_close() {
ws.send_close_event(1000, 'closed by client')
// ping, sends ping message to server,
// ping response will be pushed to message callback
pub fn (mut ws Client) ping() ? {
ws.send_control_frame(.ping, 'PING', [])?
// pong, sends pog message to server,
// pongs are normally automatically sent back to server
pub fn (mut ws Client) pong() ? {
ws.send_control_frame(.pong, 'PONG', [])?
// write_ptr, writes len bytes provided a byteptr with a websocket messagetype
pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ? {
// Temporary, printing bytes are leaking
ws.debug_log('write code: $code')
// ws.debug_log('write code: $code, payload: $bytes')
if ws.state != .open || ws.conn.sock.handle < 1 {
// send error here later
return error('trying to write on a closed socket!')
// payload_len := bytes.len
mut header_len := 2 + if payload_len > 125 { 2 } else { 0 } + if payload_len > 0xffff { 6 } else { 0 }
if !ws.is_server {
header_len += 4
mut header := []byte{len: header_len, init: `0`} // [`0`].repeat(header_len)
header[0] = byte(int(code)) | 0x80
masking_key := create_masking_key()
defer {
unsafe {
if ws.is_server {
if payload_len <= 125 {
header[1] = byte(payload_len)
// 0x80
} else if payload_len > 125 && payload_len <= 0xffff {
len16 := C.htons(payload_len)
header[1] = 126
// 0x80
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], &len16, 2)
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff {
len_bytes := htonl64(u64(payload_len))
header[1] = 127 // 0x80
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], len_bytes.data, 8)
} else {
if payload_len <= 125 {
header[1] = byte(payload_len | 0x80)
header[2] = masking_key[0]
header[3] = masking_key[1]
header[4] = masking_key[2]
header[5] = masking_key[3]
} else if payload_len > 125 && payload_len <= 0xffff {
len16 := C.htons(payload_len)
header[1] = (126 | 0x80)
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], &len16, 2)
header[4] = masking_key[0]
header[5] = masking_key[1]
header[6] = masking_key[2]
header[7] = masking_key[3]
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { // 65535 && 18446744073709551615
len64 := htonl64(u64(payload_len))
header[1] = (127 | 0x80)
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], len64.data, 8)
header[10] = masking_key[0]
header[11] = masking_key[1]
header[12] = masking_key[2]
header[13] = masking_key[3]
} else {
// l.c('write: frame too large')
ws.close(1009, 'frame too large')?
return error('frame too large')
len := header.len + payload_len
mut frame_buf := []byte{len: len}
unsafe {
C.memcpy(&frame_buf[0], byteptr(header.data), header.len)
if payload_len > 0 {
C.memcpy(&frame_buf[header.len], bytes, payload_len)
if !ws.is_server {
for i in 0 .. payload_len {
frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff
// Temporary hack until memory management is done
unsafe {
// write, writes a byte array with a websocket messagetype
pub fn (mut ws Client) write(bytes []byte, code OPCode) ? {
ws.write_ptr(byteptr(bytes.data), bytes.len, code)?
pub fn (mut ws Client) write_str(str string) ? {
ws.write_ptr(str.str, str.len, .text_frame)
// close, closes the websocket connection
pub fn (mut ws Client) close(code int, message string) ? {
ws.debug_log('sending close, $code, $message')
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
ws.debug_log('close: Websocket allready closed ($ws.state), $message, $code handle($ws.conn.sock.handle)')
err_msg := 'Socket allready closed: $code'
ret_err := error(err_msg)
// unsafe {
// err_msg.free()
// }
return ret_err
defer {
mut code32 := 0
if code > 0 {
code_ := C.htons(code)
message_len := message.len + 2
mut close_frame := []byte{len: message_len} // [`0`].repeat(message_len)
close_frame[0] = byte(code_ & 0xFF)
close_frame[1] = byte(code_ >> 8)
code32 = (close_frame[0] << 8) + close_frame[1]
for i in 0 .. message.len {
close_frame[i + 2] = message[i]
ws.send_control_frame(.close, 'CLOSE', close_frame)?
ws.send_close_event(code, message)
unsafe {
} else {
ws.send_control_frame(.close, 'CLOSE', [])?
ws.send_close_event(code, '')
ws.fragments = []
// send_control_frame, sends a control frame to the server
fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []byte) ? {
ws.debug_log('send control frame $code, frame_type: $frame_typ') // , payload: $payload')
if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 {
return error('socket is not connected')
header_len := if ws.is_server { 2 } else { 6 }
frame_len := header_len + payload.len
mut control_frame := []byte{len: frame_len} // [`0`].repeat(frame_len)
mut masking_key := if !ws.is_server { create_masking_key() } else { empty_bytearr }
defer {
unsafe {
if masking_key.len > 0 {
control_frame[0] = byte(int(code) | 0x80)
if !ws.is_server {
control_frame[1] = byte(payload.len | 0x80)
control_frame[2] = masking_key[0]
control_frame[3] = masking_key[1]
control_frame[4] = masking_key[2]
control_frame[5] = masking_key[3]
} else {
control_frame[1] = byte(payload.len)
if code == .close {
if payload.len >= 2 {
if !ws.is_server {
mut parsed_payload := []byte{len: payload.len + 1}
unsafe {
C.memcpy(parsed_payload.data, &payload[0], payload.len)
parsed_payload[payload.len] = `\0`
for i in 0 .. payload.len {
control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff
unsafe {
} else {
unsafe {
C.memcpy(&control_frame[2], &payload[0], payload.len)
} else {
if !ws.is_server {
if payload.len > 0 {
for i in 0 .. payload.len {
control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff
} else {
if payload.len > 0 {
unsafe {
C.memcpy(&control_frame[2], &payload[0], payload.len)
ws.socket_write(control_frame) or {
return error('send_control_frame: error sending $frame_typ control frame.')
// parse_uri, parses the url string to it's components
fn parse_uri(url string) ?&Uri {
2020-08-28 23:58:03 +00:00
u := urllib.parse(url)?
2020-08-21 22:50:38 +00:00
v := u.request_uri().split('?')
2020-11-07 16:14:33 +00:00
mut port := u.port()
if port == '' {
port = if u.str().starts_with('ws://') {
} else if u.str().starts_with('wss://') {
} else {
2020-11-05 05:36:50 +00:00
2020-08-21 22:50:38 +00:00
querystring := if v.len > 1 { '?' + v[1] } else { '' }
return &Uri{
url: url
hostname: u.hostname()
2020-11-05 05:36:50 +00:00
port: port
2020-08-21 22:50:38 +00:00
resource: v[0]
querystring: querystring
// set_state sets current state in a thread safe way
fn (mut ws Client) set_state(state State) {
lock {
ws.state = state
fn (ws Client) assert_not_connected() ? {
match ws.state {
.connecting { return error('connect: websocket is connecting') }
.open { return error('connect: websocket already open') }
else {}
// reset_state, resets the websocket and can connect again
fn (mut ws Client) reset_state() {
lock {
ws.state = .closed
ws.ssl_conn = openssl.new_ssl_conn()
ws.flags = []
ws.fragments = []
fn (mut ws Client) debug_log(text string) {
if ws.is_server {
ws.logger.debug('server-> $text')
} else {
ws.logger.debug('client-> $text')
pub fn (m &Message) free() {
unsafe {
pub fn (c &Client) free() {
unsafe {