forked from vieter-v/vieter
				
			Merge pull request 'Full Docker code refactor' (#183) from Chewing_Bever/vieter:docker-rework into dev
Reviewed-on: vieter/vieter#183remotes/1739821333615734048/dev
						commit
						92f73ad364
					
				|  | @ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
| * Web API for adding & querying build logs | ||||
| * CLI commands to access build logs API | ||||
| * Cron build logs are uploaded to above API | ||||
| * Proper ASCII table output in CLI | ||||
| 
 | ||||
| ### Changed | ||||
| 
 | ||||
|  | @ -20,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
| * Official Arch packages are now split between `vieter` & `vieter-git` | ||||
|     * `vieter` is the latest release | ||||
|     * `vieter-git` is the latest commit on the dev branch | ||||
| * Full refactor of Docker socket code | ||||
| 
 | ||||
| ## [0.3.0-alpha.1](https://git.rustybever.be/vieter/vieter/src/tag/0.3.0-alpha.1) | ||||
| 
 | ||||
|  |  | |||
|  | @ -6,10 +6,13 @@ import time | |||
| import os | ||||
| import db | ||||
| import client | ||||
| import strings | ||||
| import util | ||||
| 
 | ||||
| const container_build_dir = '/build' | ||||
| 
 | ||||
| const build_image_repo = 'vieter-build' | ||||
| const ( | ||||
| 	container_build_dir = '/build' | ||||
| 	build_image_repo    = 'vieter-build' | ||||
| ) | ||||
| 
 | ||||
| // create_build_image creates a builder image given some base image which can | ||||
| // then be used to build & package Arch images. It mostly just updates the | ||||
|  | @ -17,6 +20,12 @@ const build_image_repo = 'vieter-build' | |||
| // makepkg with. The base image should be some Linux distribution that uses | ||||
| // Pacman as its package manager. | ||||
| pub fn create_build_image(base_image string) ?string { | ||||
| 	mut dd := docker.new_conn()? | ||||
| 
 | ||||
| 	defer { | ||||
| 		dd.close() or {} | ||||
| 	} | ||||
| 
 | ||||
| 	commands := [ | ||||
| 		// Update repos & install required packages | ||||
| 		'pacman -Syu --needed --noconfirm base-devel git' | ||||
|  | @ -46,14 +55,15 @@ pub fn create_build_image(base_image string) ?string { | |||
| 	image_tag := if image_parts.len > 1 { image_parts[1] } else { 'latest' } | ||||
| 
 | ||||
| 	// We pull the provided image | ||||
| 	docker.pull_image(image_name, image_tag)? | ||||
| 	dd.pull_image(image_name, image_tag)? | ||||
| 
 | ||||
| 	id := docker.create_container(c)? | ||||
| 	docker.start_container(id)? | ||||
| 	id := dd.create_container(c)?.id | ||||
| 	// id := docker.create_container(c)? | ||||
| 	dd.start_container(id)? | ||||
| 
 | ||||
| 	// This loop waits until the container has stopped, so we can remove it after | ||||
| 	for { | ||||
| 		data := docker.inspect_container(id)? | ||||
| 		data := dd.inspect_container(id)? | ||||
| 
 | ||||
| 		if !data.state.running { | ||||
| 			break | ||||
|  | @ -67,8 +77,8 @@ pub fn create_build_image(base_image string) ?string { | |||
| 	// TODO also add the base image's name into the image name to prevent | ||||
| 	// conflicts. | ||||
| 	tag := time.sys_mono_now().str() | ||||
| 	image := docker.create_image_from_container(id, 'vieter-build', tag)? | ||||
| 	docker.remove_container(id)? | ||||
| 	image := dd.create_image_from_container(id, 'vieter-build', tag)? | ||||
| 	dd.remove_container(id)? | ||||
| 
 | ||||
| 	return image.id | ||||
| } | ||||
|  | @ -85,6 +95,12 @@ pub: | |||
| // provided GitRepo. The base image ID should be of an image previously created | ||||
| // by create_build_image. It returns the logs of the container. | ||||
| pub fn build_repo(address string, api_key string, base_image_id string, repo &db.GitRepo) ?BuildResult { | ||||
| 	mut dd := docker.new_conn()? | ||||
| 
 | ||||
| 	defer { | ||||
| 		dd.close() or {} | ||||
| 	} | ||||
| 
 | ||||
| 	build_arch := os.uname().machine | ||||
| 
 | ||||
| 	// TODO what to do with PKGBUILDs that build multiple packages? | ||||
|  | @ -112,27 +128,31 @@ pub fn build_repo(address string, api_key string, base_image_id string, repo &db | |||
| 		user: 'builder:builder' | ||||
| 	} | ||||
| 
 | ||||
| 	id := docker.create_container(c)? | ||||
| 	docker.start_container(id)? | ||||
| 	id := dd.create_container(c)?.id | ||||
| 	dd.start_container(id)? | ||||
| 
 | ||||
| 	mut data := docker.inspect_container(id)? | ||||
| 	mut data := dd.inspect_container(id)? | ||||
| 
 | ||||
| 	// This loop waits until the container has stopped, so we can remove it after | ||||
| 	for data.state.running { | ||||
| 		time.sleep(1 * time.second) | ||||
| 
 | ||||
| 		data = docker.inspect_container(id)? | ||||
| 		data = dd.inspect_container(id)? | ||||
| 	} | ||||
| 
 | ||||
| 	logs := docker.get_container_logs(id)? | ||||
| 	mut logs_stream := dd.get_container_logs(id)? | ||||
| 
 | ||||
| 	docker.remove_container(id)? | ||||
| 	// Read in the entire stream | ||||
| 	mut logs_builder := strings.new_builder(10 * 1024) | ||||
| 	util.reader_to_writer(mut logs_stream, mut logs_builder)? | ||||
| 
 | ||||
| 	dd.remove_container(id)? | ||||
| 
 | ||||
| 	return BuildResult{ | ||||
| 		start_time: data.state.start_time | ||||
| 		end_time: data.state.end_time | ||||
| 		exit_code: data.state.exit_code | ||||
| 		logs: logs | ||||
| 		logs: logs_builder.str() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | @ -150,7 +170,14 @@ fn build(conf Config, repo_id int) ? { | |||
| 	res := build_repo(conf.address, conf.api_key, image_id, repo)? | ||||
| 
 | ||||
| 	println('Removing build image...') | ||||
| 	docker.remove_image(image_id)? | ||||
| 
 | ||||
| 	mut dd := docker.new_conn()? | ||||
| 
 | ||||
| 	defer { | ||||
| 		dd.close() or {} | ||||
| 	} | ||||
| 
 | ||||
| 	dd.remove_image(image_id)? | ||||
| 
 | ||||
| 	println('Uploading logs to Vieter...') | ||||
| 	c.add_build_log(repo.id, res.start_time, res.end_time, build_arch, res.exit_code, | ||||
|  |  | |||
|  | @ -253,14 +253,21 @@ fn (mut d Daemon) rebuild_base_image() bool { | |||
| fn (mut d Daemon) clean_old_base_images() { | ||||
| 	mut i := 0 | ||||
| 
 | ||||
| 	mut dd := docker.new_conn() or { | ||||
| 		d.lerror('Failed to connect to Docker socket.') | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	defer { | ||||
| 		dd.close() or {} | ||||
| 	} | ||||
| 
 | ||||
| 	for i < d.builder_images.len - 1 { | ||||
| 		// For each builder image, we try to remove it by calling the Docker | ||||
| 		// API. If the function returns an error or false, that means the image | ||||
| 		// wasn't deleted. Therefore, we move the index over. If the function | ||||
| 		// returns true, the array's length has decreased by one so we don't | ||||
| 		// move the index. | ||||
| 		if !docker.remove_image(d.builder_images[i]) or { false } { | ||||
| 			i += 1 | ||||
| 		} | ||||
| 		dd.remove_image(d.builder_images[i]) or { i += 1 } | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,3 @@ | |||
| This module implements part of the Docker Engine API v1.41 | ||||
| ([documentation](https://docs.docker.com/engine/api/v1.41/)) using socket-based | ||||
| HTTP communication. | ||||
|  | @ -3,17 +3,10 @@ module docker | |||
| import json | ||||
| import net.urllib | ||||
| import time | ||||
| import net.http { Method } | ||||
| 
 | ||||
| struct Container { | ||||
| 	id    string   [json: Id] | ||||
| 	names []string [json: Names] | ||||
| } | ||||
| 
 | ||||
| // containers returns a list of all currently running containers | ||||
| pub fn containers() ?[]Container { | ||||
| 	res := request('GET', urllib.parse('/v1.41/containers/json')?)? | ||||
| 
 | ||||
| 	return json.decode([]Container, res.text) or {} | ||||
| struct DockerError { | ||||
| 	message string | ||||
| } | ||||
| 
 | ||||
| pub struct NewContainer { | ||||
|  | @ -26,27 +19,37 @@ pub struct NewContainer { | |||
| } | ||||
| 
 | ||||
| struct CreatedContainer { | ||||
| 	id string [json: Id] | ||||
| pub: | ||||
| 	id       string   [json: Id] | ||||
| 	warnings []string [json: Warnings] | ||||
| } | ||||
| 
 | ||||
| // create_container creates a container defined by the given configuration. If | ||||
| // successful, it returns the ID of the newly created container. | ||||
| pub fn create_container(c &NewContainer) ?string { | ||||
| 	res := request_with_json('POST', urllib.parse('/v1.41/containers/create')?, c)? | ||||
| // create_container creates a new container with the given config. | ||||
| pub fn (mut d DockerConn) create_container(c NewContainer) ?CreatedContainer { | ||||
| 	d.send_request_with_json(Method.post, urllib.parse('/v1.41/containers/create')?, c)? | ||||
| 	head, res := d.read_response()? | ||||
| 
 | ||||
| 	if res.status_code != 201 { | ||||
| 		return error('Failed to create container.') | ||||
| 	if head.status_code != 201 { | ||||
| 		data := json.decode(DockerError, res)? | ||||
| 
 | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| 
 | ||||
| 	return json.decode(CreatedContainer, res.text)?.id | ||||
| 	data := json.decode(CreatedContainer, res)? | ||||
| 
 | ||||
| 	return data | ||||
| } | ||||
| 
 | ||||
| // start_container starts a container with a given ID. It returns whether the | ||||
| // container was started or not. | ||||
| pub fn start_container(id string) ?bool { | ||||
| 	res := request('POST', urllib.parse('/v1.41/containers/$id/start')?)? | ||||
| // start_container starts the container with the given id. | ||||
| pub fn (mut d DockerConn) start_container(id string) ? { | ||||
| 	d.send_request(Method.post, urllib.parse('/v1.41/containers/$id/start')?)? | ||||
| 	head, body := d.read_response()? | ||||
| 
 | ||||
| 	return res.status_code == 204 | ||||
| 	if head.status_code != 204 { | ||||
| 		data := json.decode(DockerError, body)? | ||||
| 
 | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| struct ContainerInspect { | ||||
|  | @ -67,16 +70,18 @@ pub mut: | |||
| 	end_time   time.Time [skip] | ||||
| } | ||||
| 
 | ||||
| // inspect_container returns the result of inspecting a container with a given | ||||
| // ID. | ||||
| pub fn inspect_container(id string) ?ContainerInspect { | ||||
| 	res := request('GET', urllib.parse('/v1.41/containers/$id/json')?)? | ||||
| // inspect_container returns detailed information for a given container. | ||||
| pub fn (mut d DockerConn) inspect_container(id string) ?ContainerInspect { | ||||
| 	d.send_request(Method.get, urllib.parse('/v1.41/containers/$id/json')?)? | ||||
| 	head, body := d.read_response()? | ||||
| 
 | ||||
| 	if res.status_code != 200 { | ||||
| 		return error('Failed to inspect container.') | ||||
| 	if head.status_code != 200 { | ||||
| 		data := json.decode(DockerError, body)? | ||||
| 
 | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| 
 | ||||
| 	mut data := json.decode(ContainerInspect, res.text)? | ||||
| 	mut data := json.decode(ContainerInspect, body)? | ||||
| 
 | ||||
| 	data.state.start_time = time.parse_rfc3339(data.state.start_time_str)? | ||||
| 
 | ||||
|  | @ -87,31 +92,31 @@ pub fn inspect_container(id string) ?ContainerInspect { | |||
| 	return data | ||||
| } | ||||
| 
 | ||||
| // remove_container removes a container with a given ID. | ||||
| pub fn remove_container(id string) ?bool { | ||||
| 	res := request('DELETE', urllib.parse('/v1.41/containers/$id')?)? | ||||
| // remove_container removes the container with the given id. | ||||
| pub fn (mut d DockerConn) remove_container(id string) ? { | ||||
| 	d.send_request(Method.delete, urllib.parse('/v1.41/containers/$id')?)? | ||||
| 	head, body := d.read_response()? | ||||
| 
 | ||||
| 	return res.status_code == 204 | ||||
| 	if head.status_code != 204 { | ||||
| 		data := json.decode(DockerError, body)? | ||||
| 
 | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // get_container_logs retrieves the logs for a Docker container, both stdout & | ||||
| // stderr. | ||||
| pub fn get_container_logs(id string) ?string { | ||||
| 	res := request('GET', urllib.parse('/v1.41/containers/$id/logs?stdout=true&stderr=true')?)? | ||||
| 	mut res_bytes := res.text.bytes() | ||||
| // get_container_logs returns a reader object allowing access to the | ||||
| // container's logs. | ||||
| pub fn (mut d DockerConn) get_container_logs(id string) ?&StreamFormatReader { | ||||
| 	d.send_request(Method.get, urllib.parse('/v1.41/containers/$id/logs?stdout=true&stderr=true')?)? | ||||
| 	head := d.read_response_head()? | ||||
| 
 | ||||
| 	// Docker uses a special "stream" format for their logs, so we have to | ||||
| 	// clean up the data. | ||||
| 	mut index := 0 | ||||
| 	if head.status_code != 200 { | ||||
| 		content_length := head.header.get(http.CommonHeader.content_length)?.int() | ||||
| 		body := d.read_response_body(content_length)? | ||||
| 		data := json.decode(DockerError, body)? | ||||
| 
 | ||||
| 	for index < res_bytes.len { | ||||
| 		// The reverse is required because V reads in the bytes differently | ||||
| 		t := res_bytes[index + 4..index + 8].reverse() | ||||
| 		len_length := unsafe { *(&u32(&t[0])) } | ||||
| 
 | ||||
| 		res_bytes.delete_many(index, 8) | ||||
| 		index += int(len_length) | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| 
 | ||||
| 	return res_bytes.bytestr() | ||||
| 	return d.get_stream_format_reader() | ||||
| } | ||||
|  |  | |||
|  | @ -1,97 +1,137 @@ | |||
| module docker | ||||
| 
 | ||||
| import net.unix | ||||
| import net.urllib | ||||
| import io | ||||
| import net.http | ||||
| import strings | ||||
| import net.urllib | ||||
| import json | ||||
| import util | ||||
| 
 | ||||
| const socket = '/var/run/docker.sock' | ||||
| const ( | ||||
| 	socket               = '/var/run/docker.sock' | ||||
| 	buf_len              = 10 * 1024 | ||||
| 	http_separator       = [u8(`\r`), `\n`, `\r`, `\n`] | ||||
| 	http_chunk_separator = [u8(`\r`), `\n`] | ||||
| ) | ||||
| 
 | ||||
| const buf_len = 1024 | ||||
| pub struct DockerConn { | ||||
| mut: | ||||
| 	socket &unix.StreamConn | ||||
| 	reader &io.BufferedReader | ||||
| } | ||||
| 
 | ||||
| // send writes a request to the Docker socket, waits for a response & returns | ||||
| // it. | ||||
| fn send(req &string) ?http.Response { | ||||
| 	// Open a connection to the socket | ||||
| 	mut s := unix.connect_stream(docker.socket) or { | ||||
| 		return error('Failed to connect to socket ${docker.socket}.') | ||||
| // new_conn creates a new connection to the Docker daemon. | ||||
| pub fn new_conn() ?&DockerConn { | ||||
| 	s := unix.connect_stream(docker.socket)? | ||||
| 
 | ||||
| 	d := &DockerConn{ | ||||
| 		socket: s | ||||
| 		reader: io.new_buffered_reader(reader: s) | ||||
| 	} | ||||
| 
 | ||||
| 	defer { | ||||
| 		// This or is required because otherwise, the V compiler segfaults for | ||||
| 		// some reason | ||||
| 		// https://github.com/vlang/v/issues/13534 | ||||
| 		s.close() or {} | ||||
| 	} | ||||
| 	return d | ||||
| } | ||||
| 
 | ||||
| 	// Write the request to the socket | ||||
| 	s.write_string(req) or { return error('Failed to write request to socket ${docker.socket}.') } | ||||
| // close closes the underlying socket connection. | ||||
| pub fn (mut d DockerConn) close() ? { | ||||
| 	d.socket.close()? | ||||
| } | ||||
| 
 | ||||
| 	s.wait_for_write()? | ||||
| // send_request sends an HTTP request without body. | ||||
| pub fn (mut d DockerConn) send_request(method http.Method, url urllib.URL) ? { | ||||
| 	req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\n\n' | ||||
| 
 | ||||
| 	mut c := 0 | ||||
| 	mut buf := []u8{len: docker.buf_len} | ||||
| 	d.socket.write_string(req)? | ||||
| 
 | ||||
| 	// When starting a new request, the reader needs to be reset. | ||||
| 	d.reader = io.new_buffered_reader(reader: d.socket) | ||||
| } | ||||
| 
 | ||||
| // send_request_with_body sends an HTTP request with the given body. | ||||
| pub fn (mut d DockerConn) send_request_with_body(method http.Method, url urllib.URL, content_type string, body string) ? { | ||||
| 	req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\nContent-Type: $content_type\nContent-Length: $body.len\n\n$body\n\n' | ||||
| 
 | ||||
| 	d.socket.write_string(req)? | ||||
| 
 | ||||
| 	// When starting a new request, the reader needs to be reset. | ||||
| 	d.reader = io.new_buffered_reader(reader: d.socket) | ||||
| } | ||||
| 
 | ||||
| // send_request_with_json<T> is a convenience wrapper around | ||||
| // send_request_with_body that encodes the input as JSON. | ||||
| pub fn (mut d DockerConn) send_request_with_json<T>(method http.Method, url urllib.URL, data &T) ? { | ||||
| 	body := json.encode(data) | ||||
| 
 | ||||
| 	return d.send_request_with_body(method, url, 'application/json', body) | ||||
| } | ||||
| 
 | ||||
| // read_response_head consumes the socket's contents until it encounters | ||||
| // '\r\n\r\n', after which it parses the response as an HTTP response. | ||||
| // Importantly, this function never consumes the reader past the HTTP | ||||
| // separator, so the body can be read fully later on. | ||||
| pub fn (mut d DockerConn) read_response_head() ?http.Response { | ||||
| 	mut res := []u8{} | ||||
| 
 | ||||
| 	for { | ||||
| 		c = s.read(mut buf) or { return error('Failed to read data from socket ${docker.socket}.') } | ||||
| 		res << buf[..c] | ||||
| 	util.read_until_separator(mut d.reader, mut res, docker.http_separator)? | ||||
| 
 | ||||
| 		if c < docker.buf_len { | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// After reading the first part of the response, we parse it into an HTTP | ||||
| 	// response. If it isn't chunked, we return early with the data. | ||||
| 	parsed := http.parse_response(res.bytestr()) or { | ||||
| 		return error('Failed to parse HTTP response from socket ${docker.socket}.') | ||||
| 	} | ||||
| 
 | ||||
| 	if parsed.header.get(http.CommonHeader.transfer_encoding) or { '' } != 'chunked' { | ||||
| 		return parsed | ||||
| 	} | ||||
| 
 | ||||
| 	// We loop until we've encountered the end of the chunked response | ||||
| 	// A chunked HTTP response always ends with '0\r\n\r\n'. | ||||
| 	for res.len < 5 || res#[-5..] != [u8(`0`), `\r`, `\n`, `\r`, `\n`] { | ||||
| 		// Wait for the server to respond | ||||
| 		s.wait_for_write()? | ||||
| 
 | ||||
| 		for { | ||||
| 			c = s.read(mut buf) or { | ||||
| 				return error('Failed to read data from socket ${docker.socket}.') | ||||
| 			} | ||||
| 			res << buf[..c] | ||||
| 
 | ||||
| 			if c < docker.buf_len { | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// Decode chunked response | ||||
| 	return http.parse_response(res.bytestr()) | ||||
| } | ||||
| 
 | ||||
| // request_with_body sends a request to the Docker socket with the given body. | ||||
| fn request_with_body(method string, url urllib.URL, content_type string, body string) ?http.Response { | ||||
| 	req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\nContent-Type: $content_type\nContent-Length: $body.len\n\n$body\n\n' | ||||
| // read_response_body reads `length` bytes from the stream. It can be used when | ||||
| // the response encoding isn't chunked to fully read it. | ||||
| pub fn (mut d DockerConn) read_response_body(length int) ?string { | ||||
| 	if length == 0 { | ||||
| 		return '' | ||||
| 	} | ||||
| 
 | ||||
| 	return send(req) | ||||
| 	mut buf := []u8{len: docker.buf_len} | ||||
| 	mut c := 0 | ||||
| 	mut builder := strings.new_builder(docker.buf_len) | ||||
| 
 | ||||
| 	for builder.len < length { | ||||
| 		c = d.reader.read(mut buf) or { break } | ||||
| 
 | ||||
| 		builder.write(buf[..c])? | ||||
| 	} | ||||
| 
 | ||||
| 	return builder.str() | ||||
| } | ||||
| 
 | ||||
| // request sends a request to the Docker socket with an empty body. | ||||
| fn request(method string, url urllib.URL) ?http.Response { | ||||
| 	req := '$method $url.request_uri() HTTP/1.1\nHost: localhost\n\n' | ||||
| // read_response is a convenience function which always consumes the entire | ||||
| // response & returns it. It should only be used when we're certain that the | ||||
| // result isn't too large. | ||||
| pub fn (mut d DockerConn) read_response() ?(http.Response, string) { | ||||
| 	head := d.read_response_head()? | ||||
| 
 | ||||
| 	return send(req) | ||||
| 	if head.header.get(http.CommonHeader.transfer_encoding) or { '' } == 'chunked' { | ||||
| 		mut builder := strings.new_builder(1024) | ||||
| 		mut body := d.get_chunked_response_reader() | ||||
| 
 | ||||
| 		util.reader_to_writer(mut body, mut builder)? | ||||
| 
 | ||||
| 		return head, builder.str() | ||||
| 	} | ||||
| 
 | ||||
| 	content_length := head.header.get(http.CommonHeader.content_length)?.int() | ||||
| 	res := d.read_response_body(content_length)? | ||||
| 
 | ||||
| 	return head, res | ||||
| } | ||||
| 
 | ||||
| // request_with_json<T> sends a request to the Docker socket with a given JSON | ||||
| // payload | ||||
| pub fn request_with_json<T>(method string, url urllib.URL, data &T) ?http.Response { | ||||
| 	body := json.encode(data) | ||||
| // get_chunked_response_reader returns a ChunkedResponseReader using the socket | ||||
| // as reader. | ||||
| pub fn (mut d DockerConn) get_chunked_response_reader() &ChunkedResponseReader { | ||||
| 	r := new_chunked_response_reader(d.reader) | ||||
| 
 | ||||
| 	return request_with_body(method, url, 'application/json', body) | ||||
| 	return r | ||||
| } | ||||
| 
 | ||||
| // get_stream_format_reader returns a StreamFormatReader using the socket as | ||||
| // reader. | ||||
| pub fn (mut d DockerConn) get_stream_format_reader() &StreamFormatReader { | ||||
| 	r := new_chunked_response_reader(d.reader) | ||||
| 	r2 := new_stream_format_reader(r) | ||||
| 
 | ||||
| 	return r2 | ||||
| } | ||||
|  |  | |||
|  | @ -1,6 +1,6 @@ | |||
| module docker | ||||
| 
 | ||||
| import net.http | ||||
| import net.http { Method } | ||||
| import net.urllib | ||||
| import json | ||||
| 
 | ||||
|  | @ -9,26 +9,53 @@ pub: | |||
| 	id string [json: Id] | ||||
| } | ||||
| 
 | ||||
| // pull_image pulls tries to pull the image for the given image & tag | ||||
| pub fn pull_image(image string, tag string) ?http.Response { | ||||
| 	return request('POST', urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?) | ||||
| } | ||||
| // pull_image pulls the given image:tag. | ||||
| pub fn (mut d DockerConn) pull_image(image string, tag string) ? { | ||||
| 	d.send_request(Method.post, urllib.parse('/v1.41/images/create?fromImage=$image&tag=$tag')?)? | ||||
| 	head := d.read_response_head()? | ||||
| 
 | ||||
| // create_image_from_container creates a new image from a container with the | ||||
| // given repo & tag, given the container's ID. | ||||
| pub fn create_image_from_container(id string, repo string, tag string) ?Image { | ||||
| 	res := request('POST', urllib.parse('/v1.41/commit?container=$id&repo=$repo&tag=$tag')?)? | ||||
| 	if head.status_code != 200 { | ||||
| 		content_length := head.header.get(http.CommonHeader.content_length)?.int() | ||||
| 		body := d.read_response_body(content_length)? | ||||
| 		data := json.decode(DockerError, body)? | ||||
| 
 | ||||
| 	if res.status_code != 201 { | ||||
| 		return error('Failed to create image from container.') | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| 
 | ||||
| 	return json.decode(Image, res.text) or {} | ||||
| 	// Keep reading the body until the pull has completed | ||||
| 	mut body := d.get_chunked_response_reader() | ||||
| 
 | ||||
| 	mut buf := []u8{len: 1024} | ||||
| 
 | ||||
| 	for { | ||||
| 		body.read(mut buf) or { break } | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // remove_image removes the image with the given ID. | ||||
| pub fn remove_image(id string) ?bool { | ||||
| 	res := request('DELETE', urllib.parse('/v1.41/images/$id')?)? | ||||
| // create_image_from_container creates a new image from a container. | ||||
| pub fn (mut d DockerConn) create_image_from_container(id string, repo string, tag string) ?Image { | ||||
| 	d.send_request(Method.post, urllib.parse('/v1.41/commit?container=$id&repo=$repo&tag=$tag')?)? | ||||
| 	head, body := d.read_response()? | ||||
| 
 | ||||
| 	return res.status_code == 200 | ||||
| 	if head.status_code != 201 { | ||||
| 		data := json.decode(DockerError, body)? | ||||
| 
 | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| 
 | ||||
| 	data := json.decode(Image, body)? | ||||
| 
 | ||||
| 	return data | ||||
| } | ||||
| 
 | ||||
| // remove_image removes the image with the given id. | ||||
| pub fn (mut d DockerConn) remove_image(id string) ? { | ||||
| 	d.send_request(Method.delete, urllib.parse('/v1.41/images/$id')?)? | ||||
| 	head, body := d.read_response()? | ||||
| 
 | ||||
| 	if head.status_code != 200 { | ||||
| 		data := json.decode(DockerError, body)? | ||||
| 
 | ||||
| 		return error(data.message) | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,135 @@ | |||
| module docker | ||||
| 
 | ||||
| import io | ||||
| import util | ||||
| import encoding.binary | ||||
| import encoding.hex | ||||
| 
 | ||||
| // ChunkedResponseReader parses an underlying HTTP chunked response, exposing | ||||
| // it as if it was a continuous stream of data. | ||||
| struct ChunkedResponseReader { | ||||
| mut: | ||||
| 	reader              io.BufferedReader | ||||
| 	bytes_left_in_chunk u64 | ||||
| 	started             bool | ||||
| } | ||||
| 
 | ||||
| // new_chunked_response_reader creates a new ChunkedResponseReader on the heap | ||||
| // with the provided reader. | ||||
| pub fn new_chunked_response_reader(reader io.BufferedReader) &ChunkedResponseReader { | ||||
| 	r := &ChunkedResponseReader{ | ||||
| 		reader: reader | ||||
| 	} | ||||
| 
 | ||||
| 	return r | ||||
| } | ||||
| 
 | ||||
| // read satisfies the io.Reader interface. | ||||
| pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { | ||||
| 	if r.bytes_left_in_chunk == 0 { | ||||
| 		// An io.BufferedReader always returns none if its stream has | ||||
| 		// ended. | ||||
| 		r.bytes_left_in_chunk = r.read_chunk_size()? | ||||
| 	} | ||||
| 
 | ||||
| 	mut c := 0 | ||||
| 
 | ||||
| 	// Make sure we don't read more than we can safely read. This is to avoid | ||||
| 	// the underlying reader from becoming out of sync with our parsing: | ||||
| 	if buf.len > r.bytes_left_in_chunk { | ||||
| 		c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? | ||||
| 	} else { | ||||
| 		c = r.reader.read(mut buf)? | ||||
| 	} | ||||
| 
 | ||||
| 	r.bytes_left_in_chunk -= u64(c) | ||||
| 
 | ||||
| 	return c | ||||
| } | ||||
| 
 | ||||
| // read_chunk_size advances the reader & reads the size of the next HTTP chunk. | ||||
| // This function should only be called if the previous chunk has been | ||||
| // completely consumed. | ||||
| fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 { | ||||
| 	if r.started { | ||||
| 		mut buf := []u8{len: 2} | ||||
| 
 | ||||
| 		// Each chunk ends with a `\r\n` which we want to skip first | ||||
| 		r.reader.read(mut buf)? | ||||
| 	} | ||||
| 
 | ||||
| 	r.started = true | ||||
| 
 | ||||
| 	mut res := []u8{} | ||||
| 	util.read_until_separator(mut r.reader, mut res, http_chunk_separator)? | ||||
| 
 | ||||
| 	// The length of the next chunk is provided as a hexadecimal | ||||
| 	mut num_data := hex.decode(res#[..-2].bytestr())? | ||||
| 
 | ||||
| 	for num_data.len < 8 { | ||||
| 		num_data.insert(0, 0) | ||||
| 	} | ||||
| 
 | ||||
| 	num := binary.big_endian_u64(num_data) | ||||
| 
 | ||||
| 	// This only occurs for the very last chunk, which always reports a size of | ||||
| 	// 0. | ||||
| 	if num == 0 { | ||||
| 		return none | ||||
| 	} | ||||
| 
 | ||||
| 	return num | ||||
| } | ||||
| 
 | ||||
| // StreamFormatReader parses an underlying stream of Docker logs, removing the | ||||
| // header bytes. | ||||
| struct StreamFormatReader { | ||||
| mut: | ||||
| 	reader              ChunkedResponseReader | ||||
| 	bytes_left_in_chunk u32 | ||||
| } | ||||
| 
 | ||||
| // new_stream_format_reader creates a new StreamFormatReader using the given | ||||
| // reader. | ||||
| pub fn new_stream_format_reader(reader ChunkedResponseReader) &StreamFormatReader { | ||||
| 	r := &StreamFormatReader{ | ||||
| 		reader: reader | ||||
| 	} | ||||
| 
 | ||||
| 	return r | ||||
| } | ||||
| 
 | ||||
| // read satisfies the io.Reader interface. | ||||
| pub fn (mut r StreamFormatReader) read(mut buf []u8) ?int { | ||||
| 	if r.bytes_left_in_chunk == 0 { | ||||
| 		r.bytes_left_in_chunk = r.read_chunk_size()? | ||||
| 	} | ||||
| 
 | ||||
| 	mut c := 0 | ||||
| 
 | ||||
| 	if buf.len > r.bytes_left_in_chunk { | ||||
| 		c = r.reader.read(mut buf[..r.bytes_left_in_chunk])? | ||||
| 	} else { | ||||
| 		c = r.reader.read(mut buf)? | ||||
| 	} | ||||
| 
 | ||||
| 	r.bytes_left_in_chunk -= u32(c) | ||||
| 
 | ||||
| 	return c | ||||
| } | ||||
| 
 | ||||
| // read_chunk_size advances the reader & reads the header bytes for the length | ||||
| // of the next chunk. | ||||
| fn (mut r StreamFormatReader) read_chunk_size() ?u32 { | ||||
| 	mut buf := []u8{len: 8} | ||||
| 
 | ||||
| 	r.reader.read(mut buf)? | ||||
| 
 | ||||
| 	num := binary.big_endian_u32(buf[4..]) | ||||
| 
 | ||||
| 	if num == 0 { | ||||
| 		return none | ||||
| 	} | ||||
| 
 | ||||
| 	return num | ||||
| } | ||||
|  | @ -0,0 +1,7 @@ | |||
| This module provides a framework for parsing a configuration, defined as a | ||||
| struct, from both a TOML configuration file & environment variables. Some | ||||
| notable features are: | ||||
| 
 | ||||
| * Overwrite values in config file using environment variables | ||||
| * Allow default values in config struct | ||||
| * Read environment variable value from file | ||||
|  | @ -3,13 +3,19 @@ module env | |||
| import os | ||||
| import toml | ||||
| 
 | ||||
| // The prefix that every environment variable should have | ||||
| const prefix = 'VIETER_' | ||||
| 
 | ||||
| // The suffix an environment variable in order for it to be loaded from a file | ||||
| // instead | ||||
| const file_suffix = '_FILE' | ||||
| const ( | ||||
| 	// The prefix that every environment variable should have | ||||
| 	prefix      = 'VIETER_' | ||||
| 	// The suffix an environment variable in order for it to be loaded from a file | ||||
| 	// instead | ||||
| 	file_suffix = '_FILE' | ||||
| ) | ||||
| 
 | ||||
| // get_env_var tries to read the contents of the given environment variable. It | ||||
| // looks for either `${env.prefix}${field_name.to_upper()}` or | ||||
| // `${env.prefix}${field_name.to_upper()}${env.file_suffix}`, returning the | ||||
| // contents of the file instead if the latter. If both or neither exist, the | ||||
| // function returns an error. | ||||
| fn get_env_var(field_name string) ?string { | ||||
| 	env_var_name := '$env.prefix$field_name.to_upper()' | ||||
| 	env_file_name := '$env.prefix$field_name.to_upper()$env.file_suffix' | ||||
|  |  | |||
|  | @ -31,7 +31,7 @@ fn main() { | |||
| 			logs.cmd(), | ||||
| 		] | ||||
| 	} | ||||
| 
 | ||||
| 	app.setup() | ||||
| 	app.parse(os.args) | ||||
| 	return | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,2 @@ | |||
| This module defines a few useful functions used throughout the codebase that | ||||
| don't specifically fit inside a module. | ||||
|  | @ -0,0 +1,95 @@ | |||
| // Functions for interacting with `io.Reader` & `io.Writer` objects. | ||||
| module util | ||||
| 
 | ||||
| import io | ||||
| import os | ||||
| 
 | ||||
| // reader_to_writer tries to consume the entire reader & write it to the writer. | ||||
| pub fn reader_to_writer(mut reader io.Reader, mut writer io.Writer) ? { | ||||
| 	mut buf := []u8{len: 10 * 1024} | ||||
| 
 | ||||
| 	for { | ||||
| 		bytes_read := reader.read(mut buf) or { break } | ||||
| 		mut bytes_written := 0 | ||||
| 
 | ||||
| 		for bytes_written < bytes_read { | ||||
| 			c := writer.write(buf[bytes_written..bytes_read]) or { break } | ||||
| 
 | ||||
| 			bytes_written += c | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // reader_to_file writes the contents of a BufferedReader to a file | ||||
| pub fn reader_to_file(mut reader io.BufferedReader, length int, path string) ? { | ||||
| 	mut file := os.create(path)? | ||||
| 	defer { | ||||
| 		file.close() | ||||
| 	} | ||||
| 
 | ||||
| 	mut buf := []u8{len: reader_buf_size} | ||||
| 	mut bytes_left := length | ||||
| 
 | ||||
| 	// Repeat as long as the stream still has data | ||||
| 	for bytes_left > 0 { | ||||
| 		// TODO check if just breaking here is safe | ||||
| 		bytes_read := reader.read(mut buf) or { break } | ||||
| 		bytes_left -= bytes_read | ||||
| 
 | ||||
| 		mut to_write := bytes_read | ||||
| 
 | ||||
| 		for to_write > 0 { | ||||
| 			// TODO don't just loop infinitely here | ||||
| 			bytes_written := file.write(buf[bytes_read - to_write..bytes_read]) or { continue } | ||||
| 			// file.flush() | ||||
| 
 | ||||
| 			to_write = to_write - bytes_written | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // match_array_in_array<T> returns how many elements of a2 overlap with a1. For | ||||
| // example, if a1 = "abcd" & a2 = "cd", the result will be 2. If the match is | ||||
| // not at the end of a1, the result is 0. | ||||
| pub fn match_array_in_array<T>(a1 []T, a2 []T) int { | ||||
| 	mut i := 0 | ||||
| 	mut match_len := 0 | ||||
| 
 | ||||
| 	for i + match_len < a1.len { | ||||
| 		if a1[i + match_len] == a2[match_len] { | ||||
| 			match_len += 1 | ||||
| 		} else { | ||||
| 			i += match_len + 1 | ||||
| 			match_len = 0 | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return match_len | ||||
| } | ||||
| 
 | ||||
| // read_until_separator consumes an io.Reader until it encounters some | ||||
| // separator array. The data read is stored inside the provided res array. | ||||
| pub fn read_until_separator(mut reader io.Reader, mut res []u8, sep []u8) ? { | ||||
| 	mut buf := []u8{len: sep.len} | ||||
| 
 | ||||
| 	for { | ||||
| 		c := reader.read(mut buf)? | ||||
| 		res << buf[..c] | ||||
| 
 | ||||
| 		match_len := match_array_in_array(buf[..c], sep) | ||||
| 
 | ||||
| 		if match_len == sep.len { | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		if match_len > 0 { | ||||
| 			match_left := sep.len - match_len | ||||
| 			c2 := reader.read(mut buf[..match_left])? | ||||
| 			res << buf[..c2] | ||||
| 
 | ||||
| 			if buf[..c2] == sep[match_len..] { | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | @ -1,13 +1,13 @@ | |||
| module util | ||||
| 
 | ||||
| import os | ||||
| import io | ||||
| import crypto.md5 | ||||
| import crypto.sha256 | ||||
| 
 | ||||
| const reader_buf_size = 1_000_000 | ||||
| 
 | ||||
| const prefixes = ['B', 'KB', 'MB', 'GB'] | ||||
| const ( | ||||
| 	reader_buf_size = 1_000_000 | ||||
| 	prefixes        = ['B', 'KB', 'MB', 'GB'] | ||||
| ) | ||||
| 
 | ||||
| // Dummy struct to work around the fact that you can only share structs, maps & | ||||
| // arrays | ||||
|  | @ -23,34 +23,6 @@ pub fn exit_with_message(code int, msg string) { | |||
| 	exit(code) | ||||
| } | ||||
| 
 | ||||
| // reader_to_file writes the contents of a BufferedReader to a file | ||||
| pub fn reader_to_file(mut reader io.BufferedReader, length int, path string) ? { | ||||
| 	mut file := os.create(path)? | ||||
| 	defer { | ||||
| 		file.close() | ||||
| 	} | ||||
| 
 | ||||
| 	mut buf := []u8{len: util.reader_buf_size} | ||||
| 	mut bytes_left := length | ||||
| 
 | ||||
| 	// Repeat as long as the stream still has data | ||||
| 	for bytes_left > 0 { | ||||
| 		// TODO check if just breaking here is safe | ||||
| 		bytes_read := reader.read(mut buf) or { break } | ||||
| 		bytes_left -= bytes_read | ||||
| 
 | ||||
| 		mut to_write := bytes_read | ||||
| 
 | ||||
| 		for to_write > 0 { | ||||
| 			// TODO don't just loop infinitely here | ||||
| 			bytes_written := file.write(buf[bytes_read - to_write..bytes_read]) or { continue } | ||||
| 			// file.flush() | ||||
| 
 | ||||
| 			to_write = to_write - bytes_written | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // hash_file returns the md5 & sha256 hash of a given file | ||||
| // TODO actually implement sha256 | ||||
| pub fn hash_file(path &string) ?(string, string) { | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue