twint Source Code Reading Notes


Introduction

twint (short for Twitter Intelligence Tool) is a Twitter scraper. The official description reads:

Twint is an advanced Twitter scraping tool written in Python that allows for scraping Tweets from Twitter profiles without using Twitter’s API.

Twint utilizes Twitter’s search operators to let you scrape Tweets from specific users, scrape Tweets relating to certain topics, hashtags & trends, or sort out sensitive information from Tweets like e-mail and phone numbers. I find this very useful, and you can get really creative with it too.

Twint also makes special queries to Twitter allowing you to also scrape a Twitter user’s followers, Tweets a user has liked, and who they follow without any authentication, API, Selenium, or browser emulation.

Since it offers quite a few features, this post takes searching for a topic as the example and walks through how the scraper works.

The Core

The calling code looks like this:

import twint

# Configure
c = twint.Config()
c.Search = "fruit"            # search keyword
c.Proxy_host = "127.0.0.1"    # optional: route traffic through a local HTTP proxy
c.Proxy_port = 12639
c.Proxy_type = "http"

twint.run.Search(c)

run.Search looks like this:

def Search(config, callback=None):
    logme.debug(__name__+':Search')
    config.TwitterSearch = True
    config.Favorites = False
    config.Following = False
    config.Followers = False
    config.Profile = False
    config.Profile_full = False
    run(config, callback)
    if config.Pandas_au:
        storage.panda._autoget("tweet")

The run method:

def run(config, callback=None):
    logme.debug(__name__+':run')
    try:
        get_event_loop()
    except RuntimeError as e:
        if "no current event loop" in str(e):
            set_event_loop(new_event_loop())
        else:
            logme.exception(__name__+':Lookup:Unexpected exception while handling an expected RuntimeError.')
            raise
    except Exception as e:
        logme.exception(__name__+':Lookup:Unexpected exception occured while attempting to get or create a new event loop.')
        raise

    get_event_loop().run_until_complete(Twint(config).main(callback))
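
For reference, this is the pre-Python-3.7 asyncio idiom of grabbing (or creating) an event loop and driving a single coroutine to completion. A minimal standalone sketch of the same pattern (not twint code; main below is just a placeholder for Twint(config).main(callback)):

import asyncio

async def main():
    # placeholder coroutine standing in for Twint(config).main(callback)
    await asyncio.sleep(0)

try:
    loop = asyncio.get_event_loop()   # reuse the thread's loop if one exists
except RuntimeError:
    loop = asyncio.new_event_loop()   # otherwise create and register a new one
    asyncio.set_event_loop(loop)

loop.run_until_complete(main())       # drive the coroutine to completion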

So the event loop ends up driving Twint.main, whose code is:

    async def main(self, callback=None):

        task = ensure_future(self.run())  # Might be changed to create_task in 3.7+.

        if callback:
            task.add_done_callback(callback)

        await task
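
Note the optional callback: it is attached to the search task as a done-callback, so the caller is notified when the whole run finishes. A minimal usage sketch from the caller's side, reusing the Config object c from the first snippet (hypothetical usage, not taken from the twint docs):

def on_done(task):
    # the finished asyncio task is passed in; task.exception() would expose failures
    print("search finished")

twint.run.Search(c, callback=on_done)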

Twint.run looks like this:

    async def run(self):
        if self.config.TwitterSearch:
            self.user_agent = await get.RandomUserAgent(wa=True)
        else:
            self.user_agent = await get.RandomUserAgent()

        if self.config.User_id is not None:
            logme.debug(__name__+':Twint:main:user_id')
            self.config.Username = await get.Username(self.config.User_id)

        if self.config.Username is not None:
            logme.debug(__name__+':Twint:main:username')
            url = f"https://twitter.com/{self.config.Username}?lang=en"
            self.config.User_id = await get.User(url, self.config, self.conn, True)

        if self.config.TwitterSearch and self.config.Since and self.config.Until:
            ......
        else:
            logme.debug(__name__+':Twint:main:not-search+since+until')
            while True:
                if len(self.feed) > 0:
                    if self.config.Followers or self.config.Following:
                        logme.debug(__name__+':Twint:main:follow')
                        await self.follow()
                    elif self.config.Favorites:
                        logme.debug(__name__+':Twint:main:favorites')
                        await self.favorite()
                    elif self.config.Profile:
                        logme.debug(__name__+':Twint:main:profile')
                        await self.profile()
                    elif self.config.TwitterSearch:
                        logme.debug(__name__+':Twint:main:twitter-search')
                        await self.tweets()
                else:
                    logme.debug(__name__+':Twint:main:no-more-tweets')
                    break

                #logging.info("[<] " + str(datetime.now()) + ':: run+Twint+main+CallingGetLimit2')
                if get.Limit(self.config.Limit, self.count):
                    logme.debug(__name__+':Twint:main:reachedLimit')
                    break

        if self.config.Count:
            verbose.Count(self.count, self.config)

Eventually the Twint.tweets method gets executed:

    async def tweets(self):
        await self.Feed()
        if self.config.Location:
            logme.debug(__name__+':Twint:tweets:location')
            self.count += await get.Multi(self.feed, self.config, self.conn)
        else:
            logme.debug(__name__+':Twint:tweets:notLocation')
            for tweet in self.feed:
                self.count += 1
                await output.Tweets(tweet, self.config, self.conn)

tweets in turn calls Twint.Feed (why do all the great projects have such long call chains?):

    async def Feed(self):
        logme.debug(__name__+':Twint:Feed')
        consecutive_errors_count = 0
        while True:
            response = await get.RequestUrl(self.config, self.init, headers=[("User-Agent", self.user_agent)])
            if self.config.Debug:
                print(response, file=open("twint-last-request.log", "w", encoding="utf-8"))

            self.feed = []
            try:
                if self.config.Favorites:
                    self.feed, self.init = feed.Mobile(response)
                    if not self.count%40:
                        time.sleep(5)
                elif self.config.Followers or self.config.Following:
                    self.feed, self.init = feed.Follow(response)
                    if not self.count%40:
                        time.sleep(5)
                elif self.config.Profile:
                    if self.config.Profile_full:
                        self.feed, self.init = feed.Mobile(response)
                    else:
                        self.feed, self.init = feed.profile(response)
                elif self.config.TwitterSearch:
                    self.feed, self.init = feed.Json(response)
                break
            except TimeoutError as e:
                ......
            except Exception as e:
                ......
        if self.config.Resume:
            print(self.init, file=open(self.config.Resume, "a", encoding="utf-8"))
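
For search, the interesting branch is feed.Json. I have not gone through feed.py in detail, but the old search/timeline endpoint returns JSON containing rendered tweet HTML plus a paging cursor, so the parser presumably looks roughly like the sketch below (an assumption for illustration, not code copied from twint; beautifulsoup4 is in the project's dependency list):

import json
from bs4 import BeautifulSoup

def parse_search_page(response_text):
    # assumption: the response is JSON with rendered tweets in "items_html"
    # and the cursor for the next page in "min_position"
    data = json.loads(response_text)
    soup = BeautifulSoup(data["items_html"], "html.parser")
    tweets = soup.find_all("div", "tweet")    # one element per tweet
    return tweets, data["min_position"]       # becomes (self.feed, self.init)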

As you can see, get.RequestUrl is where the request is actually fired off:

async def RequestUrl(config, init, headers = []):
    logme.debug(__name__+':RequestUrl')
    _connector = get_connector(config)
    _serialQuery = ""
    params = []
    _url = ""

    if config.Profile:
        ......
    elif config.TwitterSearch:
        logme.debug(__name__+':RequestUrl:TwitterSearch')
        _url, params, _serialQuery = await url.Search(config, init)
    else:
        ......

    response = await Request(_url, params=params, connector=_connector, headers=headers)

    if config.Debug:
        print(_serialQuery, file=open("twint-request_urls.log", "a", encoding="utf-8"))

    return response
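
get_connector(config) is where the proxy settings from the example configuration come into play. twint's own implementation is not shown here, but with aiohttp_socks (which is in the dependency list) a connector for the configured proxy could be built roughly like this (a sketch under that assumption, not twint's actual code):

from aiohttp_socks import ProxyConnector

def get_connector(config):
    # assumption: build an aiohttp connector from Proxy_type/Proxy_host/Proxy_port
    if config.Proxy_host:
        proxy_url = f"{config.Proxy_type}://{config.Proxy_host}:{config.Proxy_port}"
        return ProxyConnector.from_url(proxy_url)   # e.g. "http://127.0.0.1:12639"
    return None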

It first calls url.Search to build the request:

async def Search(config, init):
    logme.debug(__name__+':Search')
    url = f"{base}/search/timeline"
    q = ""
    params = [
        ('vertical', 'default'),
        ('src', 'unkn'),
        ('include_available_features', '1'),
        ('include_entities', '1'),
        ('max_position', str(init)),
        ('reset_error_state', 'false'),
    ]
    if not config.Popular_tweets:
        params.append(('f', 'tweets'))
    if config.Lang:
        params.append(("l", config.Lang))
        params.append(("lang", "en"))
    if config.Query:
        q += f" from:{config.Query}"
    if config.Username:
        q += f" from:{config.Username}"
    if config.Geo:
        config.Geo = config.Geo.replace(" ", "")
        q += f" geocode:{config.Geo}"
    if config.Search:
        q += f" {config.Search}"
    if config.Year:
        q += f" until:{config.Year}-1-1"
    if config.Since:
        q += f" since:{_formatDate(config.Since)}"
    if config.Until:
        q += f" until:{_formatDate(config.Until)}"
    if config.Email:
        q += ' "mail" OR "email" OR'
        q += ' "gmail" OR "e-mail"'
    if config.Phone:
        q += ' "phone" OR "call me" OR "text me"'
    if config.Verified:
        q += " filter:verified"
    if config.To:
        q += f" to:{config.To}"
    if config.All:
        q += f" to:{config.All} OR from:{config.All} OR @{config.All}"
    if config.Near:
        q += f' near:"{config.Near}"'
    if config.Images:
        q += " filter:images"
    if config.Videos:
        q += " filter:videos"
    if config.Media:
        q += " filter:media"
    if config.Replies:
        q += " filter:replies"
    if config.Native_retweets:
        q += " filter:nativeretweets"
    if config.Min_likes:
        q += f" min_faves:{config.Min_likes}"
    if config.Min_retweets:
        q += f" min_retweets:{config.Min_retweets}"
    if config.Min_replies:
        q += f" min_replies:{config.Min_replies}"
    if config.Links == "include":
        q += " filter:links"
    elif config.Links == "exclude":
        q += " exclude:links"
    if config.Source:
        q += f" source:\"{config.Source}\""
    if config.Members_list:
        q += f" list:{config.Members_list}"
    if config.Filter_retweets:
        q += f" exclude:nativeretweets exclude:retweets"
    if config.Custom_query:
        q = config.Custom_query

    params.append(("q", q))
    _serialQuery = _sanitizeQuery(url, params)
    return url, params, _serialQuery
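
Most of the search operators simply get appended to the q string. For the example configuration (only Search = "fruit"), q ends up as " fruit" with a leading space; with a few more options set it would grow like this (a worked illustration following the branches above):

# Illustration of how q is assembled for a richer config
# (Search="fruit", Verified=True, Min_likes=100):
q = ""
q += " fruit"                  # config.Search
q += " filter:verified"        # config.Verified
q += " min_faves:100"          # config.Min_likes
print(repr(q))                 # ' fruit filter:verified min_faves:100'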

Then the actual request is executed:

async def Request(url, connector=None, params=[], headers=[]):
    logme.debug(__name__+':Request:Connector')
    async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
        return await Response(session, url, params)

async def Response(session, url, params=[]):
    logme.debug(__name__+':Response')
    with timeout(120):
        async with session.get(url, ssl=True, params=params, proxy=httpproxy) as response:
            return await response.text()
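
Putting the pieces together, the full call chain for a plain keyword search is: run.Search → run (event loop setup) → Twint.main → Twint.run → Twint.tweets → Twint.Feed → get.RequestUrl → url.Search (build URL and params) → get.Request → get.Response (the actual aiohttp GET).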

The last step is processing the returned results, which I won't go into here.

For example, the request that ends up being built in our example is:

GET request URL: https://twitter.com/i/search/timeline
Parameters:
    vertical: default
    src: unkn
    include_available_features: 1
    include_entities: 1
    max_position: -1
    reset_error_state: false
    f: tweets
    q: %20fruit
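
In other words, stripped of all the configuration plumbing, one search page boils down to a single GET request. A minimal standalone sketch of the equivalent call with aiohttp (hypothetical: headers are reduced to a fixed User-Agent, and the endpoint itself may be rate limited or changed by Twitter at any time):

import asyncio
import aiohttp

async def fetch_search_page():
    params = [
        ("vertical", "default"),
        ("src", "unkn"),
        ("include_available_features", "1"),
        ("include_entities", "1"),
        ("max_position", "-1"),
        ("reset_error_state", "false"),
        ("f", "tweets"),
        ("q", " fruit"),                         # the leading space is what %20 encodes
    ]
    headers = {"User-Agent": "Mozilla/5.0"}      # twint picks a random user agent instead
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get("https://twitter.com/i/search/timeline",
                               params=params) as response:
            return await response.text()

print(asyncio.run(fetch_search_page()))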

Summary

Although the call chain for a single run looks long, this modular design wraps Twitter's complicated query syntax in a very flexible way. I happen to be writing a Bilibili scraper myself at the moment, and its code is currently a mess, so there is a lot here worth borrowing. A big part of using Python well is leaning on third-party libraries to keep code concise, maintainable, and of high quality; for example, this project uses:

aiohttp
aiodns
beautifulsoup4
cchardet
elasticsearch
pysocks
pandas>=0.23.0
aiohttp_socks
schedule
geopy
fake-useragent
googletransx

I haven't systematically worked through these Python libraries yet, so the plan is to study them first and then rebuild my Bilibili scraper along the same lines!
