Skip to content

pybes3.besio

Looking for usage examples?

See the BES3 Data Reading user guide for practical examples.

besio

concatenate_raw(files, *, entry_start=0, entry_stop=-1, filter_name=None, verbose=False)

Concatenate multiple raw binary files into ak.Array

Parameters:

Name Type Description Default
files str | Path | list[str | Path]

files to be read.

required
entry_start int

The starting entry to read. Defaults to 0.

0
entry_stop int

The stopping entry to read. Defaults to -1, which means read until the end.

-1
filter_name Union[str, list, None]

A filter to select specific fields to read. Defaults to None, which means read all fields.

None
verbose bool

Show reading process. Defaults to False.

False

Returns:

Type Description
Array

Concatenated raw data array.

Source code in src/pybes3/besio/raw_io.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
def concatenate(
    files: str | Path | list[str | Path],
    *,
    entry_start: int = 0,
    entry_stop: int = -1,
    filter_name: Union[str, list, None] = None,
    verbose: bool = False,
) -> ak.Array:
    """
    Concatenate multiple raw binary files into `ak.Array`

    Entries are numbered continuously across the files, in list order, so
    `entry_start` / `entry_stop` refer to positions in the concatenated stream.

    Parameters:
        files (str | Path | list[str | Path]): files to be read. A single `str` or `Path` is expanded as a recursive glob pattern; a list is used as given. Paths rejected by `_is_raw` are dropped.
        entry_start (int, optional): The starting entry to read. Defaults to 0.
        entry_stop (int, optional): The stopping entry to read. Defaults to -1, which means read until the end.
        filter_name (Union[str, list, None], optional): A filter to select specific fields to read. Defaults to `None`, which means read all fields.
        verbose (bool, optional): Show reading process. Defaults to `False`.

    Returns:
        Concatenated raw data array.

    Raises:
        ValueError: If no valid raw file is left after filtering.
    """
    if not isinstance(files, list):
        # `glob.glob` only works on string patterns; a `Path` must be converted first
        files = glob.glob(str(files), recursive=True)

    files = [str(Path(file).resolve()) for file in files if _is_raw(file)]

    if len(files) == 0:
        raise ValueError("No valid raw files found")

    n_cum_entries = 0
    readers_with_entry_range: list[tuple[RawBinaryReader, int, int]] = []

    # Everything that opens or uses a reader lives inside the `try`, so the
    # `finally` closes all registered readers even when a later file fails to open.
    try:
        for file in files:
            reader = RawBinaryReader(file)
            n_entries = reader.entries

            # The file ends before the requested range: skip it, but release its handle
            if n_cum_entries + n_entries < entry_start:
                reader.close()
                n_cum_entries += n_entries
                continue

            # entry_start for this reader (0 once the global start has been passed)
            entry_start_for_reader = max(entry_start - n_cum_entries, 0)

            # entry_stop for this reader (-1 means "read to the end of this file")
            if entry_stop >= 0 and n_cum_entries <= entry_stop < n_cum_entries + n_entries:
                entry_stop_for_reader = entry_stop - n_cum_entries
            else:
                entry_stop_for_reader = -1

            readers_with_entry_range.append(
                (reader, entry_start_for_reader, entry_stop_for_reader)
            )

            n_cum_entries += n_entries
            if entry_stop >= 0 and n_cum_entries >= entry_stop:
                break

        res = []
        n_cum_read = 0
        for reader, entry_start_for_reader, entry_stop_for_reader in readers_with_entry_range:
            # number of entries this reader contributes (only used for the progress message)
            n_read = (
                entry_stop_for_reader - entry_start_for_reader
                if entry_stop_for_reader >= 0
                else reader.entries - entry_start_for_reader
            )

            if verbose:
                print(
                    f"Reading file {reader.path}: {n_cum_read} -> {n_cum_read + n_read} entries ...",
                )

            res.append(
                reader.arrays(
                    entry_start=entry_start_for_reader,
                    entry_stop=entry_stop_for_reader,
                    filter_name=filter_name,
                )
            )

            n_cum_read += n_read
    finally:
        for reader, _, _ in readers_with_entry_range:
            reader.close()

    if len(res) == 0:
        return ak.Array([])

    return ak.concatenate(res)

open(file, **kwargs)

Alias for uproot.open.

Returns:

Type Description
Any

The uproot file object.

Warning

This function is deprecated and will be removed in future versions. Use uproot.open instead.

Source code in src/pybes3/besio/__init__.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def open(file: str, **kwargs: object) -> Any:
    """
    Deprecated alias that forwards to `uproot.open`.

    Parameters:
        file (str): Passed to `uproot.open` unchanged.
        **kwargs: Extra keyword arguments forwarded to `uproot.open`.

    Returns:
        The uproot file object.

    Warning:
        This function is deprecated and will be removed in future versions.
        Use `uproot.open` instead.
    """
    # TODO: Remove this in the future.
    deprecation_message = (
        "`pybes3.open` is deprecated and will be removed in future versions. "
        "Use `uproot.open` instead."
    )
    warn(deprecation_message, DeprecationWarning, stacklevel=2)

    uproot_file = uproot.open(file, **kwargs)
    return uproot_file

concatenate(files, expressions=None, cut=None, **kwargs)

Alias for uproot.concatenate.

Returns:

Type Description
Any

The concatenated array.

Warning

This function is deprecated and will be removed in future versions. Use uproot.concatenate instead.

Source code in src/pybes3/besio/__init__.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def concatenate(
    files: str | list[str],
    expressions: str | list[str] | None = None,
    cut: str | None = None,
    **kwargs: object,
) -> Any:
    """
    Deprecated alias that forwards to `uproot.concatenate`.

    Parameters:
        files (str | list[str]): Passed to `uproot.concatenate` unchanged.
        expressions (str | list[str] | None, optional): Passed to `uproot.concatenate` unchanged.
        cut (str | None, optional): Passed to `uproot.concatenate` unchanged.
        **kwargs: Extra keyword arguments forwarded to `uproot.concatenate`.

    Returns:
        The concatenated array.

    Warning:
        This function is deprecated and will be removed in future versions.
        Use `uproot.concatenate` instead.
    """
    # TODO: Remove this in the future.
    deprecation_message = (
        "`pybes3.concatenate` is deprecated and will be removed in future versions. "
        "Use `uproot.concatenate` instead."
    )
    warn(deprecation_message, DeprecationWarning, stacklevel=2)

    concatenated = uproot.concatenate(files, expressions, cut, **kwargs)
    return concatenated

open_raw(file)

Open a raw binary file.

Parameters:

Name Type Description Default
file str

The file to open.

required

Returns:

Type Description
RawBinaryReader

The raw binary reader.

Source code in src/pybes3/besio/__init__.py
60
61
62
63
64
65
66
67
68
69
70
def open_raw(file: str) -> RawBinaryReader:
    """
    Create a reader for a raw binary file.

    Parameters:
        file (str): The file to open.

    Returns:
        (RawBinaryReader): A `RawBinaryReader` constructed from `file`.
    """
    reader = RawBinaryReader(file)
    return reader

root_io

Bes3Interpretation

Bases: AsCustom

Custom interpretation for Bes3 data.

Source code in src/pybes3/besio/root_io.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
class Bes3Interpretation(AsCustom):
    """
    Custom interpretation for Bes3 data.

    The element type of a branch is looked up in `bes3_branch2types` using the
    regularized object path of the branch. After the parent class has produced
    the final array, it is passed through `preprocess_subbranch`.
    """

    # Branch paths that have an entry in `bes3_branch2types`
    target_branches: set[str] = set(bes3_branch2types)

    def __init__(self, branch, context, simplify):
        super().__init__(branch, context, simplify)

        branch_path = regularize_object_path(branch.object_path)
        element_type = bes3_branch2types[branch_path]

        # A `None` entry in the table marks a branch without a Bes3 element type
        self.is_bes3 = element_type is not None
        self._typename = f"TObjArray<{element_type}>" if self.is_bes3 else element_type

    def final_array(
        self,
        basket_arrays,
        entry_start,
        entry_stop,
        entry_offsets,
        library,
        branch,
        options,
    ):
        """
        Build the final array with the parent implementation, then run it
        through `preprocess_subbranch` for this branch's path.
        """
        parent_result = super().final_array(
            basket_arrays, entry_start, entry_stop, entry_offsets, library, branch, options
        )

        # preprocess awkward array and return
        return preprocess_subbranch(regularize_object_path(branch.object_path), parent_result)

    def __repr__(self) -> str:
        """
        Return `AsBes3(<typename>)` for Bes3 branches, else the parent's repr.
        """
        if not self.is_bes3:
            return super().__repr__()
        return f"AsBes3({self._typename})"
