Skip to content

Add finalizer to ZStream and make TranscodingStreams.finalize a noop #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ function GzipCompressor(;level::Integer=Z_DEFAULT_COMPRESSION,
elseif !(9 ≤ windowbits ≤ 15)
throw(ArgumentError("windowbits must be within 9..15"))
end
return GzipCompressor(ZStream(), level, windowbits+16)
zstream = ZStream()
finalizer(compress_finalizer!, zstream)
return GzipCompressor(zstream, level, windowbits+16)
end

const GzipCompressorStream{S} = TranscodingStream{GzipCompressor,S} where S<:IO
Expand Down Expand Up @@ -85,7 +87,9 @@ function ZlibCompressor(;level::Integer=Z_DEFAULT_COMPRESSION,
elseif !(9 ≤ windowbits ≤ 15)
throw(ArgumentError("windowbits must be within 9..15"))
end
return ZlibCompressor(ZStream(), level, windowbits)
zstream = ZStream()
finalizer(compress_finalizer!, zstream)
return ZlibCompressor(zstream, level, windowbits)
end

const ZlibCompressorStream{S} = TranscodingStream{ZlibCompressor,S} where S<:IO
Expand Down Expand Up @@ -133,7 +137,9 @@ function DeflateCompressor(;level::Integer=Z_DEFAULT_COMPRESSION,
elseif !(9 ≤ windowbits ≤ 15)
throw(ArgumentError("windowbits must be within 9..15"))
end
return DeflateCompressor(ZStream(), level, -Int(windowbits))
zstream = ZStream()
finalizer(compress_finalizer!, zstream)
return DeflateCompressor(zstream, level, -Int(windowbits))
end

const DeflateCompressorStream{S} = TranscodingStream{DeflateCompressor,S} where S<:IO
Expand All @@ -155,17 +161,6 @@ end
# Methods
# -------

function TranscodingStreams.finalize(codec::CompressorCodec)
zstream = codec.zstream
if zstream.state != C_NULL
code = deflate_end!(zstream)
if code != Z_OK
zerror(zstream, code)
end
end
return
end

function TranscodingStreams.startproc(codec::CompressorCodec, state::Symbol, error_ref::Error)
if codec.zstream.state == C_NULL
code = deflate_init!(codec.zstream, codec.level, codec.windowbits)
Expand Down
23 changes: 9 additions & 14 deletions src/decompression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ function GzipDecompressor(;windowbits::Integer=Z_DEFAULT_WINDOWBITS, gziponly::B
if !(8 ≤ windowbits ≤ 15)
throw(ArgumentError("windowbits must be within 8..15"))
end
return GzipDecompressor(ZStream(), windowbits+(gziponly ? 16 : 32))
zstream = ZStream()
finalizer(decompress_finalizer!, zstream)
return GzipDecompressor(zstream, windowbits+(gziponly ? 16 : 32))
end

const GzipDecompressorStream{S} = TranscodingStream{GzipDecompressor,S} where S<:IO
Expand Down Expand Up @@ -78,7 +80,9 @@ function ZlibDecompressor(;windowbits::Integer=Z_DEFAULT_WINDOWBITS)
if !(8 ≤ windowbits ≤ 15)
throw(ArgumentError("windowbits must be within 8..15"))
end
return ZlibDecompressor(ZStream(), windowbits)
zstream = ZStream()
finalizer(decompress_finalizer!, zstream)
return ZlibDecompressor(zstream, windowbits)
end

const ZlibDecompressorStream{S} = TranscodingStream{ZlibDecompressor,S} where S<:IO
Expand Down Expand Up @@ -121,7 +125,9 @@ function DeflateDecompressor(;windowbits::Integer=Z_DEFAULT_WINDOWBITS)
if !(8 ≤ windowbits ≤ 15)
throw(ArgumentError("windowbits must be within 8..15"))
end
return DeflateDecompressor(ZStream(), -Int(windowbits))
zstream = ZStream()
finalizer(decompress_finalizer!, zstream)
return DeflateDecompressor(zstream, -Int(windowbits))
end

const DeflateDecompressorStream{S} = TranscodingStream{DeflateDecompressor,S} where S<:IO
Expand All @@ -143,17 +149,6 @@ end
# Methods
# -------

function TranscodingStreams.finalize(codec::DecompressorCodec)
zstream = codec.zstream
if zstream.state != C_NULL
code = inflate_end!(zstream)
if code != Z_OK
zerror(zstream, code)
end
end
return
end

function TranscodingStreams.startproc(codec::DecompressorCodec, ::Symbol, error_ref::Error)
# indicate that no input data is being provided for future zlib compat
codec.zstream.next_in = C_NULL
Expand Down
26 changes: 25 additions & 1 deletion src/libz.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
reserved::Culong
end

@assert typemax(Csize_t) ≥ typemax(Cuint)

function zalloc(::Ptr{Cvoid}, items::Cuint, size::Cuint)::Ptr{Cvoid}
s, f = Base.Checked.mul_with_overflow(items, size)
if f
C_NULL

Check warning on line 31 in src/libz.jl

View check run for this annotation

Codecov / codecov/patch

src/libz.jl#L31

Added line #L31 was not covered by tests
else
ccall(:jl_malloc, Ptr{Cvoid}, (Csize_t,), s%Csize_t)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we use ccall here rather than Libc.malloc ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to make sure the memory being used is visible to the GC https://discourse.julialang.org/t/c-struct-garbage-collection-not-run-frequently-enough/116919/14

If I replace this with Libc.malloc Julia uses more memory in the memory leak test, because GC isn't run frequently enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some reference within the C code suggesting that calloc may be warranted here.

https://github.com/madler/zlib/blob/develop/deflate.c#L397

The comments here suggest that malloc may be an appropriate optimization on modern systems.

madler/zlib#517

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it looks like calloc is only needed if pointers are 16 bits.

end
end
zfree(::Ptr{Cvoid}, p::Ptr{Cvoid}) = ccall(:jl_free, Cvoid, (Ptr{Cvoid},), p)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since zfree is just a pass through to jl_free perhaps you should just provide the function pointer directly:

cglobal(:jl_free).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have different signatures. zfree takes a pointer to an allocator context and a pointer to free, while jl_free just takes a pointer to free.


function ZStream()
ZStream(
# input
Expand All @@ -32,7 +44,9 @@
# message and state
C_NULL, C_NULL,
# memory allocation
C_NULL, C_NULL, C_NULL,
@cfunction(zalloc, Ptr{Cvoid}, (Ptr{Cvoid}, Cuint, Cuint)),
@cfunction(zfree, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid})),
C_NULL,
# data type, adler and reserved
0, 0, 0)
end
Expand Down Expand Up @@ -83,6 +97,11 @@
return ccall((:deflateEnd, libz), Cint, (Ref{ZStream},), zstream)
end

function compress_finalizer!(zstream::ZStream)
deflate_end!(zstream)
nothing
end

function deflate!(zstream::ZStream, flush::Integer)
return ccall((:deflate, libz), Cint, (Ref{ZStream}, Cint), zstream, flush)
end
Expand All @@ -99,6 +118,11 @@
return ccall((:inflateEnd, libz), Cint, (Ref{ZStream},), zstream)
end

function decompress_finalizer!(zstream::ZStream)
inflate_end!(zstream)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this gets called twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing.

InflateEnd starts by checking if zstream.state is null and returns Z_STREAM_ERROR if it is https://github.com/madler/zlib/blob/8a844d434f0eef87d972ae6406b00968f7c52944/inflate.c#L1268-L1269

InflateEnd also ends by setting zstream.state to null. https://github.com/madler/zlib/blob/8a844d434f0eef87d972ae6406b00968f7c52944/inflate.c#L1273

The same happens in DeflateEnd https://github.com/madler/zlib/blob/8a844d434f0eef87d972ae6406b00968f7c52944/deflate.c#L1266-L1283

Also, since the ZStream constructor initializes .state to C_NULL, the finalizer can run on uninitialized streams without doing anything.

nothing
end

function inflate!(zstream::ZStream, flush::Integer)
return ccall((:inflate, libz), Cint, (Ref{ZStream}, Cint), zstream, flush)
end
Expand Down
25 changes: 15 additions & 10 deletions test/big-mem-tests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@
using Test
using CodecZlib

# Enable this when https://github.com/JuliaIO/CodecZlib.jl/issues/88 is fixed.
# @testset "memory leak" begin
# function foo()
# for i in 1:1000000
# c = transcode(GzipCompressor(), zeros(UInt8,16))
# u = transcode(GzipDecompressor(), c)
# end
# end
# foo()
# end
@testset "memory leak" begin
function foo()
for (encode, decode) in [
(GzipCompressor, GzipDecompressor),
(ZlibCompressor, ZlibDecompressor),
(DeflateCompressor, DeflateDecompressor),
]
for i in 1:1000000
c = transcode(encode(), zeros(UInt8,16))
u = transcode(decode(), c)
end
end
end
foo()
end

@testset "Big Memory Tests" begin
Sys.WORD_SIZE == 64 || error("tests require 64 bit word size")
Expand Down
Loading