x.websockets: new websockets module on top of x.net (#6189)

pull/6190/head
Tomas Hellström 2020-08-22 00:50:38 +02:00 committed by GitHub
parent 1b914d217e
commit fb148e0b61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 2302 additions and 1 deletions

View File

@ -302,6 +302,7 @@ jobs:
run: |
.\v.exe setup-freetype
.\.github\workflows\windows-install-sqlite.bat
choco install openssl
## .\.github\workflows\windows-install-sdl.bat
- name: Fixed tests
run: |
@ -335,6 +336,7 @@ jobs:
run: |
.\v.exe setup-freetype
.\.github\workflows\windows-install-sqlite.bat
choco install openssl
## .\.github\workflows\windows-install-sdl.bat
- name: Fixed tests
run: |
@ -373,6 +375,7 @@ jobs:
run: |
.\v.exe setup-freetype
.\.github\workflows\windows-install-sqlite.bat
choco install openssl
## .\.github\workflows\windows-install-sdl.bat
- name: Fixed tests
run: |
@ -474,3 +477,42 @@ jobs:
../v .
../v -autofree .
cd ..
# TODO: ACTIVATE THIS AFTER MERGE
# websocket_autobahn:
# name: Autobahn integrations tests
# runs-on: ubuntu-latest
# steps:
# - name: Checkout
# uses: actions/checkout@v2
# - name: Run autobahn services
# run: docker-compose -f ${{github.workspace}}/vlib/x/websocket/tests/autobahn/docker-compose.yml up -d
# - name: Build client test
# run: docker exec autobahn_client "v" "/src/vlib/x/websocket/tests/autobahn/autobahn_client.v"
# - name: Run client test
# run: docker exec autobahn_client "/src/vlib/x/websocket/tests/autobahn/autobahn_client"
# - name: Run server test
# run: docker exec autobahn_server "wstest" "-m" "fuzzingclient" "-s" "/config/fuzzingclient.json"
# - name: Copy reports
# run: docker cp autobahn_server:/reports ${{github.workspace}}/reports
# - name: Test success
# run: docker exec autobahn_server "python" "/check_results.py"
# - name: Publish all reports
# uses: actions/upload-artifact@v2
# with:
# name: full report
# path: ${{github.workspace}}/reports
# - name: Publish report client
# uses: actions/upload-artifact@v2
# with:
# name: client
# path: ${{github.workspace}}/reports/clients/index.html
# - name: Publish report server
# uses: actions/upload-artifact@v2
# with:
# name: server
# path: ${{github.workspace}}/reports/servers/index.html

View File

@ -0,0 +1,96 @@
module main
import time
import os
import x.websocket
fn main() {
go start_server()
time.sleep_ms(100)
go start_client()
println('press enter to quit...')
os.get_line()
}
fn start_server() ? {
mut s := websocket.new_server(30000, '')
// Make that in execution test time give time to execute at least one time
s.ping_interval = 100
s.on_connect(fn (mut s websocket.ServerClient) ?bool {
// Here you can look att the client info and accept or not accept
// just returning a true/false
if s.resource_name != '/' {
return false
}
return true
})?
s.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ? {
ws.write(msg.payload, msg.opcode) or {
panic(err)
}
})
s.on_close(fn (mut ws websocket.Client, code int, reason string) ? {
// println('client ($ws.id) closed connection')
})
s.listen() or {
// println('error on server listen: $err')
}
unsafe {
s.free()
}
}
fn start_client() ? {
mut ws := websocket.new_client('ws://localhost:30000')?
// mut ws := websocket.new_client('wss://echo.websocket.org:443')?
// use on_open_ref if you want to send any reference object
ws.on_open(fn (mut ws websocket.Client) ? {
println('open!')
})
// use on_error_ref if you want to send any reference object
ws.on_error(fn (mut ws websocket.Client, err string) ? {
println('error: $err')
})
// use on_close_ref if you want to send any reference object
ws.on_close(fn (mut ws websocket.Client, code int, reason string) ? {
println('closed')
})
// use on_message_ref if you want to send any reference object
ws.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ? {
if msg.payload.len > 0 {
message := string(msg.payload, msg.payload.len)
println('client got type: $msg.opcode payload:\n$message')
}
})
// you can add any pointer reference to use in callback
// t := TestRef{count: 10}
// ws.on_message_ref(fn (mut ws websocket.Client, msg &websocket.Message, r &SomeRef)? {
// // println('type: $msg.opcode payload:\n$msg.payload ref: $r')
// }, &r)
ws.connect() or {
println('error on connect: $err')
}
go write_echo(mut ws) or {
println('error on write_echo $err')
}
ws.listen() or {
println('error on listen $err')
}
unsafe {
ws.free()
}
}
fn write_echo(mut ws websocket.Client) ? {
message := 'echo this'
for i := 0; i <= 10; i++ {
// Server will send pings every 30 seconds
ws.write_str(message) or {
println('panicing writing $err')
}
time.sleep_ms(100)
}
ws.close(1000, 'normal') or {
println('panicing $err')
}
}

View File

@ -6,6 +6,8 @@ module openssl
// the next flag is harmless, since it will still use the
// (older) system openssl.
#flag linux -I/usr/local/include/openssl -L/usr/local/lib
#flag windows -l libssl -l libcrypto
#flag -l ssl -l crypto
// MacPorts
#flag darwin -I/opt/local/include
@ -40,6 +42,7 @@ fn C.SSL_set_fd() int
fn C.SSL_connect() int
fn C.SSL_set_cipher_list() int
fn C.SSL_get_peer_certificate() int
fn C.SSL_get_error() int
fn C.SSL_get_verify_result() int
fn C.SSL_set_tlsext_host_name() int
fn C.SSL_shutdown()
@ -52,6 +55,7 @@ fn C.SSLv23_client_method() &C.SSL_METHOD
fn C.TLSv1_2_method() voidptr
fn init() {
C.SSL_load_error_strings()
C.SSL_library_init()
}

View File

@ -0,0 +1,27 @@
module openssl
// ssl_error returns non error ssl code or error if unrecoverable and we should panic
pub fn ssl_error(ret int, ssl voidptr) ?SSLError {
res := C.SSL_get_error(ssl, ret)
match SSLError(res) {
.ssl_error_syscall { return error_with_code('unrecoverable syscall ($res)', res) }
.ssl_error_ssl { return error_with_code('unrecoverable ssl protocol error ($res)',
res) }
else { return res }
}
}
pub enum SSLError {
ssl_error_none = 0 //SSL_ERROR_NONE
ssl_error_ssl = 1 //SSL_ERROR_SSL
ssl_error_want_read = 2 //SSL_ERROR_WANT_READ
ssl_error_want_write = 3 //SSL_ERROR_WANT_WRITE
ssl_error_want_x509_lookup = 4 //SSL_ERROR_WANT_X509_LOOKUP
ssl_error_syscall = 5 //SSL_ERROR_SYSCALL
ssl_error_zero_return = 6 //SSL_ERROR_ZERO_RETURN
ssl_error_want_connect = 7 //SSL_ERROR_WANT_CONNECT
ssl_error_want_accept = 8 //SSL_ERROR_WANT_ACCEPT
ssl_error_want_async = 9 //SSL_ERROR_WANT_ASYNC
ssl_error_want_async_job = 10 //SSL_ERROR_WANT_ASYNC_JOB
ssl_error_want_early = 11 //SSL_ERROR_WANT_EARLY
}

View File

@ -92,4 +92,4 @@ fn C.FD_SET()
fn C.FD_ISSET() bool
[typedef]
struct C.fd_set {}
pub struct C.fd_set {}

View File

