Câu hỏi Các đối tượng bộ nhớ chia sẻ trong đa xử lý


Giả sử tôi có một mảng lớn trong bộ nhớ, tôi có một hàm func mà có trong mảng khổng lồ này làm đầu vào (cùng với một số tham số khác). func với các tham số khác nhau có thể chạy song song. Ví dụ:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Nếu tôi sử dụng thư viện đa xử lý, thì mảng khổng lồ đó sẽ được sao chép nhiều lần vào các quy trình khác nhau.

Có cách nào để cho các quy trình khác nhau chia sẻ cùng một mảng không? Đối tượng mảng này là chỉ đọc và sẽ không bao giờ bị sửa đổi.

Điều gì phức tạp hơn, nếu arr không phải là một mảng, nhưng một đối tượng python tùy ý, có cách nào để chia sẻ nó không?

[CHỈNH SỬA]

Tôi đọc câu trả lời nhưng tôi vẫn còn chút bối rối. Vì fork () là copy-on-write, chúng ta không nên gọi bất kỳ chi phí bổ sung nào khi sinh ra các quy trình mới trong thư viện đa xử lý python. Nhưng đoạn mã sau gợi ý rằng có một chi phí khổng lồ:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

đầu ra (và bằng cách này, chi phí tăng lên khi kích thước của mảng tăng lên, vì vậy tôi nghi ngờ vẫn còn chi phí liên quan đến việc sao chép bộ nhớ):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Tại sao có chi phí khổng lồ như vậy, nếu chúng ta không sao chép mảng? Và phần bộ nhớ chia sẻ tiết kiệm cho tôi những gì?


76
2018-05-23 14:20


gốc


có thể trùng lặp Dữ liệu chỉ đọc được chia sẻ có được sao chép sang các quy trình khác nhau để xử lý đa nhân Python không? - Francis Avila
Bạn đã xem xét tài liệu, đúng? - Lev Levitsky
@FrancisAvila là có một cách để chia sẻ không chỉ mảng, nhưng các đối tượng python tùy ý? - CodeNoob
@LevLevitsky Tôi phải hỏi, là có một cách để chia sẻ không chỉ mảng, nhưng các đối tượng python tùy ý? - CodeNoob
Câu trả lời này giải thích rõ ràng tại sao không thể chia sẻ các đối tượng Python tùy ý. - Janne Karila


Các câu trả lời:


Nếu bạn sử dụng hệ điều hành sử dụng tính năng sao chép trên ghi fork() ngữ nghĩa (giống như bất kỳ unix phổ biến), sau đó miễn là bạn không bao giờ thay đổi cấu trúc dữ liệu của bạn nó sẽ có sẵn cho tất cả các tiến trình con mà không chiếm bộ nhớ bổ sung. Bạn sẽ không phải làm bất cứ điều gì đặc biệt (ngoại trừ việc chắc chắn bạn không thay đổi đối tượng).

Điều hiệu quả nhất bạn có thể làm cho vấn đề của bạn sẽ đóng gói mảng của bạn thành một cấu trúc mảng hiệu quả (sử dụng numpy hoặc là array), đặt trong bộ nhớ dùng chung, hãy bọc nó với multiprocessing.Arrayvà chuyển nó cho các chức năng của bạn. Câu trả lời này cho thấy cách thực hiện điều đó.

Nếu bạn muốn có thể ghi chia sẻ đối tượng, sau đó bạn sẽ cần phải quấn nó với một số loại đồng bộ hóa hoặc khóa. multiprocessing cung cấp hai phương pháp làm điều này: một sử dụng bộ nhớ dùng chung (phù hợp với các giá trị đơn giản, mảng hoặc ctypes) hoặc một Manager proxy, trong đó một tiến trình giữ bộ nhớ và một người quản lý phân xử truy cập vào nó từ các tiến trình khác (thậm chí qua mạng).

Các Manager phương pháp tiếp cận có thể được sử dụng với các đối tượng Python tùy ý, nhưng sẽ chậm hơn so với việc sử dụng bộ nhớ dùng chung vì các đối tượng cần được tuần tự hóa / deserialized và gửi giữa các tiến trình.

Có một sự giàu có của các thư viện và phương thức xử lý song song có sẵn trong Python. multiprocessing là một thư viện tuyệt vời và được làm tròn, nhưng nếu bạn có nhu cầu đặc biệt, có lẽ một trong những cách tiếp cận khác có thể tốt hơn.


76
2018-05-23 16:42



Chỉ cần lưu ý, trên Python fork () thực sự có nghĩa là sao chép trên truy cập (vì chỉ cần truy cập vào đối tượng sẽ thay đổi số đếm của nó). - Fabio Zadrozny
@FabioZadrozny Liệu nó có thực sự sao chép toàn bộ đối tượng, hay chỉ là trang bộ nhớ chứa số refcount của nó? - zigg
AFAIK, chỉ có trang bộ nhớ chứa số lần truy cập (vì vậy, 4kb trên mỗi đối tượng truy cập). - Fabio Zadrozny
@max Sử dụng một đóng cửa. Hàm được gán cho apply_async nên tham chiếu đối tượng được chia sẻ trong phạm vi trực tiếp thay vì thông qua các đối số của nó. - Francis Avila
@FrancisAvila làm thế nào để bạn sử dụng một đóng cửa? Không nên chọn hàm mà bạn cung cấp cho apply_async? Hoặc đây chỉ là hạn chế map_async? - GermanK


Tôi chạy vào cùng một vấn đề và đã viết một lớp tiện ích chia sẻ bộ nhớ nhỏ để làm việc xung quanh nó.

Tôi đang sử dụng multiprocessing.RawArray (lockfree), và cũng có thể truy cập vào các mảng không được đồng bộ ở tất cả (lockfree), hãy cẩn thận không để bắn chân của riêng bạn.

Với giải pháp tôi nhận được tăng tốc bởi một yếu tố khoảng 3 trên một i7 quad-core.

Đây là mã: Hãy sử dụng và cải thiện nó, và vui lòng báo cáo lại bất kỳ lỗi nào.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

11
2018-05-15 10:55



Chỉ cần nhận ra rằng bạn phải thiết lập mảng bộ nhớ chia sẻ của bạn trước khi bạn tạo hồ bơi đa xử lý, không biết tại sao nhưng nó chắc chắn sẽ không hoạt động theo cách khác. - martin.preinfalk
lý do tại sao là Multiprocessing pool gọi fork () khi Pool được khởi tạo, vì vậy bất cứ thứ gì sau đó sẽ không nhận được quyền truy cập tới con trỏ tới bất kỳ mem được chia sẻ nào được tạo sau đó. - Xiv
Khi tôi thử mã này dưới py35, tôi có ngoại lệ trong multiprocessing.sharedctypes.py, vì vậy tôi đoán mã này chỉ dành cho py2. - Dr. Hillier Dániel