2019-02-25 11:35:52 +00:00
#!/usr/bin/env python3
2019-02-26 03:43:13 +00:00
from mastodon import Mastodon
import twitter
2019-05-07 20:29:32 +00:00
import requests
2019-02-26 03:43:13 +00:00
2019-05-07 20:29:32 +00:00
import sqlite3 , json , re , random
2019-02-26 03:43:13 +00:00
# Default configuration. Any key present in config.json overrides the
# value given here.
cfg = {
    "cw": None,
    "mark_sensitive": True,
    "site": "https://botsin.space",
}

try:
    # A context manager closes the handle even if parsing fails.
    with open("config.json") as config_file:
        cfg.update(json.load(config_file))
except (OSError, ValueError, TypeError):
    # Missing/unreadable file, malformed JSON, or a JSON document that is
    # not an object: keep the defaults. Narrower than a bare `except` so
    # that KeyboardInterrupt/SystemExit are no longer swallowed.
    print("No config.json, using default configuration")
2019-05-07 20:29:32 +00:00
# Scopes passed to every Mastodon call below that takes a `scopes` argument
# (app registration, the authorisation URL and the login itself).
scopes = ["read:accounts", "write:statuses", "write:media"]

# Step 1: register the application with the instance if config.json did not
# already carry client credentials.
if "client" not in cfg:
    print("No application info -- registering application with {}".format(cfg['site']))
    client_id, client_secret = Mastodon.create_app(
        "Twitter Image Poster",
        api_base_url=cfg['site'],
        scopes=scopes,
        website="https://git.lynnesbian.space/lynnesbian/Twitter_Image_Poster",
    )
    cfg['client'] = {
        "id": client_id,
        "secret": client_secret,
    }

# Step 2: obtain a user access token by having the operator authorise the
# app in a browser and paste the resulting code.
if "secret" not in cfg:
    print("No user credentials -- logging in to {}".format(cfg['site']))
    client = Mastodon(
        client_id=cfg['client']['id'],
        client_secret=cfg['client']['secret'],
        api_base_url=cfg['site'],
    )
    print("Open this URL and authenticate to give Twitter Image Poster access to your bot's account: {}".format(client.auth_request_url(scopes=scopes)))
    cfg['secret'] = client.log_in(code=input("Secret: "), scopes=scopes)

# Step 3: prompt for each Twitter credential if none are stored.
if "twitter" not in cfg:
    print("No Twitter credentials")
    print("Please create a Twitter app by using this page: https://developer.twitter.com/en/apps/create")
    cfg['twitter'] = {
        "consumer_key": None,
        "consumer_secret": None,
        "access_token_key": None,
        "access_token_secret": None,
    }
    # Only values of existing keys are replaced, so iterating the dict
    # while assigning is safe.
    for i in cfg['twitter']:
        cfg['twitter'][i] = input("{}: ".format(i))

# Persist whatever was gathered above. The `with` block guarantees the file
# is flushed and closed before the script continues.
with open("config.json", "w") as config_file:
    json.dump(cfg, config_file)
2019-02-27 03:38:10 +00:00
# log in to twitter
api = twitter.Api(**cfg['twitter'])

# First run only: interactively build the list of source accounts and store
# it in config.json under 'accounts'.
if "accounts" not in cfg:
    print("Please specify the accounts you'd like Twitter Image Poster to learn from. Enter one account at a time formatted as '@user', followed by a blank line.")
    accounts = []
    while True:
        i = input("User: ")

        # A blank line ends input, but only once at least one account has
        # been accepted. This must be tested before the format check,
        # otherwise the empty string is rejected by the regex first and the
        # "at least one account" rule can never trigger.
        if i == "":
            if not accounts:
                print("You must enter at least one account")
                continue
            print("Accounts to learn from: {}".format(", ".join(accounts)))
            break

        # '@' followed by 1-16 word characters and nothing else.
        if not re.match(r"@(\w){1,16}$", i):
            print("Invalid username")
            continue

        print("Checking account...")
        try:
            # The lookup is used purely as an existence check; its result
            # is not needed.
            api.GetUser(screen_name=i)
        except Exception:
            # Any failure of the lookup is reported as an invalid name and
            # the prompt repeats; Ctrl-C is no longer swallowed.
            print("Invalid username.")
        else:
            accounts.append(i)

    print("Saving account list. To choose a different set of accounts, delete the 'accounts' entry in config.json.")
    cfg['accounts'] = accounts
    with open("config.json", "w") as config_file:
        json.dump(cfg, config_file)
2019-02-27 03:38:10 +00:00
# connect to database
db = sqlite3.connect("tip.db")
db.text_factory = str
c = db.cursor()
# One row per downloaded tweet: image_urls holds the tweet's media URLs
# joined with commas (empty string when it has none); count defaults to 0.
c.execute("CREATE TABLE IF NOT EXISTS `images` (post_id INT NOT NULL UNIQUE PRIMARY KEY, screen_name VARCHAR NOT NULL, image_urls VARCHAR, count INT DEFAULT 0) WITHOUT ROWID")
db.commit()

# Highest stored tweet id for one account. `= ? COLLATE NOCASE` keeps the
# case-insensitive comparison that LIKE provided, without LIKE treating '_'
# and '%' in a handle as wildcards that could match another account's rows.
latest_post_query = "SELECT post_id FROM `images` WHERE screen_name = ? COLLATE NOCASE ORDER BY post_id DESC LIMIT 1"

for acct in cfg['accounts']:
    last_tweet = c.execute(latest_post_query, (acct,)).fetchone()
    if last_tweet is not None:
        last_tweet = last_tweet[0]
    else:
        last_tweet = 0  # start from the first one
    print("Downloading tweets from account {}, starting from {}".format(acct, last_tweet))

    while True:
        tl = api.GetUserTimeline(screen_name=acct, since_id=last_tweet, exclude_replies=True, include_rts=False, count=200, trim_user=True)
        if len(tl) == 0:
            # we've reached the end of this user's timeline
            break

        for tweet in tl:
            # Tweets without media are stored too, with an empty URL list.
            media_urls = []
            if tweet.media is not None:
                media_urls = [media.media_url for media in tweet.media]
            c.execute("INSERT INTO `images` (screen_name, post_id, image_urls) VALUES (?, ?, ?)", (acct, tweet.id_str, ",".join(media_urls)))

        # Advance the since_id cursor to the newest tweet now stored so the
        # next request only asks for anything newer.
        last_tweet = c.execute(latest_post_query, (acct,)).fetchone()[0]
        print(last_tweet)
        db.commit()
2019-02-27 03:38:10 +00:00
2019-02-26 03:43:13 +00:00
client = Mastodon(
    client_id=cfg['client']['id'],
    client_secret=cfg['client']['secret'],
    access_token=cfg['secret'],
    api_base_url=cfg['site'],
)

# base_count gets the lowest post count among tweets that have images.
# for example, if image A and B have already been posted once, and image C has never been posted, it'll return 0.
# this lets us filter for images that haven't been posted yet in this "generation".
# Rows without images are excluded here: they are never posted, so their
# count would pin the minimum at 0 forever.
base_count = c.execute("SELECT MIN(count) FROM images WHERE image_urls NOT LIKE ''").fetchone()[0]

# screen_name is selected alongside the tweet so the post credits the
# account the image came from, not whichever account an earlier loop
# happened to process last.
chosen_tweet = c.execute("SELECT post_id, image_urls, screen_name FROM images WHERE count = ? AND image_urls NOT LIKE '' ORDER BY RANDOM() LIMIT 1", (base_count,)).fetchone()
if chosen_tweet is None:
    # Empty table, or no stored tweet has any media: nothing to post.
    raise SystemExit("No images available to post")
source_acct = chosen_tweet[2]

# A tweet may carry several images; pick one of them.
image = random.choice(chosen_tweet[1].split(","))
# The file name is everything after the last '/' of the URL.
filename = re.match(r".*\/(.*)", image).group(1)

# save image to disk
with open(filename, 'wb') as image_file:
    image_file.write(requests.get(image).content)

# post!
media = client.media_post(filename, description="Image downloaded from Twitter account {}".format(source_acct))
client.status_post("Source: https://twitter.com/{}/status/{}".format(source_acct, chosen_tweet[0]), media_ids=media)

# Record the post so this tweet is skipped until every other image has been
# posted as many times.
c.execute("UPDATE images SET count = count + 1 WHERE post_id = ?", (chosen_tweet[0],))
db.commit()
2019-02-27 03:38:10 +00:00