Filecopyutil

extend_files(first, second, source_dtype, dest_dtype, elements_per_chunk=10 * 1024 * 1024, delete_file2_on_complete=False, offset=0, add_value=None, allow_downscaling=False)

Concatenates the contents of second into first using memory-efficient operations.

Supports optional dtype conversion for upscaling within the same family only:

- uint upscaling: uint8 → uint16 → uint32 → uint64
- float upscaling: float32 → float64

Additionally, supports adding a scalar value to each converted element during copy.

Parameters:

- first (str): Destination file path (will be extended).
- second (str): Source file path (data read from here).
- source_dtype (str): Source numpy dtype (e.g., 'uint32', 'float32').
- dest_dtype (str): Destination numpy dtype (e.g., 'uint64', 'float32').
- elements_per_chunk (int): Number of elements to read/write per chunk.
- delete_file2_on_complete (bool): Whether to delete the source after completion.
- offset (int): Byte offset to start reading within the source file.
- add_value (int | None): Optional scalar added to each converted element (after casting).
- allow_downscaling (bool): Whether to allow downscaling of the data dtype.

Raises:

- ValueError: If conversion is not a safe upscaling operation.
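
For orientation, here is a minimal usage sketch (file names and values are made up for illustration): it appends one raw binary file to another while upscaling uint32 → uint64 and adding a scalar to each appended element.

import numpy as np

from bionemo.scdl.util.filecopyutil import extend_files

# Two small raw (headerless) binary files.
np.arange(4, dtype="uint32").tofile("dest.bin")   # destination: [0, 1, 2, 3]
np.arange(3, dtype="uint32").tofile("extra.bin")  # source: [0, 1, 2]

# Append extra.bin to dest.bin, upscaling uint32 -> uint64 and adding 10
# to each appended element; the source file is removed afterwards.
extend_files(
    "dest.bin",
    "extra.bin",
    source_dtype="uint32",
    dest_dtype="uint64",
    add_value=10,
    delete_file2_on_complete=True,
)

# Only the appended region is uint64; the pre-existing bytes are untouched,
# so the file is read back in two parts.
head = np.fromfile("dest.bin", dtype="uint32", count=4)
tail = np.fromfile("dest.bin", dtype="uint64", offset=4 * 4)
print(head)  # [0 1 2 3]
print(tail)  # [10 11 12]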

Source code in bionemo/scdl/util/filecopyutil.py
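
The listing below starts partway into the module, so its imports and the two upscaling-order constants it references are out of frame. A reconstruction consistent with the docstring (the exact definitions in filecopyutil.py may differ):

import os

import numpy as np

# Assumed same-family upscaling orders, smallest to largest (reconstructed
# from the docstring; not verbatim from the module).
INT_ORDER = ["uint8", "uint16", "uint32", "uint64"]
FLOAT_ORDER = ["float32", "float64"]
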
def extend_files(
    first: str,
    second: str,
    source_dtype: str,
    dest_dtype: str,
    elements_per_chunk: int = 10 * 1024 * 1024,
    delete_file2_on_complete: bool = False,
    offset: int = 0,
    add_value: int | None = None,
    allow_downscaling: bool = False,
):
    """Concatenates the contents of `second` into `first` using memory-efficient operations.

    Supports optional dtype conversion for upscaling within the same family only:
    - uint upscaling: uint8 → uint16 → uint32 → uint64
    - float upscaling: float32 → float64

    Additionally, supports adding a scalar value to each converted element during copy.

    Parameters:
    - first (str): Destination file path (will be extended).
    - second (str): Source file path (data read from here).
    - source_dtype (str): Source numpy dtype (e.g., 'uint32', 'float32').
    - dest_dtype (str): Destination numpy dtype (e.g., 'uint64', 'float32').
    - elements_per_chunk (int): Number of elements to read/write per chunk.
    - delete_file2_on_complete (bool): Whether to delete the source after completion.
    - offset (int): Byte offset to start reading within the source file.
    - add_value (int | None): Optional scalar added to each converted element (after casting).
    - allow_downscaling (bool): Whether to allow downscaling of the data dtype.

    Raises:
    - ValueError: If conversion is not a safe upscaling operation.

    """
    if offset < 0 or offset % np.dtype(source_dtype).itemsize != 0:
        raise ValueError(
            f"Offset {offset} must be non-negative and divisible by source dtype size {np.dtype(source_dtype).itemsize}"
        )
    if not allow_downscaling:
        if source_dtype in INT_ORDER and dest_dtype in INT_ORDER:
            order = INT_ORDER
        elif source_dtype in FLOAT_ORDER and dest_dtype in FLOAT_ORDER:
            order = FLOAT_ORDER
        else:
            raise ValueError(
                f"Unsupported dtype conversion: {source_dtype}{dest_dtype}. Only same-family upscaling allowed."
            )
        if order.index(dest_dtype) < order.index(source_dtype):
            raise ValueError(f"Downscaling not allowed: {source_dtype}{dest_dtype}.")

    # Resolve dtypes once (native endianness) and sizes
    source_dtype = np.dtype(source_dtype).newbyteorder("=")
    dest_dtype = np.dtype(dest_dtype).newbyteorder("=")
    src_item = source_dtype.itemsize
    dst_item = dest_dtype.itemsize
    # Pre-cast scalar once to destination dtype for speed
    add_scalar = None
    if add_value is not None and add_value != 0:
        add_scalar = np.array(add_value, dtype=dest_dtype).item()

    # Source sizing
    size2 = os.path.getsize(second)
    remaining = size2 - offset
    if remaining % src_item != 0:
        raise ValueError(
            f"Source size minus offset ({remaining} bytes) not divisible by source dtype size ({src_item})."
        )
    num_elements = remaining // src_item

    # Pre-extend destination to final size
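    # (seeking just past the end and writing one NUL byte sets the final file
    # size in a single step, so the chunked writes below never grow the file;
    # on filesystems with sparse-file support the gap initially costs no disk space)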
    extend_bytes = num_elements * dst_item
    size1 = os.path.getsize(first)
    with open(first, "r+b") as f_dest:
        if extend_bytes > 0:
            f_dest.seek(size1 + extend_bytes - 1)
            f_dest.write(b"\0")

        write_position = size1

        # Reusable output buffer
        out_buf = bytearray(elements_per_chunk * dst_item)

        with open(second, "rb") as f_source:
            if offset > 0:
                f_source.seek(offset)

            elements_processed = 0
            while elements_processed < num_elements:
                target_elements = min(elements_per_chunk, num_elements - elements_processed)
                bytes_to_read = target_elements * src_item

                chunk_bytes = f_source.read(bytes_to_read)
                if not chunk_bytes:
                    # Unexpected EOF
                    raise OSError(f"Short read at element {elements_processed}: expected {bytes_to_read} bytes, got 0")

                # Derive actual elements from bytes read to tolerate partial reads;
                # if a read split an element, rewind the remainder so the next read
                # resumes on an element boundary instead of silently dropping bytes.
                actual_elements, remainder = divmod(len(chunk_bytes), src_item)
                if remainder:
                    f_source.seek(-remainder, os.SEEK_CUR)
                if actual_elements == 0:
                    continue

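                # Fast path: same dtype, nothing to add, and a complete read;
                # the raw bytes can be written through without a numpy round-trip.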
                if source_dtype == dest_dtype and add_scalar is None and len(chunk_bytes) == bytes_to_read:
                    dst_mv = chunk_bytes
                else:
                    src = np.frombuffer(chunk_bytes, dtype=source_dtype, count=actual_elements)
                    dst_mv = memoryview(out_buf)[: actual_elements * dst_item]
                    dst = np.frombuffer(dst_mv, dtype=dest_dtype, count=actual_elements)
                    if add_scalar is not None:
                        np.add(src.astype(dest_dtype, copy=False), add_scalar, out=dst)
                    else:
                        safe_casting = "unsafe" if allow_downscaling else "safe"
                        np.copyto(dst, src, casting=safe_casting)

                f_dest.seek(write_position)
                f_dest.write(dst_mv)
                write_position += len(dst_mv)
                elements_processed += actual_elements

    if delete_file2_on_complete:
        os.remove(second)
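
The dtype guard can be exercised directly; a quick sketch (hypothetical file names) of both the rejection and the explicit opt-in:

import numpy as np

from bionemo.scdl.util.filecopyutil import extend_files

np.arange(4, dtype="uint64").tofile("dest.bin")
np.arange(4, dtype="uint64").tofile("extra.bin")

try:
    # uint64 -> uint32 narrows, so it is rejected by default.
    extend_files("dest.bin", "extra.bin", source_dtype="uint64", dest_dtype="uint32")
except ValueError as err:
    print(err)  # Downscaling not allowed: uint64 → uint32.

# Opting in routes the copy through numpy's "unsafe" casting.
extend_files(
    "dest.bin",
    "extra.bin",
    source_dtype="uint64",
    dest_dtype="uint32",
    allow_downscaling=True,
)
print(np.fromfile("dest.bin", dtype="uint32", offset=4 * 8))  # [0 1 2 3]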