module docker

import io
import util
import encoding.binary
import encoding.hex

// ChunkedResponseReader parses an underlying HTTP chunked response, exposing
// it as if it was a continuous stream of data.
struct ChunkedResponseReader {
mut:
	// reader is the buffered source of the raw, still-chunked response body.
	reader io.BufferedReader
	// bytes_left_in_chunk is the number of payload bytes of the current chunk
	// that have not been handed out yet.
	bytes_left_in_chunk u64
	// started is set once the first chunk header has been parsed; every later
	// header is preceded by the `\r\n` that terminates the previous chunk.
	started bool
}

// new_chunked_response_reader creates a new ChunkedResponseReader on the heap
// with the provided reader.
pub fn new_chunked_response_reader(reader io.BufferedReader) &ChunkedResponseReader {
	r := &ChunkedResponseReader{
		reader: reader
	}

	return r
}

// read satisfies the io.Reader interface. It fills `buf` with payload bytes
// of the current chunk and returns how many bytes were written. A single call
// never crosses a chunk boundary, so it may return fewer bytes than `buf.len`.
// The error produced by read_chunk_size for the terminating chunk, as well as
// any error from the underlying reader, is propagated to the caller.
pub fn (mut r ChunkedResponseReader) read(mut buf []u8) !int {
	if r.bytes_left_in_chunk == 0 {
		// The previous chunk is exhausted, so the next header has to be
		// parsed. The end of the stream is reported as an error here.
		r.bytes_left_in_chunk = r.read_chunk_size()!
	}

	mut c := 0

	// Make sure we don't read more than we can safely read. This is to avoid
	// the underlying reader from becoming out of sync with our parsing:
	if u64(buf.len) > r.bytes_left_in_chunk {
		// bytes_left_in_chunk is smaller than buf.len here, so it fits an int.
		c = r.reader.read(mut buf[..int(r.bytes_left_in_chunk)])!
	} else {
		c = r.reader.read(mut buf)!
	}

	r.bytes_left_in_chunk -= u64(c)

	return c
}

// read_chunk_size advances the reader & reads the size of the next HTTP chunk.
// This function should only be called if the previous chunk has been
// completely consumed. It returns the error 'end of stream' when the chunk
// reports a size of 0.
fn (mut r ChunkedResponseReader) read_chunk_size() !u64 {
	if r.started {
		mut buf := []u8{len: 2}

		// Each chunk ends with a `\r\n` which we want to skip first. A single
		// read may deliver fewer bytes than requested, so keep reading until
		// both bytes are consumed; otherwise the size parsing below would
		// start in the middle of the separator.
		mut skipped := 0

		for skipped < buf.len {
			n := r.reader.read(mut buf[skipped..])!

			// Guard against a reader that reports no progress without an error.
			if n == 0 {
				return error('unexpected end of stream')
			}

			skipped += n
		}
	}

	r.started = true

	mut res := []u8{}
	util.read_until_separator(mut r.reader, mut res, http_chunk_separator)!

	// The length of the next chunk is provided as a hexadecimal. The last two
	// bytes of `res` are dropped before decoding.
	mut num_data := hex.decode(res#[..-2].bytestr())!

	// Left-pad with zero bytes so the value can be read as a big-endian u64.
	for num_data.len < 8 {
		num_data.insert(0, 0)
	}

	num := binary.big_endian_u64(num_data)

	// This only occurs for the very last chunk, which always reports a size of
	// 0.
	if num == 0 {
		return error('end of stream')
	}

	return num
}

// StreamFormatReader parses an underlying stream of Docker logs, removing the
// header bytes.
struct StreamFormatReader {
mut:
	// reader provides the de-chunked response body containing the frames.
	reader ChunkedResponseReader
	// bytes_left_in_chunk is the number of payload bytes of the current frame
	// that have not been handed out yet.
	bytes_left_in_chunk u32
}

// new_stream_format_reader creates a new StreamFormatReader using the given
// reader.
pub fn new_stream_format_reader(reader ChunkedResponseReader) &StreamFormatReader {
	r := &StreamFormatReader{
		reader: reader
	}

	return r
}

// read satisfies the io.Reader interface. It fills `buf` with payload bytes
// of the current frame and returns how many bytes were written. A single call
// never crosses a frame boundary, so it may return fewer bytes than `buf.len`.
// Errors from read_chunk_size and from the underlying reader are propagated.
pub fn (mut r StreamFormatReader) read(mut buf []u8) !int {
	if r.bytes_left_in_chunk == 0 {
		r.bytes_left_in_chunk = r.read_chunk_size()!
	}

	mut c := 0

	// Never read past the end of the current frame, as the bytes following it
	// belong to the next frame's header.
	if u32(buf.len) > r.bytes_left_in_chunk {
		// bytes_left_in_chunk is smaller than buf.len here, so it fits an int.
		c = r.reader.read(mut buf[..int(r.bytes_left_in_chunk)])!
	} else {
		c = r.reader.read(mut buf)!
	}

	r.bytes_left_in_chunk -= u32(c)

	return c
}

// read_chunk_size advances the reader & reads the header bytes for the length
// of the next chunk. The header is 8 bytes long; its last 4 bytes hold the
// length as a big-endian u32. It returns the error 'end of stream' when that
// length is 0.
fn (mut r StreamFormatReader) read_chunk_size() !u32 {
	mut buf := []u8{len: 8}

	// The underlying ChunkedResponseReader never reads across an HTTP chunk
	// boundary, so a header that straddles two chunks arrives in pieces. Keep
	// reading until the full header is available before decoding it.
	mut filled := 0

	for filled < buf.len {
		n := r.reader.read(mut buf[filled..])!

		// Guard against a reader that reports no progress without an error.
		if n == 0 {
			return error('unexpected end of stream')
		}

		filled += n
	}

	num := binary.big_endian_u32(buf[4..])

	if num == 0 {
		return error('end of stream')
	}

	return num
}