@ -70,6 +70,25 @@ pub fn (c TcpConn) write_string(s string) ? {
return c.write_ptr(s.str, s.len)
}
pub fn (c TcpConn) read_into_ptr(buf_ptr byteptr, len int) ?int {
res := C.recv(c.sock.handle, buf_ptr, len, 0)
if res >= 0 {
return res
}
code := error_code()
match code {
error_ewouldblock {
c.wait_for_read()?
return socket_error(C.recv(c.sock.handle, buf_ptr, len, 0))
}
else {
wrap_error(code)?
}
}
}
pub fn (c TcpConn) read_into(mut buf []byte) ?int {
res := C.recv(c.sock.handle, buf.data, buf.len, 0)

View File

@ -0,0 +1,245 @@
module openssl
import net.openssl
import x.net
import time
// const (
// is_used = openssl.is_used
// )
pub struct SSLConn {
mut:
sslctx &C.SSL_CTX
ssl &C.SSL
handle int
duration time.Duration
}
enum Select {
read
write
except
}
pub fn new_ssl_conn() &SSLConn {
return &SSLConn{
sslctx: 0
ssl: 0
handle: 0
}
}
// shutdown closes the ssl connection and do clean up
pub fn (mut s SSLConn) shutdown() ? {
if s.ssl != 0 {
mut res := 0
for {
res = int(C.SSL_shutdown(s.ssl))
if res < 0 {
err_res := openssl.ssl_error(res, s.ssl) or {
break // We break to free rest of resources
}
if err_res == .ssl_error_want_read {
for {
ready := @select(s.handle, .read, s.duration)?
if ready {
break
}
}
continue
} else if err_res == .ssl_error_want_write {
for {
ready := @select(s.handle, .write, s.duration)?
if ready {
break
}
}
continue
} else {
println('error: $err_res')
return error('unexepedted ssl error $err_res')
}
C.SSL_free(s.ssl)
if s.sslctx != 0 {
C.SSL_CTX_free(s.sslctx)
}
return error('Could not connect using SSL. ($err_res),err')
} else if res == 0 {
continue
} else if res == 1 {
break
}
}
C.SSL_free(s.ssl)
}
if s.sslctx != 0 {
C.SSL_CTX_free(s.sslctx)
}
}
// connect to server using open ssl
pub fn (mut s SSLConn) connect(mut tcp_conn net.TcpConn) ? {
s.handle = tcp_conn.sock.handle
s.duration = tcp_conn.read_timeout()
// C.SSL_load_error_strings()
s.sslctx = C.SSL_CTX_new(C.SSLv23_client_method())
if s.sslctx == 0 {
return error("Couldn't get ssl context")
}
s.ssl = C.SSL_new(s.sslctx)
if s.ssl == 0 {
return error("Couldn't create OpenSSL instance.")
}
if C.SSL_set_fd(s.ssl, tcp_conn.sock.handle) != 1 {
return error("Couldn't assign ssl to socket.")
}
for {
res := C.SSL_connect(s.ssl)
if res != 1 {
err_res := openssl.ssl_error(res, s.ssl)?
if err_res == .ssl_error_want_read {
for {
ready := @select(s.handle, .read, s.duration)?
if ready {
break
}
}
continue
} else if err_res == .ssl_error_want_write {
for {
ready := @select(s.handle, .write, s.duration)?
if ready {
break
}
}
continue
}
return error('Could not connect using SSL. ($err_res),err')
}
println('CONNECT OK')
break
}
}
pub fn (mut s SSLConn) socket_read_into_ptr(buf_ptr byteptr, len int) ?int {
mut res := 0
for {
res = C.SSL_read(s.ssl, buf_ptr, len)
if res < 0 {
err_res := openssl.ssl_error(res, s.ssl)?
if err_res == .ssl_error_want_read {
for {
ready := @select(s.handle, .read, s.duration)?
if ready {
break
}
}
continue
} else if err_res == .ssl_error_want_write {
for {
ready := @select(s.handle, .write, s.duration)?
if ready {
break
}
}
continue
} else if err_res == .ssl_error_zero_return {
return 0
}
return error('Could not read using SSL. ($err_res),err')
}
break
}
return res
}
pub fn (mut s SSLConn) read_into(mut buffer []Byte) ?int {
res := s.socket_read_into_ptr(byteptr(buffer.data), buffer.len)?
return res
}
// write number of bytes to SSL connection
pub fn (mut s SSLConn) write(bytes []Byte) ? {
unsafe {
mut ptr_base := byteptr(bytes.data)
mut total_sent := 0
for total_sent < bytes.len {
ptr := ptr_base + total_sent
remaining := bytes.len - total_sent
mut sent := C.SSL_write(s.ssl, ptr, remaining)
if sent <= 0 {
err_res := openssl.ssl_error(sent, s.ssl)?
if err_res == .ssl_error_want_read {
for {
ready := @select(s.handle, .read, s.duration)?
if ready {
break
}
}
} else if err_res == .ssl_error_want_write {
for {
ready := @select(s.handle, .write, s.duration)?
if ready {
break
}
}
continue
} else if err_res == .ssl_error_zero_return {
return error('ssl write on closed connection') // Todo error_with_code close
}
return error_with_code('Could not write SSL. ($err_res),err', err_res)
}
total_sent += sent
}
}
}
// // ssl_error returns non error ssl code or error if unrecoverable and we should panic
// fn (mut s SSLConn) ssl_error(ret int) ?SSLError {
// res := C.SSL_get_error(s.ssl, ret)
// match SSLError(res) {
// .ssl_error_syscall { return error_with_code('unrecoverable syscall ($res)', res) }
// .ssl_error_ssl { return error_with_code('unrecoverable ssl protocol error ($res)',
// res) }
// else { return res }
// }
// }
// enum SSLError {
// ssl_error_none = C.SSL_ERROR_NONE
// ssl_error_ssl = C.SSL_ERROR_SSL
// ssl_error_want_read = C.SSL_ERROR_WANT_READ
// ssl_error_want_write = C.SSL_ERROR_WANT_WRITE
// ssl_error_want_x509_lookup = C.SSL_ERROR_WANT_X509_LOOKUP
// ssl_error_syscall = C.SSL_ERROR_SYSCALL
// ssl_error_zero_return = C.SSL_ERROR_ZERO_RETURN
// ssl_error_want_connect = C.SSL_ERROR_WANT_CONNECT
// ssl_error_want_accept = C.SSL_ERROR_WANT_ACCEPT
// ssl_error_want_async = C.SSL_ERROR_WANT_ASYNC
// ssl_error_want_async_job = C.SSL_ERROR_WANT_ASYNC_JOB
// ssl_error_want_client_hello_cb = C.SSL_ERROR_WANT_CLIENT_HELLO_CB
// }
/*
This is basically a copy of Emily socket implementation of select.
This have to be consolidated into common net lib features
when merging this to V
*/
[typedef]
pub struct C.fd_set {
}
// Select waits for an io operation (specified by parameter `test`) to be available
fn @select(handle int, test Select, timeout time.Duration) ?bool {
set := C.fd_set{}
C.FD_ZERO(&set)
C.FD_SET(handle, &set)
timeval_timeout := C.timeval{
tv_sec: u64(0)
tv_usec: u64(timeout.microseconds())
}
match test {
.read { net.socket_error(C.@select(handle, &set, C.NULL, C.NULL, &timeval_timeout))? }
.write { net.socket_error(C.@select(handle, C.NULL, &set, C.NULL, &timeval_timeout))? }
.except { net.socket_error(C.@select(handle, C.NULL, C.NULL, &set, &timeval_timeout))? }
}
return C.FD_ISSET(handle, &set)
}

View File

@ -0,0 +1,205 @@
module websocket
// All this plumbing will go awauy when we can do EventHandler<T> properly
struct MessageEventHandler {
handler SocketMessageFn
handler2 SocketMessageFn2
is_ref bool = false
ref voidptr
}
struct ErrorEventHandler {
handler SocketErrorFn
handler2 SocketErrorFn2
is_ref bool = false
ref voidptr
}
struct OpenEventHandler {
handler SocketOpenFn
handler2 SocketOpenFn2
is_ref bool = false
ref voidptr
}
struct CloseEventHandler {
handler SocketCloseFn
handler2 SocketCloseFn2
is_ref bool = false
ref voidptr
}
pub type AcceptClientFn = fn (mut c ServerClient) ?bool
pub type SocketMessageFn = fn (mut c Client, msg &Message) ?
pub type SocketMessageFn2 = fn (mut c Client, msg &Message, v voidptr) ?
pub type SocketErrorFn = fn (mut c Client, err string) ?
pub type SocketErrorFn2 = fn (mut c Client, err string, v voidptr) ?
pub type SocketOpenFn = fn (mut c Client) ?
pub type SocketOpenFn2 = fn (mut c Client, v voidptr) ?
pub type SocketCloseFn = fn (mut c Client, code int, reason string) ?
pub type SocketCloseFn2 = fn (mut c Client, code int, reason string, v voidptr) ?
pub fn (mut s Server) on_connect(fun AcceptClientFn) ? {
if s.accept_client_callbacks.len > 0 {
return error('only one callback can be registered for accept client')
}
s.accept_client_callbacks << fun
}
fn (mut s Server) send_connect_event(mut c ServerClient) ?bool {
if s.accept_client_callbacks.len == 0 {
// If no callback all client will be accepted
return true
}
fun := s.accept_client_callbacks[0]
res := fun(mut c)?
return res
}
// on_message, register a callback on new messages
pub fn (mut s Server) on_message(fun SocketMessageFn) {
s.message_callbacks << MessageEventHandler{
handler: fun
}
}
// on_message_ref, register a callback on new messages and provide a reference
pub fn (mut s Server) on_message_ref(fun SocketMessageFn2, ref voidptr) {
s.message_callbacks << MessageEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_close, register a callback on closed socket
pub fn (mut s Server) on_close(fun SocketCloseFn) {
s.close_callbacks << CloseEventHandler{
handler: fun
}
}
// on_close_ref, register a callback on closed socket and provide a reference
pub fn (mut s Server) on_close_ref(fun SocketCloseFn2, ref voidptr) {
s.close_callbacks << CloseEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_message, register a callback on new messages
pub fn (mut ws Client) on_message(fun SocketMessageFn) {
ws.message_callbacks << MessageEventHandler{
handler: fun
}
}
// on_message_ref, register a callback on new messages and provide a reference
pub fn (mut ws Client) on_message_ref(fun SocketMessageFn2, ref voidptr) {
ws.message_callbacks << MessageEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_error, register a callback on errors
pub fn (mut ws Client) on_error(fun SocketErrorFn) {
ws.error_callbacks << ErrorEventHandler{
handler: fun
}
}
// on_error_ref, register a callback on errors and provida a reference
pub fn (mut ws Client) on_error_ref(fun SocketErrorFn2, ref voidptr) {
ws.error_callbacks << ErrorEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_open, register a callback on successful open
pub fn (mut ws Client) on_open(fun SocketOpenFn) {
ws.open_callbacks << OpenEventHandler{
handler: fun
}
}
// on_open_ref, register a callback on successful open and provide a reference
pub fn (mut ws Client) on_open_ref(fun SocketOpenFn2, ref voidptr) {
ws.open_callbacks << OpenEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
// on_close, register a callback on closed socket
pub fn (mut ws Client) on_close(fun SocketCloseFn) {
ws.close_callbacks << CloseEventHandler{
handler: fun
}
}
// on_close_ref, register a callback on closed socket and provide a reference
pub fn (mut ws Client) on_close_ref(fun SocketCloseFn2, ref voidptr) {
ws.close_callbacks << CloseEventHandler{
handler2: fun
ref: ref
is_ref: true
}
}
fn (mut ws Client) send_message_event(msg &Message) {
ws.debug_log('sending on_message event')
for ev_handler in ws.message_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(ws, msg)
} else {
ev_handler.handler2(ws, msg, ev_handler.ref)
}
}
}
fn (mut ws Client) send_error_event(err string) {
ws.debug_log('sending on_error event')
for ev_handler in ws.error_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(mut ws, err)
} else {
ev_handler.handler2(mut ws, err, ev_handler.ref)
}
}
}
fn (mut ws Client) send_close_event(code int, reason string) {
ws.debug_log('sending on_close event')
for ev_handler in ws.close_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(mut ws, code, reason)
} else {
ev_handler.handler2(mut ws, code, reason, ev_handler.ref)
}
}
}
fn (mut ws Client) send_open_event() {
ws.debug_log('sending on_open event')
for ev_handler in ws.open_callbacks {
if !ev_handler.is_ref {
ev_handler.handler(mut ws)
} else {
ev_handler.handler2(mut ws, ev_handler.ref)
}
}
}

