I need to pass data from my test cases to a mock server. What is the best way to do that?

This is what I have so far

mock_server.py

import SocketServer
import threading

class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
    pass

class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
    def __init__(self, request, client_address, server):
        SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)

    def handle(self):
        # the handler reaches the server object through self.server
        print self.server.data  # this is where I need the data

class server_wrap:
    def __init__(self):
        self.server = ThreadedUDPServer(("127.0.0.1", 49555), ThreadedUDPRequestHandler)

    def set_data(self, data):
        self.server.data = data

    def start(self):
        # serve_forever() is called right here, so this line never returns
        server_thread = threading.Thread(target=self.server.serve_forever())

    def stop(self):
        self.server.shutdown()

test_mock.py

from mock_server import server_wrap

server_inst = server_wrap()
server_inst.start()
# code which sets the data and expects the handle method to print it
server_inst.stop()

The problem with this code is that execution stops at server_inst.start(), where the server goes into an infinite listening loop.

Other solutions that I have tried without success:

  • Using global variables
  • Using queues
  • Starting mock_server.py with its own main

Let me know about any other possible solutions. Thanks in advance.

Update 1:

Using separate threads to send data to the socket:

Changes

test_mock.py

from threading import Thread
from mock_server import server_wrap

def test_set_data(data):
    server_inst = server_wrap()
    server_inst.set_data(data)
    server_inst.start()

if __name__ == "__main__":
    thread = Thread(target=test_set_data, args=("foo_data",))
    thread.setDaemon(True)
    thread.start()
    # test code which verifies that the data set is the same
    # works so far, able to pass data

    # problem starts now
    thread = Thread(target=test_set_data, args=("bar_data",))
    thread.setDaemon(True)
    thread.start()
    # fails with an "address already in use" error
    # Tried calling server.shutdown() in handle, but the error persists.
    # Also, threading.Thread has no stop method.

Thanks

1 Answer

The server should go into listening mode; that is expected.

You don't need to call server_inst.stop() until all the data has been sent and the test finishes, for example in your test teardown or when the test suite completes.
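
A minimal sketch of a wrapper whose start() returns immediately, so the server keeps listening in the background until the test calls stop(). It assumes Python 3, where the module is named socketserver instead of SocketServer. The names MockServer and DataHandler, the use of port 0, and the handler replying with server.data instead of printing it are illustrative choices, not part of the original code.

```python
import socket
import socketserver  # named SocketServer on Python 2
import threading


class DataHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # For UDP servers, self.request is a (datagram, socket) pair.
        _datagram, sock = self.request
        # Reply with whatever the test stored on the server object.
        sock.sendto(self.server.data, self.client_address)


class MockServer(object):
    def __init__(self, address=("127.0.0.1", 0)):
        # Port 0 asks the OS for any free port (an assumption made here
        # to avoid clashes; the question uses the fixed port 49555).
        self.server = socketserver.ThreadingUDPServer(address, DataHandler)
        self.server.data = b""
        self.address = self.server.server_address
        self._thread = None

    def set_data(self, data):
        self.server.data = data

    def start(self):
        # Pass the bound method itself; writing serve_forever() here
        # would run the loop in the calling thread and never return.
        self._thread = threading.Thread(target=self.server.serve_forever)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        self.server.shutdown()      # ends the serve_forever() loop
        self.server.server_close()  # releases the port
        self._thread.join()
```

With this shape, a test can call set_data() as often as it likes between start() and stop(), because the handler reads self.server.data each time a datagram arrives.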

To send data to the server and let the handler pick it up, open a socket on another thread, then send the data to the server through that socket.

The code should look something like this:

import socket
# the mock server is a UDPServer, so use a datagram socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(("127.0.0.1", 49555))
sock.send(... the data ...)
received = sock.recv(1024)  # the handler can send a response
sock.close()

Add a function to your Django code that runs on another thread. This function opens the socket, connects, sends the data, and reads the response. You can call it from a view, a middleware, etc.
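
Such a function might look like the sketch below. Because the server in the question is a UDPServer, the sketch uses a datagram socket. The name send_to_mock, the timeout parameter, and returning None when no reply arrives are assumptions made for illustration.

```python
import socket


def send_to_mock(payload, address=("127.0.0.1", 49555), timeout=2.0):
    """Send one UDP datagram and return the reply, or None on timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Without a timeout, recvfrom() would block forever if the
        # handler never answers.
        sock.settimeout(timeout)
        sock.sendto(payload, address)
        try:
            reply, _sender = sock.recvfrom(1024)
        except socket.timeout:
            return None
        return reply
    finally:
        sock.close()
```

It can be called directly from a test, or handed to threading.Thread(target=send_to_mock, args=(b"foo_data",)) if the caller must not wait for the reply.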

4 Comments

I am hitting the address reuse error because I am not able to stop the thread. I've updated the question to reflect this. Thanks.
When do you get the address reuse error?
When the next test case calls test_set_data(data) to set the new test data, since multiple test cases spawn different threads.
You can't run multiple servers on the same address. You want one server. test_set_data should be the function that opens the socket and sends the data, rather than trying to run another server.
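
One way to follow that advice is to start a single server for the whole test class and only change its data between test cases. The sketch below assumes Python 3 and unittest. MockServerTest, DataHandler, and ask_server are hypothetical names, and the handler replies with the stored data so that each test can assert on it.

```python
import socket
import socketserver  # named SocketServer on Python 2
import threading
import unittest


class DataHandler(socketserver.BaseRequestHandler):
    def handle(self):
        _datagram, sock = self.request
        sock.sendto(self.server.data, self.client_address)


class MockServerTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # One server shared by every test in the class, so the address
        # is bound exactly once. Port 0 means "any free port".
        cls.server = socketserver.ThreadingUDPServer(("127.0.0.1", 0), DataHandler)
        cls.server.data = b""
        cls.thread = threading.Thread(target=cls.server.serve_forever)
        cls.thread.daemon = True
        cls.thread.start()

    @classmethod
    def tearDownClass(cls):
        cls.server.shutdown()
        cls.server.server_close()
        cls.thread.join()

    def ask_server(self):
        # Send a datagram and return whatever the handler replies with.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.settimeout(2.0)
            sock.sendto(b"ping", self.server.server_address)
            return sock.recvfrom(1024)[0]
        finally:
            sock.close()

    def test_foo(self):
        self.server.data = b"foo_data"
        self.assertEqual(self.ask_server(), b"foo_data")

    def test_bar(self):
        self.server.data = b"bar_data"
        self.assertEqual(self.ask_server(), b"bar_data")
```

Since no second server is ever created, the "address already in use" error from the update cannot occur in this layout.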
