docker/stream.v

136 lines
3.1 KiB
Coq
Raw Normal View History

2022-06-22 09:15:00 +02:00
module docker
2022-06-18 17:59:22 +02:00
import io
import util
import encoding.binary
import encoding.hex
// ChunkedResponseReader parses an underlying HTTP chunked response, exposing
// it as if it were a continuous stream of data.
struct ChunkedResponseReader {
mut:
// reader is the underlying buffered reader the chunked response is read from.
reader io.BufferedReader
// bytes_left_in_chunk is the number of bytes of the current chunk that have
// not been handed to the caller yet. When it is 0, the next chunk size line
// is parsed before any more data is read.
bytes_left_in_chunk u64
// started is set once the first chunk size has been requested. From then on,
// the `\r\n` that ends the previous chunk is skipped before each new chunk
// size line.
started bool
}
// new_chunked_response_reader allocates a ChunkedResponseReader on the heap
// that pulls its data from the provided reader. All other fields start out
// with their zero values.
pub fn new_chunked_response_reader(reader io.BufferedReader) &ChunkedResponseReader {
	return &ChunkedResponseReader{
		reader: reader
	}
}
// read satisfies the io.Reader interface. It fills buf with data from the
// current HTTP chunk and returns the number of bytes read. Errors from the
// underlying reader or from read_chunk_size are propagated to the caller.
pub fn (mut r ChunkedResponseReader) read(mut buf []u8) !int {
	// The current chunk is used up (or nothing has been read yet), so the size
	// of the next chunk has to be parsed first.
	if r.bytes_left_in_chunk == 0 {
		r.bytes_left_in_chunk = r.read_chunk_size()!
	}

	mut n_read := 0

	// Never read past the end of the current chunk, as that would pull the
	// underlying reader out of sync with our parsing.
	if buf.len <= r.bytes_left_in_chunk {
		n_read = r.reader.read(mut buf)!
	} else {
		n_read = r.reader.read(mut buf[..r.bytes_left_in_chunk])!
	}

	r.bytes_left_in_chunk -= u64(n_read)

	return n_read
}
// read_chunk_size advances the reader & reads the size of the next HTTP chunk.
// This function should only be called if the previous chunk has been
// completely consumed. It returns the error 'end of stream' when the decoded
// size is 0, and propagates any error from the underlying reader or from
// decoding the size line.
fn (mut r ChunkedResponseReader) read_chunk_size() !u64 {
	if r.started {
		// Each chunk ends with a `\r\n` which we want to skip first. A single
		// read may hand back fewer bytes than requested, so keep reading until
		// both bytes are consumed; a leftover `\n` would otherwise end up in
		// front of the size line and break the hex decoding below.
		mut crlf := []u8{len: 2}
		mut skipped := 0

		for skipped < crlf.len {
			n := r.reader.read(mut crlf[skipped..])!

			// A read that makes no progress would loop forever.
			if n == 0 {
				return error('end of stream')
			}

			skipped += n
		}
	}

	r.started = true

	mut res := []u8{}
	util.read_until_separator(mut r.reader, mut res, http_chunk_separator)!

	// The length of the next chunk is provided as a hexadecimal number. The
	// last two bytes of the line are dropped before decoding (presumably the
	// separator collected by read_until_separator; confirm against util).
	mut num_data := hex.decode(res#[..-2].bytestr())!

	// Left-pad with zero bytes up to the 8 bytes a big-endian u64 consists of.
	for num_data.len < 8 {
		num_data.insert(0, 0)
	}

	num := binary.big_endian_u64(num_data)

	// This only occurs for the very last chunk, which always reports a size of
	// 0.
	if num == 0 {
		return error('end of stream')
	}

	return num
}
// StreamFormatReader parses an underlying stream of Docker logs, removing the
// header bytes.
struct StreamFormatReader {
mut:
// reader is the chunked response reader the framed log stream is read from.
reader ChunkedResponseReader
// bytes_left_in_chunk is the number of payload bytes of the current frame
// that have not been handed to the caller yet. When it is 0, the next frame
// header is read before any more data.
bytes_left_in_chunk u32
}
// new_stream_format_reader allocates a StreamFormatReader on the heap that
// reads its framed data from the given reader.
pub fn new_stream_format_reader(reader ChunkedResponseReader) &StreamFormatReader {
	return &StreamFormatReader{
		reader: reader
	}
}
// read satisfies the io.Reader interface. It fills buf with payload bytes of
// the current frame, never crossing into the next frame's header, and returns
// the number of bytes read. Errors from the underlying reader or from
// read_chunk_size are propagated to the caller.
pub fn (mut r StreamFormatReader) read(mut buf []u8) !int {
	// The current frame is used up (or nothing has been read yet), so the next
	// header has to be parsed first.
	if r.bytes_left_in_chunk == 0 {
		r.bytes_left_in_chunk = r.read_chunk_size()!
	}

	mut n_read := 0

	// Cap the read at the end of the current frame.
	if buf.len <= r.bytes_left_in_chunk {
		n_read = r.reader.read(mut buf)!
	} else {
		n_read = r.reader.read(mut buf[..r.bytes_left_in_chunk])!
	}

	r.bytes_left_in_chunk -= u32(n_read)

	return n_read
}
// read_chunk_size advances the reader & reads the header bytes for the length
// of the next chunk. The header is 8 bytes long; the length is taken from its
// last four bytes as a big-endian u32. It returns the error 'end of stream'
// when that length is 0, and propagates any error from the underlying reader.
fn (mut r StreamFormatReader) read_chunk_size() !u32 {
	mut header := []u8{len: 8}
	mut filled := 0

	// The underlying ChunkedResponseReader never reads past the end of the
	// current HTTP chunk, so a header that straddles a chunk boundary arrives
	// in pieces. Keep reading until all 8 bytes are present instead of parsing
	// a partially filled buffer.
	for filled < header.len {
		n := r.reader.read(mut header[filled..])!

		// A read that makes no progress would loop forever.
		if n == 0 {
			return error('end of stream')
		}

		filled += n
	}

	num := binary.big_endian_u32(header[4..])

	if num == 0 {
		return error('end of stream')
	}

	return num
}