View File

@ -0,0 +1,188 @@
module websocket
import encoding.base64
import strings
// handshake manage the handshake part of connecting
fn (mut ws Client) handshake() ? {
nonce := get_nonce(ws.nonce_size)
seckey := base64.encode(nonce)
// handshake := 'GET $ws.uri.resource$ws.uri.querystring HTTP/1.1\r\nHost: $ws.uri.hostname:$ws.uri.port\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: $seckey\r\nSec-WebSocket-Version: 13\r\n\r\n'
mut sb := strings.new_builder(1024)
// todo, remove when autofree
defer {
sb.free()
}
sb.write('GET ')
sb.write(ws.uri.resource)
sb.write(ws.uri.querystring)
sb.write(' HTTP/1.1\r\nHost: ')
sb.write(ws.uri.hostname)
sb.write(':')
sb.write(ws.uri.port)
sb.write('\r\nUpgrade: websocket\r\nConnection: Upgrade\r\n')
sb.write('Sec-WebSocket-Key: ')
sb.write(seckey)
sb.write('\r\nSec-WebSocket-Version: 13\r\n\r\n')
// handshake := 'GET $ws.uri.resource$ws.uri.querystring HTTP/1.1\r\nHost: $ws.uri.hostname:$ws.uri.port\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: $seckey\r\nSec-WebSocket-Version: 13\r\n\r\n'
handshake := sb.str()
defer {
handshake.free()
}
handshake_bytes := handshake.bytes()
ws.debug_log('sending handshake: $handshake')
ws.socket_write(handshake_bytes)?
ws.read_handshake(seckey)?
unsafe {
handshake_bytes.free()
}
}
// handshake manage the handshake part of connecting
fn (mut s Server) handle_server_handshake(mut c Client) ?(string, &ServerClient) {
msg := c.read_handshake_str()?
handshake_response, client := s.parse_client_handshake(msg, mut c)?
unsafe {
msg.free()
}
return handshake_response, client
}
fn (mut s Server) parse_client_handshake(client_handshake string, mut c Client) ?(string, &ServerClient) {
s.logger.debug('server-> client handshake:\n$client_handshake')
lines := client_handshake.split_into_lines()
get_tokens := lines[0].split(' ')
if get_tokens.len < 3 {
return error('unexpected get operation, $get_tokens')
}
if get_tokens[0].trim_space() != 'GET' {
return error("unexpected request '${get_tokens[0]}', expected 'GET'")
}
if get_tokens[2].trim_space() != 'HTTP/1.1' {
return error("unexpected request $get_tokens, expected 'HTTP/1.1'")
}
// path := get_tokens[1].trim_space()
mut seckey := ''
mut flags := []Flag{}
mut key := ''
for i in 1 .. lines.len {
if lines[i].len <= 0 || lines[i] == '\r\n' {
continue
}
keys := lines[i].split(':')
match keys[0] {
'Upgrade', 'upgrade' {
flags << .has_upgrade
}
'Connection', 'connection' {
flags << .has_connection
}
'Sec-WebSocket-Key', 'sec-websocket-key' {
key = keys[1].trim_space()
s.logger.debug('server-> got key: $key')
seckey = create_key_challenge_response(key)?
s.logger.debug('server-> challenge: $seckey, response: ${keys[1]}')
flags << .has_accept
}
else {
// We ignore other headers like protocol for now
}
}
unsafe {
keys.free()
}
}
if flags.len < 3 {
return error('invalid client handshake, $client_handshake')
}
server_handshake := 'HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: $seckey\r\n\r\n'
server_client := &ServerClient{
resource_name: get_tokens[1]
client_key: key
client: c
server: s
}
unsafe {
lines.free()
flags.free()
get_tokens.free()
seckey.free()
key.free()
}
return server_handshake, server_client
}
fn (mut ws Client) read_handshake_str() ?string {
mut total_bytes_read := 0
mut msg := [1024]byte{}
mut buffer := [1]byte{}
for total_bytes_read < 1024 {
bytes_read := ws.socket_read_into_ptr(byteptr(&buffer), 1)?
if bytes_read == 0 {
return error('unexpected no response from handshake')
}
msg[total_bytes_read] = buffer[0]
total_bytes_read++
if total_bytes_read > 5 && msg[total_bytes_read - 1] == `\n` && msg[total_bytes_read -
2] == `\r` && msg[total_bytes_read - 3] == `\n` && msg[total_bytes_read - 4] == `\r` {
break
}
}
res := string(msg[..total_bytes_read])
return res
}
// read_handshake reads the handshake and check if valid
fn (mut ws Client) read_handshake(seckey string) ? {
mut msg := ws.read_handshake_str()?
ws.check_handshake_response(msg, seckey)?
unsafe {
msg.free()
}
}
fn (mut ws Client) check_handshake_response(handshake_response, seckey string) ? {
ws.debug_log('handshake response:\n$handshake_response')
lines := handshake_response.split_into_lines()
header := lines[0]
if !header.starts_with('HTTP/1.1 101') && !header.starts_with('HTTP/1.0 101') {
return error('handshake_handler: invalid HTTP status response code')
}
for i in 1 .. lines.len {
if lines[i].len <= 0 || lines[i] == '\r\n' {
continue
}
keys := lines[i].split(':')
match keys[0] {
'Upgrade', 'upgrade' {
ws.flags << .has_upgrade
}
'Connection', 'connection' {
ws.flags << .has_connection
}
'Sec-WebSocket-Accept', 'sec-websocket-accept' {
ws.debug_log('seckey: $seckey')
challenge := create_key_challenge_response(seckey)?
ws.debug_log('challenge: $challenge, response: ${keys[1]}')
if keys[1].trim_space() != challenge {
return error('handshake_handler: Sec-WebSocket-Accept header does not match computed sha1/base64 response.')
}
ws.flags << .has_accept
unsafe {
challenge.free()
}
}
else {}
}
unsafe {
keys.free()
}
}
unsafe {
lines.free()
}
if ws.flags.len < 3 {
ws.close(1002, 'invalid websocket HTTP headers')?
return error('invalid websocket HTTP headers')
}
}

