copy thecodrr/vws to vlib/net/websocket

pull/4293/head
Alexander Medvednikov 2020-04-08 14:22:31 +02:00
parent 5ef5712e91
commit 8426db7fe5
12 changed files with 1358 additions and 0 deletions

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 Abdullah Atta
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,28 @@
# WebSockets Library for V
**This is still work-in-progress!**
Heavily inspired (and used **very** liberally) from [cwebsockets](https://github.com/jeremyhahn/cwebsocket).
The websockets library itself is ready and working (passes all tests of AutoBahn). What's left:
1. It needs to be updated and made to run with latest V.
2. No Windows Support (SSL issues)
3. No proper AutoBahn test client (a prototype is in the main.v but nothing clean and neat).
4. No Websocket Server.
## What's needed for Windows support:
1. SSL (either make the VSChannel work or OpenSSL)
General code cleanup etc. is also needed.
## Contributors
Anyone and everyone is welcome to contribute. I don't have time for working on this completely but I will review and merge Pull Requests ASAP. So if anyone is interested, know that I am interested too.
If anyone has any questions regarding design etc. please open an Issue or contact me on Discord.
## Future Planning:
This is supposed to be merged into V stdlib but it's not ready for that yet. As soon as it is, I will open a PR.

View File

@ -0,0 +1,56 @@
module logger
const (
colors = {
"success": "\e[32",
"debug": "\e[36",
"error": "\e[91",
"warn": "\e[33",
"critical": "\e[31",
"fatal": "\e[31",
"info": "\e[37"
}
)
struct Logger {
mod string
}
pub fn new(mod string) &Logger {
return &Logger{mod: mod}
}
pub fn (l &Logger) d(message string){
$if debug {
l.print("debug", message)
}
}
pub fn (l &Logger) i(message string){
l.print('info', message)
}
pub fn (l &Logger) e(message string){
l.print('error', message)
}
pub fn (l &Logger) c(message string){
l.print('critical', message)
}
pub fn (l &Logger) f(message string){
l.print('fatal', message)
exit(-1)
}
pub fn (l &Logger) w(message string){
l.print('warn', message)
}
pub fn (l &Logger) s(message string) {
l.print('success', message)
}
fn (l &Logger) print(mod, message string) {
println('${colors[mod]};7m[${mod}]\e[0m \e[1m${l.mod}\e[0m: ${message}')
}

View File

@ -0,0 +1,124 @@
module main
import (
ws
eventbus
time
readline
term
benchmark
)
const (
eb = eventbus.new()
)
#flag -I $PWD
#include "utf8.h"
fn C.utf8_validate_str() bool
fn main(){
//println(sss)
/* for sss in 0..10 {
mut bm := benchmark.new_benchmark()
for i in 0..10000 {
for a, t in tests {
ss := ws.utf8_validate(t.str, t.len)
if !ss {
panic("failed")
}
//println("${a}:${ss}")
}
}
bm.stop()
println( bm.total_message('remarks about the benchmark') )
} */
mut websocket := ws.new("ws://localhost:9001/getCaseCount")
//defer { }
websocket.subscriber.subscribe("on_open", on_open)
websocket.subscriber.subscribe("on_message", on_message)
websocket.subscriber.subscribe("on_error", on_error)
websocket.subscriber.subscribe("on_close", on_close)
//go
websocket.connect()
websocket.read()
//time.usleep(2000000)
//go websocket.listen()
//term.erase_clear()
/* text := read_line("[client]:")
if text == "close" {
ws.close(1005, "done")
time.usleep(1000000)
exit(0)
}
ws.write(text, .text_frame) */
/* time.usleep(1000000)
ws.read() */
//ws.close(1005, "done")
/*
*/
//websocket.close(1005, "done")
//read_line("wait")
}
fn read_line(text string) string {
mut r := readline.Readline{}
mut output := r.read_line(text + " ") or {
panic(err)
}
output = output.replace("\n","")
if output.len <= 0 {
return ""
}
return output
}
fn on_open(params eventbus.Params){
println("Websocket opened.")
}
fn on_message(params eventbus.Params){
println("Message recieved. Sending it back.")
typ := params.get_string("type")
len := params.get_int("len")
mut ws := params.get_caller(ws.Client{})
if typ == "string" {
message := params.get_string("payload")
if ws.uri.ends_with("getCaseCount") {
num := message.int()
ws.close(1005, "done")
start_tests(mut ws, num)
return
}
//println("Message: " + message)
ws.write(message.str, len, .text_frame)
} else {
println("Binary message.")
message := params.get_raw("payload")
ws.write(message, len, .binary_frame)
}
}
fn start_tests(websocket mut ws.Client, num int) {
for i := 1; i < num; i++ {
println("Running test: " + i.str())
websocket.uri = "ws://localhost:9001/runCase?case=${i.str()}&agent=vws/1.0a"
if websocket.connect() >= 0 {
websocket.listen()
}
}
println("Done!")
websocket.uri = "ws://localhost:9001/updateReports?agent=vws/1.0a"
if websocket.connect() >= 0 {
websocket.read()
websocket.close(1000, "disconnecting...")
}
exit(0)
}
fn on_close(params eventbus.Params){
println("Websocket closed.")
}
fn on_error(params eventbus.Params){
println("we have an error.")
}

View File

@ -0,0 +1,166 @@
/*
Copyright (c) 2015, Andreas Fett
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <assert.h>
typedef int utf8_state;
static utf8_state next_state(utf8_state, unsigned char);
// Public API see utf8-validate.h for docs of the following function
bool utf8_validate(utf8_state *const state, int c)
{
assert(state);
return (*state = next_state(*state, c)) != -1;
}
bool utf8_validate_some(utf8_state *const state, const void * const src, size_t len)
{
assert(state);
assert(src);
for (size_t i = 0; i < len; ++i) {
*state = next_state(*state, *((unsigned char *)src + i));
if (*state == -1) {
return false;
}
}
return true;
}
bool utf8_validate_mem(const void * const src, size_t len)
{
assert(src);
utf8_state state = 0;
for (size_t i = 0; i < len; ++i) {
state = next_state(state, *((unsigned char *)src + i));
if (state == -1) {
return false;
}
}
// detect unterminated sequence
return state == 0;
}
bool utf8_validate_str(const char * const str)
{
assert(str);
utf8_state state = 0;
for (size_t i = 0; str[i] != 0; ++i) {
state = next_state(state, str[i]);
if (state == -1) {
return false;
}
}
// detect unterminated sequence
return state == 0;
}
/* Private state engine
*
* The macros below assemble the cases for a switch statement
* matching the language of the ABNF grammar given in rfc3629.
*
* Each SEQ# macro adds the states to match a # char long sequence.
*
* The SEQ#_HELPERs all have a 'fall through' to the next sequence.
* for # > 1 this is an explicit goto
*/
#define SEQ_END(n) SEQ_ ## n ## _END
#define SEQ1_HELPER(s, r0) \
case (s * 4) + 0: if (r0) return 0; goto SEQ_END(s); \
SEQ_END(s):
#define SEQ2_HELPER(s, r0, r1) \
case (s * 4) + 0: if (r0) { printf("ehe"); return (s * 4) + 1; } goto SEQ_END(s); \
case (s * 4) + 1: if (r1) return 0; return -1; \
SEQ_END(s):
#define SEQ3_HELPER(s, r0, r1, r2) \
case (s * 4) + 0: if (r0) return (s * 4) + 1; goto SEQ_END(s); \
case (s * 4) + 1: if (r1) return (s * 4) + 2; return -1; \
case (s * 4) + 2: if (r2) return 0; return -1; \
SEQ_END(s):
#define SEQ4_HELPER(s, r0, r1, r2, r3) \
case (s * 4) + 0: if (r0) return (s * 4) + 1; goto SEQ_END(s); \
case (s * 4) + 1: if (r1) return (s * 4) + 2; return -1; \
case (s * 4) + 2: if (r2) return (s * 4) + 3; return -1; \
case (s * 4) + 3: if (r3) return 0; return -1; \
SEQ_END(s):
#define SEQ1(s, r0) SEQ1_HELPER(s, r0)
#define SEQ2(s, r0, r1) SEQ2_HELPER(s, r0, r1)
#define SEQ3(s, r0, r1, r2) SEQ3_HELPER(s, r0, r1, r2)
#define SEQ4(s, r0, r1, r2, r3) SEQ4_HELPER(s, r0, r1, r2, r3)
// Matcher macros
#define VALUE(v) (c == v)
#define RANGE(s, e) (c >= s && c <= e)
/* workaround for "-Wtype-limits" as c >= s is allways true for
* the unsigned char in the case of c == 0 */
#define EGNAR(s, e) ((c >= s + 1 && c <= e) || c == s)
/* from rfc3629
*
* UTF8-octets = *( UTF8-char )
* UTF8-char = UTF8-1 / UTF8-2 / UTF8-3 / UTF8-4
* UTF8-1 = %x00-7F
* UTF8-2 = %xC2-DF UTF8-tail
* UTF8-3 = %xE0 %xA0-BF UTF8-tail / %xE1-EC 2( UTF8-tail ) /
* %xED %x80-9F UTF8-tail / %xEE-EF 2( UTF8-tail )
* UTF8-4 = %xF0 %x90-BF 2( UTF8-tail ) / %xF1-F3 3( UTF8-tail ) /
* %xF4 %x80-8F 2( UTF8-tail )
* UTF8-tail = %x80-BF
*/
#define TAIL RANGE(0x80, 0xBF)
static inline utf8_state next_state(utf8_state state, unsigned char c)
{
printf("C: %d\n", c);
switch (state) {
SEQ1(0, EGNAR(0x00, 0x7F))
SEQ2(1, RANGE(0xC2, 0xDF), TAIL)
SEQ3(2, VALUE(0xE0), RANGE(0xA0, 0xBF), TAIL)
SEQ3(3, RANGE(0xE1, 0xEC), TAIL, TAIL)
SEQ3(4, VALUE(0xED), RANGE(0x80, 0x9F), TAIL)
SEQ3(5, RANGE(0xEE, 0xEF), TAIL, TAIL)
SEQ4(6, VALUE(0xF0), RANGE(0x90, 0xBF), TAIL, TAIL)
SEQ4(7, RANGE(0xF1, 0xF3), TAIL, TAIL, TAIL)
SEQ4(8, VALUE(0xF4), RANGE(0x80, 0x8F), TAIL, TAIL)
// no sequence start matched
break;
default:
/*
* This should not happen, unless you feed an error
* state or an uninitialized utf8_state to this function.
*/
assert(false && "invalid utf8 state");
}
return -1;
}

View File

@ -0,0 +1,40 @@
module ws
import (
eventbus
)
fn (ws &Client) send_message_event(msg Message){
mut params := eventbus.Params{}
mut typ := ""
if msg.opcode == .text_frame {
params.put_string("payload", string(byteptr(msg.payload)))
typ = 'string'
} else if msg.opcode == .binary_frame {
params.put_custom("payload", "binary", msg.payload)
typ = 'binary'
}
params.put_string("type", typ)
params.put_int("len", msg.payload_len)
ws.eb.publish("on_message", params, ws)
l.d("sending on_message event")
}
fn (ws &Client) send_error_event(err string) {
mut params := eventbus.Params{}
params.put_string("error", err)
ws.eb.publish("on_error", params, ws)
l.d("sending on_error event")
}
fn (ws &Client) send_close_event() {
params := eventbus.Params{}
ws.eb.publish("on_close", params, ws)
l.d("sending on_close event")
}
fn (ws &Client) send_open_event() {
params := eventbus.Params{}
ws.eb.publish("on_open", params, ws)
l.d("sending on_open event")
}

View File

@ -0,0 +1,72 @@
module ws
fn (ws mut Client) read_handshake(seckey string){
l.d("reading handshake...")
mut bytes_read := 0
max_buffer := 256
buffer_size := 1
mut buffer := malloc(max_buffer)
for bytes_read <= max_buffer {
res := ws.read_from_server(buffer + bytes_read, buffer_size)
if res == 0 || res == -1 {
l.f("read_handshake: Failed to read handshake.")
}
if buffer[bytes_read] == `\n` && buffer[bytes_read-1] == `\r` && buffer[bytes_read-2] == `\n` && buffer[bytes_read-3] == `\r` {
break
}
bytes_read += buffer_size
}
buffer[max_buffer+1] = `\0`
ws.handshake_handler(string(byteptr(buffer)), seckey)
}
fn (ws mut Client) handshake_handler(handshake_response, seckey string){
l.d("handshake_handler:\r\n${handshake_response}")
lines := handshake_response.split_into_lines()
header := lines[0]
if !header.starts_with("HTTP/1.1 101") && !header.starts_with("HTTP/1.0 101") {
l.f("handshake_handler: invalid HTTP status response code")
}
for i in 1..lines.len {
if lines[i].len <= 0 || lines[i] == "\r\n" {
continue
}
keys := lines[i].split(":")
match keys[0] {
"Upgrade" {
ws.flags << Flag.has_upgrade
}
"Connection" {
ws.flags << Flag.has_connection
}
"Sec-WebSocket-Accept" {
l.d("comparing hashes")
response := create_key_challenge_response(seckey)
if keys[1].trim_space() != response {
l.e("handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.")
}
ws.flags << Flag.has_accept
unsafe {
response.free()
}
} else {}
}
unsafe {
keys.free()
}
}
if ws.flags.len < 3 {
ws.close(1002, "invalid websocket HTTP headers")
l.e("invalid websocket HTTP headers")
}
l.i("handshake successful!")
unsafe {
handshake_response.free()
lines.free()
header.free()
}
}

View File

@ -0,0 +1,23 @@
module ws
fn C.write() int
fn (ws mut Client) write_to_server(buf voidptr, len int) int {
mut bytes_written := 0
ws.write_lock.lock()
bytes_written = if ws.is_ssl {
C.SSL_write(ws.ssl, buf, len)
} else {
C.write(ws.socket.sockfd, buf, len)
}
ws.write_lock.unlock()
return bytes_written
}
fn (ws &Client) read_from_server(buffer byteptr, buffer_size int) int {
return if ws.is_ssl {
C.SSL_read(ws.ssl, buffer, buffer_size)
} else {
C.read(ws.socket.sockfd, buffer, buffer_size)
}
}

View File

@ -0,0 +1,46 @@
module ws
#flag -lssl
#include <openssl/rand.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
struct C.SSL_CTX
struct C.SSL
struct C.SSL_METHOD
fn C.SSL_load_error_strings()
fn C.SSL_library_init()
fn C.SSLv23_client_method() &SSL_METHOD
fn C.SSL_CTX_new() &SSL_CTX
fn C.SSL_new() &SSL
fn C.SSL_set_fd() int
fn C.SSL_connect() int
fn C.SSL_shutdown()
fn C.SSL_free()
fn C.SSL_CTX_free()
fn C.SSL_write() int
fn C.SSL_read() int
fn (ws mut Client) connect_ssl(){
l.i("Using secure SSL connection")
C.SSL_load_error_strings()
C.SSL_library_init()
ws.sslctx = SSL_CTX_new(SSLv23_client_method())
if ws.sslctx == C.NULL {
l.f("Couldn't get ssl context")
}
ws.ssl = SSL_new(ws.sslctx)
if ws.ssl == C.NULL {
l.f("Couldn't create OpenSSL instance.")
}
if SSL_set_fd(ws.ssl, ws.socket.sockfd) != 1 {
l.f("Couldn't assign ssl to socket.")
}
if SSL_connect(ws.ssl) != 1 {
l.f("Couldn't connect using SSL.")
}
}

View File

@ -0,0 +1,75 @@
module ws
pub fn utf8_validate_str(str string) bool {
return utf8_validate(str.str, str.len)
}
struct Utf8State {
mut:
index int
subindex int
failed bool
}
pub fn utf8_validate(data byteptr, len int) bool {
mut state := Utf8State{}
for i := 0; i < len; i++ {
s := data[i]
if s == 0 {break}
state.next_state(s)
if state.failed {
return false
}
//i++ //fast forward
}
return !state.failed && state.subindex <= 0
}
fn (s mut Utf8State) seq(r0 bool, r1 bool, is_tail bool) bool {
if s.subindex == 0 || (s.index > 1 && s.subindex == 1) || (s.index >= 6 && s.subindex == 2) {
if (s.subindex == 0 && r0) || (s.subindex == 1 && r1) || (s.subindex == 2 && is_tail) {
s.subindex++
return true
}
goto next
}
else {
s.failed = true
if is_tail {
s.index = 0
s.subindex = 0
s.failed = false
}
return true
}
next:
s.index++
s.subindex = 0
return false
}
fn (s mut Utf8State) next_state (c byte) {
//sequence 1
if s.index == 0 {
if ((c >= 0x00 + 1 && c <= 0x7F) || c == 0x00) {
return
}
s.index++
s.subindex = 0
}
is_tail := c >= 0x80 && c <= 0xBF
//sequence 2
if s.index == 1 && s.seq(c >= 0xC2 && c <= 0xDF, false, is_tail) {return}
//sequence 3
if s.index == 2 && s.seq(c == 0xE0, c >= 0xA0 && c <= 0xBF, is_tail) {return}
if s.index == 3 && s.seq(c >= 0xE1 && c <= 0xEC, c >= 0x80 && c <= 0xBF, is_tail) {return}
if s.index == 4 && s.seq(c == 0xED, c >= 0x80 && c <= 0x9F, is_tail) {return}
if s.index == 5 && s.seq(c >= 0xEE && c <= 0xEF, c >= 0x80 && c <= 0xBF, is_tail) {return}
//sequence 4
if s.index == 6 && s.seq(c == 0xF0, c >= 0x90 && c <= 0xBF, is_tail) {return}
if s.index == 7 && s.seq(c >= 0xF1 && c <= 0xF3, c >= 0x80 && c <= 0xBF, is_tail) {return}
if s.index == 8 && s.seq(c == 0xF4, c >= 0x80 && c <= 0x8F, is_tail) {return}
//we should never reach here
s.failed = true
}

View File

@ -0,0 +1,55 @@
module ws
import (
time
rand
math
crypto.sha1
encoding.base64
)
fn htonl64(payload_len u64) byteptr {
mut ret := malloc(8)
ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff)
ret[1] = byte(((payload_len & (u64(0xff) << 48)) >> 48) & 0xff)
ret[2] = byte(((payload_len & (u64(0xff) << 40)) >> 40) & 0xff)
ret[3] = byte(((payload_len & (u64(0xff) << 32)) >> 32) & 0xff)
ret[4] = byte(((payload_len & (u64(0xff) << 24)) >> 24) & 0xff)
ret[5] = byte(((payload_len & (u64(0xff) << 16)) >> 16) & 0xff)
ret[6] = byte(((payload_len & (u64(0xff) << 8)) >> 8) & 0xff)
ret[7] = byte(((payload_len & (u64(0xff) << 0)) >> 0) & 0xff)
return ret
}
fn create_masking_key() []byte {
t := time.ticks()
tseq := t % 23237671
mut rnd := rand.new_pcg32(u64(t), u64(tseq) )
mask_bit := byte(rnd.bounded_next(u32(math.max_i32)))
buf := [`0`].repeat(4)
C.memcpy(buf.data, &mask_bit, 4)
return buf
}
fn create_key_challenge_response(seckey string) string {
guid := "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
sha1buf := seckey + guid
hash := sha1.sum(sha1buf.bytes())
hashstr := string(byteptr(hash.data))
b64 := base64.encode(hashstr)
unsafe {
sha1buf.free()
hash.free()
}
return b64
}
fn get_nonce() string {
mut nonce := []byte
alphanum := "0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz"
for i in 0..16 {
nonce << alphanum[rand.next(61)]
}
return string(byteptr(nonce.data))
}

