Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,7 @@ rever/
# pixi environments
.pixi
*.egg-info

# Produced by tests
Ni/
unnamed_sample/
63 changes: 59 additions & 4 deletions pdfstream/analyzers/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,62 @@
from configparser import ConfigParser

import numpy as np
from bluesky.callbacks.core import CallbackBase
from databroker.client import BlueskyRun
from bluesky_tiled_plugins import BlueskyRun


def iter_documents_filled(run: BlueskyRun):
"""Iterate filled (name, doc) pairs from a tiled BlueskyRun.

Reconstructs the document stream by reading column data directly from
the tiled event streams, since run.documents() does not include data.
"""
# start
yield "start", dict(run.start)
# streams
for stream_name in run.keys():
stream = run[stream_name]
meta = dict(stream.metadata)
descriptor_doc = {
"uid": meta.get("uid", ""),
"run_start": run.start.get("uid", ""),
"time": meta.get("time", 0),
"data_keys": meta.get("data_keys", {}),
"configuration": meta.get("configuration", {}),
"name": stream_name,
"hints": meta.get("hints", {}),
"object_keys": meta.get("object_keys", {}),
}
yield "descriptor", descriptor_doc
# events - read column data
data_keys = set(meta.get("data_keys", {}).keys())
columns = list(stream.keys())
if not columns:
continue
n_events = len(stream[columns[0]].read())
col_data = {col: stream[col].read() for col in columns}
for i in range(n_events):
event_data = {}
event_timestamps = {}
for key in data_keys:
if key in col_data:
val = col_data[key][i]
event_data[key] = np.asarray(val) if hasattr(val, '__array__') else val
ts_key = f"ts_{key}"
if ts_key in col_data:
event_timestamps[key] = float(col_data[ts_key][i])
event_doc = {
"descriptor": descriptor_doc["uid"],
"uid": f"event-{descriptor_doc['uid']}-{i + 1}",
"time": float(col_data["time"][i]) if "time" in col_data else 0,
"seq_num": int(col_data["seq_num"][i]) if "seq_num" in col_data else i + 1,
"data": event_data,
"timestamps": event_timestamps,
"filled": {k: True for k in data_keys},
}
yield "event", event_doc
# stop
yield "stop", dict(run.stop)


class AnalyzerConfig(ConfigParser):
Expand All @@ -19,9 +74,9 @@ class Analyzer(CallbackBase):

def analyze(self, run: BlueskyRun):
"""Analyze the data in a bluesky run."""
for name, doc in run.canonical(fill="yes"):
for name, doc in iter_documents_filled(run):
# inject the original_db
if name == "start":
doc = doc.to_dict()
doc["original_db"] = run.catalog_object.name
doc = dict(doc)
doc["original_db"] = run.uri
self.__call__(name, doc)
14 changes: 7 additions & 7 deletions pdfstream/analyzers/xpd_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import typing as tp

from databroker import catalog
from databroker.client import BlueskyRun
from bluesky_tiled_plugins import BlueskyRun
from tiled.client import from_uri

from pdfstream.analyzers.base import AnalyzerConfig, Analyzer
from pdfstream.servers.xpd_server import XPDRouter, XPDConfig
Expand All @@ -17,7 +17,7 @@ class XPDAnalyzer(XPDRouter, Analyzer):
pass


