Motivation
I have been trying to work on my first bigger scale Python project, however I am struggling to create pythonic solutions. Instead some of the functions (and especially the naming) I've done so far seems like more of a hacky solution, than a best coding practice. Hope someone can clear up some confusions and lead show me in the right direction.
Brief Overview
The script processes Google Spreadsheets by interacting with the Google Docs Sheets API. A class Sheets handles the API calls. The second class CustomSheet handles the application-specific data logic and parsing of the API call responses. At the same time there are 5 different instances of CustomSheet shared amongst different scripts to perform various automation on the data.
Because the Google Docs API is limited too 100req/100s, instances are only initialized once. Initially I had the idea of writing a script coordinating the handling of the instances and sub-scripts, however this added a lot of complexity and little benefit. Instead whenever a CustomSheet is initialized, the instance is appended to instances on class-level so scripts can get them autonomously – and of course it also made sense to implement a classmethod that initializes all instances automatically by calling initializeAll, as their initialization requirements are predictable. While it adds a lot of comfort, it seems like a lot of the logic that should be handled on script level is now moved to class level.
Questions
- Is initializing all relevant class instances (
initializeAll) ok? - And handling instances using
@classmethods(get,getAll)? - Should
getCustomSheetbe renamed togetSheetwhich then simply callssuper()? But what if the necessity arises to make a raw API call from one of the scripts? - The
errorResilienceshould really be a decorator. However having to work with slices and indices on return values either leaves the choice of passing them to a decorator, e.g.@error_resilience([0]['api_call'])which does not seem possible, or to catch anIndexErrorwithin the decorator, but then again the returned value is not available in the decorator context (, is it?) - There are a lot of functions in
CustomSheetperforming evaluations on instance variables. E.g.Entriesare evaluated usingCustomSheetinstances (searchEntry,filterEntry,conv). An alternative solution could be addinginstancesforEntryas well and moving the functions there, so the logic of evaluating entries is in the Entry class, however this seems to be unpractical during normal runtime, sinceEntrywould have to be imported in all scripts, instead of just importingCustomSheet. Logic seems scattered amongst multiple classes, but it seems to make sense - Any other general remarks on the code? I feel like I use a lot of
for ... in ...:loops. Any feedback is greatly appreciated.
Code
modules/Sheets.py
import requests
import json
from time import sleep
from random import randint
from modules.PositionRange import PositionRange
import logging
logger = logging.getLogger(__name__)
from settings import CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, PROXY
class Sheets():
""" Google Docs API Library """
PROXIES = {'http': PROXY, 'https': PROXY}
header = {
'Content-Type': 'application/json; charset=utf-8',
}
spreadsheetId = ''
accessToken = ''
def __init__(self, spreadsheetName):
self.getToken()
self.setSpreadsheet(name=spreadsheetName)
def getToken(self):
""" Gets authentication token from Google Docs API
if no Global API token is set on Class Level yet. """
if not Sheets.accessToken:
self.refreshToken()
else:
self.header.update({'Authorization': f'Bearer {Sheets.accessToken}'})
def refreshToken(self):
refreshGUrl = 'https://www.googleapis.com/oauth2/v4/token'
header = {
'Content-Type': 'application/x-www-form-urlencoded'
}
body = {
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'refresh_token': REFRESH_TOKEN,
'grant_type': 'refresh_token'
}
r = requests.post(refreshGUrl, headers=header, data=body, proxies=Sheets.PROXIES)
token = self.errorResilience(r.json(), self.refreshToken, {})['access_token']
Sheets.accessToken = token
self.header.update({'Authorization': f'Bearer {Sheets.accessToken}'})
return token
def setSpreadsheet(self, name=None, spreadsheetId=None):
if(name):
spreadsheetId = self.getSpreadsheet(name)
if(spreadsheetId and self.spreadsheetId != spreadsheetId):
logger.debug(f'Setting spreadsheetId to [{spreadsheetId}]')
self.spreadsheetId = spreadsheetId
spreadsheetInfo = self.getSpreadsheetInfo()
self.spreadsheetName = spreadsheetInfo['properties']['title']
self.sheets = spreadsheetInfo['sheets']
logger.info(f'Selected Spreadsheet: {self.spreadsheetName} [{self.spreadsheetId}]')
else:
logger.debug(f'SpreadsheetId already selected [{spreadsheetId}] or None')
def getSpreadsheet(self, name):
try:
logger.info(f'Trying to resolve spreadsheetId for {name}...')
query = f'name = "{name}"'
driveGUrl='https://www.googleapis.com/drive/v3/files'
params = {'q': query}
r = requests.get(driveGUrl, params=params, headers=self.header, proxies=Sheets.PROXIES)
logger.debug(f'RESPONSE: {r.json()}')
return self.errorResilience(r.json(), self.getSpreadsheet, {'name': name})['files'][0]['id']
except IndexError as e:
logger.error(f'Error during spreadsheetId lookup. File {name} was probably deleted.')
logger.exception(f'[ERROR] getSpreadsheet: {name}')
raise EOFError('File not found.') from None
def getSpreadsheetInfo(self):
logger.info(f'Getting all spreadsheet information [{self.spreadsheetId}]')
sheetGUrl = f'https://sheets.googleapis.com/v4/spreadsheets/{self.spreadsheetId}'
r = requests.get(sheetGUrl, headers=self.header, proxies=Sheets.PROXIES)
sheetData = r.json()
return self.errorResilience(sheetData, self.getSpreadsheetInfo, {})
def getSheet(self, sheetName: str, posRange: PositionRange) -> dict:
""" Gets the content of one specific sheet """
sheetGUrl = f'https://sheets.googleapis.com/v4/spreadsheets/{self.spreadsheetId}'
logger.info(f'Getting sheet content: {sheetName}{posRange} [{self.spreadsheetName} | {self.spreadsheetId}]')
sheetGUrl = f'{sheetGUrl}/values/{requests.utils.quote(sheetName)}{posRange}'
r = requests.get(sheetGUrl, headers=self.header, proxies=Sheets.PROXIES)
sheetData = r.json()
return self.errorResilience(sheetData, self.getSheet, {'sheetName': sheetName, 'posRange': posRange})
def errorResilience(self, sheetData, callingFunc, kwargs):
""" Centralized Error Handling for API Calls. Would ideally
be a decorator, however working with different slices and indices
(e.g. refreshToken) in return values doesn't make this possible(?) """
args = []
if('error' in sheetData.keys()):
code = sheetData['error']['code']
if(code == 401):
logger.error('UNAUTHORIZED. API TOKEN LIKELY EXPIRED...')
self.refreshToken()
sleep(5)
return callingFunc(*args, **kwargs)
elif(code == 403):
logger.error('The request is missing a valid API key.')
self.getToken()
elif(code == 404):
logger.error('FILE NOT FOUND. SPREADSHEETID INVALID')
raise IndexError(f'Spreadsheet does not exist {self.name} [{self.spreadsheetId}]')
elif(code == 429):
tsleep = 100 + randint(10, 50)
logger.error(f'API LIMIT EXCEEDED. AUTO-RECOVERING BY WAITING {tsleep}s...')
sleep(tsleep)
return callingFunc(*args, **kwargs)
elif(code == 400):
logger.error('SPECIFIED SHEET DOES NOT EXIST OR ILLEGAL RANGE.')
raise IndexError(sheetData['error']['message'])
else:
logger.error('AN UNKNOWN ERROR OCCURRED.')
return sheetData
modules/CustomSheet.py
from datetime import datetime
from copy import copy
from dateutil.relativedelta import relativedelta
from time import sleep
from modules.Sheets import Sheets
from modules.Entry import Entry
from modules.Synonyms import Synonyms
from modules.PositionRange import PositionRange
from collections import Counter
import logging
logger = logging.getLogger(__name__)
class CustomSheet(Sheets):
""" Custom class that holds """
MONTHS = ['Error', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
TYP = 'CustomSheet'
POS = PositionRange.from_str('A4:R')
instances = []
def __init__(self, date=datetime.now()):
super(CustomSheet, self).__init__(spreadsheetName=CustomSheet.getCustomSheet(date))
self.datum = date
self.name = self.spreadsheetName
self.sheetData = []
self.updateSynonyms()
self.entries = []
self.name = self.spreadsheetName
CustomSheet.append(self)
def __new__(cls, date=datetime.now()):
name = CustomSheet.getCustomSheet(date)
x = CustomSheet.get(name)
if x:
logger.debug(f'{name} already exists. Returning instance...')
return x
else:
logger.debug(f'{name} does not exist already. Creating new instance')
return super(CustomSheet, cls).__new__(cls)
def __getnewargs__(self):
return self.datum
def __str__(self):
return f'{self.spreadsheetName}'
def __eq__(self, value):
return self.name == value
def __lt__(self, other):
return self.datum < other.datum
def __lt__(self, other):
return self.datum > other.datum
def getCustomSheetSheet(self, sheetName):
sheetData = {}
posRange = self.POS
sheetData[sheetName] = self.getSheet(sheetName=sheetName)
return self.parseCustomSheet(sheetData=sheetData, posRange=posRange)
def getCustomSheets(self):
sheetData = {}
sheets = self.sheets
posRange = self.POS
for sheet in sheets:
sheetName = sheet['properties']['title']
if(sheetName.isdigit()):
sheetData[sheetName] = self.getSheet(posRange=posRange, sheetName=sheetName)
return self.parseCustomSheet(sheetData=sheetData, posRange=posRange)
def parseCustomSheet(self, sheetData, posRange):
""" Creates Entries from Spreadsheet Data; basically a dict
so we don't have to work with lists we get from Google Docs """
logger.debug(f'Parsing (raw data -> Entry) {sheetData}')
length = posRange.column_length()
logger.debug(f'LENGTH: {length}')
appended = []
for sheetName, rows in sheetData.items():
pos = copy(posRange)
pos.decrement_row()
for row in rows['values']:
while(len(row) < length+1):
row.append('')
pos.increment_row()
entry = Entry.from_customsheet(self, sheetName, row, pos)
if not entry.isValid():
logger.debug('NO VALID ENTRY FOR DICT ABOVE')
continue
logger.debug('IS VALID ENTRY')
self.sheetData.append(entry)
appended.append(entry)
return appended
def filter(self, field: str, value: str) -> list:
""" Filters Entries for <field> having a certain <value> """
found = []
if not isinstance(value, CustomSheet):
value = value.strip().upper()
for entry in instance.sheetData:
if(entry.__dict__[field.lower()] == value
and 'SYNC' not in entry.sheet):
found.append(entry)
return found
def hasEntry(self, entry: Entry) -> bool:
return entry in self.sheetData
@staticmethod
def getCustomSheet(date):
""" Function to build spreadsheet names by
internal naming convention. """
name = f'Sheet {CustomSheet.MONTHS[date.month]} {str(date.year)}'
return name
@staticmethod
def getTime(relativeMonth=0, absoluteMonth=0):
""" Helpfunction that helps iterating over months
while automatically decrementing years. """
relativeMonth = int(relativeMonth)
absoluteMonth = int(absoluteMonth)
thisMonth = datetime.today().replace(day=1, hour=4, minute=20, second=0, microsecond=0)
date = thisMonth - relativedelta(months=relativeMonth)
if(absoluteMonth != 0):
date = datetime.today()
while(date.month != absoluteMonth):
date = date - relativedelta(months=1)
return date
@classmethod
def get(cls, value: str):
""" Gets a certain CustomSheet instance by its name """
if(isinstance(value, datetime)):
value = CustomSheet.getCustomSheet(value)
for instance in cls.instances:
if instance.name == value:
return instance
@classmethod
def getAll(cls):
return cls.instances
@classmethod
def append(cls, instance) -> None:
if isinstance(instance, list):
instances = instance
for instance in instances:
CustomSheet.append(instance)
return
assert isinstance(instance, CustomSheet)
if(instance not in cls.instances):
cls.instances.append(instance)
@staticmethod
def initializeAll():
""" Helpfunction that initializes all sheets
of the last four months. """
initialized = []
for i in range(0, 4):
try:
initialize = CustomSheet(CustomSheet.getTime(i))
logger.info(f'Building CustomSheet Cache {initialize.name} [iteration {i+1}/4]')
initialize.getCustomSheets()
logger.debug(f'Sheet data [iteration {i+1}]: {initialize.sheetData}')
initialized.append(initialize)
logger.info(f'###- PASSED CUSTOMSHEET CACHE [iteration {i+1}/4]')
sleep(12)
except EOFError as e:
# Fallback in case a file was deleted on Google Docs
logger.exception(f'Skipping month trying to autorecover [iteration {i+1}/4]')
continue
return initialized
@classmethod
def searchEntry(cls, sentry):
""" Searches a specific Entry in all available instances """
found = []
for instance in cls.instances:
for entry in instance.sheetData:
if(entry == sentry):
found.append(entry)
return found
@classmethod
def filterEntry(cls, field, value):
found = []
for instance in cls.instances:
found.extend(instance.filter(field=field, value=value))
return found
@staticmethod
def conv(*entry_list):
""" Used to combine multiple search criteria using .filter()
Only keeps entries that are available in all lists of <entry_list> """
seen = set()
repeated = set()
for entries in entry_list:
for entry in set(entries):
if entry in seen:
repeated.add(entry)
else:
seen.add(entry)
return list(repeated)
def updateSynonyms(self) -> None:
self.synonyms = []
self.synonyms.extend(Synonyms.update(self))
logger.debug(f'New Synonyms: {self.synonyms}')
@classmethod
def searchSynonyms(cls, xSynonyms: list, typ: str='', name: str='') -> list:
found = []
if isinstance(synonym, str):
synonym = [synonym]
for instance in cls.instances:
for xSynonym in xSynonyms:
for synonym in instance.synonyms:
if(synonym.matches(synonym=xSynonym, typ=typ, name=name)):
found.append(synonym)
logger.debug(f'SYNONYM {xSynonyms} FOUND; {found}')
filtered = Synonyms.filter(found)
logger.info(f'Synonym {xSynonyms} found {filtered}')
return filtered
modules/Entry.py
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class Entry():
HEADERS = ['Abr', 'Kunde', 'Tätigkeit', 'Techniker', 'AZ Anfang', 'AZ Ende', 'Dauer',
'AZ Abzug', 'Anfahrt', 'AZ Typ', 'Bemerkung', 'Freigegeben', '', '', '', '',
'Wartung Anfang', 'Wartung Ende']
def __init__(self, *args, **kwargs):
"""
**kwargs {
'Datum': datetime.datetime(2020, 2, 6, 0, 0),
'pos': < modules.PositionRange.PositionRange object at 0x1101f41d0 > ,
'Abr': '',
'Kunde': 'Test',
'Tätigkeit': 'Something',
'Techniker': 'T2',
'AZ Anfang': '14:00',
'AZ Ende': '15:30',
'Dauer': '1,50',
'AZ Abzug': '0',
'Anfahrt': '',
'AZ Typ': '4',
'Bemerkung': 'b.A.',
'Freigegeben': 'nein',
'Wartung Anfang': '',
'Wartung Ende': '',
...
}
"""
self.abr = kwargs.get('Abr', '').strip().upper()
self.kunde = kwargs.get('Kunde', '').strip().upper()
self.tätigkeit = kwargs.get('Tätigkeit', '').strip().upper()
self.techniker = kwargs.get('Techniker', '').strip().upper()
self.anfang = kwargs.get('AZ Anfang', '')[0:5].replace('24:', '00:')
self.ende = kwargs.get('AZ Ende', '')[0:5].replace('24:', '00:')
self.dauer = kwargs.get('Dauer', '').strip().upper()
self.abzug = kwargs.get('AZ Abzug', '').strip().upper()
self.anfahrt = kwargs.get('Anfahrt', '').strip().upper()
self.typ = kwargs.get('AZ Typ', '').strip().upper()
self.bemerkung = kwargs.get('Bemerkung', '')
self.freigegeben = kwargs.get('Freigegeben', '')
self.wartunganfang = kwargs.get('Wartung Anfang', '')
self.wartungende = kwargs.get('Wartung Ende', '')
self.datum = kwargs.get('Datum')
self.sheet = kwargs.get('sheet').strip().upper()
self.pos = kwargs.get('pos')
self.ref = kwargs.get('ref')
self.sync = datetime.now()
try:
hanfang, manfang = self.anfang.split(':')
hende, mende = self.ende.split(':')
self.dtanfang = self.datum.replace(hour=int(hanfang), minute=int(manfang))
self.dtende = self.datum.replace(hour=int(hende), minute=int(mende))
except Exception as e:
self.dtanfang = self.datum
self.dtende = self.datum
#logger.debug(f'DT: {self}: {e}')
def __str__(self):
return f'{self.kunde} @ {self.techniker} {self.dauer} {self.datum.strftime("%d/%b")} ({self.sheet}{self.pos}) [{self.ref.name}]'
def __repr__(self):
return str(self.__dict__)
def __hash__(self):
return hash(f'{self.datum}{self.sheet}{self.kunde}{self.tätigkeit}{self.techniker}{self.typ}')
def __eq__(self, other):
try:
if(self.datum == other.datum
and self.kunde == other.kunde
and self.techniker == other.techniker
and self.tätigkeit == other.tätigkeit):
return True
else:
return False
except Exception as e:
logger.exception('You may only compare this to another Eintrag object.')
def __lt__(self, other):
if(self.sheet == other.sheet):
return self.dtanfang < other.dtanfang
else:
return self.sheet < other.sheet
def __le__(self, other):
if(self.sheet == other.sheet):
return self.dtanfang <= other.dtanfang
else:
return self.sheet <= other.sheet
def __ne__(self, other):
return not(self == other)
def __gt__(self, other):
if(self.sheet == other.sheet):
return self.dtanfang > other.dtanfang
else:
return self.sheet > other.sheet
def __ge__(self, other):
if(self.sheet == other.sheet):
return self.dtanfang >= other.dtanfang
else:
return self.sheet >= other.sheet
@classmethod
def from_customsheet(cls, ref, sheetName, sheetRow, posRange):
""" Creates an Entry from a sheetData dict """
logger.debug(f'Creating entry from {sheetRow}')
logger.debug(f'POSRANGE: {posRange}')
date = datetime.strptime(f'{ref.datum.year} '
f'{ref.datum.month} '
f'{sheetName}', '%Y %m %d')
parseDict = {
'Datum': date,
'sheet': sheetName.upper(),
'pos': posRange,
'ref': ref
}
for i in range(0, len(Entry.HEADERS)):
if(Entry.HEADERS[i] != ''):
logger.debug(f'{Entry.HEADERS[i]}: {sheetRow[i]}')
parseDict.update({Entry.HEADERS[i]: sheetRow[i].strip()})
logger.debug(parseDict)
return cls(**parseDict)
def isValid(self):
if(Entry.stripString(self.techniker) != ''
or Entry.stripString(self.kunde) != ''
or Entry.stripString(self.tätigkeit) != ''):
return True
else:
return False
def isComplete(self):
if(Entry.stripString(self.techniker) != ''
and Entry.stripString(self.kunde) != ''
and Entry.stripString(self.tätigkeit) != ''
and Entry.stripString(self.anfang != '')
and Entry.stripString(self.ende != '')):
return True
else:
return False
@staticmethod
def stripString(string):
string = string.strip()
string = string.replace('\\r\\n','')
string = string.replace('\r\n','')
string = string.replace(' ', '')
return string
modules/PositionRange.py
import logging
logger = logging.getLogger(__name__)
class PositionRange():
def __init__(self, p1=None, p2=None):
self.p1 = str(p1).upper().replace('!','') or ''
self.p2 = str(p2).upper() or p1
def __str__(self):
if(self.p1 and self.p2):
return f'!{self.p1}:{self.p2}'
elif(self.p1):
return f'!{self.p1}'
else:
return ''
def __repr__(self):
return f'{self.p1}:{self.p2}'
@classmethod
def from_str(cls, posRange):
""" Class from stringified version e.g. A1:F10 """
try:
p1, p2 = posRange.split(':')
except:
p1 = posRange.split(':')
p2 = p1
return cls(p1, p2)
def p1_column(self):
"""
Gibt den Buchstaben für p1
der aktuellen POSRange zurück
"""
chars = 0
for char in self.p1:
if(char.isalpha()):
chars += 1
return self.p1[0:chars]
def p2_column(self):
"""
Gibt den Buchstaben für p2
der aktuellen POSRange zurück
"""
chars = 0
for char in self.p2:
if(char.isalpha()):
chars += 1
return self.p2[0:chars]
def p1_column_number(self):
"""
Holt den Alphanumerischen Wert für p1, also
den für den Buchstaben den Index
"""
chars = 0
for char in self.p1:
if(char.isalpha()):
chars += 1
x = (chars - 1) * 25
x = x + (ord(self.p1[chars-1].lower()) - 97)
return x
def p2_column_number(self):
"""
Holt den Alphanumerischen Wert für p2, also
den für den Buchstaben den Index
"""
chars = 0
for char in self.p2:
if(char.isalpha()):
chars += 1
x = (chars - 1) * 25
x = x + (ord(self.p2[chars-1].lower()) - 97)
return x
def p1_row(self):
if(len(self.p1) <= 1):
return 1
else:
x = ''
for c in self.p1:
if(c.isdigit()):
x = x + c
return int(x)
def p2_row(self):
if(len(self.p2) <= 1):
return 999999999
else:
x = ''
for c in self.p2:
if(c.isdigit()):
x = x + c
return int(x)
def column_length(self):
"""
Rechnet aus, wie groß Zeilenrange ist, indem
der Abstand zwischen beiden berechnet wird
(bspw. für A4!M => müsste 12 sein)
"""
length = self.p2_column_number() - self.p1_column_number()
return length
def column_index(self, column):
indexLength = self.column_length()
indexStart = self.p1_column_number()
indexFind = (ord(column.lower()) - 97)
index = indexLength - (indexLength - (indexFind - indexStart))
return index
def column_headers(self, row=1):
for char in pos:
if(self.p1[0].isalpha()):
p1 = f'{self.p1[0]}{row}'
if(self.p2[0].isalpha()):
p2 = f'{self.p2[0]}{row}'
return PositionRange(p1, p2)
def increment_row(self):
row = str(self.p1_row() + 1)
self.p1 = self.p1_column() + row
self.p2 = self.p2_column() + row
def decrement_row(self):
row = str(self.p1_row() - 1)
self.p1 = self.p1_column() + row
self.p2 = self.p2_column() + row
modules/Synonyms.py
from collections import Counter
import logging
logger = logging.getLogger(__name__)
class Synonyms():
def __init__(self, *args, **kwargs):
self.synonym = kwargs.get('synonym', '').strip().upper()
self.sheet = kwargs.get('sheet', '').strip().upper()
self.typ = kwargs.get('typ', '').strip().upper()
self.ref = kwargs.get('ref')
def __str__(self):
return f'{self.synonym} ({self.sheet}) [{self.ref.name}]'
def __repr__(self):
return str(self.__dict__)
def __eq__(self, other: str):
if(self.synonym == other.strip().upper()):
return True
else:
return False
def matches(synonym: str, typ: str, name: str) -> bool:
if(self == synonym.upper().strip()):
if(typ and self.typ != typ.upper().strip()):
return False
if(name and self.ref.name.upper().strip() != name.upper().strip()):
return False
return True
else:
return False
@staticmethod
def update(instance) -> None:
logger.info(f'Updating Synonyms for {instance.name}...')
typ = instance.TYP
if(typ == 'CustomSheet'):
return Synonyms.updateCustomSheet(instance)
elif(typ == 'Projektliste'):
return Synonyms.updateOtherCustomSheet(instance)
else:
logger.error(f'Cannot update synonyms. {typ} is unknown instance.')
@staticmethod
def updateCustomSheet(instance) -> None:
synonyms = []
typ = instance.TYP
synonym = {'synonym': instance.name, 'sheet': '', 'ref': instance, 'typ': typ}
synonyms.append(Synonyms(synonym))
synonym = {'synonym': instance.name.replace('Sheet ', ''), 'sheet': '', 'ref': instance, 'typ': typ}
synonyms.append(Synonyms(synonym))
if(instance.datum.month == instance.getTime().month):
synonym = {'synonym': 'CURRENT', 'sheet': '', 'ref': instance, 'typ': typ}
synonyms.append(Synonyms(synonym))
synonym = {'synonym': 'SYNC', 'sheet': 'SYNC', 'ref': instance, 'typ': typ}
synonyms.append(Synonyms(synonym))
synonym = {'synonym': 'PJ-SYNC', 'sheet': 'PJ-SYNC', 'ref': instance, 'typ': typ}
synonyms.append(Synonyms(synonym))
elif(instance.datum.month == instance.getTime(1).month):
synonym = {'synonym': 'PREVIOUS', 'sheet': '', 'ref': instance, 'typ': typ}
synonyms.append(Synonyms(synonym))
for sheet in instance.sheets:
sheetName = sheet['properties']['title']
if(sheetName.isdigit()):
x = instance.datum.replace(day=int(sheetName))
synonym = {'synonym': x.strftime('%d%m%Y'), 'sheet': '', 'name': sheetName, 'typ': typ}
synonyms.append(Synonyms(synonym))
return synonyms
@staticmethod
def filter(synonyms):
""" Filters synonyms list to from .searchSynonyms()
for the Greatest Common Denominator """
greatestCommon = Counter(synonym.ref for synonym in synonyms if synonym.ref)
maxOccurences = 0
for name, occurences in greatestCommon.most_common():
if(occurences == maxOccurences):
raise EOFError(f'Search synonym no max determinable for {greatestCommon}')
elif(occurences > maxOccurences):
maxOccurences = occurences
try:
spreadsheet = greatestCommon.most_common(1)[0][0]
subSearch = [synonym for synonym in synonyms if synonym.ref == spreadsheet]
greatestCommon = Counter(xsearch.sheet for xsearch in subSearch if xsearch.sheet)
sheetName = greatestCommon.most_common(1)[0][0]
# just making sure
result = [x for x in subSearch if x.sheet == sheetName]
logger.debug(f'FILTERED SYNONYM: {result}')
return result[0]
except IndexError as e:
raise EOFError(f'No synonyms specified in {synonyms}') from None
settings.py
"""
Dummy account for Stackoverflow with two sheets using
|- https://stackoverflow.com/questions/19766912/how-do-i-authorise-an-app-web-or-installed-without-user-intervention
"""
CLIENT_ID = '255572645365-h0b1joml2eml85045u1htq062scebu4m.apps.googleusercontent.com'
CLIENT_SECRET = 'Mtx71-OaHyfHyZs6zxSFbJHR'
REFRESH_TOKEN = '1//04dwAK3oaiVrmCgYIARAAGAQSNwF-L9IrmzgKSCRRNMTGiPm9Ih-mCtsv5iIlJpPemHeHpoW7CzM85VxlxbobeoaP3j1uXxt5UvY'
PROXY = ''
example.py
import pickle
import os
from time import sleep
from modules.CustomSheet import CustomSheet
from modules.Synonyms import Synonyms
from modules.Entry import Entry
import logging
import logging.handlers
logger = logging.getLogger(__name__)
CACHE_PICKLE = 'GoogleCache.dat'
CACHE_DIR = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
logging.basicConfig(
format='[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s():%(lineno)d] – %(message)s',
datefmt='%Y/%m/%d %H:%M:%S',
level=logging.DEBUG,
handlers=[
logging.StreamHandler(),
]
)
def saveCache(sheetDataX):
logger.info('Saving cache...')
path = os.path.join(CACHE_DIR, CACHE_PICKLE)
logger.debug(f'Path: {path}')
with open(path, "wb") as f:
pickle.dump(sheetDataX, f, pickle.HIGHEST_PROTOCOL)
sleep(.5)
def loadCache():
logger.info('Loading cache...')
try:
path = os.path.join(CACHE_DIR, CACHE_PICKLE)
with open(path, "rb") as f:
cache = pickle.load(f)
CustomSheet.append(cache)
logger.debug(f'Cache: {cache}')
return cache
except FileNotFoundError as e:
logger.exception('Offline cache store not found. Was probably deleted; recreating completely...')
return buildCache()
def buildCache():
x = CustomSheet.initializeAll()
saveCache(x)
if __name__ == '__main__':
buildCache()