I have this code that dumps documents into MongoDB once an ArrayBlockingQueue fills its quota. When I run the code, it seems to run only once and then gives me a stack trace. My guess is that the BulkWriteOperation somehow has to be 'reset' or started over again.
Also, I create the BulkWriteOperations in the constructor...
bulkEvent = eventsCollection.initializeOrderedBulkOperation();
bulkSession = sessionsCollection.initializeOrderedBulkOperation();
Here's the stack trace:
10 records inserted
java.lang.IllegalStateException: already executed
    at org.bson.util.Assertions.isTrue(Assertions.java:36)
    at com.mongodb.BulkWriteOperation.insert(BulkWriteOperation.java:62)
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.dumpQueue(MongoDBManager.java:104)
    at willkara.monkai.impl.managers.DataManagers.MongoDBManager.addToQueue(MongoDBManager.java:85)
Here's the code for the Queues:
public void addToQueue(Object item) {
    if (item instanceof SakaiEvent) {
        if (eventQueue.offer((SakaiEvent) item)) {
        } else {
            dumpQueue(eventQueue);
        }
    }
    if (item instanceof SakaiSession) {
        if (sessionQueue.offer((SakaiSession) item)) {
        } else {
            dumpQueue(sessionQueue);
        }
    }
}
And here is the code that reads from the queues, adds the items to a BulkWriteOperation (created with initializeOrderedBulkOperation), and executes it to write them to the database. Only 10 documents get written and then it fails.
private void dumpQueue(BlockingQueue q) {
    Object item = q.peek();
    Iterator itty = q.iterator();
    BulkWriteResult result = null;
    if (item instanceof SakaiEvent) {
        while (itty.hasNext()) {
            bulkEvent.insert(((SakaiEvent) itty.next()).convertToDBObject());
            //It's failing at that line^^
        }
        result = bulkEvent.execute();
    }
    if (item instanceof SakaiSession) {
        while (itty.hasNext()) {
            bulkSession.insert(((SakaiSession) itty.next()).convertToDBObject());
        }
        result = bulkSession.execute();
    }
    System.out.println(result.getInsertedCount() + " records inserted");
}
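To make my guess concrete, here is a minimal, driver-free sketch of what I think is going on. `OneShotBulk` and `BulkReuseSketch` are hypothetical stand-ins I made up for illustration; they only mimic the "already executed" check from the stack trace and are not the real MongoDB classes. The sketch assumes that a bulk operation is single-use, and that draining the queue and asking for a fresh operation on each dump (in my real code that would presumably mean calling initializeOrderedBulkOperation() again) would avoid the exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical stand-in for a single-use bulk operation; NOT the MongoDB driver class.
class OneShotBulk {
    private final List<String> pending = new ArrayList<>();
    private boolean executed = false;

    void insert(String doc) {
        // Mimics the check that appears to fail in my stack trace.
        if (executed) throw new IllegalStateException("already executed");
        pending.add(doc);
    }

    int execute() {
        if (executed) throw new IllegalStateException("already executed");
        executed = true;
        return pending.size(); // pretend every pending document was inserted
    }
}

class BulkReuseSketch {
    // Drains the queue into a brand-new bulk operation obtained from the factory.
    static int dump(BlockingQueue<String> queue, Supplier<OneShotBulk> factory) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);              // empties the queue so offer() can succeed again
        if (batch.isEmpty()) return 0;
        OneShotBulk bulk = factory.get();  // fresh operation for every dump
        for (String doc : batch) bulk.insert(doc);
        return bulk.execute();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        for (int round = 0; round < 2; round++) {
            for (int i = 0; i < 10; i++) queue.offer("doc-" + round + "-" + i);
            System.out.println(dump(queue, OneShotBulk::new) + " records inserted");
        }

        // Reusing one instance after execute() reproduces the failure mode.
        OneShotBulk shared = new OneShotBulk();
        shared.insert("a");
        shared.execute();
        try {
            shared.insert("b");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch both rounds of 10 go through because each dump gets its own operation, while the shared instance throws on the second insert. If that matches how the real BulkWriteOperation behaves, it would explain why my code only works for the first 10 documents.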