View File

@ -0,0 +1,95 @@
module websocket
import x.net
import time
interface WebsocketIO {
socket_read_into(mut buffer []byte) ?int
socket_write(bytes []byte) ?
}
// socket_read_into reads into the provided buffer with it's lenght
fn (mut ws Client) socket_read_into(mut buffer []byte) ?int {
lock {
if ws.is_ssl {
r := ws.ssl_conn.read_into(mut buffer)?
return r
} else {
for {
r := ws.conn.read_into(mut buffer) or {
if errcode == net.err_timed_out_code {
continue
}
return error(err)
}
return r
}
}
}
}
fn (mut ws Client) socket_read_into_ptr(buf_ptr byteptr, len int) ?int {
lock {
if ws.is_ssl {
r := ws.ssl_conn.socket_read_into_ptr(buf_ptr, len)?
return r
} else {
for {
r := ws.conn.read_into_ptr(buf_ptr, len) or {
if errcode == net.err_timed_out_code {
continue
}
return error(err)
}
return r
}
}
}
}
// socket_write, writes the whole byte array provided to the socket
fn (mut ws Client) socket_write(bytes []byte) ? {
lock {
if ws.state == .closed || ws.conn.sock.handle <= 1 {
ws.debug_log('write: Socket allready closed')
return error('Socket allready closed')
}
if ws.is_ssl {
ws.ssl_conn.write(bytes)?
} else {
for {
ws.conn.write(bytes) or {
if errcode == net.err_timed_out_code {
continue
}
return error(err)
}
return
}
}
}
}
// shutdown_socket, proper shutdown make PR in Emeliy repo
fn (mut ws Client) shutdown_socket() ? {
ws.debug_log('shutting down socket')
if ws.is_ssl {
ws.ssl_conn.shutdown()?
} else {
ws.conn.close()?
}
return none
}
// dial_socket, setup socket communication, options and timeouts
fn (mut ws Client) dial_socket() ?net.TcpConn {
mut t := net.dial_tcp('$ws.uri.hostname:$ws.uri.port')?
optval := int(1)
t.sock.set_option_int(.keep_alive, optval)?
t.set_read_timeout(10 * time.millisecond)
t.set_write_timeout(10 * time.millisecond)
if ws.is_ssl {
ws.ssl_conn.connect(mut t)?
}
return t
}

View File

@ -0,0 +1,302 @@
module websocket
import encoding.utf8
const (
header_len_offset = 2
buffer_size = 256
extended_payload16_end_byte = 4
extended_payload64_end_byte = 10
)
struct Fragment {
data []byte
opcode OPCode
}
struct Frame {
mut:
header_len int = 2
frame_size int = 2
fin bool
rsv1 bool
rsv2 bool
rsv3 bool
opcode OPCode
has_mask bool
payload_len int
masking_key [4]byte
}
const (
invalid_close_codes = [999, 1004, 1005, 1006, 1014, 1015, 1016, 1100, 2000, 2999, 5000, 65536]
)
// validate_client, validate client frame rules from RFC6455
pub fn (mut ws Client) validate_frame(frame &Frame) ? {
if frame.rsv1 || frame.rsv2 || frame.rsv3 {
ws.close(1002, 'rsv cannot be other than 0, not negotiated')
return error('rsv cannot be other than 0, not negotiated')
}
if (int(frame.opcode) >= 3 && int(frame.opcode) <= 7) ||
(int(frame.opcode) >= 11 && int(frame.opcode) <= 15) {
ws.close(1002, 'use of reserved opcode')?
return error('use of reserved opcode')
}
if frame.has_mask && !ws.is_server {
// Server should never send masked frames
// to client, close connection
ws.close(1002, 'client got masked frame')?
return error('client sent masked frame')
}
if is_control_frame(frame.opcode) {
if !frame.fin {
ws.close(1002, 'control message must not be fragmented')?
return error('unexpected control frame with no fin')
}
if frame.payload_len > 125 {
ws.close(1002, 'control frames must not exceed 125 bytes')?
return error('unexpected control frame payload length')
}
}
if frame.fin == false && ws.fragments.len == 0 && frame.opcode == .continuation {
err_msg := 'unexecpected continuation, there are no frames to continue, $frame'
ws.close(1002, err_msg)?
return error(err_msg)
}
}
[inline]
fn is_control_frame(opcode OPCode) bool {
return opcode !in [.text_frame, .binary_frame, .continuation]
}
[inline]
fn is_data_frame(opcode OPCode) bool {
return opcode in [.text_frame, .binary_frame]
}
// read_payload, reads the payload from socket
fn (mut ws Client) read_payload(frame &Frame) ?[]byte {
if frame.payload_len == 0 {
return []byte{}
}
// TODO: make a dynamic reusable memory pool here
mut buffer := []byte{cap: frame.payload_len}
mut read_buf := [1]byte{}
mut bytes_read := 0
for bytes_read < frame.payload_len {
len := ws.socket_read_into_ptr(byteptr(&read_buf), 1)?
if len != 1 {
return error('expected read all message, got zero')
}
bytes_read += len
buffer << read_buf[0]
}
if bytes_read != frame.payload_len {
return error('failed to read payload')
}
if frame.has_mask {
for i in 0 .. frame.payload_len {
buffer[i] ^= frame.masking_key[i % 4] & 0xff
}
}
return buffer
}
// validate_utf_8, validates payload for valid utf encoding
// todo: support fail fast utf errors for strict autobahn conformance
fn (mut ws Client) validate_utf_8(opcode OPCode, payload []byte) ? {
if opcode in [.text_frame, .close] && !utf8.validate(payload.data, payload.len) {
ws.logger.error('malformed utf8 payload, payload len: ($payload.len)')
ws.send_error_event('Recieved malformed utf8.')
ws.close(1007, 'malformed utf8 payload')
return error('malformed utf8 payload')
}
}
// read_next_message reads 1 to n frames to compose a message
pub fn (mut ws Client) read_next_message() ?Message {
for {
frame := ws.parse_frame_header()?
// This debug message leaks so remove if needed
// ws.debug_log('read_next_message: frame\n$frame')
ws.validate_frame(&frame)?
frame_payload := ws.read_payload(&frame)?
if is_control_frame(frame.opcode) {
// Control frames can interject other frames
// and need to be returned immediately
msg := Message{
opcode: OPCode(frame.opcode)
payload: frame_payload.clone()
}
unsafe {
frame_payload.free()
}
return msg
}
// If the message is fragmented we just put it on fragments
// a fragment is allowed to have zero size payload
if !frame.fin {
ws.fragments << &Fragment{
data: frame_payload.clone()
opcode: frame.opcode
}
unsafe {
frame_payload.free()
}
continue
}
if ws.fragments.len == 0 {
ws.validate_utf_8(frame.opcode, frame_payload) or {
ws.logger.error('UTF8 validation error: $err, len of payload($frame_payload.len)')
ws.send_error_event('UTF8 validation error: $err, len of payload($frame_payload.len)')
return error(err)
}
msg := Message{
opcode: OPCode(frame.opcode)
payload: frame_payload.clone()
}
unsafe {
frame_payload.free()
}
return msg
}
defer {
ws.fragments = []
}
if is_data_frame(frame.opcode) {
ws.close(0, '')?
return error('Unexpected frame opcode')
}
payload := ws.payload_from_fragments(frame_payload)?
opcode := ws.opcode_from_fragments()
ws.validate_utf_8(opcode, payload)?
msg := Message{
opcode: opcode
payload: payload.clone()
}
unsafe {
frame_payload.free()
payload.free()
}
return msg
}
}
// payload_from_fragments, returs the whole paylaod from fragmented message
fn (ws Client) payload_from_fragments(fin_payload []byte) ?[]byte {
mut total_size := 0
for f in ws.fragments {
if f.data.len > 0 {
total_size += f.data.len
}
}
total_size += fin_payload.len
if total_size == 0 {
return []byte{}
}
mut total_buffer := []byte{cap: total_size}
for f in ws.fragments {
if f.data.len > 0 {
total_buffer << f.data
}
}
total_buffer << fin_payload
return total_buffer
}
// opcode_from_fragments, returns the opcode for message from the first fragment sent
fn (ws Client) opcode_from_fragments() OPCode {
return OPCode(ws.fragments[0].opcode)
}
// parse_frame_header parses next message by decoding the incoming frames
pub fn (mut ws Client) parse_frame_header() ?Frame {
// TODO: make a dynamic reusable memory pool here
// mut buffer := []byte{cap: buffer_size}
mut buffer := [256]byte{}
mut bytes_read := 0
mut frame := Frame{}
mut rbuff := [1]byte{}
mut mask_end_byte := 0
for ws.state == .open {
// Todo: different error scenarios to make sure we close correctly on error
// reader.read_into(mut rbuff) ?
read_bytes := ws.socket_read_into_ptr(byteptr(rbuff), 1)?
if read_bytes == 0 {
// This is probably a timeout or close
continue
}
buffer[bytes_read] = rbuff[0]
bytes_read++
// parses the first two header bytes to get basic frame information
if bytes_read == u64(header_len_offset) {
frame.fin = (buffer[0] & 0x80) == 0x80
frame.rsv1 = (buffer[0] & 0x40) == 0x40
frame.rsv2 = (buffer[0] & 0x20) == 0x20
frame.rsv3 = (buffer[0] & 0x10) == 0x10
frame.opcode = OPCode(int(buffer[0] & 0x7F))
frame.has_mask = (buffer[1] & 0x80) == 0x80
frame.payload_len = buffer[1] & 0x7F
// if has mask set the byte postition where mask ends
if frame.has_mask {
mask_end_byte = if frame.payload_len < 126 {
header_len_offset + 4
} else if frame.payload_len == 126 {
header_len_offset + 6
} else if frame.payload_len == 127 {
header_len_offset + 12
} else {
0
} // Impossible
}
frame.payload_len = frame.payload_len
frame.frame_size = frame.header_len + frame.payload_len
if !frame.has_mask && frame.payload_len < 126 {
break
}
}
if frame.payload_len == 126 && bytes_read == u64(extended_payload16_end_byte) {
frame.header_len += 2
frame.payload_len = 0
frame.payload_len |= buffer[2] << 8
frame.payload_len |= buffer[3] << 0
frame.frame_size = frame.header_len + frame.payload_len
if !frame.has_mask {
break
}
}
if frame.payload_len == 127 && bytes_read == u64(extended_payload64_end_byte) {
frame.header_len += 8 // TODO Not sure...
frame.payload_len = 0
frame.payload_len |= buffer[2] << 56
frame.payload_len |= buffer[3] << 48
frame.payload_len |= buffer[4] << 40
frame.payload_len |= buffer[5] << 32
frame.payload_len |= buffer[6] << 24
frame.payload_len |= buffer[7] << 16
frame.payload_len |= buffer[8] << 8
frame.payload_len |= buffer[9] << 0
if !frame.has_mask {
break
}
}
// We have a mask and we read the whole mask data
if frame.has_mask && bytes_read == mask_end_byte {
frame.masking_key[0] = buffer[mask_end_byte - 4]
frame.masking_key[1] = buffer[mask_end_byte - 3]
frame.masking_key[2] = buffer[mask_end_byte - 2]
frame.masking_key[3] = buffer[mask_end_byte - 1]
break
}
}
return frame
}
// unmask_sequence unmask any given sequence
fn (f Frame) unmask_sequence(mut buffer []byte) {
for i in 0 .. buffer.len {
buffer[i] ^= f.masking_key[i % 4] & 0xff
}
}

