I have a python script that loads a database table and checks an API for the entries found.
I do it like this:
import threading
import constants
import pymysql
import requests
from datetime import datetime
from dateutil.parser import parse
import os
import urllib.request
import shutil
import json
class CheckStreams(threading.Thread):
def __init__(self, debug):
threading.Thread.__init__(self)
self.debug = debug
self.newGames = False
self.online = 0
def get_new_games(self):
return self.newGames
def set_new_games(self):
self.newGames = True
def get_online(self):
return self.online
def set_online(self):
self.online += 1
def run(self):
# Connect to the database
connect = pymysql.connect(host=constants.HOST,
user=constants.USERNAME,
password=constants.PASSWORD,
db=constants.DATABASE,
charset='utf8mb4',
port=constants.PORT,
cursorclass=pymysql.cursors.DictCursor)
# Get Streamer Table #
try:
with connect.cursor() as cursor:
# Read all Streamers #
sql = "SELECT * FROM `ls_streamer` s LEFT JOIN `ls_platforms` p ON s.platform = p.id"
# Execute Query #
cursor.execute(sql)
# Result to streamers Var #
streamers = cursor.fetchall()
# Check if we have Streamers at all #
if cursor.rowcount > 0:
# Show total Streamers #
if self.debug: print('Total Streamers: %s' % constants.color('green', cursor.rowcount))
# Loop through all streamers #
for streamer in streamers:
# Output Current Streamer #
if self.debug: print(
'\n%s - %s' % (
streamer["channel_user"], constants.color(streamer["name"], streamer["name"])))
# Fetch from Platform #
if streamer["platform"] == 1:
# Twitch.tv #
if self.debug: print('Gathering %s' % constants.color('Twitch.tv', 'Twitch.tv'))
# gatherTwitch()
self.gatherTwitch(streamer)
#self.gatherTwitchVods(streamer)
else:
# None #
if self.debug: print(
'Wrong Platform ID: #%s' % constants.color('red', streamer["platform"]))
else:
if self.debug: print('Total Streamers: %s' % constants.color('red', '0'))
except Exception:
constants.PrintException()
finally:
connect.close()
def gatherTwitch(self, streamer):
# Connect to the database
connect = pymysql.connect(host=constants.HOST,
user=constants.USERNAME,
password=constants.PASSWORD,
db=constants.DATABASE,
charset='utf8mb4',
port=constants.PORT,
cursorclass=pymysql.cursors.DictCursor)
# Internal Vars #
isOnline = False
description = None
language = None
thumbnail = None
setOffline = False
banner = "None"
logo = "None"
partner = False
# Create Stream Url from Vars #
streamUrl = '%s%s' % (streamer["url"], streamer["channel_id"])
# Send Request w/ Twitch Headers #
r = requests.get(
streamUrl,
headers=
{
"Accept": "application/vnd.twitchtv.v5+json",
"Client-ID": constants.TWITCH_CLIENT_ID
}
)
# Check Response Code #
if r.status_code == 200:
# JSON #
data = r.json()
# Check if Stream is valid #
if "stream" in data.keys():
# Get Info #
streamData = data["stream"]
if type(streamData) is dict:
# Check if Streaming LoL #
if streamData["game"].lower() == "league of legends":
# Streaming LoL - YAY #
if self.debug: print('%s' % constants.color('green', 'IS STREAMING'))
# Gather Vars #
lastSeen = datetime.now()
if streamer["is_online"] == True:
lastSeen = streamer["modified"]
totalStreamed = streamer["total_online"]
totalStreamed += constants.convertTimeDelta((datetime.now() - lastSeen))
viewers = streamData["viewers"]
resolution = streamData["video_height"]
fps = streamData["average_fps"]
delay = streamData["delay"]
started = parse(streamData["created_at"], None, ignoretz=True)
isOnline = True
# Channel Data #
channelData = streamData["channel"]
if type(channelData) is dict:
description = channelData["status"]
language = channelData["language"]
logo = channelData["logo"]
banner = channelData["profile_banner"]
if channelData["partner"] == "true":
partner = True
if channelData["partner"] is True:
partner = True
else:
# Output Debug #
if self.debug: print('Channel Data %s' % constants.color('red', 'NOT FOUND'))
# Peview Data #
previewData = streamData["preview"]
if type(previewData) is dict:
thumbnail = previewData["medium"]
else:
# Output Debug #
if self.debug: print('Preview Data %s' % constants.color('red', 'NOT FOUND'))
try:
with connect.cursor() as cursorStreamer:
# Got all Vars put to DB #
sqlUpdate = """
UPDATE
`ls_streamer`
SET
total_online = %s,
viewers = %s,
resolution = %s,
fps = %s,
delay = %s,
description = %s,
language = %s,
thumbnail = %s,
logo = %s,
banner = %s,
started = %s,
is_online = %s,
modified = %s,
is_partner = %s
WHERE
id = %s
"""
# Execute Query #
connect.escape(sqlUpdate)
cursorStreamer.execute(sqlUpdate, (
totalStreamed, viewers, resolution, fps, delay, description, language, thumbnail,
logo, banner, started, isOnline, datetime.now(), partner, streamer["id"]))
connect.commit()
self.set_online()
# Now crawl the Riot API for Live Games for all connected summoners #
self.getLiveGames(streamer)
except Exception:
constants.PrintException()
finally:
if self.debug: print('Update ls_streamer: %s' % constants.color('green', 'SUCCESS'))
else:
# Set Offline #
setOffline = True
# Stream online but not streaming LoL #
if self.debug: print(
'Stream is %s but %s streaming LoL' % (
constants.color('green', 'online'), constants.color('red', 'NOT')))
else:
# Set Offline #
setOffline = True
# Output Debug #
if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))
else:
# Set Offline #
setOffline = True
# Output Debug #
if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))
else:
# Set Offline #
setOffline = True
# Output Debug #
if self.debug: print('->%s %s' % ('Response Code', constants.color('red', r.status_code)))
# Output Url #
if self.debug: print('Streamer Url: %s' % streamUrl)
if setOffline == True:
try:
with connect.cursor() as cursorStreamerOff:
# Got all Vars put to DB #
sqlOffline = """
UPDATE
`ls_streamer`
SET
is_online = %s
WHERE
id = %s
"""
# Execute Query #
connect.escape(sqlOffline)
cursorStreamerOff.execute(sqlOffline, (False, streamer["id"]))
connect.commit()
except Exception:
constants.PrintException()
# Create Stream Url from Vars #
streamUrlVod = '%s%s/videos?broadcast_type=archive&limit=100' % (
streamer["channelurl"], streamer["channel_id"])
# Send Request w/ Twitch Headers #
rVod = requests.get(
streamUrlVod,
headers=
{
"Accept": "application/vnd.twitchtv.v5+json",
"Client-ID": constants.TWITCH_CLIENT_ID
}
)
# Check Response Code #
if rVod.status_code == 200:
# JSON #
data = rVod.json()
# Check if Stream is valid #
if "videos" in data.keys():
# Get Info #
videos = data["videos"]
for video in videos:
thumb = video["preview"]["medium"]
video_id = video["_id"]
length = video["length"]
created = video["created_at"]
if video["game"] == "League of Legends" and video["viewable"] == "public":
# Check if video exists #
try:
with connect.cursor() as cursorVod:
sqlInsert = """
INSERT INTO
`ls_vods`
(streamer_id, thumbnail, video_id, created, `length`, last_check)
VALUES
(%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
last_check = %s,
thumbnail = %s,
`length` = %s
"""
cursorVod.execute(sqlInsert,
(streamer["id"], thumb, video_id, created, length, datetime.now(), datetime.now(), thumb, length))
connect.commit()
except Exception:
constants.PrintException()
else:
if self.debug: print('Videos not found in URL: %s' % streamUrl)
else:
# Output Debug #
if self.debug: print('->%s %s' % ('Response Code', constants.color('red', r.status_code)))
# Output Url #
if self.debug: print('Streamer Url: %s' % streamUrl)
def getLiveGames(self, streamer):
# Connect to the database
connect = pymysql.connect(host=constants.HOST,
user=constants.USERNAME,
password=constants.PASSWORD,
db=constants.DATABASE,
charset='utf8mb4',
port=constants.PORT,
cursorclass=pymysql.cursors.DictCursor)
# Select all Summoners from Streamer #
try:
with connect.cursor() as cursorSummoner:
sqlSummoner = """
SELECT * FROM
`ls_summoner` s
LEFT JOIN
`ls_regions` r
ON
r.id = s.region_id
WHERE
streamer_id = %s
"""
# Execute Query #
cursorSummoner.execute(sqlSummoner, (streamer["id"],))
summoners = cursorSummoner.fetchall()
if cursorSummoner.rowcount > 0:
for summoner in summoners:
# Output which Summoner #
if self.debug: print('Checking Live Game for Summoner %s-%s' % (
summoner["short"].upper(), summoner["name"]))
urlLive = "https://%s.api.riotgames.com/lol/spectator/v3/active-games/by-summoner/%s?api_key=%s" % (
summoner["long"].lower(), summoner["summoner_id"],
constants.RIOT_API_KEY)
r = requests.get(urlLive)
if self.debug: print("Checking %s" % urlLive)
# Check Response Code #
if r.status_code == 200:
# Output #
if self.debug: print('->%s Found Live Game for %s-%s' % (
constants.color('green', 'FOUND'), summoner["short"].upper(), summoner["name"]))
# JSON #
game = r.json()
# Gather Vars #
map = game["mapId"]
gameId = game["gameId"]
gameMode = game["gameMode"]
gameType = game["gameType"]
queueId = game["gameQueueConfigId"]
played = game["gameLength"]
spell1Id = None
spell2Id = None
teamId = None
champion = None
runes = None
masteries = None
pp = game["participants"]
if type(pp) is list:
for part in pp:
if part["summonerId"] == summoner["summoner_id"]:
teamId = part["teamId"]
spell1Id = part["spell1Id"]
spell2Id = part["spell2Id"]
champion = part["championId"]
runes = json.dumps(part["runes"])
masteries = json.dumps(part["masteries"])
if self.debug: print('Game %s. Game ID #%s' % (constants.color('green', 'found'), gameId))
# MySQL #
try:
with connect.cursor() as cursorUpdateCurrent:
# Check Champion in DB #
sqlChampion = "SELECT * FROM `ls_champions` WHERE id = %s"
cursorUpdateCurrent.execute(sqlChampion,
(champion,))
if cursorUpdateCurrent.rowcount == 0:
# Champion not in DB TODO: SEND MAIL #
if self.debug: print("Champion %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check Map in DB #
sqlMap = "SELECT * FROM `ls_maps` WHERE id = %s"
cursorUpdateCurrent.execute(sqlMap,
(map,))
if cursorUpdateCurrent.rowcount == 0:
# Map not in DB TODO: SEND MAIL #
if self.debug: print("Map %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check Queue in DB #
sqlQueue = "SELECT * FROM `ls_queues` WHERE id = %s"
cursorUpdateCurrent.execute(sqlQueue,
(queueId,))
if cursorUpdateCurrent.rowcount == 0:
# Queue not in DB TODO: SEND MAIL #
if self.debug: print("Queue %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check P1Spell1 in DB #
sqlP1Spell1 = "SELECT * FROM `ls_spells` WHERE id = %s"
cursorUpdateCurrent.execute(sqlP1Spell1,
(spell1Id,))
if cursorUpdateCurrent.rowcount == 0:
# P1Spell1 not in DB TODO: SEND MAIL #
if self.debug: print("P1Spell1 %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Check P1Spell2 in DB #
sqlP1Spell2 = "SELECT * FROM `ls_spells` WHERE id = %s"
cursorUpdateCurrent.execute(
sqlP1Spell2,
(spell2Id,))
if cursorUpdateCurrent.rowcount == 0:
# P1Spell2 not in DB TODO: SEND MAIL #
if self.debug: print("P1Spell2 %",
constants.color('red',
'NOT IN DB'))
# Exit current Game Update #
break
# Select Current Match #
sqlSelectCurrent = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))
if cursorUpdateCurrent.rowcount > 0:
# Found a Current Match #
sqlUpdateCurrent = """
UPDATE
`ls_current_match`
SET
champion_id = %s,
map_id = %s,
summoner_id = %s,
queue_id = %s,
match_id = %s,
team = %s,
length = %s,
type = %s,
mode = %s,
modified = %s,
p1_spell1_id = %s,
p1_spell2_id = %s,
is_playing = %s,
runes = %s,
masteries = %s
WHERE
summoner_id = %s
"""
cursorUpdateCurrent.execute(
sqlUpdateCurrent, (
champion,
map,
summoner["id"],
queueId,
gameId,
teamId,
played,
gameType,
gameMode,
datetime.now(),
spell1Id,
spell2Id,
True,
runes,
masteries, summoner["id"]
))
connect.commit()
if self.debug: print(
'%s Current Match in Database\n' % constants.color('yellow',
'UPDATED'))
else:
# No current Match for Summoner Id found #
sqlNewCurrent = """
INSERT INTO
`ls_current_match`
(champion_id, map_id, summoner_id, queue_id, match_id, team, length, type, mode, modified, p1_spell1_id, p1_spell2_id, is_playing, runes, masteries)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursorUpdateCurrent.execute(sqlNewCurrent, (
champion,
map,
summoner["id"],
queueId,
gameId,
teamId,
played,
gameType,
gameMode,
datetime.now(),
spell1Id,
spell2Id,
True,
runes,
masteries
))
connect.commit()
if self.debug: print(
'%s Current Match in Database\n' % constants.color(
'green', 'INSERTED'))
except Exception:
constants.PrintException()
# TODO: ENTER TO MATCHES DB #
# RESONS: If Game Ends and new One Starts before its detected in the While Loop, it gets overwritten #
# Exit if One Playing found #
break
else:
# Response Code not 200 #
if self.debug: print(
'->%s %s' % ('Response Code', constants.color('red', r.status_code)))
# Reset Games in Current Matches #
try:
with connect.cursor() as cursorUpdateCurrent:
# Select Current Match #
sqlSelectCurrent = "SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))
if cursorUpdateCurrent.rowcount == 0:
# Output not found in Current Match -> has never played #
if self.debug: print(
'Not found, Summoner %s' % constants.color('yellow', 'NEVER PLAYED'))
else:
# Set isPlaying = false #
sqlSelectCurrent = "UPDATE `ls_current_match` SET is_playing = %s WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrent, (False, summoner["id"]))
connect.commit()
if self.debug: print(
'%s Current Match in Database, set isPlaying = false\n' % constants.color(
'green', 'UPDATED'))
# Create new Matchc in MatchHistory #
sqlSelectCurrentRe = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
cursorUpdateCurrent.execute(sqlSelectCurrentRe, (summoner["id"]))
cG = cursorUpdateCurrent.fetchone()
# See if Match in MatchHistory ->should not be the case #
sqlSelectMatch = "SELECT * FROM `ls_matches` WHERE match_id = %s AND streamer = %s"
cursorUpdateCurrent.execute(sqlSelectMatch, (cG["match_id"], streamer["id"]))
if cursorUpdateCurrent.rowcount == 0:
sqlNewMatch = """
INSERT INTO
`ls_matches`
(streamer, champion, map, match_id, team, lane, role, length, type, win, modified, crawled, summoner, region)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursorUpdateCurrent.execute(
sqlNewMatch, (
streamer["id"],
cG["champion_id"],
cG["map_id"],
cG["match_id"],
cG["team"],
'NONE',
'NONE',
cG["length"],
cG["type"],
1,
datetime.now(),
False,
cG["summoner_id"],
summoner["region_id"]
))
connect.commit()
else:
# Output that Match already exists !?? #
if self.debug: print('Match-ID %s %s' % (
cG["match_id"], constants.color('yellow', 'already crawled')))
self.set_new_games()
except Exception:
constants.PrintException()
else:
# No Summoners for Streamer #
if self.debug: print("%s found" % constants.color('red', 'NO SUMMONERS'))
except Exception:
constants.PrintException()
This works okay-ish, but as the table grows, it takes so much longer for the individual entries to be checked. Currently it takes about 1 second to check the API per entry, and I have around 300 entries. So a whole db check takes ~300 seconds. As the database grows, the checks take longer and longer. But, I don't want the first entry to be updated every 1000+ seconds if I have 1000+ entries.
Is there a way to multi-thread this without spawning a thread for each entry/db-id? Also the table updates frequently (added and deleted rows).
I am coming from a PHP background, so threading isn't anything I have worked with before, and I only have little understanding of it. Currently the CheckStreams function is one of three threads my main class spawns.