495 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			V
		
	
	
			
		
		
	
	
			495 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			V
		
	
	
| // websocket module implements websocket client and a websocket server
 | |
| // attribution: @thecoderr the author of original websocket client
 | |
| [manualfree]
 | |
| module websocket
 | |
| 
 | |
| import net
 | |
| import net.http
 | |
| import x.openssl
 | |
| import net.urllib
 | |
| import time
 | |
| import log
 | |
| import rand
 | |
| 
 | |
| const (
 | |
| 	empty_bytearr = []byte{} // used as empty response to avoid allocation
 | |
| )
 | |
| 
 | |
| // Client represents websocket client
 | |
| pub struct Client {
 | |
| 	is_server bool
 | |
| mut:
 | |
| 	ssl_conn          &openssl.SSLConn // secure connection used when wss is used
 | |
| 	flags             []Flag     // flags used in handshake
 | |
| 	fragments         []Fragment // current fragments
 | |
| 	message_callbacks []MessageEventHandler // all callbacks on_message
 | |
| 	error_callbacks   []ErrorEventHandler   // all callbacks on_error
 | |
| 	open_callbacks    []OpenEventHandler    // all callbacks on_open
 | |
| 	close_callbacks   []CloseEventHandler   // all callbacks on_close
 | |
| pub:
 | |
| 	is_ssl bool   // true if secure socket is used
 | |
| 	uri    Uri    // uri of current connection
 | |
| 	id     string // unique id of client
 | |
| pub mut:
 | |
| 	header            http.Header  // headers that will be passed when connecting
 | |
| 	conn              &net.TcpConn // underlying TCP socket connection
 | |
| 	nonce_size        int = 16 // size of nounce used for masking
 | |
| 	panic_on_callback bool     // set to true of callbacks can panic
 | |
| 	state             State    // current state of connection
 | |
| 	logger            &log.Log // logger used to log messages
 | |
| 	resource_name     string   // name of current resource
 | |
| 	last_pong_ut      u64      // last time in unix time we got a pong message
 | |
| }
 | |
| 
 | |
| // Flag represents different types of headers in websocket handshake
 | |
| enum Flag {
 | |
| 	has_accept // Webs
 | |
| 	has_connection
 | |
| 	has_upgrade
 | |
| }
 | |
| 
 | |
| // State represents the state of the websocket connection.
 | |
| pub enum State {
 | |
| 	connecting = 0
 | |
| 	open
 | |
| 	closing
 | |
| 	closed
 | |
| }
 | |
| 
 | |
| // Message represents a whole message combined from 1 to n frames
 | |
| pub struct Message {
 | |
| pub:
 | |
| 	opcode  OPCode // websocket frame type of this message
 | |
| 	payload []byte // payload of the message
 | |
| }
 | |
| 
 | |
| // OPCode represents the supported websocket frame types
 | |
| pub enum OPCode {
 | |
| 	continuation = 0x00
 | |
| 	text_frame = 0x01
 | |
| 	binary_frame = 0x02
 | |
| 	close = 0x08
 | |
| 	ping = 0x09
 | |
| 	pong = 0x0A
 | |
| }
 | |
| 
 | |
| // new_client instance a new websocket client
 | |
