Post random tweets (or images) from a Twitter account
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

main.py 4.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. #!/usr/bin/env python3
  2. from mastodon import Mastodon
  3. import twitter
  4. import requests
  5. import sqlite3, json, re, random, os
  6. cfg = {
  7. "cw":None,
  8. "mark_sensitive":True,
  9. "site":"https://botsin.space"
  10. }
  11. try:
  12. j = json.load(open("config.json"))
  13. for key, value in j.items():
  14. cfg[key] = value
  15. except:
  16. print("No config.json, using default configuration")
  17. scopes = ["read:accounts", "write:statuses", "write:media"]
  18. if "client" not in cfg:
  19. print("No application info -- registering application with {}".format(cfg['site']))
  20. client_id, client_secret = Mastodon.create_app("Twitter Image Poster",
  21. api_base_url=cfg['site'],
  22. scopes=scopes,
  23. website="https://git.lynnesbian.space/lynnesbian/Twitter_Image_Poster")
  24. cfg['client'] = {
  25. "id": client_id,
  26. "secret": client_secret
  27. }
  28. if "secret" not in cfg:
  29. print("No user credentials -- logging in to {}".format(cfg['site']))
  30. client = Mastodon(client_id = cfg['client']['id'],
  31. client_secret = cfg['client']['secret'],
  32. api_base_url=cfg['site'])
  33. print("Open this URL and authenticate to give Twitter Image Poster access to your bot's account: {}".format(client.auth_request_url(scopes=scopes)))
  34. cfg['secret'] = client.log_in(code=input("Secret: "), scopes=scopes)
  35. if "twitter" not in cfg:
  36. print("No Twitter credentials")
  37. print("Please create a Twitter app by using this page: https://developer.twitter.com/en/apps/create")
  38. cfg['twitter'] = {
  39. "consumer_key": None,
  40. "consumer_secret": None,
  41. "access_token_key": None,
  42. "access_token_secret": None
  43. }
  44. for i in cfg['twitter'].keys():
  45. cfg['twitter'][i] = input("{}: ".format(i))
  46. json.dump(cfg, open("config.json", "w+"))
  47. # log in to twitter
  48. api = twitter.Api(**cfg['twitter'])
  49. if "accounts" not in cfg:
  50. print("Please specify the accounts you'd like Twitter Image Poster to learn from. Enter one account at a time formatted as '@user', followed by a blank line.")
  51. i = None
  52. accounts = []
  53. while i != "":
  54. i = input("User: ")
  55. if not re.match("@(\w){1,16}$", i):
  56. if i == "":
  57. print("Accounts to learn from: {}".format(", ".join(accounts)))
  58. break
  59. print("Invalid username")
  60. continue
  61. if i == "" and len(accounts) == 0:
  62. print("You must enter at least one account")
  63. continue
  64. print("Checking account...")
  65. try:
  66. user = api.GetUser(screen_name=i)
  67. accounts.append(i)
  68. except:
  69. print("Invalid username.")
  70. print("Saving account list. To choose a different set of accounts, delete the 'accounts' entry in config.json.")
  71. cfg['accounts'] = accounts
  72. json.dump(cfg, open("config.json", "w+"))
  73. # connect to database
  74. db = sqlite3.connect("tip.db")
  75. db.text_factory=str
  76. c = db.cursor()
  77. c.execute("CREATE TABLE IF NOT EXISTS `images` (post_id INT NOT NULL UNIQUE PRIMARY KEY, screen_name VARCHAR NOT NULL, image_urls VARCHAR, count INT DEFAULT 0) WITHOUT ROWID")
  78. db.commit()
  79. for acct in cfg['accounts']:
  80. last_tweet = c.execute("SELECT post_id FROM `images` WHERE screen_name LIKE ? ORDER BY post_id DESC LIMIT 1", (acct,)).fetchone()
  81. if last_tweet != None:
  82. last_tweet = last_tweet[0]
  83. else:
  84. last_tweet = 0 # start from the first one
  85. print("Downloading tweets from account {}, starting from {}".format(acct, last_tweet))
  86. while True:
  87. tl = api.GetUserTimeline(screen_name = acct, since_id = last_tweet, exclude_replies = True, include_rts = False, count = 200, trim_user = True)
  88. if len(tl) == 0:
  89. # we've reached the end of this user's timeline
  90. break
  91. for tweet in tl:
  92. media_urls = []
  93. if tweet.media != None:
  94. for media in tweet.media:
  95. media_urls.append(media.media_url)
  96. c.execute("INSERT INTO `images` (screen_name, post_id, image_urls) VALUES (?, ?, ?)", (acct, tweet.id_str, ",".join(media_urls)))
  97. last_tweet = c.execute("SELECT post_id FROM `images` WHERE screen_name LIKE ? ORDER BY post_id DESC LIMIT 1", (acct,)).fetchone()[0]
  98. print(last_tweet)
  99. db.commit()
  100. client = Mastodon(
  101. client_id=cfg['client']['id'],
  102. client_secret = cfg['client']['secret'],
  103. access_token=cfg['secret'],
  104. api_base_url=cfg['site'])
  105. base_count = c.execute("SELECT MIN(count) FROM images").fetchone()[0]
  106. # base_count gets the lowest post count from the table.
  107. # for example, if image A and B have already been posted once, and image C has never been posted, it'll return 0.
  108. # this lets us filter for images that haven't been posted yet in this "generation".
  109. chosen_tweet = c.execute("SELECT post_id, image_urls FROM images WHERE count = ? AND image_urls NOT LIKE '' ORDER BY RANDOM() LIMIT 1", (base_count,)).fetchone()
  110. image = random.choice(chosen_tweet[1].split(","))
  111. filename = re.match(r".*\/(.*)", image).group(1)
  112. # save image to disk
  113. open(filename, 'wb').write(requests.get(image).content)
  114. # post!
  115. media = client.media_post(filename, description="Image downloaded from Twitter account {}".format(acct))
  116. client.status_post("Source: https://twitter.com/{}/status/{}".format(acct, chosen_tweet[0]), media_ids=media)
  117. os.remove(filename)