Enabled streaming toots (fix #84) #86

Merged
merged 8 commits into from
Nov 18, 2022
Changes from 5 commits
2 changes: 2 additions & 0 deletions DESCRIPTION
@@ -16,8 +16,10 @@ Depends:
    R (>= 3.6)
Imports:
    clipr,
    curl,
    dplyr,
    httr,
    jsonlite,
    tibble
Suggests:
    knitr,
4 changes: 4 additions & 0 deletions NAMESPACE
@@ -31,8 +31,12 @@ export(get_timeline_hashtag)
export(get_timeline_home)
export(get_timeline_list)
export(get_timeline_public)
export(parse_stream)
export(post_toot)
export(post_user)
export(search_accounts)
export(stream_timeline_hashtag)
export(stream_timeline_list)
export(stream_timeline_public)
export(verify_credentials)
export(verify_envvar)
173 changes: 173 additions & 0 deletions R/stream_statuses.R
@@ -0,0 +1,173 @@
#' Collect live streams of Mastodon data
#' @name stream_timeline
#' @param timeout Integer, Number of seconds to stream toots for. Stream indefinitely with timeout = Inf. The stream can be interrupted at any time, and file_name will still be a valid file.
#' @param local logical, Show only local statuses (either statuses from your instance or the one provided in `instance`)?
#' @param file_name character, name of file. If not specified, will write to a temporary file stream_toots*.json.
#' @param append logical, if TRUE will append to the end of file_name; if FALSE, will overwrite.
#' @param verbose logical whether to display messages
#' @param list_id character, id of list to stream
#' @param hashtag character, hashtag to stream
#' @inheritParams get_instance
#' @details
#' \describe{
#' \item{stream_timeline_public}{stream all statuses}
#' \item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag }
#' \item{stream_timeline_list}{stream the statuses of a list}
#' }
#' @export
#' @examples
#' \dontrun{
#' # stream public timeline for 30 seconds
#' stream_timeline_public(timeout = 30,file_name = "public.json")
#' # stream timeline of mastodon.social for 30 seconds
#' stream_timeline_public(timeout = 30, local = TRUE,
#' instance = "mastodon.social", file_name = "social.json")
#'
#' # stream hashtag timeline for 30 seconds
#' stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json")
#' # stream hashtag timeline of fosstodon.org for 30 seconds
#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE,
#' instance = "fosstodon.org", file_name = "rstats_foss.json")
#' }
stream_timeline_public <- function(
    timeout = 30,
    local = FALSE,
    file_name = NULL,
    append = TRUE,
    instance = NULL,
    token = NULL,
    anonymous = FALSE,
    verbose = TRUE){

  path <- "/api/v1/streaming/public"
  if(isTRUE(local)){
    path <- paste0(path,"/local")
  }
  params <- list()

  quiet_interrupt(
    # pass verbose through so that verbose = FALSE actually silences the stream
    stream_toots(timeout,file_name, append, token, path, params, instance, anonymous, verbose)
  )
  invisible(NULL)
}

#' @rdname stream_timeline
#' @export
stream_timeline_hashtag <- function(
    hashtag = "rstats",
    timeout = 30,
    local = FALSE,
    file_name = NULL,
    append = TRUE,
    instance = NULL,
    token = NULL,
    anonymous = FALSE,
    verbose = TRUE){

  path <- "/api/v1/streaming/hashtag"
  if(isTRUE(local)){
    path <- paste0(path,"/local")
  }
  params <- list(tag = hashtag)

  quiet_interrupt(
    # pass verbose through so that verbose = FALSE actually silences the stream
    stream_toots(timeout,file_name, append, token, path, params, instance, anonymous, verbose)
  )
  invisible(NULL)
}

#' @rdname stream_timeline
#' @export
stream_timeline_list <- function(
    list_id,
    timeout = 30,
    file_name = NULL,
    append = TRUE,
    instance = NULL,
    token = NULL,
    anonymous = FALSE,
    verbose = TRUE){

  # leading slash added for consistency with the other streaming paths
  path <- "/api/v1/streaming/list"
  params <- list(list = list_id)

  quiet_interrupt(
    # pass verbose through so that verbose = FALSE actually silences the stream
    stream_toots(timeout,file_name, append, token, path, params, instance, anonymous, verbose)
  )
  invisible(NULL)
}

#' Parser of Mastodon stream
#'
#' Converts Mastodon stream data (JSON file) into parsed tibble.
#' @param path Character, name of JSON file with data collected by any [stream_timeline] function.
#' @export
#' @seealso `stream_timeline_public()`, `stream_timeline_hashtag()`,`stream_timeline_list()`
#' @examples
#' \dontrun{
#' stream_timeline_public(1,file_name = "stream.json")
#' parse_stream("stream.json")
#' }
parse_stream <- function(path){
  json <- readLines(path)
  tbl <- dplyr::bind_rows(lapply(json,function(x) parse_status(jsonlite::fromJSON(x))))
  tbl[order(tbl[["created_at"]]),]
}


stream_toots <- function(timeout,file_name = NULL, append, token, path, params,
                         instance = NULL, anonymous = FALSE, verbose = TRUE,...){
  if (is.null(instance) && anonymous) {
    stop("provide either an instance or a token")
  }
  h <- curl::new_handle(verbose = FALSE)
  if (is.null(instance)) {
    token <- check_token_rtoot(token)
    url <- prepare_url(token$instance)
    curl::handle_setheaders(h, "Authorization" = paste0("Bearer ",token$bearer))
  } else {
    url <- prepare_url(instance)
  }

  if(is.null(file_name)){
    file_name <- tempfile(pattern = "stream_toots", fileext = ".json")
  }

  url <- httr::modify_url(url,path = path,query = params)

  stopifnot(is.numeric(timeout), timeout > 0)
  stop_time <- Sys.time() + timeout

  output <- file(file_name)
  con <- curl::curl(url,handle = h)
  # register cleanup before streaming so both connections are closed
  # even when the user interrupts the loop
  on.exit({
    close(con)
    close(output)
  })
  open(output,open = if (append) "ab" else "wb") # "b" alone is not a valid open mode
  open(con = con, "rb", blocking = FALSE)
  sayif(verbose,"Streaming toots until ",stop_time)
  n_seen <- 0
  while(isIncomplete(con) && Sys.time() < stop_time){
    buf <- readLines(con,warn = FALSE)
    if(length(buf)){
      line <- buf[grepl("created_at",buf)] # This seems unstable but rtweet does something similar
      line <- gsub("^data:\\s+","",line)
      line <- complete_line(line)
      writeLines(line,output)
      n_seen <- n_seen + length(line)
      if(verbose) cat("streamed toots: ",n_seen,"\r") # progress counter respects verbose
    }
  }
  invisible()
Collaborator

@schochastics Would it be better to return an invisible file_name here (the same for all stream_* functions)? It would be quite useful when the input path (or file_name for stream_*) is NULL (the default). If file_name doesn't get returned, one can probably never get the path of that temp file (unless going through all temp files).

Collaborator

In cases like this:

x <- stream_timeline_public(10)

One can never get the file back. (We might implement a parse parameter in the future, akin to rtweet::stream_tweets.)

Member Author

Ah, I didn't quite understand what the parsing does in rtweet. Let me try to fix this. We shouldn't lose toots in tempfiles 😬

Collaborator

Their parse just returns a tibble if TRUE and NULL if FALSE, somewhat similar to the parse parameter of our get_* functions. Maybe we can do better and return a tibble if TRUE and the (actual) file_name if FALSE.
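
A minimal sketch of the return behaviour proposed here, assuming a hypothetical helper `stream_result()` that is not part of this PR. The `parser` argument stands in for `parse_stream()` so that the sketch runs with base R only; the real function would return a tibble rather than a character vector.

```r
# Hypothetical helper (an assumption, not code from this PR):
# return parsed data when parse = TRUE, otherwise hand back the file name invisibly.
stream_result <- function(file_name, parse = FALSE, parser = readLines) {
  if (isTRUE(parse)) {
    return(parser(file_name))
  }
  invisible(file_name)
}

# Stand-in for a file written by one of the stream_* functions.
tmp <- tempfile(pattern = "stream_toots", fileext = ".json")
writeLines('{"id":"1","created_at":"2022-11-18T10:00:00.000Z"}', tmp)

path <- stream_result(tmp)                  # prints nothing, but the path can be captured
parsed <- stream_result(tmp, parse = TRUE)  # parsed content instead of the path
```

With a return value like this, `x <- stream_timeline_public(10)` would leave the temp file path in `x` instead of `NULL`.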

Member Author

How does rtweet not lose track of the tmp file? https://github.com/ropensci/rtweet/blob/master/R/stream.R#L83-L88

Member Author

OK, I think I prefer to echo the tmp file name and not return anything. The parsing should be done when reading the JSON. I will push something later as a suggestion.

Collaborator

@schochastics Sure thing.

Member Author

@chainsawriot OK, done. The tmp file is now only echoed. I encourage users in the vignette to always set a file_name.
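
One way the "echo only" behaviour could look, as a sketch under stated assumptions: `resolve_file_name()` is a hypothetical helper, and `message()` is used here in place of whatever the package actually prints with. The pushed commit may do this differently.

```r
# Hypothetical helper (an assumption, not code from this PR):
# fall back to a temp file and announce its path when no file_name is given.
resolve_file_name <- function(file_name = NULL, verbose = TRUE) {
  if (is.null(file_name)) {
    file_name <- tempfile(pattern = "stream_toots", fileext = ".json")
    if (isTRUE(verbose)) {
      message("Writing to temporary file ", file_name)
    }
  }
  file_name
}

tmp <- resolve_file_name()                 # message shows where the toots will go
fixed <- resolve_file_name("rstats.json")  # an explicit name is passed through unchanged
```

Setting `file_name` explicitly, as the vignette recommends, avoids having to copy the path from the console.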

}

complete_line <- function(line){
  line <- line[grepl("\\}$",line)] # delete incomplete lines
  line <- line[line!=""] # delete empty lines
  line
}

quiet_interrupt <- function(code) {
  tryCatch(code, interrupt = function(e) NULL)
}
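
To illustrate what the filtering in the streaming loop keeps and drops, here is a small sketch run on a made-up buffer. The buffer contents are an assumption about the shape of the stream (event lines, `data:` payload lines, heartbeats), and `complete_line()` is repeated from the diff so the sketch is self-contained.

```r
# Same filter as complete_line() in the diff, repeated so the sketch runs on its own.
complete_line <- function(line) {
  line <- line[grepl("\\}$", line)]  # drop lines that do not end in a closing brace
  line[line != ""]                   # drop empty strings
}

# Made-up buffer; the last payload is cut off mid-status.
buf <- c(
  "event: update",
  "data: {\"id\":\"1\",\"created_at\":\"2022-11-18T10:00:00.000Z\"}",
  ":thump",
  "data: {\"id\":\"2\",\"created_at\":\"2022-11-18T10:0"
)

line <- buf[grepl("created_at", buf)]  # keep only lines that look like a status payload
line <- gsub("^data:\\s+", "", line)   # strip the "data: " prefix
kept <- complete_line(line)            # only the complete JSON object remains
```

Only the first payload survives. The truncated one is discarded rather than written to the file, which keeps every written line parseable by `parse_stream()`.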
23 changes: 23 additions & 0 deletions man/parse_stream.Rd

89 changes: 89 additions & 0 deletions man/stream_timeline.Rd
