From cb164902a3fb8ed799015e8ea912b3914f14d4ec Mon Sep 17 00:00:00 2001 From: schochastics Date: Thu, 17 Nov 2022 22:08:46 +0100 Subject: [PATCH 1/8] added stream function outlines --- NAMESPACE | 3 ++ R/stream_statuses.R | 106 +++++++++++++++++++++++++++++++++++++++++ man/stream_timeline.Rd | 63 ++++++++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 R/stream_statuses.R create mode 100644 man/stream_timeline.Rd diff --git a/NAMESPACE b/NAMESPACE index a20b871..ecd4815 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -34,5 +34,8 @@ export(get_timeline_public) export(post_toot) export(post_user) export(search_accounts) +export(stream_timeline_hashtag) +export(stream_timeline_list) +export(stream_timeline_public) export(verify_credentials) export(verify_envvar) diff --git a/R/stream_statuses.R b/R/stream_statuses.R new file mode 100644 index 0000000..9264183 --- /dev/null +++ b/R/stream_statuses.R @@ -0,0 +1,106 @@ +#' Collect live streams of Mastodon data +#' @name stream_timeline +#' @param timeout Integer, Number of seconds to stream toots for. Stream indefinitely with timeout = Inf. The stream can be interrupted at any time, and file_name will still be a valid file. +#' @param local logical, Show only local statuses? +#' @param file_name character, name of file. If not specified, will write to a temporary file stream_toots*.json. +#' @param append logical, if TRUE will append to the end of file_name; if FALSE, will overwrite. 
+#' @inheritParams get_instance +#' @export +stream_timeline_public <- function( + timeout = 30, + local = FALSE, + file_name = NULL, + append = TRUE, + instance = NULL, + token = NULL, + anonymous = FALSE, + parse = TRUE){ + + path <- "/api/v1/streaming/public" + if(isTRUE(local)){ + path <- paste0(path,"/local") + } + params <- list() +} + +#' @rdname stream_timeline +#' @export +stream_timeline_hashtag <- function( + hashtag = "rstats", + timeout = 30, + local = FALSE, + file_name = NULL, + append = TRUE, + instance = NULL, + token = NULL, + anonymous = FALSE, + parse = TRUE){ + + path <- "/api/v1/streaming/hashtag" + if(isTRUE(local)){ + path <- paste0(path,"/local") + } + params = (tag = hashtag) + +} + +#' @rdname stream_timeline +#' @export +stream_timeline_list <- function( + list_id, + timeout = 30, + file_name = NULL, + append = TRUE, + instance = NULL, + token = NULL, + anonymous = FALSE, + parse = TRUE){ + + path <- "api/v1/streaming/list" + params <- list(list = list_id) +} + + + +make_stream_request <- function(timeout,file_name, append, token, path, params, + instance = NULL, anonymous = FALSE, ...){ + if (is.null(instance) && anonymous) { + stop("provide either an instance or a token") + } + if (is.null(instance)) { + token <- check_token_rtoot(token) + url <- prepare_url(token$instance) + config <- httr::add_headers(Authorization = paste('Bearer', token$bearer)) + } else { + url <- prepare_url(instance) + config = list() + } + if(is.null(file_name)){ + file_name <- tempfile(pattern = "stream_toots", fileext = ".json") + } + url <- httr::modify_url(url,path = path,query = params) + + #TODO: add handle + output <- file(file_name) + con <- curl::curl(url) + open(output,open = "ab") + open(con = con, "rb", blocking = FALSE) + + while(isIncomplete(con)){ #TODO add timeout condition, make sure file is written on interrupt + buf <- readLines(con) + if(length(buf)){ + line <- buf[grepl("created_at",buf)] # This seems unstable but rtweet does something 
similar + line <- gsub("^data:\\s+","",line) + line <- complete_line(line) + writeLines(line,output) + } + } + close(con) + close(outstream) +} + +complete_line <- function(line){ + line <- line[grepl("\\}$",line)] #only complete lines + line <- line[line!=""] + line +} diff --git a/man/stream_timeline.Rd b/man/stream_timeline.Rd new file mode 100644 index 0000000..17843a5 --- /dev/null +++ b/man/stream_timeline.Rd @@ -0,0 +1,63 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/stream_statuses.R +\name{stream_timeline} +\alias{stream_timeline} +\alias{stream_timeline_public} +\alias{stream_timeline_hashtag} +\alias{stream_timeline_list} +\title{Collect live streams of Mastodon data} +\usage{ +stream_timeline_public( + timeout = 30, + local = FALSE, + file_name = NULL, + append = TRUE, + instance = NULL, + token = NULL, + anonymous = FALSE, + parse = TRUE +) + +stream_timeline_hashtag( + hashtag = "rstats", + timeout = 30, + local = FALSE, + file_name = NULL, + append = TRUE, + instance = NULL, + token = NULL, + anonymous = FALSE, + parse = TRUE +) + +stream_timeline_list( + list_id, + timeout = 30, + file_name = NULL, + append = TRUE, + instance = NULL, + token = NULL, + anonymous = FALSE, + parse = TRUE +) +} +\arguments{ +\item{timeout}{Integer, Number of seconds to stream toots for. Stream indefinitely with timeout = Inf. The stream can be interrupted at any time, and file_name will still be a valid file.} + +\item{local}{logical, Show only local statuses?} + +\item{file_name}{character, name of file. If not specified, will write to a temporary file stream_toots*.json.} + +\item{append}{logical, if TRUE will append to the end of file_name; if FALSE, will overwrite.} + +\item{instance}{character, the server name of the instance where the status is located. 
If \code{NULL}, the same instance used to obtain the token is used.} + +\item{token}{user bearer token (read from file by default)} + +\item{anonymous}{logical, should the API call be made anonymously? Defaults to TRUE but some instances might need authentication here} + +\item{parse}{logical, if \code{TRUE}, the default, returns a tibble. Use \code{FALSE} to return the "raw" list corresponding to the JSON returned from the Mastodon API.} +} +\description{ +Collect live streams of Mastodon data +} From e5fc4ee00e9a0be39007bf1e0fc5fd1cf17836c5 Mon Sep 17 00:00:00 2001 From: schochastics Date: Thu, 17 Nov 2022 23:50:23 +0100 Subject: [PATCH 2/8] added internal stream functions and a parser --- NAMESPACE | 1 + R/stream_statuses.R | 72 +++++++++++++++++++++++++++++++++--------- man/parse_stream.Rd | 23 ++++++++++++++ man/stream_timeline.Rd | 8 ++--- 4 files changed, 85 insertions(+), 19 deletions(-) create mode 100644 man/parse_stream.Rd diff --git a/NAMESPACE b/NAMESPACE index ecd4815..0fc68d6 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -31,6 +31,7 @@ export(get_timeline_hashtag) export(get_timeline_home) export(get_timeline_list) export(get_timeline_public) +export(parse_stream) export(post_toot) export(post_user) export(search_accounts) diff --git a/R/stream_statuses.R b/R/stream_statuses.R index 9264183..6edd926 100644 --- a/R/stream_statuses.R +++ b/R/stream_statuses.R @@ -4,6 +4,7 @@ #' @param local logical, Show only local statuses? #' @param file_name character, name of file. If not specified, will write to a temporary file stream_toots*.json. #' @param append logical, if TRUE will append to the end of file_name; if FALSE, will overwrite. 
+#' @param verbose logical whether to display messages #' @inheritParams get_instance #' @export stream_timeline_public <- function( @@ -14,13 +15,22 @@ stream_timeline_public <- function( instance = NULL, token = NULL, anonymous = FALSE, - parse = TRUE){ + verbose = TRUE){ path <- "/api/v1/streaming/public" if(isTRUE(local)){ path <- paste0(path,"/local") } params <- list() + + if(is.null(file_name)){ + file_name <- tempfile(pattern = "stream_toots", fileext = ".json") + } + + quiet_interrupt( + stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) + ) + invisible(NULL) } #' @rdname stream_timeline @@ -34,7 +44,7 @@ stream_timeline_hashtag <- function( instance = NULL, token = NULL, anonymous = FALSE, - parse = TRUE){ + verbose = TRUE){ path <- "/api/v1/streaming/hashtag" if(isTRUE(local)){ @@ -42,6 +52,9 @@ stream_timeline_hashtag <- function( } params = (tag = hashtag) + if(is.null(file_name)){ + file_name <- tempfile(pattern = "stream_toots", fileext = ".json") + } } #' @rdname stream_timeline @@ -54,16 +67,37 @@ stream_timeline_list <- function( instance = NULL, token = NULL, anonymous = FALSE, - parse = TRUE){ + verbose = TRUE){ path <- "api/v1/streaming/list" params <- list(list = list_id) + + if(is.null(file_name)){ + file_name <- tempfile(pattern = "stream_toots", fileext = ".json") + } } +#' Parser of Mastodon stream +#' +#' Converts Mastodon stream data (JSON file) into parsed tibble. +#' @param path Character, name of JSON file with data collected by any [stream_timeline] function. 
+#' @export +#' @seealso `stream_timeline_public()`, `stream_timeline_hashtag()`,`stream_timeline_list()` +#' @examples +#' \dontrun{ +#' stream_timeline_public(1,file_name = "stream.json") +#' parse_stream("stream.json") +#' } +parse_stream <- function(path){ + json <- readLines(path) + tbl <- dplyr::bind_rows(lapply(json,function(x) parse_status(jsonlite::fromJSON(x)))) + dplyr::arrange(tbl,created_at) + +} -make_stream_request <- function(timeout,file_name, append, token, path, params, - instance = NULL, anonymous = FALSE, ...){ +stream_toots <- function(timeout,file_name = NULL, append, token, path, params, + instance = NULL, anonymous = FALSE, verbose = TRUE,...){ if (is.null(instance) && anonymous) { stop("provide either an instance or a token") } @@ -75,18 +109,19 @@ make_stream_request <- function(timeout,file_name, append, token, path, params, url <- prepare_url(instance) config = list() } - if(is.null(file_name)){ - file_name <- tempfile(pattern = "stream_toots", fileext = ".json") - } + url <- httr::modify_url(url,path = path,query = params) - #TODO: add handle + stopifnot(is.numeric(timeout), timeout > 0) + stop_time <- Sys.time() + timeout + + #TODO: add handle with bearer output <- file(file_name) con <- curl::curl(url) - open(output,open = "ab") + open(output,open = if (append) "ab" else "b") open(con = con, "rb", blocking = FALSE) - while(isIncomplete(con)){ #TODO add timeout condition, make sure file is written on interrupt + while(isIncomplete(con) && Sys.time() < stop_time){ buf <- readLines(con) if(length(buf)){ line <- buf[grepl("created_at",buf)] # This seems unstable but rtweet does something similar @@ -95,12 +130,19 @@ make_stream_request <- function(timeout,file_name, append, token, path, params, writeLines(line,output) } } - close(con) - close(outstream) + on.exit({ + close(con) + close(output) + }) + invisible() } complete_line <- function(line){ - line <- line[grepl("\\}$",line)] #only complete lines - line <- line[line!=""] + line <- 
line[grepl("\\}$",line)] #delete incomplete lines + line <- line[line!=""] # delete empty lines line } + +quiet_interrupt <- function(code) { + tryCatch(code, interrupt = function(e) NULL) +} diff --git a/man/parse_stream.Rd b/man/parse_stream.Rd new file mode 100644 index 0000000..d097005 --- /dev/null +++ b/man/parse_stream.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/stream_statuses.R +\name{parse_stream} +\alias{parse_stream} +\title{Parser of Mastodon stream} +\usage{ +parse_stream(path) +} +\arguments{ +\item{path}{Character, name of JSON file with data collected by any \link{stream_timeline} function.} +} +\description{ +Converts Mastodon stream data (JSON file) into parsed tibble. +} +\examples{ +\dontrun{ +stream_timeline_public(1,file_name = "stream.json") +parse_stream("stream.json") +} +} +\seealso{ +\code{stream_timeline_public()}, \code{stream_timeline_hashtag()},\code{stream_timeline_list()} +} diff --git a/man/stream_timeline.Rd b/man/stream_timeline.Rd index 17843a5..a98b67d 100644 --- a/man/stream_timeline.Rd +++ b/man/stream_timeline.Rd @@ -15,7 +15,7 @@ stream_timeline_public( instance = NULL, token = NULL, anonymous = FALSE, - parse = TRUE + verbose = TRUE ) stream_timeline_hashtag( @@ -27,7 +27,7 @@ stream_timeline_hashtag( instance = NULL, token = NULL, anonymous = FALSE, - parse = TRUE + verbose = TRUE ) stream_timeline_list( @@ -38,7 +38,7 @@ stream_timeline_list( instance = NULL, token = NULL, anonymous = FALSE, - parse = TRUE + verbose = TRUE ) } \arguments{ @@ -56,7 +56,7 @@ stream_timeline_list( \item{anonymous}{logical, should the API call be made anonymously? Defaults to TRUE but some instances might need authentication here} -\item{parse}{logical, if \code{TRUE}, the default, returns a tibble. 
Use \code{FALSE} to return the "raw" list corresponding to the JSON returned from the Mastodon API.} +\item{verbose}{logical whether to display messages} } \description{ Collect live streams of Mastodon data From d03171f08a3cd532f46c60875d60805daa7fc278 Mon Sep 17 00:00:00 2001 From: schochastics Date: Fri, 18 Nov 2022 07:36:45 +0100 Subject: [PATCH 3/8] added curl/jsonlite to Imports and fixed some doc --- DESCRIPTION | 2 ++ R/stream_statuses.R | 27 +++++++++++++++++++++++---- man/stream_timeline.Rd | 26 +++++++++++++++++++++++++- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 2b1b9a9..e33a2e5 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -16,8 +16,10 @@ Depends: R (>= 3.6) Imports: clipr, + curl, dplyr, httr, + jsonlite, tibble Suggests: knitr, diff --git a/R/stream_statuses.R b/R/stream_statuses.R index 6edd926..ded2ce8 100644 --- a/R/stream_statuses.R +++ b/R/stream_statuses.R @@ -1,12 +1,32 @@ #' Collect live streams of Mastodon data #' @name stream_timeline #' @param timeout Integer, Number of seconds to stream toots for. Stream indefinitely with timeout = Inf. The stream can be interrupted at any time, and file_name will still be a valid file. -#' @param local logical, Show only local statuses? +#' @param local logical, Show only local statuses (either statuses from your instance or the one provided in `instance`)? #' @param file_name character, name of file. If not specified, will write to a temporary file stream_toots*.json. #' @param append logical, if TRUE will append to the end of file_name; if FALSE, will overwrite. 
#' @param verbose logical whether to display messages +#' @param list_id character, id of list to stream +#' @param hashtag character, hashtag to stream #' @inheritParams get_instance +#' @details +#' \describe{ +#' \item{stream_timeline_public}{stream all statuses} +#' \item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag } +#' \item{stream_timeline_list}{stream the statuses of a list} +#' } #' @export +#' @examples +#' \dontrun{ +#' # stream public timeline for 30 seconds +#' stream_timeline_public(timeout = 30) +#' # stream timeline of mastodon.social for 30 seconds +#' stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social") +#' +#' # stream hashtag timeline for 30 seconds +#' stream_timeline_hashtag("rstats", timeout = 30) +#' # stream hashtag timeline of mastodon.social for 30 seconds +#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org") +#' } stream_timeline_public <- function( timeout = 30, local = FALSE, @@ -91,8 +111,7 @@ stream_timeline_list <- function( parse_stream <- function(path){ json <- readLines(path) tbl <- dplyr::bind_rows(lapply(json,function(x) parse_status(jsonlite::fromJSON(x)))) - dplyr::arrange(tbl,created_at) - + tbl[order(tbl[["created_at"]]),] } @@ -122,7 +141,7 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, open(con = con, "rb", blocking = FALSE) while(isIncomplete(con) && Sys.time() < stop_time){ - buf <- readLines(con) + buf <- readLines(con,warn = FALSE) if(length(buf)){ line <- buf[grepl("created_at",buf)] # This seems unstable but rtweet does something similar line <- gsub("^data:\\s+","",line) diff --git a/man/stream_timeline.Rd b/man/stream_timeline.Rd index a98b67d..a82ee56 100644 --- a/man/stream_timeline.Rd +++ b/man/stream_timeline.Rd @@ -44,7 +44,7 @@ stream_timeline_list( \arguments{ \item{timeout}{Integer, Number of seconds to stream toots for. Stream indefinitely with timeout = Inf. 
The stream can be interrupted at any time, and file_name will still be a valid file.} -\item{local}{logical, Show only local statuses?} +\item{local}{logical, Show only local statuses (either statuses from your instance or the one provided in \code{instance})?} \item{file_name}{character, name of file. If not specified, will write to a temporary file stream_toots*.json.} @@ -57,7 +57,31 @@ stream_timeline_list( \item{anonymous}{logical, should the API call be made anonymously? Defaults to TRUE but some instances might need authentication here} \item{verbose}{logical whether to display messages} + +\item{hashtag}{character, hashtag to stream} + +\item{list_id}{character, id of list to stream} } \description{ Collect live streams of Mastodon data } +\details{ +\describe{ +\item{stream_timeline_public}{stream all statuses} +\item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag } +\item{stream_timeline_list}{stream the statuses of a list} +} +} +\examples{ +\dontrun{ +# stream public timeline for 30 seconds +stream_timeline_public(timeout = 30) +# stream timeline of mastodon.social for 30 seconds +stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social") + +# stream hashtag timeline for 30 seconds +stream_timeline_hashtag("rstats", timeout = 30) +# stream hashtag timeline of mastodon.social for 30 seconds +stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org") +} +} From 132e50dc450c1ac826a1d4d26b9e0240361d587f Mon Sep 17 00:00:00 2001 From: schochastics Date: Fri, 18 Nov 2022 08:26:50 +0100 Subject: [PATCH 4/8] added handle for authorization --- R/stream_statuses.R | 15 +++++++-------- man/stream_timeline.Rd | 8 ++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/R/stream_statuses.R b/R/stream_statuses.R index ded2ce8..d81ec8e 100644 --- a/R/stream_statuses.R +++ b/R/stream_statuses.R @@ -18,14 +18,14 @@ #' @examples #' \dontrun{ #' # stream public timeline for 30 
seconds -#' stream_timeline_public(timeout = 30) +#' stream_timeline_public(timeout = 30,file_name = "public.json") #' # stream timeline of mastodon.social for 30 seconds -#' stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social") +#' stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social",file_name = "social.json") #' #' # stream hashtag timeline for 30 seconds -#' stream_timeline_hashtag("rstats", timeout = 30) +#' stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json") #' # stream hashtag timeline of mastodon.social for 30 seconds -#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org") +#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org", file_name = "rstats_foss.json") #' } stream_timeline_public <- function( timeout = 30, @@ -120,13 +120,13 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, if (is.null(instance) && anonymous) { stop("provide either an instance or a token") } + h <- curl::new_handle(verbose = FALSE) if (is.null(instance)) { token <- check_token_rtoot(token) url <- prepare_url(token$instance) - config <- httr::add_headers(Authorization = paste('Bearer', token$bearer)) + curl::handle_setheaders(h, "Authorization" = paste0("Bearer ",token$bearer)) } else { url <- prepare_url(instance) - config = list() } url <- httr::modify_url(url,path = path,query = params) @@ -134,9 +134,8 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, stopifnot(is.numeric(timeout), timeout > 0) stop_time <- Sys.time() + timeout - #TODO: add handle with bearer output <- file(file_name) - con <- curl::curl(url) + con <- curl::curl(url,handle = h) open(output,open = if (append) "ab" else "b") open(con = con, "rb", blocking = FALSE) diff --git a/man/stream_timeline.Rd b/man/stream_timeline.Rd index a82ee56..bf91515 100644 --- a/man/stream_timeline.Rd +++ 
b/man/stream_timeline.Rd @@ -75,13 +75,13 @@ Collect live streams of Mastodon data \examples{ \dontrun{ # stream public timeline for 30 seconds -stream_timeline_public(timeout = 30) +stream_timeline_public(timeout = 30,file_name = "public.json") # stream timeline of mastodon.social for 30 seconds -stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social") +stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social",file_name = "social.json") # stream hashtag timeline for 30 seconds -stream_timeline_hashtag("rstats", timeout = 30) +stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json") # stream hashtag timeline of mastodon.social for 30 seconds -stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org") +stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org", file_name = "rstats_foss.json") } } From 886771d3fa6b5837057070d0054844b61fa59dcc Mon Sep 17 00:00:00 2001 From: schochastics Date: Fri, 18 Nov 2022 10:33:45 +0100 Subject: [PATCH 5/8] moved file to stream_toots --- R/stream_statuses.R | 35 +++++++++++++++++++++-------------- man/stream_timeline.Rd | 6 ++++-- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/R/stream_statuses.R b/R/stream_statuses.R index d81ec8e..787b070 100644 --- a/R/stream_statuses.R +++ b/R/stream_statuses.R @@ -20,12 +20,14 @@ #' # stream public timeline for 30 seconds #' stream_timeline_public(timeout = 30,file_name = "public.json") #' # stream timeline of mastodon.social for 30 seconds -#' stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social",file_name = "social.json") +#' stream_timeline_public(timeout = 30, local = TRUE, +#' instance = "mastodon.social", file_name = "social.json") #' #' # stream hashtag timeline for 30 seconds #' stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json") #' # stream hashtag timeline of mastodon.social for 30 
seconds -#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org", file_name = "rstats_foss.json") +#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, +#' instance = "fosstodon.org", file_name = "rstats_foss.json") #' } stream_timeline_public <- function( timeout = 30, @@ -43,10 +45,6 @@ stream_timeline_public <- function( } params <- list() - if(is.null(file_name)){ - file_name <- tempfile(pattern = "stream_toots", fileext = ".json") - } - quiet_interrupt( stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) ) @@ -70,11 +68,12 @@ stream_timeline_hashtag <- function( if(isTRUE(local)){ path <- paste0(path,"/local") } - params = (tag = hashtag) + params <- list(tag = hashtag) - if(is.null(file_name)){ - file_name <- tempfile(pattern = "stream_toots", fileext = ".json") - } + quiet_interrupt( + stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) + ) + invisible(NULL) } #' @rdname stream_timeline @@ -92,9 +91,10 @@ stream_timeline_list <- function( path <- "api/v1/streaming/list" params <- list(list = list_id) - if(is.null(file_name)){ - file_name <- tempfile(pattern = "stream_toots", fileext = ".json") - } + quiet_interrupt( + stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) + ) + invisible(NULL) } #' Parser of Mastodon stream @@ -129,6 +129,10 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, url <- prepare_url(instance) } + if(is.null(file_name)){ + file_name <- tempfile(pattern = "stream_toots", fileext = ".json") + } + url <- httr::modify_url(url,path = path,query = params) stopifnot(is.numeric(timeout), timeout > 0) @@ -138,7 +142,8 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, con <- curl::curl(url,handle = h) open(output,open = if (append) "ab" else "b") open(con = con, "rb", blocking = FALSE) - + sayif(verbose,"Streaming toots until ",stop_time) + 
n_seen <- 0 while(isIncomplete(con) && Sys.time() < stop_time){ buf <- readLines(con,warn = FALSE) if(length(buf)){ @@ -146,6 +151,8 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, line <- gsub("^data:\\s+","",line) line <- complete_line(line) writeLines(line,output) + n_seen <- n_seen + length(line) + cat("streamed toots: ",n_seen,"\r") } } on.exit({ diff --git a/man/stream_timeline.Rd b/man/stream_timeline.Rd index bf91515..6028c27 100644 --- a/man/stream_timeline.Rd +++ b/man/stream_timeline.Rd @@ -77,11 +77,13 @@ Collect live streams of Mastodon data # stream public timeline for 30 seconds stream_timeline_public(timeout = 30,file_name = "public.json") # stream timeline of mastodon.social for 30 seconds -stream_timeline_public(timeout = 30, local = TRUE, instance = "mastodon.social",file_name = "social.json") +stream_timeline_public(timeout = 30, local = TRUE, + instance = "mastodon.social", file_name = "social.json") # stream hashtag timeline for 30 seconds stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json") # stream hashtag timeline of mastodon.social for 30 seconds -stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org", file_name = "rstats_foss.json") +stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, + instance = "fosstodon.org", file_name = "rstats_foss.json") } } From 7228b318ba1c40f82d228b313101bf8579f5176b Mon Sep 17 00:00:00 2001 From: chainsawriot Date: Fri, 18 Nov 2022 11:57:02 +0100 Subject: [PATCH 6/8] Make `stream_timeline_hashtag` accept the hash So that it is the same as `get_timeline_hashtag` --- R/stream_statuses.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/stream_statuses.R b/R/stream_statuses.R index 787b070..b8ab385 100644 --- a/R/stream_statuses.R +++ b/R/stream_statuses.R @@ -68,7 +68,7 @@ stream_timeline_hashtag <- function( if(isTRUE(local)){ path <- paste0(path,"/local") } - params <- list(tag = 
hashtag) + params <- list(tag = gsub("^#+", "", hashtag)) quiet_interrupt( stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) From 02f463d3bc3146d138bb638317260711d1b55d00 Mon Sep 17 00:00:00 2001 From: schochastics Date: Fri, 18 Nov 2022 13:22:02 +0100 Subject: [PATCH 7/8] echoing tmp file, added vignette, and extended README --- NEWS.md | 3 ++- R/stream_statuses.R | 7 +++++-- README.Rmd | 19 +++++++++++++++++ README.md | 28 +++++++++++++++++++++++++ man/stream_timeline.Rd | 6 ++++-- vignettes/stream.Rmd | 47 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 vignettes/stream.Rmd diff --git a/NEWS.md b/NEWS.md index d7c52f5..4bb5227 100644 --- a/NEWS.md +++ b/NEWS.md @@ -3,7 +3,8 @@ * possible to set token via environment variable (#68) * paginating results (#70) * adding ratelimit checking (#43) -* added pkgdown site(#79) +* added pkgdown site (#79) +* added streaming (#84) # rtoot 0.1.0 diff --git a/R/stream_statuses.R b/R/stream_statuses.R index b8ab385..4258148 100644 --- a/R/stream_statuses.R +++ b/R/stream_statuses.R @@ -10,8 +10,8 @@ #' @inheritParams get_instance #' @details #' \describe{ -#' \item{stream_timeline_public}{stream all statuses} -#' \item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag } +#' \item{stream_timeline_public}{stream all public statuses on any instance} +#' \item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag} #' \item{stream_timeline_list}{stream the statuses of a list} #' } #' @export @@ -28,6 +28,8 @@ #' # stream hashtag timeline of mastodon.social for 30 seconds #' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, #' instance = "fosstodon.org", file_name = "rstats_foss.json") +#' # json files can be parsed with parse_stream() +#' parse_stream("rstats_foss.json") #' } stream_timeline_public <- function( timeout = 30, @@ -131,6 +133,7 @@ stream_toots <- 
function(timeout,file_name = NULL, append, token, path, params, if(is.null(file_name)){ file_name <- tempfile(pattern = "stream_toots", fileext = ".json") + message("Writting to ",file_name) } url <- httr::modify_url(url,path = path,query = params) diff --git a/README.Rmd b/README.Rmd index 4060c00..ce061eb 100644 --- a/README.Rmd +++ b/README.Rmd @@ -43,6 +43,8 @@ You can install the development version of rtoot from GitHub: devtools::install_github("schochastics/rtoot") ``` +*(The current dev version has many more features than the current version on CRAN)* + ## Authenticate First you should set up your own credentials (see also `vignette("auth")`) @@ -129,6 +131,23 @@ post_toot(status = "my first rtoot #rstats", media="path/to/media", You can mark the toot as sensitive by setting `sensitive = TRUE` and add a spoiler text with `spoiler_text`. +## Streaming + +`rtoot` allows to stream statuses from three different streams. +To get any public status on any instance use `stream_timeline_public()` +```{r, eval = FALSE} +stream_timeline_public(timeout = 30,file_name = "public.json") +``` +the timeout parameter is the time in seconds data should be streamed (set to `Inf` for indefinite streaming). If just the local timeline is needed, use `local=TRUE` and set an instance (or use your own provided by the token). + +`stream_timeline_hashtag()` streams all statuses containing a specific hashtag + +```{r, eval = FALSE} +stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json") +``` + +The statuses are directly written to file as json. The function `parse_stream()` can be used +to read in and convert a json to a data frame. 
## Pagination diff --git a/README.md b/README.md index d175bc8..877ffc4 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,9 @@ You can install the development version of rtoot from GitHub: devtools::install_github("schochastics/rtoot") ``` +*(The current dev version has many more features than the current +version on CRAN)* + ## Authenticate First you should set up your own credentials (see also @@ -140,6 +143,31 @@ post_toot(status = "my first rtoot #rstats", media="path/to/media", You can mark the toot as sensitive by setting `sensitive = TRUE` and add a spoiler text with `spoiler_text`. +## Streaming + +`rtoot` allows to stream statuses from three different streams. To get +any public status on any instance use `stream_timeline_public()` + +``` r +stream_timeline_public(timeout = 30,file_name = "public.json") +``` + +the timeout parameter is the time in seconds data should be streamed +(set to `Inf` for indefinite streaming). If just the local timeline is +needed, use `local=TRUE` and set an instance (or use your own provided +by the token). + +`stream_timeline_hashtag()` streams all statuses containing a specific +hashtag + +``` r +stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json") +``` + +The statuses are directly written to file as json. The function +`parse_stream()` can be used to read in and convert a json to a data +frame. 
+ ## Pagination All relevant functions in the package support pagination of results if diff --git a/man/stream_timeline.Rd b/man/stream_timeline.Rd index 6028c27..21da24b 100644 --- a/man/stream_timeline.Rd +++ b/man/stream_timeline.Rd @@ -67,8 +67,8 @@ Collect live streams of Mastodon data } \details{ \describe{ -\item{stream_timeline_public}{stream all statuses} -\item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag } +\item{stream_timeline_public}{stream all public statuses on any instance} +\item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag} \item{stream_timeline_list}{stream the statuses of a list} } } @@ -85,5 +85,7 @@ stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json" # stream hashtag timeline of mastodon.social for 30 seconds stream_timeline_hashtag("rstats", timeout = 30, local = TRUE, instance = "fosstodon.org", file_name = "rstats_foss.json") +# json files can be parsed with parse_stream() +parse_stream("rstats_foss.json") } } diff --git a/vignettes/stream.Rmd b/vignettes/stream.Rmd new file mode 100644 index 0000000..a2ef2ae --- /dev/null +++ b/vignettes/stream.Rmd @@ -0,0 +1,47 @@ +--- +title: "Live streaming toots" +output: rmarkdown::html_vignette +vignette: > + %\VignetteIndexEntry{Live streaming toots} + %\VignetteEngine{knitr::rmarkdown} + %\VignetteEncoding{UTF-8} +--- + +```{r, include = FALSE} +knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>" +) +``` + +```{r setup, eval=FALSE} +library("rtoot") +``` + +The package allows to access three different streams of Mastodon data. +Public timelines via `stream_timeline_public()`, timelines from a given hashtag via `stream_timeline_hashtag()` and timelines from lists via `stream_timeline_list()`. + +## Specifying parameters + +By default, all functions stream statuses for 30 seconds. This can be adjusted via the parameter `timeout`. If set to `Inf`, data is streamed indefinitely. 
The parameter `file_name` is used to specify to which file the data should be written. If none is provided, a temporary file is created. However, we recommend always setting this parameter explicitly. + +For `stream_timeline_public()` and `stream_timeline_hashtag()`, you can also decide if you want to +stream globally, or from a specific instance. If you want to stream from a specific instance, set `local=TRUE` and set `instance` to the desired instance. If `instance` is `NULL`, then the function uses the instance you obtained a token from (see vignette on [authentication](auth.html)). + +## Streaming and parsing + +Once parameters are specified, you can start the desired stream. +Streaming will occupy your current instance of R until the specified time has elapsed or any error occurs. Streaming itself shouldn't be very memory intensive so you can start a new R instance in parallel. + +```{r eval=FALSE} +#stream a minute of all statuses +stream_timeline_public(timeout = 60, file_name = "public.json") + +#stream a minute of all statuses using the rstats hashtag +stream_timeline_hashtag(hashtag = "rstats", timeout = 60, file_name = "public.json") +``` + +If `verbose=TRUE`, the functions will indicate when streaming is supposed to stop and the number of statuses that have been written to file. + +Note that in contrast to `rtweet`, the streaming functions never directly return any data. +The data can be loaded afterwards using `parse_stream()`, which reads in the json and converts it to a data frame. Note that this process can take a while depending on the number of statuses in the file. 
From 3a4a2ce9626587092901eb519eb38e89881ff817 Mon Sep 17 00:00:00 2001 From: chainsawriot Date: Fri, 18 Nov 2022 14:02:21 +0100 Subject: [PATCH 8/8] rectify the `verbose` behavior of all `stream_*` functions --- R/stream_statuses.R | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/R/stream_statuses.R b/R/stream_statuses.R index 4258148..7be194b 100644 --- a/R/stream_statuses.R +++ b/R/stream_statuses.R @@ -48,7 +48,9 @@ stream_timeline_public <- function( params <- list() quiet_interrupt( - stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) + stream_toots(timeout = timeout, file_name = file_name, append = append, token = token, + path = path, params = params, instance = instance, anonymous = anonymous, + verbose = verbose) ) invisible(NULL) } @@ -73,7 +75,9 @@ stream_timeline_hashtag <- function( params <- list(tag = gsub("^#+", "", hashtag)) quiet_interrupt( - stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) + stream_toots(timeout = timeout, file_name = file_name, append = append, token = token, + path = path, params = params, instance = instance, anonymous = anonymous, + verbose = verbose) ) invisible(NULL) } @@ -94,7 +98,9 @@ stream_timeline_list <- function( params <- list(list = list_id) quiet_interrupt( - stream_toots(timeout,file_name, append, token, path, params, instance, anonymous) + stream_toots(timeout = timeout, file_name = file_name, append = append, token = token, + path = path, params = params, instance = instance, anonymous = anonymous, + verbose = verbose) ) invisible(NULL) } @@ -133,9 +139,8 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, if(is.null(file_name)){ file_name <- tempfile(pattern = "stream_toots", fileext = ".json") - message("Writting to ",file_name) } - + sayif(verbose, "Writing to ",file_name) url <- httr::modify_url(url,path = path,query = params) stopifnot(is.numeric(timeout), timeout > 0) @@ 
-155,7 +160,9 @@ stream_toots <- function(timeout,file_name = NULL, append, token, path, params, line <- complete_line(line) writeLines(line,output) n_seen <- n_seen + length(line) - cat("streamed toots: ",n_seen,"\r") + if (isTRUE(verbose)) { + cat("streamed toots: ",n_seen,"\r") + } } } on.exit({