Skip to content

Commit d032f29

Browse files
authored
Merge pull request #519 from scipp/fix-conda-build
Fix conda build
2 parents b30d330 + f8d64b6 commit d032f29

File tree

10 files changed

+43
-20
lines changed

10 files changed

+43
-20
lines changed

.buildconfig/ci-linux.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,4 @@ dependencies:
4141
- sphinxcontrib-bibtex==2.6.2
4242

4343
# docs and tests
44-
- sciline==24.01.1
44+
- sciline==24.04.1

.copier-answers.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Changes here will be overwritten by Copier; NEVER EDIT MANUALLY
2-
_commit: cac2bde
2+
_commit: 9bb9876
33
_src_path: gh:scipp/copier_template
44
description: Neutron scattering tools for Data Reduction
55
max_python: '3.12'

conda/meta.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ test:
3939
- pythreejs==2.4.2
4040
- python-confluent-kafka==2.1.1 [linux64]
4141
- ess-streaming-data-types==v0.14.0 [linux64]
42+
- sciline==24.04.1
4243
source_files:
4344
- pyproject.toml
4445
- tests/

docs/conf.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
import doctest
22
import os
33
import sys
4+
from importlib.metadata import PackageNotFoundError
45
from importlib.metadata import version as get_version
56

7+
from sphinx.util import logging
8+
69
sys.path.insert(0, os.path.abspath('.'))
710

11+
logger = logging.getLogger(__name__)
12+
813
# General information about the project.
914
project = 'ScippNeutron'
1015
copyright = '2024 Scipp contributors'
@@ -33,6 +38,8 @@
3338
import sciline.sphinxext.domain_types # noqa: F401
3439

3540
extensions.append('sciline.sphinxext.domain_types')
41+
# See https://github.com/tox-dev/sphinx-autodoc-typehints/issues/457
42+
suppress_warnings = ["config.cache"]
3643
except ModuleNotFoundError:
3744
pass
3845

@@ -119,8 +126,15 @@
119126
# built documents.
120127
#
121128

122-
release = get_version("scippneutron")
123-
version = ".".join(release.split('.')[:3]) # CalVer
129+
try:
130+
release = get_version("scippneutron")
131+
version = ".".join(release.split('.')[:3]) # CalVer
132+
except PackageNotFoundError:
133+
logger.info(
134+
"Warning: determining version from package metadata failed, falling back to "
135+
"a dummy version number."
136+
)
137+
release = version = "0.0.0-dev"
124138

125139
warning_is_error = True
126140

docs/developer/data-stream.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,9 @@ Unit Testing
180180

181181
For unit tests it would be convenient to use a fake consumer object in place of ``KafkaConsumer``
182182
instances. However, any input arguments or variables passed via the queue to the ``mp.Process``
183-
must be pickleable or ``mp.Queue``. This makes dependency injection difficult. To get around
183+
must be pickleable or ``mp.queues.Queue``. This makes dependency injection difficult. To get around
184184
this an enum can be passed via ``data_stream`` to the ``data_consumption_manager`` to tell it
185-
to create instances of ``FakeConsumer`` instead of ``KafkaConsumer``, additionally an ``mp.Queue``
185+
to create instances of ``FakeConsumer`` instead of ``KafkaConsumer``; additionally, an ``mp.queues.Queue``
186186
can be provided and is passed to the ``FakeConsumer``. The ``FakeConsumer`` simply polls for messages
187187
on the queue instead of Kafka, thus allowing the test to provide the messages. There is no other
188188
configuration of ``FakeConsumer`` possible or necessary.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ extend-exclude = [
102102

103103
[tool.ruff.lint]
104104
# See https://docs.astral.sh/ruff/rules/
105-
select = ["B", "C4", "DTZ", "E", "F", "G", "I", "PERF", "PGH", "PT", "PYI", "RUF", "S", "T20", "W"]
105+
select = ["B", "C4", "DTZ", "E", "F", "G", "I", "PERF", "PGH", "PT", "PYI", "RUF", "S", "T20", "UP", "W"]
106106
ignore = [
107107
# Conflict with ruff format, see
108108
# https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules

src/scippneutron/data_streaming/_consumer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"""
2020

2121
import multiprocessing as mp
22+
import multiprocessing.queues
2223
import threading
2324
from collections.abc import Callable
2425
from queue import Empty as QueueEmpty
@@ -43,7 +44,7 @@ class FakeConsumer:
4344
to avoid network io in unit tests
4445
"""
4546

46-
def __init__(self, input_queue: mp.Queue | None):
47+
def __init__(self, input_queue: mp.queues.Queue | None):
4748
if input_queue is None:
4849
raise RuntimeError(
4950
"A multiprocessing queue for test messages "
@@ -237,7 +238,7 @@ def create_consumers(
237238
kafka_broker: str,
238239
consumer_type_enum: ConsumerType, # so we can inject fake consumer
239240
callback: Callable,
240-
test_message_queue: mp.Queue | None,
241+
test_message_queue: mp.queues.Queue | None,
241242
) -> list[KafkaConsumer]:
242243
"""
243244
Creates one consumer per TopicPartition that start consuming

src/scippneutron/data_streaming/_data_buffer.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-License-Identifier: BSD-3-Clause
22
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
33
import multiprocessing as mp
4+
import multiprocessing.queues
45
import threading
56
from collections.abc import Callable
67
from datetime import datetime
@@ -108,7 +109,9 @@ class _FastMetadataBuffer:
108109
rather than via EPICS and the Forwarder.
109110
"""
110111

111-
def __init__(self, stream_info: StreamInfo, buffer_size: int, data_queue: mp.Queue):
112+
def __init__(
113+
self, stream_info: StreamInfo, buffer_size: int, data_queue: mp.queues.Queue
114+
):
112115
self._buffer_mutex = threading.Lock()
113116
self._buffer_size = buffer_size
114117
self._name = stream_info.source_name
@@ -202,7 +205,9 @@ class _ChopperMetadataBuffer:
202205
serialised according to the flatbuffer schema with id CHOPPER_FB_ID.
203206
"""
204207

205-
def __init__(self, stream_info: StreamInfo, buffer_size: int, data_queue: mp.Queue):
208+
def __init__(
209+
self, stream_info: StreamInfo, buffer_size: int, data_queue: mp.queues.Queue
210+
):
206211
self._buffer_mutex = threading.Lock()
207212
self._buffer_size = buffer_size
208213
self._name = stream_info.source_name
@@ -270,7 +275,7 @@ class StreamedDataBuffer:
270275

271276
def __init__(
272277
self,
273-
queue: mp.Queue,
278+
queue: mp.queues.Queue,
274279
event_buffer_size: int,
275280
slow_metadata_buffer_size: int,
276281
fast_metadata_buffer_size: int,

src/scippneutron/data_streaming/_data_consumption_manager.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-License-Identifier: BSD-3-Clause
22
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
33
import multiprocessing as mp
4+
import multiprocessing.queues
45
from dataclasses import dataclass
56
from enum import Enum
67
from queue import Empty as QueueEmpty
@@ -40,15 +41,15 @@ def data_consumption_manager(
4041
slow_metadata_buffer_size: int,
4142
fast_metadata_buffer_size: int,
4243
chopper_buffer_size: int,
43-
worker_instruction_queue: mp.Queue,
44-
data_queue: mp.Queue,
45-
test_message_queue: mp.Queue | None,
44+
worker_instruction_queue: mp.queues.Queue,
45+
data_queue: mp.queues.Queue,
46+
test_message_queue: mp.queues.Queue | None,
4647
):
4748
"""
4849
Starts and stops buffers and data consumers which collect data and
4950
send them back to the main process via a queue.
5051
51-
All input args must be mp.Queue or pickleable as this function is launched
52+
All input args must be mp.queues.Queue or pickleable as this function is launched
5253
as a multiprocessing.Process.
5354
"""
5455
buffer = StreamedDataBuffer(

src/scippneutron/data_streaming/data_stream.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import asyncio
66
import multiprocessing as mp
7+
import multiprocessing.queues
78
import time
89
from collections.abc import Generator
910
from enum import Enum
@@ -136,16 +137,16 @@ def validate_buffer_size_args(
136137
raise ValueError(f"{buffer_name} must be greater than zero")
137138

138139

139-
def _cleanup_queue(queue: mp.Queue | None):
140+
def _cleanup_queue(queue: mp.queues.Queue | None):
140141
if queue is not None:
141142
queue.cancel_join_thread()
142143
queue.close()
143144
queue.join_thread()
144145

145146

146147
async def _data_stream(
147-
data_queue: mp.Queue,
148-
worker_instruction_queue: mp.Queue,
148+
data_queue: mp.queues.Queue,
149+
worker_instruction_queue: mp.queues.Queue,
149150
kafka_broker: str,
150151
topics: list[str] | None,
151152
interval: sc.Variable,
@@ -160,7 +161,7 @@ async def _data_stream(
160161
consumer_type: ConsumerType = ConsumerType.REAL,
161162
halt_after_n_data_chunks: int = np.iinfo(np.int32).max,
162163
halt_after_n_warnings: int = np.iinfo(np.int32).max,
163-
test_message_queue: mp.Queue | None = None, # for tests
164+
test_message_queue: mp.queues.Queue | None = None, # for tests
164165
timeout: sc.Variable | None = None, # for tests
165166
) -> Generator[sc.DataArray, None, None]:
166167
"""

0 commit comments

Comments
 (0)