\$\begingroup\$

I have a Python script that loads a database table and queries an API for each entry it finds.

I do it like this:

import threading
import constants
import pymysql
import requests
from datetime import datetime
from dateutil.parser import parse
import os
import urllib.request
import shutil
import json


class CheckStreams(threading.Thread):
  def __init__(self, debug):
        threading.Thread.__init__(self)
        self.debug = debug
        self.newGames = False
        self.online = 0

  def get_new_games(self):
        return self.newGames

  def set_new_games(self):
        self.newGames = True

  def get_online(self):
        return self.online

  def set_online(self):
        self.online += 1

  def run(self):

    # Connect to the database
    connect = pymysql.connect(host=constants.HOST,
                              user=constants.USERNAME,
                              password=constants.PASSWORD,
                              db=constants.DATABASE,
                              charset='utf8mb4',
                              port=constants.PORT,
                              cursorclass=pymysql.cursors.DictCursor)
    # Get Streamer Table #
    try:

        with connect.cursor() as cursor:

            # Read all Streamers #
            sql = "SELECT * FROM `ls_streamer` s LEFT JOIN `ls_platforms` p ON s.platform = p.id"

            # Execute Query #
            cursor.execute(sql)

            # Result to streamers Var #
            streamers = cursor.fetchall()

            # Check if we have Streamers at all #
            if cursor.rowcount > 0:

                # Show total Streamers #
                if self.debug: print('Total Streamers: %s' % constants.color('green', cursor.rowcount))

                # Loop through all streamers #
                for streamer in streamers:

                    # Output Current Streamer #
                    if self.debug: print(
                        '\n%s - %s' % (
                        streamer["channel_user"], constants.color(streamer["name"], streamer["name"])))

                    # Fetch from Platform #
                    if streamer["platform"] == 1:

                        # Twitch.tv #
                        if self.debug: print('Gathering %s' % constants.color('Twitch.tv', 'Twitch.tv'))
                        # gatherTwitch()
                        self.gatherTwitch(streamer)
                        #self.gatherTwitchVods(streamer)
                    else:

                        # None #
                        if self.debug: print(
                            'Wrong Platform ID: #%s' % constants.color('red', streamer["platform"]))

            else:
                if self.debug: print('Total Streamers: %s' % constants.color('red', '0'))

    except Exception:
        constants.PrintException()

    finally:
        connect.close()


def gatherTwitch(self, streamer):

    # Connect to the database
    connect = pymysql.connect(host=constants.HOST,
                              user=constants.USERNAME,
                              password=constants.PASSWORD,
                              db=constants.DATABASE,
                              charset='utf8mb4',
                              port=constants.PORT,
                              cursorclass=pymysql.cursors.DictCursor)

    # Internal Vars #
    isOnline = False
    description = None
    language = None
    thumbnail = None
    setOffline = False
    banner = "None"
    logo = "None"
    partner = False

    # Create Stream Url from Vars #
    streamUrl = '%s%s' % (streamer["url"], streamer["channel_id"])

    # Send Request w/ Twitch Headers #
    r = requests.get(
        streamUrl,
        headers=
        {
            "Accept": "application/vnd.twitchtv.v5+json",
            "Client-ID": constants.TWITCH_CLIENT_ID
        }
    )

    # Check Response Code #
    if r.status_code == 200:

        # JSON #
        data = r.json()

        # Check if Stream is valid #
        if "stream" in data.keys():

            # Get Info #
            streamData = data["stream"]
            if type(streamData) is dict:

                # Check if Streaming LoL #
                if streamData["game"].lower() == "league of legends":

                    # Streaming LoL - YAY #
                    if self.debug: print('%s' % constants.color('green', 'IS STREAMING'))

                    # Gather Vars #
                    lastSeen = datetime.now()
                    if streamer["is_online"] == True:
                        lastSeen = streamer["modified"]

                    totalStreamed = streamer["total_online"]
                    totalStreamed += constants.convertTimeDelta((datetime.now() - lastSeen))

                    viewers = streamData["viewers"]
                    resolution = streamData["video_height"]
                    fps = streamData["average_fps"]
                    delay = streamData["delay"]
                    started = parse(streamData["created_at"], None, ignoretz=True)
                    isOnline = True

                    # Channel Data #
                    channelData = streamData["channel"]
                    if type(channelData) is dict:

                        description = channelData["status"]
                        language = channelData["language"]
                        logo = channelData["logo"]
                        banner = channelData["profile_banner"]
                        if channelData["partner"] == "true":
                            partner = True
                        if channelData["partner"] is True:
                            partner = True

                    else:
                        # Output Debug #
                        if self.debug: print('Channel Data %s' % constants.color('red', 'NOT FOUND'))

                    # Preview Data #
                    previewData = streamData["preview"]
                    if type(previewData) is dict:

                        thumbnail = previewData["medium"]

                    else:
                        # Output Debug #
                        if self.debug: print('Preview Data %s' % constants.color('red', 'NOT FOUND'))

                    try:
                        with connect.cursor() as cursorStreamer:

                            # Got all Vars put to DB #
                            sqlUpdate = """
                            UPDATE
                              `ls_streamer`
                            SET
                              total_online = %s,
                              viewers = %s,
                              resolution = %s,
                              fps = %s,
                              delay = %s,
                              description = %s,
                              language = %s,
                              thumbnail = %s,
                              logo = %s,
                              banner = %s,
                              started = %s,
                              is_online = %s,
                              modified = %s,
                              is_partner = %s
                            WHERE
                              id = %s
                            """

                            # Execute Query #
                            connect.escape(sqlUpdate)
                            cursorStreamer.execute(sqlUpdate, (
                                totalStreamed, viewers, resolution, fps, delay, description, language, thumbnail,
                                logo, banner, started, isOnline, datetime.now(), partner, streamer["id"]))
                            connect.commit()
                            self.set_online()

                            # Now crawl the Riot API for Live Games for all connected summoners #
                            self.getLiveGames(streamer)

                    except Exception:
                        constants.PrintException()

                    finally:
                        if self.debug: print('Update ls_streamer: %s' % constants.color('green', 'SUCCESS'))

                else:

                    # Set Offline #
                    setOffline = True
                    # Stream online but not streaming LoL #
                    if self.debug: print(
                        'Stream is %s but %s streaming LoL' % (
                        constants.color('green', 'online'), constants.color('red', 'NOT')))

            else:

                # Set Offline #
                setOffline = True
                # Output Debug #
                if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))

        else:

            # Set Offline #
            setOffline = True
            # Output Debug #
            if self.debug: print('Streamer is %s' % constants.color('red', 'OFFLINE'))

    else:

        # Set Offline #
        setOffline = True

        # Output Debug #
        if self.debug: print('->%s %s' % ('Response Code', constants.color('red', r.status_code)))

        # Output Url #
        if self.debug: print('Streamer Url: %s' % streamUrl)

    if setOffline == True:
        try:
            with connect.cursor() as cursorStreamerOff:

                # Got all Vars put to DB #
                sqlOffline = """
                   UPDATE
                     `ls_streamer`
                   SET
                     is_online = %s
                   WHERE
                     id = %s
                   """

                # Execute Query #
                connect.escape(sqlOffline)
                cursorStreamerOff.execute(sqlOffline, (False, streamer["id"]))
                connect.commit()

        except Exception:
            constants.PrintException()

    # Create Stream Url from Vars #
    streamUrlVod = '%s%s/videos?broadcast_type=archive&limit=100' % (
    streamer["channelurl"], streamer["channel_id"])

    # Send Request w/ Twitch Headers #
    rVod = requests.get(
        streamUrlVod,
        headers=
        {
            "Accept": "application/vnd.twitchtv.v5+json",
            "Client-ID": constants.TWITCH_CLIENT_ID
        }
    )

    # Check Response Code #
    if rVod.status_code == 200:

        # JSON #
        data = rVod.json()

        # Check if Stream is valid #
        if "videos" in data.keys():
            # Get Info #
            videos = data["videos"]

            for video in videos:

                thumb = video["preview"]["medium"]
                video_id = video["_id"]
                length = video["length"]
                created = video["created_at"]
                if video["game"] == "League of Legends" and video["viewable"] == "public":

                    # Check if video exists #
                    try:
                        with connect.cursor() as cursorVod:
                            sqlInsert = """
                                INSERT INTO
                                  `ls_vods`
                                  (streamer_id, thumbnail, video_id, created, `length`, last_check)
                                VALUES
                                  (%s, %s, %s, %s, %s, %s)
                                ON DUPLICATE KEY UPDATE
                                  last_check = %s,
                                  thumbnail = %s,
                                  `length` = %s
                            """

                            cursorVod.execute(sqlInsert,
                                           (streamer["id"], thumb, video_id, created, length, datetime.now(), datetime.now(), thumb, length))
                            connect.commit()
                    except Exception:
                        constants.PrintException()
        else:
            if self.debug: print('Videos not found in URL: %s' % streamUrlVod)
    else:
        # Output Debug #
        if self.debug: print('->%s %s' % ('Response Code', constants.color('red', rVod.status_code)))

        # Output Url #
        if self.debug: print('Streamer Url: %s' % streamUrlVod)

def getLiveGames(self, streamer):

    # Connect to the database
    connect = pymysql.connect(host=constants.HOST,
                              user=constants.USERNAME,
                              password=constants.PASSWORD,
                              db=constants.DATABASE,
                              charset='utf8mb4',
                              port=constants.PORT,
                              cursorclass=pymysql.cursors.DictCursor)

    # Select all Summoners from Streamer #
    try:
        with connect.cursor() as cursorSummoner:
            sqlSummoner = """
            SELECT * FROM
              `ls_summoner` s
            LEFT JOIN
              `ls_regions` r
            ON
              r.id = s.region_id
            WHERE
              streamer_id = %s
            """

            # Execute Query #
            cursorSummoner.execute(sqlSummoner, (streamer["id"],))
            summoners = cursorSummoner.fetchall()

            if cursorSummoner.rowcount > 0:

                for summoner in summoners:

                    # Output which Summoner #
                    if self.debug: print('Checking Live Game for Summoner %s-%s' % (
                        summoner["short"].upper(), summoner["name"]))

                    urlLive = "https://%s.api.riotgames.com/lol/spectator/v3/active-games/by-summoner/%s?api_key=%s" % (
                        summoner["long"].lower(), summoner["summoner_id"],
                        constants.RIOT_API_KEY)

                    r = requests.get(urlLive)

                    if self.debug: print("Checking %s" % urlLive)

                    # Check Response Code #
                    if r.status_code == 200:

                        # Output #
                        if self.debug: print('->%s Found Live Game for %s-%s' % (
                            constants.color('green', 'FOUND'), summoner["short"].upper(), summoner["name"]))

                        # JSON #
                        game = r.json()

                        # Gather Vars #
                        map = game["mapId"]
                        gameId = game["gameId"]
                        gameMode = game["gameMode"]
                        gameType = game["gameType"]
                        queueId = game["gameQueueConfigId"]
                        played = game["gameLength"]
                        spell1Id = None
                        spell2Id = None
                        teamId = None
                        champion = None
                        runes = None
                        masteries = None

                        pp = game["participants"]
                        if type(pp) is list:
                            for part in pp:
                                if part["summonerId"] == summoner["summoner_id"]:
                                    teamId = part["teamId"]
                                    spell1Id = part["spell1Id"]
                                    spell2Id = part["spell2Id"]
                                    champion = part["championId"]
                                    runes = json.dumps(part["runes"])
                                    masteries = json.dumps(part["masteries"])

                        if self.debug: print('Game %s. Game ID #%s' % (constants.color('green', 'found'), gameId))

                        # MySQL #
                        try:
                            with connect.cursor() as cursorUpdateCurrent:

                                # Check Champion in DB #
                                sqlChampion = "SELECT * FROM `ls_champions` WHERE id = %s"
                                cursorUpdateCurrent.execute(sqlChampion,
                                                            (champion,))
                                if cursorUpdateCurrent.rowcount == 0:
                                    # Champion not in DB TODO: SEND MAIL #
                                    if self.debug: print("Champion %s" %
                                                         constants.color('red',
                                                                         'NOT IN DB'))
                                    # Exit current Game Update #
                                    break

                                # Check Map in DB #
                                sqlMap = "SELECT * FROM `ls_maps` WHERE id = %s"
                                cursorUpdateCurrent.execute(sqlMap,
                                                            (map,))
                                if cursorUpdateCurrent.rowcount == 0:
                                    # Map not in DB TODO: SEND MAIL #
                                    if self.debug: print("Map %s" %
                                                         constants.color('red',
                                                                         'NOT IN DB'))
                                    # Exit current Game Update #
                                    break

                                # Check Queue in DB #
                                sqlQueue = "SELECT * FROM `ls_queues` WHERE id = %s"
                                cursorUpdateCurrent.execute(sqlQueue,
                                                            (queueId,))
                                if cursorUpdateCurrent.rowcount == 0:
                                    # Queue not in DB TODO: SEND MAIL #
                                    if self.debug: print("Queue %s" %
                                                         constants.color('red',
                                                                         'NOT IN DB'))
                                    # Exit current Game Update #
                                    break

                                # Check P1Spell1 in DB #
                                sqlP1Spell1 = "SELECT * FROM `ls_spells` WHERE id = %s"
                                cursorUpdateCurrent.execute(sqlP1Spell1,
                                                            (spell1Id,))
                                if cursorUpdateCurrent.rowcount == 0:
                                    # P1Spell1 not in DB TODO: SEND MAIL #
                                    if self.debug: print("P1Spell1 %s" %
                                                         constants.color('red',
                                                                         'NOT IN DB'))
                                    # Exit current Game Update #
                                    break

                                # Check P1Spell2 in DB #
                                sqlP1Spell2 = "SELECT * FROM `ls_spells` WHERE id = %s"
                                cursorUpdateCurrent.execute(
                                    sqlP1Spell2,
                                    (spell2Id,))
                                if cursorUpdateCurrent.rowcount == 0:
                                    # P1Spell2 not in DB TODO: SEND MAIL #
                                    if self.debug: print("P1Spell2 %s" %
                                                         constants.color('red',
                                                                         'NOT IN DB'))
                                    # Exit current Game Update #
                                    break

                                # Select Current Match #
                                sqlSelectCurrent = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"

                                cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))

                                if cursorUpdateCurrent.rowcount > 0:

                                    # Found a Current Match #
                                    sqlUpdateCurrent = """
                                    UPDATE
                                      `ls_current_match`
                                    SET
                                      champion_id = %s,
                                      map_id = %s,
                                      summoner_id = %s,
                                      queue_id = %s,
                                      match_id = %s,
                                      team = %s,
                                      length = %s,
                                      type = %s,
                                      mode = %s,
                                      modified = %s,
                                      p1_spell1_id = %s,
                                      p1_spell2_id = %s,
                                      is_playing = %s,
                                      runes = %s,
                                      masteries = %s
                                    WHERE
                                      summoner_id = %s
                                    """

                                    cursorUpdateCurrent.execute(
                                        sqlUpdateCurrent, (
                                            champion,
                                            map,
                                            summoner["id"],
                                            queueId,
                                            gameId,
                                            teamId,
                                            played,
                                            gameType,
                                            gameMode,
                                            datetime.now(),
                                            spell1Id,
                                            spell2Id,
                                            True,
                                            runes,
                                            masteries, summoner["id"]
                                        ))

                                    connect.commit()
                                    if self.debug: print(
                                        '%s Current Match in Database\n' % constants.color('yellow',
                                                                                           'UPDATED'))



                                else:

                                    # No current Match for Summoner Id found #
                                    sqlNewCurrent = """
                                    INSERT INTO
                                      `ls_current_match`
                                    (champion_id, map_id, summoner_id, queue_id, match_id, team, length, type, mode, modified, p1_spell1_id, p1_spell2_id, is_playing, runes, masteries)
                                    VALUES
                                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                                    """

                                    cursorUpdateCurrent.execute(sqlNewCurrent, (
                                        champion,
                                        map,
                                        summoner["id"],
                                        queueId,
                                        gameId,
                                        teamId,
                                        played,
                                        gameType,
                                        gameMode,
                                        datetime.now(),
                                        spell1Id,
                                        spell2Id,
                                        True,
                                        runes,
                                        masteries
                                    ))

                                    connect.commit()
                                    if self.debug: print(
                                        '%s Current Match in Database\n' % constants.color(
                                            'green', 'INSERTED'))





                        except Exception:
                            constants.PrintException()

                        # TODO: ENTER TO MATCHES DB #
                        # RESONS: If Game Ends and new One Starts before its detected in the While Loop, it gets overwritten #

                        # Exit if One Playing found #
                        break


                    else:
                        # Response Code not 200 #
                        if self.debug: print(
                            '->%s %s' % ('Response Code', constants.color('red', r.status_code)))

                        # Reset Games in Current Matches #
                        try:
                            with connect.cursor() as cursorUpdateCurrent:
                                # Select Current Match #
                                sqlSelectCurrent = "SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
                                cursorUpdateCurrent.execute(sqlSelectCurrent, (summoner["id"]))

                                if cursorUpdateCurrent.rowcount == 0:

                                    # Output not found in Current Match -> has never played #
                                    if self.debug: print(
                                        'Not found, Summoner %s' % constants.color('yellow', 'NEVER PLAYED'))

                                else:

                                    # Set isPlaying = false #
                                    sqlSelectCurrent = "UPDATE  `ls_current_match`  SET  is_playing = %s WHERE summoner_id = %s"
                                    cursorUpdateCurrent.execute(sqlSelectCurrent, (False, summoner["id"]))
                                    connect.commit()

                                    if self.debug: print(
                                        '%s Current Match in Database, set isPlaying = false\n' % constants.color(
                                            'green', 'UPDATED'))

                                    # Create new Match in MatchHistory #
                                    sqlSelectCurrentRe = " SELECT * FROM `ls_current_match` WHERE summoner_id = %s"
                                    cursorUpdateCurrent.execute(sqlSelectCurrentRe, (summoner["id"]))
                                    cG = cursorUpdateCurrent.fetchone()

                                    # See if Match in MatchHistory ->should not be the case #
                                    sqlSelectMatch = "SELECT * FROM  `ls_matches` WHERE match_id = %s AND streamer = %s"
                                    cursorUpdateCurrent.execute(sqlSelectMatch, (cG["match_id"], streamer["id"]))

                                    if cursorUpdateCurrent.rowcount == 0:

                                        sqlNewMatch = """
                                        INSERT INTO
                                          `ls_matches`
                                          (streamer, champion, map, match_id, team, lane, role, length, type, win, modified, crawled, summoner, region)
                                        VALUES
                                          (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                                        """

                                        cursorUpdateCurrent.execute(
                                            sqlNewMatch, (
                                                streamer["id"],
                                                cG["champion_id"],
                                                cG["map_id"],
                                                cG["match_id"],
                                                cG["team"],
                                                'NONE',
                                                'NONE',
                                                cG["length"],
                                                cG["type"],
                                                1,
                                                datetime.now(),
                                                False,
                                                cG["summoner_id"],
                                                summoner["region_id"]

                                            ))

                                        connect.commit()

                                    else:

                                        # Output that Match already exists !?? #
                                        if self.debug: print('Match-ID %s %s' % (
                                            cG["match_id"], constants.color('yellow', 'already crawled')))

                                    self.set_new_games()

                        except Exception:
                            constants.PrintException()
            else:
                # No Summoners for Streamer #
                if self.debug: print("%s found" % constants.color('red', 'NO SUMMONERS'))

    except Exception:
        constants.PrintException()

This works okay-ish, but it does not scale. Each API check takes about 1 second per entry, and I have around 300 entries, so a full pass over the table takes ~300 seconds. As the table grows, a full pass takes proportionally longer, and I don't want the first entry to be refreshed only every 1000+ seconds once I have 1000+ entries.

Is there a way to multi-thread this without spawning a thread for each entry/db-id? Also, the table changes frequently (rows are added and deleted).

I am coming from a PHP background, so I have not worked with threading before and have only a limited understanding of it. Currently the CheckStreams class is one of three threads my main class spawns.

\$\endgroup\$

1 Answer

\$\begingroup\$

Layout

There are a lot of unnecessary blank lines between lines of code:

# Check Response Code #
if rVod.status_code == 200:

    # JSON #
    data = rVod.json()

    # Check if Stream is valid #
    if "videos" in data.keys():
        # Get Info #
        videos = data["videos"]

        for video in videos:

            thumb = video["preview"]["medium"]

It almost looks double-spaced. There is value in being able to see more lines of code on your screen at once; it makes the code easier to follow.

Comments

Some of the comments can be deleted to reduce clutter. For example:

# JSON #
data = rVod.json()

It is obvious from the code that you are doing something with JSON, and the single-word comment adds nothing to help understand the code better.

Comments related to future enhancements should also be deleted:

# TODO: ENTER TO MATCHES DB #
# RESONS: If Game Ends and new One Starts before its detected in the While Loop, it gets overwritten #

Keep track of these in another file in your version control system.

Commented-out code should be deleted:

#self.gatherTwitchVods(streamer)

Tools

You could run code development tools to automatically find some style issues with your code.

ruff identifies several unused import lines:

import os
import urllib.request
import shutil

You can have ruff remove them for you using the --fix option.

It also identifies several instances of multiple statements on a single line:

if self.debug: print(

The black program can be used to fix those.
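
As a rough illustration of the reformatted result, the sketch below puts the statement body on its own line. The `Checker` class, the `log` helper, and the stand-in `color` function are hypothetical names used only for this example (the real `constants.color` is not shown in the question); a small helper like `log` is one optional way to avoid repeating the `if self.debug:` check.

```python
def color(name, text):
    """Stand-in for constants.color from the question; returns the text unchanged."""
    return str(text)


class Checker:
    def __init__(self, debug):
        self.debug = debug

    def log(self, message):
        # Hypothetical helper: keeps the debug check in a single place.
        if self.debug:
            print(message)

    def report_offline(self):
        # One statement per line, as a formatter would lay it out.
        if self.debug:
            print("Streamer is %s" % color("red", "OFFLINE"))

        # Same output through the helper, without repeating the check.
        self.log("Streamer is %s" % color("red", "OFFLINE"))
```

With `debug=True`, `report_offline()` prints the message twice (once per style); with `debug=False`, it prints nothing.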

ruff also identifies a couple of places where you compare against the boolean True value:

if setOffline == True:

This is simpler:

if setOffline:

try/except

The except clauses are many lines away from their try lines, so it is hard to keep track of which line (or lines) are expected to raise the exception.
PEP 8 recommends that you limit the try clause to the absolute minimum amount of code necessary, to avoid masking bugs.
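
A minimal sketch of what a narrow try clause can look like, assuming the only call expected to fail is the JSON decoding. The function name `parse_viewers` and its payload shape are hypothetical and only loosely modelled on the stream data in the question.

```python
import json


def parse_viewers(payload):
    """Return the viewer count from a JSON string, or None if it cannot be read."""
    # Only the call that is expected to fail sits inside the try clause,
    # and only the specific exception it raises is caught.
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return None

    # Everything below is outside the try clause, so an unexpected bug here
    # raises normally instead of being swallowed by a distant except.
    stream = data.get("stream")
    if not isinstance(stream, dict):
        return None
    return stream.get("viewers")
```

The same idea applies to the database code: wrapping just the `execute` and `commit` calls makes it clear which operation each handler is guarding.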

\$\endgroup\$
