#!/usr/bin/env python3
"""Mirror images from Twitter accounts to a Mastodon bot account.

On each run the script:

1. Loads ``config.json`` (interactively creating any missing Mastodon
   application, Mastodon user token, Twitter credentials and account list).
2. Downloads new tweets for every configured account into ``tip.db``.
3. Picks one random image among the least-posted ones, posts it to
   Mastodon with a link to the source tweet, and records that it was posted.
"""

from mastodon import Mastodon
import twitter
import requests
import sqlite3
import json
import re
import random
import os

# NOTE(review): "cw" and "mark_sensitive" are not read anywhere in the code
# visible here -- confirm whether status_post should receive them.
cfg = {
    "cw": None,
    "mark_sensitive": True,
    "site": "https://botsin.space",
}


def _save_config(config):
    """Write *config* to config.json, closing the file deterministically."""
    with open("config.json", "w") as config_file:
        json.dump(config, config_file)


try:
    with open("config.json") as config_file:
        cfg.update(json.load(config_file))
except FileNotFoundError:
    print("No config.json, using default configuration")
except (OSError, TypeError, ValueError) as err:
    # A present-but-unreadable config must not be treated as "missing":
    # continuing would re-run setup and overwrite the saved credentials.
    raise SystemExit(
        "Could not read config.json ({}); fix or remove it and run again".format(err)
    )

scopes = ["read:accounts", "write:statuses", "write:media"]

if "client" not in cfg:
    print("No application info -- registering application with {}".format(cfg['site']))
    client_id, client_secret = Mastodon.create_app(
        "Twitter Image Poster",
        api_base_url=cfg['site'],
        scopes=scopes,
        website="https://git.lynnesbian.space/lynnesbian/Twitter_Image_Poster",
    )
    cfg['client'] = {
        "id": client_id,
        "secret": client_secret,
    }

if "secret" not in cfg:
    print("No user credentials -- logging in to {}".format(cfg['site']))
    client = Mastodon(
        client_id=cfg['client']['id'],
        client_secret=cfg['client']['secret'],
        api_base_url=cfg['site'],
    )
    print("Open this URL and authenticate to give Twitter Image Poster access to your bot's account: {}".format(client.auth_request_url(scopes=scopes)))
    cfg['secret'] = client.log_in(code=input("Secret: "), scopes=scopes)

if "twitter" not in cfg:
    print("No Twitter credentials")
    print("Please create a Twitter app by using this page: https://developer.twitter.com/en/apps/create")
    cfg['twitter'] = {
        "consumer_key": None,
        "consumer_secret": None,
        "access_token_key": None,
        "access_token_secret": None,
    }
    for key in cfg['twitter']:
        cfg['twitter'][key] = input("{}: ".format(key))

_save_config(cfg)

# log in to twitter
api = twitter.Api(**cfg['twitter'])

if "accounts" not in cfg:
    print("Please specify the accounts you'd like Twitter Image Poster to learn from. Enter one account at a time formatted as '@user', followed by a blank line.")
    accounts = []
    while True:
        entry = input("User: ")
        if entry == "":
            # A blank line ends input, but only once something was accepted.
            if not accounts:
                print("You must enter at least one account")
                continue
            print("Accounts to learn from: {}".format(", ".join(accounts)))
            break
        if not re.match(r"@(\w){1,16}$", entry):
            print("Invalid username")
            continue
        print("Checking account...")
        try:
            api.GetUser(screen_name=entry)
        except Exception:
            # Any lookup failure is reported as an invalid name, as before,
            # but Ctrl-C / SystemExit are no longer swallowed.
            print("Invalid username.")
        else:
            accounts.append(entry)

    print("Saving account list. To choose a different set of accounts, delete the 'accounts' entry in config.json.")
    cfg['accounts'] = accounts
    _save_config(cfg)

# connect to database
db = sqlite3.connect("tip.db")
db.text_factory = str
c = db.cursor()

c.execute("CREATE TABLE IF NOT EXISTS `images` (post_id INT NOT NULL UNIQUE PRIMARY KEY, screen_name VARCHAR NOT NULL, image_urls VARCHAR, count INT DEFAULT 0) WITHOUT ROWID")
db.commit()

# "= ? COLLATE NOCASE" keeps the ASCII case-insensitivity of LIKE without
# treating "_" (legal in screen names) as a single-character wildcard.
LATEST_TWEET_QUERY = "SELECT post_id FROM `images` WHERE screen_name = ? COLLATE NOCASE ORDER BY post_id DESC LIMIT 1"

for acct in cfg['accounts']:
    row = c.execute(LATEST_TWEET_QUERY, (acct,)).fetchone()
    last_tweet = row[0] if row is not None else 0  # 0: start from the first one

    print("Downloading tweets from account {}, starting from {}".format(acct, last_tweet))

    while True:
        tl = api.GetUserTimeline(
            screen_name=acct,
            since_id=last_tweet,
            exclude_replies=True,
            include_rts=False,
            count=200,
            trim_user=True,
        )
        if not tl:
            # we've reached the end of this user's timeline
            break
        for tweet in tl:
            media_urls = [media.media_url for media in (tweet.media or [])]
            # Tweets without media are stored too (empty image_urls) so that
            # their ids still advance since_id on the next run.
            c.execute(
                "INSERT INTO `images` (screen_name, post_id, image_urls) VALUES (?, ?, ?)",
                (acct, tweet.id_str, ",".join(media_urls)),
            )
        last_tweet = c.execute(LATEST_TWEET_QUERY, (acct,)).fetchone()[0]
        print(last_tweet)
        db.commit()

client = Mastodon(
    client_id=cfg['client']['id'],
    client_secret=cfg['client']['secret'],
    access_token=cfg['secret'],
    api_base_url=cfg['site'],
)

# base_count gets the lowest post count among rows that actually have images.
# for example, if image A and B have already been posted once, and image C has
# never been posted, it'll return 0. this lets us filter for images that
# haven't been posted yet in this "generation". Image-less rows are excluded
# because their count never changes and would otherwise pin the minimum at 0.
base_count = c.execute("SELECT MIN(count) FROM images WHERE image_urls NOT LIKE ''").fetchone()[0]
if base_count is None:
    db.close()
    raise SystemExit("No images available to post")

chosen_tweet = c.execute(
    "SELECT post_id, image_urls, screen_name FROM images WHERE count = ? AND image_urls NOT LIKE '' ORDER BY RANDOM() LIMIT 1",
    (base_count,),
).fetchone()
# Attribute the post to the account that owns the chosen tweet, not to
# whichever account the download loop happened to process last.
post_id, image_urls, source_account = chosen_tweet

image = random.choice(image_urls.split(","))
filename = image.rsplit("/", 1)[-1]

response = requests.get(image, timeout=30)
response.raise_for_status()  # don't upload an HTTP error page as an image

try:
    # save image to disk
    with open(filename, 'wb') as image_file:
        image_file.write(response.content)

    # post!
    media = client.media_post(filename, description="Image downloaded from Twitter account {}".format(source_account))
    client.status_post("Source: https://twitter.com/{}/status/{}".format(source_account, post_id), media_ids=media)
finally:
    # Remove the temporary download even when posting fails.
    if os.path.exists(filename):
        os.remove(filename)

# Record the post so this image is skipped until every other image has been
# posted the same number of times.
c.execute("UPDATE images SET count = count + 1 WHERE post_id = ?", (post_id,))
db.commit()
db.close()