Merge pull request #86 from schochastics/streaming
Enabled streaming toots (fix #84)
schochastics authored Nov 18, 2022
2 parents 07d813e + 3a4a2ce commit b99d6f1
Showing 9 changed files with 399 additions and 1 deletion.
2 changes: 2 additions & 0 deletions DESCRIPTION
@@ -16,8 +16,10 @@ Depends:
    R (>= 3.6)
Imports:
    clipr,
    curl,
    dplyr,
    httr,
    jsonlite,
    tibble
Suggests:
    knitr,
4 changes: 4 additions & 0 deletions NAMESPACE
@@ -31,8 +31,12 @@ export(get_timeline_hashtag)
export(get_timeline_home)
export(get_timeline_list)
export(get_timeline_public)
export(parse_stream)
export(post_toot)
export(post_user)
export(search_accounts)
export(stream_timeline_hashtag)
export(stream_timeline_list)
export(stream_timeline_public)
export(verify_credentials)
export(verify_envvar)
3 changes: 2 additions & 1 deletion NEWS.md
@@ -3,7 +3,8 @@
* possible to set token via environment variable (#68)
* paginating results (#70)
* adding ratelimit checking (#43)
* added pkgdown site(#79)
* added pkgdown site (#79)
* added streaming (#84)

# rtoot 0.1.0

183 changes: 183 additions & 0 deletions R/stream_statuses.R
@@ -0,0 +1,183 @@
#' Collect live streams of Mastodon data
#' @name stream_timeline
#' @param timeout Integer, number of seconds to stream toots for. Stream indefinitely with `timeout = Inf`. The stream can be interrupted at any time, and `file_name` will still be a valid file.
#' @param local logical, show only local statuses (either statuses from your instance or the one provided in `instance`)?
#' @param file_name character, name of file. If not specified, will write to a temporary file stream_toots*.json.
#' @param append logical, if `TRUE`, will append to the end of `file_name`; if `FALSE`, will overwrite.
#' @param verbose logical, whether to display messages
#' @param list_id character, id of list to stream
#' @param hashtag character, hashtag to stream
#' @inheritParams get_instance
#' @details
#' \describe{
#' \item{stream_timeline_public}{stream all public statuses on any instance}
#' \item{stream_timeline_hashtag}{stream all statuses containing a specific hashtag}
#' \item{stream_timeline_list}{stream the statuses of a list}
#' }
#' @export
#' @examples
#' \dontrun{
#' # stream public timeline for 30 seconds
#' stream_timeline_public(timeout = 30,file_name = "public.json")
#' # stream timeline of mastodon.social for 30 seconds
#' stream_timeline_public(timeout = 30, local = TRUE,
#'                        instance = "mastodon.social", file_name = "social.json")
#'
#' # stream hashtag timeline for 30 seconds
#' stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json")
#' # stream hashtag timeline of fosstodon.org for 30 seconds
#' stream_timeline_hashtag("rstats", timeout = 30, local = TRUE,
#'                         instance = "fosstodon.org", file_name = "rstats_foss.json")
#' # json files can be parsed with parse_stream()
#' parse_stream("rstats_foss.json")
#' }
stream_timeline_public <- function(
  timeout = 30,
  local = FALSE,
  file_name = NULL,
  append = TRUE,
  instance = NULL,
  token = NULL,
  anonymous = FALSE,
  verbose = TRUE){

  path <- "/api/v1/streaming/public"
  if(isTRUE(local)){
    path <- paste0(path,"/local")
  }
  params <- list()

  quiet_interrupt(
    stream_toots(timeout = timeout, file_name = file_name, append = append, token = token,
                 path = path, params = params, instance = instance, anonymous = anonymous,
                 verbose = verbose)
  )
  invisible(NULL)
}

#' @rdname stream_timeline
#' @export
stream_timeline_hashtag <- function(
  hashtag = "rstats",
  timeout = 30,
  local = FALSE,
  file_name = NULL,
  append = TRUE,
  instance = NULL,
  token = NULL,
  anonymous = FALSE,
  verbose = TRUE){

  path <- "/api/v1/streaming/hashtag"
  if(isTRUE(local)){
    path <- paste0(path,"/local")
  }
  params <- list(tag = gsub("^#+", "", hashtag))

  quiet_interrupt(
    stream_toots(timeout = timeout, file_name = file_name, append = append, token = token,
                 path = path, params = params, instance = instance, anonymous = anonymous,
                 verbose = verbose)
  )
  invisible(NULL)
}

#' @rdname stream_timeline
#' @export
stream_timeline_list <- function(
  list_id,
  timeout = 30,
  file_name = NULL,
  append = TRUE,
  instance = NULL,
  token = NULL,
  anonymous = FALSE,
  verbose = TRUE){

  path <- "/api/v1/streaming/list"
  params <- list(list = list_id)

  quiet_interrupt(
    stream_toots(timeout = timeout, file_name = file_name, append = append, token = token,
                 path = path, params = params, instance = instance, anonymous = anonymous,
                 verbose = verbose)
  )
  invisible(NULL)
}

#' Parser of Mastodon stream
#'
#' Converts Mastodon stream data (JSON file) into a parsed tibble.
#' @param path Character, name of JSON file with data collected by any [stream_timeline] function.
#' @export
#' @seealso `stream_timeline_public()`, `stream_timeline_hashtag()`, `stream_timeline_list()`
#' @examples
#' \dontrun{
#' stream_timeline_public(1, file_name = "stream.json")
#' parse_stream("stream.json")
#' }
parse_stream <- function(path){
  json <- readLines(path)
  tbl <- dplyr::bind_rows(lapply(json, function(x) parse_status(jsonlite::fromJSON(x))))
  tbl[order(tbl[["created_at"]]), ]
}


stream_toots <- function(timeout, file_name = NULL, append, token, path, params,
                         instance = NULL, anonymous = FALSE, verbose = TRUE, ...){
  if (is.null(instance) && anonymous) {
    stop("provide either an instance or a token")
  }
  h <- curl::new_handle(verbose = FALSE)
  # use the token's instance unless an instance is given explicitly
  if (is.null(instance)) {
    token <- check_token_rtoot(token)
    url <- prepare_url(token$instance)
    curl::handle_setheaders(h, "Authorization" = paste0("Bearer ",token$bearer))
  } else {
    url <- prepare_url(instance)
  }

  if(is.null(file_name)){
    file_name <- tempfile(pattern = "stream_toots", fileext = ".json")
  }
  sayif(verbose, "Writing to ",file_name)
  url <- httr::modify_url(url,path = path,query = params)

  stopifnot(is.numeric(timeout), timeout > 0)
  stop_time <- Sys.time() + timeout

  output <- file(file_name)
  con <- curl::curl(url,handle = h)
  open(output,open = if (append) "ab" else "wb")
  open(con = con, "rb", blocking = FALSE)
  # make sure the connections are closed even if the stream is interrupted
  on.exit({
    close(con)
    close(output)
  })
  sayif(verbose,"Streaming toots until ",stop_time)
  n_seen <- 0
  # read available lines, keep complete status events and write them to file
  while(isIncomplete(con) && Sys.time() < stop_time){
    buf <- readLines(con,warn = FALSE)
    if(length(buf)){
      line <- buf[grepl("created_at",buf)] # This seems unstable but rtweet does something similar
      line <- gsub("^data:\\s+","",line)
      line <- complete_line(line)
      writeLines(line,output)
      n_seen <- n_seen + length(line)
      if (isTRUE(verbose)) {
        cat("streamed toots: ",n_seen,"\r")
      }
    }
  }
  invisible()
}

complete_line <- function(line){
  line <- line[grepl("\\}$",line)] # delete incomplete lines
  line <- line[line!=""] # delete empty lines
  line
}

quiet_interrupt <- function(code) {
  tryCatch(code, interrupt = function(e) NULL)
}
19 changes: 19 additions & 0 deletions README.Rmd
@@ -43,6 +43,8 @@ You can install the development version of rtoot from GitHub:
devtools::install_github("schochastics/rtoot")
```

*(The current development version has many more features than the release on CRAN)*

## Authenticate

First you should set up your own credentials (see also `vignette("auth")`)
@@ -129,6 +131,23 @@ post_toot(status = "my first rtoot #rstats", media="path/to/media",

You can mark the toot as sensitive by setting `sensitive = TRUE` and add a spoiler text with `spoiler_text`.
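
For example, a sensitive toot with a spoiler text could look like this (a minimal sketch; the status and spoiler text are placeholders):

```{r, eval = FALSE}
post_toot(status = "my first rtoot #rstats", sensitive = TRUE,
          spoiler_text = "spoiler alert")
```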

## Streaming

`rtoot` allows you to stream statuses from three different streams.
To get any public status on any instance, use `stream_timeline_public()`:
```{r, eval = FALSE}
stream_timeline_public(timeout = 30,file_name = "public.json")
```
The `timeout` parameter is the number of seconds data should be streamed for (set to `Inf` to stream indefinitely). If only the local timeline is needed, use `local = TRUE` and set an instance (or use your own instance as provided by the token).
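
For example, to stream only local statuses of a specific instance for 30 seconds (a sketch; the instance name is just an example):

```{r, eval = FALSE}
stream_timeline_public(timeout = 30, local = TRUE,
                       instance = "mastodon.social", file_name = "social.json")
```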

`stream_timeline_hashtag()` streams all statuses containing a specific hashtag

```{r, eval = FALSE}
stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json")
```

The statuses are written directly to a file as JSON. The function `parse_stream()` can be used
to read in the file and convert it to a data frame.
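
For example, to parse the hashtag stream collected above (a sketch; it assumes `rstats_public.json` exists and the variable name is illustrative):

```{r, eval = FALSE}
rstats_toots <- parse_stream("rstats_public.json")
```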

## Pagination

28 changes: 28 additions & 0 deletions README.md
@@ -33,6 +33,9 @@ You can install the development version of rtoot from GitHub:
devtools::install_github("schochastics/rtoot")
```

*(The current development version has many more features than the
release on CRAN)*

## Authenticate

First you should set up your own credentials (see also
@@ -140,6 +143,31 @@ post_toot(status = "my first rtoot #rstats", media="path/to/media",
You can mark the toot as sensitive by setting `sensitive = TRUE` and add
a spoiler text with `spoiler_text`.
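
For example, a sensitive toot with a spoiler text could look like this
(a minimal sketch; the status and spoiler text are placeholders):

``` r
post_toot(status = "my first rtoot #rstats", sensitive = TRUE,
          spoiler_text = "spoiler alert")
```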

## Streaming

`rtoot` allows you to stream statuses from three different streams. To
get any public status on any instance, use `stream_timeline_public()`:

``` r
stream_timeline_public(timeout = 30,file_name = "public.json")
```

The `timeout` parameter is the number of seconds data should be
streamed for (set to `Inf` to stream indefinitely). If only the local
timeline is needed, use `local = TRUE` and set an instance (or use your
own instance as provided by the token).
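
For example, to stream only local statuses of a specific instance for
30 seconds (a sketch; the instance name is just an example):

``` r
stream_timeline_public(timeout = 30, local = TRUE,
                       instance = "mastodon.social", file_name = "social.json")
```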

`stream_timeline_hashtag()` streams all statuses containing a specific
hashtag

``` r
stream_timeline_hashtag("rstats", timeout = 30, file_name = "rstats_public.json")
```

The statuses are written directly to a file as JSON. The function
`parse_stream()` can be used to read in the file and convert it to a
data frame.
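
For example, to parse the hashtag stream collected above (a sketch; it
assumes `rstats_public.json` exists and the variable name is
illustrative):

``` r
rstats_toots <- parse_stream("rstats_public.json")
```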

## Pagination

All relevant functions in the package support pagination of results if
23 changes: 23 additions & 0 deletions man/parse_stream.Rd