def replay(run: BlueskyRun) -> tp.Tuple[XPDAnalyzerConfig, XPDAnalyzer]:
def replay(run) -> tp.Tuple[XPDAnalyzerConfig, XPDAnalyzer]:
"""Generate the original data, original configure and the XPD analyzer of it.

Parameters
Expand All @@ -39,17 +39,17 @@ def replay(run: BlueskyRun) -> tp.Tuple[XPDAnalyzerConfig, XPDAnalyzer]:
return config, analyzer


def retrieve_original_run(run: BlueskyRun) -> tp.Union[None, BlueskyRun]:
def retrieve_original_run(run: BlueskyRun) -> tp.Union[None, tp.Any]:
"""Retrieve the original run."""
start = run.metadata['start']
if 'original_run_uid' not in start:
raise Warning("Missing original_run_uid. Cannot retrieve original run.")
if 'original_db' not in start:
raise Warning("Missing original_db. Cannot retrieve original run.")
try:
db = catalog[start['original_db']]
except KeyError:
raise Warning("Missing {} in catalog. Cannot retrieve original run.".format(start['original_db']))
db = from_uri(start['original_db'])
except Exception:
raise Warning("Cannot connect to {}. Cannot retrieve original run.".format(start['original_db']))
try:
return db[start['original_run_uid']]
except KeyError:
Expand Down
22 changes: 18 additions & 4 deletions pdfstream/callbacks/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import datetime
import typing
import typing as tp
from configparser import ConfigParser
from configparser import ConfigParser, NoOptionError
from pathlib import Path

import event_model
import matplotlib.pyplot as plt
import numpy as np
from bluesky.callbacks.stream import LiveDispatcher
from databroker.v1 import Broker
from event_model import RunRouter
from tiled.client import from_uri
from pyFAI.integrator.azimuthal import AzimuthalIntegrator
from suitcase.csv import Serializer as CSVSerializer
from suitcase.json_metadata import Serializer as JsonSerializer
Expand Down Expand Up @@ -42,6 +42,10 @@ class BasicAnalysisConfig(ConfigParser):
def raw_db(self) -> str:
return self.get("DATABASE", "raw_db", fallback="")

@property
def raw_db_api_key(self) -> str:
return self.get("DATABASE", "raw_db_api_key", fallback="")

@property
def dark_identifier(self):
return self.get("METADATA", "dk_identifier", fallback="dark_frame")
Expand Down Expand Up @@ -163,7 +167,14 @@ def __init__(self, config: AnalysisConfig):
self.init_config = config
self.config: typing.Union[AnalysisConfig, None] = None
db_name = config.raw_db
self.db = Broker.named(db_name) if db_name else None
db_api_key = config.raw_db_api_key
if db_name:
kwargs = {"uri": db_name}
if db_api_key:
kwargs["api_key"] = db_api_key
self.db = from_uri(**kwargs)
else:
self.db = None
self.valid_keys = config.valid_keys
self.start_doc = {}
self.ai = None
Expand Down Expand Up @@ -419,7 +430,10 @@ def directory_template(self):
@property
def tiff_base(self):
"""Settings for the base folder."""
dir_path = self.get("SUITCASE", "tiff_base")
try:
dir_path = self.get("SUITCASE", "tiff_base")
except NoOptionError:
dir_path = None
if not dir_path:
dir_path = "~/pdfstream_data"
io.server_message("Missing tiff_base in configuration. Use '{}'".format(dir_path))
Expand Down
3 changes: 1 addition & 2 deletions pdfstream/callbacks/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from bluesky.callbacks import CallbackBase
from bluesky.callbacks.best_effort import LivePlot, LiveScatter
from bluesky.callbacks.broker import LiveImage
from databroker.v2 import Broker
from event_model import unpack_event_page
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
Expand Down Expand Up @@ -182,7 +181,7 @@ class LiveMaskedImage(LiveImage):

def __init__(self, field: str, msk_field: str, *, cmap: str, norm: tp.Callable = None,
limit_func: tp.Callable = None, auto_draw: bool = True, interpolation: str = None,
window_title: str = None, db: Broker = None):
window_title: str = None, db=None):
self.msk_field = msk_field
self.msk_array = None
super(LiveMaskedImage, self).__init__(
Expand Down
11 changes: 9 additions & 2 deletions pdfstream/callbacks/calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import event_model
import numpy as np
from bluesky.callbacks.stream import LiveDispatcher
from databroker.v1 import Broker
from tifffile import TiffWriter
from tiled.client import from_uri

import pdfstream
import pdfstream.callbacks.analysis as an
Expand Down Expand Up @@ -68,7 +68,14 @@ def __init__(self, config: CalibrationConfig, *, test: bool = False):
self.config = config
self.cache = dict()
raw_db = self.config.raw_db
self.db = Broker.named(raw_db) if raw_db else None
raw_db_api_key = self.config.raw_db_api_key
if raw_db:
kwargs_db = {"uri": raw_db}
if raw_db_api_key:
kwargs_db["api_key"] = raw_db_api_key
self.db = from_uri(**kwargs_db)
else:
self.db = None
self.test = test
self.start_doc = {}
self.event_doc = {}
Expand Down
93 changes: 93 additions & 0 deletions pdfstream/callbacks/composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,96 @@ def compose_data_info(value: tp.Any) -> dict:
def compose_timestamps(data: tp.Dict[str, tp.Any]) -> tp.Dict[str, float]:
"""Compose the fake time for the data measurement."""
return {k: time.time() for k in data.keys()}


def gen_stream_external(
data_lst: tp.List[dict],
metadata: dict,
external_keys: tp.Set[str],
uid: str = None
) -> tp.Generator[tp.Tuple[str, dict], None, None]:
"""Generate a fake doc stream with external (stream_resource/stream_datum) references.

