Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabled streaming toots (fix #84) #86

Merged
merged 8 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ Depends:
R (>= 3.6)
Imports:
clipr,
curl,
dplyr,
httr,
jsonlite,
tibble
Suggests:
knitr,
Expand Down
4 changes: 4 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ export(get_timeline_hashtag)
export(get_timeline_home)
export(get_timeline_list)
export(get_timeline_public)
export(parse_stream)
export(post_toot)
export(post_user)
export(search_accounts)
export(stream_timeline_hashtag)
export(stream_timeline_list)
export(stream_timeline_public)
export(verify_credentials)
export(verify_envvar)
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
* possible to set token via environment variable (#68)
* paginating results (#70)
* adding ratelimit checking (#43)
* added pkgdown site(#79)
* added pkgdown site (#79)
* added streaming (#84)

# rtoot 0.1.0

Expand Down
183 changes: 183 additions & 0 deletions R/stream_statuses.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#' Collect live streams of Mastodon data
#' @name stream_timeline
#' @param timeout Integer, Number of seconds to stream toots for. Stream indefinitely with timeout = Inf. The stream can be interrupted at any time, and file_name will still be a valid file.
#' @param local logical, Show only local statuses (either statuses from your instance or the one provided in `instance`)?
#' @param file_name character, name of file. If not specified, will write to a temporary file stream_toots*.json.
#' @param append logical, if TRUE will append to the end of file_name; if FALSE, will overwrite.
#' @param verbose logical whether to display messages
#' @param list_id character, id of list to stream
#' @param hashtag character, hashtag to stream
#' @inheritParams get_instance
#' @details
#' \describe{
#' \item{stream_timeline_public}{stream all public statuses on any instance}
#' \item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag}
#' \item{stream_timeline_list}{stream the statuses of a list}
#' }
#' @export
#' @examples
#' \dontrun{
#' # stream public timeline for 30 seconds
#' stream_timeline_public(timeout = 30,file_name = "public.json")
#' # stream timeline of mastodon.social for 30 seconds
#' stream_timeline_public(timeout = 30, local = TRUE,
#' instance = "mastodon.social", file_name = "social.json")
#'
#' # stream hashtag timeline for 30 seconds
#' stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json")
#' # stream hashtag timeline of mastodon.social for 30 seconds
#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE,
#' instance = "fosstodon.org", file_name = "rstats_foss.json")
#' # json files can be parsed with parse_stream()
#' parse_stream("rstats_foss.json")
#' }
stream_timeline_public <- function(
timeout = 30,
local = FALSE,
file_name = NULL,
append = TRUE,
instance = NULL,
token = NULL,
anonymous = FALSE,
verbose = TRUE){

path <- "/api/v1/streaming/public"
if(isTRUE(local)){
path <- paste0(path,"/local")
}
params <- list()

quiet_interrupt(
stream_toots(timeout = timeout, file_name = file_name, append = append, token = token,
path = path, params = params, instance = instance, anonymous = anonymous,
verbose = verbose)
)
invisible(NULL)
}

#' @rdname stream_timeline
#' @export
stream_timeline_hashtag <- function(
hashtag = "rstats",
timeout = 30,
local = FALSE,
file_name = NULL,
append = TRUE,
instance = NULL,
token = NULL,
anonymous = FALSE,
verbose = TRUE){

path <- "/api/v1/streaming/hashtag"
if(isTRUE(local)){
path <- paste0(path,"/local")
}
params <- list(tag = gsub("^#+", "", hashtag))

quiet_interrupt(
stream_toots(timeout = timeout, file_name = file_name, append = append, token = token,
path = path, params = params, instance = instance, anonymous = anonymous,
verbose = verbose)
)
invisible(NULL)
}

#' @rdname stream_timeline
#' @export
stream_timeline_list <- function(
list_id,
timeout = 30,
file_name = NULL,
append = TRUE,
instance = NULL,
token = NULL,
anonymous = FALSE,
verbose = TRUE){

path <- "api/v1/streaming/list"
params <- list(list = list_id)

quiet_interrupt(
stream_toots(timeout = timeout, file_name = file_name, append = append, token = token,
path = path, params = params, instance = instance, anonymous = anonymous,
verbose = verbose)
)
invisible(NULL)
}

#' Parser of Mastodon stream
#'
#' Converts Mastodon stream data (JSON file) into parsed tibble.
#' @param path Character, name of JSON file with data collected by any [stream_timeline] function.
#' @export
#' @seealso `stream_timeline_public()`, `stream_timeline_hashtag()`,`stream_timeline_list()`
#' @examples
#' \dontrun{
#' stream_timeline_public(1,file_name = "stream.json")
#' parse_stream("stream.json")
#' }
parse_stream <- function(path){
json <- readLines(path)
tbl <- dplyr::bind_rows(lapply(json,function(x) parse_status(jsonlite::fromJSON(x))))
tbl[order(tbl[["created_at"]]),]
}


stream_toots <- function(timeout,file_name = NULL, append, token, path, params,
instance = NULL, anonymous = FALSE, verbose = TRUE,...){
if (is.null(instance) && anonymous) {
stop("provide either an instance or a token")
}
h <- curl::new_handle(verbose = FALSE)
if (is.null(instance)) {
token <- check_token_rtoot(token)
url <- prepare_url(token$instance)
curl::handle_setheaders(h, "Authorization" = paste0("Bearer ",token$bearer))
} else {
url <- prepare_url(instance)
}

if(is.null(file_name)){
file_name <- tempfile(pattern = "stream_toots", fileext = ".json")
}
sayif(verbose, "Writing to ",file_name)
url <- httr::modify_url(url,path = path,query = params)

stopifnot(is.numeric(timeout), timeout > 0)
stop_time <- Sys.time() + timeout

output <- file(file_name)
con <- curl::curl(url,handle = h)
open(output,open = if (append) "ab" else "b")
open(con = con, "rb", blocking = FALSE)
sayif(verbose,"Streaming toots until ",stop_time)
n_seen <- 0
while(isIncomplete(con) && Sys.time() < stop_time){
buf <- readLines(con,warn = FALSE)
if(length(buf)){
line <- buf[grepl("created_at",buf)] # This seems unstable but rtweet does something similar
line <- gsub("^data:\\s+","",line)
line <- complete_line(line)
writeLines(line,output)
n_seen <- n_seen + length(line)
if (isTRUE(verbose)) {
cat("streamed toots: ",n_seen,"\r")
}
}
}
on.exit({
close(con)
close(output)
})
invisible()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@schochastics Would it be better to return an invisible file_name here (the same for all stream_* functions)? It would be quite useful when the input path (or file_name for stream_*) is NULL (the default). If file_name doesn't get returned, one can probably never get the path of that temp file (unless going through all temp files).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cases like this:

x <- stream_timeline_public(10)

One can never get the file back. (We might implement a parse parameter in the future, akin rtweet::stream_tweets)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah i didnt quite understand what the parsing does in rtweet? Let me try to fix this. We shouldnt loose toots in tempfiles😬

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Their parse is just to return a tibble if TRUE and NULL if FALSE; kind of similar to the parse parameter of our get_* functions. Maybe we can do better to return a tibble if TRUE and the (actual) file_name if FALSE.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does rtweet not loose track of the tmp file? https://github.com/ropensci/rtweet/blob/master/R/stream.R#L83-L88

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok i think i prefer to echo the tmp file name and not return anything. The parsing should be done when reading the json. I will push something later as a suggestion

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@schochastics Sure thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chainsawriot Ok done. tmp file is now only echoed. I encourgae users in the vignette to always set a file_name

}

complete_line <- function(line){
line <- line[grepl("\\}$",line)] #delete incomplete lines
line <- line[line!=""] # delete empty lines
line
}

quiet_interrupt <- function(code) {
tryCatch(code, interrupt = function(e) NULL)
}
19 changes: 19 additions & 0 deletions README.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ You can install the development version of rtoot from GitHub:
devtools::install_github("schochastics/rtoot")
```

*(The current dev version has many more features than the current version on CRAN)*

## Authenticate

First you should set up your own credentials (see also `vignette("auth")`)
Expand Down Expand Up @@ -129,6 +131,23 @@ post_toot(status = "my first rtoot #rstats", media="path/to/media",

You can mark the toot as sensitive by setting `sensitive = TRUE` and add a spoiler text with `spoiler_text`.

## Streaming

`rtoot` allows to stream statuses from three different streams.
To get any public status on any instance use `stream_timeline_public()`
```{r, eval = FALSE}
stream_timeline_public(timeout = 30,file_name = "public.json")
```
the timeout parameter is the time in seconds data should be streamed (set to `Inf` for indefinite streaming). If just the local timeline is needed, use `local=TRUE` and set an instance (or use your own provided by the token).

`stream_timeline_hashtag()` streams all statuses containing a specific hashtag

```{r, eval = FALSE}
stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json")
```

The statuses are directly written to file as json. The function `parse_stream()` can be used
to read in and convert a json to a data frame.

## Pagination

Expand Down
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ You can install the development version of rtoot from GitHub:
devtools::install_github("schochastics/rtoot")
```

*(The current dev version has many more features than the current
version on CRAN)*

## Authenticate

First you should set up your own credentials (see also
Expand Down Expand Up @@ -140,6 +143,31 @@ post_toot(status = "my first rtoot #rstats", media="path/to/media",
You can mark the toot as sensitive by setting `sensitive = TRUE` and add
a spoiler text with `spoiler_text`.

## Streaming

`rtoot` allows to stream statuses from three different streams. To get
any public status on any instance use `stream_timeline_public()`

``` r
stream_timeline_public(timeout = 30,file_name = "public.json")
```

the timeout parameter is the time in seconds data should be streamed
(set to `Inf` for indefinite streaming). If just the local timeline is
needed, use `local=TRUE` and set an instance (or use your own provided
by the token).

`stream_timeline_hashtag()` streams all statuses containing a specific
hashtag

``` r
stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json")
```

The statuses are directly written to file as json. The function
`parse_stream()` can be used to read in and convert a json to a data
frame.

## Pagination

All relevant functions in the package support pagination of results if
Expand Down
23 changes: 23 additions & 0 deletions man/parse_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading