pg: support for copy sql commands (#9272)
parent
00dedaf6c1
commit
cccca51788
102
vlib/pg/pg.v
102
vlib/pg/pg.v
|
@ -1,11 +1,17 @@
|
|||
module pg
|
||||
|
||||
import io
|
||||
|
||||
#flag -lpq
|
||||
#flag linux -I/usr/include/postgresql
|
||||
#flag darwin -I/opt/local/include/postgresql11
|
||||
#flag windows -I @VROOT/thirdparty/pg/include
|
||||
#flag windows -L @VROOT/thirdparty/pg/win64
|
||||
|
||||
// PostgreSQL Source Code
|
||||
// https://doxygen.postgresql.org/libpq-fe_8h.html
|
||||
#include <libpq-fe.h>
|
||||
|
||||
pub struct DB {
|
||||
mut:
|
||||
conn &C.PGconn
|
||||
|
@ -16,7 +22,8 @@ pub mut:
|
|||
vals []string
|
||||
}
|
||||
|
||||
struct C.PGResult
|
||||
struct C.PGResult {
|
||||
}
|
||||
|
||||
pub struct Config {
|
||||
pub:
|
||||
|
@ -41,12 +48,25 @@ fn C.PQntuples(&C.PGResult) int
|
|||
|
||||
fn C.PQnfields(&C.PGResult) int
|
||||
|
||||
fn C.PQexec(voidptr) &C.PGResult
|
||||
fn C.PQexec(voidptr, byteptr) &C.PGResult
|
||||
|
||||
fn C.PQexecParams(voidptr) &C.PGResult
|
||||
// Params:
|
||||
// const Oid *paramTypes
|
||||
// const char *const *paramValues
|
||||
// const int *paramLengths
|
||||
// const int *paramFormats
|
||||
fn C.PQexecParams(conn voidptr, command byteptr, nParams int, paramTypes int, paramValues byteptr, paramLengths int, paramFormats int, resultFormat int) &C.PGResult
|
||||
|
||||
fn C.PQputCopyData(conn voidptr, buffer byteptr, nbytes int) int
|
||||
|
||||
fn C.PQputCopyEnd(voidptr, int) int
|
||||
|
||||
fn C.PQgetCopyData(conn voidptr, buffer &byteptr, async int) int
|
||||
|
||||
fn C.PQclear(&C.PGResult) voidptr
|
||||
|
||||
fn C.PQfreemem(voidptr)
|
||||
|
||||
fn C.PQfinish(voidptr)
|
||||
|
||||
// connect makes a new connection to the database server using
|
||||
|
@ -63,7 +83,7 @@ pub fn connect(config Config) ?DB {
|
|||
// We force the construction of a new string as the
|
||||
// error message will be freed by the next `PQfinish`
|
||||
// call
|
||||
c_error_msg := unsafe {C.PQerrorMessage(conn).vstring()}
|
||||
c_error_msg := unsafe { C.PQerrorMessage(conn).vstring() }
|
||||
error_msg := '$c_error_msg'
|
||||
C.PQfinish(conn)
|
||||
return error('Connection to a PG database failed: $error_msg')
|
||||
|
@ -82,7 +102,7 @@ fn res_to_rows(res voidptr) []Row {
|
|||
mut row := Row{}
|
||||
for j in 0 .. nr_cols {
|
||||
val := C.PQgetvalue(res, i, j)
|
||||
sval := unsafe {val.vstring()}
|
||||
sval := unsafe { val.vstring() }
|
||||
row.vals << sval
|
||||
}
|
||||
rows << row
|
||||
|
@ -114,7 +134,7 @@ pub fn (db DB) q_int(query string) ?int {
|
|||
return val.int()
|
||||
}
|
||||
|
||||
// q_int submit a command to the database server and
|
||||
// q_string submit a command to the database server and
|
||||
// returns an the first field in the first tuple
|
||||
// as a string. If no row is found or on
|
||||
// command failure, an error is returned
|
||||
|
@ -154,7 +174,7 @@ fn rows_first_or_empty(rows []Row) ?Row {
|
|||
|
||||
pub fn (db DB) exec_one(query string) ?Row {
|
||||
res := C.PQexec(db.conn, query.str)
|
||||
e := unsafe {C.PQerrorMessage(db.conn).vstring()}
|
||||
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
|
||||
if e != '' {
|
||||
return error('pg exec error: "$e"')
|
||||
}
|
||||
|
@ -164,7 +184,7 @@ pub fn (db DB) exec_one(query string) ?Row {
|
|||
|
||||
// exec_param_many executes a query with the provided parameters
|
||||
pub fn (db DB) exec_param_many(query string, params []string) ?[]Row {
|
||||
mut param_vals := []charptr{ len: params.len }
|
||||
mut param_vals := []charptr{len: params.len}
|
||||
for i in 0 .. params.len {
|
||||
param_vals[i] = params[i].str
|
||||
}
|
||||
|
@ -182,10 +202,74 @@ pub fn (db DB) exec_param(query string, param string) ?[]Row {
|
|||
}
|
||||
|
||||
fn (db DB) handle_error_or_result(res voidptr, elabel string) ?[]Row {
|
||||
e := unsafe {C.PQerrorMessage(db.conn).vstring()}
|
||||
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
|
||||
if e != '' {
|
||||
C.PQclear(res)
|
||||
return error('pg $elabel error:\n$e')
|
||||
}
|
||||
return res_to_rows(res)
|
||||
}
|
||||
|
||||
// copy_expert execute COPY commands
|
||||
// https://www.postgresql.org/docs/9.5/libpq-copy.html
|
||||
pub fn (db DB) copy_expert(query string, file io.ReaderWriter) ?int {
|
||||
res := C.PQexec(db.conn, query.str)
|
||||
status := C.PQresultStatus(res)
|
||||
|
||||
defer {
|
||||
C.PQclear(res)
|
||||
}
|
||||
|
||||
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
|
||||
if e != '' {
|
||||
return error('pg copy error:\n$e')
|
||||
}
|
||||
|
||||
if status == C.PGRES_COPY_IN {
|
||||
mut buf := []byte{len: 4 * 1024}
|
||||
for {
|
||||
n := file.read(mut buf) or {
|
||||
msg := 'pg copy error: Failed to read from input'
|
||||
C.PQputCopyEnd(db.conn, msg.str)
|
||||
return err
|
||||
}
|
||||
if n <= 0 {
|
||||
break
|
||||
}
|
||||
|
||||
code := C.PQputCopyData(db.conn, buf.data, n)
|
||||
if code == -1 {
|
||||
return error('pg copy error: Failed to send data, code=$code')
|
||||
}
|
||||
}
|
||||
|
||||
code := C.PQputCopyEnd(db.conn, 0)
|
||||
|
||||
if code != 1 {
|
||||
return error('pg copy error: Failed to finish copy command, code: $code')
|
||||
}
|
||||
} else if status == C.PGRES_COPY_OUT {
|
||||
for {
|
||||
address := byteptr(0)
|
||||
n_bytes := C.PQgetCopyData(db.conn, &address, 0)
|
||||
if n_bytes > 0 {
|
||||
mut local_buf := []byte{len: n_bytes}
|
||||
unsafe { C.memcpy(byteptr(local_buf.data), address, n_bytes) }
|
||||
file.write(local_buf) or {
|
||||
C.PQfreemem(address)
|
||||
return err
|
||||
}
|
||||
} else if n_bytes == -1 {
|
||||
break
|
||||
} else if n_bytes == -2 {
|
||||
// consult PQerrorMessage for the reason
|
||||
return error('pg copy error: read error')
|
||||
}
|
||||
if address != 0 {
|
||||
C.PQfreemem(address)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue