Hello! I am trying to write a web crawler in Python using multithreading. Even after reading the previously suggested papers and tutorials, I still have a problem. My code is here (the whole source code is here):

class Crawler(threading.Thread):

    global g_URLsDict 
    varLock = threading.Lock()
    count = 0

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.url = self.queue.get()

    def run(self):
        while 1:
            print self.getName()+" started" 
            self.page = getPage(self.url)
            self.parsedPage = getParsedPage(self.page, fix=True)
            self.urls = getLinksFromParsedPage(self.parsedPage)

            for url in self.urls:

                self.fp = hashlib.sha1(url).hexdigest()

                #url-seen check
                Crawler.varLock.acquire() #lock for global variable g_URLs
                if self.fp in g_URLsDict:
                    Crawler.varLock.release() #releasing lock
                else:
                    #print url+" does not exist"
                    Crawler.count +=1
                    print "total links: %d"%len(g_URLsDict)
                    print self.fp
                    g_URLsDict[self.fp] = url
                    Crawler.varLock.release() #releasing lock
                    self.queue.put(url)

                    print self.getName()+ " %d"%self.queue.qsize()
                    self.queue.task_done()
            #self.queue.task_done()
        #self.queue.task_done()


print g_URLsDict
queue = Queue.Queue()
queue.put("http://www.ertir.com")

for i in range(5):
    t = Crawler(queue)
    t.setDaemon(True)
    t.start()

queue.join()

It does not work as needed: it does not give any result after the first thread, it executes differently each time, and sometimes it gives this error:

Exception in thread Thread-2 (most likely raised during interpreter shutdown):

How can I fix it? Also, I do not think this is any more effective than a plain for loop.

I have tried to fix run():

def run(self):
    while 1:
        print self.getName()+" started" 
        self.page = getPage(self.url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()
                #self.queue.task_done()
        #self.queue.task_done()
    self.queue.task_done()

I experimented with the task_done() call in different places; can anyone explain the difference?

  • Is that first example missing some indentation? Looks like the class members should be pushed in by one level? Commented May 29, 2012 at 14:23
  • Can you post a working example? What modules do you import? Commented May 29, 2012 at 14:35
  • snipt.org/ujhW9 working source code Commented May 29, 2012 at 14:42
  • I'm confused. You create 5 threads that each read from the queue once. Then lots of urls get added to the queue from those first five pages, but they are never read? Commented May 29, 2012 at 14:46
  • What do you mean? They are added to the queue, but the threads also read urls from that queue. Commented May 29, 2012 at 14:51

1 Answer

You only call self.url = self.queue.get() when the threads initialise. You need to re-acquire URLs from the queue inside your while loop if you want to pick up new URLs for processing further down the line.

Try replacing self.page = getPage(self.url) with self.page = getPage(self.queue.get()). Be aware that get() blocks indefinitely by default. You probably want to pass a timeout and add some way for your background threads to exit gracefully on request (which would also eliminate the exception you saw).
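As a minimal sketch of that idea (Python 3 syntax; the stop_event flag and the worker function are hypothetical names, not part of the original code), a worker can time out on get() and check a shutdown flag between attempts:

```python
import queue
import threading

stop_event = threading.Event()  # hypothetical flag used to request shutdown

def worker(q):
    # Keep pulling urls until a shutdown is requested.
    while not stop_event.is_set():
        try:
            url = q.get(timeout=1)  # block for at most 1 second
        except queue.Empty:
            continue  # nothing queued yet; loop back and re-check the flag
        try:
            print("processing", url)  # placeholder for the real crawling work
        finally:
            q.task_done()  # always balance the successful get()

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,), daemon=True)
t.start()

q.put("http://www.ertir.com")
q.join()          # returns once task_done() has been called for every item
stop_event.set()  # ask the worker to leave its loop
t.join(timeout=2) # worker notices the flag within its 1-second get() timeout
```

Because the worker only ever blocks for one second at a time, setting the event is enough to let it exit cleanly instead of being killed at interpreter shutdown.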

There are some good examples on effbot.org which use get() in the way I've described above.

Edit - Answers to your initial comments:

Have a look at the docs for task_done(): for every call to get() that doesn't time out, you should call task_done(), which tells any blocking calls to join() that everything on that queue has now been processed. Each call to get() will block (sleep) while it waits for a new url to be posted on the queue.
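To illustrate the task_done()/join() pairing on its own, here is a small self-contained example (Python 3 syntax, separate from the crawler code):

```python
import queue

q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)  # each put() increments the count of unfinished tasks

processed = []
while not q.empty():
    item = q.get()       # each get() must be paired with a task_done()
    processed.append(item)
    q.task_done()        # tells join() this item is fully handled

q.join()  # returns immediately: every put() has been matched by a task_done()
print(processed)  # → ['a', 'b', 'c']
```

If task_done() were omitted, q.join() would block forever; if it were called more times than get(), the queue would raise ValueError.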

Edit2 - Try this alternative run function:

def run(self):
    while 1:
        print self.getName()+" started"
        url = self.queue.get() # <-- note that we're blocking here to wait for a url from the queue
        self.page = getPage(url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                Crawler.count +=1
                print "total links: %d"%len(g_URLsDict)
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()

        self.queue.task_done() # <-- We've processed the url this thread pulled off the queue so indicate we're done with it.

8 Comments

And what about the task_done() call: where should I put it, and what does it affect? Is the thread put to sleep and time given to another thread when task_done() is called? If so, where is the concurrency? I am confused.
Does calling task_done() just tell the other threads that the Queue can be used? I mean, one way I get a url from the queue and immediately call task_done(); the other way I get a url, process it, parse the pages (during which I want other threads to use the Queue, because processing pages takes some time), and then call task_done(). What is the difference? Which one will be more effective?
As I expected, putting task_done() right after q.get() terminates all threads.
Please read the docs I linked to; task_done() only tells queue.join() that one call to queue.put() has been dealt with. It's the calls to queue.get() that do the blocking you're interested in. Each time you call queue.put(), one thread that was blocking on its call to queue.get() will wake up and start processing...
The point I cannot understand is this: every created thread gets a url from the Queue, processes it, and then calls task_done(). So where is the real multithreading efficiency? The threads are just taking turns; they do not do anything in parallel that could give a speedup.