
Single cell memmap dataset

FileNames

Bases: str, Enum

Names of files that are generated in SingleCellCollection.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class FileNames(str, Enum):
    """Names of files that are generated in SingleCellCollection."""

    DATA = "data.npy"
    COLPTR = "col_ptr.npy"
    ROWPTR = "row_ptr.npy"
    METADATA = "metadata.json"
    DTYPE = "dtypes.json"
    FEATURES = "features"
    VERSION = "version.json"
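
For illustration, these enum values are the literal file names written inside an SCDL archive directory; a minimal sketch of composing those paths (the archive_dir variable is hypothetical, not part of the API):

from bionemo.scdl.io.single_cell_memmap_dataset import FileNames

archive_dir = "my_dataset"  # hypothetical SCDL archive directory
data_file = f"{archive_dir}/{FileNames.DATA.value}"          # my_dataset/data.npy
rowptr_file = f"{archive_dir}/{FileNames.ROWPTR.value}"      # my_dataset/row_ptr.npy
metadata_file = f"{archive_dir}/{FileNames.METADATA.value}"  # my_dataset/metadata.json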

METADATA

Bases: str, Enum

Stored metadata.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class METADATA(str, Enum):
    """Stored metadata."""

    NUM_ROWS = "num_rows"

Mode

Bases: str, Enum

Valid modes for the single cell memory mapped dataset.

The write append mode is 'w+' while the read append mode is 'r+'.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class Mode(str, Enum):
    """Valid modes for the single cell memory mapped dataset.

    The write append mode is 'w+' while the read append mode is 'r+'.
    """

    CREATE_APPEND = "w+"
    READ_APPEND = "r+"
    READ = "r"
    CREATE = "w"

SingleCellMemMapDataset

Bases: SingleCellRowDataset

Represents one or more AnnData matrices.

Data is stored in large, memory-mapped arrays that enable fast access to datasets larger than the available amount of RAM on a system. SCMMAP implements a consistent API defined in SingleCellRowDataset.

Attributes:

data_path (str): Location of np.memmap files to be loaded from or that will be created.
mode (Mode): Whether the dataset will be read in (r+) from np.memmap files or written to np.memmap files (w+).
data (Optional[ndarray]): A numpy array of the data.
row_index (Optional[ndarray]): A numpy array of row pointers.
col_index (Optional[ndarray]): A numpy array of column values.
metadata (Dict[str, int]): Various metadata about the dataset.
_feature_index (RowFeatureIndex): The corresponding RowFeatureIndex where features are stored.
dtypes (Dict[FileNames, str]): A dictionary containing the datatypes of the data, row_index, and col_index arrays.
_version (str): The version of the dataset.
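
A minimal usage sketch, assuming the class is importable from the module listed below; the directory and h5ad paths are hypothetical:

from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset

# Convert an AnnData archive into the memory-mapped SCDL layout on disk.
ds = SingleCellMemMapDataset("scmmap_dir", h5ad_path="cells.h5ad")

num_rows, vars_per_row = ds.shape()    # number of rows and per-row feature counts
(values, columns), _ = ds.get_row(0)   # sparse view of the first row
padded_row, _ = ds.get_row_padded(0)   # dense (padded) view of the same row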

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class SingleCellMemMapDataset(SingleCellRowDataset):
    """Represents one or more AnnData matrices.

    Data is stored in large, memory-mapped arrays that enable fast access to
    datasets larger than the available amount of RAM on a system. SCMMAP
    implements a consistent API defined in SingleCellRowDataset.

    Attributes:
        data_path: Location of np.memmap files to be loaded from or that will be
        created.
        mode: Whether the dataset will be read in (r+) from np.memmap files or
        written to np.memmap files (w+).
        data: A numpy array of the data
        row_index: A numpy array of row pointers
        col_index: A numpy array of column values
        metadata: Various metadata about the dataset.
        _feature_index: The corresponding RowFeatureIndex where features are
        stored
        dtypes: A dictionary containing the datatypes of the data, row_index,
        and col_index arrays.
        _version: The version of the dataset
    """

    def __init__(
        self,
        data_path: str,
        h5ad_path: Optional[str] = None,
        num_elements: Optional[int] = None,
        num_rows: Optional[int] = None,
        mode: Mode = Mode.READ_APPEND,
        paginated_load_cutoff: int = 10_000,
        load_block_row_size: int = 1_000_000,
    ) -> None:
        """Instantiate the class.

        Args:
            data_path: The location where the data np.memmap files are read from
            or stored.
            h5ad_path: Optional, the location of the h5ad file.
            num_elements: The total number of elements in the array.
            num_rows: The number of rows in the data frame.
            mode: Whether to read or write from the data_path.
            paginated_load_cutoff: MB size on disk at which to load the h5ad structure with paginated load.
            load_block_row_size: Number of rows to load into memory with paginated load
        """
        self._version: str = importlib.metadata.version("bionemo.scdl")
        self.data_path: str = data_path
        self.mode: Mode = mode
        self.paginated_load_cutoff = paginated_load_cutoff
        self.load_block_row_size = load_block_row_size
        # Backing arrays
        self.data: Optional[np.ndarray] = None
        self.row_index: Optional[np.ndarray] = None
        self.col_index: Optional[np.ndarray] = None

        # Metadata and attributes
        self.metadata: Dict[str, int] = {}

        # Stores the Feature Index, which tracks
        # the original AnnData features (e.g., gene names)
        # and allows us to store ragged arrays in our SCMMAP structure.
        self._feature_index: RowFeatureIndex = RowFeatureIndex()

        # Variables for int packing / reduced precision
        self.dtypes: Dict[FileNames, str] = {
            f"{FileNames.DATA.value}": "float32",
            f"{FileNames.COLPTR.value}": "uint32",
            f"{FileNames.ROWPTR.value}": "uint64",
        }

        if mode == Mode.CREATE_APPEND and os.path.exists(data_path):
            raise FileExistsError(f"Output directory already exists: {data_path}")

        if h5ad_path is not None and (data_path is not None and os.path.exists(data_path)):
            raise FileExistsError(
                "Invalid input; both an existing SCMMAP and an h5ad file were passed. "
                "Please pass either an existing SCMMAP or an h5ad file."
            )

        # If there is only a data path, and it exists already, load SCMMAP data.
        elif data_path is not None and os.path.exists(data_path):
            self.__init__obj()
            self.load(data_path)

        # If there is only an h5ad path, load the HDF5 data
        elif h5ad_path is not None:
            self.__init__obj()
            self.load_h5ad(h5ad_path)
        else:
            match num_rows, num_elements:
                case (int(), int()):
                    self.__init__obj()
                    self._init_arrs(num_elements=num_elements, num_rows=num_rows)
                case _:
                    raise ValueError(
                        "An np.memmap path, an h5ad path, or the number of elements and rows is required" ""
                    )

    def __init__obj(self):
        """Initializes the datapath and writes the version."""
        os.makedirs(self.data_path, exist_ok=True)

        # Write the version
        if not os.path.exists(f"{self.data_path}/{FileNames.VERSION.value}"):
            with open(f"{self.data_path}/{FileNames.VERSION.value}", "w") as vfi:
                json.dump(self.version(), vfi)

    def _init_arrs(self, num_elements: int, num_rows: int) -> None:
        self.mode = Mode.CREATE_APPEND
        data_arr, col_arr, row_arr = _create_compressed_sparse_row_memmaps(
            num_elements=num_elements,
            num_rows=num_rows,
            memmap_dir_path=Path(self.data_path),
            mode=self.mode,
            dtypes=self.dtypes,
        )
        self.data = data_arr
        self.col_index = col_arr
        self.row_index = row_arr

    def version(self) -> str:
        """Returns a version number.

        (following <major>.<minor>.<point> convention).
        """
        return self._version

    def get_row(
        self,
        index: int,
        return_features: bool = False,
        feature_vars: Optional[List[str]] = None,
    ) -> Tuple[Tuple[np.ndarray, np.ndarray], pd.DataFrame]:
        """Returns a given row in the dataset along with optional features.

        Args:
            index: The row to be returned. This is in the range of [0, num_rows)
            return_features: boolean that indicates whether to return features
            feature_vars: Optional, feature variables to extract
        Return:
            Tuple[np.ndarray, np.ndarray]: data values and column pointers
            pd.DataFrame: optional, corresponding features.
        """
        start = self.row_index[index]
        end = self.row_index[index + 1]
        values = self.data[start:end]
        columns = self.col_index[start:end]
        ret = (values, columns)
        if return_features:
            return ret, self._feature_index.lookup(index, select_features=feature_vars)[0]
        else:
            return ret, None

    def get_row_padded(
        self,
        index: int,
        return_features: bool = False,
        feature_vars: Optional[List[str]] = None,
    ) -> Tuple[np.ndarray, pd.DataFrame]:
        """Returns a padded version of a row in the dataset.

        A padded version is one where a sparse array representation is
        converted to a conventional representation. Optionally, features are
        returned.

        Args:
            index: The row to be returned
            return_features: boolean that indicates whether to return features
            feature_vars: Optional, feature variables to extract
        Return:
            np.ndarray: conventional row representation
            pd.DataFrame: optional, corresponding features.
        """
        (row_values, row_column_pointer), features = self.get_row(index, return_features, feature_vars)
        return (
            _pad_sparse_array(row_values, row_column_pointer, self._feature_index.number_vars_at_row(index)),
            features,
        )

    def get_row_column(self, index: int, column: int, impute_missing_zeros: bool = True) -> Optional[float]:
        """Returns the value at a given index and the corresponding column.

        Args:
            index: The index to be returned
            column: The column to be returned
            impute_missing_zeros: boolean that indicates whether to set missing
            data to 0
        Return:
            A float that is the value in the array or None.
        """
        (row_values, row_column_pointer), _ = self.get_row(index)
        if column is not None:
            for col_index, col in enumerate(row_column_pointer):
                if col == column:
                    # return the value at this position
                    return row_values[col_index]
                elif col > column:
                    try:
                        raise ValueError(f"Column pointer {col} is larger than the column {column}.")
                    except ValueError:
                        break
            return 0.0 if impute_missing_zeros else None

    def features(self) -> Optional[RowFeatureIndex]:
        """Return the corresponding RowFeatureIndex."""
        return self._feature_index

    def _load_mmap_file_if_exists(self, file_path, dtype):
        if os.path.exists(file_path):
            return np.memmap(file_path, dtype=dtype, mode=self.mode)
        else:
            raise FileNotFoundError(f"The mmap file at {file_path} is missing")

    def load(self, stored_path: str) -> None:
        """Loads the data at store_path that is an np.memmap format.

        Args:
            stored_path: directory with np.memmap files
        Raises:
            FileNotFoundError if the corresponding directory or files are not
            found, or if the metadata file is not present.
        """
        if not os.path.exists(stored_path):
            raise FileNotFoundError(
                f"""Error: the specified data path to the mmap files {stored_path} does not exist.
                                    Specify an updated filepath or provide an h5ad path to the dataset. The data can
                                    be loaded with SingleCellMemMapDataset.load_h5ad. Alternatively, the class can be instantiated
                                    with  SingleCellMemMapDataset(<path to data that will be created>, h5ad_path=<path to h5ad file>"""
            )
        self.data_path = stored_path
        self.mode = Mode.READ_APPEND

        # Metadata is required, so we must check if it exists and fail if not.
        if not os.path.exists(f"{self.data_path}/{FileNames.METADATA.value}"):
            raise FileNotFoundError(
                f"Error: the metadata file {self.data_path}/{FileNames.METADATA.value} does not exist."
            )

        with open(f"{self.data_path}/{FileNames.METADATA.value}", Mode.READ_APPEND.value) as mfi:
            self.metadata = json.load(mfi)

        if os.path.exists(f"{self.data_path}/{FileNames.FEATURES.value}"):
            self._feature_index = RowFeatureIndex.load(f"{self.data_path}/{FileNames.FEATURES.value}")

        if os.path.exists(f"{self.data_path}/{FileNames.DTYPE.value}"):
            with open(f"{self.data_path}/{FileNames.DTYPE.value}") as dfi:
                self.dtypes = json.load(dfi)

        # mmap the existing arrays
        self.data = self._load_mmap_file_if_exists(
            f"{self.data_path}/{FileNames.DATA.value}", self.dtypes[f"{FileNames.DATA.value}"]
        )
        self.row_index = self._load_mmap_file_if_exists(
            f"{self.data_path}/{FileNames.ROWPTR.value}", dtype=self.dtypes[f"{FileNames.ROWPTR.value}"]
        )
        self.col_index = self._load_mmap_file_if_exists(
            f"{self.data_path}/{FileNames.COLPTR.value}", dtype=self.dtypes[f"{FileNames.COLPTR.value}"]
        )

    def _write_metadata(self) -> None:
        with open(f"{self.data_path}/{FileNames.METADATA.value}", f"{Mode.CREATE.value}") as mfi:
            json.dump(self.metadata, mfi)

    def regular_load_h5ad(
        self,
        anndata_path: str,
    ) -> Tuple[pd.DataFrame, int]:
        """Method for loading an h5ad file into memorySu and converting it to the SCDL format.

        Args:
            anndata_path: location of data to load
        Raises:
            NotImplementedError if the data is not in scipy.sparse.spmatrix format
            ValueError if there is no count data
        Returns:
            pd.DataFrame: var variables for features
            int: number of rows in the dataframe.

        """
        adata = ad.read_h5ad(anndata_path)  # slow

        if not isinstance(adata.X, scipy.sparse.spmatrix):
            raise NotImplementedError("Error: dense matrix loading not yet implemented.")

        # Check if raw data is present
        raw = getattr(adata, "raw", None)
        count_data = None
        if raw is not None:
            # If it is, attempt to get the counts in the raw data.
            count_data = getattr(raw, "X", None)

        if count_data is None:
            # No raw counts were present, resort to normalized
            count_data = getattr(adata, "X")
        if count_data is None:
            raise ValueError("This file does not have count data")

        shape = count_data.shape
        num_rows = shape[0]

        num_elements_stored = count_data.nnz

        self.dtypes[f"{FileNames.DATA.value}"] = count_data.dtype

        # Create the arrays.
        self._init_arrs(num_elements_stored, num_rows)
        # Store data
        self.data[0:num_elements_stored] = count_data.data

        # Store the col idx array
        self.col_index[0:num_elements_stored] = count_data.indices.astype(int)

        # Store the row idx array
        self.row_index[0 : num_rows + 1] = count_data.indptr.astype(int)

        return adata.var, num_rows

    def paginated_load_h5ad(
        self,
        anndata_path: str,
    ) -> Tuple[pd.DataFrame, int]:
        """Method for block loading a larger h5ad file and converting it to the SCDL format.

        This should be used in the case when the entire anndata file cannot be loaded into memory.
        The anndata is loaded into memory in blocks of load_block_row_size rows at a time. Each chunk
        is converted into numpy memory maps which are then concatenated together.

        Raises:
            NotImplementedError if the data is not loaded in the CSRDataset format.

        Returns:
            pd.DataFrame: var variables for features
            int: number of rows in the dataframe.
        """
        adata = ad.read_h5ad(anndata_path, backed=True)

        if not isinstance(adata.X, ad.experimental.CSRDataset):
            raise NotImplementedError("Non-sparse format cannot be loaded: {type(adata.X)}.")
        num_rows = adata.X.shape[0]

        self.dtypes[f"{FileNames.DATA.value}"] = adata.X.dtype

        # Read the row indices into a memory map.
        mode = Mode.CREATE_APPEND
        self.row_index = _create_row_memmaps(num_rows, Path(self.data_path), mode, self.dtypes)
        self.row_index[:] = adata.X._indptr.astype(int)

        # The data from each column and data chunk of the original anndata file is read in. This is saved into the final
        # location of the memmap file. In this step, it is saved in the binary file format.
        memmap_dir_path = Path(self.data_path)
        with (
            open(f"{memmap_dir_path}/{FileNames.COLPTR.value}", "wb") as col_file,
            open(f"{memmap_dir_path}/{FileNames.DATA.value}", "wb") as data_file,
        ):
            n_elements = 0
            for row_start in range(0, num_rows, self.load_block_row_size):
                # Write each array's data to the file in binary format
                col_block = adata.X[row_start : row_start + self.load_block_row_size].indices
                col_file.write(col_block.tobytes())

                data_block = adata.X[row_start : row_start + self.load_block_row_size].data
                data_file.write(data_block.tobytes())

                n_elements += len(data_block)

        # The column and data files are re-opened as memory-mapped arrays with the final shape
        mode = Mode.READ_APPEND
        self.col_index = np.memmap(
            f"{memmap_dir_path}/{FileNames.COLPTR.value}",
            self.dtypes[f"{FileNames.COLPTR.value}"],
            mode=mode,
            shape=(n_elements,),
        )
        self.data = np.memmap(
            f"{memmap_dir_path}/{FileNames.DATA.value}",
            dtype=self.dtypes[f"{FileNames.DATA.value}"],
            mode=mode,
            shape=(n_elements,),
        )
        return adata.var, num_rows

    def load_h5ad(
        self,
        anndata_path: str,
    ) -> None:
        """Loads an existing AnnData archive from disk.

        This creates a new backing data structure which is saved.
        Note: the storage utilized will roughly double. Currently, the data must
        be in a scipy.sparse.spmatrix format.

        Args:
            anndata_path: location of data to load
        Raises:
            FileNotFoundError if the data path does not exist.
            NotImplementedError if the data is not in scipy.sparse.spmatrix
            format
            ValueError if there is no count data
        """
        if not os.path.exists(anndata_path):
            raise FileNotFoundError(f"Error: could not find h5ad path {anndata_path}")
        file_size_MB = os.path.getsize(anndata_path) / (1_024**2)

        if file_size_MB < self.paginated_load_cutoff:
            features, num_rows = self.regular_load_h5ad(anndata_path)

        else:
            features, num_rows = self.paginated_load_h5ad(anndata_path)

        # Collect features and store in FeatureIndex
        self._feature_index.append_features(n_obs=num_rows, features=features, label=anndata_path)

        self.save()

    def save(self, output_path: Optional[str] = None) -> None:
        """Saves the class to a given output path.

        Args:
            output_path: The location to save - not yet implemented and should
            be self.data_path

        Raises:
           NotImplementedError if output_path is not None.
        """
        if f"{METADATA.NUM_ROWS.value}" not in self.metadata:
            self.metadata[f"{METADATA.NUM_ROWS.value}"] = self.number_of_rows()

        self._write_metadata()
        # Write the feature index. This may not exist.
        self._feature_index.save(f"{self.data_path}/{FileNames.FEATURES.value}")

        # Ensure the object is in a valid state. These are saved at creation!
        for postfix in [
            f"{FileNames.VERSION.value}",
            f"{FileNames.DATA.value}",
            f"{FileNames.COLPTR.value}",
            f"{FileNames.ROWPTR.value}",
            f"{FileNames.FEATURES.value}",
        ]:
            if not os.path.exists(f"{self.data_path}/{postfix}"):
                raise FileNotFoundError(f"This file should exist from object creation: {self.data_path}/{postfix}")

        self.data.flush()
        self.row_index.flush()
        self.col_index.flush()

        if output_path is not None:
            raise NotImplementedError("Saving to separate path is not yet implemented.")

        return True

    def number_of_values(self) -> int:
        """Get the total number of values in the array.

        For each index, the length of the corresponding dataframe is counted.

        Returns:
            The sum of lengths of the features in every row
        """
        return sum(self._feature_index.number_of_values())

    def number_of_rows(self) -> int:
        """The number of rows in the dataset.

        Returns:
            The number of rows in the dataset
        Raises:
            ValueError if the number of rows in the feature index does not
            correspond to the number of stored rows.
        """
        if len(self._feature_index) > 0 and self._feature_index.number_of_rows() != self.row_index.size - 1:
            raise ValueError(
                f"""The nuber of rows in the feature index {self._feature_index.number_of_rows()}
                             does not correspond to the number of rows in the row_index {self.row_index.size - 1}"""
            )
        return self._feature_index.number_of_rows()

    def number_nonzero_values(self) -> int:
        """Number of non zero entries in the dataset."""
        return self.data.size

    def __len__(self):
        """Return the number of rows."""
        return self.number_of_rows()

    def __getitem__(self, idx: int) -> torch.Tensor:
        """Get the row values located and index idx."""
        return torch.from_numpy(np.stack(self.get_row(idx)[0]))

    def number_of_variables(self) -> List[int]:
        """Get the number of features in every entry in the dataset.

        Returns:
            A list containing the lengths of the features in every row
        """
        feats = self._feature_index
        if len(feats) == 0:
            return [0]
        num_vars = feats.column_dims()
        return num_vars

    def shape(self) -> Tuple[int, List[int]]:
        """Get the shape of the dataset.

        This is the number of entries by the length of the feature index
        corresponding to that variable.

        Returns:
            The number of elements in the dataset
            A list containing the number of variables for each row.
        """
        return self.number_of_rows(), self.number_of_variables()

    def concat(
        self,
        other_dataset: Union[list["SingleCellMemMapDataset"], "SingleCellMemMapDataset"],
    ) -> None:
        """Concatenates another SingleCellMemMapDataset to the existing one.

        The data is stored in the same place as for the original data set. This
        necessitates using _swap_mmap_array.

        Args:
            other_dataset: A SingleCellMemMapDataset or a list of
            SingleCellMemMapDatasets

        Raises:
           ValueError if the other dataset(s) are not of the same version or
           something of another type is passed in.
        """
        # Verify the other dataset or datasets are of the same type.
        match other_dataset:
            case self.__class__():
                other_dataset = [other_dataset]
            case list():
                pass
            case _:
                raise ValueError(
                    f"Expecting either a {SingleCellMemMapDataset} or a list thereof. Actually got: {type(other_dataset)}"
                )

        for dataset in other_dataset:
            if self.version() != dataset.version():
                raise ValueError(
                    f"""Incompatable versions: input version: {dataset.version()},
            this version:  {self.version}"""
                )

        # Set our mode:
        self.mode: Mode = Mode.READ_APPEND

        mmaps = []
        mmaps.extend(other_dataset)
        # Calculate the size of our new dataset arrays
        total_num_elements = (self.number_nonzero_values() if self.number_of_rows() > 0 else 0) + sum(
            [m.number_nonzero_values() for m in mmaps]
        )
        total_num_rows = self.number_of_rows() + sum([m.number_of_rows() for m in mmaps])

        # Create new arrays to store the data, colptr, and rowptr.
        with tempfile.TemporaryDirectory(prefix="_tmp", dir=self.data_path) as tmp:
            data_arr, col_arr, row_arr = _create_compressed_sparse_row_memmaps(
                num_elements=total_num_elements,
                num_rows=total_num_rows,
                memmap_dir_path=Path(tmp),
                mode=Mode.CREATE_APPEND,
                dtypes=self.dtypes,
            )
            # Copy the data from self and other into the new arrays.
            cumulative_elements = 0
            cumulative_rows = 0
            if self.number_of_rows() > 0:
                data_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.data.data
                col_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.col_index.data
                row_arr[cumulative_rows : cumulative_rows + self.number_of_rows() + 1] = self.row_index.data
                cumulative_elements += self.number_nonzero_values()
                cumulative_rows += self.number_of_rows()
            for mmap in mmaps:
                # Fill the data array for the span of this scmmap
                data_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.data.data
                # fill the col array for the span of this scmmap
                col_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.col_index.data
                # Fill the row array for the span of this scmmap
                row_arr[cumulative_rows : cumulative_rows + mmap.number_of_rows() + 1] = (
                    mmap.row_index + int(cumulative_elements)
                ).data

                self._feature_index.concat(mmap._feature_index)
                # Update counters
                cumulative_elements += mmap.number_nonzero_values()
                cumulative_rows += mmap.number_of_rows()
            # The arrays are swapped to ensure that the data remains stored at self.data_path and
            # not at a temporary filepath.
            _swap_mmap_array(
                data_arr,
                f"{tmp}/{FileNames.DATA.value}",
                self.data,
                f"{self.data_path}/{FileNames.DATA.value}",
                destroy_src=True,
            )
            _swap_mmap_array(
                col_arr,
                f"{tmp}/{FileNames.COLPTR.value}",
                self.col_index,
                f"{self.data_path}/{FileNames.COLPTR.value}",
                destroy_src=True,
            )
            _swap_mmap_array(
                row_arr,
                f"{tmp}/{FileNames.ROWPTR.value}",
                self.row_index,
                f"{self.data_path}/{FileNames.ROWPTR.value}",
                destroy_src=True,
            )
            # Reopen the data, colptr, and rowptr arrays
            self.data = np.memmap(
                f"{self.data_path}/{FileNames.DATA.value}",
                dtype=self.dtypes[f"{FileNames.DATA.value}"],
                shape=(cumulative_elements,),
                mode=Mode.READ_APPEND.value,
            )
            self.row_index = np.memmap(
                f"{self.data_path}/{FileNames.ROWPTR.value}",
                dtype=self.dtypes[f"{FileNames.ROWPTR.value}"],
                shape=(cumulative_rows + 1,),
                mode=Mode.READ_APPEND.value,
            )
            self.col_index = np.memmap(
                f"{self.data_path}/{FileNames.COLPTR.value}",
                dtype=self.dtypes[f"{FileNames.COLPTR.value}"],
                shape=(cumulative_elements,),
                mode=Mode.READ_APPEND.value,
            )

        self.save()

__getitem__(idx)

Get the row values located at index idx.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __getitem__(self, idx: int) -> torch.Tensor:
    """Get the row values located and index idx."""
    return torch.from_numpy(np.stack(self.get_row(idx)[0]))
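
Indexing stacks the row's values and column indices into a single tensor; a small sketch, assuming ds is a SingleCellMemMapDataset opened as in the earlier example:

row = ds[0]            # torch.Tensor of shape (2, number of stored entries in the row)
values, columns = row  # first entry: stored values, second entry: column indices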

__init__(data_path, h5ad_path=None, num_elements=None, num_rows=None, mode=Mode.READ_APPEND, paginated_load_cutoff=10000, load_block_row_size=1000000)

Instantiate the class.

Parameters:

data_path (str, required): The location where the data np.memmap files are read from or stored.
h5ad_path (Optional[str], default None): Optional, the location of the h5ad file.
num_elements (Optional[int], default None): The total number of elements in the array.
num_rows (Optional[int], default None): The number of rows in the data frame.
mode (Mode, default Mode.READ_APPEND): Whether to read or write from the data_path.
paginated_load_cutoff (int, default 10000): MB size on disk at which to load the h5ad structure with paginated load.
load_block_row_size (int, default 1000000): Number of rows to load into memory with paginated load.
Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __init__(
    self,
    data_path: str,
    h5ad_path: Optional[str] = None,
    num_elements: Optional[int] = None,
    num_rows: Optional[int] = None,
    mode: Mode = Mode.READ_APPEND,
    paginated_load_cutoff: int = 10_000,
    load_block_row_size: int = 1_000_000,
) -> None:
    """Instantiate the class.

    Args:
        data_path: The location where the data np.memmap files are read from
        or stored.
        h5ad_path: Optional, the location of the h5ad file.
        num_elements: The total number of elements in the array.
        num_rows: The number of rows in the data frame.
        mode: Whether to read or write from the data_path.
        paginated_load_cutoff: MB size on disk at which to load the h5ad structure with paginated load.
        load_block_row_size: Number of rows to load into memory with paginated load
    """
    self._version: str = importlib.metadata.version("bionemo.scdl")
    self.data_path: str = data_path
    self.mode: Mode = mode
    self.paginated_load_cutoff = paginated_load_cutoff
    self.load_block_row_size = load_block_row_size
    # Backing arrays
    self.data: Optional[np.ndarray] = None
    self.row_index: Optional[np.ndarray] = None
    self.col_index: Optional[np.ndarray] = None

    # Metadata and attributes
    self.metadata: Dict[str, int] = {}

    # Stores the Feature Index, which tracks
    # the original AnnData features (e.g., gene names)
    # and allows us to store ragged arrays in our SCMMAP structure.
    self._feature_index: RowFeatureIndex = RowFeatureIndex()

    # Variables for int packing / reduced precision
    self.dtypes: Dict[FileNames, str] = {
        f"{FileNames.DATA.value}": "float32",
        f"{FileNames.COLPTR.value}": "uint32",
        f"{FileNames.ROWPTR.value}": "uint64",
    }

    if mode == Mode.CREATE_APPEND and os.path.exists(data_path):
        raise FileExistsError(f"Output directory already exists: {data_path}")

    if h5ad_path is not None and (data_path is not None and os.path.exists(data_path)):
        raise FileExistsError(
            "Invalid input; both an existing SCMMAP and an h5ad file were passed. "
            "Please pass either an existing SCMMAP or an h5ad file."
        )

    # If there is only a data path, and it exists already, load SCMMAP data.
    elif data_path is not None and os.path.exists(data_path):
        self.__init__obj()
        self.load(data_path)

    # If there is only an h5ad path, load the HDF5 data
    elif h5ad_path is not None:
        self.__init__obj()
        self.load_h5ad(h5ad_path)
    else:
        match num_rows, num_elements:
            case (int(), int()):
                self.__init__obj()
                self._init_arrs(num_elements=num_elements, num_rows=num_rows)
            case _:
                raise ValueError(
                    "An np.memmap path, an h5ad path, or the number of elements and rows is required" ""
                )
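
The constructor therefore supports three ways of building a dataset; a hedged sketch of each, with hypothetical paths and the import shown earlier:

# 1. Re-open an existing SCDL archive (a directory of np.memmap files).
ds = SingleCellMemMapDataset("existing_scmmap_dir")

# 2. Create a new archive by converting an h5ad file.
ds = SingleCellMemMapDataset("new_scmmap_dir", h5ad_path="cells.h5ad")

# 3. Pre-allocate an empty archive of known size, to be filled later.
ds = SingleCellMemMapDataset("empty_scmmap_dir", num_elements=10_000, num_rows=100)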

__init__obj()

Initializes the datapath and writes the version.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __init__obj(self):
    """Initializes the datapath and writes the version."""
    os.makedirs(self.data_path, exist_ok=True)

    # Write the version
    if not os.path.exists(f"{self.data_path}/{FileNames.VERSION.value}"):
        with open(f"{self.data_path}/{FileNames.VERSION.value}", "w") as vfi:
            json.dump(self.version(), vfi)

__len__()

Return the number of rows.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __len__(self):
    """Return the number of rows."""
    return self.number_of_rows()

concat(other_dataset)

Concatenates another SingleCellMemMapDataset to the existing one.

The data is stored in the same place as for the original dataset. This necessitates using _swap_mmap_array.

Parameters:

other_dataset (Union[list[SingleCellMemMapDataset], SingleCellMemMapDataset], required): A SingleCellMemMapDataset or a list of SingleCellMemMapDatasets.
Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def concat(
    self,
    other_dataset: Union[list["SingleCellMemMapDataset"], "SingleCellMemMapDataset"],
) -> None:
    """Concatenates another SingleCellMemMapDataset to the existing one.

    The data is stored in the same place as for the original data set. This
    necessitates using _swap_mmap_array.

    Args:
        other_dataset: A SingleCellMemMapDataset or a list of
        SingleCellMemMapDatasets

    Raises:
       ValueError if the other dataset(s) are not of the same version or
       something of another type is passed in.
    """
    # Verify the other dataset or datasets are of the same type.
    match other_dataset:
        case self.__class__():
            other_dataset = [other_dataset]
        case list():
            pass
        case _:
            raise ValueError(
                f"Expecting either a {SingleCellMemMapDataset} or a list thereof. Actually got: {type(other_dataset)}"
            )

    for dataset in other_dataset:
        if self.version() != dataset.version():
            raise ValueError(
                f"""Incompatable versions: input version: {dataset.version()},
        this version:  {self.version}"""
            )

    # Set our mode:
    self.mode: Mode = Mode.READ_APPEND

    mmaps = []
    mmaps.extend(other_dataset)
    # Calculate the size of our new dataset arrays
    total_num_elements = (self.number_nonzero_values() if self.number_of_rows() > 0 else 0) + sum(
        [m.number_nonzero_values() for m in mmaps]
    )
    total_num_rows = self.number_of_rows() + sum([m.number_of_rows() for m in mmaps])

    # Create new arrays to store the data, colptr, and rowptr.
    with tempfile.TemporaryDirectory(prefix="_tmp", dir=self.data_path) as tmp:
        data_arr, col_arr, row_arr = _create_compressed_sparse_row_memmaps(
            num_elements=total_num_elements,
            num_rows=total_num_rows,
            memmap_dir_path=Path(tmp),
            mode=Mode.CREATE_APPEND,
            dtypes=self.dtypes,
        )
        # Copy the data from self and other into the new arrays.
        cumulative_elements = 0
        cumulative_rows = 0
        if self.number_of_rows() > 0:
            data_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.data.data
            col_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.col_index.data
            row_arr[cumulative_rows : cumulative_rows + self.number_of_rows() + 1] = self.row_index.data
            cumulative_elements += self.number_nonzero_values()
            cumulative_rows += self.number_of_rows()
        for mmap in mmaps:
            # Fill the data array for the span of this scmmap
            data_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.data.data
            # fill the col array for the span of this scmmap
            col_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.col_index.data
            # Fill the row array for the span of this scmmap
            row_arr[cumulative_rows : cumulative_rows + mmap.number_of_rows() + 1] = (
                mmap.row_index + int(cumulative_elements)
            ).data

            self._feature_index.concat(mmap._feature_index)
            # Update counters
            cumulative_elements += mmap.number_nonzero_values()
            cumulative_rows += mmap.number_of_rows()
        # The arrays are swapped to ensure that the data remains stored at self.data_path and
        # not at a temporary filepath.
        _swap_mmap_array(
            data_arr,
            f"{tmp}/{FileNames.DATA.value}",
            self.data,
            f"{self.data_path}/{FileNames.DATA.value}",
            destroy_src=True,
        )
        _swap_mmap_array(
            col_arr,
            f"{tmp}/{FileNames.COLPTR.value}",
            self.col_index,
            f"{self.data_path}/{FileNames.COLPTR.value}",
            destroy_src=True,
        )
        _swap_mmap_array(
            row_arr,
            f"{tmp}/{FileNames.ROWPTR.value}",
            self.row_index,
            f"{self.data_path}/{FileNames.ROWPTR.value}",
            destroy_src=True,
        )
        # Reopen the data, colptr, and rowptr arrays
        self.data = np.memmap(
            f"{self.data_path}/{FileNames.DATA.value}",
            dtype=self.dtypes[f"{FileNames.DATA.value}"],
            shape=(cumulative_elements,),
            mode=Mode.READ_APPEND.value,
        )
        self.row_index = np.memmap(
            f"{self.data_path}/{FileNames.ROWPTR.value}",
            dtype=self.dtypes[f"{FileNames.ROWPTR.value}"],
            shape=(cumulative_rows + 1,),
            mode=Mode.READ_APPEND.value,
        )
        self.col_index = np.memmap(
            f"{self.data_path}/{FileNames.COLPTR.value}",
            dtype=self.dtypes[f"{FileNames.COLPTR.value}"],
            shape=(cumulative_elements,),
            mode=Mode.READ_APPEND.value,
        )

    self.save()
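
A usage sketch, assuming two datasets built from hypothetical h5ad files:

ds = SingleCellMemMapDataset("first_dir", h5ad_path="first.h5ad")
other = SingleCellMemMapDataset("second_dir", h5ad_path="second.h5ad")

# Append the rows of the second dataset in place; the combined arrays stay under "first_dir".
ds.concat(other)
assert len(ds) == ds.number_of_rows()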

features()

Return the corresponding RowFeatureIndex.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def features(self) -> Optional[RowFeatureIndex]:
    """Return the corresponding RowFeatureIndex."""
    return self._feature_index

get_row(index, return_features=False, feature_vars=None)

Returns a given row in the dataset along with optional features.

Parameters:

Name Type Description Default
index int

The row to be returned. This is in the range of [0, num_rows)

required
return_features bool

boolean that indicates whether to return features

False
feature_vars Optional[List[str]]

Optional, feature variables to extract

None

Return: Tuple[np.ndarray, np.ndarray]: data values and column pointers; pd.DataFrame: optional, corresponding features.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def get_row(
    self,
    index: int,
    return_features: bool = False,
    feature_vars: Optional[List[str]] = None,
) -> Tuple[Tuple[np.ndarray, np.ndarray], pd.DataFrame]:
    """Returns a given row in the dataset along with optional features.

    Args:
        index: The row to be returned. This is in the range of [0, num_rows)
        return_features: boolean that indicates whether to return features
        feature_vars: Optional, feature variables to extract
    Return:
        Tuple[np.ndarray, np.ndarray]: data values and column pointers
        pd.DataFrame: optional, corresponding features.
    """
    start = self.row_index[index]
    end = self.row_index[index + 1]
    values = self.data[start:end]
    columns = self.col_index[start:end]
    ret = (values, columns)
    if return_features:
        return ret, self._feature_index.lookup(index, select_features=feature_vars)[0]
    else:
        return ret, None
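
A short sketch, assuming ds is an open SingleCellMemMapDataset; "feature_name" is a hypothetical feature variable and is not guaranteed to exist in every archive:

# Sparse row plus the feature metadata stored for that row.
(values, columns), features = ds.get_row(10, return_features=True, feature_vars=["feature_name"])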

get_row_column(index, column, impute_missing_zeros=True)

Returns the value at a given index and the corresponding column.

Parameters:

Name Type Description Default
index int

The index to be returned

required
column int

The column to be returned

required
impute_missing_zeros bool

boolean that indicates whether to set missing data to 0

True

Return: A float that is the value in the array or None.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def get_row_column(self, index: int, column: int, impute_missing_zeros: bool = True) -> Optional[float]:
    """Returns the value at a given index and the corresponding column.

    Args:
        index: The index to be returned
        column: The column to be returned
        impute_missing_zeros: boolean that indicates whether to set missing
        data to 0
    Return:
        A float that is the value in the array or None.
    """
    (row_values, row_column_pointer), _ = self.get_row(index)
    if column is not None:
        for col_index, col in enumerate(row_column_pointer):
            if col == column:
                # return the value at this position
                return row_values[col_index]
            elif col > column:
                try:
                    raise ValueError(f"Column pointer {col} is larger than the column {column}.")
                except ValueError:
                    break
        return 0.0 if impute_missing_zeros else None
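
A short sketch, assuming ds is an open SingleCellMemMapDataset:

value = ds.get_row_column(5, 42)                              # 0.0 is imputed for absent entries
maybe = ds.get_row_column(5, 42, impute_missing_zeros=False)  # None for absent entries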

get_row_padded(index, return_features=False, feature_vars=None)

Returns a padded version of a row in the dataset.

A padded version is one where a sparse array representation is converted to a conventional representation. Optionally, features are returned.

Parameters:

Name Type Description Default
index int

The row to be returned

required
return_features bool

boolean that indicates whether to return features

False
feature_vars Optional[List[str]]

Optional, feature variables to extract

None

Return: np.ndarray: conventional row representation pd.DataFrame: optional, corresponding features.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def get_row_padded(
    self,
    index: int,
    return_features: bool = False,
    feature_vars: Optional[List[str]] = None,
) -> Tuple[np.ndarray, pd.DataFrame]:
    """Returns a padded version of a row in the dataset.

    A padded version is one where a sparse array representation is
    converted to a conventional representation. Optionally, features are
    returned.

    Args:
        index: The row to be returned
        return_features: boolean that indicates whether to return features
        feature_vars: Optional, feature variables to extract
    Return:
        np.ndarray: conventional row representation
        pd.DataFrame: optional, corresponding features.
    """
    (row_values, row_column_pointer), features = self.get_row(index, return_features, feature_vars)
    return (
        _pad_sparse_array(row_values, row_column_pointer, self._feature_index.number_vars_at_row(index)),
        features,
    )
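
A short sketch, assuming ds was built from a single h5ad file so every row shares one feature index:

padded_row, features = ds.get_row_padded(0, return_features=True)
# The padded row has one entry per variable known for that row.
assert padded_row.shape[0] == ds.number_of_variables()[0]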

load(stored_path)

Loads the data at stored_path, which is in np.memmap format.

Parameters:

Name Type Description Default
stored_path str

directory with np.memmap files

required

Raises: FileNotFoundError if the corresponding directory or files are not found, or if the metadata file is not present.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def load(self, stored_path: str) -> None:
    """Loads the data at store_path that is an np.memmap format.

    Args:
        stored_path: directory with np.memmap files
    Raises:
        FileNotFoundError if the corresponding directory or files are not
        found, or if the metadata file is not present.
    """
    if not os.path.exists(stored_path):
        raise FileNotFoundError(
            f"""Error: the specified data path to the mmap files {stored_path} does not exist.
                                Specify an updated filepath or provide an h5ad path to the dataset. The data can
                                be loaded with SingleCellMemMapDataset.load_h5ad. Alternatively, the class can be instantiated
                                with  SingleCellMemMapDataset(<path to data that will be created>, h5ad_path=<path to h5ad file>"""
        )
    self.data_path = stored_path
    self.mode = Mode.READ_APPEND

    # Metadata is required, so we must check if it exists and fail if not.
    if not os.path.exists(f"{self.data_path}/{FileNames.METADATA.value}"):
        raise FileNotFoundError(
            f"Error: the metadata file {self.data_path}/{FileNames.METADATA.value} does not exist."
        )

    with open(f"{self.data_path}/{FileNames.METADATA.value}", Mode.READ_APPEND.value) as mfi:
        self.metadata = json.load(mfi)

    if os.path.exists(f"{self.data_path}/{FileNames.FEATURES.value}"):
        self._feature_index = RowFeatureIndex.load(f"{self.data_path}/{FileNames.FEATURES.value}")

    if os.path.exists(f"{self.data_path}/{FileNames.DTYPE.value}"):
        with open(f"{self.data_path}/{FileNames.DTYPE.value}") as dfi:
            self.dtypes = json.load(dfi)

    # mmap the existing arrays
    self.data = self._load_mmap_file_if_exists(
        f"{self.data_path}/{FileNames.DATA.value}", self.dtypes[f"{FileNames.DATA.value}"]
    )
    self.row_index = self._load_mmap_file_if_exists(
        f"{self.data_path}/{FileNames.ROWPTR.value}", dtype=self.dtypes[f"{FileNames.ROWPTR.value}"]
    )
    self.col_index = self._load_mmap_file_if_exists(
        f"{self.data_path}/{FileNames.COLPTR.value}", dtype=self.dtypes[f"{FileNames.COLPTR.value}"]
    )
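
A short sketch of re-attaching to a previously written archive (the directory name is hypothetical); passing only data_path to the constructor calls load internally:

ds = SingleCellMemMapDataset("existing_scmmap_dir")
# or explicitly, on an already constructed object:
ds.load("existing_scmmap_dir")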

load_h5ad(anndata_path)

Loads an existing AnnData archive from disk.

This creates a new backing data structure which is saved. Note: the storage utilized will roughly double. Currently, the data must be in a scipy.sparse.spmatrix format.

Parameters:

Name Type Description Default
anndata_path str

location of data to load

required

Raises: FileNotFoundError if the data path does not exist. NotImplementedError if the data is not in scipy.sparse.spmatrix format. ValueError if there is no count data.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def load_h5ad(
    self,
    anndata_path: str,
) -> None:
    """Loads an existing AnnData archive from disk.

    This creates a new backing data structure which is saved.
    Note: the storage utilized will roughly double. Currently, the data must
    be in a scipy.sparse.spmatrix format.

    Args:
        anndata_path: location of data to load
    Raises:
        FileNotFoundError if the data path does not exist.
        NotImplementedError if the data is not in scipy.sparse.spmatrix
        format
        ValueError if there is no count data
    """
    if not os.path.exists(anndata_path):
        raise FileNotFoundError(f"Error: could not find h5ad path {anndata_path}")
    file_size_MB = os.path.getsize(anndata_path) / (1_024**2)

    if file_size_MB < self.paginated_load_cutoff:
        features, num_rows = self.regular_load_h5ad(anndata_path)

    else:
        features, num_rows = self.paginated_load_h5ad(anndata_path)

    # Collect features and store in FeatureIndex
    self._feature_index.append_features(n_obs=num_rows, features=features, label=anndata_path)

    self.save()
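
A short sketch with hypothetical paths; lowering paginated_load_cutoff (in MB) pushes smaller files through the block-wise paginated load as well:

ds = SingleCellMemMapDataset("scmmap_dir", h5ad_path="large_cells.h5ad", paginated_load_cutoff=1_000)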

number_nonzero_values()

Number of non zero entries in the dataset.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_nonzero_values(self) -> int:
    """Number of non zero entries in the dataset."""
    return self.data.size

number_of_rows()

The number of rows in the dataset.

Returns:

Type Description
int

The number of rows in the dataset

Raises: ValueError if the number of rows in the feature index does not correspond to the number of stored rows.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_of_rows(self) -> int:
    """The number of rows in the dataset.

    Returns:
        The number of rows in the dataset
    Raises:
        ValueError if the number of rows in the feature index does not
        correspond to the number of stored rows.
    """
    if len(self._feature_index) > 0 and self._feature_index.number_of_rows() != self.row_index.size - 1:
        raise ValueError(
            f"""The nuber of rows in the feature index {self._feature_index.number_of_rows()}
                         does not correspond to the number of rows in the row_index {self.row_index.size - 1}"""
        )
    return self._feature_index.number_of_rows()

number_of_values()

Get the total number of values in the array.

For each index, the length of the corresponding dataframe is counted.

Returns:

Type Description
int

The sum of lengths of the features in every row

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_of_values(self) -> int:
    """Get the total number of values in the array.

    For each index, the length of the corresponding dataframe is counted.

    Returns:
        The sum of lengths of the features in every row
    """
    return sum(self._feature_index.number_of_values())

number_of_variables()

Get the number of features in every entry in the dataset.

Returns:

Type Description
List[int]

A list containing the lengths of the features in every row

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_of_variables(self) -> List[int]:
    """Get the number of features in every entry in the dataset.

    Returns:
        A list containing the lengths of the features in every row
    """
    feats = self._feature_index
    if len(feats) == 0:
        return [0]
    num_vars = feats.column_dims()
    return num_vars

paginated_load_h5ad(anndata_path)

Method for block loading a larger h5ad file and converting it to the SCDL format.

This should be used when the entire anndata file cannot be loaded into memory. The anndata is loaded into memory in blocks of load_block_row_size rows at a time. Each chunk is converted into numpy memory maps, which are then concatenated together.

Returns:

pd.DataFrame: var variables for features.
int: number of rows in the dataframe.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def paginated_load_h5ad(
    self,
    anndata_path: str,
) -> Tuple[pd.DataFrame, int]:
    """Method for block loading a larger h5ad file and converting it to the SCDL format.

    This should be used in the case when the entire anndata file cannot be loaded into memory.
    The anndata is loaded into memory in blocks of load_block_row_size rows at a time. Each chunk
    is converted into numpy memory maps which are then concatenated together.

    Raises:
        NotImplementedError if the data is not loaded in the CSRDataset format.

    Returns:
        pd.DataFrame: var variables for features
        int: number of rows in the dataframe.
    """
    adata = ad.read_h5ad(anndata_path, backed=True)

    if not isinstance(adata.X, ad.experimental.CSRDataset):
        raise NotImplementedError("Non-sparse format cannot be loaded: {type(adata.X)}.")
    num_rows = adata.X.shape[0]

    self.dtypes[f"{FileNames.DATA.value}"] = adata.X.dtype

    # Read the row indices into a memory map.
    mode = Mode.CREATE_APPEND
    self.row_index = _create_row_memmaps(num_rows, Path(self.data_path), mode, self.dtypes)
    self.row_index[:] = adata.X._indptr.astype(int)

    # The data from each column and data chunk of the original anndata file is read in. This is saved into the final
    # location of the memmap file. In this step, it is saved in the binary file format.
    memmap_dir_path = Path(self.data_path)
    with (
        open(f"{memmap_dir_path}/{FileNames.COLPTR.value}", "wb") as col_file,
        open(f"{memmap_dir_path}/{FileNames.DATA.value}", "wb") as data_file,
    ):
        n_elements = 0
        for row_start in range(0, num_rows, self.load_block_row_size):
            # Write each array's data to the file in binary format
            col_block = adata.X[row_start : row_start + self.load_block_row_size].indices
            col_file.write(col_block.tobytes())

            data_block = adata.X[row_start : row_start + self.load_block_row_size].data
            data_file.write(data_block.tobytes())

            n_elements += len(data_block)

    # The column and data files are re-opened as memory-mapped arrays with the final shape
    mode = Mode.READ_APPEND
    self.col_index = np.memmap(
        f"{memmap_dir_path}/{FileNames.COLPTR.value}",
        self.dtypes[f"{FileNames.COLPTR.value}"],
        mode=mode,
        shape=(n_elements,),
    )
    self.data = np.memmap(
        f"{memmap_dir_path}/{FileNames.DATA.value}",
        dtype=self.dtypes[f"{FileNames.DATA.value}"],
        mode=mode,
        shape=(n_elements,),
    )
    return adata.var, num_rows

regular_load_h5ad(anndata_path)

Method for loading an h5ad file into memory and converting it to the SCDL format.

Parameters:

Name Type Description Default
anndata_path str

location of data to load

required

Raises: NotImplementedError if the data is not in scipy.sparse.spmatrix format. ValueError if there is no count data.

Returns: pd.DataFrame: var variables for features. int: number of rows in the dataframe.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def regular_load_h5ad(
    self,
    anndata_path: str,
) -> Tuple[pd.DataFrame, int]:
    """Method for loading an h5ad file into memorySu and converting it to the SCDL format.

    Args:
        anndata_path: location of data to load
    Raises:
        NotImplementedError if the data is not in scipy.sparse.spmatrix format
        ValueError if there is no count data
    Returns:
        pd.DataFrame: var variables for features
        int: number of rows in the dataframe.

    """
    adata = ad.read_h5ad(anndata_path)  # slow

    if not isinstance(adata.X, scipy.sparse.spmatrix):
        raise NotImplementedError("Error: dense matrix loading not yet implemented.")

    # Check if raw data is present
    raw = getattr(adata, "raw", None)
    count_data = None
    if raw is not None:
        # If it is, attempt to get the counts in the raw data.
        count_data = getattr(raw, "X", None)

    if count_data is None:
        # No raw counts were present, resort to normalized
        count_data = getattr(adata, "X")
    if count_data is None:
        raise ValueError("This file does not have count data")

    shape = count_data.shape
    num_rows = shape[0]

    num_elements_stored = count_data.nnz

    self.dtypes[f"{FileNames.DATA.value}"] = count_data.dtype

    # Create the arrays.
    self._init_arrs(num_elements_stored, num_rows)
    # Store data
    self.data[0:num_elements_stored] = count_data.data

    # Store the col idx array
    self.col_index[0:num_elements_stored] = count_data.indices.astype(int)

    # Store the row idx array
    self.row_index[0 : num_rows + 1] = count_data.indptr.astype(int)

    return adata.var, num_rows

save(output_path=None)

Saves the class to a given output path.

Parameters:

Name Type Description Default
output_path Optional[str]

The location to save - not yet implemented and should be self.data_path

None
Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def save(self, output_path: Optional[str] = None) -> None:
    """Saves the class to a given output path.

    Args:
        output_path: The location to save - not yet implemented and should
        be self.data_path

    Raises:
       NotImplementedError if output_path is not None.
    """
    if f"{METADATA.NUM_ROWS.value}" not in self.metadata:
        self.metadata[f"{METADATA.NUM_ROWS.value}"] = self.number_of_rows()

    self._write_metadata()
    # Write the feature index. This may not exist.
    self._feature_index.save(f"{self.data_path}/{FileNames.FEATURES.value}")

    # Ensure the object is in a valid state. These are saved at creation!
    for postfix in [
        f"{FileNames.VERSION.value}",
        f"{FileNames.DATA.value}",
        f"{FileNames.COLPTR.value}",
        f"{FileNames.ROWPTR.value}",
        f"{FileNames.FEATURES.value}",
    ]:
        if not os.path.exists(f"{self.data_path}/{postfix}"):
            raise FileNotFoundError(f"This file should exist from object creation: {self.data_path}/{postfix}")

    self.data.flush()
    self.row_index.flush()
    self.col_index.flush()

    if output_path is not None:
        raise NotImplementedError("Saving to separate path is not yet implemented.")

    return True
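
A short sketch, assuming ds is an open SingleCellMemMapDataset:

# Flush the memory-mapped arrays and write metadata and features back to data_path.
ds.save()
# Passing an output_path currently raises NotImplementedError.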

shape()

Get the shape of the dataset.

This is the number of entries by the length of the feature index corresponding to that variable.

Returns:

Type Description
int

The number of elements in the dataset

List[int]

A list containing the number of variables for each row.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def shape(self) -> Tuple[int, List[int]]:
    """Get the shape of the dataset.

    This is the number of entries by the length of the feature index
    corresponding to that variable.

    Returns:
        The number of elements in the dataset
        A list containing the number of variables for each row.
    """
    return self.number_of_rows(), self.number_of_variables()
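
A short sketch, assuming ds is an open SingleCellMemMapDataset:

rows, vars_per_row = ds.shape()
print(rows)                        # number of rows
print(vars_per_row)                # per-row variable counts (one entry per feature block)
print(ds.number_nonzero_values())  # total stored (non-zero) entries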

version()

Returns a version number.

(following <major>.<minor>.<point> convention).

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def version(self) -> str:
    """Returns a version number.

    (following <major>.<minor>.<point> convention).
    """
    return self._version