问题背景

在使用pandas.apply函数时,遇到这样一个问题: 在pandas.apply(func,axis=1) 的函数func中,直接对传入的行进行操作, 例如:

df = pd.DataFrame({
    'A': range(5),
    'B': range(5, 10)
})
buffer=[]
def func(row):
    if row['A'] > 2:
        row['A'] = 999
    else:
        row['A'] = 888
    buffer.append(row) # 
    # buffer.append(row.to_dict()) # 正常
    row_id = id(row)
    ids.append(row_id)

df.apply(func, axis=1)

会发生以下现象:

  1. func中,直接对row进行修改时,执行速度相比 使用row.to_dict() 重新赋值慢10倍不止。
  2. 当把修改后的row添加到全局变量buffer中时,输出的值都相同。

pandas的官方文档提到:在使用UDF(用户自定函数)时,迭代容器时不应改变容器,否则可能导致要访问的数据提前遭到修改和删除。这也与普适的编程思想一致。

但当我对传入的row参数进行添加,并存储到buffer中,所产生的现象却不是前文所能解释的,我需要对apply的传参方式进行探究。

apply传入参数方式及其问题

使用id()函数查看apply传入参数在python中的唯一id,发现每个传入的row的id都一样,尽管他们的值不相同。

def non_parallel_modifyparam_function(row):
    """
    模拟不可并行化的函数,并且对传入参数进行修改
    使用time.sleep模拟耗时
    """
    # time.sleep(0.00001)  # 模拟I/O延迟
    global buffer
    print(f"modifyparam:{id(row)}")

发现输出的id都相同,但是row每次的值不同

import pandas as pd

# 创建一个简单的 DataFrame
df = pd.DataFrame({
    'A': range(5),
    'B': range(5, 10)
})

# 定义一个函数来打印每行的 id 并记录下来
ids = []

def record_id(row):
    row_id = id(row)
    ids.append(row_id)
    print(f"id of row: {row_id}")

# 应用函数
df.apply(record_id, axis=1)

# 检查所有 id 是否相同
print(f"All ids are the same: {len(set(ids)) == 1}")

输出:

id of row: 139715457879392
id of row: 139715457879392
id of row: 139715457879392
id of row: 139715457879392
id of row: 139715457879392
All ids are the same: True

这说明,实际上虽然apply占有一整个dataframe,传入时并不是每行新建一个series对象传入,他们的唯一标识都是一样的。apply的行传入可能有某种重用机制。

python的id()

id() 函数返回的是对象的唯一标识符,通常称为对象的“身份”(identity)。这个标识符是一个整数,它在对象的生命周期内是唯一的且恒定的。对于 CPython 解释器而言,这个标识符实际上是对象在内存中的地址

为什么修改传入参数,记录到buffer中后,结果都一样?

import pandas as pd

# 创建一个简单的 DataFrame
df = pd.DataFrame({
    'A': range(5),
    'B': range(5, 10)
})

# 定义一个函数来打印每行的 id 并记录下来
ids = []

# 添加到buffer中
buffer = []
def record_id(row):
    if row['A'] > 2:
        row['A'] = 999
    else:
        row['A'] = 888
    buffer.append(row) # 
    # buffer.append(row.to_dict()) # 正常
    row_id = id(row)
    ids.append(row_id)

# 应用函数
df.apply(record_id, axis=1)

# 检查所有 id 是否相同
print(f"All ids are the same: {len(set(ids)) == 1}")

print(df)

for i in buffer:
    print(str(i))
# print(buffer)
# 输出如下:
# A    999
# B      9
# Name: 4, dtype: int64
# A    999
# B      9
# Name: 4, dtype: int64
# A    999
# B      9
# Name: 4, dtype: int64
# A    999
# B      9
# Name: 4, dtype: int64
# A    999
# B      9
# Name: 4, dtype: int64

问题出在 row 对象上。当 apply 方法中使用 axis=1 参数时,传入的Series对象实际上是对同一个内部对象的引用。(每行数据的一个视图) 从始至终只用到一个对象,一个地址,但是每次迭代更新这个对象的值。

所以,当 record_id 函数修改了 row 对象后,它实际上是在修改一个共享的对象。因此,当你将 row 添加到 buffer 列表中时,实际上是在多次添加同一个对象的引用,里面所有的引用地址都一样。

当循环结束后,buffer 列表中的所有元素都指向了最后一次修改后的 row 对象,也就是 ‘A’ 列值被设置为 999 的那行数据。

