From 9693116cfea0787709fba319347865a8f7eba9c4 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Wed, 11 May 2022 10:03:43 +0200 Subject: [PATCH 1/9] wip --- src/docker/docker.v | 4 ---- src/docker/socket.v | 47 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) create mode 100644 src/docker/socket.v diff --git a/src/docker/docker.v b/src/docker/docker.v index 305e825..ce01e7e 100644 --- a/src/docker/docker.v +++ b/src/docker/docker.v @@ -5,10 +5,6 @@ import net.urllib import net.http import json -const socket = '/var/run/docker.sock' - -const buf_len = 1024 - // send writes a request to the Docker socket, waits for a response & returns // it. fn send(req &string) ?http.Response { diff --git a/src/docker/socket.v b/src/docker/socket.v new file mode 100644 index 0000000..3a72f14 --- /dev/null +++ b/src/docker/socket.v @@ -0,0 +1,47 @@ +module docker + +import net.unix +import io +import net.http + +const socket = '/var/run/docker.sock' + +const buf_len = 10 * 1024 + +pub struct DockerDaemon { +mut: + socket &unix.StreamConn + reader &io.BufferedReader +} + +pub fn new_conn() ?DockerDaemon { + s := unix.connect_stream(socket) ? + + d := DockerDaemon{socket: s, reader: io.new_buffered_reader(reader: s)} + + return d +} + +fn (mut d DockerDaemon) send_request(req &string) ? { + d.socket.write_string(req) ? + d.socket.wait_for_write() ? +} + +// read_response_head consumes the socket's contents until it encounters +// '\n\n', after which it parses the response as an HTTP response. +fn (mut d DockerDaemon) read_response_head() ?http.Response { + mut c := 0 + mut buf := [buf_len]u8{len: docker.buf_len} + mut res := []u8{} + + for { + c = d.socket.read(mut buf) ? + res << buf[..c] + + if res#[-2..] == [u8(`\n`), `\n`] { + break + } + } + + return http.parse_response(res.bytestr()) +} From 0cb5cdcc2a8490a966c36fb470fe61a5878c4c03 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Fri, 13 May 2022 21:28:24 +0200 Subject: [PATCH 2/9] refactor: ran vfmt with new defaults feat(docker): started work on new implementation --- src/docker/containers.v | 13 +++++-- src/docker/docker.v | 20 +++++----- src/docker/socket.v | 83 ++++++++++++++++++++++++++++++++++------- src/main.v | 2 +- 4 files changed, 88 insertions(+), 30 deletions(-) diff --git a/src/docker/containers.v b/src/docker/containers.v index 14ac12d..3b674e6 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -3,17 +3,22 @@ module docker import json import net.urllib import time +import net.http struct Container { id string [json: Id] names []string [json: Names] } -// containers returns a list of all currently running containers -pub fn containers() ?[]Container { - res := request('GET', urllib.parse('/v1.41/containers/json')?)? +pub fn (mut d DockerDaemon) containers() ?[]Container { + d.send_request('GET', urllib.parse('/v1.41/containers/json')?)? + res_header := d.read_response_head()? + content_length := res_header.header.get(http.CommonHeader.content_length)?.int() + res := d.read_response_body(content_length)? - return json.decode([]Container, res.text) or {} + data := json.decode([]Container, res)? + + return data } pub struct NewContainer { diff --git a/src/docker/docker.v b/src/docker/docker.v index ce01e7e..fa83d89 100644 --- a/src/docker/docker.v +++ b/src/docker/docker.v @@ -9,8 +9,8 @@ import json // it. fn send(req &string) ?http.Response { // Open a connection to the socket - mut s := unix.connect_stream(docker.socket) or { - return error('Failed to connect to socket ${docker.socket}.') + mut s := unix.connect_stream(socket) or { + return error('Failed to connect to socket ${socket}.') } defer { @@ -21,19 +21,19 @@ fn send(req &string) ?http.Response { } // Write the request to the socket - s.write_string(req) or { return error('Failed to write request to socket ${docker.socket}.') } + s.write_string(req) or { return error('Failed to write request to socket ${socket}.') } s.wait_for_write()? mut c := 0 - mut buf := []u8{len: docker.buf_len} + mut buf := []u8{len: buf_len} mut res := []u8{} for { - c = s.read(mut buf) or { return error('Failed to read data from socket ${docker.socket}.') } + c = s.read(mut buf) or { return error('Failed to read data from socket ${socket}.') } res << buf[..c] - if c < docker.buf_len { + if c < buf_len { break } } @@ -41,7 +41,7 @@ fn send(req &string) ?http.Response { // After reading the first part of the response, we parse it into an HTTP // response. If it isn't chunked, we return early with the data. parsed := http.parse_response(res.bytestr()) or { - return error('Failed to parse HTTP response from socket ${docker.socket}.') + return error('Failed to parse HTTP response from socket ${socket}.') } if parsed.header.get(http.CommonHeader.transfer_encoding) or { '' } != 'chunked' { @@ -55,12 +55,10 @@ fn send(req &string) ?http.Response { s.wait_for_write()? for { - c = s.read(mut buf) or { - return error('Failed to read data from socket ${docker.socket}.') - } + c = s.read(mut buf) or { return error('Failed to read data from socket ${socket}.') } res << buf[..c] - if c < docker.buf_len { + if c < buf_len { break } } diff --git a/src/docker/socket.v b/src/docker/socket.v index 3a72f14..7815435 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -3,10 +3,15 @@ module docker import net.unix import io import net.http +import strings +import net.urllib +import json -const socket = '/var/run/docker.sock' - -const buf_len = 10 * 1024 +const ( + socket = '/var/run/docker.sock' + buf_len = 10 * 1024 + http_separator = [u8(`\r`), `\n`, `\r`, `\n`] +) pub struct DockerDaemon { mut: @@ -14,34 +19,84 @@ mut: reader &io.BufferedReader } -pub fn new_conn() ?DockerDaemon { - s := unix.connect_stream(socket) ? +pub fn new_conn() ?&DockerDaemon { + s := unix.connect_stream(docker.socket)? - d := DockerDaemon{socket: s, reader: io.new_buffered_reader(reader: s)} + d := &DockerDaemon{ + socket: s + reader: io.new_buffered_reader(reader: s) + } return d } -fn (mut d DockerDaemon) send_request(req &string) ? { - d.socket.write_string(req) ? - d.socket.wait_for_write() ? +pub fn (mut d DockerDaemon) send_request(method string, url urllib.URL) ? { + req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\n\n' + + d.socket.write_string(req)? +} + +pub fn (mut d DockerDaemon) send_request_with_body(method string, url urllib.URL, content_type string, body string) ? { + req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\nContent-Type: $content_type\nContent-Length: $body.len\n\n$body\n\n' + + d.socket.write_string(req)? +} + +pub fn (mut d DockerDaemon) request_with_json(method string, url urllib.URL, data &T) ? { + body := json.encode(data) + + return request_with_body(method, url, 'application/json', body) } // read_response_head consumes the socket's contents until it encounters -// '\n\n', after which it parses the response as an HTTP response. -fn (mut d DockerDaemon) read_response_head() ?http.Response { +// '\r\n\r\n', after which it parses the response as an HTTP response. +pub fn (mut d DockerDaemon) read_response_head() ?http.Response { mut c := 0 - mut buf := [buf_len]u8{len: docker.buf_len} + mut buf := []u8{len: 4} mut res := []u8{} for { - c = d.socket.read(mut buf) ? + c = d.reader.read(mut buf)? res << buf[..c] - if res#[-2..] == [u8(`\n`), `\n`] { + mut i := 0 + mut match_len := 0 + + for i + match_len < c { + if buf[i + match_len] == docker.http_separator[match_len] { + match_len += 1 + } else { + i += match_len + 1 + match_len = 0 + } + } + + if match_len == 4 { break + } else if match_len > 0 { + mut buf2 := []u8{len: 4 - match_len} + c2 := d.reader.read(mut buf2)? + res << buf2[..c2] + + if buf2 == docker.http_separator[match_len..] { + break + } } } return http.parse_response(res.bytestr()) } + +pub fn (mut d DockerDaemon) read_response_body(length int) ?string { + mut buf := []u8{len: docker.buf_len} + mut c := 0 + mut builder := strings.new_builder(docker.buf_len) + + for builder.len < length { + c = d.reader.read(mut buf) or { break } + + builder.write(buf[..c])? + } + + return builder.str() +} diff --git a/src/main.v b/src/main.v index 41d0d33..db6d5ef 100644 --- a/src/main.v +++ b/src/main.v @@ -31,7 +31,7 @@ fn main() { logs.cmd(), ] } - app.setup() app.parse(os.args) + return } From 30a584ce3b7e41b0abd9b7c05d7c75d306512496 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Fri, 13 May 2022 22:03:06 +0200 Subject: [PATCH 3/9] WIP --- src/build/build.v | 11 ++++++---- src/docker/containers.v | 47 ++++++++++++++++++++++++++++++++++++++--- src/docker/socket.v | 12 +++++++++-- src/main.v | 1 + 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/build/build.v b/src/build/build.v index 41f68e2..b470853 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -17,6 +17,8 @@ const build_image_repo = 'vieter-build' // makepkg with. The base image should be some Linux distribution that uses // Pacman as its package manager. pub fn create_build_image(base_image string) ?string { + mut dd := docker.new_conn() ? + commands := [ // Update repos & install required packages 'pacman -Syu --needed --noconfirm base-devel git' @@ -48,12 +50,13 @@ pub fn create_build_image(base_image string) ?string { // We pull the provided image docker.pull_image(image_name, image_tag)? - id := docker.create_container(c)? - docker.start_container(id)? + id := dd.create_container(c)?.id + /* id := docker.create_container(c)? */ + dd.start_container(id)? // This loop waits until the container has stopped, so we can remove it after for { - data := docker.inspect_container(id)? + data := dd.inspect_container(id)? if !data.state.running { break @@ -68,7 +71,7 @@ pub fn create_build_image(base_image string) ?string { // conflicts. tag := time.sys_mono_now().str() image := docker.create_image_from_container(id, 'vieter-build', tag)? - docker.remove_container(id)? + dd.remove_container(id)? return image.id } diff --git a/src/docker/containers.v b/src/docker/containers.v index 3b674e6..98116cc 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -12,15 +12,14 @@ struct Container { pub fn (mut d DockerDaemon) containers() ?[]Container { d.send_request('GET', urllib.parse('/v1.41/containers/json')?)? - res_header := d.read_response_head()? - content_length := res_header.header.get(http.CommonHeader.content_length)?.int() - res := d.read_response_body(content_length)? + _, res := d.read_response()? data := json.decode([]Container, res)? return data } +[params] pub struct NewContainer { image string [json: Image] entrypoint []string [json: Entrypoint] @@ -31,7 +30,25 @@ pub struct NewContainer { } struct CreatedContainer { +pub: id string [json: Id] + warnings []string [json: Warnings] +} + +pub fn (mut d DockerDaemon) create_container(c NewContainer) ?CreatedContainer { + d.send_request_with_json('POST', urllib.parse('/v1.41/containers/create')?, c)? + _, res := d.read_response()? + + data := json.decode(CreatedContainer, res)? + + return data +} + +pub fn (mut d DockerDaemon) start_container(id string) ?bool { + d.send_request('POST', urllib.parse('/v1.41/containers/$id/start')?)? + head := d.read_response_head() ? + + return head.status_code == 204 } // create_container creates a container defined by the given configuration. If @@ -72,6 +89,25 @@ pub mut: end_time time.Time [skip] } +pub fn (mut d DockerDaemon) inspect_container(id string) ?ContainerInspect { + d.send_request('GET', urllib.parse('/v1.41/containers/$id/json')?)? + head, body := d.read_response()? + + if head.status_code != 200 { + return error('Failed to inspect container.') + } + + mut data := json.decode(ContainerInspect, body)? + + data.state.start_time = time.parse_rfc3339(data.state.start_time_str)? + + if data.state.status == 'exited' { + data.state.end_time = time.parse_rfc3339(data.state.end_time_str)? + } + + return data +} + // inspect_container returns the result of inspecting a container with a given // ID. pub fn inspect_container(id string) ?ContainerInspect { @@ -92,6 +128,11 @@ pub fn inspect_container(id string) ?ContainerInspect { return data } +pub fn (mut d DockerDaemon) remove_container(id string) ? { + d.send_request('DELETE', urllib.parse('/v1.41/containers/$id')?)? + head := d.read_response_head() ? +} + // remove_container removes a container with a given ID. pub fn remove_container(id string) ?bool { res := request('DELETE', urllib.parse('/v1.41/containers/$id')?)? diff --git a/src/docker/socket.v b/src/docker/socket.v index 7815435..41f9f6d 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -42,10 +42,10 @@ pub fn (mut d DockerDaemon) send_request_with_body(method string, url urllib.URL d.socket.write_string(req)? } -pub fn (mut d DockerDaemon) request_with_json(method string, url urllib.URL, data &T) ? { +pub fn (mut d DockerDaemon) send_request_with_json(method string, url urllib.URL, data &T) ? { body := json.encode(data) - return request_with_body(method, url, 'application/json', body) + return d.send_request_with_body(method, url, 'application/json', body) } // read_response_head consumes the socket's contents until it encounters @@ -100,3 +100,11 @@ pub fn (mut d DockerDaemon) read_response_body(length int) ?string { return builder.str() } + +pub fn (mut d DockerDaemon) read_response() ?(http.Response, string) { + head := d.read_response_head()? + content_length := head.header.get(http.CommonHeader.content_length)?.int() + res := d.read_response_body(content_length)? + + return head, res +} diff --git a/src/main.v b/src/main.v index db6d5ef..6b1e7bc 100644 --- a/src/main.v +++ b/src/main.v @@ -7,6 +7,7 @@ import build import console.git import console.logs import cron +import docker fn main() { mut app := cli.Command{ From fa780b6e4afb4e4c5a56dba77a6a8726d3e93232 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Fri, 13 May 2022 22:15:30 +0200 Subject: [PATCH 4/9] some better error handling --- src/build/build.v | 1 + src/docker/containers.v | 42 ++++++++++++++++++++++++++++++++++------- src/docker/socket.v | 4 ++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/build/build.v b/src/build/build.v index b470853..ae10318 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -56,6 +56,7 @@ pub fn create_build_image(base_image string) ?string { // This loop waits until the container has stopped, so we can remove it after for { + println('wot') data := dd.inspect_container(id)? if !data.state.running { diff --git a/src/docker/containers.v b/src/docker/containers.v index 98116cc..e2da938 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -5,6 +5,10 @@ import net.urllib import time import net.http +struct DockerError { + message string +} + struct Container { id string [json: Id] names []string [json: Names] @@ -12,7 +16,13 @@ struct Container { pub fn (mut d DockerDaemon) containers() ?[]Container { d.send_request('GET', urllib.parse('/v1.41/containers/json')?)? - _, res := d.read_response()? + head, res := d.read_response()? + + if head.status_code != 200 { + data := json.decode(DockerError, res)? + + return error(data.message) + } data := json.decode([]Container, res)? @@ -37,18 +47,28 @@ pub: pub fn (mut d DockerDaemon) create_container(c NewContainer) ?CreatedContainer { d.send_request_with_json('POST', urllib.parse('/v1.41/containers/create')?, c)? - _, res := d.read_response()? + head, res := d.read_response()? + + if head.status_code != 201 { + data := json.decode(DockerError, res)? + + return error(data.message) + } data := json.decode(CreatedContainer, res)? return data } -pub fn (mut d DockerDaemon) start_container(id string) ?bool { +pub fn (mut d DockerDaemon) start_container(id string) ? { d.send_request('POST', urllib.parse('/v1.41/containers/$id/start')?)? - head := d.read_response_head() ? + head, body := d.read_response() ? - return head.status_code == 204 + if head.status_code != 204 { + data := json.decode(DockerError, body)? + + return error(data.message) + } } // create_container creates a container defined by the given configuration. If @@ -94,7 +114,9 @@ pub fn (mut d DockerDaemon) inspect_container(id string) ?ContainerInspect { head, body := d.read_response()? if head.status_code != 200 { - return error('Failed to inspect container.') + data := json.decode(DockerError, body)? + + return error(data.message) } mut data := json.decode(ContainerInspect, body)? @@ -130,7 +152,13 @@ pub fn inspect_container(id string) ?ContainerInspect { pub fn (mut d DockerDaemon) remove_container(id string) ? { d.send_request('DELETE', urllib.parse('/v1.41/containers/$id')?)? - head := d.read_response_head() ? + head, body := d.read_response() ? + + if head.status_code != 204 { + data := json.decode(DockerError, body)? + + return error(data.message) + } } // remove_container removes a container with a given ID. diff --git a/src/docker/socket.v b/src/docker/socket.v index 41f9f6d..5632e1d 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -88,6 +88,10 @@ pub fn (mut d DockerDaemon) read_response_head() ?http.Response { } pub fn (mut d DockerDaemon) read_response_body(length int) ?string { + if length == 0 { + return '' + } + mut buf := []u8{len: docker.buf_len} mut c := 0 mut builder := strings.new_builder(docker.buf_len) From e29c188f7679714f1b90bbd5de041df9094b3198 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 14 May 2022 20:04:21 +0200 Subject: [PATCH 5/9] remove me later --- src/docker/socket.v | 2 ++ src/docker/stream.v | 9 +++++++++ 2 files changed, 11 insertions(+) create mode 100644 src/docker/stream.v diff --git a/src/docker/socket.v b/src/docker/socket.v index 5632e1d..749f14e 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -50,6 +50,8 @@ pub fn (mut d DockerDaemon) send_request_with_json(method string, url urllib. // read_response_head consumes the socket's contents until it encounters // '\r\n\r\n', after which it parses the response as an HTTP response. +// Importantly, this function never consumes past the HTTP separator, so the +// body can be read fully later on. pub fn (mut d DockerDaemon) read_response_head() ?http.Response { mut c := 0 mut buf := []u8{len: 4} diff --git a/src/docker/stream.v b/src/docker/stream.v new file mode 100644 index 0000000..24c51c1 --- /dev/null +++ b/src/docker/stream.v @@ -0,0 +1,9 @@ +module docker + +import io + +struct ChunkedResponseStream { + reader io.Reader +} + + From f81a837d1759b7063617cc275ee5a0350ab36bcc Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 14 May 2022 21:55:19 +0200 Subject: [PATCH 6/9] feat(docker): added ChunkedResponseReader implementation --- src/build/build.v | 6 +-- src/docker/containers.v | 6 +-- src/docker/images.v | 26 ++++++++++++ src/docker/socket.v | 35 ++++++++++------ src/docker/stream.v | 92 ++++++++++++++++++++++++++++++++++++++++- src/util/util.v | 16 +++++++ 6 files changed, 161 insertions(+), 20 deletions(-) diff --git a/src/build/build.v b/src/build/build.v index ae10318..dd72c17 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -17,7 +17,7 @@ const build_image_repo = 'vieter-build' // makepkg with. The base image should be some Linux distribution that uses // Pacman as its package manager. pub fn create_build_image(base_image string) ?string { - mut dd := docker.new_conn() ? + mut dd := docker.new_conn()? commands := [ // Update repos & install required packages @@ -51,12 +51,12 @@ pub fn create_build_image(base_image string) ?string { docker.pull_image(image_name, image_tag)? id := dd.create_container(c)?.id - /* id := docker.create_container(c)? */ + // id := docker.create_container(c)? dd.start_container(id)? // This loop waits until the container has stopped, so we can remove it after for { - println('wot') + println('wot') data := dd.inspect_container(id)? if !data.state.running { diff --git a/src/docker/containers.v b/src/docker/containers.v index e2da938..0027747 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -41,7 +41,7 @@ pub struct NewContainer { struct CreatedContainer { pub: - id string [json: Id] + id string [json: Id] warnings []string [json: Warnings] } @@ -62,7 +62,7 @@ pub fn (mut d DockerDaemon) create_container(c NewContainer) ?CreatedContainer { pub fn (mut d DockerDaemon) start_container(id string) ? { d.send_request('POST', urllib.parse('/v1.41/containers/$id/start')?)? - head, body := d.read_response() ? + head, body := d.read_response()? if head.status_code != 204 { data := json.decode(DockerError, body)? @@ -152,7 +152,7 @@ pub fn inspect_container(id string) ?ContainerInspect { pub fn (mut d DockerDaemon) remove_container(id string) ? { d.send_request('DELETE', urllib.parse('/v1.41/containers/$id')?)? - head, body := d.read_response() ? + head, body := d.read_response()? if head.status_code != 204 { data := json.decode(DockerError, body)? diff --git a/src/docker/images.v b/src/docker/images.v index 2e873fa..2249113 100644 --- a/src/docker/images.v +++ b/src/docker/images.v @@ -9,6 +9,32 @@ pub: id string [json: Id] } +pub fn (mut d DockerDaemon) pull_image(image string, tag string) ? { + d.send_request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?)? + head := d.read_response_head()? + + if head.status_code != 200 { + content_length := head.header.get(http.CommonHeader.content_length)?.int() + body := d.read_response_body(content_length)? + data := json.decode(DockerError, body)? + + return error(data.message) + } + + mut body := d.get_chunked_response_reader() + + mut buf := []u8{len: 1024} + + for { + c := body.read(mut buf)? + + if c == 0 { + break + } + print(buf[..c].bytestr()) + } +} + // pull_image pulls tries to pull the image for the given image & tag pub fn pull_image(image string, tag string) ?http.Response { return request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?) diff --git a/src/docker/socket.v b/src/docker/socket.v index 749f14e..444d9a9 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -6,11 +6,13 @@ import net.http import strings import net.urllib import json +import util const ( - socket = '/var/run/docker.sock' - buf_len = 10 * 1024 - http_separator = [u8(`\r`), `\n`, `\r`, `\n`] + socket = '/var/run/docker.sock' + buf_len = 10 * 1024 + http_separator = [u8(`\r`), `\n`, `\r`, `\n`] + http_chunk_separator = [u8(`\r`), `\n`] ) pub struct DockerDaemon { @@ -61,17 +63,18 @@ pub fn (mut d DockerDaemon) read_response_head() ?http.Response { c = d.reader.read(mut buf)? res << buf[..c] - mut i := 0 - mut match_len := 0 + match_len := util.match_array_in_array(buf[..c], docker.http_separator) + // mut i := 0 + // mut match_len := 0 - for i + match_len < c { - if buf[i + match_len] == docker.http_separator[match_len] { - match_len += 1 - } else { - i += match_len + 1 - match_len = 0 - } - } + // for i + match_len < c { + // if buf[i + match_len] == docker.http_separator[match_len] { + // match_len += 1 + // } else { + // i += match_len + 1 + // match_len = 0 + // } + //} if match_len == 4 { break @@ -114,3 +117,9 @@ pub fn (mut d DockerDaemon) read_response() ?(http.Response, string) { return head, res } + +pub fn (mut d DockerDaemon) get_chunked_response_reader() &ChunkedResponseReader { + r := new_chunked_response_reader(d.reader) + + return r +} diff --git a/src/docker/stream.v b/src/docker/stream.v index 24c51c1..fbc48ba 100644 --- a/src/docker/stream.v +++ b/src/docker/stream.v @@ -1,9 +1,99 @@ module docker import io +import util +import encoding.binary +import encoding.hex -struct ChunkedResponseStream { +struct ChunkedResponseReader { +mut: reader io.Reader + // buf []u8 + // offset int + // len int + bytes_left_in_chunk u64 + end_of_stream bool + started bool } +pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader { + r := &ChunkedResponseReader{ + reader: reader + } + return r +} + +// We satisfy the io.Reader interface +pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { + if r.end_of_stream { + return none + } + + if r.bytes_left_in_chunk == 0 { + r.bytes_left_in_chunk = r.read_chunk_size()? + + if r.end_of_stream { + return none + } + } + + mut c := 0 + + if buf.len > r.bytes_left_in_chunk { + c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? + } else { + c = r.reader.read(mut buf)? + } + + r.bytes_left_in_chunk -= u64(c) + + return c +} + +fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { + mut buf := []u8{len: 2} + mut res := []u8{} + + if r.started { + // Each chunk ends with a `\r\n` which we want to skip first + r.reader.read(mut buf) ? + } + + r.started = true + + for { + c := r.reader.read(mut buf)? + res << buf[..c] + + match_len := util.match_array_in_array(buf[..c], http_chunk_separator) + + if match_len == http_chunk_separator.len { + break + } + + if match_len > 0 { + mut buf2 := []u8{len: 2 - match_len} + c2 := r.reader.read(mut buf2)? + res << buf2[..c2] + + if buf2 == http_chunk_separator[match_len..] { + break + } + } + } + + mut num_data := hex.decode(res#[..-2].bytestr())? + + for num_data.len < 8 { + num_data.insert(0, 0) + } + + num := binary.big_endian_u64(num_data) + + if num == 0 { + r.end_of_stream = true + } + + return num +} diff --git a/src/util/util.v b/src/util/util.v index 6602621..f9f58ed 100644 --- a/src/util/util.v +++ b/src/util/util.v @@ -92,3 +92,19 @@ pub fn pretty_bytes(bytes int) string { return '${n:.2}${util.prefixes[i]}' } + +pub fn match_array_in_array(a1 []T, a2 []T) int { + mut i := 0 + mut match_len := 0 + + for i + match_len < a1.len { + if a1[i + match_len] == a2[match_len] { + match_len += 1 + } else { + i += match_len + 1 + match_len = 0 + } + } + + return match_len +} From bb91b112c401e00d77a97af971dcc34d88e915d6 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 14 May 2022 22:39:52 +0200 Subject: [PATCH 7/9] feat(docker): added StreamFormatReader --- src/docker/containers.v | 15 +++++++++ src/docker/socket.v | 7 ++++ src/docker/stream.v | 73 ++++++++++++++++++++++++++++++++++------- 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/src/docker/containers.v b/src/docker/containers.v index 0027747..ff48c69 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -168,6 +168,21 @@ pub fn remove_container(id string) ?bool { return res.status_code == 204 } +pub fn (mut d DockerDaemon) get_container_logs(id string) ?&StreamFormatReader { + d.send_request('GET', urllib.parse('/v1.41/containers/$id/logs?stdout=true&stderr=true')?)? + head := d.read_response_head()? + + if head.status_code != 200 { + content_length := head.header.get(http.CommonHeader.content_length)?.int() + body := d.read_response_body(content_length)? + data := json.decode(DockerError, body)? + + return error(data.message) + } + + return d.get_stream_format_reader() +} + // get_container_logs retrieves the logs for a Docker container, both stdout & // stderr. pub fn get_container_logs(id string) ?string { diff --git a/src/docker/socket.v b/src/docker/socket.v index 444d9a9..f6dfeb1 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -123,3 +123,10 @@ pub fn (mut d DockerDaemon) get_chunked_response_reader() &ChunkedResponseReader return r } + +pub fn (mut d DockerDaemon) get_stream_format_reader() &StreamFormatReader { + r := new_chunked_response_reader(d.reader) + r2 := new_stream_format_reader(r) + + return r2 +} diff --git a/src/docker/stream.v b/src/docker/stream.v index fbc48ba..8822fd3 100644 --- a/src/docker/stream.v +++ b/src/docker/stream.v @@ -7,13 +7,10 @@ import encoding.hex struct ChunkedResponseReader { mut: - reader io.Reader - // buf []u8 - // offset int - // len int + reader io.Reader bytes_left_in_chunk u64 end_of_stream bool - started bool + started bool } pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader { @@ -39,7 +36,7 @@ pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { } mut c := 0 - + if buf.len > r.bytes_left_in_chunk { c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? } else { @@ -56,8 +53,8 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { mut res := []u8{} if r.started { - // Each chunk ends with a `\r\n` which we want to skip first - r.reader.read(mut buf) ? + // Each chunk ends with a `\r\n` which we want to skip first + r.reader.read(mut buf)? } r.started = true @@ -85,9 +82,9 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { mut num_data := hex.decode(res#[..-2].bytestr())? - for num_data.len < 8 { - num_data.insert(0, 0) - } + for num_data.len < 8 { + num_data.insert(0, 0) + } num := binary.big_endian_u64(num_data) @@ -97,3 +94,57 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { return num } + +struct StreamFormatReader { + stdout bool + stderr bool + stdin bool +mut: + reader io.Reader + bytes_left_in_chunk u32 + end_of_stream bool +} + +pub fn new_stream_format_reader(reader io.Reader) &StreamFormatReader { + r := &StreamFormatReader{ + reader: reader + } + + return r +} + +pub fn (mut r StreamFormatReader) read(mut buf []u8) ?int { + if r.end_of_stream { + return none + } + + if r.bytes_left_in_chunk == 0 { + r.bytes_left_in_chunk = r.read_chunk_size()? + + if r.end_of_stream { + return none + } + } + + mut c := 0 + + if buf.len > r.bytes_left_in_chunk { + c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? + } else { + c = r.reader.read(mut buf)? + } + + r.bytes_left_in_chunk -= u32(c) + + return c +} + +fn (mut r StreamFormatReader) read_chunk_size() ?u32 { + mut buf := []u8{len: 8} + + r.reader.read(mut buf)? + + num := binary.big_endian_u32(buf[4..]) + + return num +} From 9eaa16ebeddfed64a3197e9cc365d010f90def78 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 14 May 2022 23:42:20 +0200 Subject: [PATCH 8/9] doc: documented new Docker code --- src/docker/containers.v | 7 +++++++ src/docker/images.v | 1 + src/docker/socket.v | 33 +++++++++++++++++++-------------- src/docker/stream.v | 24 ++++++++++++++++++++---- src/util/util.v | 3 +++ 5 files changed, 50 insertions(+), 18 deletions(-) diff --git a/src/docker/containers.v b/src/docker/containers.v index ff48c69..05c9cc7 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -14,6 +14,7 @@ struct Container { names []string [json: Names] } +// containers returns a list of all containers. pub fn (mut d DockerDaemon) containers() ?[]Container { d.send_request('GET', urllib.parse('/v1.41/containers/json')?)? head, res := d.read_response()? @@ -45,6 +46,7 @@ pub: warnings []string [json: Warnings] } +// create_container creates a new container with the given config. pub fn (mut d DockerDaemon) create_container(c NewContainer) ?CreatedContainer { d.send_request_with_json('POST', urllib.parse('/v1.41/containers/create')?, c)? head, res := d.read_response()? @@ -60,6 +62,7 @@ pub fn (mut d DockerDaemon) create_container(c NewContainer) ?CreatedContainer { return data } +// start_container starts the container with the given id. pub fn (mut d DockerDaemon) start_container(id string) ? { d.send_request('POST', urllib.parse('/v1.41/containers/$id/start')?)? head, body := d.read_response()? @@ -109,6 +112,7 @@ pub mut: end_time time.Time [skip] } +// inspect_container returns detailed information for a given container. pub fn (mut d DockerDaemon) inspect_container(id string) ?ContainerInspect { d.send_request('GET', urllib.parse('/v1.41/containers/$id/json')?)? head, body := d.read_response()? @@ -150,6 +154,7 @@ pub fn inspect_container(id string) ?ContainerInspect { return data } +// remove_container removes the container with the given id. pub fn (mut d DockerDaemon) remove_container(id string) ? { d.send_request('DELETE', urllib.parse('/v1.41/containers/$id')?)? head, body := d.read_response()? @@ -168,6 +173,8 @@ pub fn remove_container(id string) ?bool { return res.status_code == 204 } +// get_container_logs returns a reader object allowing access to the +// container's logs. pub fn (mut d DockerDaemon) get_container_logs(id string) ?&StreamFormatReader { d.send_request('GET', urllib.parse('/v1.41/containers/$id/logs?stdout=true&stderr=true')?)? head := d.read_response_head()? diff --git a/src/docker/images.v b/src/docker/images.v index 2249113..974b408 100644 --- a/src/docker/images.v +++ b/src/docker/images.v @@ -9,6 +9,7 @@ pub: id string [json: Id] } +// pull_image pulls the given image:tag. pub fn (mut d DockerDaemon) pull_image(image string, tag string) ? { d.send_request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?)? head := d.read_response_head()? diff --git a/src/docker/socket.v b/src/docker/socket.v index f6dfeb1..b1e3080 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -21,6 +21,7 @@ mut: reader &io.BufferedReader } +// new_conn creates a new connection to the Docker daemon. pub fn new_conn() ?&DockerDaemon { s := unix.connect_stream(docker.socket)? @@ -32,18 +33,22 @@ pub fn new_conn() ?&DockerDaemon { return d } +// send_request sends an HTTP request without body. pub fn (mut d DockerDaemon) send_request(method string, url urllib.URL) ? { req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\n\n' d.socket.write_string(req)? } +// send_request_with_body sends an HTTP request with the given body. pub fn (mut d DockerDaemon) send_request_with_body(method string, url urllib.URL, content_type string, body string) ? { req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\nContent-Type: $content_type\nContent-Length: $body.len\n\n$body\n\n' d.socket.write_string(req)? } +// send_request_with_json is a convenience wrapper around +// send_request_with_body that encodes the input as JSON. pub fn (mut d DockerDaemon) send_request_with_json(method string, url urllib.URL, data &T) ? { body := json.encode(data) @@ -52,8 +57,8 @@ pub fn (mut d DockerDaemon) send_request_with_json(method string, url urllib. // read_response_head consumes the socket's contents until it encounters // '\r\n\r\n', after which it parses the response as an HTTP response. -// Importantly, this function never consumes past the HTTP separator, so the -// body can be read fully later on. +// Importantly, this function never consumes the reader past the HTTP +// separator, so the body can be read fully later on. pub fn (mut d DockerDaemon) read_response_head() ?http.Response { mut c := 0 mut buf := []u8{len: 4} @@ -64,21 +69,12 @@ pub fn (mut d DockerDaemon) read_response_head() ?http.Response { res << buf[..c] match_len := util.match_array_in_array(buf[..c], docker.http_separator) - // mut i := 0 - // mut match_len := 0 - - // for i + match_len < c { - // if buf[i + match_len] == docker.http_separator[match_len] { - // match_len += 1 - // } else { - // i += match_len + 1 - // match_len = 0 - // } - //} if match_len == 4 { break - } else if match_len > 0 { + } + + if match_len > 0 { mut buf2 := []u8{len: 4 - match_len} c2 := d.reader.read(mut buf2)? res << buf2[..c2] @@ -92,6 +88,8 @@ pub fn (mut d DockerDaemon) read_response_head() ?http.Response { return http.parse_response(res.bytestr()) } +// read_response_body reads `length` bytes from the stream. It can be used when +// the response encoding isn't chunked to fully read it. pub fn (mut d DockerDaemon) read_response_body(length int) ?string { if length == 0 { return '' @@ -110,6 +108,9 @@ pub fn (mut d DockerDaemon) read_response_body(length int) ?string { return builder.str() } +// read_response is a convenience function combining read_response_head & +// read_response_body. It can be used when you know for certain the response +// won't be chunked. pub fn (mut d DockerDaemon) read_response() ?(http.Response, string) { head := d.read_response_head()? content_length := head.header.get(http.CommonHeader.content_length)?.int() @@ -118,12 +119,16 @@ pub fn (mut d DockerDaemon) read_response() ?(http.Response, string) { return head, res } +// get_chunked_response_reader returns a ChunkedResponseReader using the socket +// as reader. pub fn (mut d DockerDaemon) get_chunked_response_reader() &ChunkedResponseReader { r := new_chunked_response_reader(d.reader) return r } +// get_stream_format_reader returns a StreamFormatReader using the socket as +// reader. pub fn (mut d DockerDaemon) get_stream_format_reader() &StreamFormatReader { r := new_chunked_response_reader(d.reader) r2 := new_stream_format_reader(r) diff --git a/src/docker/stream.v b/src/docker/stream.v index 8822fd3..f20fe18 100644 --- a/src/docker/stream.v +++ b/src/docker/stream.v @@ -5,6 +5,8 @@ import util import encoding.binary import encoding.hex +// ChunkedResponseReader parses an underlying HTTP chunked response, exposing +// it as if it was a continuous stream of data. struct ChunkedResponseReader { mut: reader io.Reader @@ -13,6 +15,8 @@ mut: started bool } +// new_chunked_response_reader creates a new ChunkedResponseReader on the heap +// with the provided reader. pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader { r := &ChunkedResponseReader{ reader: reader @@ -21,7 +25,7 @@ pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader { return r } -// We satisfy the io.Reader interface +// read satisfies the io.Reader interface. pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { if r.end_of_stream { return none @@ -37,6 +41,8 @@ pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { mut c := 0 + // Make sure we don't read more than we can safely read. This is to avoid + // the underlying reader from becoming out of sync with our parsing: if buf.len > r.bytes_left_in_chunk { c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? } else { @@ -48,6 +54,9 @@ pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { return c } +// read_chunk_size advances the reader & reads the size of the next HTTP chunk. +// This function should only be called if the previous chunk has been +// completely consumed. fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { mut buf := []u8{len: 2} mut res := []u8{} @@ -80,6 +89,7 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { } } + // The length of the next chunk is provided as a hexadecimal mut num_data := hex.decode(res#[..-2].bytestr())? for num_data.len < 8 { @@ -88,6 +98,8 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { num := binary.big_endian_u64(num_data) + // This only occurs for the very last chunk, which always reports a size of + // 0. if num == 0 { r.end_of_stream = true } @@ -95,16 +107,17 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { return num } +// StreamFormatReader parses an underlying stream of Docker logs, removing the +// header bytes. struct StreamFormatReader { - stdout bool - stderr bool - stdin bool mut: reader io.Reader bytes_left_in_chunk u32 end_of_stream bool } +// new_stream_format_reader creates a new StreamFormatReader using the given +// reader. pub fn new_stream_format_reader(reader io.Reader) &StreamFormatReader { r := &StreamFormatReader{ reader: reader @@ -113,6 +126,7 @@ pub fn new_stream_format_reader(reader io.Reader) &StreamFormatReader { return r } +// read satisfies the io.Reader interface. pub fn (mut r StreamFormatReader) read(mut buf []u8) ?int { if r.end_of_stream { return none @@ -139,6 +153,8 @@ pub fn (mut r StreamFormatReader) read(mut buf []u8) ?int { return c } +// read_chunk_size advances the reader & reads the header bytes for the length +// of the next chunk. fn (mut r StreamFormatReader) read_chunk_size() ?u32 { mut buf := []u8{len: 8} diff --git a/src/util/util.v b/src/util/util.v index f9f58ed..f805e6d 100644 --- a/src/util/util.v +++ b/src/util/util.v @@ -93,6 +93,9 @@ pub fn pretty_bytes(bytes int) string { return '${n:.2}${util.prefixes[i]}' } +// match_array_in_array returns how many elements of a2 overlap with a1. For +// example, if a1 = "abcd" & a2 = "cd", the result will be 2. If the match is +// not at the end of a1, the result is 0. pub fn match_array_in_array(a1 []T, a2 []T) int { mut i := 0 mut match_len := 0 From fc24a19cecf8327ca305c6a37c7d83462c36d5c5 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sun, 15 May 2022 00:23:52 +0200 Subject: [PATCH 9/9] fix(docker): read_response now handles chunked data --- src/build/build.v | 1 - src/docker/images.v | 5 +---- src/docker/socket.v | 22 +++++++++++++++++++--- src/util/util.v | 11 +++++++++++ 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/build/build.v b/src/build/build.v index dd72c17..98c56f5 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -56,7 +56,6 @@ pub fn create_build_image(base_image string) ?string { // This loop waits until the container has stopped, so we can remove it after for { - println('wot') data := dd.inspect_container(id)? if !data.state.running { diff --git a/src/docker/images.v b/src/docker/images.v index 974b408..ab735f2 100644 --- a/src/docker/images.v +++ b/src/docker/images.v @@ -27,11 +27,8 @@ pub fn (mut d DockerDaemon) pull_image(image string, tag string) ? { mut buf := []u8{len: 1024} for { - c := body.read(mut buf)? + c := body.read(mut buf) or { break } - if c == 0 { - break - } print(buf[..c].bytestr()) } } diff --git a/src/docker/socket.v b/src/docker/socket.v index b1e3080..dfa7ea7 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -38,6 +38,9 @@ pub fn (mut d DockerDaemon) send_request(method string, url urllib.URL) ? { req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\n\n' d.socket.write_string(req)? + + // When starting a new request, the reader needs to be reset. + d.reader = io.new_buffered_reader(reader: d.socket) } // send_request_with_body sends an HTTP request with the given body. @@ -45,6 +48,9 @@ pub fn (mut d DockerDaemon) send_request_with_body(method string, url urllib.URL req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\nContent-Type: $content_type\nContent-Length: $body.len\n\n$body\n\n' d.socket.write_string(req)? + + // When starting a new request, the reader needs to be reset. + d.reader = io.new_buffered_reader(reader: d.socket) } // send_request_with_json is a convenience wrapper around @@ -108,11 +114,21 @@ pub fn (mut d DockerDaemon) read_response_body(length int) ?string { return builder.str() } -// read_response is a convenience function combining read_response_head & -// read_response_body. It can be used when you know for certain the response -// won't be chunked. +// read_response is a convenience function which always consumes the entire +// response & returns it. It should only be used when we're certain that the +// result isn't too large. pub fn (mut d DockerDaemon) read_response() ?(http.Response, string) { head := d.read_response_head()? + + if head.header.get(http.CommonHeader.transfer_encoding) or { '' } == 'chunked' { + mut builder := strings.new_builder(1024) + mut body := d.get_chunked_response_reader() + + util.reader_to_writer(mut body, mut builder) ? + + return head, builder.str() + } + content_length := head.header.get(http.CommonHeader.content_length)?.int() res := d.read_response_body(content_length)? diff --git a/src/util/util.v b/src/util/util.v index f805e6d..7aabc1b 100644 --- a/src/util/util.v +++ b/src/util/util.v @@ -23,6 +23,17 @@ pub fn exit_with_message(code int, msg string) { exit(code) } +// reader_to_writer tries to consume the entire reader & write it to the writer. +pub fn reader_to_writer(mut reader io.Reader, mut writer io.Writer) ? { + mut buf := []u8{len: 10 * 1024} + + for { + c := reader.read(mut buf) or { break } + + writer.write(buf) or { break } + } +} + // reader_to_file writes the contents of a BufferedReader to a file pub fn reader_to_file(mut reader io.BufferedReader, length int, path string) ? { mut file := os.create(path)?