View File

@ -0,0 +1,3 @@
# Autobahn tests
This is the autobahn automatic tests on build. The performance tests are skipped due to timeouts in Github actions.

View File

@ -0,0 +1,38 @@
// use this test to test the websocket client in the autobahn test
module main
import x.websocket
fn main() {
for i in 1 ..304 {
println('\ncase: $i')
handle_case(i) or {
println('error should be ok: $err')
}
}
// update the reports
uri := 'ws://autobahn_server:9001/updateReports?agent=v-client'
mut ws := websocket.new_client(uri)?
ws.connect()?
ws.listen()?
}
fn handle_case(case_nr int) ? {
uri := 'ws://autobahn_server:9001/runCase?case=$case_nr&agent=v-client'
mut ws := websocket.new_client(uri)?
ws.on_message(on_message)
ws.connect()?
ws.listen()?
}
fn on_message(mut ws websocket.Client, msg &websocket.Message)? {
// autobahn tests expects to send same message back
if msg.opcode == .pong {
// We just wanna pass text and binary message back to autobahn
return
}
ws.write(msg.payload, msg.opcode) or {
panic(err)
}
}

View File

@ -0,0 +1,30 @@
// use this to test websocket server to the autobahn test
module main
import x.websocket
fn main() {
mut s := websocket.new_server(9002, '/')
s.on_message(on_message)
s.listen()
}
fn handle_case(case_nr int) ? {
uri := 'ws://localhost:9002/runCase?case=$case_nr&agent=v-client'
mut ws := websocket.new_client(uri)?
ws.on_message(on_message)
ws.connect()?
ws.listen()?
}
fn on_message(mut ws websocket.Client, msg &websocket.Message)? {
// autobahn tests expects to send same message back
if msg.opcode == .pong {
// We just wanna pass text and binary message back to autobahn
return
}
ws.write(msg.payload, msg.opcode) or {
panic(err)
}
}

View File

@ -0,0 +1,20 @@
version: '3'
services:
web:
container_name: autobahn_server
build: fuzzing_server
ports:
- "9001:9001"
- "8080:8080"
client:
container_name: autobahn_client
build:
#vlib/x/websocket/tests/autobahn/ws_test/Dockerfile
dockerfile: tests/autobahn/ws_test/Dockerfile
context: ../../
# volumes:
# - ../../:/src
# redis:
# container_name: redis-backend
# image: "redis:alpine"

View File

@ -0,0 +1,6 @@
FROM crossbario/autobahn-testsuite
COPY check_results.py /check_results.py
RUN chmod +x /check_results.py
COPY config/fuzzingserver.json /config/fuzzingserver.json
COPY config/fuzzingclient.json /config/fuzzingclient.json

View File

@ -0,0 +1,59 @@
import json
nr_of_client_errs = 0
nr_of_client_tests = 0
nr_of_server_errs = 0
nr_of_server_tests = 0
with open("/reports/clients/index.json") as f:
data = json.load(f)
for i in data["v-client"]:
# Count errors
if (
data["v-client"][i]["behavior"] == "FAILED"
or data["v-client"][i]["behaviorClose"] == "FAILED"
):
nr_of_client_errs = nr_of_client_errs + 1
nr_of_client_tests = nr_of_client_tests + 1
with open("/reports/clients/index.json") as f:
data = json.load(f)
for i in data["v-client"]:
# Count errors
if (
data["v-client"][i]["behavior"] == "FAILED"
or data["v-client"][i]["behaviorClose"] == "FAILED"
):
nr_of_client_errs = nr_of_client_errs + 1
nr_of_client_tests = nr_of_client_tests + 1
with open("/reports/servers/index.json") as f:
data = json.load(f)
for i in data["AutobahnServer"]:
if (
data["AutobahnServer"][i]["behavior"] == "FAILED"
or data["AutobahnServer"][i]["behaviorClose"] == "FAILED"
):
nr_of_server_errs = nr_of_server_errs + 1
nr_of_server_tests = nr_of_server_tests + 1
if nr_of_client_errs > 0 or nr_of_server_errs > 0:
print(
"FAILED AUTOBAHN TESTS, CLIENT ERRORS {0}(of {1}), SERVER ERRORS {2}(of {3})".format(
nr_of_client_errs, nr_of_client_tests, nr_of_server_errs, nr_of_server_tests
)
)
exit(1)
print(
"TEST SUCCESS!, CLIENT TESTS({0}), SERVER TESTS ({1})".format(
nr_of_client_tests, nr_of_server_tests
)
)

