1

I have this code that dumps documents into MongoDB once an ArrayBlockingQueue fills it's quota. When I run the code, it seems to only run once and then gives me a stack trace. My guess is that the BulkWriteOperation someone has to 'reset' or start over again.

Also, I create the BulkWriteOperations in the constructor...

bulkEvent = eventsCollection.initializeOrderedBulkOperation();
bulkSession = sessionsCollection.initializeOrderedBulkOperation();

Here's the stacktrace.

10 records inserted
java.lang.IllegalStateException: already executed
    at org.bson.util.Assertions.isTrue(Assertions.java:36)
    at com.mongodb.BulkWriteOperation.insert(BulkWriteOperation.java:62)
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.dumpQueue(MongoDBManager.java:104)
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.addToQueue(MongoDBManager.java:85)

Here's the code for the Queues:

public void addToQueue(Object item) {
        if (item instanceof SakaiEvent) {
            if (eventQueue.offer((SakaiEvent) item)) {
            } else {
                dumpQueue(eventQueue);
            }

        }
        if (item instanceof SakaiSession) {
            if (sessionQueue.offer((SakaiSession) item)) {
            } else {
                dumpQueue(sessionQueue);
            }
        }

    }

And here is the code that reads from the queues and adds them to an BulkWriteOperation (initializeOrderedBulkOperation) to execute it and then dump it to the database. Only 10 documents get written and then it fails.

private void dumpQueue(BlockingQueue q) {
        Object item = q.peek();
        Iterator itty = q.iterator();
        BulkWriteResult result = null;

        if (item instanceof SakaiEvent) {
            while (itty.hasNext()) {
                bulkEvent.insert(((SakaiEvent) itty.next()).convertToDBObject());
                //It's failing at that line^^
            }
            result = bulkEvent.execute();

        }
        if (item instanceof SakaiSession) {
            while (itty.hasNext()) {
                bulkSession.insert(((SakaiSession) itty.next()).convertToDBObject());
            }
            result = bulkSession.execute();
        }

        System.out.println(result.getInsertedCount() + " records inserted");
    }

1 Answer 1

4

The general documentation applies to all driver implementations in this case:

"After execution, you cannot re-execute the Bulk() object without reinitializing."

So the .execute() method effectively "drains" the current list of operations that have been sent to it and now contains state information about how the commands were actually sent. So you cannot add more entries or call .execute() again on the same instance without reinitializing .

So after you call execute on each "Bulk" object, you need to call the intialize again:

bulkEvent = eventsCollection.initializeOrderedBulkOperation();
bulkSession = sessionsCollection.initializeOrderedBulkOperation();

Each of those lines placed again repectively after each .execute() call in your function. Then further calls to those instances can add operations and call execute again continuing the cycle.

Note that "Bulk" operations objects will store as many items as you want to put into them but will break up requests to the server into maximum amounts of 1000 items. After execution the state of the operations list will reflect exactly how this is done should you want to inspect that.

Sign up to request clarification or add additional context in comments.

1 Comment

That's what my original thinking was, must have just missed it in the documentation. I'll rewrite the code to reflect that then. I knew about the 1000 item limit too. Thanks!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.