| pub fn new_client(address string) ?&Client {
 | |
| 	uri := parse_uri(address) ?
 | |
| 	return &Client{
 | |
| 		conn: 0
 | |
| 		is_server: false
 | |
| 		ssl_conn: openssl.new_ssl_conn()
 | |
| 		is_ssl: address.starts_with('wss')
 | |
| 		logger: &log.Log{
 | |
| 			level: .info
 | |
| 		}
 | |
| 		uri: uri
 | |
| 		state: .closed
 | |
| 		id: rand.uuid_v4()
 | |
| 		header: http.new_header()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // connect connects to remote websocket server
 | |
| pub fn (mut ws Client) connect() ? {
 | |
| 	ws.assert_not_connected() ?
 | |
| 	ws.set_state(.connecting)
 | |
| 	ws.logger.info('connecting to host $ws.uri')
 | |
| 	ws.conn = ws.dial_socket() ?
 | |
| 	// Todo: make setting configurable
 | |
| 	ws.conn.set_read_timeout(time.second * 30)
 | |
| 	ws.conn.set_write_timeout(time.second * 30)
 | |
| 	ws.handshake() ?
 | |
| 	ws.set_state(.open)
 | |
| 	ws.logger.info('successfully connected to host $ws.uri')
 | |
| 	ws.send_open_event()
 | |
| }
 | |
| 
 | |
| // listen listens and processes incoming messages
 | |
| pub fn (mut ws Client) listen() ? {
 | |
| 	mut log := 'Starting client listener, server($ws.is_server)...'
 | |
| 	ws.logger.info(log)
 | |
| 	unsafe { log.free() }
 | |
| 	defer {
 | |
| 		ws.logger.info('Quit client listener, server($ws.is_server)...')
 | |
| 		if ws.state == .open {
 | |
| 			ws.close(1000, 'closed by client') or {}
 | |
| 		}
 | |
| 	}
 | |
| 	for ws.state == .open {
 | |
| 		msg := ws.read_next_message() or {
 | |
| 			if ws.state in [.closed, .closing] {
 | |
| 				return
 | |
| 			}
 | |
| 			ws.debug_log('failed to read next message: $err')
 | |
| 			ws.send_error_event('failed to read next message: $err')
 | |
| 			return err
 | |
| 		}
 | |
| 		if ws.state in [.closed, .closing] {
 | |
| 			return
 | |
| 		}
 | |
| 		ws.debug_log('got message: $msg.opcode')
 | |
| 		match msg.opcode {
 | |
| 			.text_frame {
 | |
| 				log = 'read: text'
 | |
| 				ws.debug_log(log)
 | |
| 				unsafe { log.free() }
 | |
| 				ws.send_message_event(msg)
 | |
| 				unsafe { msg.free() }
 | |
| 			}
 | |
| 			.binary_frame {
 | |
| 				ws.debug_log('read: binary')
 | |
| 				ws.send_message_event(msg)
 | |
| 				unsafe { msg.free() }
 | |
| 			}
 | |
| 			.ping {
 | |
| 				ws.debug_log('read: ping, sending pong')
 | |
| 				ws.send_control_frame(.pong, 'PONG', msg.payload) or {
 | |
| 					ws.logger.error('error in message callback sending PONG: $err')
 | |
| 					ws.send_error_event('error in message callback sending PONG: $err')
 | |
| 					if ws.panic_on_callback {
 | |
| 						panic(err)
 | |
| 					}
 | |
| 					continue
 | |
| 				}
 | |
| 				if msg.payload.len > 0 {
 | |
| 					unsafe { msg.free() }
 | |
| 				}
 | |
| 			}
 | |
| 			.pong {
 | |
| 				ws.debug_log('read: pong')
 | |
| 				ws.last_pong_ut = time.now().unix
 | |
| 				ws.send_message_event(msg)
 | |
| 				if msg.payload.len > 0 {
 | |
| 					unsafe { msg.free() }
 | |
| 				}
 | |
| 			}
 | |
| 			.close {
 | |
| 				log = 'read: close'
 | |
| 				ws.debug_log(log)
 | |
| 				unsafe { log.free() }
 | |
| 				defer {
 | |
| 					ws.manage_clean_close()
 | |
| 				}
 | |
| 				if msg.payload.len > 0 {
 | |
| 					if msg.payload.len == 1 {
 | |
| 						ws.close(1002, 'close payload cannot be 1 byte') ?
 | |
| 						return error('close payload cannot be 1 byte')
 | |
| 					}
 | |
| 					code := (int(msg.payload[0]) << 8) + int(msg.payload[1])
 | |
| 					if code in invalid_close_codes {
 | |
| 						ws.close(1002, 'invalid close code: $code') ?
 | |
| 						return error('invalid close code: $code')
 | |
| 					}
 | |
| 					reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} }
 | |
| 					if reason.len > 0 {
 | |
| 						ws.validate_utf_8(.close, reason) ?
 | |
| 					}
 | |
| 					if ws.state !in [.closing, .closed] {
 | |
| 						// sending close back according to spec
 | |
| 						ws.debug_log('close with reason, code: $code, reason: $reason')
 | |
| 						r := reason.bytestr()
 | |
| 						ws.close(code, r) ?
 | |
| 					}
 | |
| 					unsafe { msg.free() }
 | |
| 				} else {
 | |
| 					if ws.state !in [.closing, .closed] {
 | |
| 						ws.debug_log('close with reason, no code')
 | |
| 						// sending close back according to spec
 | |
| 						ws.close(1000, 'normal') ?
 | |
| 					}
 | |
| 					unsafe { msg.free() }
 | |
| 				}
 | |
| 				return
 | |
| 			}
 | |
| 			.continuation {
 | |
| 				ws.logger.error('unexpected opcode continuation, nothing to continue')
 | |
| 				ws.send_error_event('unexpected opcode continuation, nothing to continue')
 | |
| 				ws.close(1002, 'nothing to continue') ?
 | |
| 				return error('unexpected opcode continuation, nothing to continue')
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // manage_clean_close closes connection in a clean websocket way
 | |
| fn (mut ws Client) manage_clean_close() {
 | |
| 	ws.send_close_event(1000, 'closed by client')
 | |
| }
 | |
| 
 | |
| // ping sends ping message to server
 | |
| pub fn (mut ws Client) ping() ? {
 | |
| 	ws.send_control_frame(.ping, 'PING', []) ?
 | |
| }
 | |
| 
 | |
| // pong sends pong message to server,
 | |
| pub fn (mut ws Client) pong() ? {
 | |
| 	ws.send_control_frame(.pong, 'PONG', []) ?
 | |
| }
 | |
| 
 | |
| // write_ptr writes len bytes provided a byteptr with a websocket messagetype
 | |
| pub fn (mut ws Client) write_ptr(bytes &byte, payload_len int, code OPCode) ?int {
 | |
| 	// ws.debug_log('write_ptr code: $code')
 | |
| 	if ws.state != .open || ws.conn.sock.handle < 1 {
 | |
| 		// todo: send error here later
 | |
| 		return error('trying to write on a closed socket!')
 | |
| 	}
 | |
| 	mut header_len := 2 + if payload_len > 125 { 2 } else { 0 } +
 | |
| 		if payload_len > 0xffff { 6 } else { 0 }
 | |
| 	if !ws.is_server {
 | |
| 		header_len += 4
 | |
| 	}
 | |
| 	mut header := []byte{len: header_len, init: `0`} // [`0`].repeat(header_len)
 | |
| 	header[0] = byte(int(code)) | 0x80
 | |
| 	masking_key := create_masking_key()
 | |
| 	if ws.is_server {
 | |
| 		if payload_len <= 125 {
 | |
| 			header[1] = byte(payload_len)
 | |
| 		} else if payload_len > 125 && payload_len <= 0xffff {
 | |
| 			len16 := C.htons(payload_len)
 | |
| 			header[1] = 126
 | |
| 			unsafe { C.memcpy(&header[2], &len16, 2) }
 | |
| 		} else if payload_len > 0xffff && payload_len <= 0x7fffffff {
 | |
| 			len_bytes := htonl64(u64(payload_len))
 | |
| 			header[1] = 127
 | |
| 			unsafe { C.memcpy(&header[2], len_bytes.data, 8) }
 | |
| 		}
 | |
| 	} else {
 | |
| 		if payload_len <= 125 {
 | |
| 			header[1] = byte(payload_len | 0x80)
 | |
| 			header[2] = masking_key[0]
 | |
| 			header[3] = masking_key[1]
 | |
| 			header[4] = masking_key[2]
 | |
| 			header[5] = masking_key[3]
 | |
| 		} else if payload_len > 125 && payload_len <= 0xffff {
 | |
| 			len16 := C.htons(payload_len)
 | |
| 			header[1] = (126 | 0x80)
 | |
| 			unsafe { C.memcpy(&header[2], &len16, 2) }
 | |
| 			header[4] = masking_key[0]
 | |
| 			header[5] = masking_key[1]
 | |
| 			header[6] = masking_key[2]
 | |
| 			header[7] = masking_key[3]
 | |
| 		} else if payload_len > 0xffff && payload_len <= 0x7fffffff {
 | |
| 			len64 := htonl64(u64(payload_len))
 | |
| 			header[1] = (127 | 0x80)
 | |
| 			unsafe { C.memcpy(&header[2], len64.data, 8) }
 | |
| 			header[10] = masking_key[0]
 | |
| 			header[11] = masking_key[1]
 | |
| 			header[12] = masking_key[2]
 | |
| 			header[13] = masking_key[3]
 | |
| 		} else {
 | |
| 			ws.close(1009, 'frame too large') ?
 | |
| 			return error('frame too large')
 | |
| 		}
 | |
| 	}
 | |
| 	len := header.len + payload_len
 | |
| 	mut frame_buf := []byte{len: len}
 | |
| 	unsafe {
 | |
| 		C.memcpy(&frame_buf[0], &byte(header.data), header.len)
 | |
| 		if payload_len > 0 {
 | |
| 			C.memcpy(&frame_buf[header.len], bytes, payload_len)
 | |
| 		}
 | |
| 	}
 | |
| 	if !ws.is_server {
 | |
| 		for i in 0 .. payload_len {
 | |
| 			frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff
 | |
| 		}
 | |
| 	}
 | |
| 	written_len := ws.socket_write(frame_buf) ?
 | |
| 	unsafe {
 | |
| 		frame_buf.free()
 | |
| 		masking_key.free()
 | |
| 		header.free()
 | |
| 	}
 | |
| 	return written_len
 | |
| }
 | |
| 
 | |
| // write writes a byte array with a websocket messagetype to socket
 | |
| pub fn (mut ws Client) write(bytes []byte, code OPCode) ?int {
 | |
| 	return ws.write_ptr(&byte(bytes.data), bytes.len, code)
 | |
| }
 | |
| 
 | |
| // write_string, writes a string with a websocket texttype to socket
 | |
| [deprecated: 'use Client.write_string() instead']
 | |
| pub fn (mut ws Client) write_str(str string) ?int {
 | |
| 	return ws.write_string(str)
 | |
| }
 | |
| 
 | |
| // write_str, writes a string with a websocket texttype to socket
 | |
| pub fn (mut ws Client) write_string(str string) ?int {
 | |
| 	return ws.write_ptr(str.str, str.len, .text_frame)
 | |
| }
 | |
| 
 | |
| // close closes the websocket connection
 | |
| pub fn (mut ws Client) close(code int, message string) ? {
 | |
| 	ws.debug_log('sending close, $code, $message')
 | |
| 	if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
 | |
| 		ws.debug_log('close: Websocket allready closed ($ws.state), $message, $code handle($ws.conn.sock.handle)')
 | |
| 		err_msg := 'Socket allready closed: $code'
 | |
| 		return error(err_msg)
 | |
| 	}
 | |
| 	defer {
 | |
| 		ws.shutdown_socket() or {}
 | |
| 		ws.reset_state()
 | |
| 	}
 | |
| 	ws.set_state(.closing)
 | |
| 	// mut code32 := 0
 | |
| 	if code > 0 {
 | |
| 		code_ := C.htons(code)
 | |
| 		message_len := message.len + 2
 | |
| 		mut close_frame := []byte{len: message_len}
 | |
| 		close_frame[0] = byte(code_ & 0xFF)
 | |
| 		close_frame[1] = byte(code_ >> 8)
 | |
| 		// code32 = (close_frame[0] << 8) + close_frame[1]
 | |
| 		for i in 0 .. message.len {
 | |
| 			close_frame[i + 2] = message[i]
 | |
| 		}
 | |
| 		ws.send_control_frame(.close, 'CLOSE', close_frame) ?
 | |
| 		unsafe { close_frame.free() }
 | |
| 	} else {
 | |
| 		ws.send_control_frame(.close, 'CLOSE', []) ?
 | |
| 	}
 | |
| 	ws.fragments = []
 | |
| }
 | |
| 
 | |
| // send_control_frame sends a control frame to the server
 | |
| fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []byte) ? {
 | |
| 	ws.debug_log('send control frame $code, frame_type: $frame_typ')
 | |
| 	if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 {
 | |
| 		return error('socket is not connected')
 | |
| 	}
 | |
| 	header_len := if ws.is_server { 2 } else { 6 }
 | |
| 	frame_len := header_len + payload.len
 | |
| 	mut control_frame := []byte{len: frame_len}
 | |
| 	mut masking_key := if !ws.is_server { create_masking_key() } else { websocket.empty_bytearr }
 | |
| 	defer {
 | |
| 		unsafe {
 | |
| 			control_frame.free()
 | |
| 			if masking_key.len > 0 {
 | |
| 				masking_key.free()
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	control_frame[0] = byte(int(code) | 0x80)
 | |
| 	if !ws.is_server {
 | |
| 		control_frame[1] = byte(payload.len | 0x80)
 | |
| 		control_frame[2] = masking_key[0]
 | |
| 		control_frame[3] = masking_key[1]
 | |
| 		control_frame[4] = masking_key[2]
 | |
| 		control_frame[5] = masking_key[3]
 | |
| 	} else {
 | |
| 		control_frame[1] = byte(payload.len)
 | |
| 	}
 | |
| 	if code == .close {
 | |
| 		if payload.len >= 2 {
 | |
| 			if !ws.is_server {
 | |
| 				mut parsed_payload := []byte{len: payload.len + 1}
 | |
| 				unsafe { C.memcpy(parsed_payload.data, &payload[0], payload.len) }
 | |
| 				parsed_payload[payload.len] = `\0`
 | |
| 				for i in 0 .. payload.len {
 | |
| 					control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff
 | |
| 				}
 | |
| 				unsafe { parsed_payload.free() }
 | |
| 			} else {
 | |
| 				unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) }
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		if !ws.is_server {
 | |
| 			if payload.len > 0 {
 | |
| 				for i in 0 .. payload.len {
 | |
| 					control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff
 | |
| 				}
 | |
| 			}
 | |
| 		} else {
 | |
| 			if payload.len > 0 {
 | |
| 				unsafe { C.memcpy(&control_frame[2], &payload[0], payload.len) }
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	ws.socket_write(control_frame) or {
 | |
| 		return error('send_control_frame: error sending $frame_typ control frame.')
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // parse_uri parses the url to a Uri
 | |
| fn parse_uri(url string) ?&Uri {
 | |
| 	u := urllib.parse(url) ?
 | |
| 	request_uri := u.request_uri()
 | |
| 	v := request_uri.split('?')
 | |
| 	mut port := u.port()
 | |
| 	uri := u.str()
 | |
| 	if port == '' {
 | |
| 		port = if uri.starts_with('ws://') {
 | |
| 			'80'
 | |
| 		} else if uri.starts_with('wss://') {
 | |
| 			'443'
 | |
| 		} else {
 | |
| 			u.port()
 | |
| 		}
 | |
| 	}
 | |
| 	querystring := if v.len > 1 { '?' + v[1] } else { '' }
 | |
| 	return &Uri{
 | |
| 		url: url
 | |
| 		hostname: u.hostname()
 | |
| 		port: port
 | |
| 		resource: v[0]
 | |
| 		querystring: querystring
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // set_state sets current state of the websocket connection
 | |
| fn (mut ws Client) set_state(state State) {
 | |
| 	lock  {
 | |
| 		ws.state = state
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // assert_not_connected returns error if the connection is not connected
 | |
| fn (ws Client) assert_not_connected() ? {
 | |
| 	match ws.state {
 | |
| 		.connecting { return error('connect: websocket is connecting') }
 | |
| 		.open { return error('connect: websocket already open') }
 | |
| 		.closing { return error('connect: reconnect on closing websocket not supported, please use new client') }
 | |
| 		else {}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // reset_state resets the websocket and initialize default settings
 | |
| fn (mut ws Client) reset_state() {
 | |
| 	lock  {
 | |
| 		ws.state = .closed
 | |
| 		ws.ssl_conn = openssl.new_ssl_conn()
 | |
| 		ws.flags = []
 | |
| 		ws.fragments = []
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // debug_log handles debug logging output for client and server
 | |
| fn (mut ws Client) debug_log(text string) {
 | |
| 	if ws.is_server {
 | |
| 		ws.logger.debug('server-> $text')
 | |
| 	} else {
 | |
| 		ws.logger.debug('client-> $text')
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // free handles manual free memory of Message struct
 | |
| pub fn (m &Message) free() {
 | |
| 	unsafe { m.payload.free() }
 | |
| }
 | |
| 
 | |
| // free handles manual free memory of Client struct
 | |
| pub fn (c &Client) free() {
 | |
| 	unsafe {
 | |
| 		c.flags.free()
 | |
| 		c.fragments.free()
 | |
| 		c.message_callbacks.free()
 | |
| 		c.error_callbacks.free()
 | |
| 		c.open_callbacks.free()
 | |
| 		c.close_callbacks.free()
 | |
| 		c.header.free()
 | |
| 	}
 | |
| }
 |