This simulates the document stream that would arrive over ZMQ from a detector
that writes data to an external file and uses stream_resource/stream_datum.
The external keys will have ``external: 'STREAM:'`` in the descriptor and
unfilled references in the events.

Parameters
----------
data_lst : list of dict
The data for each event. External keys should still contain the actual
data values (used for computing data_keys shapes), but they will be
replaced with datum uid references in the emitted events.
metadata : dict
Run metadata for the start document.
external_keys : set of str
Which data keys should be treated as external (stream_resource/stream_datum).
uid : str, optional
UID for the run. Generated if not provided.
"""
run_uid = uid if uid else str(uuid.uuid4())
crb = compose_run(metadata=metadata, uid=run_uid)
yield "start", crb.start_doc
if len(data_lst) == 0:
yield "stop", crb.compose_stop()
return

# Build data_keys, marking external keys
data_keys = {}
for k, v in data_lst[0].items():
info = compose_data_info(v)
info["source"] = "PV:{}".format(k.upper())
if k in external_keys:
info["external"] = "STREAM:"
data_keys[k] = info

cdb: ComposeDescriptorBundle = crb.compose_descriptor(
name="primary",
data_keys=data_keys,
)
yield "descriptor", cdb.descriptor_doc
desc_uid = cdb.descriptor_doc["uid"]

# Emit stream_resource and stream_datum for each external key
sr_bundles = {}
for key in external_keys:
sr_bundle = crb.compose_stream_resource(
mimetype="application/x-hdf5",
uri="file:///tmp/fake_{}.h5".format(key),
data_key=key,
parameters={"dataset": ["entry", "data", key]},
)
sr_bundles[key] = sr_bundle
yield "stream_resource", sr_bundle.stream_resource_doc

for i, data in enumerate(data_lst):
# Emit stream_datum for each external key
for key in external_keys:
sd_doc = sr_bundles[key].compose_stream_datum(
seq_nums={"start": i, "stop": i + 1},
indices={"start": i, "stop": i + 1},
)
sd_doc["descriptor"] = desc_uid
yield "stream_datum", sd_doc

# Emit event with unfilled external keys
event_data = {}
event_ts = {}
event_filled = {}
for k, v in data.items():
if k in external_keys:
event_data[k] = "unfilled_datum_ref"
event_filled[k] = False
else:
event_data[k] = v
event_filled[k] = True
event_ts[k] = time.time()

yield "event", cdb.compose_event(
data=event_data,
timestamps=event_ts,
filled=event_filled,
)

yield "stop", crb.compose_stop()
Loading