
I want to read and write a single JSON file from multiple threads in Python.

Each thread does the following:

Initial setup: open(file_path, "w+") (if the file is empty, just dump an empty JSON object into it).

When writing, while holding a threading.Lock:

1) Load the JSON file into memory.

2) Update the loaded JSON in memory with the new key and value.

3) Dump the current in-memory JSON back to the file.

Because there is a lock around the writing, I thought it would be safe to read and write the file even with multiple threads running, but it raises an error.

import json
import threading

class Writer(object):
    def __init__(self, path):
        self._lock = threading.Lock()

        self.path = path
        self.current_json = None

        self._init_opend_file()

    def _init_opend_file(self):
        with self._lock:
            self._opened_file = open(self.path, "w+")
            if self._opened_file.read() == "":
                json.dump({}, self._opened_file)
            else:
                pass

    def write(self, key, value):
        with self._lock:
            self._opened_file.seek(0)
            self.current_json = json.load(self._opened_file)
            self.current_json[key] = value
            self._opened_file.seek(0)
            self._opened_file.truncate()
            json.dump(self.current_json, self._opened_file)

if __name__ == "__main__":
    path = r"D:\test.json"

    def run(name, range_):
        writer = Writer(path)
        for i in range(range_):
            writer.write(name,i)

    t1 = threading.Thread(target=run, args=("one", 1000))
    t2 = threading.Thread(target=run, args=("two", 2000))

    t1.start()
    t2.start()

I expected to get {"one" : 1000, "two" : 2000} in test.json, but I got {"one": 1} "two": 1}. It seems multiple threads access the file at the same time and write different things, but I can't understand why this happens even with threading.Lock():

Exception in thread Thread-2:
Traceback (most recent call last):
  File "D:\Anaconda3_64\envs\atom\lib\threading.py", line 917, in _bootstrap_inner
    self.run()
  File "D:\Anaconda3_64\envs\atom\lib\threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "D:/Dropbox/000_ComputerScience/000_개발/Quant/Seperator/json_test.py", line 37, in run
    writer.write(name,i)
  File "D:/Dropbox/000_ComputerScience/000_개발/Quant/Seperator/json_test.py", line 24, in write
    self.current_json = json.load(self._opened_file)
  File "D:\Anaconda3_64\envs\atom\lib\json\__init__.py", line 296, in load
    parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw)
  File "D:\Anaconda3_64\envs\atom\lib\json\__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "D:\Anaconda3_64\envs\atom\lib\json\decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "D:\Anaconda3_64\envs\atom\lib\json\decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

2 Answers
This is happening because the two threads do not share the same lock: each call to Writer(path) creates its own threading.Lock, so each thread only ever locks against itself. Try using a ThreadPoolExecutor, or extend the class as class Writer(threading.Thread):

ThreadPoolExecutor manages the worker threads and the hand-off of work items for you, so for simple cases like this you don't need to manage the lock yourself.

See the ThreadPoolExecutor and threading documentation.
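The lock-sharing point is the crux: in the question's code each thread builds its own Writer, so each thread acquires a different Lock object and nothing is actually serialized. A minimal demonstration:

```python
import threading

class Writer:
    def __init__(self):
        # Every instance gets its own, independent lock.
        self._lock = threading.Lock()

a = Writer()  # what thread one constructs inside run()
b = Writer()  # what thread two constructs inside run()

# Two instances, two distinct locks: acquiring one never blocks the other.
print(a._lock is b._lock)  # False
```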

Example of ThreadPoolExecutor:

import random
import time
from concurrent.futures import ThreadPoolExecutor

def data_write(z):
    sleep_wait = random.randint(0, 2)
    print("sleeping:", sleep_wait, ", data:", z)
    time.sleep(sleep_wait)
    print('{field: %s}' % z, file=f)  # f is the shared file handle below
    return z

with open('test', 'a') as f:
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with ThreadPoolExecutor(max_workers=3) as executor:
        # map() returns results in input order, regardless of completion order
        future = list(executor.map(data_write, data))
    print(future)
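Adapting the pool pattern to the original JSON use case might look like the sketch below (the file name pool_test.json is just an example). Note that the workers still share one module-level lock, because the read-modify-write cycle on the file is not atomic on its own:

```python
import json
import threading
from concurrent.futures import ThreadPoolExecutor

path = "pool_test.json"
lock = threading.Lock()  # one lock object, shared by every pool worker

with open(path, "w") as f:
    json.dump({}, f)  # start from an empty JSON object

def write(item):
    key, value = item
    with lock:  # serialize the non-atomic read-modify-write cycle
        with open(path, "r+") as f:
            data = json.load(f)
            data[key] = value
            f.seek(0)
            f.truncate()
            json.dump(data, f)

with ThreadPoolExecutor(max_workers=3) as executor:
    list(executor.map(write, [("one", 1000), ("two", 2000)]))

with open(path) as f:
    result = json.load(f)
print(result)  # both keys survive: {"one": 1000, "two": 2000}
```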

Better to simplify things. Your class only does some writing, so a plain function works just as well. The w+ mode you are using also truncates the file on open, so you never see the previous state it was in; r+ keeps the contents. I changed truncate() to truncate(0) to make the size explicit (without an argument, truncate() cuts at the current position, which after seek(0) happens to be the same thing). There's a single lock, and it's acquired and released in the write function. And finally, range(1000) gives you values only up to 999 ;) Here's the result:

import threading
import json

def write(path, key, value):
    with lock:  # released automatically, even if the JSON parsing fails
        with open(path, "r+") as opened_file:
            current_json = opened_file.read()
            if current_json == "":
                current_json = {}  # empty file: start from a fresh object
            else:
                current_json = json.loads(current_json)
            current_json[key] = value
            opened_file.seek(0)       # rewind before overwriting
            opened_file.truncate(0)   # drop any leftover bytes
            json.dump(current_json, opened_file)

if __name__ == "__main__":
    path = r"test.json"
    lock = threading.Lock()
    open(path, "a").close()  # "r+" requires the file to exist

    def run(name, range_):
        for i in range(range_):
            write(path, name, i)

    t1 = threading.Thread(target=run, args=("one", 1001))
    t2 = threading.Thread(target=run, args=("two", 2001))

    t1.start()
    t2.start()
    t1.join()
    t2.join()
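The w+ truncation point can be seen in isolation (using a throwaway demo file in the temp directory):

```python
import os
import tempfile

path = os.path.join(tempfile.gettempdir(), "wplus_demo.txt")

with open(path, "w") as f:
    f.write("previous state")

with open(path, "w+") as f:    # "w+" truncates the file on open
    after_wplus = f.read()     # '' -- the old contents are already gone

with open(path, "w") as f:
    f.write("previous state")

with open(path, "r+") as f:    # "r+" opens for read/write without truncating
    after_rplus = f.read()     # 'previous state'

print(repr(after_wplus), repr(after_rplus))
```

This is why the original Writer hits JSONDecodeError: after opening with "w+", json.load sees an empty file.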
