Skip to content

Commit

Permalink
response streaming v2
Browse files Browse the repository at this point in the history
  • Loading branch information
guzba committed Oct 6, 2024
1 parent aa226d6 commit 2c1bf99
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 54 deletions.
261 changes: 207 additions & 54 deletions src/curly.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ when not compileOption("threads"):
{.error: "Using --threads:on is required by Curly.".}

import std/strutils, std/locks, std/random, webby/httpheaders, zippy,
std/deques, std/tables, std/options
std/deques, std/tables, std/options, std/times

import libcurl except Option

Expand Down Expand Up @@ -75,7 +75,15 @@ type

WaitGroup = ptr WaitGroupObj

ResponseStreamState = object
lock: Lock
cond: Cond
signaledAfterHeaders: bool
lastUpdate: int
done: bool

RequestWrapObj = object
curl: Curly
verb: string
url: string
headers: HttpHeaders
Expand All @@ -92,10 +100,18 @@ type
responseBodyForLibcurl: string
easyHandle: PCurl
response: Response
streamState: Option[ResponseStreamState]
error: string

RequestWrap = ptr RequestWrapObj

ResponseStream = object
code*: int ## HTTP status code of the response
url*: string ## Final URL, after any redirects
headers*: HttpHeaders
request*: RequestInfo ## Info about the request this response is for
internal: RequestWrap

CurlyObj* = object
lock: Lock
cond: Cond
Expand Down Expand Up @@ -137,6 +153,10 @@ proc destroy(waitGroup: WaitGroup) =
`=destroy`(waitGroup[])
deallocShared(waitGroup)

proc destroy(rw: RequestWrap) {.gcsafe.} =
`=destroy`(rw[])
deallocShared(rw)

{.push stackTrace: off.}