View File

@ -0,0 +1,22 @@
{
"options": {
"failByDrop": false
},
"outdir": "./reports/servers",
"servers": [
{
"agent": "AutobahnServer",
"url": "ws://autobahn_client:9002"
}
],
"cases": [
"*"
],
"exclude-cases": [
"9.*",
"11.*",
"12.*",
"13.*"
],
"exclude-agent-cases": {}
}

View File

@ -0,0 +1,14 @@
{
"url": "ws://127.0.0.1:9001",
"outdir": "./reports/clients",
"cases": [
"*"
],
"exclude-cases": [
"9.*",
"11.*",
"12.*",
"13.*"
],
"exclude-agent-cases": {}
}

View File

@ -0,0 +1,12 @@
# Use this as docker builder with https://github.com/nektos/act
# build with: docker build tests/autobahn/. -t myimage
# use in act: act -P ubuntu-latest=myimage
FROM node:12.6-buster-slim
COPY config/fuzzingserver.json /config/fuzzingserver.json
RUN chmod +775 /config/fuzzingserver.json
RUN apt-get update && \
apt-get install -y \
docker \
docker-compose

View File

@ -0,0 +1,4 @@
# Run tests locally
Todo: document how, also how to use https://github.com/nektos/act

View File

@ -0,0 +1,11 @@
FROM thevlang/vlang:buster-dev
# ARG WORKSPACE_ROOT
# WORKDIR ${WORKSPACE_ROOT}
COPY ./ /src/
# COPY tests/autobahn/ws_test/run.sh /run.sh
# RUN chmod +x /run.sh
RUN v -autofree /src/tests/autobahn/autobahn_server.v
RUN chmod +x /src/tests/autobahn/autobahn_server
ENTRYPOINT [ "/src/tests/autobahn/autobahn_server" ]

View File

@ -0,0 +1,14 @@
module websocket
struct Uri {
mut:
url string
hostname string
port string
resource string
querystring string
}
pub fn (u Uri) str() string {
return u.url
}

View File

@ -0,0 +1,52 @@
module websocket
import rand
import crypto.sha1
import encoding.base64
fn htonl64(payload_len u64) []byte {
mut ret := []byte{len: 8}
ret[0] = byte(((payload_len & (u64(0xff) << 56)) >> 56) & 0xff)
ret[1] = byte(((payload_len & (u64(0xff) << 48)) >> 48) & 0xff)
ret[2] = byte(((payload_len & (u64(0xff) << 40)) >> 40) & 0xff)
ret[3] = byte(((payload_len & (u64(0xff) << 32)) >> 32) & 0xff)
ret[4] = byte(((payload_len & (u64(0xff) << 24)) >> 24) & 0xff)
ret[5] = byte(((payload_len & (u64(0xff) << 16)) >> 16) & 0xff)
ret[6] = byte(((payload_len & (u64(0xff) << 8)) >> 8) & 0xff)
ret[7] = byte(((payload_len & (u64(0xff) << 0)) >> 0) & 0xff)
return ret
}
fn create_masking_key() []byte {
mask_bit := byte(rand.intn(255))
buf := []byte{len: 4, init: `0`}
unsafe {
C.memcpy(buf.data, &mask_bit, 4)
}
return buf
}
fn create_key_challenge_response(seckey string) ?string {
if seckey.len == 0 {
return error('unexpected seckey lengt zero')
}
guid := '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
sha1buf := seckey + guid
shabytes := sha1buf.bytes()
hash := sha1.sum(shabytes)
b64 := base64.encode(tos(hash.data, hash.len))
unsafe {
hash.free()
shabytes.free()
}
return b64
}
fn get_nonce(nonce_size int) string {
mut nonce := []byte{len: nonce_size, cap: nonce_size}
alphanum := '0123456789ABCDEFGHIJKLMNOPQRSTUVXYZabcdefghijklmnopqrstuvwxyz'
for i in 0 .. nonce_size {
nonce[i] = alphanum[rand.intn(alphanum.len)]
}
return tos(nonce.data, nonce.len).clone()
}

View File