View File

@ -0,0 +1,652 @@
module ws
import (
net
net.urllib
encoding.base64
eventbus
sync
logger
)
const (
l = logger.new("ws")
)
pub struct Client {
retry int
eb &eventbus.EventBus
is_ssl bool
lock sync.Mutex = sync.new_mutex()
write_lock sync.Mutex = sync.new_mutex()
//subprotocol_len int
//cwebsocket_subprotocol *subprotocol;
//cwebsocket_subprotocol *subprotocols[];
mut:
state State
socket net.Socket
flags []Flag
sslctx &C.SSL_CTX
ssl &C.SSL
fragments []Fragment
pub mut:
uri string
subscriber &eventbus.Subscriber
}
struct Fragment {
data voidptr
len u64
code OPCode
}
struct Message {
opcode OPCode
payload voidptr
payload_len int
}
pub enum OPCode {
continuation = 0x00,
text_frame = 0x01,
binary_frame = 0x02,
close = 0x08,
ping = 0x09,
pong = 0x0A
}
enum State {
connecting = 0,
connected,
open,
closing,
closed
}
struct Uri {
mut:
hostname string
port string
resource string
querystring string
}
enum Flag {
has_accept,
has_connection,
has_upgrade
}
struct Frame {
mut:
fin bool
rsv1 bool
rsv2 bool
rsv3 bool
opcode OPCode
mask bool
payload_len u64
masking_key [4]byte
}
pub fn new(uri string) &Client {
eb := eventbus.new()
ws := &Client{
uri: uri
state: .closed
eb: eb,
subscriber: eb.subscriber
is_ssl: uri.starts_with("wss")
ssl: C.NULL
sslctx: C.NULL
}
return ws
}
fn C.sscanf() int
fn (ws &Client) parse_uri() &Uri {
u := urllib.parse(ws.uri) or {panic(err)}
v := u.request_uri().split("?")
querystring := if v.len > 1 {"?" + v[1]} else {""}
return &Uri {
hostname: u.hostname()
port: u.port()
resource: v[0]
querystring: querystring
}
}
pub fn (ws mut Client) connect() int {
if ws.state == .connected {
l.f("connect: websocket already connected")
} else if ws.state == .connecting {
l.f("connect: websocket already connecting")
} else if ws.state == .open {
l.f("connect: websocket already open")
}
ws.lock.lock()
ws.state = .connecting
ws.lock.unlock()
uri := ws.parse_uri()
nonce := get_nonce()
seckey := base64.encode(nonce)
ai_family := C.AF_INET
ai_socktype := C.SOCK_STREAM
handshake := "GET ${uri.resource}${uri.querystring} HTTP/1.1\r\nHost: ${uri.hostname}:${uri.port}\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: ${seckey}\r\nSec-WebSocket-Version: 13\r\n\r\n"
socket := net.new_socket(ai_family, ai_socktype, 0) or {
l.f(err)
return -1
}
ws.socket = socket
ws.socket.connect(uri.hostname, uri.port.int()) or {
l.f(err)
return -1
}
optval := 1
ws.socket.setsockopt(C.SOL_SOCKET, C.SO_KEEPALIVE, &optval) or {
l.f(err)
return -1
}
if ws.is_ssl {
ws.connect_ssl()
}
ws.lock.lock()
ws.state = .connected
ws.lock.unlock()
res := ws.write_to_server(handshake.str, handshake.len)
if res <= 0 {
l.f("Handshake failed.")
}
ws.read_handshake(seckey)
ws.lock.lock()
ws.state = .open
ws.lock.unlock()
ws.send_open_event()
unsafe {
handshake.free()
nonce.free()
free(uri)
}
return 0
}
pub fn (ws mut Client) close(code int, message string){
if ws.state != .closed && ws.socket.sockfd > 1 {
ws.lock.lock()
ws.state = .closing
ws.lock.unlock()
mut code32 := 0
if code > 0 {
_code := C.htons(code)
message_len := message.len + 2
mut close_frame := [`0`].repeat(message_len)
close_frame[0] = _code & 0xFF
close_frame[1] = (_code >> 8)
code32 = (close_frame[0] << 8) + close_frame[1]
for i in 0..message.len {
close_frame[i+2] = message[i]
}
ws.send_control_frame(.close, "CLOSE", close_frame)
} else {
ws.send_control_frame(.close, "CLOSE", [])
}
if ws.ssl != C.NULL {
SSL_shutdown(ws.ssl)
SSL_free(ws.ssl)
if ws.sslctx != C.NULL {
SSL_CTX_free(ws.sslctx)
}
} else {
if C.shutdown(ws.socket.sockfd, C.SHUT_WR) == -1 {
l.e("Unabled to shutdown websocket.")
}
mut buf := [`0`]
for ws.read_from_server(buf.data, 1) > 0 {
buf[0] = `\0`
}
unsafe {
buf.free()
}
if C.close(ws.socket.sockfd) == -1 {
//ws.send_close_event()(websocket, 1011, strerror(errno));
}
}
ws.fragments = []
ws.send_close_event()
ws.lock.lock()
ws.state = .closed
ws.lock.unlock()
unsafe {
}
//TODO impl autoreconnect
}
}
pub fn (ws mut Client) write(payload byteptr, payload_len int, code OPCode) int {
if ws.state != .open {
ws.send_error_event("WebSocket closed. Cannot write.")
goto free_data
return -1
}
header_len := 6 + if payload_len > 125 {2} else {0} + if payload_len > 0xffff {6} else {0}
masking_key := create_masking_key()
mut header := [`0`].repeat(header_len)
mut bytes_written := -1
header[0] = (int(code) | 0x80)
if payload_len <= 125 {
header[1] = (payload_len | 0x80)
header[2] = masking_key[0]
header[3] = masking_key[1]
header[4] = masking_key[2]
header[5] = masking_key[3]
} else if payload_len > 125 && payload_len <= 0xffff {
len16 := C.htons(payload_len)
header[1] = (126 | 0x80)
C.memcpy(header.data+2, &len16, 2)
header[4] = masking_key[0]
header[5] = masking_key[1]
header[6] = masking_key[2]
header[7] = masking_key[3]
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { // 65535 && 18446744073709551615
len64 := htonl64(u64(payload_len))
header[1] = (127 | 0x80)
C.memcpy(header.data+2, len64, 8)
header[10] = masking_key[0]
header[11] = masking_key[1]
header[12] = masking_key[2]
header[13] = masking_key[3]
} else {
l.c("write: frame too large")
ws.close(1009, "frame too large")
goto free_data
return -1
}
frame_len := header_len + payload_len
mut frame_buf := [`0`].repeat(frame_len)
C.memcpy(frame_buf.data, header.data, header_len)
C.memcpy(&frame_buf.data[header_len], payload, payload_len)
for i in 0..payload_len {
frame_buf[header_len+i] ^= masking_key[i % 4] & 0xff
}
bytes_written = ws.write_to_server(frame_buf.data, frame_len)
if bytes_written == -1 {
err := string(byteptr(C.strerror(C.errno)))
l.e("write: there was an error writing data: ${err}")
ws.send_error_event("Error writing data")
goto free_data
return -1
}
l.d("write: ${bytes_written} bytes written.")
free_data:
unsafe {
free(payload)
frame_buf.free()
header.free()
masking_key.free()
}
return bytes_written
}
pub fn (ws mut Client) listen() {
l.i("Starting listener...")
for ws.state == .open {
ws.read()
}
l.i("Listener stopped as websocket was closed.")
}
pub fn (ws mut Client) read() int {
mut bytes_read := u64(0)
initial_buffer := u64(256)
mut header_len := 2
header_len_offset := 2
extended_payload16_end_byte := 4
extended_payload64_end_byte := 10
mut payload_len := u64(0)
mut data := C.calloc(initial_buffer, 1)//[`0`].repeat(int(max_buffer))
mut frame := Frame{}
mut frame_size := u64(header_len)
for bytes_read < frame_size && ws.state == .open {
byt := ws.read_from_server(data + int(bytes_read), 1)
match byt {
0 {
error := "server closed the connection."
l.e("read: ${error}")
ws.send_error_event(error)
ws.close(1006, error)
goto free_data
return -1
}
-1 {
err := string(byteptr(C.strerror(C.errno)))
l.e("read: error reading frame. ${err}")
ws.send_error_event("error reading frame")
goto free_data
return -1
} else {
bytes_read++
}
}
if bytes_read == u64(header_len_offset) {
frame.fin = (data[0] & 0x80) == 0x80
frame.rsv1 = (data[0] & 0x40) == 0x40
frame.rsv2 = (data[0] & 0x20) == 0x20
frame.rsv3 = (data[0] & 0x10) == 0x10
frame.opcode = OPCode(int(data[0] & 0x7F))
frame.mask = (data[1] & 0x80) == 0x80
frame.payload_len = u64(data[1] & 0x7F)
//masking key
if frame.mask {
frame.masking_key[0] = data[2]
frame.masking_key[1] = data[3]
frame.masking_key[2] = data[4]
frame.masking_key[3] = data[5]
}
payload_len = frame.payload_len
frame_size = u64(header_len) + payload_len
}
if frame.payload_len == u64(126) && bytes_read == u64(extended_payload16_end_byte) {
header_len += 2
mut extended_payload_len := 0
extended_payload_len |= data[2] << 8
extended_payload_len |= data[3] << 0
//masking key
if frame.mask {
frame.masking_key[0] = data[4]
frame.masking_key[1] = data[5]
frame.masking_key[2] = data[6]
frame.masking_key[3] = data[7]
}
payload_len = u64(extended_payload_len)
frame_size = u64(header_len) + payload_len
if frame_size > initial_buffer {
l.d("reallocating: ${frame_size}")
data = C.realloc(data, frame_size)
}
} else if frame.payload_len == u64(127) && bytes_read == u64(extended_payload64_end_byte) {
header_len += 8 //TODO Not sure...
mut extended_payload_len := u64(0)
extended_payload_len |= u64(data[2]) << 56
extended_payload_len |= u64(data[3]) << 48
extended_payload_len |= u64(data[4]) << 40
extended_payload_len |= u64(data[5]) << 32
extended_payload_len |= u64(data[6]) << 24
extended_payload_len |= u64(data[7]) << 16
extended_payload_len |= u64(data[8]) << 8
extended_payload_len |= u64(data[9]) << 0
//masking key
if frame.mask {
frame.masking_key[0] = data[10]
frame.masking_key[1] = data[11]
frame.masking_key[2] = data[12]
frame.masking_key[3] = data[13]
}
payload_len = extended_payload_len
frame_size = u64(header_len) + payload_len
if frame_size > initial_buffer {
l.d("reallocating: ${frame_size}")
data = C.realloc(data, frame_size)
}
}
}
// unmask the payload
if frame.mask {
for i in 0..payload_len {
data[header_len+i] ^= frame.masking_key[i % 4] & 0xff
}
}
if ws.fragments.len > 0 && frame.opcode in [.text_frame, .binary_frame] {
ws.close(0, "")
goto free_data
return -1
} else if frame.opcode in [.text_frame, .binary_frame] {
data_node:
l.d("read: recieved text_frame or binary_frame")
mut payload := malloc(sizeof(byte) * int(payload_len) + 1)
if payload == C.NULL {
l.f("out of memory")
}
C.memcpy(payload, &data[header_len], payload_len)
if frame.fin {
if ws.fragments.len > 0 {
//join fragments
ws.fragments << Fragment{
data: payload
len: payload_len
}
mut frags := []Fragment
mut size := u64(0)
for f in ws.fragments {
if f.len > 0 {
frags << f
size += f.len
}
}
mut pl := malloc(sizeof(byte) * int(size))
if pl == C.NULL {
l.f("out of memory")
}
mut by := 0
for i, f in frags {
C.memcpy(pl + by, f.data, f.len)
by += int(f.len)
unsafe {free(f.data)}
}
payload = pl
frame.opcode = ws.fragments[0].code
payload_len = size
//clear the fragments
unsafe {
ws.fragments.free()
}
ws.fragments = []
}
payload[payload_len] = `\0`
if frame.opcode == .text_frame && payload_len > 0 {
if !utf8_validate(payload, int(payload_len)) {
l.e("malformed utf8 payload")
ws.send_error_event("Recieved malformed utf8.")
ws.close(1007, "malformed utf8 payload")
goto free_data
return -1
}
}
message := Message {
opcode: frame.opcode
payload: payload
payload_len: int(payload_len)
}
ws.send_message_event(message)
} else {
//fragment start.
ws.fragments << Fragment{
data: payload
len: payload_len
code: frame.opcode
}
}
unsafe {
free(data)
}
return int(bytes_read)
}
else if frame.opcode == .continuation {
l.d("read: continuation")
if ws.fragments.len <= 0 {
l.e("Nothing to continue.")
ws.close(1002, "nothing to continue")
goto free_data
return -1
}
goto data_node
return 0
}
else if frame.opcode == .ping {
l.d("read: ping")
if !frame.fin {
ws.close(1002, "control message must not be fragmented")
goto free_data
return -1
}
if frame.payload_len > 125 {
ws.close(1002, "control frames must not exceed 125 bytes")
goto free_data
return -1
}
mut payload := []byte
if payload_len > 0 {
payload = [`0`].repeat(int(payload_len))
C.memcpy(payload.data, &data[header_len], payload_len)
}
unsafe {
free(data)
}
return ws.send_control_frame(.pong, "PONG", payload)
}
else if frame.opcode == .pong {
if !frame.fin {
ws.close(1002, "control message must not be fragmented")
goto free_data
return -1
}
unsafe {
free(data)
}
//got pong
return 0
}
else if frame.opcode == .close {
l.d("read: close")
if frame.payload_len > 125 {
ws.close(1002, "control frames must not exceed 125 bytes")
goto free_data
return -1
}
mut code := 0
mut reason := ""
if payload_len > 2 {
code = (int(data[header_len]) << 8) + int(data[header_len+1])
header_len += 2
payload_len -= 2
reason = string(&data[header_len])
l.i("Closing with reason: ${reason} & code: ${code}")
if reason.len > 1 && !utf8_validate(reason.str, reason.len) {
l.e("malformed utf8 payload")
ws.send_error_event("Recieved malformed utf8.")
ws.close(1007, "malformed utf8 payload")
goto free_data
return -1
}
}
unsafe{
free(data)
}
ws.close(code, reason)
return 0
}
l.e("read: Recieved unsupported opcode: ${frame.opcode} fin: ${frame.fin} uri: ${ws.uri}")
ws.send_error_event("Recieved unsupported opcode: ${frame.opcode}")
ws.close(1002, "Unsupported opcode")
free_data:
unsafe {
free(data)
}
return -1
}
fn (ws mut Client) send_control_frame(code OPCode, frame_typ string, payload []byte) int {
if ws.socket.sockfd <= 0 {
l.e("No socket opened.")
goto free_data
return -1
}
header_len := 6
frame_len := header_len + payload.len
mut control_frame := [`0`].repeat(frame_len)
masking_key := create_masking_key()
control_frame[0] = (int(code) | 0x80)
control_frame[1] = (payload.len | 0x80)
control_frame[2] = masking_key[0]
control_frame[3] = masking_key[1]
control_frame[4] = masking_key[2]
control_frame[5] = masking_key[3]
if code == .close {
close_code := 1000
if payload.len > 2 {
mut parsed_payload := [`0`].repeat(payload.len + 1)
C.memcpy(parsed_payload.data, &payload[0], payload.len)
parsed_payload[payload.len] = `\0`
for i in 0..payload.len {
control_frame[6+i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff
}
unsafe {
parsed_payload.free()
}
}
} else {
for i in 0..payload.len {
control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff
}
}
mut bytes_written := -1
bytes_written = ws.write_to_server(control_frame.data, frame_len)
free_data:
unsafe {
control_frame.free()
payload.free()
masking_key.free()
}
match bytes_written {
0 {
l.d("send_control_frame: remote host closed the connection.")
return 0
}
-1 {
l.c("send_control_frame: error sending ${frame_typ} control frame.")
return -1
} else {
l.d("send_control_frame: wrote ${bytes_written} byte ${frame_typ} frame.")
return bytes_written
}
}
}