Skip to content

Commit

Permalink
streaming response prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
guzba committed Oct 5, 2024
1 parent 52e3cc1 commit 44e92b6
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 61 deletions.
264 changes: 203 additions & 61 deletions src/curly.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ type
url*: string ## Intitial request URL, before any redirects
tag*: string ## Arbtitrary user-provided data when batching requests

StringWrap = object
## As strings are value objects they need
## some sort of wrapper to be passed to C.
str: string

RequestBatch* = object
requests: seq[BatchedRequest]

Expand Down Expand Up @@ -93,13 +88,27 @@ type
headerStringsForLibcurl: seq[string]
slistsForLibcurl: seq[Slist]
pslistForLibcurl: Pslist
responseBodyForLibcurl: StringWrap
responseHeadersForLibcurl: StringWrap
responseBodyForLibcurl: string
responseHeadersForLibcurl: string
easyHandle: PCurl
response: Response
responseStream: bool
signaledAfterHeaders: bool
bufferLock: Lock
bufferCond: Cond
buffer: string
streamFinished: bool
error: string

RequestWrap = ptr RequestWrapObj

ResponseStream = object
code*: int ## HTTP status code of the response
url*: string ## Final URL, after any redirects
headers*: HttpHeaders
request*: RequestInfo ## Info about the request this response is for
internal: RequestWrap

CurlyObj* = object
lock: Lock
cond: Cond
Expand All @@ -115,19 +124,78 @@ type

Curly* = ptr CurlyObj

proc newWaitGroup(count: int): WaitGroup =
result = cast[WaitGroup](allocShared0(sizeof(WaitGroupObj)))
result.count = count
initLock(result.lock)
initCond(result.cond)

proc wait(waitGroup: WaitGroup) =
acquire(waitGroup.lock)
while waitGroup.count > 0:
wait(waitGroup.cond, waitGroup.lock)
release(waitGroup.lock)

proc done(waitGroup: WaitGroup) =
var signalCond: bool
withLock waitGroup.lock:
dec waitGroup.count
signalCond = (waitGroup.count == 0)
if signalCond:
signal(waitGroup.cond)

proc destroy(waitGroup: WaitGroup) =
deinitLock(waitGroup.lock)
deinitCond(waitGroup.cond)
`=destroy`(waitGroup[])
deallocShared(waitGroup)

{.push stackTrace: off.}

