docker/stream.v

136 lines
3.1 KiB
V

module docker
import io
import util
import encoding.binary
import encoding.hex
// ChunkedResponseReader parses an underlying HTTP chunked response, exposing
// it as if it were a continuous stream of data.
struct ChunkedResponseReader {
mut:
// Source of the raw, still chunk-encoded response bytes.
reader io.BufferedReader
// Number of bytes of the current chunk that have not been read yet. Once it
// reaches 0, the next read first parses the size of the following chunk.
bytes_left_in_chunk u64
// Set once the first chunk size has been read. Later chunk size reads first
// skip the `\r\n` that trails the previous chunk's data.
started bool
}
// new_chunked_response_reader wraps the given buffered reader in a
// heap-allocated ChunkedResponseReader. All other fields start at their zero
// values, so the first read begins by parsing a chunk size.
pub fn new_chunked_response_reader(reader io.BufferedReader) &ChunkedResponseReader {
	return &ChunkedResponseReader{
		reader: reader
	}
}
// read satisfies the io.Reader interface. It fills buf with data bytes of the
// current chunk and returns how many bytes were written; a single call never
// crosses a chunk boundary.
pub fn (mut r ChunkedResponseReader) read(mut buf []u8) !int {
	// The previous chunk has been fully consumed, so the size of the next one
	// has to be parsed first. Any error, including the end of the stream, is
	// propagated to the caller.
	if r.bytes_left_in_chunk == 0 {
		r.bytes_left_in_chunk = r.read_chunk_size()!
	}

	mut n := 0

	// Only hand the whole buffer to the underlying reader if it fits inside
	// what is left of the chunk. Reading past the chunk would consume the
	// framing bytes and desynchronise the parser.
	if buf.len <= r.bytes_left_in_chunk {
		n = r.reader.read(mut buf)!
	} else {
		n = r.reader.read(mut buf[..r.bytes_left_in_chunk])!
	}

	r.bytes_left_in_chunk -= u64(n)

	return n
}
// read_chunk_size advances the reader & reads the size of the next HTTP chunk.
// This function should only be called if the previous chunk has been
// completely consumed. It returns the size of the next chunk in bytes, or an
// error if the underlying reader fails, the size line is not valid
// hexadecimal, or the terminating zero-sized chunk has been reached.
fn (mut r ChunkedResponseReader) read_chunk_size() !u64 {
	if r.started {
		// The data of each chunk is followed by a `\r\n`, which has to be
		// skipped before the next size line. A single read is allowed to
		// return fewer bytes than requested, so keep reading until both bytes
		// have been consumed; otherwise a leftover `\n` would corrupt the
		// parsing of the size line.
		mut crlf := []u8{len: 2}
		mut skipped := 0

		for skipped < crlf.len {
			n := r.reader.read(mut crlf[skipped..])!

			// Guard against a reader that reports no progress without
			// returning an error, which would loop forever.
			if n == 0 {
				return error('unexpected end of stream')
			}

			skipped += n
		}
	}

	r.started = true

	mut res := []u8{}
	util.read_until_separator(mut r.reader, mut res, http_chunk_separator)!

	// The length of the next chunk is provided as a hexadecimal string; the
	// last two bytes of the line are the separator and are stripped.
	mut num_data := hex.decode(res#[..-2].bytestr())!

	// Left-pad with zero bytes so the value can be parsed as a big-endian u64.
	for num_data.len < 8 {
		num_data.insert(0, 0)
	}

	num := binary.big_endian_u64(num_data)

	// This only occurs for the very last chunk, which always reports a size of
	// 0.
	if num == 0 {
		return error('end of stream')
	}

	return num
}
// StreamFormatReader parses an underlying stream of Docker logs, removing the
// header bytes.
struct StreamFormatReader {
mut:
// Source of the framed log stream, already stripped of HTTP chunk encoding.
reader ChunkedResponseReader
// Number of payload bytes of the current frame that have not been read yet.
// Once it reaches 0, the next read first parses a new 8-byte frame header.
bytes_left_in_chunk u32
}
// new_stream_format_reader wraps the given ChunkedResponseReader in a
// heap-allocated StreamFormatReader. The remaining byte count starts at zero,
// so the first read begins by parsing a frame header.
pub fn new_stream_format_reader(reader ChunkedResponseReader) &StreamFormatReader {
	return &StreamFormatReader{
		reader: reader
	}
}
// read satisfies the io.Reader interface. It fills buf with payload bytes of
// the current frame and returns how many bytes were written; a single call
// never crosses a frame boundary.
pub fn (mut r StreamFormatReader) read(mut buf []u8) !int {
	// The current frame is exhausted, so parse the header of the next one.
	// Errors, including the end of the stream, are propagated to the caller.
	if r.bytes_left_in_chunk == 0 {
		r.bytes_left_in_chunk = r.read_chunk_size()!
	}

	mut n := 0

	// Only pass the whole buffer down if it fits inside the remainder of the
	// frame; otherwise the next frame's header would end up in the output.
	if buf.len <= r.bytes_left_in_chunk {
		n = r.reader.read(mut buf)!
	} else {
		n = r.reader.read(mut buf[..r.bytes_left_in_chunk])!
	}

	r.bytes_left_in_chunk -= u32(n)

	return n
}
// read_chunk_size advances the reader & reads the header bytes for the length
// of the next chunk. The header is 8 bytes long; only its last four bytes are
// used here, interpreted as a big-endian u32 payload size. It returns that
// size, or an error if the underlying reader fails or the size is 0.
fn (mut r StreamFormatReader) read_chunk_size() !u32 {
	mut header := []u8{len: 8}
	mut filled := 0

	// The underlying ChunkedResponseReader never reads across an HTTP chunk
	// boundary, so a single read can return fewer than 8 bytes when the header
	// straddles two chunks. Keep reading until the header is complete;
	// otherwise the size would be parsed from partially filled data.
	for filled < header.len {
		n := r.reader.read(mut header[filled..])!

		// Guard against a reader that reports no progress without returning
		// an error, which would loop forever.
		if n == 0 {
			return error('unexpected end of stream')
		}

		filled += n
	}

	num := binary.big_endian_u32(header[4..])

	// A reported size of 0 is treated as the end of the stream.
	if num == 0 {
		return error('end of stream')
	}

	return num
}