1+ from pathlib import Path
2+ from typing import Optional
3+
4+ import numpy as np
5+ from bluesky .protocols import WritesStreamAssets , Readable
6+ from bluesky .utils import SyncOrAsyncIterator , StreamAsset
7+ from event_model import compose_stream_resource , DataKey
18from ophyd import Kind
29from ophyd .quadem import QuadEM # , QuadEMPort # TODO in the future once it's in ophyd
3-
410from ophyd import (
511 Device ,
612 EpicsSignalRO ,
@@ -174,30 +180,41 @@ def set_primary(self, n, value=None):
174180 getattr (stats , val ).kind = 'hinted'
175181
176182
177- class SpectrumAnalyzer (Device ):
183+ def _convert_path_to_posix (path : Path ) -> Path :
184+ """Assumes that the path is on a Windows machine with Z: drive."""
185+ # Convert to string to manipulate
186+ path_str = str (path )
187+
188+ # Replace Z: with the target directory
189+ if path_str .startswith ("Z:" ):
190+ path_str = path_str .replace ("Z:" , "/nsls2/data3/esm/legacy" , 1 )
191+ else :
192+ return path
193+
194+ # Convert backslashes to forward slashes for POSIX compatibility
195+ path_str = path_str .replace ("\\ " , "/" )
196+
197+ return Path (path_str )
198+
199+
200+ class SpectrumAnalyzer (Device , WritesStreamAssets , Readable ):
178201 # Acquisition control
179202 acquire = Cpt (EpicsSignal , "ACQUIRE" )
180203 acquisition_status = Cpt (EpicsSignalRO , "ACQ:STATUS" )
181204
182- # Monitor control
183- monitor_on = Cpt (EpicsSignal , "MON:ON" )
184- monitor_off = Cpt (EpicsSignal , "MON:OFF" )
185- monitor_status = Cpt (EpicsSignalRO , "MON:STATUS" )
186-
187- # Detector control
188- detector_off = Cpt (EpicsSignal , "DET:OFF" )
189- detector_status = Cpt (EpicsSignalRO , "DET:STATUS" )
190-
191- # Image acquisition
192- get_image = Cpt (EpicsSignal , "IMG:GET" )
193- get_stats = Cpt (EpicsSignal , "ACQ:STATS" )
194-
195205 # Status and info
196206 connection_status = Cpt (EpicsSignalRO , "SYS:CONNECTED" )
197- last_error = Cpt (EpicsSignalRO , "SYS:ERROR" )
198207 last_sync = Cpt (EpicsSignalRO , "SYS:LAST_SYNC" )
199208 sync = Cpt (EpicsSignal , "SYS:SYNC" )
200209
210+ # File writing
211+ file_capture = Cpt (EpicsSignal , "FILE:CAPTURE" )
212+ file_name = Cpt (EpicsSignal , "FILE:NAME" )
213+ file_path = Cpt (EpicsSignal , "FILE:PATH" )
214+ file_status = Cpt (EpicsSignalRO , "FILE:STATUS" )
215+ num_captured = Cpt (EpicsSignalRO , "FILE:NUM_CAPTURED" )
216+ num_processed = Cpt (EpicsSignalRO , "FILE:NUM_PROCESSED" )
217+
201218 # Detector parameters
202219 state = Cpt (EpicsSignalRO , "STATE" , string = True )
203220 endX = Cpt (EpicsSignal , "ENDX" )
@@ -262,11 +279,27 @@ def __init__(self, *args, **kwargs):
262279 self .stage_sigs .update (
263280 [
264281 (self .acquire , 0 ),
282+ (self .file_capture , 1 ),
265283 ]
266284 )
267285 self ._status = None
286+ self ._index = 0
287+ self ._last_emitted_index = 0
288+ self ._composer = None
289+ self ._full_path = None
268290
269291 def stage (self ):
292+ if self .file_capture .get (as_string = True ) == "On" :
293+ raise RuntimeError (
294+ "File capture must be off to stage the detector, otherwise the file will be corrupted"
295+ )
296+
297+ path = _convert_path_to_posix (Path (self .file_path .get ()))
298+ file_name = Path (self .file_name .get ())
299+ self ._full_path = str (path / file_name )
300+ self ._index = 0
301+ self ._last_emitted_index = 0
302+
270303 self .state .subscribe (self ._stage_changed , run = False )
271304 return super ().stage ()
272305
@@ -275,6 +308,7 @@ def _stage_changed(self, value=None, old_value=None, **kwargs):
275308 return
276309 if value == "STANDBY" and old_value == "RUNNING" :
277310 self ._status .set_finished ()
311+ self ._index += 1
278312 self ._status = None
279313
280314 def trigger (self ):
@@ -287,9 +321,54 @@ def trigger(self):
287321 self ._status = Status ()
288322 return self ._status
289323
324+ def describe (self ) -> dict [str , DataKey ]:
325+ describe = super ().describe ()
326+ describe .update (
327+ {
328+ f"{ self .name } _image" : DataKey (
329+ source = f"{ self ._full_path } " ,
330+ shape = (1 , 1080 , self .num_steps .get ()),
331+ dtype = "array" ,
332+ dtype_numpy = np .dtype (np .uint32 ).str ,
333+ external = "STREAM:" ,
334+ ),
335+ }
336+ )
337+ return describe
338+
339+ def get_index (self ) -> int :
340+ return self ._index
341+
342+ def collect_asset_docs (
343+ self , index : Optional [int ] = None
344+ ) -> SyncOrAsyncIterator [StreamAsset ]:
345+ if index is not None :
346+ msg = f"Indexing is not supported for this detector, got: { index } , current index: { self .get_index ()} "
347+ raise NotImplementedError (msg )
348+
349+ index = self .get_index ()
350+ if index :
351+ if not self ._composer :
352+ self ._composer = compose_stream_resource (
353+ data_key = f"{ self .name } _image" ,
354+ mimetype = "application/x-hdf5" ,
355+ uri = f"file://{ self ._full_path } " ,
356+ parameters = {"dataset" : "entry1/analyzer/data" },
357+ )
358+ yield "stream_resource" , self ._composer .stream_resource_doc
359+
360+ if index >= self ._last_emitted_index :
361+ indices = {
362+ "start" : self ._last_emitted_index ,
363+ "stop" : index ,
364+ }
365+ self ._last_emitted_index = index
366+ yield "stream_datum" , self ._composer .compose_stream_datum (indices )
367+
290368 def unstage (self ):
291369 super ().unstage ()
292370 self .state .unsubscribe (self ._stage_changed )
371+ self ._composer = None
293372
294373mbs = SpectrumAnalyzer ("XF:21ID1-ES{A1Soft}" , name = "mbs" )
295374
0 commit comments