@ -0,0 +1,515 @@
// The websocket client implements the websocket capabilities
// it is a refactor of the original V-websocket client class
// from @thecoderr.
// There are quite a few manual memory management free() going on
// int the code. This will be refactored once the memory management
// is done. For now there are no leaks on message levels. Please
// check with valgrind if you do any changes in the free calls
module websocket
import x.net
import x.openssl
import net.urllib
import time
import log
import sync
import rand
const (
empty_bytearr = []byte{}
)
// Client represents websocket client state
pub struct Client {
is_server bool = false
mut:
ssl_conn &openssl.SSLConn
flags []Flag
fragments []Fragment
logger &log.Log
message_callbacks []MessageEventHandler
error_callbacks []ErrorEventHandler
open_callbacks []OpenEventHandler
close_callbacks []CloseEventHandler
pub:
is_ssl bool
uri Uri
id string
pub mut:
conn net.TcpConn
nonce_size int = 16 // you can try 18 too
panic_on_callback bool = false
state State
resource_name string
last_pong_ut u64
}
enum Flag {
has_accept
has_connection
has_upgrade
}
// State of the websocket connection.
// Messages should be sent only on state .open
enum State {
connecting = 0
open
closing
closed
}
// Message, represents a whole message conbined from 1 to n frames
pub struct Message {
pub:
opcode OPCode
payload []byte
}
// OPCode, the supported websocket frame types
pub enum OPCode {
continuation = 0x00
text_frame = 0x01
binary_frame = 0x02
close = 0x08
ping = 0x09
pong = 0x0A
}
// new_client, instance a new websocket client
pub fn new_client(address string) ?&Client {
uri := parse_uri(address)?
return &Client{
is_server: false
ssl_conn: openssl.new_ssl_conn()
is_ssl: address.starts_with('wss')
logger: &log.Log{
level: .info
}
uri: uri
state: .closed
id: rand.uuid_v4()
}
}
// connect, connects and do handshake procedure with remote server
pub fn (mut ws Client) connect() ? {
ws.assert_not_connected()
ws.set_state(.connecting)
ws.logger.info('connecting to host $ws.uri')
ws.conn = ws.dial_socket()?
ws.handshake()?
ws.set_state(.open)
ws.logger.info('successfully connected to host $ws.uri')
ws.send_open_event()
}
// listen, listens to incoming messages and handles them
pub fn (mut ws Client) listen() ? {
ws.logger.info('Starting client listener, server($ws.is_server)...')
defer {
ws.logger.info('Quit client listener, server($ws.is_server)...')
}
for ws.state == .open {
msg := ws.read_next_message() or {
if ws.state in [.closed, .closing] {
return
}
ws.debug_log('failed to read next message: $err')
ws.send_error_event('failed to read next message: $err')
return error(err)
}
ws.debug_log('got message: $msg.opcode') // , payload: $msg.payload') leaks
match msg.opcode {
.text_frame {
ws.debug_log('read: text')
ws.send_message_event(msg)
unsafe {
msg.free()
}
}
.binary_frame {
ws.debug_log('read: binary')
ws.send_message_event(msg)
unsafe {
msg.free()
}
}
.ping {
ws.debug_log('read: ping, sending pong')
ws.send_control_frame(.pong, 'PONG', msg.payload) or {
ws.logger.error('error in message callback sending PONG: $err')
ws.send_error_event('error in message callback sending PONG: $err')
if ws.panic_on_callback {
panic(err)
}
continue
}
if msg.payload.len > 0 {
unsafe {
msg.free()
}
}
}
.pong {
ws.debug_log('read: pong')
ws.last_pong_ut = time.now().unix
ws.send_message_event(msg)
if msg.payload.len > 0 {
unsafe {
msg.free()
}
}
}
.close {
ws.debug_log('read: close')
defer {
ws.manage_clean_close()
}
if msg.payload.len > 0 {
if msg.payload.len == 1 {
ws.close(1002, 'close payload cannot be 1 byte')?
return error('close payload cannot be 1 byte')
}
code := (int(msg.payload[0]) << 8) + int(msg.payload[1])
if code in invalid_close_codes {
ws.close(1002, 'invalid close code: $code')?
return error('invalid close code: $code')
}
reason := if msg.payload.len > 2 { msg.payload[2..] } else { []byte{} }
if reason.len > 0 {
ws.validate_utf_8(.close, reason)?
}
if ws.state !in [.closing, .closed] {
// sending close back according to spec
ws.debug_log('close with reason, code: $code, reason: $reason')
r := if reason.len > 0 { string(reason, reason.len) } else { '' }
ws.close(code, r)?
}
unsafe {
msg.free()
}
} else {
if ws.state !in [.closing, .closed] {
ws.debug_log('close with reason, no code')
// sending close back according to spec
ws.close(1000, 'normal')?
}
unsafe {
msg.free()
}
}
return
}
.continuation {
ws.logger.error('unexpected opcode continuation, nothing to continue')
ws.send_error_event('unexpected opcode continuation, nothing to continue')
ws.close(1002, 'nothing to continue')?
return error('unexpected opcode continuation, nothing to continue')
}
}
}
}
// this function was needed for defer
fn (mut ws Client) manage_clean_close() {
ws.send_close_event(1000, 'closed by client')
}
// ping, sends ping message to server,
// ping response will be pushed to message callback
pub fn (mut ws Client) ping() ? {
ws.send_control_frame(.ping, 'PING', [])?
}
// pong, sends pog message to server,
// pongs are normally automatically sent back to server
pub fn (mut ws Client) pong() ? {
ws.send_control_frame(.pong, 'PONG', [])?
}
// write_ptr, writes len bytes provided a byteptr with a websocket messagetype
pub fn (mut ws Client) write_ptr(bytes byteptr, payload_len int, code OPCode) ? {
// Temporary, printing bytes are leaking
ws.debug_log('write code: $code')
// ws.debug_log('write code: $code, payload: $bytes')
if ws.state != .open || ws.conn.sock.handle < 1 {
// send error here later
return error('trying to write on a closed socket!')
}
// payload_len := bytes.len
mut header_len := 2 + if payload_len > 125 { 2 } else { 0 } + if payload_len > 0xffff { 6 } else { 0 }
if !ws.is_server {
header_len += 4
}
mut header := []byte{len: header_len, init: `0`} // [`0`].repeat(header_len)
header[0] = byte(int(code)) | 0x80
masking_key := create_masking_key()
defer {
unsafe {
}
}
if ws.is_server {
if payload_len <= 125 {
header[1] = byte(payload_len)
// 0x80
} else if payload_len > 125 && payload_len <= 0xffff {
len16 := C.htons(payload_len)
header[1] = 126
// 0x80
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], &len16, 2)
}
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff {
len_bytes := htonl64(u64(payload_len))
header[1] = 127 // 0x80
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], len_bytes.data, 8)
}
}
} else {
if payload_len <= 125 {
header[1] = byte(payload_len | 0x80)
header[2] = masking_key[0]
header[3] = masking_key[1]
header[4] = masking_key[2]
header[5] = masking_key[3]
} else if payload_len > 125 && payload_len <= 0xffff {
len16 := C.htons(payload_len)
header[1] = (126 | 0x80)
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], &len16, 2)
}
header[4] = masking_key[0]
header[5] = masking_key[1]
header[6] = masking_key[2]
header[7] = masking_key[3]
} else if payload_len > 0xffff && payload_len <= 0xffffffffffffffff { // 65535 && 18446744073709551615
len64 := htonl64(u64(payload_len))
header[1] = (127 | 0x80)
// todo: fix v style copy instead
unsafe {
C.memcpy(&header[2], len64.data, 8)
}
header[10] = masking_key[0]
header[11] = masking_key[1]
header[12] = masking_key[2]
header[13] = masking_key[3]
} else {
// l.c('write: frame too large')
ws.close(1009, 'frame too large')?
return error('frame too large')
}
}
len := header.len + payload_len
mut frame_buf := []byte{len: len}
unsafe {
C.memcpy(&frame_buf[0], byteptr(header.data), header.len)
if payload_len > 0 {
C.memcpy(&frame_buf[header.len], bytes, payload_len)
}
}
if !ws.is_server {
for i in 0 .. payload_len {
frame_buf[header_len + i] ^= masking_key[i % 4] & 0xff
}
}
ws.socket_write(frame_buf)?
// Temporary hack until memory management is done
unsafe {
frame_buf.free()
masking_key.free()
header.free()
}
}
// write, writes a byte array with a websocket messagetype
pub fn (mut ws Client) write(bytes []byte, code OPCode) ? {
ws.write_ptr(byteptr(bytes.data), bytes.len, code)?
}
pub fn (mut ws Client) write_str(str string) ? {
ws.write_ptr(str.str, str.len, .text_frame)
}
// close, closes the websocket connection
pub fn (mut ws Client) close(code int, message string) ? {
ws.debug_log('sending close, $code, $message')
if ws.state in [.closed, .closing] || ws.conn.sock.handle <= 1 {
ws.debug_log('close: Websocket allready closed ($ws.state), $message, $code handle($ws.conn.sock.handle)')
err_msg := 'Socket allready closed: $code'
ret_err := error(err_msg)
// unsafe {
// err_msg.free()
// }
return ret_err
}
defer {
ws.shutdown_socket()
ws.reset_state()
}
ws.set_state(.closing)
mut code32 := 0
if code > 0 {
code_ := C.htons(code)
message_len := message.len + 2
mut close_frame := []byte{len: message_len} // [`0`].repeat(message_len)
close_frame[0] = byte(code_ & 0xFF)
close_frame[1] = byte(code_ >> 8)
code32 = (close_frame[0] << 8) + close_frame[1]
for i in 0 .. message.len {
close_frame[i + 2] = message[i]
}
ws.send_control_frame(.close, 'CLOSE', close_frame)?
ws.send_close_event(code, message)
unsafe {
close_frame.free()
}
} else {
ws.send_control_frame(.close, 'CLOSE', [])?
ws.send_close_event(code, '')
}
ws.fragments = []
}
// send_control_frame, sends a control frame to the server
fn (mut ws Client) send_control_frame(code OPCode, frame_typ string, payload []byte) ? {
ws.debug_log('send control frame $code, frame_type: $frame_typ') // , payload: $payload')
if ws.state !in [.open, .closing] && ws.conn.sock.handle > 1 {
return error('socket is not connected')
}
header_len := if ws.is_server { 2 } else { 6 }
frame_len := header_len + payload.len
mut control_frame := []byte{len: frame_len} // [`0`].repeat(frame_len)
mut masking_key := if !ws.is_server { create_masking_key() } else { empty_bytearr }
defer {
unsafe {
control_frame.free()
if masking_key.len > 0 {
masking_key.free()
}
}
}
control_frame[0] = byte(int(code) | 0x80)
if !ws.is_server {
control_frame[1] = byte(payload.len | 0x80)
control_frame[2] = masking_key[0]
control_frame[3] = masking_key[1]
control_frame[4] = masking_key[2]
control_frame[5] = masking_key[3]
} else {
control_frame[1] = byte(payload.len)
}
if code == .close {
if payload.len >= 2 {
if !ws.is_server {
mut parsed_payload := []byte{len: payload.len + 1}
unsafe {
C.memcpy(parsed_payload.data, &payload[0], payload.len)
}
parsed_payload[payload.len] = `\0`
for i in 0 .. payload.len {
control_frame[6 + i] = (parsed_payload[i] ^ masking_key[i % 4]) & 0xff
}
unsafe {
parsed_payload.free()
}
} else {
unsafe {
C.memcpy(&control_frame[2], &payload[0], payload.len)
}
}
}
} else {
if !ws.is_server {
if payload.len > 0 {
for i in 0 .. payload.len {
control_frame[header_len + i] = (payload[i] ^ masking_key[i % 4]) & 0xff
}
}
} else {
if payload.len > 0 {
unsafe {
C.memcpy(&control_frame[2], &payload[0], payload.len)
}
}
}
}
println('SEND CONTROL FRAME $code, $control_frame.len from server? ($ws.is_server)')
ws.socket_write(control_frame) or {
return error('send_control_frame: error sending $frame_typ control frame.')
}
}
// parse_uri, parses the url string to it's components
// todo: support not using port to default ones
fn parse_uri(url string) ?&Uri {
u := urllib.parse(url) or {
return error(err)
}
v := u.request_uri().split('?')
querystring := if v.len > 1 { '?' + v[1] } else { '' }
return &Uri{
url: url
hostname: u.hostname()
port: u.port()
resource: v[0]
querystring: querystring
}
}
// set_state sets current state in a thread safe way
fn (mut ws Client) set_state(state State) {
lock {
ws.state = state
}
}
[inline]
fn (ws Client) assert_not_connected() ? {
match ws.state {
.connecting { return error('connect: websocket is connecting') }
.open { return error('connect: websocket already open') }
else {}
}
}
// reset_state, resets the websocket and can connect again
fn (mut ws Client) reset_state() {
lock {
ws.state = .closed
ws.ssl_conn = openssl.new_ssl_conn()
ws.flags = []
ws.fragments = []
}
}
fn (mut ws Client) debug_log(text string) {
if ws.is_server {
ws.logger.debug('server-> $text')
} else {
ws.logger.debug('client-> $text')
}
}
[unsafe]
pub fn (m &Message) free() {
unsafe {
m.payload.free()
}
}
[unsafe]
pub fn (c &Client) free() {
unsafe {
c.flags.free()
c.fragments.free()
c.message_callbacks.free()
c.error_callbacks.free()
c.open_callbacks.free()
c.close_callbacks.free()
}
}

