diff --git a/src/build/build.v b/src/build/build.v index 41f68e2..98c56f5 100644 --- a/src/build/build.v +++ b/src/build/build.v @@ -17,6 +17,8 @@ const build_image_repo = 'vieter-build' // makepkg with. The base image should be some Linux distribution that uses // Pacman as its package manager. pub fn create_build_image(base_image string) ?string { + mut dd := docker.new_conn()? + commands := [ // Update repos & install required packages 'pacman -Syu --needed --noconfirm base-devel git' @@ -48,12 +50,13 @@ pub fn create_build_image(base_image string) ?string { // We pull the provided image docker.pull_image(image_name, image_tag)? - id := docker.create_container(c)? - docker.start_container(id)? + id := dd.create_container(c)?.id + // id := docker.create_container(c)? + dd.start_container(id)? // This loop waits until the container has stopped, so we can remove it after for { - data := docker.inspect_container(id)? + data := dd.inspect_container(id)? if !data.state.running { break @@ -68,7 +71,7 @@ pub fn create_build_image(base_image string) ?string { // conflicts. tag := time.sys_mono_now().str() image := docker.create_image_from_container(id, 'vieter-build', tag)? - docker.remove_container(id)? + dd.remove_container(id)? return image.id } diff --git a/src/docker/containers.v b/src/docker/containers.v index 14ac12d..05c9cc7 100644 --- a/src/docker/containers.v +++ b/src/docker/containers.v @@ -3,19 +3,34 @@ module docker import json import net.urllib import time +import net.http + +struct DockerError { + message string +} struct Container { id string [json: Id] names []string [json: Names] } -// containers returns a list of all currently running containers -pub fn containers() ?[]Container { - res := request('GET', urllib.parse('/v1.41/containers/json')?)? +// containers returns a list of all containers. +pub fn (mut d DockerDaemon) containers() ?[]Container { + d.send_request('GET', urllib.parse('/v1.41/containers/json')?)? + head, res := d.read_response()? - return json.decode([]Container, res.text) or {} + if head.status_code != 200 { + data := json.decode(DockerError, res)? + + return error(data.message) + } + + data := json.decode([]Container, res)? + + return data } +[params] pub struct NewContainer { image string [json: Image] entrypoint []string [json: Entrypoint] @@ -26,7 +41,37 @@ pub struct NewContainer { } struct CreatedContainer { - id string [json: Id] +pub: + id string [json: Id] + warnings []string [json: Warnings] +} + +// create_container creates a new container with the given config. +pub fn (mut d DockerDaemon) create_container(c NewContainer) ?CreatedContainer { + d.send_request_with_json('POST', urllib.parse('/v1.41/containers/create')?, c)? + head, res := d.read_response()? + + if head.status_code != 201 { + data := json.decode(DockerError, res)? + + return error(data.message) + } + + data := json.decode(CreatedContainer, res)? + + return data +} + +// start_container starts the container with the given id. +pub fn (mut d DockerDaemon) start_container(id string) ? { + d.send_request('POST', urllib.parse('/v1.41/containers/$id/start')?)? + head, body := d.read_response()? + + if head.status_code != 204 { + data := json.decode(DockerError, body)? + + return error(data.message) + } } // create_container creates a container defined by the given configuration. If @@ -67,6 +112,28 @@ pub mut: end_time time.Time [skip] } +// inspect_container returns detailed information for a given container. +pub fn (mut d DockerDaemon) inspect_container(id string) ?ContainerInspect { + d.send_request('GET', urllib.parse('/v1.41/containers/$id/json')?)? + head, body := d.read_response()? + + if head.status_code != 200 { + data := json.decode(DockerError, body)? + + return error(data.message) + } + + mut data := json.decode(ContainerInspect, body)? + + data.state.start_time = time.parse_rfc3339(data.state.start_time_str)? + + if data.state.status == 'exited' { + data.state.end_time = time.parse_rfc3339(data.state.end_time_str)? + } + + return data +} + // inspect_container returns the result of inspecting a container with a given // ID. pub fn inspect_container(id string) ?ContainerInspect { @@ -87,6 +154,18 @@ pub fn inspect_container(id string) ?ContainerInspect { return data } +// remove_container removes the container with the given id. +pub fn (mut d DockerDaemon) remove_container(id string) ? { + d.send_request('DELETE', urllib.parse('/v1.41/containers/$id')?)? + head, body := d.read_response()? + + if head.status_code != 204 { + data := json.decode(DockerError, body)? + + return error(data.message) + } +} + // remove_container removes a container with a given ID. pub fn remove_container(id string) ?bool { res := request('DELETE', urllib.parse('/v1.41/containers/$id')?)? @@ -94,6 +173,23 @@ pub fn remove_container(id string) ?bool { return res.status_code == 204 } +// get_container_logs returns a reader object allowing access to the +// container's logs. +pub fn (mut d DockerDaemon) get_container_logs(id string) ?&StreamFormatReader { + d.send_request('GET', urllib.parse('/v1.41/containers/$id/logs?stdout=true&stderr=true')?)? + head := d.read_response_head()? + + if head.status_code != 200 { + content_length := head.header.get(http.CommonHeader.content_length)?.int() + body := d.read_response_body(content_length)? + data := json.decode(DockerError, body)? + + return error(data.message) + } + + return d.get_stream_format_reader() +} + // get_container_logs retrieves the logs for a Docker container, both stdout & // stderr. pub fn get_container_logs(id string) ?string { diff --git a/src/docker/docker.v b/src/docker/docker.v index 305e825..fa83d89 100644 --- a/src/docker/docker.v +++ b/src/docker/docker.v @@ -5,16 +5,12 @@ import net.urllib import net.http import json -const socket = '/var/run/docker.sock' - -const buf_len = 1024 - // send writes a request to the Docker socket, waits for a response & returns // it. fn send(req &string) ?http.Response { // Open a connection to the socket - mut s := unix.connect_stream(docker.socket) or { - return error('Failed to connect to socket ${docker.socket}.') + mut s := unix.connect_stream(socket) or { + return error('Failed to connect to socket ${socket}.') } defer { @@ -25,19 +21,19 @@ fn send(req &string) ?http.Response { } // Write the request to the socket - s.write_string(req) or { return error('Failed to write request to socket ${docker.socket}.') } + s.write_string(req) or { return error('Failed to write request to socket ${socket}.') } s.wait_for_write()? mut c := 0 - mut buf := []u8{len: docker.buf_len} + mut buf := []u8{len: buf_len} mut res := []u8{} for { - c = s.read(mut buf) or { return error('Failed to read data from socket ${docker.socket}.') } + c = s.read(mut buf) or { return error('Failed to read data from socket ${socket}.') } res << buf[..c] - if c < docker.buf_len { + if c < buf_len { break } } @@ -45,7 +41,7 @@ fn send(req &string) ?http.Response { // After reading the first part of the response, we parse it into an HTTP // response. If it isn't chunked, we return early with the data. parsed := http.parse_response(res.bytestr()) or { - return error('Failed to parse HTTP response from socket ${docker.socket}.') + return error('Failed to parse HTTP response from socket ${socket}.') } if parsed.header.get(http.CommonHeader.transfer_encoding) or { '' } != 'chunked' { @@ -59,12 +55,10 @@ fn send(req &string) ?http.Response { s.wait_for_write()? for { - c = s.read(mut buf) or { - return error('Failed to read data from socket ${docker.socket}.') - } + c = s.read(mut buf) or { return error('Failed to read data from socket ${socket}.') } res << buf[..c] - if c < docker.buf_len { + if c < buf_len { break } } diff --git a/src/docker/images.v b/src/docker/images.v index 2e873fa..ab735f2 100644 --- a/src/docker/images.v +++ b/src/docker/images.v @@ -9,6 +9,30 @@ pub: id string [json: Id] } +// pull_image pulls the given image:tag. +pub fn (mut d DockerDaemon) pull_image(image string, tag string) ? { + d.send_request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?)? + head := d.read_response_head()? + + if head.status_code != 200 { + content_length := head.header.get(http.CommonHeader.content_length)?.int() + body := d.read_response_body(content_length)? + data := json.decode(DockerError, body)? + + return error(data.message) + } + + mut body := d.get_chunked_response_reader() + + mut buf := []u8{len: 1024} + + for { + c := body.read(mut buf) or { break } + + print(buf[..c].bytestr()) + } +} + // pull_image pulls tries to pull the image for the given image & tag pub fn pull_image(image string, tag string) ?http.Response { return request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?) diff --git a/src/docker/socket.v b/src/docker/socket.v new file mode 100644 index 0000000..dfa7ea7 --- /dev/null +++ b/src/docker/socket.v @@ -0,0 +1,153 @@ +module docker + +import net.unix +import io +import net.http +import strings +import net.urllib +import json +import util + +const ( + socket = '/var/run/docker.sock' + buf_len = 10 * 1024 + http_separator = [u8(`\r`), `\n`, `\r`, `\n`] + http_chunk_separator = [u8(`\r`), `\n`] +) + +pub struct DockerDaemon { +mut: + socket &unix.StreamConn + reader &io.BufferedReader +} + +// new_conn creates a new connection to the Docker daemon. +pub fn new_conn() ?&DockerDaemon { + s := unix.connect_stream(docker.socket)? + + d := &DockerDaemon{ + socket: s + reader: io.new_buffered_reader(reader: s) + } + + return d +} + +// send_request sends an HTTP request without body. +pub fn (mut d DockerDaemon) send_request(method string, url urllib.URL) ? { + req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\n\n' + + d.socket.write_string(req)? + + // When starting a new request, the reader needs to be reset. + d.reader = io.new_buffered_reader(reader: d.socket) +} + +// send_request_with_body sends an HTTP request with the given body. +pub fn (mut d DockerDaemon) send_request_with_body(method string, url urllib.URL, content_type string, body string) ? { + req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\nContent-Type: $content_type\nContent-Length: $body.len\n\n$body\n\n' + + d.socket.write_string(req)? + + // When starting a new request, the reader needs to be reset. + d.reader = io.new_buffered_reader(reader: d.socket) +} + +// send_request_with_json is a convenience wrapper around +// send_request_with_body that encodes the input as JSON. +pub fn (mut d DockerDaemon) send_request_with_json(method string, url urllib.URL, data &T) ? { + body := json.encode(data) + + return d.send_request_with_body(method, url, 'application/json', body) +} + +// read_response_head consumes the socket's contents until it encounters +// '\r\n\r\n', after which it parses the response as an HTTP response. +// Importantly, this function never consumes the reader past the HTTP +// separator, so the body can be read fully later on. +pub fn (mut d DockerDaemon) read_response_head() ?http.Response { + mut c := 0 + mut buf := []u8{len: 4} + mut res := []u8{} + + for { + c = d.reader.read(mut buf)? + res << buf[..c] + + match_len := util.match_array_in_array(buf[..c], docker.http_separator) + + if match_len == 4 { + break + } + + if match_len > 0 { + mut buf2 := []u8{len: 4 - match_len} + c2 := d.reader.read(mut buf2)? + res << buf2[..c2] + + if buf2 == docker.http_separator[match_len..] { + break + } + } + } + + return http.parse_response(res.bytestr()) +} + +// read_response_body reads `length` bytes from the stream. It can be used when +// the response encoding isn't chunked to fully read it. +pub fn (mut d DockerDaemon) read_response_body(length int) ?string { + if length == 0 { + return '' + } + + mut buf := []u8{len: docker.buf_len} + mut c := 0 + mut builder := strings.new_builder(docker.buf_len) + + for builder.len < length { + c = d.reader.read(mut buf) or { break } + + builder.write(buf[..c])? + } + + return builder.str() +} + +// read_response is a convenience function which always consumes the entire +// response & returns it. It should only be used when we're certain that the +// result isn't too large. +pub fn (mut d DockerDaemon) read_response() ?(http.Response, string) { + head := d.read_response_head()? + + if head.header.get(http.CommonHeader.transfer_encoding) or { '' } == 'chunked' { + mut builder := strings.new_builder(1024) + mut body := d.get_chunked_response_reader() + + util.reader_to_writer(mut body, mut builder) ? + + return head, builder.str() + } + + content_length := head.header.get(http.CommonHeader.content_length)?.int() + res := d.read_response_body(content_length)? + + return head, res +} + +// get_chunked_response_reader returns a ChunkedResponseReader using the socket +// as reader. +pub fn (mut d DockerDaemon) get_chunked_response_reader() &ChunkedResponseReader { + r := new_chunked_response_reader(d.reader) + + return r +} + +// get_stream_format_reader returns a StreamFormatReader using the socket as +// reader. +pub fn (mut d DockerDaemon) get_stream_format_reader() &StreamFormatReader { + r := new_chunked_response_reader(d.reader) + r2 := new_stream_format_reader(r) + + return r2 +} diff --git a/src/docker/stream.v b/src/docker/stream.v new file mode 100644 index 0000000..f20fe18 --- /dev/null +++ b/src/docker/stream.v @@ -0,0 +1,166 @@ +module docker + +import io +import util +import encoding.binary +import encoding.hex + +// ChunkedResponseReader parses an underlying HTTP chunked response, exposing +// it as if it was a continuous stream of data. +struct ChunkedResponseReader { +mut: + reader io.Reader + bytes_left_in_chunk u64 + end_of_stream bool + started bool +} + +// new_chunked_response_reader creates a new ChunkedResponseReader on the heap +// with the provided reader. +pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader { + r := &ChunkedResponseReader{ + reader: reader + } + + return r +} + +// read satisfies the io.Reader interface. +pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { + if r.end_of_stream { + return none + } + + if r.bytes_left_in_chunk == 0 { + r.bytes_left_in_chunk = r.read_chunk_size()? + + if r.end_of_stream { + return none + } + } + + mut c := 0 + + // Make sure we don't read more than we can safely read. This is to avoid + // the underlying reader from becoming out of sync with our parsing: + if buf.len > r.bytes_left_in_chunk { + c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? + } else { + c = r.reader.read(mut buf)? + } + + r.bytes_left_in_chunk -= u64(c) + + return c +} + +// read_chunk_size advances the reader & reads the size of the next HTTP chunk. +// This function should only be called if the previous chunk has been +// completely consumed. +fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { + mut buf := []u8{len: 2} + mut res := []u8{} + + if r.started { + // Each chunk ends with a `\r\n` which we want to skip first + r.reader.read(mut buf)? + } + + r.started = true + + for { + c := r.reader.read(mut buf)? + res << buf[..c] + + match_len := util.match_array_in_array(buf[..c], http_chunk_separator) + + if match_len == http_chunk_separator.len { + break + } + + if match_len > 0 { + mut buf2 := []u8{len: 2 - match_len} + c2 := r.reader.read(mut buf2)? + res << buf2[..c2] + + if buf2 == http_chunk_separator[match_len..] { + break + } + } + } + + // The length of the next chunk is provided as a hexadecimal + mut num_data := hex.decode(res#[..-2].bytestr())? + + for num_data.len < 8 { + num_data.insert(0, 0) + } + + num := binary.big_endian_u64(num_data) + + // This only occurs for the very last chunk, which always reports a size of + // 0. + if num == 0 { + r.end_of_stream = true + } + + return num +} + +// StreamFormatReader parses an underlying stream of Docker logs, removing the +// header bytes. +struct StreamFormatReader { +mut: + reader io.Reader + bytes_left_in_chunk u32 + end_of_stream bool +} + +// new_stream_format_reader creates a new StreamFormatReader using the given +// reader. +pub fn new_stream_format_reader(reader io.Reader) &StreamFormatReader { + r := &StreamFormatReader{ + reader: reader + } + + return r +} + +// read satisfies the io.Reader interface. +pub fn (mut r StreamFormatReader) read(mut buf []u8) ?int { + if r.end_of_stream { + return none + } + + if r.bytes_left_in_chunk == 0 { + r.bytes_left_in_chunk = r.read_chunk_size()? + + if r.end_of_stream { + return none + } + } + + mut c := 0 + + if buf.len > r.bytes_left_in_chunk { + c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? + } else { + c = r.reader.read(mut buf)? + } + + r.bytes_left_in_chunk -= u32(c) + + return c +} + +// read_chunk_size advances the reader & reads the header bytes for the length +// of the next chunk. +fn (mut r StreamFormatReader) read_chunk_size() ?u32 { + mut buf := []u8{len: 8} + + r.reader.read(mut buf)? + + num := binary.big_endian_u32(buf[4..]) + + return num +} diff --git a/src/main.v b/src/main.v index 41d0d33..6b1e7bc 100644 --- a/src/main.v +++ b/src/main.v @@ -7,6 +7,7 @@ import build import console.git import console.logs import cron +import docker fn main() { mut app := cli.Command{ @@ -31,7 +32,7 @@ fn main() { logs.cmd(), ] } - app.setup() app.parse(os.args) + return } diff --git a/src/util/util.v b/src/util/util.v index 6602621..7aabc1b 100644 --- a/src/util/util.v +++ b/src/util/util.v @@ -23,6 +23,17 @@ pub fn exit_with_message(code int, msg string) { exit(code) } +// reader_to_writer tries to consume the entire reader & write it to the writer. +pub fn reader_to_writer(mut reader io.Reader, mut writer io.Writer) ? { + mut buf := []u8{len: 10 * 1024} + + for { + c := reader.read(mut buf) or { break } + + writer.write(buf) or { break } + } +} + // reader_to_file writes the contents of a BufferedReader to a file pub fn reader_to_file(mut reader io.BufferedReader, length int, path string) ? { mut file := os.create(path)? @@ -92,3 +103,22 @@ pub fn pretty_bytes(bytes int) string { return '${n:.2}${util.prefixes[i]}' } + +// match_array_in_array returns how many elements of a2 overlap with a1. For +// example, if a1 = "abcd" & a2 = "cd", the result will be 2. If the match is +// not at the end of a1, the result is 0. +pub fn match_array_in_array(a1 []T, a2 []T) int { + mut i := 0 + mut match_len := 0 + + for i + match_len < a1.len { + if a1[i + match_len] == a2[match_len] { + match_len += 1 + } else { + i += match_len + 1 + match_len = 0 + } + } + + return match_len +}