proc curlHeaderWriteFn(
Expand All @@ -158,11 +178,32 @@ proc curlBodyWriteFn(
count: int,
outstream: pointer
): int {.cdecl.} =
let
request = cast[RequestWrap](outstream)
i = request.responseBodyForLibcurl.len
request.responseBodyForLibcurl.setLen(i + count)
copyMem(request.responseBodyForLibcurl[i].addr, buffer, count)
let request = cast[RequestWrap](outstream)
if request.streamState.isSome:
if not request.streamstate.get.signaledAfterHeaders:
discard request.easyHandle.easy_setopt(OPT_TIMEOUT, 0) # Remove timeout
let tmp4 = allocShared0(4)
discard request.easyHandle.easy_getinfo(INFO_RESPONSE_CODE, tmp4)
var httpCode: uint32
copyMem(httpCode.addr, tmp4, 4)
deallocShared(tmp4)
request.response.code = httpCode.int
let tmpcstring = cast[ptr cstring](allocShared0(sizeof(cstring)))
discard request.easyHandle.easy_getinfo(INFO_EFFECTIVE_URL, tmpcstring)
request.response.url = $tmpcstring[]
deallocShared(tmpcstring)
request.waitGroup.done()
request.streamstate.get.signaledAfterHeaders = true
withLock request.streamState.get.lock:
let i = request.responseBodyForLibcurl.len
request.responseBodyForLibcurl.setLen(i + count)
copyMem(request.responseBodyForLibcurl[i].addr, buffer, count)
request.streamState.get.lastUpdate = epochTime().int
signal(request.streamState.get.cond)
else:
let i = request.responseBodyForLibcurl.len
request.responseBodyForLibcurl.setLen(i + count)
copyMem(request.responseBodyForLibcurl[i].addr, buffer, count)
result = size * count

{.pop.}
Expand Down Expand Up @@ -214,7 +255,9 @@ proc threadProc(curl: Curly) {.raises: [].} =
tmp4 = allocShared0(4)
tmpcstring = cast[ptr cstring](allocShared0(sizeof(cstring)))

var dequeued: seq[RequestWrap]
var
dequeued: seq[RequestWrap]
timeouts: seq[RequestWrap]
while true:
if curl.availableEasyHandles.len > 0:
withLock curl.lock:
Expand All @@ -226,61 +269,73 @@ proc threadProc(curl: Curly) {.raises: [].} =
dequeued.add(curl.queue.popFirst())

for request in dequeued:
let easyHandle = curl.availableEasyHandles.popFirst()
if request.easyHandle != nil:
# This request was added to the queue to be canceled / freed
discard multi_remove_handle(curl.multiHandle, request.easyHandle)
curl.inFlight.del(request.easyHandle)
easy_reset(request.easyHandle)
curl.availableEasyHandles.addLast(request.easyHandle)
deinitLock(request.streamState.get.lock)
deinitCond(request.streamState.get.cond)
destroy request.waitGroup
destroy request
echo "FREED"
else:
let easyHandle = curl.availableEasyHandles.popFirst()

request.easyHandle = easyHandle
request.easyHandle = easyHandle

discard easyHandle.easy_setopt(OPT_URL, request.url.cstring)
discard easyHandle.easy_setopt(OPT_CUSTOMREQUEST, request.verb.cstring)
discard easyHandle.easy_setopt(OPT_TIMEOUT, request.timeout)
discard easyHandle.easy_setopt(OPT_URL, request.url.cstring)
discard easyHandle.easy_setopt(OPT_CUSTOMREQUEST, request.verb.cstring)
discard easyHandle.easy_setopt(OPT_TIMEOUT, request.timeout)

# Set CURLOPT_PIPEWAIT
discard easyHandle.easy_setopt(cast[libcurl.Option](237), 1)
# Set CURLOPT_PIPEWAIT
discard easyHandle.easy_setopt(cast[libcurl.Option](237), 1)

discard easyHandle.easy_setopt(OPT_HTTPHEADER, request.pslistForLibcurl)
discard easyHandle.easy_setopt(OPT_HTTPHEADER, request.pslistForLibcurl)

if cmpIgnoreCase(request.verb, "HEAD") == 0:
discard easyHandle.easy_setopt(OPT_NOBODY, 1)
elif cmpIgnoreCase(request.verb, "POST") == 0 or request.bodyLen > 0:
discard easyHandle.easy_setopt(OPT_POSTFIELDSIZE, request.bodyLen)
if request.bodyLen > 0:
discard easyHandle.easy_setopt(OPT_POSTFIELDS, request.body)
if cmpIgnoreCase(request.verb, "HEAD") == 0:
discard easyHandle.easy_setopt(OPT_NOBODY, 1)
elif cmpIgnoreCase(request.verb, "POST") == 0 or request.bodyLen > 0:
discard easyHandle.easy_setopt(OPT_POSTFIELDSIZE, request.bodyLen)
if request.bodyLen > 0:
discard easyHandle.easy_setopt(OPT_POSTFIELDS, request.body)

# Follow up to 10 redirects
discard easyHandle.easy_setopt(OPT_FOLLOWLOCATION, 1)
discard easyHandle.easy_setopt(OPT_MAXREDIRS, 10)
# Follow up to 10 redirects
discard easyHandle.easy_setopt(OPT_FOLLOWLOCATION, 1)
discard easyHandle.easy_setopt(OPT_MAXREDIRS, 10)

# https://curl.se/libcurl/c/threadsafe.html
discard easyHandle.easy_setopt(OPT_NOSIGNAL, 1)
# https://curl.se/libcurl/c/threadsafe.html
discard easyHandle.easy_setopt(OPT_NOSIGNAL, 1)

when defined(windows):
# CURLOPT_SSL_OPTIONS, CURLSSLOPT_NATIVE_CA
discard easyHandle.easy_setopt(cast[libcurl.Option](216), 1 shl 4)
when defined(windows):
# CURLOPT_SSL_OPTIONS, CURLSSLOPT_NATIVE_CA
discard easyHandle.easy_setopt(cast[libcurl.Option](216), 1 shl 4)

# Setup writers
discard easyHandle.easy_setopt(OPT_HEADERDATA, request)
discard easyHandle.easy_setopt(OPT_HEADERFUNCTION, curlHeaderWriteFn)
discard easyHandle.easy_setopt(OPT_WRITEDATA, request)
discard easyHandle.easy_setopt(OPT_WRITEFUNCTION, curlBodyWriteFn)
# Setup writers
discard easyHandle.easy_setopt(OPT_HEADERDATA, request)
discard easyHandle.easy_setopt(OPT_HEADERFUNCTION, curlHeaderWriteFn)
discard easyHandle.easy_setopt(OPT_WRITEDATA, request)
discard easyHandle.easy_setopt(OPT_WRITEFUNCTION, curlBodyWriteFn)

let mc = multi_add_handle(curl.multiHandle, easyHandle)
if mc == M_OK:
curl.inFlight[easyHandle] = request
else:
# Reset this easy_handle and add it back as available
easy_reset(easyHandle)
curl.availableEasyHandles.addLast(easyHandle)
let mc = multi_add_handle(curl.multiHandle, easyHandle)
if mc == M_OK:
curl.inFlight[easyHandle] = request
else:
# Reset this easy_handle and add it back as available
easy_reset(easyHandle)
curl.availableEasyHandles.addLast(easyHandle)

# Set the error so an exception is raised
request.error = "Unexpected libcurl multi_add_handle error: " &
$mc & ' ' & $multi_strerror(mc)
# Set the error so an exception is raised
request.error = "Unexpected libcurl multi_add_handle error: " &
$mc & ' ' & $multi_strerror(mc)

if request.waitGroup != nil:
request.waitGroup.done()
else:
withLock curl.lock:
curl.requestsCompleted.addLast(request)
signal(curl.requestCompletedCond)
if request.waitGroup != nil:
request.waitGroup.done()
else:
withLock curl.lock:
curl.requestsCompleted.addLast(request)
signal(curl.requestCompletedCond)

dequeued.setLen(0) # Reset for next loop

Expand Down Expand Up @@ -342,13 +397,35 @@ proc threadProc(curl: Curly) {.raises: [].} =
easy_reset(m.easy_handle)
curl.availableEasyHandles.addLast(m.easy_handle)

if request.streamState.isSome:
withLock request.streamState.get.lock:
request.streamState.get.done = true
signal(request.streamState.get.cond)

if request.waitGroup != nil:
request.waitGroup.done()
else:
withLock curl.lock:
curl.requestsCompleted.addLast(request)
signal(curl.requestCompletedCond)

let now = epochTime().int
for _, request in curl.inFlight:
if request.streamState.isSome and request.streamState.get.lastUpdate > 0:
let secondsSinceLastUpdate = now - request.streamState.get.lastUpdate
if secondsSinceLastUpdate >= request.timeout and request.timeout > 0:
timeouts.add(request)

if timeouts.len > 0:
for request in timeouts:
discard multi_remove_handle(curl.multiHandle, request.easyHandle)
curl.inFlight.del(request.easyHandle)
request.error = "Timeout was reached " & request.verb & ' ' & request.url
easy_reset(request.easyHandle)
curl.availableEasyHandles.addLast(request.easyHandle)
signal(request.streamState.get.cond)
timeouts.setLen(0)

if numRunningHandles == 0:
# Sleep if there are no running handles and the queue is empty
{.gcsafe.}:
Expand Down Expand Up @@ -397,10 +474,6 @@ proc close*(curl: Curly) =
`=destroy`(curl[])
deallocShared(curl)

proc destroy(rw: RequestWrap) {.gcsafe.} =
`=destroy`(rw[])
deallocShared(rw)

proc hasRequests*(curl: Curly): bool {.gcsafe.} =
## Returns true if there are requests in-flight or queued.
withLock curl.lock:
Expand Down Expand Up @@ -429,6 +502,83 @@ proc clearQueue*(curl: Curly) {.gcsafe.} =
else:
curl.requestsCompleted.addLast(rw)

proc request*(
curl: Curly,
verb: sink string,
url: sink string,
headers: sink HttpHeaders = emptyHttpHeaders(),
body: openarray[char] = "".toOpenArray(0, -1),
timeout = 60
): ResponseStream {.gcsafe.} =
let rw = cast[RequestWrap](allocShared0(sizeof(RequestWrapObj)))
rw.curl = curl
rw.verb = move verb
rw.url = move url
rw.headers = move headers
if body.len > 0:
rw.body = body[0].unsafeAddr
rw.bodyLen = body.len
rw.timeout = timeout
rw.waitGroup = newWaitGroup(1)

rw.prepHeadersForLibcurl()

rw.streamState = some(ResponseStreamState())
initLock(rw.streamState.get.lock)
initCond(rw.streamState.get.cond)

withLock curl.lock:
curl.queue.addLast(rw)
signal(curl.cond)

rw.waitGroup.wait()

if rw.error == "":
result.code = rw.response.code
result.url = rw.response.url
result.request.verb = rw.verb
result.request.url = rw.url
addHeaders(result.headers, rw.responseHeadersForLibcurl)
result.internal = rw
else:
deinitLock(rw.streamState.get.lock)
deinitCond(rw.streamState.get.cond)
destroy rw.waitGroup
destroy rw
raise newException(CatchableError, move rw.error)

proc read*(stream: ResponseStream, buffer: var string): int {.gcsafe.} =
var tmp, error: string
acquire(stream.internal.streamState.get.lock)
while not stream.internal.streamState.get.done and
stream.internal.responseBodyForLibcurl == "" and
stream.internal.error == "":
wait(stream.internal.streamState.get.cond, stream.internal.streamState.get.lock)
tmp = move stream.internal.responseBodyForLibcurl
error = stream.internal.error
release(stream.internal.streamState.get.lock)

if error == "":
result = tmp.len
if tmp != "":
if buffer.len > 0:
let i = buffer.len
buffer.setLen(buffer.len + tmp.len)
copyMem(
buffer[i].addr,
tmp.cstring,
tmp.len
)
else:
buffer = move tmp
else:
raise newException(CatchableError, move error)

proc close*(stream: ResponseStream) {.gcsafe.} =
withLock stream.internal.curl.lock:
stream.internal.curl.queue.addLast(stream.internal)
signal(stream.internal.curl.cond)

proc makeRequest*(
curl: Curly,
verb: sink string,
Expand All @@ -441,6 +591,7 @@ proc makeRequest*(
## a response cannot be received (due to timeout, DNS failure, broken
## connection, etc), an exception is raised.
let rw = cast[RequestWrap](allocShared0(sizeof(RequestWrapObj)))
rw.curl = curl
rw.verb = move verb
rw.url = move url
rw.headers = move headers
Expand Down Expand Up @@ -563,6 +714,7 @@ proc makeRequests*(
var wrapped: seq[RequestWrap]
for request in batch.requests:
let rw = cast[RequestWrap](allocShared0(sizeof(RequestWrapObj)))
rw.curl = curl
rw.verb = request.verb
rw.url = request.url
rw.headers = request.headers
Expand Down Expand Up @@ -682,6 +834,7 @@ proc startRequests*(
var wrapped: seq[RequestWrap]
for request in batch.requests.mitems:
let rw = cast[RequestWrap](allocShared0(sizeof(RequestWrapObj)))
rw.curl = curl
rw.verb = move request.verb
rw.url = move request.url
rw.headers = move request.headers
Expand Down
20 changes: 20 additions & 0 deletions tests/test_responsestream.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import curly

let curl = newCurly()

let stream = curl.request("GET", "https://sse.dev/test", timeout = 5)

echo stream.code
echo stream.headers

try:
while true:
var buffer: string
let bytesRead = stream.read(buffer)
if bytesRead == 0:
break
echo buffer
finally:
stream.close()

echo "DONE"

0 comments on commit 2c1bf99

Please sign in to comment.