
I am comparing codes from multiple data sets: each record keeps its existing code if that code has not been used yet, and otherwise gets assigned a new one. I need to integrate a queue or shared memory so that multiple processes can run different shards at the same time. As it stands, the script will hand out the same "new code" multiple times.

    import random

    import pyspark.sql.functions as F
    import pyspark.sql.types as T

    used_codes = []
    new_codes = []


    def generate_code():
        # 4-digit random code, returned as a string to match the UDF's StringType
        return str(random.randint(1000, 9000))


    def create_codes():
        global new_codes
        new_codes = [generate_code() for _ in range(10)]


    def get_code(code):
        global new_codes
        global used_codes
        # Keep the incoming code if it has not been used yet
        if code not in used_codes:
            used_codes.append(code)
            return code
        # Otherwise take pre-generated candidates until we find an unused one
        created_code = new_codes.pop(0)
        while created_code in used_codes:
            created_code = new_codes.pop(0)
        used_codes.append(created_code)
        return created_code


    get_code_udf = F.udf(get_code, T.StringType())

  • Multiple processes do not share memory. What you CAN do is use a Queue to send information from the child processes back to a master controller, which can collect and organize it (see the sketch after these comments). Commented Oct 2, 2021 at 5:39
  • multiprocessing.shared_memory is a thing. Commented Oct 2, 2021 at 5:41
  • Is this Windows or a Unix-like system? On Unix, subprocesses share a copy-on-write view of the parent memory space, which means they can see the parent state at the time of fork, but neither side sees the other's changes. You can leverage this to read the shared data, but you need some mechanism to pass any useful results back to the parent. Commented Oct 2, 2021 at 5:44
  • What is F as in F.udf? See How to create a Minimal, Reproducible Example. Commented Oct 2, 2021 at 13:22
  • Regarding the whole "processes don't share memory" thing: it is extremely good practice to design your workflow around passing messages (such as by using a queue) rather than sharing data. The only real exception is when you have large blocks of data like NumPy arrays or large structs. These can be mapped to large blocks of shared memory, which is faster and more efficient than passing huge messages around. You then still need to be careful about reading and writing the same location in that array/struct from multiple processes. This can be done with locks, or sometimes by code structure. Commented Oct 4, 2021 at 1:19
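
To make the first comment's suggestion concrete, here is a minimal sketch of the queue-based design. The allocator/worker split and all names here are illustrative, not from the original post: a single allocator process owns used_codes, and the shard workers request codes through a multiprocessing.Queue, so the same "new code" can never be handed out twice.

    import random
    from multiprocessing import Process, Queue


    def allocator(requests, replies, n_workers):
        """Single owner of the code state; workers never touch it directly."""
        used_codes = set()
        finished = 0
        while finished < n_workers:
            msg = requests.get()
            if msg is None:                      # a worker is done
                finished += 1
                continue
            worker_id, code = msg
            while code in used_codes:            # draw until we find a fresh code
                code = str(random.randint(1000, 9000))
            used_codes.add(code)
            replies[worker_id].put(code)


    def worker(worker_id, shard, requests, reply):
        for code in shard:
            requests.put((worker_id, code))
            assigned = reply.get()               # unique across all shards
            print(f"shard {worker_id}: {code} -> {assigned}")
        requests.put(None)                       # tell the allocator we are done


    if __name__ == "__main__":
        shards = [["1234", "5678"], ["1234", "9999"]]    # deliberate duplicate
        requests = Queue()
        replies = [Queue() for _ in shards]
        alloc = Process(target=allocator, args=(requests, replies, len(shards)))
        alloc.start()
        workers = [Process(target=worker, args=(i, s, requests, replies[i]))
                   for i, s in enumerate(shards)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        alloc.join()

Because used_codes lives only in the allocator, the duplicate "1234" in the second shard is forced onto a fresh code no matter how the shards interleave.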

1 Answer


Are you using Python 3.8 or later? That is when multiprocessing.shared_memory was added.

See https://docs.python.org/3/library/multiprocessing.shared_memory.html: shared memory basically just speaks bytes, but together with some glue code it works quite well. Here with NumPy:

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])
>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()
>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
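
Applied back to the question, here is a sketch of one way to combine this with a lock. The byte-per-code layout and the claim_code helper are my assumptions, not the poster's actual pipeline: each possible code gets one flag byte in the shared block, and a multiprocessing.Lock serializes the check-and-claim so two shards can never end up with the same code.

    import random
    from multiprocessing import Lock, Process, shared_memory

    LOW, HIGH = 1000, 9000      # code range used by the question's script


    def claim_code(shm_name, lock, preferred):
        """Return `preferred` if still free, else claim a fresh random code."""
        shm = shared_memory.SharedMemory(name=shm_name)
        try:
            with lock:                           # serialize check-and-set
                idx = preferred - LOW
                if shm.buf[idx] == 0:            # 0 = free, 1 = taken
                    shm.buf[idx] = 1
                    return preferred
                while True:
                    candidate = random.randint(LOW, HIGH)
                    if shm.buf[candidate - LOW] == 0:
                        shm.buf[candidate - LOW] = 1
                        return candidate
        finally:
            shm.close()


    def worker(shm_name, lock, preferred):
        print(claim_code(shm_name, lock, preferred))


    if __name__ == "__main__":
        # One flag byte per possible code; a fresh block starts zero-filled
        shm = shared_memory.SharedMemory(create=True, size=HIGH - LOW + 1)
        lock = Lock()
        # Two shards both prefer 1234; exactly one of them gets to keep it
        procs = [Process(target=worker, args=(shm.name, lock, 1234))
                 for _ in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        shm.close()
        shm.unlink()             # free the block at the very end

Note that a plain multiprocessing.Lock can only be handed to related processes at Process creation time; for fully independent interpreters that merely attach by name, you would need an OS-level named lock or the queue design from the comments above.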