__repr__()

The string representation of the interpretation.

Source code in src/pybes3/besio/root_io.py
405
406
407
408
409
410
411
def __repr__(self) -> str:
    """
    Return `AsBes3(<typename>)` for Bes3 branches, else the parent's repr.
    """
    if not self.is_bes3:
        return super().__repr__()
    return f"AsBes3({self._typename})"

process_digi_subbranch(org_arr)

Processes the TRawData subbranch of the input awkward array and returns a new array with the subbranch fields merged into the top level.

Parameters:

Name Type Description Default
org_arr Array

The input awkward array containing the TRawData subbranch.

required

Returns:

Type Description
Array

A new awkward array with the fields of TRawData merged into the top level.

Raises:

Type Description
AssertionError

If TRawData is not found in the input array fields.

Source code in src/pybes3/besio/root_io.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def process_digi_subbranch(org_arr: ak.Array) -> ak.Array:
    """
    Flatten the `TRawData` subbranch of an awkward array into the top level.

    Every field of `org_arr["TRawData"]` becomes a top-level field of the
    result; all other top-level fields are carried over unchanged. An array
    without any fields is returned as is.

    Parameters:
        org_arr (ak.Array): The input awkward array containing the `TRawData` subbranch.

    Returns:
        A new awkward array with the fields of `TRawData` merged into the top level.

    Raises:
        AssertionError: If `TRawData` is not found in the input array fields, or
            if the array has no fields but `ak.count` reports a non-zero count.
    """
    top_level_names = org_arr.fields

    # An array without fields is only accepted when it holds no data
    if not top_level_names:
        assert ak.count(org_arr) == 0, "Input array is empty but has no fields"
        return org_arr

    assert "TRawData" in top_level_names, "TRawData not found in the input array"

    merged = {}
    for name in top_level_names:
        column = org_arr[name]
        if name != "TRawData":
            merged[name] = column
            continue

        # Lift each nested field of `TRawData` to the top level
        for nested_name in column.fields:
            merged[nested_name] = column[nested_name]

    return ak.zip(merged)

raw_io

RawBinaryReader