View File

@ -0,0 +1,9 @@
module websocket
fn error_code() int {
return C.errno
}
const (
error_ewouldblock = C.EWOULDBLOCK
)

View File

@ -0,0 +1,185 @@
// The module websocket implements the websocket server capabilities
module websocket
import x.net
import x.openssl
import log
import sync
import time
import rand
pub struct Server {
mut:
clients map[string]&ServerClient
logger &log.Log
ls net.TcpListener
accept_client_callbacks []AcceptClientFn
message_callbacks []MessageEventHandler
close_callbacks []CloseEventHandler
pub:
port int
is_ssl bool = false
pub mut:
ping_interval int = 30 // in seconds
state State
}
struct ServerClient {
pub:
resource_name string
client_key string
pub mut:
server &Server
client &Client
}
pub fn new_server(port int, route string) &Server {
return &Server{
port: port
logger: &log.Log{
level: .info
}
state: .closed
}
}
pub fn (mut s Server) set_ping_interval(seconds int) {
s.ping_interval = seconds
}
pub fn (mut s Server) listen() ? {
s.logger.info('websocket server: start listen on port $s.port')
s.ls = net.listen_tcp(s.port)?
s.set_state(.open)
go s.handle_ping()
for {
c := s.accept_new_client() or {
continue
}
go s.serve_client(mut c)
}
s.logger.info('websocket server: end listen on port $s.port')
}
fn (mut s Server) close() {
}
// Todo: make thread safe
fn (mut s Server) handle_ping() {
mut clients_to_remove := []string{}
for s.state == .open {
time.sleep(s.ping_interval)
for _, cli in s.clients {
mut c := cli
if c.client.state == .open {
c.client.ping() or {
s.logger.debug('server-> error sending ping to client')
// todo fix better close message, search the standard
c.client.close(1002, 'Clossing connection: ping send error') or {
// we want to continue even if error
continue
}
clients_to_remove << c.client.id
}
if (time.now().unix - c.client.last_pong_ut) > s.ping_interval * 2 {
clients_to_remove << c.client.id
c.client.close(1000, 'no pong received') or {
continue
}
}
}
}
// TODO replace for with s.clients.delete_all(clients_to_remove) if (https://github.com/vlang/v/pull/6020) merges
for client in clients_to_remove {
lock {
s.clients.delete(client)
}
}
clients_to_remove.clear()
}
}
fn (mut s Server) serve_client(mut c Client) ? {
c.logger.debug('server-> Start serve client ($c.id)')
defer {
c.logger.debug('server-> End serve client ($c.id)')
}
handshake_response, server_client := s.handle_server_handshake(mut c)?
accept := s.send_connect_event(mut server_client)?
if !accept {
s.logger.debug('server-> client not accepted')
c.shutdown_socket()?
return
}
// The client is accepted
c.socket_write(handshake_response.bytes())?
lock {
s.clients[server_client.client.id] = server_client
}
s.setup_callbacks(mut server_client)
c.listen() or {
s.logger.error(err)
return error(err)
}
}
fn (mut s Server) setup_callbacks(mut sc ServerClient) {
if s.message_callbacks.len > 0 {
for cb in s.message_callbacks {
if cb.is_ref {
sc.client.on_message_ref(cb.handler2, cb.ref)
} else {
sc.client.on_message(cb.handler)
}
}
}
if s.close_callbacks.len > 0 {
for cb in s.close_callbacks {
if cb.is_ref {
sc.client.on_close_ref(cb.handler2, cb.ref)
} else {
sc.client.on_close(cb.handler)
}
}
}
// Set standard close so we can remove client if closed
sc.client.on_close_ref(fn (mut c Client, code int, reason string, mut sc ServerClient) ? {
c.logger.debug('server-> Delete client')
lock {
sc.server.clients.delete(sc.client.id)
}
}, mut sc)
}
fn (mut s Server) accept_new_client() ?&Client {
mut new_conn := s.ls.accept()?
c := &Client{
is_server: true
conn: new_conn
ssl_conn: openssl.new_ssl_conn()
logger: s.logger
state: .open
last_pong_ut: time.now().unix
id: rand.uuid_v4()
}
return c
}
// set_state sets current state in a thread safe way
[inline]
fn (mut s Server) set_state(state State) {
lock {
s.state = state
}
}
pub fn (mut s Server) free() {
unsafe {
s.clients.free()
// s.logger.free()
// s.ls.free()
s.accept_client_callbacks.free()
s.message_callbacks.free()
s.close_callbacks.free()
}
}

View File

@ -0,0 +1,73 @@
import x.websocket
import time
// Tests with external ws & wss servers
fn test_ws() ? {
go start_server()
time.sleep_ms(100)
ws_test('ws://localhost:30000')?
}
fn start_server() ? {
mut s := websocket.new_server(30000, '')
// Make that in execution test time give time to execute at least one time
s.ping_interval = 100
s.on_connect(fn (mut s websocket.ServerClient) ?bool {
// Here you can look att the client info and accept or not accept
// just returning a true/false
if s.resource_name != '/' {
return false
}
return true
})?
s.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ? {
// payload := if msg.payload.len == 0 { '' } else { string(msg.payload, msg.payload.len) }
// println('server client ($ws.id) got message: opcode: $msg.opcode, payload: $payload')
ws.write(msg.payload, msg.opcode) or {
panic(err)
}
})
s.on_close(fn (mut ws websocket.Client, code int, reason string) ? {
// println('client ($ws.id) closed connection')
})
s.listen() or {
// println('error on server listen: $err')
}
}
fn ws_test(uri string) ? {
eprintln('connecting to $uri ...')
mut ws := websocket.new_client(uri)?
ws.on_open(fn (mut ws websocket.Client) ? {
println('open!')
ws.pong()
assert true
})
ws.on_error(fn (mut ws websocket.Client, err string) ? {
println('error: $err')
// this can be thrown by internet connection problems
assert false
})
ws.on_close(fn (mut ws websocket.Client, code int, reason string) ? {
println('closed')
})
ws.on_message(fn (mut ws websocket.Client, msg &websocket.Message) ? {
println('client got type: $msg.opcode payload:\n$msg.payload')
if msg.opcode == .text_frame {
println('Message: ${string(msg.payload, msg.payload.len)}')
assert string(msg.payload, msg.payload.len) == 'a'
} else {
println('Binary message: $msg')
}
})
ws.connect()
go ws.listen()
text := ['a'].repeat(2)
for msg in text {
ws.write(msg.bytes(), .text_frame)?
// sleep to give time to recieve response before send a new one
time.sleep_ms(100)
}
// sleep to give time to recieve response before asserts
time.sleep_ms(500)
}

View File

@ -0,0 +1,11 @@
module websocket
import x.net
fn error_code() int {
return C.WSAGetLastError()
}
const (
error_ewouldblock = net.WsaError.wsaewouldblock
)