refactor(docker): more tightly integrate streams

pull/183/head
Jef Roosens 2022-05-16 15:36:21 +02:00
parent 850cba6ab9
commit 3c87e60293
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
1 changed files with 11 additions and 23 deletions

View File

@ -9,15 +9,14 @@ import encoding.hex
// it as if it was a continuous stream of data. // it as if it was a continuous stream of data.
struct ChunkedResponseReader { struct ChunkedResponseReader {
mut: mut:
reader io.Reader reader io.BufferedReader
bytes_left_in_chunk u64 bytes_left_in_chunk u64
end_of_stream bool
started bool started bool
} }
// new_chunked_response_reader creates a new ChunkedResponseReader on the heap // new_chunked_response_reader creates a new ChunkedResponseReader on the heap
// with the provided reader. // with the provided reader.
pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader { pub fn new_chunked_response_reader(reader io.BufferedReader) &ChunkedResponseReader {
r := &ChunkedResponseReader{ r := &ChunkedResponseReader{
reader: reader reader: reader
} }
@ -27,16 +26,10 @@ pub fn new_chunked_response_reader(reader io.Reader) &ChunkedResponseReader {
// read satisfies the io.Reader interface. // read satisfies the io.Reader interface.
pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int { pub fn (mut r ChunkedResponseReader) read(mut buf []u8) ?int {
if r.end_of_stream {
return none
}
if r.bytes_left_in_chunk == 0 { if r.bytes_left_in_chunk == 0 {
// An io.BufferedReader always returns none if its stream has
// ended.
r.bytes_left_in_chunk = r.read_chunk_size()? r.bytes_left_in_chunk = r.read_chunk_size()?
if r.end_of_stream {
return none
}
} }
mut c := 0 mut c := 0
@ -82,7 +75,7 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 {
// This only occurs for the very last chunk, which always reports a size of // This only occurs for the very last chunk, which always reports a size of
// 0. // 0.
if num == 0 { if num == 0 {
r.end_of_stream = true return none
} }
return num return num
@ -92,14 +85,13 @@ fn (mut r ChunkedResponseReader) read_chunk_size() ?u64 {
// header bytes. // header bytes.
struct StreamFormatReader { struct StreamFormatReader {
mut: mut:
reader io.Reader reader ChunkedResponseReader
bytes_left_in_chunk u32 bytes_left_in_chunk u32
end_of_stream bool
} }
// new_stream_format_reader creates a new StreamFormatReader using the given // new_stream_format_reader creates a new StreamFormatReader using the given
// reader. // reader.
pub fn new_stream_format_reader(reader io.Reader) &StreamFormatReader { pub fn new_stream_format_reader(reader ChunkedResponseReader) &StreamFormatReader {
r := &StreamFormatReader{ r := &StreamFormatReader{
reader: reader reader: reader
} }
@ -109,16 +101,8 @@ pub fn new_stream_format_reader(reader io.Reader) &StreamFormatReader {
// read satisfies the io.Reader interface. // read satisfies the io.Reader interface.
pub fn (mut r StreamFormatReader) read(mut buf []u8) ?int { pub fn (mut r StreamFormatReader) read(mut buf []u8) ?int {
if r.end_of_stream {
return none
}
if r.bytes_left_in_chunk == 0 { if r.bytes_left_in_chunk == 0 {
r.bytes_left_in_chunk = r.read_chunk_size()? r.bytes_left_in_chunk = r.read_chunk_size()?
if r.end_of_stream {
return none
}
} }
mut c := 0 mut c := 0
@ -143,5 +127,9 @@ fn (mut r StreamFormatReader) read_chunk_size() ?u32 {
num := binary.big_endian_u32(buf[4..]) num := binary.big_endian_u32(buf[4..])
if num == 0 {
return none
}
return num return num
} }