In the class foo in foomodule.py below, I am getting an error in the run_with_multiprocessing method. The method splits the records in self._data into chunks and calls some_func() on each chunk, for example some_func(data[0:800], 800) in the first iteration if limit = 800.

I have done this because processing 10 * 1k records instead of 1 * 10k records showed a large performance improvement in a variant of run_with_multiprocessing that does the same thing without multiprocessing. Now I want to use multiprocessing to see if I can improve performance even further.

I am running python 3.8.2 on Windows 8.1. I am fairly new to python and multiprocessing. Thank you so much for your help.

# foomodule.py
import multiprocessing

class foo:
    def __init__(self, data, record_count):
        self._data = data
        self._record_count = record_count

    def some_func(self, data, record_count):
        # looping through self._data and doing some work
        pass


    def run_with_multiprocessing(self, limit):
        step = 0
        while step < self._record_count:
            if self._record_count - step < limit:
                proc = multiprocessing.Process(target=self.some_func, args=(self._data[step:self._record_count], self._record_count-step))
                proc.start()
                proc.join()
                step = self._record_count
                break

            # full-size chunk: pass only the next `limit` records
            proc = multiprocessing.Process(target=self.some_func, args=(self._data[step:step+limit], limit))
            proc.start()
            proc.join()
            step += limit
        return

When using the class in script.py, I get the following error:

import time
import foomodule

# data is a mysql result set with, say, 10'000 rows
start = time.time()
bar = foomodule.foo(data, 10000)
limit = 800
bar.run_with_multiprocessing(limit)
end = time.time()
print("finished after " + str(round(end-start, 2)) + "s")

Traceback (most recent call last):
  File "C:/coding/python/project/script.py", line 29, in <module>
    bar.run_with_multiprocessing(limit)
  File "C:\coding\python\project\foomodule.py", line 303, in run_with_multiprocessing
    proc.start()
  File "C:\...\Python\Python38-32\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\...\Python\Python38-32\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\...\Python\Python38-32\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\...\Python\Python38-32\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\...\Python\Python38-32\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\...\Python\Python38-32\lib\socket.py", line 272, in __getstate__
    raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'SSLSocket' object
  • Is there some sort of connection to the database in data? Commented Apr 29, 2020 at 9:16
  • Hi Marc, thanks for looking into this. When the foo() object is created, several DB calls are made in different functions called from __init__() to populate attributes of the class with information. The method some_func() itself has no DB calls, but accesses those class attributes. The function does write to a log.txt file though.. does that matter? The log.txt file is opened in __init__() and closed in __del__(). Commented Apr 30, 2020 at 11:10
  • Since the error states that there is an SSLSocket lurking somewhere in your data, I'm guessing a connection to your database is still alive when you call the multiprocessing method. Could you try closing all connections prior to calling run_with_multiprocessing? Commented Apr 30, 2020 at 14:17
  • Also are the different processes writing to the same file? If that is the case I would advise you to avoid doing that. Commented Apr 30, 2020 at 14:20

1 Answer

Divide and conquer

Problem

If you pass an SSLSocket object as an argument to multiprocessing.Process(), it fails because an SSLSocket cannot be serialized (pickled).
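A minimal sketch of how this can affect the question's code, assuming the foo instance keeps a database connection as an attribute (as the comments suggest). The target self.some_func is a bound method, so pickling it means pickling the whole instance, attributes included. The Worker class and its connection attribute are hypothetical, and a threading.Lock stands in for the SSLSocket because it is likewise rejected by pickle with a TypeError.

```python
import pickle
import threading


class Worker:
    """Hypothetical stand-in for the question's foo class."""

    def __init__(self):
        # Stand-in for a DB connection that wraps an SSLSocket:
        # a lock is also rejected by pickle with a TypeError.
        self.connection = threading.Lock()

    def some_func(self, rows):
        return len(rows)


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


worker = Worker()

# A bound method drags its instance (and self.connection) into the pickle.
bound_method_ok = can_pickle(worker.some_func)

# Plain rows carry no hidden connection and serialize without trouble.
plain_data_ok = can_pickle([(1, "a"), (2, "b")])

print("bound method picklable:", bound_method_ok)
print("plain data picklable:", plain_data_ok)
```

On Windows, multiprocessing starts children with spawn, which pickles the Process object including its target, so the same failure surfaces in proc.start().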

Solution

Since you can't serialize an SSLSocket, create it inside the subprocess instead, that is, in the function passed as target to multiprocessing.Process().
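Applied to the question, the same idea might look like the sketch below. It is not the asker's code: chunked, process_chunk and the workers parameter are hypothetical names, and multiprocessing.Pool is swapped in for the individual Process objects so that the chunks actually run in parallel. The key assumption is that the rows have been fetched into plain Python objects (lists or tuples) before being handed to the workers.

```python
import multiprocessing


def chunked(rows, limit):
    """Yield consecutive slices of at most `limit` rows."""
    for start in range(0, len(rows), limit):
        yield rows[start:start + limit]


def process_chunk(rows):
    """Hypothetical worker, defined at module level so it can be pickled.

    If a chunk needs the database, open the connection here, inside the
    child process, instead of passing a connection in from the parent.
    """
    return len(rows)  # placeholder for the real per-chunk work


def run_with_multiprocessing(rows, limit, workers=4):
    """Process all chunks in a pool of worker processes."""
    with multiprocessing.Pool(processes=workers) as pool:
        # Only plain row data crosses the process boundary.
        return pool.map(process_chunk, list(chunked(rows, limit)))
```

On Windows, the call to run_with_multiprocessing(rows, 800) has to sit under an if __name__ == "__main__": guard in the script, because each child process re-imports the main module when it starts.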

Server

#!/usr/bin/python3
import multiprocessing
import socket
import ssl
import sys
import threading
import time
from sources.ClientListener import ClientListener

class SocketServer:
    def __init__(self,**kwargs):
        self.args = kwargs["args"]
        self.__createSocket()

    def __handlerClients(self):
        try:
            while self.sock:
                # sock.accept() blocks until a connection is established; a subprocess is then created for it
                # IMPORTANT: I don't wrap the socket with SSL here, because an SSLSocket object can't be pickled when passed as an argument to multiprocessing.Process()
                conn,addr = self.sock.accept()
                eventChildStop = multiprocessing.Event()
                subprocess = multiprocessing.Process(target=ClientListener, name="client", args=(conn,addr,eventChildStop))
                # This thread is responsible for closing the client's child process
                threading.Thread(target=ClientListener.exitSubprocess,name="closeChildProcess",args=(eventChildStop,subprocess,)).start()
                subprocess.start()
                time.sleep(1)
        except Exception:
            pass

    def __createSocket(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # This allows the address/port to be reused immediately instead of waiting out the TIME_WAIT state
        # https://stackoverflow.com/questions/12362542/python-server-only-one-usage-of-each-socket-address-is-normally-permitted
        # #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("",self.PORT))
        self.sock.listen(self.CLIENTS)
        print(logFile().message(f"Good days. I am running ClassAdmin server, listenning {self.CLIENTS} clients by port {self.PORT}...",True,"INFO"))
        #self.sockSSL = self.context.wrap_socket(sock,server_side=True)
        self.__handlerClients()

if __name__=="__main__":
    SocketServer(args=sys.argv)

As you can see in the __handlerClients(self) method, I loop while the socket object exists. On each iteration, I know a connection has been established thanks to:

conn,addr = self.sock.accept()

So I pass the conn variable to multiprocessing.Process(), because conn is a plain socket object. The difference between conn and self.sock is that conn has the raddr parameter, while self.sock does not and its laddr is 0.0.0.0.

self.sock

<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 7788)>

conn

<socket.socket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('192.168.0.3', 7788), raddr=('192.168.0.20', 53078)>
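The difference between the two sockets can be reproduced on the loopback interface with the sketch below. The variable names are illustrative, and port 0 is used so the operating system picks a free port rather than the 7788 shown above.

```python
import socket

# Listening socket: bound to a local address, no remote peer.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
listener.listen(1)
port = listener.getsockname()[1]

# A client connects, and accept() returns a new connected socket.
client = socket.create_connection(("127.0.0.1", port))
conn, addr = listener.accept()

# The connected socket knows its remote peer (the raddr in its repr).
conn_has_peer = conn.getpeername() == client.getsockname()

# The listening socket has no peer, so getpeername() fails.
try:
    listener.getpeername()
    listener_has_peer = True
except OSError:
    listener_has_peer = False

print(listener)  # repr shows laddr only
print(conn)      # repr shows laddr and raddr

for s in (conn, client, listener):
    s.close()
```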

multiprocessing

subprocess = multiprocessing.Process(target=ClientListener, name="client", args=(conn,addr,eventChildStop))

The conn passed here is the same plain socket object shown above.

Now let's move on to ClientListener.

ClientListener

import ssl

class ClientListener:
    def __init__(self,conn,addr,event):
        # Take the connection's plain socket and secure its traffic by wrapping it in an SSLSocket (ssl module)
        self.addr = addr
        self.conn = self.__SSLTunnel(conn)
        self.nick = ""
        self.__listenData()

    # Creates an SSL tunnel using ClassAdmin's certificate and private key
    def __SSLTunnel(self,sock):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(Environment.SSL("crt"),Environment.SSL("key"))
        return context.wrap_socket(sock,server_side=True)

    def __listenData(self):
       # [...]

As you can see, __init__(self,conn,addr,event) receives the conn variable from the previous code, and self.conn stores the same connection wrapped in an SSLSocket:

self.conn = self.__SSLTunnel(conn)

    def __SSLTunnel(self,sock):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(Environment.SSL("crt"),Environment.SSL("key"))
        return context.wrap_socket(sock,server_side=True)

Important

The SSLSocket is stored in self.conn because it is the object that works with the send() and recv() methods.

data = self.conn.recv(1024)

self.conn.send("sig.SystemExit(-5000,'The nick exists and is connected',True)".encode("utf-8"))

The self.sock variable does not allow the accept() method; calling it throws an error:

[Errno 22] Invalid argument in /etc/ClassAdmin/sources/ClientListener.py:14
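One way to see an error of this kind, assuming it came from calling accept() on a socket that is already connected rather than listening, is the loopback sketch below. The names are illustrative; on Linux the error is typically Errno 22, while other platforms may report a different OSError.

```python
import socket

# Set up a listening socket and one accepted connection on loopback.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
listener.listen(1)
client = socket.create_connection(listener.getsockname())
conn, _ = listener.accept()

# A connected socket is for send()/recv(); it is not listening,
# so accept() on it is expected to fail. The timeout is only a
# safety net so the call cannot block indefinitely.
conn.settimeout(1.0)
try:
    conn.accept()
    accept_error = None
except OSError as exc:
    accept_error = exc

print("accept() on a connected socket:", accept_error)

for s in (conn, client, listener):
    s.close()
```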

Have a good day.
