Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3131a37
Allow extra parameters to be passed to dbWriteTable
Jan 31, 2023
61f9f19
Add in possible sf-read method for doltr
n8layman Feb 4, 2023
69b871f
Add error handling when setting crs w/ st_set_crs in case of unknown c…
n8layman Feb 4, 2023
371dcdf
Start at st_write method
n8layman Feb 4, 2023
8119ba5
Strip crs out of sfc after reading to ensure it's converted to wkb wi…
n8layman Feb 4, 2023
0a3166e
Add in patch to dbxInsert that allows sf geometry columns to be inser…
n8layman Feb 6, 2023
ea7ce09
Remove importFrom dbxInsert to test out sf patch
n8layman Feb 6, 2023
40e740b
Update collate field in description file to include new files `sf-rea…
n8layman Feb 6, 2023
ae9c942
Remove testing dbWriteTable left at the end of sf-read.R
n8layman Feb 6, 2023
11b49ee
Add in valuesClause function from dbx to custom dbxInsert.R
n8layman Feb 6, 2023
3bb45d6
Add in valuesClause function from dbx to custom dbxInsert.R take 2
n8layman Feb 6, 2023
f4f0449
Add default to custom dbxInsert show_sql argument
n8layman Feb 6, 2023
b6cfa22
Add sf namespace to st_set_crs
n8layman Feb 6, 2023
208c4a3
Add c("DoltConnection", "character", "sf") dbWriteTable signature to …
n8layman Feb 6, 2023
bbaeb29
Turn off show_sql which was used for debugging.
n8layman Feb 6, 2023
fc28f5c
Remove debug class printing in write-table.R
n8layman Feb 6, 2023
e1edd65
Fix dbxInsert crs handling
n8layman Feb 6, 2023
8fa3422
Fix sf_read crs bug
n8layman Feb 6, 2023
d8f0f6b
Update geom_col logic
n8layman Feb 6, 2023
89e8e87
Add st_read method
n8layman Feb 18, 2023
057275e
Fix collate problem
n8layman Feb 18, 2023
477a6a6
Fix setmethod for st_read
n8layman Feb 18, 2023
4c617c3
Add sf to DESCRIPTION imports
n8layman Feb 18, 2023
5202ecd
Working on importing sf_read from sf
n8layman Feb 18, 2023
9ea584d
Take two on importing sf
n8layman Feb 18, 2023
3282363
Try depends
n8layman Feb 18, 2023
db8f095
Keep on trying
n8layman Feb 18, 2023
4e1dfc0
Try changing NAMESPACE
n8layman Feb 18, 2023
0bb029f
One more NAMESPACE update
n8layman Feb 18, 2023
3c9bae8
Last try for the evening
n8layman Feb 18, 2023
7b6a261
Fix package build issue associated with vignette Rmd
n8layman Feb 21, 2023
ef4f441
Remove stringr dependency by replacing call to stringr::str_split() w…
n8layman Mar 1, 2023
24483d0
Fix bug in dbReadTable(). dbListTableType() call fixed to dbGetTableT…
n8layman Mar 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Imports:
RMariaDB,
R.utils,
rstudioapi,
sf,
utils
Suggests:
covr,
Expand Down Expand Up @@ -72,3 +73,5 @@ Collate:
'state.R'
'utils.R'
'write-table.R'
'dbxInsert.R'
'st-read.R'
3 changes: 3 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ exportMethods(dbSendStatement)
exportMethods(dbUnloadDriver)
exportMethods(dbWriteTable)
exportMethods(show)
exportMethods(st_read)
import(DBI)
import(RMariaDB)
import(sf)
import(methods)
importClassesFrom(RMariaDB,MariaDBConnection)
importClassesFrom(RMariaDB,MariaDBDriver)
Expand Down Expand Up @@ -114,6 +116,7 @@ importFrom(ps,ps_status)
importFrom(ps,ps_terminate)
importFrom(ps,signals)
importFrom(rlang,.data)
importFrom(sf,st_read)
importMethodsFrom(RMariaDB,dbClearResult)
importMethodsFrom(RMariaDB,dbDisconnect)
importMethodsFrom(RMariaDB,dbSendQuery)
Expand Down
104 changes: 104 additions & 0 deletions R/dbxInsert.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# With these changes to dbx I can successfully create a table using
# dbCreate table (so long as I specify field.types ahead of time),
# add records with dbxInsert, pull the table back down as an sf
# object and decode the WKB blob with my version of sf_read.
# Not sure yet if dbWriteTable works.

# Insert records into a table, with support for sf geometry columns.
#
# Local replacement for dbx::dbxInsert() that routes value quoting through
# the sf-aware insertClause()/quoteRecords()/castData() helpers below.
#
# conn       -- a DBI connection
# table      -- name of the table to insert into
# records    -- data frame (possibly an sf object) of rows to insert
# batch_size -- number of records per INSERT statement (NULL = all at once)
# returning  -- columns to return, passed to dbx:::selectOrExecute()
# show_sql   -- if TRUE, message() each generated statement (debugging aid)
dbxInsert <- function(conn, table, records, batch_size = NULL, returning = NULL,
                      show_sql = FALSE) {
  dbx:::inBatches(records, batch_size, function(batch) {
    sql <- insertClause(conn, table, batch)
    if (show_sql) message(sql)
    dbx:::selectOrExecute(conn, sql, batch, returning = returning)
  })
}

# Build a full INSERT statement for `records` targeting `table`.
# Identifiers are quoted with dbx's helpers; values come from valuesClause().
insertClause <- function(conn, table, records) {
  record_cols <- colnames(records)

  # Quote the table and column identifiers for this connection's dialect
  table_sql <- dbx:::quoteIdent(conn, table)
  col_idents <- dbx:::quoteIdent(conn, record_cols)

  columns_sql <- dbx:::colsClause(col_idents)
  values_sql <- valuesClause(conn, records)
  paste0("INSERT INTO ", table_sql, " (", columns_sql, ") VALUES ", values_sql)
}

# Render the VALUES clause for `records`: each row becomes a
# parenthesized, comma-separated tuple of already-quoted literals.
valuesClause <- function(conn, records) {
  quoted <- quoteRecords(conn, records)
  row_sql <- apply(quoted, 1, paste, collapse = ", ")
  paste0("(", row_sql, ")", collapse = ", ")
}

########################################################################
################# PR to dbx would be the following #####################
########################################################################

# Check if column is an sf object or an sfc geometry column.
# inherits() with a vector of classes returns TRUE if `col` inherits
# from any of them, so a single call replaces the former `|` of two.
isSf <- function(col) {
  inherits(col, c("sf", "sfc"))
}

# Add in gis function call and appropriate literal quoting for sf columns.
#
# Quote every column of `records` for use in a VALUES clause. Ordinary
# columns are cast via castData() then quoted with DBI::dbQuoteLiteral().
# sf/sfc columns are additionally wrapped in ST_GeomFromWKB(), passing the
# numeric EPSG code (parsed from the crs "input" string, e.g. "EPSG:4326")
# as the SRID when one is available.
quoteRecords <- function(conn, records) {
  quoted_records <- data.frame(matrix(ncol = 0, nrow = nrow(records)))
  for (i in seq_len(ncol(records))) {
    col <- records[, i, drop = TRUE]
    if (isSf(col)) {
      # "EPSG:4326" -> 4326; NA when the crs is missing or not EPSG-coded
      crs <- as.numeric(unlist(strsplit(sf::st_crs(col)$input, ":"))[2])
      # NOTE(review): dbQuoteLiteral() is applied twice in both branches
      # below (once on the cast data, once on the quoted string). The
      # author reports this works against Dolt/MySQL -- confirm it is
      # intentional rather than accidental double quoting.
      if (is.na(crs)) {
        col <- DBI::dbQuoteLiteral(conn, castData(conn, col))
        col <- paste0("ST_GeomFromWKB(X", DBI::dbQuoteLiteral(conn, col), ")") # Not sure if this is needed. What does mySQL do if EPSG is null or missing?
      } else {
        sf::st_crs(col) <- NA # First drop crs so it doesn't show up in a strange place in the WKB string
        col <- DBI::dbQuoteLiteral(conn, castData(conn, col))
        col <- paste0("ST_GeomFromWKB(X", DBI::dbQuoteLiteral(conn, col), ",", crs, ")")
      }
    } else {
      col <- DBI::dbQuoteLiteral(conn, castData(conn, col))
    }
    quoted_records[, i] <- col
  }
  quoted_records
}

# Add in cast sf columns as wkb.
#
# Cast a column to a database-storable representation before quoting.
# Adapted from dbx's internal castData() with one added branch that
# serializes sf/sfc geometry columns to WKB via sf:::db_binary().
#
# Fix: isBinary/isDifftime/isRPostgres/storageTimeZone are dbx internals
# (every other helper here is already called via dbx:::); unqualified
# calls were unresolved names once this code was copied out of dbx.
castData <- function(conn, col) {
  if (dbx:::isMySQL(conn) || dbx:::isSQLite(conn) || dbx:::isSQLServer(conn)) {
    # since no standard for SQLite, store dates and datetimes in the same format as Rails
    # store times without dates as strings to keep things simple
    if (isSf(col)) {
      # serialize geometry to its WKB binary form
      col <- sf:::db_binary(col)
    } else if (dbx:::isDatetime(col)) {
      col <- format(col, tz = dbx:::storageTimeZone(conn), "%Y-%m-%d %H:%M:%OS6")
    } else if (dbx:::isDate(col)) {
      col <- format(col)
    } else if (dbx:::isTime(col)) {
      col <- format(col)
    }

  } else if (dbx:::isPostgres(conn)) {

    if (dbx:::isDatetime(col)) {
      col <- format(col, tz = dbx:::storageTimeZone(conn), "%Y-%m-%d %H:%M:%OS6 %Z")
    } else if (dbx:::isTime(col)) {
      col <- format(col)
    } else if (is.logical(col) && dbx:::isRPostgreSQL(conn)) {
      col <- as.character(col)
    } else if (dbx:::isDate(col) && dbx:::isRPostgreSQL(conn)) {
      col <- format(col)
    } else if (dbx:::isBinary(col)) {
      if (dbx:::isRPostgreSQL(conn)) {
        col <- as.character(lapply(col, function(x) { RPostgreSQL::postgresqlEscapeBytea(conn, x) }))
      } else {
        # removes AsIs
        col <- blob::as.blob(lapply(col, function(x) { x }))
      }
    } else if (dbx:::isDifftime(col) && dbx:::isRPostgres(conn)) {
      col <- as.character(col)
    }
  }

  col
}

2 changes: 1 addition & 1 deletion R/read-table.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ setMethod("dbReadTable", c("DoltConnection", "character"),
name <- dbQuoteIdentifier(conn, name)

if (!is.null(as_of)) {
table_type <- dbListTableType(conn, name, as_of)
table_type <- dbGetTableType(conn, name, as_of)
if(!length(table_type)) warning("table does not exist at as_of commit")
if(length(table_type) > 0) {
if(table_type == "VIEW") {
Expand Down
156 changes: 156 additions & 0 deletions R/st-read.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#' Read a Dolt table or query result as an sf object
#'
#' Adapted from sf's DBI `st_read()` method. Geometry columns are decoded
#' via sf's PostGIS helpers or, failing that, from blob columns whose first
#' four bytes hold an integer SRID followed by WKB (Dolt/MySQL's internal
#' geometry encoding).
#'
#' @param dsn A `DoltLocalConnection`.
#' @param layer character; name of the table to read. Takes precedence over
#'   `query` when both are supplied.
#' @param query character; SQL query to run when `layer` is not given.
#' @param EWKB logical; passed to sf's WKB decoding helpers.
#' @param quiet logical; if `FALSE`, print a zero-row summary of the result.
#' @param as_tibble logical; if `TRUE`, return a tibble-backed sf object
#'   (requires the tibble package).
#' @param geometry_column character vector of geometry column names, or
#'   `NULL` to auto-detect.
#' @param ... additional arguments passed on to [sf::st_sf()].
#'
#' @return An sf object, or a plain `data.frame` (with a warning) when no
#'   geometry column can be identified.
#' @export
#' @import sf
setMethod("st_read", signature("DoltLocalConnection", "character"),
          function(dsn = NULL,
                   layer = NULL,
                   query = NULL,
                   EWKB = TRUE,
                   quiet = TRUE,
                   as_tibble = FALSE,
                   geometry_column = NULL,
                   ...) {
  if (is.null(dsn))
    stop("no connection provided") # nocov

  if (as_tibble && !requireNamespace("tibble", quietly = TRUE)) {
    stop("package tibble not available: install first?") # nocov
  }

  # check that ellipsis contains only what is needed
  expe <- setdiff(names(list(...)), names(formals(st_sf)))
  if (length(expe) > 0) {
    # error, these arguments would be passed to st_sf
    suggest <- NULL
    if ("table" %in% expe) {
      suggest <- c(suggest, "\nMaybe you should use `layer` rather than `table` ?")
    }
    pref <- if (length(expe) > 1) "\t *" else ""
    stop(
      "Unused arguments: ",
      if (length(expe) > 1) "\n" else "",
      paste(pref, expe, "=", list(...)[expe], collapse = "\n", sep = " "),
      suggest,
      "\nCheck arguments for `st_sf()` for details.",
      call. = FALSE
    )
  }

  # filter expected warnings (for RPostgreSQL driver)
  filter_warning <- function(expr, regexp) {
    wlist <- NULL
    warning_handler <- function(w) {
      wlist <<- c(wlist, list(w))
      invokeRestart("muffleWarning")
    }
    msg <- function(x) x$message
    out <- withCallingHandlers(expr, warning = warning_handler)
    if (!all(grepl(regexp, wlist))) {
      lapply(vapply(wlist, msg, character(1)), warning, call. = FALSE) # nocov
    }
    return(out)
  }

  # Check layer and query conflict
  if (!is.null(layer)) {
    if (!is.null(query)) {
      warning("You provided both `layer` and `query` arguments,",
              " will only use `layer`.", call. = FALSE)
    }
    # capture warnings from RPostgreSQL package
    if (inherits(dsn, "PostgreSQLConnection")) {
      tbl <- filter_warning(dbReadTable(dsn, layer), "unrecognized PostgreSQL field type geometry")
    } else {
      tbl <- dbReadTable(dsn, layer)
    }
  } else if (is.null(query)) {
    stop("Provide either a `layer` or a `query`", call. = FALSE)
  } else {
    # capture warnings from RPostgreSQL package
    if (inherits(dsn, "PostgreSQLConnection")) {
      filter_warning(tbl <- dbGetQuery(dsn, query), "unrecognized PostgreSQL field type geometry")
    } else {
      tbl <- dbGetQuery(dsn, query)
    }
  }

  if (is.null(tbl)) {
    stop("Query `", query, "` returned no results.", call. = FALSE) #nocov
  }

  if (is.null(geometry_column)) {
    # scan table for simple features column
    geometry_column <- sf:::is_geometry_column.default(dsn, tbl)
    tbl[geometry_column] <- lapply(tbl[geometry_column], sf:::try_postgis_as_sfc, EWKB = EWKB, conn = dsn)
  } else {
    if (!all(geometry_column %in% names(tbl))) {
      # prepare error message
      nm <- names(tbl)
      prefix <- ""
      new_line <- ""
      if (length(nm) > 1) {
        prefix <- " *"
        new_line <- "\n"
      }
      stop("Could not find `geometry_column` (\"", paste(geometry_column, collapse = "\", \""), "\") ",
           "in column names. Available names are:",
           new_line,
           paste(prefix, nm, collapse = "\n", sep = " "),
           call. = FALSE)
    }
    # FIX: was an unqualified `postgis_as_sfc`, which is internal to sf and
    # was an unresolved name here (cf. sf:::try_postgis_as_sfc above).
    tbl[geometry_column] <- lapply(tbl[geometry_column], sf:::postgis_as_sfc, EWKB = EWKB, conn = dsn)
  }

  # if there are no simple features geometries, return a data frame
  if (!any(vapply(tbl, inherits, logical(1), "sfc"))) {
    # try reading blob columns: first 4 bytes are the SRID, the rest is WKB
    blob_columns <- vapply(tbl, inherits, logical(1), "blob")
    success <- FALSE
    for (i in which(blob_columns)) {
      crs <- lapply(tbl[[i]], function(x) x[1:4])
      sfc <- lapply(tbl[[i]], function(x) x[-c(1:4)])

      try(sfc <- st_as_sfc(sfc), silent = TRUE)

      if (!inherits(sfc, "try-error")) {
        # FIX: condition was `< 1`, which can never be TRUE for a non-empty
        # column; the intent (per the message) is to reject mixed-SRID columns.
        if (length(unique(crs)) > 1)
          stop(paste("More than one crs found in column:", i))
        # NOTE(review): readBin uses the platform's byte order by default --
        # confirm against Dolt's SRID encoding on big-endian platforms.
        crs <- readBin(crs[[1]],
                       what = "int",
                       n = 1,
                       size = 4L)
        st_crs(sfc) <- crs

        tbl[[i]] <- sfc
        success <- TRUE
      }
    }
    if (!success) {
      warning("Could not find a simple features geometry column. Will return a `data.frame`.")
      return(tbl)
    }
  }

  x <- st_sf(tbl, ...)

  if (!quiet) print(x, n = 0) # nocov

  if (as_tibble) {
    x <- tibble::new_tibble(x, nrow = nrow(x), class = "sf")
  }
  return(x)
})

# from: https://stackoverflow.com/questions/70884796/convert-hexadecimal-string-to-bytes-in-r
# Convert a single hex string (e.g. "0a1f") to a raw vector: each pair of
# hex characters becomes one byte (high nibble shifted left by 4, plus low).
hex_to_raw <- function(x) {
  chars <- strsplit(x, "")[[1]]
  nibbles <- strtoi(chars, base = 16L)
  high <- nibbles[c(TRUE, FALSE)]
  low <- nibbles[c(FALSE, TRUE)]
  as.raw(bitwShiftL(high, 4L) + low)
}
25 changes: 21 additions & 4 deletions R/write-table.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ NULL
#'
#' @param batch_size The number of records to insert in a single SQL statement
#' (defaults to all)
#' @importFrom dbx dbxInsert
#' Note: `@importFrom dbx dbxInsert` was removed; this package now defines its own sf-friendly `dbxInsert()` (see dbxInsert.R).
#' @inheritParams DBI::sqlRownamesToColumn
#' @param conn a database connection
#' @param overwrite a logical specifying whether to overwrite an existing table
Expand All @@ -27,13 +27,14 @@ NULL
#' when the connection is closed. For `dbRemoveTable()`, only temporary
#' tables are considered if this argument is set to `TRUE`
#' @param batch_size The number of records to insert in a single statement (defaults to all)
#' @param ... for additional parameters passed on. Not currently used.
#' @export
#' @rdname dolt-write
#' @seealso dolt-read
setMethod("dbWriteTable", c("DoltConnection", "character", "data.frame"),
function(conn, name, value, field.types = NULL, row.names = FALSE,
overwrite = FALSE, append = FALSE, temporary = FALSE,
batch_size = NULL) {
batch_size = NULL, ...) {
if (!is.data.frame(value)) {
stopc("`value` must be data frame")
}
Expand Down Expand Up @@ -72,13 +73,12 @@ setMethod("dbWriteTable", c("DoltConnection", "character", "data.frame"),
found <- FALSE
}


if (overwrite) {
dbRemoveTable(conn, name, temporary = temporary,
fail_if_missing = FALSE)
}

row.names <- compatRowNames(row.names)
# row.names <- compatRowNames(row.names) # Already done on line 42
value <- sqlRownamesToColumn(value, row.names)
value <- factor_to_string(value)

Expand All @@ -103,6 +103,7 @@ setMethod("dbWriteTable", c("DoltConnection", "character", "data.frame"),
)

if (nrow(value) > 0) {

dbxInsert(
conn = conn,
table = name,
Expand All @@ -115,6 +116,22 @@ setMethod("dbWriteTable", c("DoltConnection", "character", "data.frame"),
})


# Register an sf-specific signature so sf's own dbWriteTable method cannot
# intercept writes on Dolt connections; this simply delegates to the
# DoltConnection data.frame method.
setMethod("dbWriteTable", c("DoltConnection", "character", "sf"),
          function(conn, name, value, field.types = NULL, row.names = FALSE,
                   overwrite = FALSE, append = FALSE, temporary = FALSE,
                   batch_size = NULL, ...) {

            # Look up the data.frame method explicitly and forward everything
            df_method <- selectMethod(
              "dbWriteTable",
              signature = c("DoltConnection", "character", "data.frame")
            )

            df_method(conn, name, value,
                      field.types = field.types,
                      row.names = row.names,
                      overwrite = overwrite,
                      append = append,
                      temporary = temporary,
                      batch_size = batch_size, ...)
          })


# dolt_import_csv <- function(conn = dolt()) {
Expand Down
5 changes: 4 additions & 1 deletion man/dolt-write.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading