Skip to content

Commit

Permalink
It is working, but very slow
Browse files Browse the repository at this point in the history
  • Loading branch information
GarethCabournDavies committed Sep 10, 2024
1 parent 004f618 commit 7dee85d
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 187 deletions.
210 changes: 93 additions & 117 deletions pycbc/events/threshold_cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,57 +32,22 @@

logger = logging.getLogger('pycbc.events.threshold_cuda')

#threshold_op = """
# if (i == 0)
# bn[0] = 0;
#
# pycuda::complex<float> val = in[i];
# if ( abs(val) > threshold){
# int n_w = atomicAdd(bn, 1);
# outv[n_w] = val;
# outl[n_w] = i;
# }
#
#"""

#threshold_kernel = ElementwiseKernel(
# " %(tp_in)s *in, %(tp_out1)s *outv, %(tp_out2)s *outl, %(tp_th)s threshold, %(tp_n)s *bn" % {
# "tp_in": dtype_to_ctype(numpy.complex64),
# "tp_out1": dtype_to_ctype(numpy.complex64),
# "tp_out2": dtype_to_ctype(numpy.uint32),
# "tp_th": dtype_to_ctype(numpy.float32),
# "tp_n": dtype_to_ctype(numpy.uint32),
# },
# threshold_op,
# "getstuff")

threshold_op = r"""
extern "C" __global__ void getstuff(const float2* in, float2* outv, unsigned int* outl, float threshold, unsigned int* bn) {
int i = blockIdx.x * blockDim.x + threadIdx.x;
// Initialize count at the start
if (i == 0) {
kernel_code = """
extern "C" __global__
void getstuff(const float2 *in, const float2 *outv, unsigned int *outl, float threshold, unsigned int *bn) {
if (i == 0)
bn[0] = 0;
}
__syncthreads(); // Ensure bn[0] is initialized before processing
if (i < blockDim.x * gridDim.x) { // Check array bounds
float2 val = in[i];
float abs_val = sqrtf(val.x * val.x + val.y * val.y); // Compute absolute value
if (abs_val > threshold) {
int n_w = atomicAdd(bn, 1); // Atomically increment the count
outv[n_w] = val; // Store the value exceeding the threshold
outl[n_w] = i; // Store the index
}
cuComplex val = in[i];
if ( cuCabsf(val) > threshold){
int n_w = atomicAdd(bn, 1);
outv[n_w] = val;
outl[n_w] = i;
}
}
"""

# Compile the kernel
module = cp.RawModule(code=threshold_op)
threshold_kernel = module.get_function('getstuff')
threshold_kernel = cp.RawKernel(kernel_code, 'getstuff')

# Allocate device memory
#n = cp.cuda.alloc_pinned_memory(1 * cp.uint32().itemsize)
Expand Down Expand Up @@ -112,17 +77,20 @@ class T():
tl.gpudata = lptr
tn.flags = tv.flags = tl.flags = n.flags

tkernel1 = mako.template.Template("""
#include <cupy/cuda/cupy_cuda.h>
import cupy as cp
import numpy as np
from cupy import RawKernel

__global__ void threshold_and_cluster(float2* in, float2* outv, int* outl, int window, float threshold){
# Define the CUDA kernel source code
threshold_kernel_source = """
extern "C" __global__ void threshold_and_cluster(float2* in, float2* outv, int* outl, int window, float threshold){
int s = window * blockIdx.x;
int e = s + window;
// shared memory for chuck size candidates
__shared__ float svr[${chunk}];
__shared__ float svi[${chunk}];
__shared__ int sl[${chunk}];
// shared memory for chunk size candidates
__shared__ float svr[128]; // Adjust size as needed
__shared__ float svi[128];
__shared__ int sl[128];
// shared memory for the warp size candidates
__shared__ float svv[32];
Expand All @@ -134,7 +102,7 @@ class T():
float re;
float im;
// Iterate trought the entire window size chunk and find blockDim.x number
// Iterate through the entire window size chunk and find blockDim.x number
// of candidates
for (int i = s + threadIdx.x; i < e; i += blockDim.x){
re = in[i].x;
Expand All @@ -156,9 +124,9 @@ class T():
if (threadIdx.x < 32){
int tl = threadIdx.x;
// Now that we have all the candiates for this chunk in shared memory
// Now that we have all the candidates for this chunk in shared memory
// Iterate through in the warp size to reduce to 32 candidates
for (int i = threadIdx.x; i < ${chunk}; i += 32){
for (int i = threadIdx.x; i < 128; i += 32){
re = svr[i];
im = svi[i];
if ((re * re + im * im) > (mvr * mvr + mvi * mvi)){
Expand Down Expand Up @@ -193,7 +161,6 @@ class T():
idx[threadIdx.x] = idx[threadIdx.x + 2];
}
// Save the 1 candidate maximum and location to the output vectors
if (threadIdx.x == 0){
if (svv[threadIdx.x] < svv[threadIdx.x + 1]){
Expand All @@ -202,7 +169,7 @@ class T():
}
if (svv[0] > threshold){
tl = idx[0];
int tl = idx[0];
outv[blockIdx.x].x = svr[tl];
outv[blockIdx.x].y = svi[tl];
outl[blockIdx.x] = sl[tl];
Expand All @@ -212,13 +179,10 @@ class T():
}
}
}
""")
tkernel2 = mako.template.Template("""
#include <cupy/cuda/cupy_cuda.h>
__global__ void threshold_and_cluster2(float2* outv, int* outl, float threshold, int window){
__shared__ int loc[${blocks}];
__shared__ float val[${blocks}];
extern "C" __global__ void threshold_and_cluster2(float2* outv, int* outl, float threshold, int window){
__shared__ int loc[128]; // Adjust size as needed
__shared__ float val[128];
int i = threadIdx.x;
Expand All @@ -230,27 +194,31 @@ class T():
val[i] = outv[i].x * outv[i].x + outv[i].y * outv[i].y;
// Check right
if ( (i < (${blocks} - 1)) && (val[i + 1] > val[i]) ){
if ((i < (128 - 1)) && (val[i + 1] > val[i])){
outl[i] = -1;
return;
}
// Check left
if ( (i > 0) && (val[i - 1] > val[i]) ){
if ((i > 0) && (val[i - 1] > val[i])){
outl[i] = -1;
return;
}
}
""")
"""

tfn_cache = {}
def get_tkernel(slen, window):
if window < 32:
raise ValueError("GPU threshold kernel does not support a window smaller than 32 samples")
# Compile the CUDA kernels
raw_kernel = RawKernel(threshold_kernel_source, 'threshold_and_cluster')
raw_kernel2 = RawKernel(threshold_kernel_source, 'threshold_and_cluster2')

elif window <= 4096:
def threshold_and_cluster(series, threshold, window):
slen = len(series)
threshold = np.float32(threshold * threshold)
window = np.int32(window)

# Determine block and grid sizes based on window size
if window <= 4096:
nt = 128
elif window <= 16384:
nt = 256
Expand All @@ -259,63 +227,71 @@ def get_tkernel(slen, window):
else:
nt = 1024

nb = int(numpy.ceil(slen / float(window)))
nb = int(np.ceil(slen / float(window)))

if nb > 1024:
raise ValueError("More than 1024 blocks not supported yet")

try:
return tfn_cache[(nt, nb)], nt, nb
except KeyError:
mod = cp.RawModule(tkernel1.render(chunk=nt))
mod2 = cp.RawModule(tkernel2.render(blocks=nb))
fn = mod.get_function("threshold_and_cluster")
fn2 = mod2.get_function("threshold_and_cluster2")
tfn_cache[(nt, nb)] = (fn, fn2)
return tfn_cache[(nt, nb)], nt, nb
# Prepare output arrays
outl = cp.zeros(nb, dtype=cp.int32)
outv = cp.zeros((nb,), dtype=cp.complex64)
series_gpu = cp.asarray(series, dtype=cp.complex64)

def threshold_and_cluster(series, threshold, window):
outl = tl.gpudata
outv = tv.gpudata
slen = len(series)
series = series.data.data.ptr
(fn, fn2), nt, nb = get_tkernel(slen, window)
threshold = numpy.float32(threshold * threshold)
window = numpy.int32(window)
# Launch the first kernel
raw_kernel((nb,), (nt,), (series_gpu, outv, outl, window, threshold))

# Launch the second kernel
raw_kernel2((1,), (nb,), (outv, outl, threshold, window))

cl = loc[0:nb]
cv = val[0:nb]
# Synchronize
cp.cuda.Device().synchronize()

fn.prepared_call((nb, 1), (nt, 1, 1), series, outv, outl, window, threshold,)
fn2.prepared_call((1, 1), (nb, 1, 1), outv, outl, threshold, window)
pycbc.scheme.mgr.state.context.synchronize()
w = (cl != -1)
return cv[w], cl[w]
# Filter valid results
cl = cp.asnumpy(outl)
cv = cp.asnumpy(outv)
valid_indices = cl != -1
return cv[valid_indices], cl[valid_indices]

class CUDAThresholdCluster(_BaseThresholdCluster):
class CUDAThresholdCluster:
def __init__(self, series):
self.series = series.data.data.ptr

self.outl = tl.gpudata
self.outv = tv.gpudata
self.series = cp.asarray(series, dtype=cp.complex64)
self.outl = cp.zeros(len(series), dtype=cp.int32)
self.outv = cp.zeros((len(series),), dtype=cp.complex64)
self.slen = len(series)

def threshold_and_cluster(self, threshold, window):
threshold = numpy.float32(threshold * threshold)
window = numpy.int32(window)

(fn, fn2), nt, nb = get_tkernel(self.slen, window)
fn = fn.prepared_call
fn2 = fn2.prepared_call
cl = loc[0:nb]
cv = val[0:nb]

fn((nb, 1), (nt, 1, 1), self.series, self.outv, self.outl, window, threshold,)
fn2((1, 1), (nb, 1, 1), self.outv, self.outl, threshold, window)
pycbc.scheme.mgr.state.context.synchronize()
w = (cl != -1)
return cv[w], cl[w]
threshold = np.float32(threshold * threshold)
window = np.int32(window)

# Determine block and grid sizes based on window size
if window <= 4096:
nt = 128
elif window <= 16384:
nt = 256
elif window <= 32768:
nt = 512
else:
nt = 1024

nb = int(np.ceil(self.slen / float(window)))

if nb > 1024:
raise ValueError("More than 1024 blocks not supported yet")

# Launch the first kernel
raw_kernel((nb,), (nt,), (self.series, self.outv, self.outl, window, threshold))

# Launch the second kernel
raw_kernel2((1,), (nb,), (self.outv, self.outl, threshold, window))

# Synchronize
cp.cuda.Device().synchronize()

# Filter valid results
cl = cp.asnumpy(self.outl)
cv = cp.asnumpy(self.outv)
valid_indices = cl != -1
return cv[valid_indices], cl[valid_indices]

def _threshold_cluster_factory(series):
return CUDAThresholdCluster

52 changes: 14 additions & 38 deletions pycbc/fft/cufft.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,17 @@

import pycbc.scheme
# The following is a hack, to ensure that any error in importing
# cu_fft is treated as the module being unavailable at runtime.
# cupy is treated as the module being unavailable at runtime.

try:
import cupy
import cupyx.scipy.fft as cu_fft
from cupyx.scipy.fft import get_fft_plan
except ImportError as e:
print("Unable to import cupy's scipy fft backend")
print("Unable to import cupy")
raise e

from .core import _check_fft_args
from .core import _BaseFFT, _BaseIFFT

#_forward_plans = {}
#_reverse_plans = {}

#These dicts need to be cleared before the cuda context is destroyed
#def _clear_plan_dicts():
# _forward_plans.clear()
# _reverse_plans.clear()

#pycbc.scheme.register_clean_cuda(_clear_plan_dicts)

# No need for separate forward and inverse plan functions.
# Use get_fft_plan for both forward and inverse FFTs when necessary.

#def fft(invec, outvec, prec, itype, otype):
# # Create a plan for FFT
# with get_fft_plan(invec.data) as plan:
# print(cu_fft.fft(invec.data))
# outvec.data[:] = cupy.asarray(cu_fft.fft(invec.data), dtype=otype)
#
#def ifft(invec, outvec, prec, itype, otype):
# # Create a plan for IFFT
# with get_fft_plan(invec.data) as plan:
# outvec[:] = cupy.asarray(cu_fft.ifft(invec.data), dtype=otype)

def fft(invec, outvec, _, itype, otype):
if invec.ptr == outvec.ptr:
raise NotImplementedError("cupy backend of pycbc.fft does not "
Expand Down Expand Up @@ -87,24 +62,25 @@ def ifft(invec, outvec, _, itype, otype):
raise ValueError(_INV_FFT_MSG.format("IFFT", itype, otype))

class FFT(_BaseFFT):
"""
Class for performing FFTs via the cupy interface.
"""
def __init__(self, invec, outvec, nbatch=1, size=None):
super(FFT, self).__init__(invec, outvec, nbatch, size)
self.invec = invec
self.outvec = outvec
self.prec, self.itype, self.otype = _check_fft_args(invec, outvec)

def execute(self):
# Create and use an FFT plan for the execution
with get_fft_plan(self.invec) as plan:
self.outvec[:] = cu_fft.fft(self.invec, plan=plan)
fft(self.invec, self.outvec, self.prec, self.itype, self.otype)


class IFFT(_BaseIFFT):
"""
Class for performing IFFTs via the cupy interface.
"""
def __init__(self, invec, outvec, nbatch=1, size=None):
super(IFFT, self).__init__(invec, outvec, nbatch, size)
self.invec = invec
self.outvec = outvec
self.prec, self.itype, self.otype = _check_fft_args(invec, outvec)

def execute(self):
# Create and use an FFT plan for the execution
with get_fft_plan(self.invec) as plan:
self.outvec[:] = cu_fft.fft(self.invec, plan=plan)
ifft(self.invec, self.outvec, self.prec, self.itype, self.otype)

Loading

0 comments on commit 7dee85d

Please sign in to comment.