Notes on Reading the twint Source Code
Introduction
twint (short for Twitter Intelligence Tool) is a Twitter scraping tool. The official description reads:
Twint is an advanced Twitter scraping tool written in Python that allows for scraping Tweets from Twitter profiles without using Twitter’s API.
Twint utilizes Twitter’s search operators to let you scrape Tweets from specific users, scrape Tweets relating to certain topics, hashtags & trends, or sort out sensitive information from Tweets like e-mail and phone numbers. I find this very useful, and you can get really creative with it too.
Twint also makes special queries to Twitter allowing you to also scrape a Twitter user’s followers, Tweets a user has liked, and who they follow without any authentication, API, Selenium, or browser emulation.
Since twint has a lot of features, this post takes searching for a topic as an example and walks through how the scraper works.
Core
The calling code looks like this:
import twint
# Configure
c = twint.Config()
c.Search = "fruit"
c.Proxy_host = "127.0.0.1"
c.Proxy_port = 12639
c.Proxy_type = "http"
twint.run.Search(c)
run.Search
The code is as follows:
def Search(config, callback=None):
    logme.debug(__name__+':Search')
    config.TwitterSearch = True
    config.Favorites = False
    config.Following = False
    config.Followers = False
    config.Profile = False
    config.Profile_full = False
    run(config, callback)
    if config.Pandas_au:
        storage.panda._autoget("tweet")
run
The code of this method:
def run(config, callback=None):
    logme.debug(__name__+':run')
    try:
        get_event_loop()
    except RuntimeError as e:
        if "no current event loop" in str(e):
            set_event_loop(new_event_loop())
        else:
            logme.exception(__name__+':Lookup:Unexpected exception while handling an expected RuntimeError.')
            raise
    except Exception as e:
        logme.exception(__name__+':Lookup:Unexpected exception occured while attempting to get or create a new event loop.')
        raise
    get_event_loop().run_until_complete(Twint(config).main(callback))
So run makes sure an asyncio event loop exists (creating one when get_event_loop raises "no current event loop") and then drives Twint(config).main(callback) to completion. Twint.main looks like this:
async def main(self, callback=None):
    task = ensure_future(self.run())  # Might be changed to create_task in 3.7+.
    if callback:
        task.add_done_callback(callback)
    await task
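Judging from the signatures above (run.Search passes callback straight through to task.add_done_callback), the caller can hook the end of a scrape; asyncio hands the finished task to the callback. A minimal sketch, with on_finished as a made-up name:

import twint

def on_finished(task):
    # invoked by asyncio once the scraping task is done; the argument is the task itself
    print("search finished, exception:", task.exception())

c = twint.Config()
c.Search = "fruit"
twint.run.Search(c, callback=on_finished)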
Twint.run
The code is as follows:
async def run(self):
    if self.config.TwitterSearch:
        self.user_agent = await get.RandomUserAgent(wa=True)
    else:
        self.user_agent = await get.RandomUserAgent()

    if self.config.User_id is not None:
        logme.debug(__name__+':Twint:main:user_id')
        self.config.Username = await get.Username(self.config.User_id)

    if self.config.Username is not None:
        logme.debug(__name__+':Twint:main:username')
        url = f"https://twitter.com/{self.config.Username}?lang=en"
        self.config.User_id = await get.User(url, self.config, self.conn, True)

    if self.config.TwitterSearch and self.config.Since and self.config.Until:
        ......
    else:
        logme.debug(__name__+':Twint:main:not-search+since+until')
        while True:
            if len(self.feed) > 0:
                if self.config.Followers or self.config.Following:
                    logme.debug(__name__+':Twint:main:follow')
                    await self.follow()
                elif self.config.Favorites:
                    logme.debug(__name__+':Twint:main:favorites')
                    await self.favorite()
                elif self.config.Profile:
                    logme.debug(__name__+':Twint:main:profile')
                    await self.profile()
                elif self.config.TwitterSearch:
                    logme.debug(__name__+':Twint:main:twitter-search')
                    await self.tweets()
            else:
                logme.debug(__name__+':Twint:main:no-more-tweets')
                break

            #logging.info("[<] " + str(datetime.now()) + ':: run+Twint+main+CallingGetLimit2')
            if get.Limit(self.config.Limit, self.count):
                logme.debug(__name__+':Twint:main:reachedLimit')
                break

    if self.config.Count:
        verbose.Count(self.count, self.config)
For a search, the loop above ends up executing the Twint.tweets method on each iteration:
async def tweets(self):
    await self.Feed()
    if self.config.Location:
        logme.debug(__name__+':Twint:tweets:location')
        self.count += await get.Multi(self.feed, self.config, self.conn)
    else:
        logme.debug(__name__+':Twint:tweets:notLocation')
        for tweet in self.feed:
            self.count += 1
            await output.Tweets(tweet, self.config, self.conn)
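output.Tweets routes each tweet to whatever destinations the config enables (stdout, CSV/JSON files, Elasticsearch, ...). As a hedged illustration, extending the example from the top with twint's Store_csv / Output / Limit options should persist the results to a file rather than only printing them:

import twint

c = twint.Config()
c.Search = "fruit"
c.Limit = 100           # stop once get.Limit reports roughly 100 tweets collected
c.Store_csv = True      # have output.Tweets also write CSV rows
c.Output = "fruit.csv"  # target file for the CSV output
twint.run.Search(c)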
The first thing tweets does is call Twint.Feed (why do the call chains in impressive projects always run this long?):
async def Feed(self):
    logme.debug(__name__+':Twint:Feed')
    consecutive_errors_count = 0
    while True:
        response = await get.RequestUrl(self.config, self.init, headers=[("User-Agent", self.user_agent)])
        if self.config.Debug:
            print(response, file=open("twint-last-request.log", "w", encoding="utf-8"))

        self.feed = []
        try:
            if self.config.Favorites:
                self.feed, self.init = feed.Mobile(response)
                if not self.count%40:
                    time.sleep(5)
            elif self.config.Followers or self.config.Following:
                self.feed, self.init = feed.Follow(response)
                if not self.count%40:
                    time.sleep(5)
            elif self.config.Profile:
                if self.config.Profile_full:
                    self.feed, self.init = feed.Mobile(response)
                else:
                    self.feed, self.init = feed.profile(response)
            elif self.config.TwitterSearch:
                self.feed, self.init = feed.Json(response)
            break
        except TimeoutError as e:
            ......
        except Exception as e:
            ......

    if self.config.Resume:
        print(self.init, file=open(self.config.Resume, "a", encoding="utf-8"))
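The final two lines give a simple resume mechanism: when config.Resume is set to a file path, the current paging cursor (self.init, which becomes the next max_position) is appended to that file. As far as I can tell, pointing Resume at the same file on a later run lets the scrape continue from the saved cursor. A sketch:

import twint

c = twint.Config()
c.Search = "fruit"
c.Resume = "resume_fruit.txt"  # twint appends the paging cursor here after each page
twint.run.Search(c)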
Back at the top of the loop, get.RequestUrl is where the actual request is made:
async def RequestUrl(config, init, headers = []):
    logme.debug(__name__+':RequestUrl')
    _connector = get_connector(config)
    _serialQuery = ""
    params = []
    _url = ""
    if config.Profile:
        ......
    elif config.TwitterSearch:
        logme.debug(__name__+':RequestUrl:TwitterSearch')
        _url, params, _serialQuery = await url.Search(config, init)
    else:
        ......
    response = await Request(_url, params=params, connector=_connector, headers=headers)

    if config.Debug:
        print(_serialQuery, file=open("twint-request_urls.log", "a", encoding="utf-8"))

    return response
For a search it first calls url.Search to build the request:
async def Search(config, init):
    logme.debug(__name__+':Search')
    url = f"{base}/search/timeline"
    q = ""
    params = [
        ('vertical', 'default'),
        ('src', 'unkn'),
        ('include_available_features', '1'),
        ('include_entities', '1'),
        ('max_position', str(init)),
        ('reset_error_state', 'false'),
    ]
    if not config.Popular_tweets:
        params.append(('f', 'tweets'))
    if config.Lang:
        params.append(("l", config.Lang))
        params.append(("lang", "en"))
    if config.Query:
        q += f" from:{config.Query}"
    if config.Username:
        q += f" from:{config.Username}"
    if config.Geo:
        config.Geo = config.Geo.replace(" ", "")
        q += f" geocode:{config.Geo}"
    if config.Search:
        q += f" {config.Search}"
    if config.Year:
        q += f" until:{config.Year}-1-1"
    if config.Since:
        q += f" since:{_formatDate(config.Since)}"
    if config.Until:
        q += f" until:{_formatDate(config.Until)}"
    if config.Email:
        q += ' "mail" OR "email" OR'
        q += ' "gmail" OR "e-mail"'
    if config.Phone:
        q += ' "phone" OR "call me" OR "text me"'
    if config.Verified:
        q += " filter:verified"
    if config.To:
        q += f" to:{config.To}"
    if config.All:
        q += f" to:{config.All} OR from:{config.All} OR @{config.All}"
    if config.Near:
        q += f' near:"{config.Near}"'
    if config.Images:
        q += " filter:images"
    if config.Videos:
        q += " filter:videos"
    if config.Media:
        q += " filter:media"
    if config.Replies:
        q += " filter:replies"
    if config.Native_retweets:
        q += " filter:nativeretweets"
    if config.Min_likes:
        q += f" min_faves:{config.Min_likes}"
    if config.Min_retweets:
        q += f" min_retweets:{config.Min_retweets}"
    if config.Min_replies:
        q += f" min_replies:{config.Min_replies}"
    if config.Links == "include":
        q += " filter:links"
    elif config.Links == "exclude":
        q += " exclude:links"
    if config.Source:
        q += f" source:\"{config.Source}\""
    if config.Members_list:
        q += f" list:{config.Members_list}"
    if config.Filter_retweets:
        q += f" exclude:nativeretweets exclude:retweets"
    if config.Custom_query:
        q = config.Custom_query

    params.append(("q", q))
    _serialQuery = _sanitizeQuery(url, params)
    return url, params, _serialQuery
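To make the string concatenation concrete, here is how q is assembled for a small hypothetical config (Username = "nasa", Search = "fruit", Verified = True, everything else unset), following the branches in order:

q = ""
q += " from:nasa"         # config.Username branch
q += " fruit"             # config.Search branch
q += " filter:verified"   # config.Verified branch
assert q == " from:nasa fruit filter:verified"
# ("q", q) is then appended to params, and _sanitizeQuery(url, params)
# produces the serialized request URL written to the debug log.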
Back in RequestUrl, the actual request is then executed with aiohttp:
async def Request(url, connector=None, params=[], headers=[]):
    logme.debug(__name__+':Request:Connector')
    async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
        return await Response(session, url, params)

async def Response(session, url, params=[]):
    logme.debug(__name__+':Response')
    with timeout(120):
        async with session.get(url, ssl=True, params=params, proxy=httpproxy) as response:
            return await response.text()
The last part is processing the returned results, which I won't go into detail on here. For the example at the top, the final constructed request is:
GET request URL: https://twitter.com/i/search/timeline
Parameters:
vertical: default
src: unkn
include_available_features: 1
include_entities: 1
max_position: -1
reset_error_state: false
f: tweets
q: %20fruit
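As for the response handling skipped above: in the search case, feed.Json has to turn this endpoint's JSON reply into tweet elements plus the cursor for the next page. A rough, hedged sketch of what that parsing presumably looks like, assuming the old search/timeline response format with items_html and min_position fields (parse_search_response is an illustrative name, not twint's):

from json import loads
from bs4 import BeautifulSoup

def parse_search_response(response_text):
    data = loads(response_text)
    # items_html holds the rendered tweet markup; each tweet sits in a div with class "tweet"
    soup = BeautifulSoup(data["items_html"], "html.parser")
    tweets = soup.find_all("div", "tweet")
    # min_position is fed back as max_position (init) on the next request
    next_cursor = data.get("min_position")
    return tweets, next_cursor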
Summary
Although a single execution path looks long, this modular design wraps Twitter's complicated query syntax in a very flexible way. I happen to be writing a Bilibili crawler myself at the moment, and its code is quite messy, so there is a lot here worth borrowing. A big part of using Python well is leaning on third-party libraries to keep code concise, maintainable, and high quality; for example, this project uses:
aiohttp
aiodns
beautifulsoup4
cchardet
elasticsearch
pysocks
pandas>=0.23.0
aiohttp_socks
schedule
geopy
fake-useragent
googletransx
I haven't built up systematic experience with these Python libraries yet, so the plan is to study them first and then reshape my Bilibili crawler along the same lines!