From 92cbea69d6e4f8929ff5f397b36d7852e7084172 Mon Sep 17 00:00:00 2001 From: Jef Roosens Date: Sat, 14 May 2022 21:55:19 +0200 Subject: [PATCH] feat(docker): added ChunkedResponseReader implementation --- src/build/build.v | 6 +-- src/docker/containers.v | 6 +-- src/docker/images.v | 26 ++++++++++++ src/docker/socket.v | 35 ++++++++++------ src/docker/stream.v | 92 ++++++++++++++++++++++++++++++++++++++++- src/util/util.v | 16 +++++++ 6 files changed, 161 insertions(+), 20 deletions(-) diff --git a/src/build/build.v b/src/build/build.v index ae10318..dd72c17 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -17,7 +17,7 @@ const build_image_repo = 'vieter-build' // makepkg with. The base image should be some Linux distribution that uses // Pacman as its package manager. pub fn create_build_image(base_image string) ?string { - mut dd := docker.new_conn() ? + mut dd := docker.new_conn()? commands := [ // Update repos & install required packages @@ -51,12 +51,12 @@ pub fn create_build_image(base_image string) ?string { docker.pull_image(image_name, image_tag)? id := dd.create_container(c)?.id - /* id := docker.create_container(c)? */ + // id := docker.create_container(c)? dd.start_container(id)? // This loop waits until the container has stopped, so we can remove it after for { - println('wot') + println('wot') data := dd.inspect_container(id)? if !data.state.running { diff --git a/src/docker/containers.v b/src/docker/containers.v index e2da938..0027747 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -41,7 +41,7 @@ pub struct NewContainer { struct CreatedContainer { pub: - id string [json: Id] + id string [json: Id] warnings []string [json: Warnings] } @@ -62,7 +62,7 @@ pub fn (mut d DockerDaemon) create_container(c NewContainer) ?CreatedContainer { pub fn (mut d DockerDaemon) start_container(id string) ? { d.send_request('POST', urllib.parse('/v1.41/containers/$id/start')?)? - head, body := d.read_response() ? + head, body := d.read_response()? if head.status_code != 204 { data := json.decode(DockerError, body)? @@ -152,7 +152,7 @@ pub fn inspect_container(id string) ?ContainerInspect { pub fn (mut d DockerDaemon) remove_container(id string) ? { d.send_request('DELETE', urllib.parse('/v1.41/containers/$id')?)? - head, body := d.read_response() ? + head, body := d.read_response()? if head.status_code != 204 { data := json.decode(DockerError, body)? diff --git a/src/docker/images.v b/src/docker/images.v index 2e873fa..2249113 100644 --- a/src/docker/images.v +++ b/src/docker/images.v @@ -9,6 +9,32 @@ pub: id string [json: Id] } +pub fn (mut d DockerDaemon) pull_image(image string, tag string) ? { + d.send_request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?)? + head := d.read_response_head()? + + if head.status_code != 200 { + content_length := head.header.get(http.CommonHeader.content_length)?.int() + body := d.read_response_body(content_length)? + data := json.decode(DockerError, body)? + + return error(data.message) + } + + mut body := d.get_chunked_response_reader() + + mut buf := []u8{len: 1024} + + for { + c := body.read(mut buf)? + + if c == 0 { + break + } + print(buf[..c].bytestr()) + } +} + // pull_image pulls tries to pull the image for the given image & tag pub fn pull_image(image string, tag string) ?http.Response { return request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?) diff --git a/src/docker/socket.v b/src/docker/socket.v index 749f14e..444d9a9 100644 --- a/src/docker/socket.v +++ b/src/docker/socket.v @@ -6,11 +6,13 @@ import net.http import strings import net.urllib import json +import util const ( - socket = '/var/run/docker.sock' - buf_len = 10 * 1024 - http_separator = [u8(`\r`), `\n`, `\r`, `\n`] + socket = '/var/run/docker.sock' + buf_len = 10 * 1024 + http_separator = [u8(`\r`), `\n`, `\r`, `\n`] + http_chunk_separator = [u8(`\r`), `\n`] ) pub struct DockerDaemon { @@ -61,17 +63,18 @@ pub fn (mut d DockerDaemon) read_response_head() ?http.Response { c = d.reader.read(mut buf)? res << buf[..c] - mut i := 0 - mut match_len := 0 + match_len := util.match_array_in_array(buf[..c], docker.http_separator) + // mut i := 0 + // mut match_len := 0 - for i + match_len < c { - if buf[i + match_len] == docker.http_separator[match_len] { - match_len += 1 - } else { - i += match_len + 1 - match_len = 0 - } - } + // for i + match_len < c { + // if buf[i + match_len] == docker.http_separator[match_len] { + // match_len += 1 + // } else { + // i += match_len + 1 + // match_len = 0 + // } + //} if match_len == 4 { break @@ -114,3 +117,9 @@ pub fn (mut d DockerDaemon) read_response() ?(http.Response, string) { return head, res } + +pub fn (mut d DockerDaemon) get_chunked_response_reader() &ChunkedResponseReader { + r := new_chunked_response_reader(d.reader) + + return r +} diff --git a/src/docker/stream.v b/src/docker/stream.v index 24c51c1..fbc48ba 100644 --- a/src/docker/stream.v +++ b/src/docker/stream.v @@ -1,9 +1,99 @@ module docker import io +import util +import encoding.binary +import encoding.hex -struct ChunkedResponseStream { +struct ChunkedResponseReader { +mut: reader io.Reader + // buf []u8 + // offset int + // len int + bytes_left_in_chunk u64 + end_of_stream bool + started bool } +pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader { + r := &ChunkedResponseReader{ + reader: reader + } + return r +} + +// We satisfy the io.Reader interface +pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { + if r.end_of_stream { + return none + } + + if r.bytes_left_in_chunk == 0 { + r.bytes_left_in_chunk = r.read_chunk_size()? + + if r.end_of_stream { + return none + } + } + + mut c := 0 + + if buf.len > r.bytes_left_in_chunk { + c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? + } else { + c = r.reader.read(mut buf)? + } + + r.bytes_left_in_chunk -= u64(c) + + return c +} + +fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { + mut buf := []u8{len: 2} + mut res := []u8{} + + if r.started { + // Each chunk ends with a `\r\n` which we want to skip first + r.reader.read(mut buf) ? + } + + r.started = true + + for { + c := r.reader.read(mut buf)? + res << buf[..c] + + match_len := util.match_array_in_array(buf[..c], http_chunk_separator) + + if match_len == http_chunk_separator.len { + break + } + + if match_len > 0 { + mut buf2 := []u8{len: 2 - match_len} + c2 := r.reader.read(mut buf2)? + res << buf2[..c2] + + if buf2 == http_chunk_separator[match_len..] { + break + } + } + } + + mut num_data := hex.decode(res#[..-2].bytestr())? + + for num_data.len < 8 { + num_data.insert(0, 0) + } + + num := binary.big_endian_u64(num_data) + + if num == 0 { + r.end_of_stream = true + } + + return num +} diff --git a/src/util/util.v b/src/util/util.v index 6602621..f9f58ed 100644 --- a/src/util/util.v +++ b/src/util/util.v @@ -92,3 +92,19 @@ pub fn pretty_bytes(bytes int) string { return '${n:.2}${util.prefixes[i]}' } + +pub fn match_array_in_array(a1 []T, a2 []T) int { + mut i := 0 + mut match_len := 0 + + for i + match_len < a1.len { + if a1[i + match_len] == a2[match_len] { + match_len += 1 + } else { + i += match_len + 1 + match_len = 0 + } + } + + return match_len +}