
I have a DRF application with a Python queue that I'm writing tests for. Somehow,

  1. My queue thread cannot find an object that exists in the test database.
  2. The main thread cannot destroy the db as it's in use by 1 other session.

To explain the use case a bit further: I use Django's user model and have a table for metadata of files which can be uploaded. One of these fields is created_by, a ForeignKey to django.conf.settings.AUTH_USER_MODEL. As shown below, I create a user in the TestCase's setUp(), which I then use to create an entry in the Files table. The creation of this entry happens in a queue, however. During testing, this results in the error DETAIL: Key (created_by_id)=(4) is not present in table "auth_user". When the tests are completed and the tearDown tries to destroy the test DB, I get another error: DETAIL: There is 1 other session using the database. The two seem related, and I'm probably handling the queue incorrectly.

The tests are written with Django's TestCase and run with python manage.py test.

from django.contrib.auth.models import User
from rest_framework.test import APIClient
from django.test import TestCase

class MyTest(TestCase):

    def setUp(self):
        self.client = APIClient()
        user = User.objects.create_user('TestUser', '[email protected]', 'testpass')
        self.client.force_authenticate(user)
   
    def test_failing(self):
        self.client.post('/totestapi', data={'files': [open('tmp.txt', 'rt')]})

The queue is defined in a separate file, app/queue.py.

from app.models import FileMeta
from queue import Queue
from threading import Thread


def queue_handler():
    while True:
        user, files = queue.get()
        for file in files:
            upload(file)
            FileMeta(created_by=user, filename=file.name).save()
        queue.task_done()


queue = Queue()
thread = Thread(target=queue_handler, daemon=True)

def start_upload_thread():
    thread.start()

def put_upload_thread(*args):
    queue.put(args)

Finally, the queue is started from app/views.py, which is always imported when Django starts and contains all the APIs.

from rest_framework.views import APIView
from rest_framework.response import Response
from app.queue import start_upload_thread, put_upload_thread


start_upload_thread()

class ToTestAPI(APIView):

    def post(self, request):
        put_upload_thread(request.user, request.FILES.getlist('files'))
        return Response(status=204)
  • There is far too much going on here for me. Which test runner? What is a "python native queue"? (Is it RQ?) How is it started and stopped in the test? How are you managing transactions? #1: your consumer can't see the work your test case did, and #2: your consumer is not being properly disposed of after the test completes. For #1: is your test case committing the changes? Is the consumer opening a new transaction for the read? For #2: how is the consumer started and stopped? We run synchronous queues for unit testing, and have a separate integration test for async. Commented Feb 4, 2022 at 15:44
  • Apologies for the sloppy ticket; I truly didn't realize until I read your comment. I've edited the code. I use the standard queue and threading libraries, starting happens when the test runner imports app/views.py, and frankly, I don't stop the queue, which is probably the cause of #2. However, I don't think I can stop the thread/queue. AFAIK the test case is committing; when I query the Users I find my created user. What's a txn? At this point my guess is that I should switch to RQ/Django-Q. Commented Feb 4, 2022 at 16:33

1 Answer


Apologies that this is not a "real" answer, but it was getting longer than a comment would allow.

The updated question looks good. As you noticed yourself, the background thread is never stopped, and that is probably what causes the "database still in use" error during teardown.
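For reference, a daemon worker like the one in your app/queue.py can be shut down cleanly with a sentinel value. This is just a sketch using only the standard library, with names loosely modeled on your code (the processed list stands in for the real upload/FileMeta work):

```python
from queue import Queue
from threading import Thread

queue = Queue()
STOP = object()    # Sentinel that tells the worker to exit
processed = []     # Stand-in for the real upload(file) / FileMeta(...).save() work

def queue_handler():
    while True:
        item = queue.get()
        if item is STOP:
            queue.task_done()
            break
        processed.append(item)
        queue.task_done()

thread = Thread(target=queue_handler)
thread.start()

def stop_upload_thread():
    """Enqueue the sentinel, then wait for the worker to drain and exit."""
    queue.put(STOP)
    thread.join()

queue.put("tmp.txt")
stop_upload_thread()
print(processed, thread.is_alive())  # ['tmp.txt'] False
```

Calling something like stop_upload_thread() in tearDown() would let the connection held by the worker be closed before the test database is destroyed.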

You use TestCase, which wraps each test in a database transaction and rolls back all changes when the test function ends. That means another thread, using a different connection to the database, will never see the test's data. You can see it inside your tests and views, since they share a connection.
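The effect is easy to reproduce outside Django with two plain SQLite connections (the table name here is only for illustration):

```python
import os
import sqlite3
import tempfile

# A file-backed DB so two independent connections can open it
path = os.path.join(tempfile.mkdtemp(), "demo.db")

writer = sqlite3.connect(path)
writer.execute("CREATE TABLE auth_user (id INTEGER PRIMARY KEY, name TEXT)")
writer.commit()

# Like TestCase: insert inside an open, uncommitted transaction
writer.execute("INSERT INTO auth_user (name) VALUES ('TestUser')")

# Like the queue thread: a second, independent connection
reader = sqlite3.connect(path)
rows = reader.execute("SELECT * FROM auth_user").fetchall()
print(rows)  # [] -- the uncommitted row is invisible to the other connection
```

This is exactly why the worker reports Key (created_by_id)=(4) is not present: from its own connection, the user created in setUp() was never committed.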

Celery and RQ are the standard job queues - Celery is more flexible, but RQ is simpler. Start with RQ and keep things simple and isolated.

Some notes:

  • Pass in the PK of objects, not the whole object.
  • Read up on pickle if you do need to pass larger data.
  • Set the queues to async=False (jobs run synchronously, like normal code) in tests.

Queue consumers are separate processes that can run anywhere in the system, so data needs to reach them somehow. If you pass full objects, those need to be pickled (serialized) and stored in the queue backend itself (i.e. Redis) to be retrieved and processed. Just be careful not to pass large objects this way: use the PK, store the file in S3 or another object store, etc.
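As an illustrative sketch of that advice (the FILE_META dict is a hypothetical stand-in for a database table, not part of your code), passing only the PK keeps the queued payload tiny and trivially picklable, and forces the consumer to re-fetch fresh state:

```python
import pickle
from queue import Queue

# Hypothetical stand-in for a database table: pk -> row
FILE_META = {4: {"filename": "tmp.txt", "created_by_id": 4}}

jobs = Queue()

def enqueue_upload(pk):
    # Only the primary key crosses the queue boundary
    jobs.put(pickle.dumps(pk))

def worker_step():
    pk = pickle.loads(jobs.get())
    row = FILE_META[pk]    # Re-fetch current state on the consumer side
    return row["filename"]

enqueue_upload(4)
result = worker_step()
print(result)  # tmp.txt
```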

For Django-RQ I use this snippet to set the queues to sync mode when testing, and then just run things as normal.

if IS_TESTING:
    # Run jobs eagerly in-process instead of in a separate worker
    for q in RQ_QUEUES:
        RQ_QUEUES[q]['ASYNC'] = False

Good luck!


1 Comment

Thanks, that clarifies!