尽管引用相同,但对象的内容在每次迭代中被更新为当前行的数据。这就是传入的参数中,地址一样,值不一样的原因。

源码阅读

查看df.apply 的定义,跳转到pandas/core/frame.py:

class DataFrame(NDFrame, OpsMixin):
	def apply(
        self,
        func: AggFuncType,
        axis: Axis = 0,
        raw: bool = False,
        result_type: Literal["expand", "reduce", "broadcast"] | None = None,
        args=(),
        by_row: Literal[False, "compat"] = "compat",
        engine: Literal["python", "numba"] = "python",
        engine_kwargs: dict[str, bool] | None = None,
        **kwargs,
    ):
	    from pandas.core.apply import frame_apply
	
		op = frame_apply(
			self,
			func=func,
			axis=axis,
			raw=raw,
			result_type=result_type,
			by_row=by_row,
			engine=engine,
			engine_kwargs=engine_kwargs,
			args=args,
			kwargs=kwargs,
		)
	    return op.apply().__finalize__(self, method="apply")

frame_apply方法实现

def frame_apply(
    obj: DataFrame,
    func: AggFuncType,
    axis: Axis = 0,
    raw: bool = False,
    result_type: str | None = None,
    by_row: Literal[False, "compat"] = "compat",
    engine: str = "python",
    engine_kwargs: dict[str, bool] | None = None,
    args=None,
    kwargs=None,
) -> FrameApply:
    """construct and return a row or column based frame apply object"""
    axis = obj._get_axis_number(axis)
    klass: type[FrameApply]
    if axis == 0:
        klass = FrameRowApply
    elif axis == 1:
        klass = FrameColumnApply

    _, func, _, _ = reconstruct_func(func, **kwargs)
    assert func is not None

    return klass(
        obj,
        func,
        raw=raw,
        result_type=result_type,
        by_row=by_row,
        engine=engine,
        engine_kwargs=engine_kwargs,
        args=args,
        kwargs=kwargs,
    )

可以看到,当axis=1时,其返回了一个FrameColumnApply对象,看来答案就在这个类的实现中了: 同样在pandas/core/apply.py中,部分实现如下:

class FrameColumnApply(FrameApply):
    axis: AxisInt = 1

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        result = super().apply_broadcast(target.T)
        return result.T

    @property
    def series_generator(self) -> Generator[Series, None, None]:
        values = self.values
        values = ensure_wrapped_if_datetimelike(values)
        assert len(values) > 0

        # We create one Series object, and will swap out the data inside
        #  of it.  Kids: don't do this at home.
        ser = self.obj._ixs(0, axis=0)
        mgr = ser._mgr

        is_view = mgr.blocks[0].refs.has_reference()  # type: ignore[union-attr]

        if isinstance(ser.dtype, ExtensionDtype):
            # values will be incorrect for this block
            # TODO(EA2D): special case would be unnecessary with 2D EAs
            obj = self.obj
            for i in range(len(obj)):
                yield obj._ixs(i, axis=0)

        else:
            for arr, name in zip(values, self.index):
                # GH#35462 re-pin mgr in case setitem changed it
                ser._mgr = mgr
                mgr.set_values(arr)
                object.__setattr__(ser, "_name", name)
                if not is_view:
                    # In apply_series_generator we store the a shallow copy of the
                    # result, which potentially increases the ref count of this reused
                    # `ser` object (depending on the result of the applied function)
                    # -> if that happened and `ser` is already a copy, then we reset
                    # the refs here to avoid triggering a unnecessary CoW inside the
                    # applied function (https://github.com/pandas-dev/pandas/pull/56212)
                    mgr.blocks[0].refs = BlockValuesRefs(mgr.blocks[0])  # type: ignore[union-attr]
                yield ser

这个类通过series_generator 方法生成一系列的series对象,在生成 Series 对象时,有一个重要的步骤是重用同一个 Series 对象,并在每次迭代中更新其内部数据

根据注释,他们创建了一个series对象ser,并在每次返回时交换这个对象的内部数据。mgr 是 Series 的数据管理器,通过 mgr.set_values(arr) 更新 Series 的数据。

pandas的开发者希望返回的Series是一个视图,并且,他们通过is_view 来确定,返回的ser 是否真的是一个引用。当ser不是一个引用时,会造成不必要的拷贝(CoW Copy-on-Write)

具体实现为:is_view = mgr.blocks[0].refs.has_reference() ,这一行代码检查 mgr 的第一个数据块是否有引用。如果有引用,说明 Series 对象是一个视图。

Reference