proc curlWriteFn(
proc curlHeaderWriteFn(
buffer: cstring,
size: int,
count: int,
outstream: pointer
): int {.cdecl.} =
let
outbuf = cast[ptr StringWrap](outstream)
i = outbuf.str.len
outbuf.str.setLen(outbuf.str.len + count)
copyMem(outbuf.str[i].addr, buffer, count)
rw = cast[RequestWrap](outstream)
i = rw.responseHeadersForLibcurl.len
rw.responseHeadersForLibcurl.setLen(i + count)
copyMem(rw.responseHeadersForLibcurl[i].addr, buffer, count)
result = size * count

proc curlBodyWriteFn(
buffer: cstring,
size: int,
count: int,
outstream: pointer
): int {.cdecl.} =
let rw = cast[RequestWrap](outstream)
if rw.responseStream:
if not rw.signaledAfterHeaders:
# This avoids a SIGSEGV on Mac with -d:release and a memory leak on Linux
let tmp4 = allocShared0(4)
discard rw.easyHandle.easy_getinfo(INFO_RESPONSE_CODE, tmp4)
var httpCode: uint32
copyMem(httpCode.addr, tmp4, 4)
deallocShared(tmp4)
rw.response.code = httpCode.int
let tmpcstring = cast[ptr cstring](allocShared0(sizeof(cstring)))
discard rw.easyHandle.easy_getinfo(INFO_EFFECTIVE_URL, tmpcstring)
rw.response.url = $tmpcstring[]
deallocShared(tmpcstring)
rw.waitGroup.done()
rw.signaledAfterHeaders = true
withLock rw.bufferLock:
let i = rw.buffer.len
rw.buffer.setLen(i + count)
copyMem(rw.buffer[i].addr, buffer, count)
signal(rw.bufferCond)
else:
let i = rw.responseBodyForLibcurl.len
rw.responseBodyForLibcurl.setLen(i + count)
copyMem(rw.responseBodyForLibcurl[i].addr, buffer, count)
result = size * count

{.pop.}
Expand Down Expand Up @@ -165,32 +233,6 @@ proc addHeaders(dst: var HttpHeaders, src: string) =
else:
dst.add((parts[0].strip(), ""))

proc newWaitGroup(count: int): WaitGroup =
result = cast[WaitGroup](allocShared0(sizeof(WaitGroupObj)))
result.count = count
initLock(result.lock)
initCond(result.cond)

proc wait(waitGroup: WaitGroup) =
acquire(waitGroup.lock)
while waitGroup.count > 0:
wait(waitGroup.cond, waitGroup.lock)
release(waitGroup.lock)

proc done(waitGroup: WaitGroup) =
var signalCond: bool
withLock waitGroup.lock:
dec waitGroup.count
signalCond = (waitGroup.count == 0)
if signalCond:
signal(waitGroup.cond)

proc destroy(waitGroup: WaitGroup) =
deinitLock(waitGroup.lock)
deinitCond(waitGroup.cond)
`=destroy`(waitGroup[])
deallocShared(waitGroup)

proc threadProc(curl: Curly) {.raises: [].} =
when not defined(windows): # Block SIGPIPE for this thread
var oldSet, empty: Sigset
Expand Down Expand Up @@ -219,6 +261,8 @@ proc threadProc(curl: Curly) {.raises: [].} =
for request in dequeued:
let easyHandle = curl.availableEasyHandles.popFirst()

request.easyHandle = easyHandle

discard easyHandle.easy_setopt(OPT_URL, request.url.cstring)
discard easyHandle.easy_setopt(OPT_CUSTOMREQUEST, request.verb.cstring)
discard easyHandle.easy_setopt(OPT_TIMEOUT, request.timeout)
Expand Down Expand Up @@ -247,16 +291,10 @@ proc threadProc(curl: Curly) {.raises: [].} =
discard easyHandle.easy_setopt(cast[libcurl.Option](216), 1 shl 4)

# Setup writers
discard easyHandle.easy_setopt(
OPT_WRITEDATA,
request.responseBodyForLibcurl.addr
)
discard easyHandle.easy_setopt(OPT_WRITEFUNCTION, curlWriteFn)
discard easyHandle.easy_setopt(
OPT_HEADERDATA,
request.responseHeadersForLibcurl.addr
)
discard easyHandle.easy_setopt(OPT_HEADERFUNCTION, curlWriteFn)
discard easyHandle.easy_setopt(OPT_WRITEDATA, request)
discard easyHandle.easy_setopt(OPT_WRITEFUNCTION, curlBodyWriteFn)
discard easyHandle.easy_setopt(OPT_HEADERDATA, request)
discard easyHandle.easy_setopt(OPT_HEADERFUNCTION, curlHeaderWriteFn)

let mc = multi_add_handle(curl.multiHandle, easyHandle)
if mc == M_OK:
Expand Down Expand Up @@ -320,15 +358,16 @@ proc threadProc(curl: Curly) {.raises: [].} =
# This request has completed
let code = cast[Code](m.whatever)
if code == E_OK:
# Avoid SIGSEGV on Mac with -d:release and a memory leak on Linux
zeroMem(tmp4, 4)
discard m.easy_handle.easy_getinfo(INFO_RESPONSE_CODE, tmp4)
var httpCode: uint32
copyMem(httpCode.addr, tmp4, 4)
request.response.code = httpCode.int
zeroMem(tmpcstring, sizeof(cstring))
discard m.easy_handle.easy_getinfo(INFO_EFFECTIVE_URL, tmpcstring)
request.response.url = $tmpcstring[]
if not request.responseStream:
# Avoid SIGSEGV on Mac with -d:release and a memory leak on Linux
zeroMem(tmp4, 4)
discard m.easy_handle.easy_getinfo(INFO_RESPONSE_CODE, tmp4)
var httpCode: uint32
copyMem(httpCode.addr, tmp4, 4)
request.response.code = httpCode.int
zeroMem(tmpcstring, sizeof(cstring))
discard m.easy_handle.easy_getinfo(INFO_EFFECTIVE_URL, tmpcstring)
request.response.url = $tmpcstring[]
else:
request.error =
$easy_strerror(code) & ' ' & request.verb & ' ' & request.url
Expand All @@ -337,6 +376,11 @@ proc threadProc(curl: Curly) {.raises: [].} =
easy_reset(m.easy_handle)
curl.availableEasyHandles.addLast(m.easy_handle)

if request.responseStream:
withLock request.bufferLock:
request.streamFinished = true
signal(request.bufferCond)

if request.waitGroup != nil:
request.waitGroup.done()
else:
Expand Down Expand Up @@ -424,6 +468,84 @@ proc clearQueue*(curl: Curly) {.gcsafe.} =
else:
curl.requestsCompleted.addLast(rw)

proc request*(
curl: Curly,
verb: sink string,
url: sink string,
headers: sink HttpHeaders = emptyHttpHeaders(),
body: openarray[char] = "".toOpenArray(0, -1),
timeout = 60
): ResponseStream {.gcsafe.} =
let rw = cast[RequestWrap](allocShared0(sizeof(RequestWrapObj)))
rw.verb = move verb
rw.url = move url
rw.headers = move headers
if body.len > 0:
rw.body = body[0].unsafeAddr
rw.bodyLen = body.len
rw.timeout = timeout
rw.waitGroup = newWaitGroup(1)
rw.responseStream = true
initLock(rw.bufferLock)
initCond(rw.bufferCond)

rw.prepHeadersForLibcurl()

withLock curl.lock:
curl.queue.addLast(rw)
signal(curl.cond)

rw.waitGroup.wait()

if rw.error == "":
result.code = rw.response.code
result.url = move rw.response.url
result.request.verb = move rw.verb
result.request.url = move rw.url
addHeaders(result.headers, rw.responseHeadersForLibcurl)
result.internal = rw
else:
deinitLock(rw.bufferLock)
deinitCond(rw.bufferCond)
destroy rw.waitGroup
destroy rw
raise newException(CatchableError, move rw.error)

proc read*(stream: ResponseStream, buffer: var string): int =
var
tmp: string
done: bool
acquire(stream.internal.bufferLock)
while not stream.internal.streamFinished and stream.internal.buffer.len == 0:
wait(stream.internal.bufferCond, stream.internal.bufferLock)
tmp = move stream.internal.buffer
done = stream.internal.streamFinished
release(stream.internal.bufferLock)

try:
if stream.internal.error == "":
if tmp != "":
result = tmp.len
if buffer.len > 0:
let i = buffer.len
buffer.setLen(buffer.len + tmp.len)
copyMem(
buffer[i].addr,
tmp.cstring,
tmp.len
)
else:
buffer = move tmp
else:
done = true
raise newException(CatchableError, move stream.internal.error)
finally:
if done:
deinitLock(stream.internal.bufferLock)
deinitCond(stream.internal.bufferCond)
destroy stream.internal.waitGroup
destroy stream.internal

proc makeRequest*(
curl: Curly,
verb: sink string,
Expand Down Expand Up @@ -458,8 +580,8 @@ proc makeRequest*(
result = move rw.response
result.request.verb = move rw.verb
result.request.url = move rw.url
addHeaders(result.headers, rw.responseHeadersForLibcurl.str)
result.body = move rw.responseBodyForLibcurl.str
addHeaders(result.headers, rw.responseHeadersForLibcurl)
result.body = move rw.responseBodyForLibcurl
if result.headers["Content-Encoding"] == "gzip":
result.body = uncompress(result.body, dfGzip)
else:
Expand Down Expand Up @@ -527,8 +649,8 @@ proc unwrapResponse(
response.request.url = move rw.url
response.request.tag = move rw.tag
if rw.error == "":
addHeaders(response.headers, rw.responseHeadersForLibcurl.str)
response.body = move rw.responseBodyForLibcurl.str
addHeaders(response.headers, rw.responseHeadersForLibcurl)
response.body = move rw.responseBodyForLibcurl
if response.headers["Content-Encoding"] == "gzip":
try:
response.body = uncompress(response.body, dfGzip)
Expand Down Expand Up @@ -763,6 +885,9 @@ type

CurlPool* = ptr CurlPoolObj

StringWrap = object
str: string

proc close*(pool: CurlPool) =
## Closes the libcurl handles then deallocates the pool.
## All libcurl handles should be returned to the pool before it is closed.
Expand Down Expand Up @@ -814,6 +939,23 @@ template withHandle*(pool: CurlPool, handle, body) =
finally:
pool.recycle(handle)

{.push stackTrace: off.}

proc curlWriteFn(
buffer: cstring,
size: int,
count: int,
outstream: pointer
): int {.cdecl.} =
let
outbuf = cast[ptr StringWrap](outstream)
i = outbuf.str.len
outbuf.str.setLen(outbuf.str.len + count)
copyMem(outbuf.str[i].addr, buffer, count)
result = size * count

{.pop.}

proc makeRequest*(
curl: PCurl,
verb: string,
Expand Down
17 changes: 17 additions & 0 deletions tests/test_responsestream.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import curly

let curl = newCurly()

let stream = curl.request("GET", "https://www.google.com")

echo stream.code
echo stream.headers

while true:
var buffer: string
let bytesRead = stream.read(buffer)
if bytesRead == 0:
break
echo buffer

echo "DONE"

0 comments on commit 44e92b6

Please sign in to comment.