From d1d08fc08047e55f8e9a5f7258583e8d9d45064d Mon Sep 17 00:00:00 2001 From: Brandon Cornejo Date: Tue, 13 May 2014 23:53:01 -0500 Subject: [PATCH] Create dotabot.py --- dotabot.py | 325 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 dotabot.py diff --git a/dotabot.py b/dotabot.py new file mode 100644 index 0000000..0e34139 --- /dev/null +++ b/dotabot.py @@ -0,0 +1,325 @@ +from ircutils import bot, events, format +import simplejson as json +import urllib2, random, re +import HTMLParser +from collections import deque + +class DotaMatch: + api_key="" + hero_data = {} + + def __init__(self, channel, user, command, params): + self.channel = channel + self.requester = user + self.command = command + + if not params: + self.query = [user] + else: + self.query = params + + self.matches = [] + self.notice = [] + self.steam_id = '' + + def get_hero_values(self): + xhr = urllib2.Request("https://api.steampowered.com/IEconDOTA2_570/GetHeroes/v0001/?key="+self.api_key+"&language=en_us") + o = urllib2.build_opener() + f = o.open(xhr) + data = json.load(f) + + for hero in data['result']['heroes']: + self.hero_data[hero['id']] = hero + + def lookup_player(self, query): + xhr = urllib2.Request("http://api.steampowered.com/ISteamUser/ResolveVanityURL/v0001/?key="+self.api_key+"&vanityurl="+query[0]) + o = urllib2.build_opener() + f = o.open(xhr) + data = json.load(f) + + if data['response']['success'] == 1: + self.steam_id = data['response']['steamid'] + else: + self.steam_id = 'NULL' + + def get_latest_match_id(self): + if self.command == "!MATCHBYID": + return True + + xhr = urllib2.Request("https://api.steampowered.com/IDOTA2Match_570/GetMatchHistory/V001/?key="+self.api_key+"&account_id="+self.steam_id) + o = urllib2.build_opener() + f = o.open(xhr) + data = json.load(f) + + # did we get matches back? + try: + self.matches = data['result']['matches'] + return True + except KeyError: + return False + + def get_match_info(self): + try: + xhr = urllib2.Request("https://api.steampowered.com/IDOTA2Match_570/GetMatchDetails/V001/?key="+self.api_key+"&match_id="+str(self.matches[0]['match_id'])) + o = urllib2.build_opener() + f = o.open(xhr) + data = json.load(f) + except Exception as e: + self.notice.append("Problem connecting to the Dota2 API.") + else: + # get users 32-bit SteamID so we can look them up + sid = int(self.steam_id) - 76561197960265728 + p = {} + + for player in data['result']['players']: + if player['account_id'] == sid: + p = player + break + if not p: + self.notice.append("Player {} not found in match {}.".format(self.query[0], self.query[1])) + return + + hero_id = self.hero_data[p['hero_id']]['localized_name'] + kills = p['kills'] + deaths = p['deaths'] + assists = p['assists'] + last_hits = p['last_hits'] + gpm = p['gold_per_min'] + time = data['result']['duration'] / 60 + + if( p['player_slot'] & (1<<7) == 0 ): + team = 'Radiant' + else: + team = 'Dire' + + if( team is 'Dire' and not bool(data['result']['radiant_win'])): + result = 'Victory!' + elif( team is 'Radiant' and bool(data['result']['radiant_win'])): + result = 'Victory!' + else: + result = 'Defeat!' + + # print match info + self.notice.append("Match for \x02{0}\x02 - Hero: \x0311{1}\x03, K/D/A: \x0303{2}/{3}/{4}\x03, Last Hits: \x0306{5}\x03, GPM: \x0308{6}\x03. Game Length: {7} mins. Team: {8} - {9}.".format(self.query[0], hero_id, kills, deaths, assists, last_hits, gpm, time, team, result)) + + def list_latest_matches(self): + # get users 32-bit SteamID so we can look them up + sid = int(self.steam_id) - 76561197960265728 + p = {} + self.notice.append("Recent Matches for " + self.query[0] + ":") + + for count, match in enumerate(self.matches): + if count > 3: + break + + try: + xhr = urllib2.Request("https://api.steampowered.com/IDOTA2Match_570/GetMatchDetails/V001/?key="+self.api_key+"&match_id="+str(match['match_id'])) + o = urllib2.build_opener() + f = o.open(xhr) + data = json.load(f) + except Exception as e: + self.notice.append("There was an error connecting to the Dota2 API.") + else: + for player in data['result']['players']: + if player['account_id'] == sid: + p = player + break + + print self.hero_data[p['hero_id']] + hero_id = self.hero_data[p['hero_id']]['localized_name'] + kills = p['kills'] + deaths = p['deaths'] + assists = p['assists'] + + if( p['player_slot'] & (1<<7) == 0 ): + team = 'Radiant' + else: + team = 'Dire' + + if( team is 'Dire' and not bool(data['result']['radiant_win'])): + result = 'Victory!' + elif( team is 'Radiant' and bool(data['result']['radiant_win'])): + result = 'Victory!' + else: + result = 'Defeat!' + + self.notice.append(" [\x02{0}\x02]: \x0311{1}\x03 - \x0303{2}/{3}/{4}\x03 - {5}".format(match['match_id'], hero_id, kills, deaths, assists, result)) + +class DoobBot(bot.SimpleBot): + auth_pool = deque() + actions = ["!MATCH", "!MATCHES", "!MATCHBYID"] + + def on_channel_message(self, event): + url = re.search('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F[0-9a-fA-F]))+', event.message) + channel = event.target + user = event.source + message = event.message.split(); + command = message[0].upper() + params = message[1:] + + if url: + self.do_printurltitle(channel, user, url.group()) + elif command == "!ROLL": + self.do_rolldice(channel, user, params) + elif command == "!BTCX": + self.do_getbtcinfo(channel, user, False, params) + elif command == "!BTC": + self.do_getbtcinfo(channel, user, True, params) + elif command == "!WEATHER": + self.do_getweather(channel, user, params) + elif command == "!REDDITNEWS": + self.do_getnews(channel, user, params) + elif command == "!ABOUT": + self.send_message(channel, "I am DotaBot, I bring info about Dota2 Matches. Ask for a !match. See !help for more.") + elif command == "!HELP": + self.send_message(user, "I am DotaBot - these are my available commands.") + self.send_message(user, "!MATCH - Get info about users latest match, if no argument passes your IRC nickname.") + self.send_message(user, "!MATCHES - Get a list of recent matches for the user, if no argument passes your IRC nickname.") + self.send_message(user, "!MATCHBYID - Get info about particular match by ID#.") + self.send_message(user, "!ROLL ?D? - Simulate dice rolling (e.g. !roll 1d6 or !roll 5d20)") + self.send_message(user, "!WEATHER - Query wunderground for weather data") + self.send_message(user, "!REDDITNEWS - Pull a new story from , defaults to dota2.") + self.send_message(user, "!DOTANEWS - pulls latest news story about Dota2 from Steamworks.") + self.send_message(user, "!BTC - List current price of BTC in various fiat currencies.") + self.send_message(user, "!BTCX - Convert bitcoins to the local currency of choice.") + self.send_message(user, "Written by binaryatrocity - br4n@atr0phy.net") + elif command in self.actions: + m = DotaMatch(channel, user, command, params) + self.auth_pool.append(m); + self.send_message('NICKSERV', 'STATUS {0}'.format(user)) + + def on_private_notice(self, event): + if(event.message[0] == "2"): + c = self.auth_pool.popleft() + c.lookup_player(c.query) + if c.steam_id == 'NULL': + self.send_message(c.channel, "Unable to find SteamID for player "+c.query[0]) + else: + status = c.get_latest_match_id() + if status: + if c.command == "!MATCHES": c.list_latest_matches() + elif c.command == "!MATCH": c.get_match_info() + elif c.command == "!MATCHBYID": + c.matches.append({'match_id':c.query[1]}) + c.get_match_info() + + for msg in c.notice: + self.send_message(c.channel, msg) + else: + self.send_message(c.channel, "No matches found for player "+c.query[0]) + + elif(event.message[0] == "1" or event.message[0] == "0"): + c = self.auth_pool.popleft() + self.send_message(c.channel, "You must be registered with NickServ before requesting match info! '/msg NickServ register HELP' for more info.") + + def on_private_message(self, event): + user = event.source + message = event.message.split() + command = message[0].upper() + params = message[1:] + + if command.upper() == "SAY": + mstr = '' + for msg in params: + mstr = mstr +' '+ msg + self.send_message(self.user, mstr) + if command.upper() == "QUIT": + self.disconnect(params) + + def do_getnews(self, channel, user, params): + if not params: + params.append('dota2') + try: + raw_data = urllib2.Request("http://www.reddit.com/r/"+params[0]+"/new.json?sort=new") + o = urllib2.build_opener() + f = o.open(raw_data) + data = json.load(f) + except urllib2.HTTPError as e: + self.send_message(channel, "There was a problem accessing news.") + except Exception as e: + print e + else: + items = [ x['data'] for x in data['data']['children'] ] + + rand = random.randint(0,10) + + notice = "News from r/{0}: {1} - [{2}]".format(params[0], items[rand]['title'], items[rand]['url']) + self.send_message(channel, notice) + + def do_printurltitle(self, channel, user, url): + try: + html_parser = HTMLParser.HTMLParser() + title = html_parser.unescape(urllib2.urlopen(url, timeout=1).read(10000).split('')[1].split('')[0].strip()).encode('utf-8') + notice = "{}'s URL: {}".format(user, title) + self.send_message(channel, notice) + except urllib2.URLError: + pass + except IndexError: + pass + except Exception as e: + print e + + def do_getweather(self, channel, user, params): + try: + raw_data = urllib2.Request("http://api.openweathermap.org/data/2.5/weather?q="+params[0]+"&units=imperial") + o = urllib2.build_opener() + f = o.open(raw_data) + + data = json.load(f) + city = data['name'] + country = data['sys']['country'] + lat, lon = data['coord']['lat'], data['coord']['lon'] + condition = data['weather'][0]['main'] + temp = data['main']['temp'] + humidity = data['main']['humidity'] + high = data['main']['temp_max'] + low = data['main']['temp_min'] + winds = data['wind']['speed'] + + notice = "Weather for {}, {} ({}, {}): Currently {} at {} degrees with {}% humidity and winds at {} mph. High: {}, Low: {}".format(city, country, lat, lon, condition, temp, humidity, winds, high, low) + self.send_message(channel, notice) + except urllib2.HTTPError as e: + self.send_message(channel, "Unable to find weather for "+params[0]) + except urllib2.URLError as e: + self.send_message(channel, "There was a problem accessing weather info.") + except Exception as e: + print e + + def do_getbtcinfo(self, channel, user, exch, params): + if not exch: + try: + data = json.load(urllib2.build_opener().open(urllib2.Request("http://blockchain.info/tobtc?currency="+params[1].upper()+"&value="+params[0]))) + + notice = "{}: Value of {} {} in BTC is \x03{}\x03".format(user, params[0], params[1].upper(), data) + self.send_message(channel, notice) + except Exception: + pass + else: + try: + data = json.load(urllib2.build_opener().open(urllib2.Request("http://blockchain.info/ticker"))) + + notice = "Current price of BTC: USD[{}], GBP[{}], EUR[{}]".format(data['USD']['last'], data['GBP']['last'], data['EUR']['last']) + self.send_message(channel, notice) + pass + except Exception as e: + print e + + def do_rolldice(self, channel, user, params): + try: + dice = params[0].split('d') + rolls = [] + for roll in range(0,int(dice[0])): + rolls.append(random.randint(1,int(dice[1]))) + except Exception as e: + pass + else: + notice = "Rolls for {}: {!s}. Total of {}".format(user, rolls, sum(rolls)) + self.send_message(channel, notice) + +# Create a new bot, and run it! +if __name__ == "__main__": + doob = DoobBot("DotaBot") + init_match = DotaMatch('#atr0phy', 'SYSTEM', 'START', 'INIT') + init_match.get_hero_values() + doob.connect("irc.oftc.net", channel=['#dotanoobs', '#digital-deception', '#atr0phy']) + doob.start()