Skip to content

Commit

Permalink
implement queues for serializing distributed computing results
Browse files Browse the repository at this point in the history
  • Loading branch information
JohannesBuchner committed Jan 24, 2025
1 parent e1c3e59 commit 5e73ea0
Show file tree
Hide file tree
Showing 2 changed files with 279 additions and 4 deletions.
93 changes: 90 additions & 3 deletions tests/test_netiterintegrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import os
import numpy as np
from ultranest.store import TextPointStore
from ultranest.netiter import PointPile, TreeNode, count_tree, print_tree, dump_tree
from ultranest.netiter import PointPile, RoundRobinPointQueue, SinglePointQueue, TreeNode, count_tree, print_tree, dump_tree
from ultranest.netiter import SingleCounter, MultiCounter, BreadthFirstIterator

from numpy.testing import assert_allclose


def integrate_singleblock(num_live_points, pointstore, x_dim, num_params, dlogz=0.5):
Expand Down Expand Up @@ -374,7 +374,94 @@ def test_treedump():
dump_tree("test_tree.hdf5", roots=tree.children, pointpile=pp)
dump_tree("test_tree.hdf5", tree.children, pp)
os.remove("test_tree.hdf5")


def test_pointpile():
udim = 2
for pdim in 2, 3:
pp = PointPile(udim, pdim)
pp.add(np.arange(udim), np.arange(pdim))
pp.add(np.arange(udim) + 2, np.arange(pdim) + 2)
assert_allclose(pp.getu(0), np.arange(udim))
assert_allclose(pp.getp(0), np.arange(pdim))
assert_allclose(pp.getu(1), np.arange(udim) + 2)
assert_allclose(pp.getp(1), np.arange(pdim) + 2)
for i in range(10001):
pp.add(np.arange(udim) + i, np.arange(pdim) + i)
assert_allclose(pp.getp(10000 + 2), np.arange(pdim) + 10000)


def test_singlepointqueue():
udim = 2
for pdim in 2, 3:
pp = SinglePointQueue(udim, pdim)
assert not pp.has(0)
pp.add(np.arange(udim), np.arange(pdim), 0, 0)
try:
pp.has(1)
assert False
except ValueError:
pass
assert pp.has(0)
try:
pp.add(np.arange(udim) + 1, np.arange(pdim) + 1, 1, 0)
assert False
except ValueError:
pass
u, p, L = pp.pop(0)
assert_allclose(u, np.arange(udim))
assert_allclose(p, np.arange(pdim))
assert_allclose(L, 0)
pp.add(np.arange(udim) + 42, np.arange(pdim) + 42, 42, 0)
u, p, L = pp.pop(0)
assert_allclose(u, np.arange(udim) + 42)
assert_allclose(p, np.arange(pdim) + 42)
assert_allclose(L, 42)

def test_roundrobinpointqueue():
udim = 2
for pdim in 2, 3:
pp = RoundRobinPointQueue(udim, pdim)
assert not pp.has(0)
pp.add(np.arange(udim), np.arange(pdim), 0, 42)
assert not pp.has(0)
assert pp.has(42)
pp.add(np.arange(udim) + 1, np.arange(pdim) + 1, 1, 32)
pp.add(np.arange(udim) + 5, np.arange(pdim) + 5, 5, 52)
pp.add(np.arange(udim) + 2, np.arange(pdim) + 2, 2, 42)
try:
pp.pop(0)
assert False
except IndexError:
pass
u, p, L = pp.pop(42)
assert_allclose(u, np.arange(udim))
assert_allclose(p, np.arange(pdim))
assert_allclose(L, 0)
u, p, L = pp.pop(52)
assert_allclose(u, np.arange(udim) + 5)
assert_allclose(p, np.arange(pdim) + 5)
assert_allclose(L, 5)
u, p, L = pp.pop(32)
assert_allclose(u, np.arange(udim) + 1)
assert_allclose(p, np.arange(pdim) + 1)
assert_allclose(L, 1)
u, p, L = pp.pop(42)
assert_allclose(u, np.arange(udim) + 2)
assert_allclose(p, np.arange(pdim) + 2)
assert_allclose(L, 2)
assert not pp.has(32)
assert not pp.has(42)
assert not pp.has(52)
for i in range(10001):
pp.add(np.arange(udim) + i, np.arange(pdim) + i, i, i % 42)
for i in range(10001):
assert pp.has(i % 42)
u, p, L = pp.pop(i % 42)
assert_allclose(u, np.arange(udim) + i)
assert_allclose(p, np.arange(pdim) + i)
assert_allclose(L, i)
for i in range(42):
assert not pp.has(i)

if __name__ == '__main__':
for nlive in [100, 400, 2000]:
Expand Down
190 changes: 189 additions & 1 deletion ultranest/netiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from numpy import exp, log, log1p, logaddexp

from .ordertest import UniformOrderAccumulator
from .utils import resample_equal
from .utils import resample_equal, submasks


class TreeNode:
Expand Down Expand Up @@ -465,6 +465,194 @@ def make_node(self, value, u, p):
return TreeNode(value=value, id=index)


class RoundRobinPointQueue:
"""A in-memory linearized storage of point coordinates and their likelihoods.
Allows storing points in an unordered way with an index,
then extracting them in order.
"""

def __init__(self, udim, pdim, chunksize=1000):
"""Set up point pile.
Parameters
-----------
udim: int
number of parameters, dimension of unit cube points
pdim: int
number of physical (and derived) parameters
chunksize: int
the point pile grows as needed, in these intervals.
"""
self.chunksize = chunksize
self.us = np.zeros((self.chunksize, udim))
self.ps = np.zeros((self.chunksize, pdim))
self.Ls = np.zeros(self.chunksize)
self.ranks = np.zeros(self.chunksize, dtype=int)
self.filled = np.zeros(self.chunksize, dtype=bool)
self.udim = udim
self.pdim = pdim

def add(self, newpointu, newpointp, newpointL, newrank):
"""Save point.
Parameters
----------
newpointu: array
point (in u-space)
newpointp: array
point (in p-space)
newpointL: float
loglikelihood
newrank: int
rank of point
"""
if self.filled.all():
# grow space
self.us = np.concatenate((self.us, np.zeros((self.chunksize, self.udim))))
self.ps = np.concatenate((self.ps, np.zeros((self.chunksize, self.pdim))))
self.Ls = np.concatenate((self.Ls, np.zeros(self.chunksize)))
self.ranks = np.concatenate(
(self.ranks, np.zeros(self.chunksize, dtype=int))
)
self.filled = np.concatenate(
(self.filled, np.zeros(self.chunksize, dtype=bool))
)
assert len(newpointu) == self.us.shape[1], (newpointu, self.us.shape)
assert len(newpointp) == self.ps.shape[1], (newpointp, self.ps.shape)
i = np.where(~self.filled)[0][0]
self.us[i, :] = newpointu
self.ps[i, :] = newpointp
self.Ls[i] = newpointL
self.ranks[i] = newrank
self.filled[i] = True

def pop(self, rank):
"""Get next point of a given rank.
Parameters
----------
rank: int
rank of point we want to fetch
Returns
-------
u: array
point (in u-space)
p: array
point (in p-space)
L: array
likelihood
"""
i = submasks(self.filled, self.ranks[self.filled] == rank)[0]
self.filled[i] = False
return self.us[i, :], self.ps[i, :], self.Ls[i]

def has(self, rank):
"""Check if there is a next point of a given rank.
Parameters
----------
rank: int
rank of point we want to fetch
Returns
-------
bool
whether theree is an entry with that rank.
"""
return (self.ranks[self.filled] == rank).any()


class SinglePointQueue:
"""A storage of one point coordinate and its likelihood."""

def __init__(self, udim, pdim, chunksize=1):
"""Set up point pile.
Parameters
-----------
udim: int
number of parameters, dimension of unit cube points
pdim: int
number of physical (and derived) parameters
chunksize: int
has to be 1
"""
if chunksize != 1:
raise ValueError("SinglePointQueue: chunksize can only be 1")
self.u = None
self.p = None
self.L = None

def add(self, newpointu, newpointp, newpointL, newrank):
"""Save point.
Parameters
----------
newpointu: array
point (in u-space)
newpointp: array
point (in p-space)
newpointL: float
loglikelihood
newrank: int
rank of point
"""
if newrank != 0:
raise ValueError("SinglePointQueue: rank must be 0")
if self.u is None and self.p is None and self.L is None:
self.u = newpointu
self.p = newpointp
self.L = newpointL
else:
raise ValueError("SinglePointQueue: queue not empty")

def pop(self, rank):
"""Get next point of a given rank.
Parameters
----------
rank: int
rank of point we want to fetch
Returns
-------
u: array
point (in u-space)
p: array
point (in p-space)
L: array
likelihood
"""
if rank != 0:
raise ValueError("SinglePointQueue: rank must be 0")
u = self.u
p = self.p
L = self.L
self.u = None
self.p = None
self.L = None
return u, p, L

def has(self, rank):
"""Check if there is a next point of a given rank.
Parameters
----------
rank: int
rank of point we want to fetch
Returns
-------
bool
whether theree is an entry with that rank.
"""
if rank != 0:
raise ValueError("SinglePointQueue: rank must be 0")
return self.L is not None


class SingleCounter:
"""Evidence log(Z) and posterior weight summation for a Nested Sampling tree."""

Expand Down

0 comments on commit 5e73ea0

Please sign in to comment.