Source code in src/pybes3/besio/raw_io.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
class RawBinaryReader:
    """
    Reader for a BES raw binary file.

    On construction the file is opened, its header, run parameters and tail
    are parsed, and the cursor is placed at the start of the event data.
    Byte offsets of individual events are discovered lazily by `_read_event`
    and cached, so events that were already walked are not scanned again.

    The reader owns an open file handle: call `close()` when done, or use the
    reader as a context manager.
    """

    def __init__(self, file: str):
        """
        Open `file` and parse its metadata.

        Parameters:
            file (str): Path of the raw binary file.

        Raises:
            AssertionError: If an expected flag is not found while parsing the file.
        """
        # load cgem-elec-table
        # The lookup tables are built once and shared through the module-level
        # `_info_tables`. They are assembled in a local dict first, so a failure
        # half-way does not leave a partially filled global behind.
        global _info_tables
        if _info_tables is None:
            tables = {}

            with np.load(CGEM_ELEC_TABLE) as f:
                cgem_elec_table = dict(f)

            for k, v in cgem_elec_table.items():
                tables[f"cgem_{k}"] = v

            tables["mdc_re2te"] = _reid.build_mdc_re2te()
            tables["tof_re2te"] = _reid.build_tof_re2te()
            tables["emc_re2te"] = _reid.build_emc_re2te()
            tables["muc_re2te"] = _reid.build_muc_re2te()
            tables["muc_strsqc"] = _reid.build_muc_strsqc()

            _info_tables = tables

        self.path = str(Path(file).resolve())
        self._file = open(file, "rb")

        # file header fields (filled by `_preprocess_file`)
        self.file_version: int = -1
        self.file_number: int = -1
        self.file_date: int = -1
        self.file_time: int = -1

        # NOTE(review): these two are never assigned in this class;
        # `_preprocess_file` stores `file_name` / `file_tag` instead.
        self.app_name: str | None = None
        self.app_tag: str | None = None

        # run parameters (filled by `_preprocess_file`)
        self.run_number: int = -1
        self.max_events: int = -1
        self.rec_enable: int = -1
        self.trigger_type: int = -1
        self.detector_mask: int = -1
        self.beam_type: int = -1
        self.beam_energy: int = -1

        # number of events, read from the file tail
        self.entries: int = -1

        self._data_start: int = 0  # byte offset where event data begins
        self._data_end: int = 0  # byte offset where event data ends (start of the tail)
        self.size: int = 0  # total file size in bytes
        self._data_size: int = 0  # `_data_end - _data_start`, in bytes

        # per-event byte offsets, -1 until the event has been walked
        self._entry_starts: np.ndarray | None = None
        self._entry_stops: np.ndarray | None = None
        # highest entry index whose offsets are already cached (-1: none)
        self._max_looped_entry: int = -1

        try:
            self._preprocess_file()
        except BaseException:
            # Do not leak the handle when the file turns out to be invalid
            self._file.close()
            raise

    def close(self) -> None:
        """Close the underlying file handle."""
        self._file.close()

    def arrays(
        self,
        *,
        entry_start: int = 0,
        entry_stop: int = -1,
        filter_name: Union[str, list, None] = None,
    ) -> ak.Array:
        """
        Read and return arrays of data from the BES raw file.

        Parameters:
            entry_start (int, optional): The starting entry to read. Defaults to 0.
            entry_stop (int, optional): The stopping entry to read. Defaults to -1, which means read all entries.
            filter_name (Union[str, list, None], optional): A filter to select specific fields to read. Defaults to `None`, which means read all fields.

        Returns:
            An Awkward Array containing the read data.
        """
        # process parameters
        if entry_stop == -1:
            entry_stop = self.entries
        # never read past the last entry of the file
        entry_stop = min(entry_stop, self.entries)

        filter_func = regularize_filter(filter_name)
        fields = [field for field in _RAW_FIELDS if filter_func(field)]

        batch_data = self._read_event(entry_start, entry_stop)

        org_dict = read_bes_raw(batch_data, fields, _info_tables)
        return _raw_dict_to_ak(org_dict)

    def _read(self) -> int:
        """Read the next 4 bytes as a little-endian unsigned integer."""
        return int.from_bytes(self._file.read(4), "little")

    def _skip(self, n: int = 1) -> None:
        """Move the cursor forward by `n` 4-byte words."""
        self._file.seek(4 * n, 1)

    def _preprocess_file(self):
        """
        Parse the file header, file name block, run parameters and file tail.

        Fills the public metadata attributes, the data range
        (`_data_start`, `_data_end`, `_data_size`), `entries` and the offset
        caches, then leaves the cursor at `_data_start`.

        Raises:
            AssertionError: If one of the expected flags is not found.
        """
        # file header
        assert self._read() == BesFlag.FILE_START, "Invalid start flag"
        self._skip()

        self.file_version = self._read()
        self.file_number = self._read()
        self.file_date = self._read()
        self.file_time = self._read()
        self._skip(2)

        # file name
        assert self._read() == BesFlag.FILE_NAME, "Invalid file name flag"

        # Strings are stored with their character count first and occupy a
        # whole number of 4-byte words, hence the round-up.
        nchar_name = self._read()
        nwords_name = (nchar_name + 3) // 4
        self.file_name = self._file.read(nwords_name * 4).decode("utf-8").strip()

        nchar_tag = self._read()
        nwords_tag = (nchar_tag + 3) // 4
        self.file_tag = self._file.read(nwords_tag * 4).decode("utf-8").strip()

        # run parameters
        assert self._read() == BesFlag.RUN_PARAMS, "Invalid run params flag"
        self._skip()

        self.run_number = self._read()
        self.max_events = self._read()
        self.rec_enable = self._read()
        self.trigger_type = self._read()
        self.detector_mask = self._read()
        self.beam_type = self._read()
        self.beam_energy = self._read()

        # other information
        self._data_start = self._file.tell()
        self._file.seek(0, 2)
        self.size = self._file.tell()
        # the file tail occupies the last 10 words (see the tail read below)
        self._data_end = self.size - 10 * 4
        self._data_size = self._data_end - self._data_start

        # read file tail
        self._file.seek(-10 * 4, 2)
        assert self._read() == BesFlag.FILE_TAIL_START, "Invalid file tail start flag"
        self._skip(3)
        self.entries = self._read()
        self._skip(4)
        assert self._read() == BesFlag.FILE_END, "Invalid file end flag"

        # post process
        self._entry_starts = np.full(self.entries, -1, dtype=np.int64)
        self._entry_stops = np.full(self.entries, -1, dtype=np.int64)
        self._reset_cursor()

    def _reset_cursor(self) -> None:
        """Move the cursor to the start of the event data."""
        self._file.seek(self._data_start)

    def _skip_event(self) -> tuple[int, int]:
        """
        Skip over the event at the cursor.

        A leading `DATA_SEPERATOR` record (flag plus 3 words) is stepped over
        first, so the returned start offset points at the event fragment flag.

        Returns:
            `(pos_start, pos_end)` byte offsets of the event fragment.

        Raises:
            AssertionError: If no `FULL_EVENT_FRAGMENT` flag is found.
        """
        flag = self._read()
        if flag == BesFlag.DATA_SEPERATOR:
            self._skip(3)
            flag = self._read()

        assert flag == BesFlag.FULL_EVENT_FRAGMENT, "Invalid event fragment flag"
        pos_start = self._file.tell() - 4

        # The size word counts the flag and itself, so two words are already consumed
        total_size = self._read()
        self._skip(total_size - 2)
        pos_end = self._file.tell()

        return pos_start, pos_end

    def _read_block(self, n_blocks: int) -> NDArray[np.uint32]:
        """
        Read a batch of data with the specified number of blocks.
          - If n_blocks < 0, read all data.
          - If n_blocks >= 0, read n_blocks blocks of data. Each block is separated by a DATA_SEPERATOR flag.

        This method is designed for generating testing data.

        Parameters:
            n_blocks: The number of blocks to read. If negative, read all data.

        Returns:
            NDArray[np.uint32]: The batch of data read.
        """

        self._reset_cursor()
        if n_blocks < 0:
            return np.frombuffer(self._file.read(self._data_size), dtype=np.uint32)

        for _ in range(n_blocks):
            # stop early when fewer than `n_blocks` blocks exist
            if self._file.tell() >= self._data_end:
                assert self._file.tell() == self._data_end, "Invalid data end"
                break

            assert self._read() == BesFlag.DATA_SEPERATOR, "Invalid data seperator flag"
            self._skip(2)
            # block size is given in bytes, `_skip` works in 4-byte words
            block_size = self._read()
            self._skip(block_size // 4)

        pos_end = self._file.tell()

        self._file.seek(self._data_start)
        batch_data = np.frombuffer(
            self._file.read(pos_end - self._data_start), dtype=np.uint32
        )

        return batch_data

    def _read_event(self, entry_start: int, entry_stop: int) -> np.ndarray:
        """
        Read the raw words of the entries in `[entry_start, entry_stop)`.

        Event offsets that are not known yet are found by walking the events
        one by one from the last cached position; the offsets found on the
        way are stored in `_entry_starts` / `_entry_stops`.

        Parameters:
            entry_start: First entry to read.
            entry_stop: Entry to stop before.

        Returns:
            The selected bytes viewed as a `np.uint32` array; empty when the
            range is empty or starts past the last entry.
        """
        if entry_start >= entry_stop or entry_start >= self.entries:
            return np.array([], dtype=np.uint32)

        # Both ends of the data range are known without scanning
        pos_start = None
        pos_stop = None
        if entry_start == 0:
            pos_start = self._data_start
        if entry_stop == self.entries:
            pos_stop = self._data_end

        # whole file requested: read the data range in one go
        if pos_start is not None and pos_stop is not None:
            self._file.seek(self._data_start)
            return np.frombuffer(self._file.read(self._data_size), dtype=np.uint32)

        cur_entry = self._max_looped_entry

        # move cursor to the maximum position of the already looped events to avoid unnecessary skipping
        if cur_entry < 0:
            self._file.seek(self._data_start)
        else:
            self._file.seek(self._entry_stops[cur_entry])

        # walk forward until the entry_start entry itself has been recorded, so its start position is known
        if pos_start is None:
            while cur_entry < entry_start:
                cur_entry += 1
                self._entry_starts[cur_entry], self._entry_stops[cur_entry] = (
                    self._skip_event()
                )

            self._max_looped_entry = cur_entry
            pos_start = self._entry_starts[entry_start]

        # walk forward until the entry_stop-1 entry has been recorded, so its stop position is known
        if pos_stop is None:
            while cur_entry < entry_stop - 1:
                cur_entry += 1
                self._entry_starts[cur_entry], self._entry_stops[cur_entry] = (
                    self._skip_event()
                )

            self._max_looped_entry = cur_entry
            pos_stop = self._entry_stops[entry_stop - 1]

        # read the target entries
        self._file.seek(pos_start)
        return np.frombuffer(
            self._file.read(pos_stop - pos_start),
            dtype=np.uint32,
        )

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # go through `close()` so there is a single place that releases the file
        self.close()

    def __repr__(self) -> str:
        """Short summary with file name, entry count and size in whole MB."""
        file_name = Path(self.path).name
        size = int(self.size / 1024 / 1024)
        return f"<RawData filename='{file_name}' Entries={self.entries} Size='{size} MB'>"
arrays(*, entry_start=0, entry_stop=-1, filter_name=None)

Read and return arrays of data from the BES raw file.

Parameters:

Name Type Description Default
entry_start int

The starting entry to read. Defaults to 0.

0
entry_stop int

The stopping entry to read. Defaults to -1, which means read all entries.

-1
filter_name Union[str, list, None]

A filter to select specific fields to read. Defaults to None, which means read all fields.

None

Returns:

Type Description
Array

An Awkward Array containing the read data.

Source code in src/pybes3/besio/raw_io.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def arrays(
    self,
    *,
    entry_start: int = 0,
    entry_stop: int = -1,
    filter_name: Union[str, list, None] = None,
) -> ak.Array:
    """
    Read a range of entries from the BES raw file as an Awkward Array.

    Parameters:
        entry_start (int, optional): The starting entry to read. Defaults to 0.
        entry_stop (int, optional): The stopping entry to read. Defaults to -1, which means read all entries. Values past the last entry are clipped to `self.entries`.
        filter_name (Union[str, list, None], optional): A filter to select specific fields to read. Defaults to `None`, which means read all fields.

    Returns:
        An Awkward Array containing the read data.
    """
    # -1 selects everything; any other value is clipped to the file length
    n_entries = self.entries
    last_entry = n_entries if entry_stop == -1 else min(entry_stop, n_entries)

    # keep only the raw fields accepted by the filter
    accept = regularize_filter(filter_name)
    selected_fields = [name for name in _RAW_FIELDS if accept(name)]

    raw_words = self._read_event(entry_start, last_entry)

    return _raw_dict_to_ak(read_bes_raw(raw_words, selected_fields, _info_tables))

concatenate(files, *, entry_start=0, entry_stop=-1, filter_name=None, verbose=False)

Concatenate multiple raw binary files into ak.Array

Parameters:

Name Type Description Default
files str | Path | list[str | Path]

files to be read.

required
entry_start int

The starting entry to read. Defaults to 0.

0
entry_stop int

The stopping entry to read. Defaults to -1, which means read until the end.

-1
filter_name Union[str, list, None]

A filter to select specific fields to read. Defaults to None, which means read all fields.

None
verbose bool

Show reading process. Defaults to False.

False

Returns:

Type Description
Array

Concatenated raw data array.

Source code in src/pybes3/besio/raw_io.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
def concatenate(
    files: str | Path | list[str | Path],
    *,
    entry_start: int = 0,
    entry_stop: int = -1,
    filter_name: Union[str, list, None] = None,
    verbose: bool = False,
) -> ak.Array:
    """
    Concatenate multiple raw binary files into `ak.Array`

    Entries are numbered continuously across the files, in list order, so
    `entry_start` / `entry_stop` refer to positions in the concatenated stream.

    Parameters:
        files (str | Path | list[str | Path]): files to be read. A single `str` or `Path` is expanded as a recursive glob pattern; a list is used as given. Paths rejected by `_is_raw` are dropped.
        entry_start (int, optional): The starting entry to read. Defaults to 0.
        entry_stop (int, optional): The stopping entry to read. Defaults to -1, which means read until the end.
        filter_name (Union[str, list, None], optional): A filter to select specific fields to read. Defaults to `None`, which means read all fields.
        verbose (bool, optional): Show reading process. Defaults to `False`.

    Returns:
        Concatenated raw data array.

    Raises:
        ValueError: If no valid raw file is left after filtering.
    """
    if not isinstance(files, list):
        # `glob.glob` only works on string patterns; a `Path` must be converted first
        files = glob.glob(str(files), recursive=True)

    files = [str(Path(file).resolve()) for file in files if _is_raw(file)]

    if len(files) == 0:
        raise ValueError("No valid raw files found")

    n_cum_entries = 0
    readers_with_entry_range: list[tuple[RawBinaryReader, int, int]] = []

    # Everything that opens or uses a reader lives inside the `try`, so the
    # `finally` closes all registered readers even when a later file fails to open.
    try:
        for file in files:
            reader = RawBinaryReader(file)
            n_entries = reader.entries

            # The file ends before the requested range: skip it, but release its handle
            if n_cum_entries + n_entries < entry_start:
                reader.close()
                n_cum_entries += n_entries
                continue

            # entry_start for this reader (0 once the global start has been passed)
            entry_start_for_reader = max(entry_start - n_cum_entries, 0)

            # entry_stop for this reader (-1 means "read to the end of this file")
            if entry_stop >= 0 and n_cum_entries <= entry_stop < n_cum_entries + n_entries:
                entry_stop_for_reader = entry_stop - n_cum_entries
            else:
                entry_stop_for_reader = -1

            readers_with_entry_range.append(
                (reader, entry_start_for_reader, entry_stop_for_reader)
            )

            n_cum_entries += n_entries
            if entry_stop >= 0 and n_cum_entries >= entry_stop:
                break

        res = []
        n_cum_read = 0
        for reader, entry_start_for_reader, entry_stop_for_reader in readers_with_entry_range:
            # number of entries this reader contributes (only used for the progress message)
            n_read = (
                entry_stop_for_reader - entry_start_for_reader
                if entry_stop_for_reader >= 0
                else reader.entries - entry_start_for_reader
            )

            if verbose:
                print(
                    f"Reading file {reader.path}: {n_cum_read} -> {n_cum_read + n_read} entries ...",
                )

            res.append(
                reader.arrays(
                    entry_start=entry_start_for_reader,
                    entry_stop=entry_stop_for_reader,
                    filter_name=filter_name,
                )
            )

            n_cum_read += n_read
    finally:
        for reader, _, _ in readers_with_entry_range:
            reader.close()

    if len(res) == 0:
        return ak.Array([])

    return ak.concatenate(res)