静觅 崔庆才的个人博客 Mon, 09 Dec 2019 02:14:40 +0000 zh-CN hourly 1 https://wordpress.org/?v=5.0.7 [Python3网络爬虫开发实战] 14.3–Scrapy 分布式实现 /8468.html /8468.html#respond Mon, 09 Dec 2019 02:14:40 +0000 /?p=8468 14.3 Scrapy 分布式实现

接下来,我们会利用 Scrapy-Redis 来实现分布式的对接。

1. 准备工作

请确保已经成功实现了 Scrapy 新浪微博爬虫,Scrapy-Redis 库已经正确安装,如果还没安装,请参考第 1 章的安装说明。

2. 搭建 Redis 服务器

要实现分布式部署,多台主机需要共享爬取队列和去重集合,而这两部分内容都是存于 Redis 数据库中的,我们需要搭建一个可公网访问的 Redis 服务器。

推荐使用 Linux 服务器,可以购买阿里云、腾讯云、Azure 等提供的云主机,一般都会配有公网 IP,具体的搭建方式可以参考第 1 章中 Redis 数据库的安装方式。

Redis 安装完成之后就可以远程连接了,注意部分商家(如阿里云、腾讯云)的服务器需要配置安全组放通 Redis 运行端口才可以远程访问。如果遇到不能远程连接的问题,可以排查安全组的设置。

需要记录 Redis 的运行 IP、端口、地址,供后面配置分布式爬虫使用。当前配置好的 Redis 的 IP 为服务器的 IP 120.27.34.25,端口为默认的 6379,密码为 foobared。

3. 部署代理池和 Cookies 池

新浪微博项目需要用到代理池和 Cookies 池,而之前我们的代理池和 Cookies 池都是在本地运行的。所以我们需要将二者放到可以被公网访问的服务器上运行,将代码上传到服务器,修改 Redis 的连接信息配置,用同样的方式运行代理池和 Cookies 池。

远程访问代理池和 Cookies 池提供的接口,来获取随机代理和 Cookies。如果不能远程访问,先确保其在 0.0.0.0 这个 Host 上运行,再检查安全组的配置。

如我当前配置好的代理池和 Cookies 池的运行 IP 都是服务器的 IP,120.27.34.25,端口分别为 5555 和 5556,如图 14-3 和图 14-4 所示。

图 14-3 代理池接口

图 14-4 Cookies 池接口

所以接下来我们就需要把 Scrapy 新浪微博项目中的访问链接修改如下:

PROXY_URL = 'http://120.27.34.25:5555/random'
COOKIES_URL = 'http://120.27.34.25:5556/weibo/random'

具体的修改方式根据实际配置的 IP 和端口做相应调整。

4. 配置 Scrapy-Redis

配置 Scrapy-Redis 非常简单,只需要修改一下 settings.py 配置文件即可。

核心配置

首先最主要的是,需要将调度器的类和去重的类替换为 Scrapy-Redis 提供的类,在 settings.py 里面添加如下配置即可:

SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

Redis 连接配置

接下来配置 Redis 的连接信息,这里有两种配置方式。

第一种方式是通过连接字符串配置。我们可以用 Redis 的地址、端口、密码来构造一个 Redis 连接字符串,支持的连接形式如下所示:

redis://[:password]@host:port/db
rediss://[:password]@host:port/db
unix://[:password]@/path/to/socket.sock?db=db

password 是密码,比如要以冒号开头,中括号代表此选项可有可无,host 是 Redis 的地址,port 是运行端口,db 是数据库代号,其值默认是 0。

根据上文中提到我的 Redis 连接信息,构造这个 Redis 的连接字符串如下所示:

redis://:foobared@120.27.34.25:6379

直接在 settings.py 里面配置为 REDIS_URL 变量即可:

REDIS_URL = 'redis://:foobared@120.27.34.25:6379'

第二种配置方式是分项单独配置。这个配置就更加直观明了,如根据我的 Redis 连接信息,可以在 settings.py 中配置如下代码:

REDIS_HOST = '120.27.34.25'
REDIS_PORT = 6379
REDIS_PASSWORD = 'foobared'

这段代码分开配置了 Redis 的地址、端口和密码。

注意,如果配置了 REDIS_URL,那么 Scrapy-Redis 将优先使用 REDIS_URL 连接,会覆盖上面的三项配置。如果想要分项单独配置的话,请不要配置 REDIS_URL。

在本项目中,我选择的是配置 REDIS_URL。

配置调度队列

此项配置是可选的,默认使用 PriorityQueue。如果想要更改配置,可以配置 SCHEDULER_QUEUE_CLASS 变量,如下所示:

SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'

以上三行任选其一配置,即可切换爬取队列的存储方式。

在本项目中不进行任何配置,我们使用默认配置。

配置持久化

此配置是可选的,默认是 False。Scrapy-Redis 默认会在爬取全部完成后清空爬取队列和去重指纹集合。

如果不想自动清空爬取队列和去重指纹集合,可以增加如下配置:

SCHEDULER_PERSIST = True

将 SCHEDULER_PERSIST 设置为 True 之后,爬取队列和去重指纹集合不会在爬取完成后自动清空,如果不配置,默认是 False,即自动清空。

值得注意的是,如果强制中断爬虫的运行,爬取队列和去重指纹集合是不会自动清空的。

在本项目中不进行任何配置,我们使用默认配置。

配置重爬

此配置是可选的,默认是 False。如果配置了持久化或者强制中断了爬虫,那么爬取队列和指纹集合不会被清空,爬虫重新启动之后就会接着上次爬取。如果想重新爬取,我们可以配置重爬的选项:

SCHEDULER_FLUSH_ON_START = True

这样将 SCHEDULER_FLUSH_ON_START 设置为 True 之后,爬虫每次启动时,爬取队列和指纹集合都会清空。所以要做分布式爬取,我们必须保证只能清空一次,否则每个爬虫任务在启动时都清空一次,就会把之前的爬取队列清空,势必会影响分布式爬取。

注意,此配置在单机爬取的时候比较方便,分布式爬取不常用此配置。

在本项目中不进行任何配置,我们使用默认配置。

Pipeline 配置

此配置是可选的,默认不启动 Pipeline。Scrapy-Redis 实现了一个存储到 Redis 的 Item Pipeline,启用了这个 Pipeline 的话,爬虫会把生成的 Item 存储到 Redis 数据库中。在数据量比较大的情况下,我们一般不会这么做。因为 Redis 是基于内存的,我们利用的是它处理速度快的特性,用它来做存储未免太浪费了,配置如下:

ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 300}

本项目不进行任何配置,即不启动 Pipeline。

到此为止,Scrapy-Redis 的配置就完成了。有的选项我们没有配置,但是这些配置在其他 Scrapy 项目中可能用到,要根据具体情况而定。

5. 配置存储目标

之前 Scrapy 新浪微博爬虫项目使用的存储是 MongoDB,而且 MongoDB 是本地运行的,即连接的是 localhost。但是,当爬虫程序分发到各台主机运行的时候,爬虫就会连接各自的的 MongoDB。所以我们需要在各台主机上都安装 MongoDB,这样有两个缺点:一是搭建 MongoDB 环境比较烦琐;二是这样各台主机的爬虫会把爬取结果分散存到各自主机上,不方便统一管理。

所以我们最好将存储目标存到同一个地方,例如都存到同一个 MongoDB 数据库中。我们可以在服务器上搭建一个 MongoDB 服务,或者直接购买 MongoDB 数据存储服务。

这里使用的就是服务器上搭建的的 MongoDB 服务,IP 仍然为 120.27.34.25,用户名为 admin,密码为 admin123。

修改配置 MONGO_URI 为如下:

MONGO_URI = 'mongodb://admin:admin123@120.27.34.25:27017'

到此为止,我们就成功完成了 Scrapy 分布式爬虫的配置了。

6. 运行

接下来将代码部署到各台主机上,记得每台主机都需要配好对应的 Python 环境。

每台主机上都执行如下命令,即可启动爬取:

scrapy crawl weibocn

每台主机启动了此命令之后,就会从配置的 Redis 数据库中调度 Request,做到爬取队列共享和指纹集合共享。同时每台主机占用各自的带宽和处理器,不会互相影响,爬取效率成倍提高。

7. 结果

一段时间后,我们可以用 RedisDesktop 观察远程 Redis 数据库的信息。这里会出现两个 Key:一个叫作 weibocn:dupefilter,用来储存指纹;另一个叫作 weibocn:requests,即爬取队列,如图 14-5 和图 14-6 所示。

图 14-5 去重指纹

图 14-6 爬取队列

随着时间的推移,指纹集合会不断增长,爬取队列会动态变化,爬取的数据也会被储存到 MongoDB 数据库中。

至此 Scrapy 分布式的配置已全部完成。

8. 本节代码

本节代码地址为:https://github.com/Python3WebSpider/Weibo/tree/distributed,注意这里是 distributed 分支。

9. 结语

本节通过对接 Scrapy-Redis 成功实现了分布式爬虫,但是部署还是有很多不方便的地方。另外,如果爬取量特别大的话,Redis 的内存也是个问题。在后文我们会继续了解相关优化方案。

转载请注明:静觅 » [Python3网络爬虫开发实战] 14.3–Scrapy 分布式实现

]]>
/8468.html/feed 0
[Python3网络爬虫开发实战] 14.2–Scrapy-Redis 源码解析 /8465.html /8465.html#respond Mon, 09 Dec 2019 02:08:42 +0000 /?p=8465 14.2 Scrapy-Redis 源码解析

Scrapy-Redis 库已经为我们提供了 Scrapy 分布式的队列、调度器、去重等功能,其 GitHub 地址为:https://github.com/rmax/scrapy-redis

本节我们深入了解一下,利用 Redis 如何实现 Scrapy 分布式。

1. 获取源码

可以把源码克隆下来,执行如下命令:

git clone https://github.com/rmax/scrapy-redis.git

核心源码在 scrapy-redis/src/scrapy_redis 目录下。

2. 爬取队列

从爬取队列入手,看看它的具体实现。源码文件为 queue.py,它有三个队列的实现,首先它实现了一个父类 Base,提供一些基本方法和属性,如下所示:

class Base(object):
    """Per-spider base queue class"""
    def __init__(self, server, spider, key, serializer=None):
        if serializer is None:
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: % r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '% s' does not implement 'dumps' function: % r"
                            % serializer)
        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)

首先看一下 _encode_request() 和 _decode_request() 方法,因为我们需要把一 个 Request 对象存储到数据库中,但数据库无法直接存储对象,所以需要将 Request 序列化转成字符串再存储,而这两个方法就分别是序列化和反序列化的操作,利用 pickle 库来实现,一般在调用 push() 将 Request 存入数据库时会调用 _encode_request() 方法进行序列化,在调用 pop() 取出 Request 的时候会调用 _decode_request() 进行反序列化。

在父类中 __len__()、push() 和 pop() 方法都是未实现的,会直接抛出 NotImplementedError,因此这个类是不能直接被使用的,所以必须要实现一个子类来重写这三个方法,而不同的子类就会有不同的实现,也就有着不同的功能。

那么接下来就需要定义一些子类来继承 Base 类,并重写这几个方法,那在源码中就有三个子类的实现,它们分别是 FifoQueue、PriorityQueue、LifoQueue,我们分别来看下它们的实现原理。

首先是 FifoQueue:

class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)

可以看到这个类继承了 Base 类,并重写了 __len__()、push()、pop() 这三个方法,在这三个方法中都是对 server 对象的操作,而 server 对象就是一个 Redis 连接对象,我们可以直接调用其操作 Redis 的方法对数据库进行操作,可以看到这里的操作方法有 llen()、lpush()、rpop() 等,那这就代表此爬取队列是使用的 Redis 的列表,序列化后的 Request 会被存入列表中,就是列表的其中一个元素,__len__() 方法是获取列表的长度,push() 方法中调用了 lpush() 操作,这代表从列表左侧存入数据,pop() 方法中调用了 rpop() 操作,这代表从列表右侧取出数据。

所以 Request 在列表中的存取顺序是左侧进、右侧出,所以这是有序的进出,即先进先出,英文叫做 First Input First Output,也被简称作 Fifo,而此类的名称就叫做 FifoQueue。

另外还有一个与之相反的实现类,叫做 LifoQueue,实现如下:

class LifoQueue(Base):
    """Per-spider LIFO queue."""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)

        if data:
            return self._decode_request(data)

与 FifoQueue 不同的就是它的 pop() 方法,在这里使用的是 lpop() 操作,也就是从左侧出,而 push() 方法依然是使用的 lpush() 操作,是从左侧入。那么这样达到的效果就是先进后出、后进先出,英文叫做 Last In First Out,简称为 Lifo,而此类名称就叫做 LifoQueue。同时这个存取方式类似栈的操作,所以其实也可以称作 StackQueue。

另外在源码中还有一个子类实现,叫做 PriorityQueue,顾名思义,它叫做优先级队列,实现如下:

class PriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])

在这里我们可以看到 __len__()、push()、pop() 方法中使用了 server 对象的 zcard()、zadd()、zrange() 操作,可以知道这里使用的存储结果是有序集合 Sorted Set,在这个集合中每个元素都可以设置一个分数,那么这个分数就代表优先级。

在 __len__() 方法里调用了 zcard() 操作,返回的就是有序集合的大小,也就是爬取队列的长度,在 push() 方法中调用了 zadd() 操作,就是向集合中添加元素,这里的分数指定成 Request 的优先级的相反数,因为分数低的会排在集合的前面,所以这里高优先级的 Request 就会存在集合的最前面。pop() 方法是首先调用了 zrange() 操作取出了集合的第一个元素,因为最高优先级的 Request 会存在集合最前面,所以第一个元素就是最高优先级的 Request,然后再调用 zremrangebyrank() 操作将这个元素删除,这样就完成了取出并删除的操作。

此队列是默认使用的队列,也就是爬取队列默认是使用有序集合来存储的。

3. 去重过滤

前面说过 Scrapy 的去重是利用集合来实现的,而在 Scrapy 分布式中的去重就需要利用共享的集合,那么这里使用的就是 Redis 中的集合数据结构。我们来看看去重类是怎样实现的,源码文件是 dupefilter.py,其内实现了一个 RFPDupeFilter 类,如下所示:

class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.
    This class can also be used with default Scrapy's scheduler.
    """
    logger = logger
    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.
        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.
        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.
        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.
        Parameters
        ----------
        settings : scrapy.settings.Settings
        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.
        """
        server = get_redis_from_settings(settings)
        key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.
        Parameters
        ----------
        crawler : scrapy.crawler.Crawler
        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.
        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.
        Parameters
        ----------
        request : scrapy.http.Request
        Returns
        -------
        bool
        """
        fp = self.request_fingerprint(request)
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.
        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.
        Parameters
        ----------
        reason : str, optional
        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.
        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider
        """
        if self.debug:
            msg = "Filtered duplicate request: %(request) s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request) s"
                   "- no more duplicates will be shown"
                   "(see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False

这里同样实现了一个 request_seen() 方法,和 Scrapy 中的 request_seen() 方法实现极其类似。不过这里集合使用的是 server 对象的 sadd() 操作,也就是集合不再是一个简单数据结构了,而是直接换成了数据库的存储方式。

鉴别重复的方式还是使用指纹,指纹同样是依靠 request_fingerprint() 方法来获取的。获取指纹之后就直接向集合添加指纹,如果添加成功,说明这个指纹原本不存在于集合中,返回值 1。代码中最后的返回结果是判定添加结果是否为 0,如果刚才的返回值为 1,那这个判定结果就是 False,也就是不重复,否则判定为重复。

这样我们就成功利用 Redis 的集合完成了指纹的记录和重复的验证。

4. 调度器

Scrapy-Redis 还帮我们实现了配合 Queue、DupeFilter 使用的调度器 Scheduler,源文件名称是 scheduler.py。我们可以指定一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST 即是否在爬取结束后保持爬取队列不清除。我们可以在 settings.py 里自由配置,而此调度器很好地实现了对接。

接下来我们看看两个核心的存取方法,实现如下所示:

def enqueue_request(self, request):
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    if self.stats:
        self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
    self.queue.push(request)
    return True

def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request

enqueue_request() 可以向队列中添加 Request,核心操作就是调用 Queue 的 push() 操作,还有一些统计和日志操作。next_request() 就是从队列中取 Request,核心操作就是调用 Queue 的 pop() 操作,此时如果队列中还有 Request,则 Request 会直接取出来,爬取继续,否则如果队列为空,爬取则会重新开始。

5. 总结

那么到现在为止我们就把之前所说的三个分布式的问题解决了,总结如下:

  • 爬取队列的实现,在这里提供了三种队列,使用了 Redis 的列表或有序集合来维护。
  • 去重的实现,使用了 Redis 的集合来保存 Request 的指纹来提供重复过滤。
  • 中断后重新爬取的实现,中断后 Redis 的队列没有清空,再次启动时调度器的 next_request() 会从队列中取到下一个 Request,继续爬取。

6. 结语

以上内容便是 Scrapy-Redis 的核心源码解析。Scrapy-Redis 中还提供了 Spider、Item Pipeline 的实现,不过它们并不是必须使用。

在下一节,我们会将 Scrapy-Redis 集成到之前所实现的 Scrapy 新浪微博项目中,实现多台主机协同爬取。

转载请注明:静觅 » [Python3网络爬虫开发实战] 14.2–Scrapy-Redis 源码解析

]]>
/8465.html/feed 0
[Python3网络爬虫开发实战] 14.1–分布式爬虫原理 /8456.html /8456.html#respond Sun, 08 Dec 2019 01:52:07 +0000 /?p=8456 14.1 分布式爬虫原理

我们在前面已经实现了 Scrapy 微博爬虫,虽然爬虫是异步加多线程的,但是我们只能在一台主机上运行,所以爬取效率还是有限的,分布式爬虫则是将多台主机组合起来,共同完成一个爬取任务,这将大大提高爬取的效率。

1. 分布式爬虫架构

在了解分布式爬虫架构之前,首先回顾一下 Scrapy 的架构,如图 13-1 所示。

Scrapy 单机爬虫中有一个本地爬取队列 Queue,这个队列是利用 deque 模块实现的。如果新的 Request 生成就会放到队列里面,随后 Request 被 Scheduler 调度。之后,Request 交给 Downloader 执行爬取,简单的调度架构如图 14-1 所示。

图 14-1 调度架构

如果两个 Scheduler 同时从队列里面取 Request,每个 Scheduler 都有其对应的 Downloader,那么在带宽足够、正常爬取且不考虑队列存取压力的情况下,爬取效率会有什么变化?没错,爬取效率会翻倍。

这样,Scheduler 可以扩展多个,Downloader 也可以扩展多个。而爬取队列 Queue 必须始终为一个,也就是所谓的共享爬取队列。这样才能保证 Scheduer 从队列里调度某个 Request 之后,其他 Scheduler 不会重复调度此 Request,就可以做到多个 Schduler 同步爬取。这就是分布式爬虫的基本雏形,简单调度架构如图 14-2 所示。

图 14-2 调度架构

我们需要做的就是在多台主机上同时运行爬虫任务协同爬取,而协同爬取的前提就是共享爬取队列。这样各台主机就不需要各自维护爬取队列,而是从共享爬取队列存取 Request。但是各台主机还是有各自的 Scheduler 和 Downloader,所以调度和下载功能分别完成。如果不考虑队列存取性能消耗,爬取效率还是会成倍提高。

2. 维护爬取队列

那么这个队列用什么维护来好呢?我们首先需要考虑的就是性能问题,什么数据库存取效率高?我们自然想到基于内存存储的 Redis,而且 Redis 还支持多种数据结构,例如列表 List、集合 Set、有序集合 Sorted Set 等等,存取的操作也非常简单,所以在这里我们采用 Redis 来维护爬取队列。

这几种数据结构存储实际各有千秋,分析如下:

  • 列表数据结构有 lpush()、lpop()、rpush()、rpop() 方法,所以我们可以用它来实现一个先进先出式爬取队列,也可以实现一个先进后出栈式爬取队列。
  • 集合的元素是无序的且不重复的,这样我们可以非常方便地实现一个随机排序的不重复的爬取队列。
  • 有序集合带有分数表示,而 Scrapy 的 Request 也有优先级的控制,所以用有集合我们可以实现一个带优先级调度的队列。

这些不同的队列我们需要根据具体爬虫的需求灵活选择。

3. 怎样来去重

Scrapy 有自动去重,它的去重使用了 Python 中的集合。这个集合记录了 Scrapy 中每个 Request 的指纹,这个指纹实际上就是 Request 的散列值。我们可以看看 Scrapy 的源代码,如下所示:

import hashlib
def request_fingerprint(request, include_headers=None):
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]

request_fingerprint() 就是计算 Request 指纹的方法,其方法内部使用的是 hashlib 的 sha1() 方法。计算的字段包括 Request 的 Method、URL、Body、Headers 这几部分内容,这里只要有一点不同,那么计算的结果就不同。计算得到的结果是加密后的字符串,也就是指纹。每个 Request 都有独有的指纹,指纹就是一个字符串,判定字符串是否重复比判定 Request 对象是否重复容易得多,所以指纹可以作为判定 Request 是否重复的依据。

那么我们如何判定重复呢?Scrapy 是这样实现的,如下所示:

def __init__(self):
    self.fingerprints = set()

def request_seen(self, request):
    fp = self.request_fingerprint(request)
    if fp in self.fingerprints:
        return True
    self.fingerprints.add(fp)

在去重的类 RFPDupeFilter 中,有一个 request_seen() 方法,这个方法有一个参数 request,它的作用就是检测该 Request 对象是否重复。这个方法调用 request_fingerprint() 获取该 Request 的指纹,检测这个指纹是否存在于 fingerprints 变量中,而 fingerprints 是一个集合,集合的元素都是不重复的。如果指纹存在,那么就返回 True,说明该 Request 是重复的,否则这个指纹加入到集合中。如果下次还有相同的 Request 传递过来,指纹也是相同的,那么这时指纹就已经存在于集合中,Request 对象就会直接判定为重复。这样去重的目的就实现了。

Scrapy 的去重过程就是,利用集合元素的不重复特性来实现 Request 的去重。

对于分布式爬虫来说,我们肯定不能再用每个爬虫各自的集合来去重了。因为这样还是每个主机单独维护自己的集合,不能做到共享。多台主机如果生成了相同的 Request,只能各自去重,各个主机之间就无法做到去重了。

那么要实现去重,这个指纹集合也需要是共享的,Redis 正好有集合的存储数据结构,我们可以利用 Redis 的集合作为指纹集合,那么这样去重集合也是利用 Redis 共享的。每台主机新生成 Request 之后,把该 Request 的指纹与集合比对,如果指纹已经存在,说明该 Request 是重复的,否则将 Request 的指纹加入到这个集合中即可。利用同样的原理不同的存储结构我们也实现了分布式 Reqeust 的去重。

4. 防止中断

在 Scrapy 中,爬虫运行时的 Request 队列放在内存中。爬虫运行中断后,这个队列的空间就被释放,此队列就被销毁了。所以一旦爬虫运行中断,爬虫再次运行就相当于全新的爬取过程。

要做到中断后继续爬取,我们可以将队列中的 Request 保存起来,下次爬取直接读取保存数据即可获取上次爬取的队列。我们在 Scrapy 中指定一个爬取队列的存储路径即可,这个路径使用 JOB_DIR 变量来标识,我们可以用如下命令来实现:

scrapy crawl spider -s JOBDIR=crawls/spider

更加详细的使用方法可以参见官方文档,链接为:https://doc.scrapy.org/en/latest/topics/jobs.html

在 Scrapy 中,我们实际是把爬取队列保存到本地,第二次爬取直接读取并恢复队列即可。那么在分布式架构中我们还用担心这个问题吗?不需要。因为爬取队列本身就是用数据库保存的,如果爬虫中断了,数据库中的 Request 依然是存在的,下次启动就会接着上次中断的地方继续爬取。

所以,当 Redis 的队列为空时,爬虫会重新爬取;当 Redis 的队列不为空时,爬虫便会接着上次中断之处继续爬取。

5. 架构实现

我们接下来就需要在程序中实现这个架构了。首先实现一个共享的爬取队列,还要实现去重的功能。另外,重写一个 Scheduer 的实现,使之可以从共享的爬取队列存取 Request。

幸运的是,已经有人实现了这些逻辑和架构,并发布成叫 Scrapy-Redis 的 Python 包。接下来,我们看看 Scrapy-Redis 的源码实现,以及它的详细工作原理。

转载请注明:静觅 » [Python3网络爬虫开发实战] 14.1–分布式爬虫原理

]]>
/8456.html/feed 0
[Python3网络爬虫开发实战] 13.13–Scrapy 爬取新浪微博 /8453.html /8453.html#respond Sun, 08 Dec 2019 01:50:48 +0000 /?p=8453 13.13 Scrapy 爬取新浪微博

前面讲解了 Scrapy 中各个模块基本使用方法以及代理池、Cookies 池。接下来我们以一个反爬比较强的网站新浪微博为例,来实现一下 Scrapy 的大规模爬取。

1. 本节目标

本次爬取的目标是新浪微博用户的公开基本信息,如用户昵称、头像、用户的关注、粉丝列表以及发布的微博等,这些信息抓取之后保存至 MongoDB。

2. 准备工作

请确保前文所讲的代理池、Cookies 池已经实现并可以正常运行,安装 Scrapy、PyMongo 库,如没有安装可以参考前文内容。

3. 爬取思路

首先我们要实现用户的大规模爬取。这里采用的爬取方式是,以微博的几个大 V 为起始点,爬取他们各自的粉丝和关注列表,然后获取粉丝和关注列表的粉丝和关注列表,以此类推,这样下去就可以实现递归爬取。如果一个用户与其他用户有社交网络上的关联,那他们的信息就会被爬虫抓取到,这样我们就可以做到对所有用户的爬取。通过这种方式,我们可以得到用户的唯一 ID,再根据 ID 获取每个用户发布的微博即可。

4. 爬取分析

这里我们选取的爬取站点是:https://m.weibo.cn,此站点是微博移动端的站点。打开该站点会跳转到登录页面,这是因为主页做了登录限制。不过我们可以直接打开某个用户详情页面,如图 13-32 所示。

图 13-32 个人详情页面

我们在页面最上方可以看到她的关注和粉丝数量。我们点击关注,进入到她的关注列表,如图 13-33 所示。

图 13-33 关注列表

我们打开开发者工具,切换到 XHR 过滤器,一直下拉关注列表,即可看到下方会出现很多 Ajax 请求,这些请求就是获取关注列表的 Ajax 请求,如图 13-34 所示。

图 13-34 请求列表

我们打开第一个 Ajax 请求看一下,发现它的链接为:https://m.weibo.cn/api/container/getIndex?containerid=231051_-_followers_-_1916655407&luicode=10000011&lfid=1005051916655407&featurecode=20000320&type=uid&value=1916655407&page=2,详情如图 13-35 和 13-36 所示。

图 13-35 请求详情

图 13-36 响应结果

请求类型是 GET 类型,返回结果是 JSON 格式,我们将其展开之后即可看到其关注的用户的基本信息。接下来我们只需要构造这个请求的参数。此链接一共有 7 个参数,如图 13-37 所示。

图 13-37 参数信息

其中最主要的参数就是 containerid 和 page。有了这两个参数,我们同样可以获取请求结果。我们可以将接口精简为:https://m.weibo.cn/api/container/getIndex?containerid=231051_-_followers_-_1916655407&page=2,这里的 containerid 的前半部分是固定的,后半部分是用户的 id。所以这里参数就可以构造出来了,只需要修改 containerid 最后的 id 和 page 参数即可获取分页形式的关注列表信息。

利用同样的方法,我们也可以分析用户详情的 Ajax 链接、用户微博列表的 Ajax 链接,如下所示:

# 用户详情 API
user_url = 'https://m.weibo.cn/api/container/getIndex?uid={uid}&type=uid&value={uid}&containerid=100505{uid}'
# 关注列表 API
follow_url = 'https://m.weibo.cn/api/container/getIndex?containerid=231051_-_followers_-_{uid}&page={page}'
# 粉丝列表 API
fan_url = 'https://m.weibo.cn/api/container/getIndex?containerid=231051_-_fans_-_{uid}&page={page}'
# 微博列表 API
weibo_url = 'https://m.weibo.cn/api/container/getIndex?uid={uid}&type=uid&page={page}&containerid=107603{uid}'

此处的 uid 和 page 分别代表用户 ID 和分页页码。

注意,这个 API 可能随着时间的变化或者微博的改版而变化,以实测为准。

我们从几个大 V 开始抓取,抓取他们的粉丝、关注列表、微博信息,然后递归抓取他们的粉丝和关注列表的粉丝、关注列表、微博信息,递归抓取,最后保存微博用户的基本信息、关注和粉丝列表、发布的微博。

我们选择 MongoDB 作为存储的数据库,可以更方便地存储用户的粉丝和关注列表。

5. 新建项目

接下来,我们用 Scrapy 来实现这个抓取过程。首先创建一个项目,命令如下所示:

scrapy startproject weibo

进入项目中,新建一个 Spider,名为 weibocn,命令如下所示:

scrapy genspider weibocn m.weibo.cn

我们首先修改 Spider,配置各个 Ajax 的 URL,选取几个大 V,将他们的 ID 赋值成一个列表,实现 start_requests() 方法,也就是依次抓取各个大 V 的个人详情,然后用 parse_user() 进行解析,如下所示:

from scrapy import Request, Spider

class WeiboSpider(Spider):
    name = 'weibocn'
    allowed_domains = ['m.weibo.cn']
    user_url = 'https://m.weibo.cn/api/container/getIndex?uid={uid}&type=uid&value={uid}&containerid=100505{uid}'
    follow_url = 'https://m.weibo.cn/api/container/getIndex?containerid=231051_-_followers_-_{uid}&page={page}'
    fan_url = 'https://m.weibo.cn/api/container/getIndex?containerid=231051_-_fans_-_{uid}&page={page}'
    weibo_url = 'https://m.weibo.cn/api/container/getIndex?uid={uid}&type=uid&page={page}&containerid=107603{uid}'
    start_users = ['3217179555', '1742566624', '2282991915', '1288739185', '3952070245', '5878659096']

    def start_requests(self):
        for uid in self.start_users:
            yield Request(self.user_url.format(uid=uid), callback=self.parse_user)

    def parse_user(self, response):
        self.logger.debug(response)

6. 创建 Item

接下来,我们解析用户的基本信息并生成 Item。这里我们先定义几个 Item,如用户、用户关系、微博的 Item,如下所示:

from scrapy import Item, Field

class UserItem(Item):
    collection = 'users'
    id = Field()
    name = Field()
    avatar = Field()
    cover = Field()
    gender = Field()
    description = Field()
    fans_count = Field()
    follows_count = Field()
    weibos_count = Field()
    verified = Field()
    verified_reason = Field()
    verified_type = Field()
    follows = Field()
    fans = Field()
    crawled_at = Field()

class UserRelationItem(Item):
    collection = 'users'
    id = Field()
    follows = Field()
    fans = Field()

class WeiboItem(Item):
    collection = 'weibos'
    id = Field()
    attitudes_count = Field()
    comments_count = Field()
    reposts_count = Field()
    picture = Field()
    pictures = Field()
    source = Field()
    text = Field()
    raw_text = Field()
    thumbnail = Field()
    user = Field()
    created_at = Field()
    crawled_at = Field()

这里定义了 collection 字段,指明保存的 Collection 的名称。用户的关注和粉丝列表直接定义为一个单独的 UserRelationItem,其中 id 就是用户的 ID,follows 就是用户关注列表,fans 是粉丝列表,但这并不意味着我们会将关注和粉丝列表存到一个单独的 Collection 里。后面我们会用 Pipeline 对各个 Item 进行处理、合并存储到用户的 Collection 里,因此 Item 和 Collection 并不一定是完全对应的。

7. 提取数据

我们开始解析用户的基本信息,实现 parse_user() 方法,如下所示:

def parse_user(self, response):
    """
    解析用户信息
    :param response: Response 对象
    """
    result = json.loads(response.text)
    if result.get('userInfo'):
        user_info = result.get('userInfo')
        user_item = UserItem()
        field_map = {
            'id': 'id', 'name': 'screen_name', 'avatar': 'profile_image_url', 'cover': 'cover_image_phone',
            'gender': 'gender', 'description': 'description', 'fans_count': 'followers_count',
            'follows_count': 'follow_count', 'weibos_count': 'statuses_count', 'verified': 'verified',
            'verified_reason': 'verified_reason', 'verified_type': 'verified_type'
        }
        for field, attr in field_map.items():
            user_item[field] = user_info.get(attr)
        yield user_item
        # 关注
        uid = user_info.get('id')
        yield Request(self.follow_url.format(uid=uid, page=1), callback=self.parse_follows,
                      meta={'page': 1, 'uid': uid})
        # 粉丝
        yield Request(self.fan_url.format(uid=uid, page=1), callback=self.parse_fans,
                      meta={'page': 1, 'uid': uid})
        # 微博
        yield Request(self.weibo_url.format(uid=uid, page=1), callback=self.parse_weibos,
                      meta={'page': 1, 'uid': uid})

在这里我们一共完成了两个操作。

  • 解析 JSON 提取用户信息并生成 UserItem 返回。我们并没有采用常规的逐个赋值的方法,而是定义了一个字段映射关系。我们定义的字段名称可能和 JSON 中用户的字段名称不同,所以在这里定义成一个字典,然后遍历字典的每个字段实现逐个字段的赋值。
  • 构造用户的关注、粉丝、微博的第一页的链接,并生成 Request,这里需要的参数只有用户的 ID。另外,初始分页页码直接设置为 1 即可。

接下来,我们还需要保存用户的关注和粉丝列表。以关注列表为例,其解析方法为 parse_follows(),实现如下所示:

def parse_follows(self, response):
    """
    解析用户关注
    :param response: Response 对象
    """
    result = json.loads(response.text)
    if result.get('ok') and result.get('cards') and len(result.get('cards')) and result.get('cards')[-1].get('card_group'):
        # 解析用户
        follows = result.get('cards')[-1].get('card_group')
        for follow in follows:
            if follow.get('user'):
                uid = follow.get('user').get('id')
                yield Request(self.user_url.format(uid=uid), callback=self.parse_user)
        # 关注列表
        uid = response.meta.get('uid')
        user_relation_item = UserRelationItem()
        follows = [{'id': follow.get('user').get('id'), 'name': follow.get('user').get('screen_name')} for follow in
                   follows]
        user_relation_item['id'] = uid
        user_relation_item['follows'] = follows
        user_relation_item['fans'] = []
        yield user_relation_item
        # 下一页关注
        page = response.meta.get('page') + 1
        yield Request(self.follow_url.format(uid=uid, page=page),
                      callback=self.parse_follows, meta={'page': page, 'uid': uid})

那么在这个方法里面我们做了如下三件事。

  • 解析关注列表中的每个用户信息并发起新的解析请求。我们首先解析关注列表的信息,得到用户的 ID,然后再利用 user_url 构造访问用户详情的 Request,回调就是刚才所定义的 parse_user() 方法。
  • 提取用户关注列表内的关键信息并生成 UserRelationItem。id 字段直接设置成用户的 ID,JSON 返回数据中的用户信息有很多冗余字段。在这里我们只提取了关注用户的 ID 和用户名,然后把它们赋值给 follows 字段,fans 字段设置成空列表。这样我们就建立了一个存有用户 ID 和用户部分关注列表的 UserRelationItem,之后合并且保存具有同一个 ID 的 UserRelationItem 的关注和粉丝列表。
  • 提取下一页关注。只需要将此请求的分页页码加 1 即可。分页页码通过 Request 的 meta 属性进行传递,Response 的 meta 来接收。这样我们构造并返回下一页的关注列表的 Request。

抓取粉丝列表的原理和抓取关注列表原理相同,在此不再赘述。

接下来我们还差一个方法的实现,即 parse_weibos(),它用来抓取用户的微博信息,实现如下所示:

def parse_weibos(self, response):
    """
    解析微博列表
    :param response: Response 对象
    """
    result = json.loads(response.text)
    if result.get('ok') and result.get('cards'):
        weibos = result.get('cards')
        for weibo in weibos:
            mblog = weibo.get('mblog')
            if mblog:
                weibo_item = WeiboItem()
                field_map = {
                    'id': 'id', 'attitudes_count': 'attitudes_count', 'comments_count': 'comments_count', 'created_at': 'created_at',
                    'reposts_count': 'reposts_count', 'picture': 'original_pic', 'pictures': 'pics',
                    'source': 'source', 'text': 'text', 'raw_text': 'raw_text', 'thumbnail': 'thumbnail_pic'
                }
                for field, attr in field_map.items():
                    weibo_item[field] = mblog.get(attr)
                weibo_item['user'] = response.meta.get('uid')
                yield weibo_item
        # 下一页微博
        uid = response.meta.get('uid')
        page = response.meta.get('page') + 1
        yield Request(self.weibo_url.format(uid=uid, page=page), callback=self.parse_weibos,
                      meta={'uid': uid, 'page': page})

这里 parse_weibos() 方法完成了两件事。

  • 提取用户的微博信息,并生成 WeiboItem。这里同样建立了一个字段映射表,实现批量字段赋值。
  • 提取下一页的微博列表。这里同样需要传入用户 ID 和分页页码。

到目前为止,微博的 Spider 已经完成。后面还需要对数据进行数据清洗存储,以及对接代理池、Cookies 池来防止反爬虫。

8. 数据清洗

有些微博的时间可能不是标准的时间,比如它可能显示为刚刚、几分钟前、几小时前、昨天等。这里我们需要统一转化这些时间,实现一个 parse_time() 方法,如下所示:

def parse_time(self, date):
    if re.match(' 刚刚 ', date):
        date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time()))
    if re.match('d + 分钟前 ', date):
        minute = re.match('(d+)', date).group(1)
        date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time() - float(minute) * 60))
    if re.match('d + 小时前 ', date):
        hour = re.match('(d+)', date).group(1)
        date = time.strftime('% Y-% m-% d % H:% M', time.localtime(time.time() - float(hour) * 60 * 60))
    if re.match(' 昨天.*', date):
        date = re.match(' 昨天 (.*)', date).group(1).strip()
        date = time.strftime('% Y-% m-% d', time.localtime() - 24 * 60 * 60) + ' ' + date
    if re.match('d{2}-d{2}', date):
        date = time.strftime('% Y-', time.localtime()) + date + ' 00:00'
    return date

我们用正则来提取一些关键数字,用 time 库来实现标准时间的转换。

以 X 分钟前的处理为例,爬取的时间会赋值为 created_at 字段。我们首先用正则匹配这个时间,表达式写作 d + 分钟前,如果提取到的时间符合这个表达式,那么就提取出其中的数字,这样就可以获取分钟数了。接下来使用 time 模块的 strftime() 方法,第一个参数传入要转换的时间格式,第二个参数就是时间戳。这里我们用当前的时间戳减去此分钟数乘以 60 就是当时的时间戳,这样我们就可以得到格式化后的正确时间了。

然后 Pipeline 可以实现如下处理:

class WeiboPipeline():
    def process_item(self, item, spider):
        if isinstance(item, WeiboItem):
            if item.get('created_at'):
                item['created_at'] = item['created_at'].strip()
                item['created_at'] = self.parse_time(item.get('created_at'))

我们在 Spider 里没有对 crawled_at 字段赋值,它代表爬取时间,我们可以统一将其赋值为当前时间,实现如下所示:

class TimePipeline():
    def process_item(self, item, spider):
        if isinstance(item, UserItem) or isinstance(item, WeiboItem):
            now = time.strftime('% Y-% m-% d % H:% M', time.localtime())
            item['crawled_at'] = now
        return item

这里我们判断了 item 如果是 UserItem 或 WeiboItem 类型,那么就给它的 crawled_at 字段赋值为当前时间。

通过上面的两个 Pipeline,我们便完成了数据清洗工作,这里主要是时间的转换。

9. 数据存储

数据清洗完毕之后,我们就要将数据保存到 MongoDB 数据库。我们在这里实现 MongoPipeline 类,如下所示:

import pymongo

class MongoPipeline(object):
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DATABASE')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        self.db[UserItem.collection].create_index([('id', pymongo.ASCENDING)])
        self.db[WeiboItem.collection].create_index([('id', pymongo.ASCENDING)])

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        if isinstance(item, UserItem) or isinstance(item, WeiboItem):
            self.db[item.collection].update({'id': item.get('id')}, {'$set': item}, True)
        if isinstance(item, UserRelationItem):
            self.db[item.collection].update({'id': item.get('id')},
                {'$addToSet':
                    {'follows': {'$each': item['follows']},
                        'fans': {'$each': item['fans']}
                    }
                }, True)
        return item

当前的 MongoPipeline 和前面我们所写的有所不同,主要有以下几点。

  • 在 open_spider() 方法里面添加了 Collection 的索引,在这里为两个 Item 都做了索引,索引的字段是 id,由于我们这次是大规模爬取,同时在爬取过程中涉及到数据的更新问题,所以我们为每个 Collection 建立了索引,建立了索引之后可以大大提高检索效率。
  • 在 process_item() 方法里存储使用的是 update() 方法,第一个参数是查询条件,第二个参数是爬取的 Item,这里我们使用了 $set 操作符,这样我们如果爬取到了重复的数据即可对数据进行更新,同时不会删除已存在的字段,如果这里不加 $set 操作符,那么会直接进行 item 替换,这样可能会导致已存在的字段如关注和粉丝列表清空,所以这里必须要加上 $set 操作符。第三个参数我们设置为了 True,这个参数起到的作用是如果数据不存在,则插入数据。这样我们就可以做到数据存在即更新、数据不存在即插入,这样就达到了去重的效果。
  • 对于用户的关注和粉丝列表,我们在这里使用了一个新的操作符,叫做 $addToSet,这个操作符可以向列表类型的字段插入数据同时去重,接下来它的值就是需要操作的字段名称,我们在这里又利用了 $each 操作符对需要插入的列表数据进行了遍历,这样就可以逐条插入用户的关注或粉丝数据到指定的字段了,关于该操作更多的解释可以参考 MongoDB 的官方文档,链接为:https://docs.mongodb.com/manual/reference/operator/update/addToSet/

10. Cookies 池对接

新浪微博的反爬能力非常强,我们需要做一些防范反爬虫的措施才可以顺利完成数据爬取。

如果没有登录而直接请求微博的 API 接口,这非常容易导致 403 状态码。这个情况我们在 10.2 节也提过。所以在这里我们实现一个 Middleware,为每个 Request 添加随机的 Cookies。

我们先开启 Cookies 池,使 API 模块正常运行。例如在本地运行 5000 端口,访问:http://localhost:5000/weibo/random 即可获取随机的 Cookies,当然也可以将 Cookies 池部署到远程的服务器,这样只需要更改一下访问的链接就好了。

那么在这里我们将 Cookies 池在本地启动起来,再实现一个 Middleware 如下:

class CookiesMiddleware():
    def __init__(self, cookies_url):
        self.logger = logging.getLogger(__name__)
        self.cookies_url = cookies_url

    def get_random_cookies(self):
        try:
            response = requests.get(self.cookies_url)
            if response.status_code == 200:
                cookies = json.loads(response.text)
                return cookies
        except requests.ConnectionError:
            return False

    def process_request(self, request, spider):
        self.logger.debug(' 正在获取 Cookies')
        cookies = self.get_random_cookies()
        if cookies:
            request.cookies = cookies
            self.logger.debug(' 使用 Cookies ' + json.dumps(cookies))

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(cookies_url=settings.get('COOKIES_URL')
        )

我们首先利用 from_crawler() 方法获取了 COOKIES_URL 变量,它定义在 settings.py 里,这就是刚才我们所说的接口。接下来实现 get_random_cookies() 方法,这个方法主要就是请求此 Cookies 池接口并获取接口返回的随机 Cookies。如果成功获取,则返回 Cookies;否则返回 False。

接下来,在 process_request() 方法里,我们给 request 对象的 cookies 属性赋值,其值就是获取的随机 Cookies,这样我们就成功地为每一次请求赋值 Cookies 了。

如果启用了该 Middleware,每个请求都会被赋值随机的 Cookies。这样我们就可以模拟登录之后的请求,403 状态码基本就不会出现。

11. 代理池对接

微博还有一个反爬措施就是,检测到同一 IP 请求量过大时就会出现 414 状态码。如果遇到这样的情况可以切换代理。例如,在本地 5555 端口运行,获取随机可用代理的地址为:http://localhost:5555/random,访问这个接口即可获取一个随机可用代理。接下来我们再实现一个 Middleware,代码如下所示:

class ProxyMiddleware():
    def __init__(self, proxy_url):
        self.logger = logging.getLogger(__name__)
        self.proxy_url = proxy_url

    def get_random_proxy(self):
        try:
            response = requests.get(self.proxy_url)
            if response.status_code == 200:
                proxy = response.text
                return proxy
        except requests.ConnectionError:
            return False

    def process_request(self, request, spider):
        if request.meta.get('retry_times'):
            proxy = self.get_random_proxy()
            if proxy:
                uri = 'https://{proxy}'.format(proxy=proxy)
                self.logger.debug(' 使用代理 ' + proxy)
                request.meta['proxy'] = uri

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        return cls(proxy_url=settings.get('PROXY_URL')
        )

同样的原理,我们实现了一个 get_random_proxy() 方法用于请求代理池的接口获取随机代理。如果获取成功,则返回改代理,否则返回 False。在 process_request() 方法中,我们给 request 对象的 meta 属性赋值一个 proxy 字段,该字段的值就是代理。

另外,赋值代理的判断条件是当前 retry_times 不为空,也就是说第一次请求失败之后才启用代理,因为使用代理后访问速度会慢一些。所以我们在这里设置了只有重试的时候才启用代理,否则直接请求。这样就可以保证在没有被封禁的情况下直接爬取,保证了爬取速度。

12. 启用 Middleware

接下来,我们在配置文件中启用这两个 Middleware,修改 settings.py 如下所示:

DOWNLOADER_MIDDLEWARES = {
    'weibo.middlewares.CookiesMiddleware': 554,
    'weibo.middlewares.ProxyMiddleware': 555,
}

注意这里的优先级设置,前文提到了 Scrapy 的默认 Downloader Middleware 的设置如下:

{
    'scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware': 100,
    'scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware': 300,
    'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware': 350,
    'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware': 400,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': 500,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 550,
    'scrapy.downloadermiddlewares.ajaxcrawl.AjaxCrawlMiddleware': 560,
    'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware': 580,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 590,
    'scrapy.downloadermiddlewares.redirect.RedirectMiddleware': 600,
    'scrapy.downloadermiddlewares.cookies.CookiesMiddleware': 700,
    'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 750,
    'scrapy.downloadermiddlewares.stats.DownloaderStats': 850,
    'scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware': 900,
}

要使得我们自定义的 CookiesMiddleware 生效,它在内置的 CookiesMiddleware 之前调用。内置的 CookiesMiddleware 的优先级为 700,所以这里我们设置一个比 700 小的数字即可。

要使得我们自定义的 ProxyMiddleware 生效,它在内置的 HttpProxyMiddleware 之前调用。内置的 HttpProxyMiddleware 的优先级为 750,所以这里我们设置一个比 750 小的数字即可。

13. 运行

到此为止,整个微博爬虫就实现完毕了,我们运行如下命令启动一下爬虫:

scrapy crawl weibocn

类似的输出结果如下:

2017-07-11 17:27:34 [urllib3.connectionpool] DEBUG: http://localhost:5000 "GET /weibo/random HTTP/1.1" 200 339
2017-07-11 17:27:34 [weibo.middlewares] DEBUG: 使用 Cookies {"SCF": "AhzwTr_DxIGjgri_dt46_DoPzUqq-PSupu545JdozdHYJ7HyEb4pD3pe05VpbIpVyY1ciKRRWwUgojiO3jYwlBE.", "_T_WM": "8fe0bc1dad068d09b888d8177f1c1218", "SSOLoginState": "1501496388", "M_WEIBOCN_PARAMS": "uicode%3D20000174", "SUHB": "0tKqV4asxqYl4J", "SUB": "_2A250e3QUDeRhGeBM6VYX8y7NwjiIHXVXhBxcrDV6PUJbkdBeLXjckW2fUT8MWloekO4FCWVlIYJGJdGLnA.."}
2017-07-11 17:27:34 [weibocn] DEBUG: <200 https://m.weibo.cn/api/container/getIndex?uid=1742566624&type=uid&value=1742566624&containerid=1005051742566624>
2017-07-11 17:27:34 [scrapy.core.scraper] DEBUG: Scraped from <200 https://m.weibo.cn/api/container/getIndex?uid=1742566624&type=uid&value=1742566624&containerid=1005051742566624>
{'avatar': 'https://tva4.sinaimg.cn/crop.0.0.180.180.180/67dd74e0jw1e8qgp5bmzyj2050050aa8.jpg',
 'cover': 'https://tva3.sinaimg.cn/crop.0.0.640.640.640/6ce2240djw1e9oaqhwllzj20hs0hsdir.jpg',
 'crawled_at': '2017-07-11 17:27',
 'description': ' 成长,就是一个不断觉得以前的自己是个傻逼的过程 ',
 'fans_count': 19202906,
 'follows_count': 1599,
 'gender': 'm',
 'id': 1742566624,
 'name': ' 思想聚焦 ',
 'verified': True,
 'verified_reason': ' 微博知名博主,校导网编辑 ',
 'verified_type': 0,
 'weibos_count': 58393}

运行一段时间后,我们便可以到 MongoDB 数据库查看数据,爬取下来的数据如图 13-38 和图 13-39 所示。

图 13-38 用户信息

图 13-39 微博信息

针对用户信息,我们不仅爬取了其基本信息,还把关注和粉丝列表加到了 follows 和 fans 字段并做了去重操作。针对微博信息,我们成功进行了时间转换处理,同时还保存了微博的图片列表信息。

14. 本节代码

本节代码地址:https://github.com/Python3WebSpider/Weibo

15. 结语

本节实现了新浪微博的用户及其粉丝关注列表和微博信息的爬取,还对接了 Cookies 池和代理池来处理反爬虫。不过现在是针对单机的爬取,后面我们会将此项目修改为分布式爬虫,以进一步提高抓取效率。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.13–Scrapy 爬取新浪微博

]]>
/8453.html/feed 0
[Python3网络爬虫开发实战] 13.12–Scrapy 对接 Docker /8448.html /8448.html#respond Sat, 07 Dec 2019 02:23:45 +0000 /?p=8448 13.12 Scrapy 对接 Docker

环境配置问题可能一直是我们头疼的,我们可能遇到过如下的情况:

  • 我们在本地写好了一个 Scrapy 爬虫项目,想要把它放到服务器上运行,但是服务器上没有安装 Python 环境。
  • 别人给了我们一个 Scrapy 爬虫项目,项目中使用包的版本和我们本地环境版本不一致,无法直接运行。
  • 我们需要同时管理不同版本的 Scrapy 项目,如早期的项目依赖于 Scrapy 0.25,现在的项目依赖于 Scrapy 1.4.0。

在这些情况下,我们需要解决的就是环境的安装配置、环境的版本冲突解决等问题。

对于 Python 来说,VirtualEnv 的确可以解决版本冲突的问题。但是,VirtualEnv 不太方便做项目部署,我们还是需要安装 Python 环境,

如何解决上述问题呢?答案是用 Docker。Docker 可以提供操作系统级别的虚拟环境,一个 Docker 镜像一般都包含一个完整的操作系统,而这些系统内也有已经配置好的开发环境,如 Python 3.6 环境等。

我们可以直接使用此 Docker 的 Python 3 镜像运行一个容器,将项目直接放到容器里运行,就不用再额外配置 Python 3 环境。这样就解决了环境配置的问题。

我们也可以进一步将 Scrapy 项目制作成一个新的 Docker 镜像,镜像里只包含适用于本项目的 Python 环境。如果要部署到其他平台,只需要下载该镜像并运行就好了,因为 Docker 运行时采用虚拟环境,和宿主机是完全隔离的,所以也不需要担心环境冲突问题。

如果我们能够把 Scrapy 项目制作成一个 Docker 镜像,只要其他主机安装了 Docker,那么只要将镜像下载并运行即可,而不必再担心环境配置问题或版本冲突问题。

接下来,我们尝试把一个 Scrapy 项目制作成一个 Docker 镜像。

1. 本节目标

我们要实现把前文 Scrapy 的入门项目打包成一个 Docker 镜像的过程。项目爬取的网址为:http://quotes.toscrape.com/,本章 Scrapy 入门一节已经实现了 Scrapy 对此站点的爬取过程,项目代码为:https://github.com/Python3WebSpider/ScrapyTutorial,如果本地不存在的话可以 Clone 下来。

2. 准备工作

请确保已经安装好 Docker 和 MongoDB 并可以正常运行,如果没有安装可以参考第 1 章的安装说明。

3. 创建 Dockerfile

首先在项目的根目录下新建一个 requirements.txt 文件,将整个项目依赖的 Python 环境包都列出来,如下所示:

scrapy
pymongo

如果库需要特定的版本,我们还可以指定版本号,如下所示:

scrapy>=1.4.0
pymongo>=3.4.0

在项目根目录下新建一个 Dockerfile 文件,文件不加任何后缀名,修改内容如下所示:

FROM python:3.6
ENV PATH /usr/local/bin:$PATH
ADD . /code
WORKDIR /code
RUN pip3 install -r requirements.txt
CMD scrapy crawl quotes

第一行的 FROM 代表使用的 Docker 基础镜像,在这里我们直接使用 python:3.6 的镜像,在此基础上运行 Scrapy 项目。

第二行 ENV 是环境变量设置,将 /usr/local/bin:$PATH 赋值给 PATH,即增加 /usr/local/bin 这个环境变量路径。

第三行 ADD 是将本地的代码放置到虚拟容器中。它有两个参数:第一个参数是.,代表本地当前路径;第二个参数是 /code,代表虚拟容器中的路径,也就是将本地项目所有内容放置到虚拟容器的 /code 目录下,以便于在虚拟容器中运行代码。

第四行 WORKDIR 是指定工作目录,这里将刚才添加的代码路径设成工作路径。这个路径下的目录结构和当前本地目录结构是相同的,所以我们可以直接执行库安装命令、爬虫运行命令等。

第五行 RUN 是执行某些命令来做一些环境准备工作。由于 Docker 虚拟容器内只有 Python 3 环境,而没有所需要的 Python 库,所以我们运行此命令来在虚拟容器中安装相应的 Python 库如 Scrapy,这样就可以在虚拟容器中执行 Scrapy 命令了。

第六行 CMD 是容器启动命令。在容器运行时,此命令会被执行。在这里我们直接用 scrapy crawl quotes 来启动爬虫。

4. 修改 MongoDB 连接

接下来我们需要修改 MongoDB 的连接信息。如果我们继续用 localhost 是无法找到 MongoDB 的,因为在 Docker 虚拟容器里 localhost 实际指向容器本身的运行 IP,而容器内部并没有安装 MongoDB,所以爬虫无法连接 MongoDB。

这里的 MongoDB 地址可以有如下两种选择。

  • 如果只想在本机测试,我们可以将地址修改为宿主机的 IP,也就是容器外部的本机 IP,一般是一个局域网 IP,使用 ifconfig 命令即可查看。
  • 如果要部署到远程主机运行,一般 MongoDB 都是可公网访问的地址,修改为此地址即可。

在本节中,我们的目标是将项目打包成一个镜像,让其他远程主机也可运行这个项目。所以我们直接将此处 MongoDB 地址修改为某个公网可访问的远程数据库地址,修改 MONGO_URI 如下所示:

MONGO_URI = 'mongodb://admin:admin123@120.27.34.25:27017'

此处地址可以修改为自己的远程 MongoDB 数据库地址。

这样项目的配置就完成了。

5. 构建镜像

接下来我们便可以构建镜像了,执行如下命令:

docker build -t quotes:latest .

这样的输出就说明镜像构建成功。这时我们查看一下构建的镜像,如下所示:

Sending build context to Docker daemon 191.5 kB
Step 1/6 : FROM python:3.6
 ---> 968120d8cbe8
Step 2/6 : ENV PATH /usr/local/bin:$PATH
 ---> Using cache
 ---> 387abbba1189
Step 3/6 : ADD . /code
 ---> a844ee0db9c6
Removing intermediate container 4dc41779c573
Step 4/6 : WORKDIR /code
 ---> 619b2c064ae9
Removing intermediate container bcd7cd7f7337
Step 5/6 : RUN pip3 install -r requirements.txt
 ---> Running in 9452c83a12c5
...
Removing intermediate container 9452c83a12c5
Step 6/6 : CMD scrapy crawl quotes
 ---> Running in c092b5557ab8
 ---> c8101aca6e2a
Removing intermediate container c092b5557ab8
Successfully built c8101aca6e2a

出现类似输出就证明镜像构建成功了,这时执行如我们查看一下构建的镜像:

docker images

返回结果中其中有一行就是:

quotes  latest  41c8499ce210    2 minutes ago   769 MB

这就是我们新构建的镜像。

6. 运行

我们可以先在本地测试运行,执行如下命令:

docker run quotes

这样我们就利用此镜像新建并运行了一个 Docker 容器,运行效果完全一致,如图 13-29 所示。

图 13-32 运行结果

如果出现类似图 13-29 的运行结果,这就证明构建的镜像没有问题。

7. 推送至 Docker Hub

构建完成之后,我们可以将镜像 Push 到 Docker 镜像托管平台,如 Docker Hub 或者私有的 Docker Registry 等,这样我们就可以从远程服务器下拉镜像并运行了。

以 Docker Hub 为例,如果项目包含一些私有的连接信息(如数据库),我们最好将 Repository 设为私有或者直接放到私有的 Docker Registry。

首先在 https://hub.docker.com 注册一个账号,新建一个 Repository,名为 quotes。比如,我的用户名为 germey,新建的 Repository 名为 quotes,那么此 Repository 的地址就可以用 germey/quotes 来表示。

为新建的镜像打一个标签,命令如下所示:

docker tag quotes:latest germey/quotes:latest

推送镜像到 Docker Hub 即可,命令如下所示:

docker push germey/quotes

Docker Hub 便会出现新推送的 Docker 镜像了,如图 13-30 所示。

图 13-30 推送结果

如果我们想在其他的主机上运行这个镜像,主机上装好 Docker 后,可以直接执行如下命令:

docker run germey/quotes

这样就会自动下载镜像,然后启动容器运行,不需要配置 Python 环境,不需要关心版本冲突问题。

运行效果如图 13-31 所示:

图 13-31 运行效果

整个项目爬取完成后,数据就可以存储到指定的数据库中。

8. 结语

我们讲解了将 Scrapy 项目制作成 Docker 镜像并部署到远程服务器运行的过程。使用此种方式,我们在本节开头所列出的问题都迎刃而解。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.12–Scrapy 对接 Docker

]]>
/8448.html/feed 0
[Python3网络爬虫开发实战] 13.11–Scrapyrt 的使用 /8445.html /8445.html#respond Sat, 07 Dec 2019 02:20:01 +0000 /?p=8445 13.11 Scrapyrt 的使用

Scrapyrt 为 Scrapy 提供了一个调度的 HTTP 接口。有了它我们不需要再执行 Scrapy 命令,而是通过请求一个 HTTP 接口即可调度 Scrapy 任务,我们就不需要借助于命令行来启动项目了。如果项目是在远程服务器运行,利用它来启动项目是个不错的选择。

1. 本节目标

我们以本章 Scrapy 入门项目为例来说明 Scrapyrt 的使用方法,项目源代码地址为:https://github.com/Python3WebSpider/ScrapyTutorial

2. 准备工作

请确保 Scrapyrt 已经正确安装并正常运行,具体安装可以参考第 1 章的说明。

3. 启动服务

首先将项目下载下来,在项目目录下运行 Scrapyrt,假设当前服务运行在 9080 端口上。下面将简单介绍 Scrapyrt 的使用方法。

4. GET 请求

目前,GET 请求方式支持如下的参数。

  • spider_name,Spider 名称,字符串类型,必传参数,如果传递的 Spider 名称不存在则会返回 404 错误。
  • url,爬取链接,字符串类型,如果起始链接没有定义的话就必须要传递,如果传递了该参数,Scrapy 会直接用该 URL 生成 Request,而直接忽略 start_requests() 方法和 start_urls 属性的定义。
  • callback,回调函数名称,字符串类型,可选参数,如果传递了就会使用此回调函数处理,否则会默认使用 Spider 内定义的回调函数。
  • max_requests,最大请求数量,数值类型,可选参数,它定义了 Scrapy 执行请求的 Request 的最大限制,如定义为 5,则最多只执行 5 次 Request 请求,其余的则会被忽略。
  • start_requests,是否要执行 start_request() 函数,布尔类型,可选参数,在 Scrapy 项目中如果定义了 start_requests() 方法,那么在项目启动时会默认调用该方法,但是在 Scrapyrt 就不一样了,它默认不执行 start_requests() 方法,如果要执行,需要将它设置为 true。

例如我们执行如下命令:

curl http://localhost:9080/crawl.json?spider_name=quotes&url=http://quotes.toscrape.com/

得到类似如下结果,如图 13-28 所示:

图 13-28 输出结果

返回的是一个 JSON 格式的字符串,我们解析它的结构,如下所示:

{
  "status": "ok",
  "items": [
    {
      "text": "“The world as we have created it is a process of o...",
      "author": "Albert Einstein",
      "tags": [
        "change",
        "deep-thoughts",
        "thinking",
        "world"
      ]
    },
    ...
    {
      "text": "“... a mind needs books as a sword needs a whetsto...",
      "author": "George R.R. Martin",
      "tags": [
        "books",
        "mind"
      ]
    }
  ],
  "items_dropped": [],
  "stats": {
    "downloader/request_bytes": 2892,
    "downloader/request_count": 11,
    "downloader/request_method_count/GET": 11,
    "downloader/response_bytes": 24812,
    "downloader/response_count": 11,
    "downloader/response_status_count/200": 10,
    "downloader/response_status_count/404": 1,
    "dupefilter/filtered": 1,
    "finish_reason": "finished",
    "finish_time": "2017-07-12 15:09:02",
    "item_scraped_count": 100,
    "log_count/DEBUG": 112,
    "log_count/INFO": 8,
    "memusage/max": 52510720,
    "memusage/startup": 52510720,
    "request_depth_max": 10,
    "response_received_count": 11,
    "scheduler/dequeued": 10,
    "scheduler/dequeued/memory": 10,
    "scheduler/enqueued": 10,
    "scheduler/enqueued/memory": 10,
    "start_time": "2017-07-12 15:08:56"
  },
  "spider_name": "quotes"
}

这里省略了 items 绝大部分。status 显示了爬取的状态,items 部分是 Scrapy 项目的爬取结果,items_dropped 是被忽略的 Item 列表,stats 是爬取结果的统计情况。此结果和直接运行 Scrapy 项目得到的统计是相同的。

这样一来,我们就通过 HTTP 接口调度 Scrapy 项目并获取爬取结果,如果 Scrapy 项目部署在服务器上,我们可以通过开启一个 Scrapyrt 服务实现任务的调度并直接取到爬取结果,这很方便。

5. POST 请求

除了 GET 请求,我们还可以通过 POST 请求来请求 Scrapyrt。但是此处 Request Body 必须是一个合法的 JSON 配置,在 JSON 里面可以配置相应的参数,支持的配置参数更多。

目前,JSON 配置支持如下参数。

  • spider_name:Spider 名称,字符串类型,必传参数。如果传递的 Spider 名称不存在,则返回 404 错误。
  • max_requests:最大请求数量,数值类型,可选参数。它定义了 Scrapy 执行请求的 Request 的最大限制,如定义为 5,则表示最多只执行 5 次 Request 请求,其余的则会被忽略。
  • request:Request 配置,JSON 对象,必传参数。通过该参数可以定义 Request 的各个参数,必须指定 url 字段来指定爬取链接,其他字段可选。

我们看一个 JSON 配置实例,如下所示:

{
    "request": {
        "url": "http://quotes.toscrape.com/",
        "callback": "parse",
        "dont_filter": "True",
        "cookies": {"foo": "bar"}
    },
    "max_requests": 2,
    "spider_name": "quotes"
}

我们执行如下命令传递该 Json 配置并发起 POST 请求:

curl http://localhost:9080/crawl.json -d '{"request": {"url": "http://quotes.toscrape.com/", "dont_filter": "True", "callback": "parse", "cookies": {"foo": "bar"}}, "max_requests": 2, "spider_name": "quotes"}'

运行结果和上文类似,同样是输出了爬取状态、结果、统计信息等内容。

6. 结语

以上内容便是 Scrapyrt 的相关用法介绍。通过它,我们方便地调度 Scrapy 项目的运行并获取爬取结果。更多的使用方法可以参考官方文档:http://scrapyrt.readthedocs.io

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.11–Scrapyrt 的使用

]]>
/8445.html/feed 0
Nginx 反向代理返回结果为空的问题 /8443.html /8443.html#respond Fri, 06 Dec 2019 22:26:39 +0000 /?p=8443 最近在开发过程中遇到了这么一个问题:

现在有一个 Web 项目,前端是使用 Vue.js 开发的,整个前端需要部署到 K8S 上,后端和前端分开,同样也需要部署到 K8S 上,因此二者需要打包为 Docker 镜像。

对前端来说,打包 Docker 就遇到了一个问题:跨域访问问题。

因此一个普遍的解决方案就是使用 Nginx 做反向代理。

一般来说,我们需要在打包时配置一下 nginx.conf 文件,然后在 Dockerfile 里面指定即可。

Dockerfile

首先看下 Dockerfile:

# build stage
FROM node:lts-alpine as build-stage
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build

# production stage
FROM nginx:lts-alpine as production-stage
COPY --from=build-stage /app/dist /usr/share/nginx/html
COPY nginx.conf /etc/nginx/conf.d/
RUN rm /etc/nginx/conf.d/default.conf 
    && mv /etc/nginx/conf.d/nginx.conf /etc/nginx/conf.d/default.conf
EXPOSE 80
CMD ["nginx", "-g", "daemon off;"]

一般来说,对于常规的 Vue.js 前端项目,Dockerfile 就这么写就行了。

简单介绍一下:

  • 第一步,使用 Node.js 镜像,在 Node.js 环境下对项目进行编译,默认会输出到 dist 文件夹下。
  • 第二步,使用新的 Nginx 镜像,将编译得到的前端文件拷贝到 nginx 默认 serve 的目录,然后把自定义的 nginx.conf 文件替换为 Nginx 默认的 conf 文件,运行即可。

反向代理

这里比较关键的就是 nginx.conf 文件了,为了解决跨域问题,我们一般会将后端的接口进行反向代理。

一般来说,后端的 API 接口都是以 api 为开头的,所以我们需要代理 api 开头的接口地址,nginx.conf 内容一般可以这么写:

server {
    listen       80;
    server_name  localhost;

    location /api/ {
                proxy_pass http://domain.com/api/;
                proxy_set_header X-Forwarded-Proto $scheme;
                proxy_set_header Host $http_host;
                proxy_set_header X-Real-IP $remote_addr;
    }

    location / {
        root   /usr/share/nginx/html;
        index  index.html index.htm;
    }

    location = /50x.html {
        root   /usr/share/nginx/html;
    }

    error_page  404              /404.html;
    error_page   500 502 503 504  /50x.html;
}

一般来说,以上的写法是没有问题的,proxy_set_header 也把一些 Header 进行设置,转发到后端服务器。

如果你这么写,打包 Docker 之后,测试没有遇到问题,那就完事了。

问题

但我遇到了一个奇怪的问题,某个接口在请求的时候,状态码还是 200,但其返回值总是为空,即 Response Data 的内容完全为空。

但是服务器端看 Log 确实有正常返回 Response,使用 Vue 的 devServer 也是正常的,使用 Postman 来请求也是正常的,但是经过 Nginx 这么一反向代理就不行了,什么 Response 都接收不到。

部署到 Prod 环境之后,浏览器上面可以得到这么个错误:

ERR_INCOMPLETE_CHUNKED_ENCODING

image-20191207042932549

最后经排查,发现后端接口使用时设定了 Transfer-Encoding: chunked 响应头:

Transfer-Encoding: chunked

这是啥?这时候就需要引出 Keep-Alive 的相关问题了。

什么是 Keep-Alive?

我们知道 HTTP 协议采用「请求-应答」模式,当使用普通模式,即非 Keep-Alive 模式时,每个请求/应答客户和服务器都要新建一个连接,完成之后立即断开连接(HTTP 协议为无连接的协议)。当使用 Keep-Alive 模式(又称持久连接、连接重用)时,Keep-Alive 功能使客户端到服务器端的连接持续有效,当出现对服务器的后继请求时,Keep-Alive 功能避免了建立或者重新建立连接。

  • HTTP 1.0 中默认是关闭 Keep-Alive 的,需要在 HTTP 头加入Connection: Keep-Alive,才能启用 Keep-Alive
  • HTTP 1.1 中默认启用 Keep-Alive,如果请求头中加入 Connection: close,Keep-Alive 才关闭。

目前大部分浏览器都是用 HTTP 1.1 协议,也就是说默认都会发起 Keep-Alive 的连接请求了,所以是否能完成一个完整的 Keep-Alive 连接就看服务器设置情况。

启用 Keep-Alive 模式肯定更高效,性能更高。因为避免了建立/释放连接的开销。

Keep-Alive 模式下如何传输数据

Keep-Alive 模式,客户端如何判断请求所得到的响应数据已经接收完成呢?或者说如何知道服务器已经发生完了数据?

我们已经知道了,Keep-Alive 模式发送完数据,HTTP 服务器不会自动断开连接,所有不能再使用返回 EOF(-1)来判断。

那么怎么判断呢?一个是使用 Content-Length ,一个是使用 Transfer-Encoding。

Content-Length

顾名思义,Conent-Length 表示实体内容长度,客户端(服务器)可以根据这个值来判断数据是否接收完成。

由于 Content-Length 字段必须真实反映实体长度,但实际应用中,有些时候实体长度并没那么好获得,例如实体来自于网络文件,或者由动态语言生成。这时候要想准确获取长度,只能开一个足够大的 buffer,等内容全部生成好再计算。但这样做一方面需要更大的内存开销,另一方面也会让客户端等更久。

我们在做 WEB 性能优化时,有一个重要的指标叫 TTFB(Time To First Byte),它代表的是从客户端发出请求到收到响应的第一个字节所花费的时间。大部分浏览器自带的 Network 面板都可以看到这个指标,越短的 TTFB 意味着用户可以越早看到页面内容,体验越好。可想而知,服务端为了计算响应实体长度而缓存所有内容,跟更短的 TTFB 理念背道而驰。但在 HTTP 报文中,实体一定要在头部之后,顺序不能颠倒,为此我们需要一个新的机制:不依赖头部的长度信息,也能知道实体的边界。

但是如果消息中没有 Conent-Length,那该如何来判断呢?又在什么情况下会没有 Conent-Length 呢?

Transfer-Encoding

当客户端向服务器请求一个静态页面或者一张图片时,服务器可以很清楚地知道内容大小,然后通过 Content-length 消息首部字段告诉客户端需要接收多少数据。但是如果是动态页面等时,服务器是不可能预先知道内容大小,这时就可以使用 分块编码模式来传输数据了。即如果要一边产生数据,一边发给客户端,服务器就需要在请求头中使用Transfer-Encoding: chunked 这样的方式来代替 Content-Length,这就是分块编码。

分块编码相当简单,在头部加入 Transfer-Encoding: chunked 之后,就代表这个报文采用了分块编码。这时,报文中的实体需要改为用一系列分块来传输。每个分块包含十六进制的长度值和数据,长度值独占一行,长度不包括它结尾的 CRLF(rn),也不包括分块数据结尾的 CRLF。最后一个分块长度值必须为 0,对应的分块数据没有内容,表示实体结束。

回归问题

那么我说了这么一大通有什么用呢?

OK,在我遇到的业务场景中,我发现服务器的响应头中就包含了Transfer-Encoding: chunked 这个字段。

而这个字段,在 HTTP 1.0 是不被支持的。

而 Nginx 的反向代理,默认用的就是 HTTP 1.0,那就导致了数据无法获取的问题,可以参考 Nginx 的官方文档说明:http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_pass

原文中:

Syntax: proxy_http_version 1.0 | 1.1;
Default: proxy_http_version 1.0;
By default, version 1.0 is used. Version 1.1 is recommended for use with keepalive connections and NTLM authentication.

所以,我们如果要解决这个问题,只需要设置一下 HTTP 版本为 1.1 就好了:

修改 nginx.conf 文件如下:

location /api/ {
    proxy_pass http://domain.com/api/;
    proxy_http_version 1.1;
    proxy_set_header X-Forwarded-Proto $scheme;
    proxy_set_header Host $http_host;
    proxy_set_header X-Real-IP $remote_addr;
}

这里就增加了一行:

proxy_http_version 1.1;

这样再测试,反向代理就会支持 Transfer-Encoding: chunked 模式了,这也就呼应了之前在浏览器中遇到的 ERR_INCOMPLETE_CHUNKED_ENCODING 错误。

自此,问题完美解决。

复盘记录

一开始本来只想简单一记录就了事的,但一边写,发现某个地方还可以展开写得更详细。

所以干脆最后我对这个问题进行了详细的复盘和记录。在写本文之前,我其实只思考到了 Keep-Alive 和 HTTP 1.1 的问题,其实我对 Transfer-Encoding 这个并没有去深入思考。在边写边总结的过程中,为了把整个脉络讲明白,我又查询了一些 Transfer-Encoding 和 Nginx 的官方文档,对这块的了解变得更加深入,相当于我在整个记录的过程中,又对整个流程梳理了一遍,同时又有额外的收获。

所以,遇到问题,深入去思考、总结和复盘,是很有帮助的,这会让我们对问题的看法和理解更加透彻。

怎么说呢?在开发过程中,难免会遇到一些奇奇怪怪的 Bug,但这其实只是技术问题,总会解决的。

但怎样在开发过程中,不断提高自己的技术能力,我觉得需要从每一个细节出发,去思考一些事情的来龙去脉。思考得越多,我们对整个事件的把握也会越清晰,以后如果再遇到类似的或者关联的事情,就会迎刃而解了。

平时我们可能很多情况下都在写业务代码,可能比较枯燥,感觉对技术没有实质性的提升,但如果我们能从中提炼出一些核心的问题或解决方案,这才是能真正提高技术的时候,这才是最有价值的。

参考文章

本文部分内容改写或摘自下列内容。

转载请注明:静觅 » Nginx 反向代理返回结果为空的问题

]]>
/8443.html/feed 0
阿里云服务器活动!阿里云代金券 + 1 折优惠码 /8418.html /8418.html#respond Fri, 06 Dec 2019 10:43:17 +0000 /?p=8418 阿里云作为国内最大的云服务商家,个人与企业上云都纷纷首选阿里云。但是在价格方面比整个市场有些许昂贵,让不少用户却而止步。因此星速云小编呕心沥血整理阿里云最新优惠折扣【汇总篇】,让大家不用花时间到处寻找优惠信息,帮助站长、开发者和企业们上云购节省项目开支。


最全:阿里云最新优惠获取教程【长期有效】


①:阿里云代金券2000元红包

阿里云代金券领取很简单,点击下面链接进行领取。

阿里云代金券领取和使用步骤教程

阿里云代金券领取地址:点击领取2000元代金券礼包

点击“立即领取”按钮就可以一键领取到所有满减代金券,最高2000元。别忘记通过购物车一键批量购买哟!

②:阿里云9折优惠码

新用户还可以使用手机扫码领取一个阿里云9折折扣码叠加上述阿里云代金券使用。该9折码只能通过阿里云手机客户端扫描领取,PC端无法领取,(限ECS首购并且优惠高于7折才可以使用,比如优惠已经为5折,则该折扣码无效)

阿里云代金券

注明:阿里云9折优惠码与阿里云2000元红包可叠加优惠折扣。


阿里云双12期间(2019.12.3-2019.12.31)最新优惠活动


阿里云双12优惠活动终于开启了,新用户1折甩卖,老用户五折,还可以领取2000元红包,优惠力度不亚于双11优惠活动哟!还不赶紧上云呢?错过双11优惠活动,那么双12不容错过了!

阿里云双12活动

什么?您还不知道云服务器用途

不管是做web网站、APP程序后端部署、应用程序后端、小程序后端等,还是打算创业的小伙伴,或者传统IDC自建机房的企业,上云已成为趋势。云服务器更便捷省心、节约IT运维的成本。

新用户1折优惠售卖:

实例规格 配置 带宽 时长 价格 官网购买
ECS突发性能型t5 1核2G40G高效云盘 1M 1年 89.00元 立即抢购
ECS突发性能型t5 1核2G40G高效云盘 1M 3年 229.00元
ECS共享型n4 2核4G40G高效云盘 3M 2年 469.00元
ECS突发性能t5 2核4G40G高效云盘 5M 3年 899.00元
ECS突发性能t5 2核4G40G高效云盘 3M 3年 639.00元
ECS共享型n4 2核4G40G高效云盘 3M 3年 799.00元
ECS共享通用型mn4 2核8G40G高效云盘 5M 3年 1399.00元
ECS突发性能t5(香港) 1核1G40G高效云盘 1M 1年 119.00元
ECS网络增强型sn1ne 4核8G40G高效云盘 5M 3年 5621.00元
8核16G40G高效云盘 8M 3年 12209.00元

注明:突发性t5实例,别看到价格比较便宜就直接购买,里面很多套路,购买页面有提示:限制20%性能基线。释义:依靠CPU 积分来提升 CPU 性能,满足业务需求。当实例实际工作性能高于基准 CPU 计算性能时,会把服务器 CPU 的性能限制在 20%以下,如果这时20%CPU性能满足不了业务需求,云服务器CPU会跑满100%,到那时候你以为是被某大佬攻击了,很有可能是你突发性t5实例CPU 积分消耗完了。笔者建议:如果用户业务对 CPU 要求高的,可以直接略过,选择t5实例(无限制CPU性能)、n4共享型、通用型mn4。以下笔者建议爆款:

个人博客与企业微服务首选

阿里云双12云服务器爆款

老用户五折优惠甩卖:

实例规格 CPU/内存/云盘 带宽 时长 价格 老用户优惠购买
云服务器计算型ic5 8核8G40G高效云盘 1M 1年 4433.94元 立即抢购
计算网络增强型sn1ne 8核16G40G高效云盘 1M 1年 3751.20元
通用网络增强型sn2ne 8核32G40G高效云盘 1M 1年 5353.20元
内存网络增强型se1ne 8核64G40G高效云盘 1M 1年 6793.20元

注明:本文为星速云原创版权所有,禁止转载,一经发现将追究版权责任!

转载请注明:静觅 » 阿里云服务器活动!阿里云代金券 + 1 折优惠码

]]>
/8418.html/feed 0
[Python3网络爬虫开发实战] 13.10–Scrapy 通用爬虫 /8413.html /8413.html#comments Fri, 06 Dec 2019 01:30:05 +0000 /?p=8413 13.10 Scrapy 通用爬虫

通过 Scrapy,我们可以轻松地完成一个站点爬虫的编写。但如果抓取的站点量非常大,比如爬取各大媒体的新闻信息,多个 Spider 则可能包含很多重复代码。

如果我们将各个站点的 Spider 的公共部分保留下来,不同的部分提取出来作为单独的配置,如爬取规则、页面解析方式等抽离出来做成一个配置文件,那么我们在新增一个爬虫的时候,只需要实现这些网站的爬取规则和提取规则即可。

本节我们就来探究一下 Scrapy 通用爬虫的实现方法。

1. CrawlSpider

在实现通用爬虫之前我们需要先了解一下 CrawlSpider,其官方文档链接为:http://scrapy.readthedocs.io/en/latest/topics/spiders.html#crawlspider

CrawlSpider 是 Scrapy 提供的一个通用 Spider。在 Spider 里,我们可以指定一些爬取规则来实现页面的提取,这些爬取规则由一个专门的数据结构 Rule 表示。Rule 里包含提取和跟进页面的配置,Spider 会根据 Rule 来确定当前页面中的哪些链接需要继续爬取、哪些页面的爬取结果需要用哪个方法解析等。

CrawlSpider 继承自 Spider 类。除了 Spider 类的所有方法和属性,它还提供了一个非常重要的属性和方法。

  • rules,它是爬取规则属性,是包含一个或多个 Rule 对象的列表。每个 Rule 对爬取网站的动作都做了定义,CrawlSpider 会读取 rules 的每一个 Rule 并进行解析。
  • parse_start_url(),它是一个可重写的方法。当 start_urls 里对应的 Request 得到 Response 时,该方法被调用,它会分析 Response 并必须返回 Item 对象或者 Request 对象。

这里最重要的内容莫过于 Rule 的定义了,它的定义和参数如下所示:

class scrapy.contrib.spiders.Rule(link_extractor, callback=None, cb_kwargs=None, follow=None, process_links=None, process_request=None)

下面对其参数依次说明:

  • link_extractor,是一个 Link Extractor 对象。通过它,Spider 可以知道从爬取的页面中提取哪些链接。提取出的链接会自动生成 Request。它又是一个数据结构,一般常用 LxmlLinkExtractor 对象作为参数,其定义和参数如下所示:

class scrapy.linkextractors.lxmlhtml.LxmlLinkExtractor(allow=(), deny=(), allow_domains=(), deny_domains=(), deny_extensions=None, restrict_xpaths=(), restrict_css=(), tags=('a', 'area'), attrs=('href',), canonicalize=False, unique=True, process_value=None, strip=True)

allow 是一个正则表达式或正则表达式列表,它定义了从当前页面提取出的链接哪些是符合要求的,只有符合要求的链接才会被跟进。deny 则相反。allow_domains 定义了符合要求的域名,只有此域名的链接才会被跟进生成新的 Request,它相当于域名白名单。deny_domains 则相反,相当于域名黑名单。restrict_xpaths 定义了从当前页面中 XPath 匹配的区域提取链接,其值是 XPath 表达式或 XPath 表达式列表。restrict_css 定义了从当前页面中 CSS 选择器匹配的区域提取链接,其值是 CSS 选择器或 CSS 选择器列表。还有一些其他参数代表了提取链接的标签、是否去重、链接的处理等内容,使用的频率不高。可以参考文档的参数说明:http://scrapy.readthedocs.io/en/latest/topics/link-extractors.html#module-scrapy.linkextractors.lxmlhtml

  • callback,即回调函数,和之前定义 Request 的 callback 有相同的意义。每次从 link_extractor 中获取到链接时,该函数将会调用。该回调函数接收一个 response 作为其第一个参数,并返回一个包含 Item 或 Request 对象的列表。注意,避免使用 parse() 作为回调函数。由于 CrawlSpider 使用 parse() 方法来实现其逻辑,如果 parse() 方法覆盖了,CrawlSpider 将会运行失败。
  • cb_kwargs,字典,它包含传递给回调函数的参数。
  • follow,布尔值,即 True 或 False,它指定根据该规则从 response 提取的链接是否需要跟进。如果 callback 参数为 None,follow 默认设置为 True,否则默认为 False。
  • process_links,指定处理函数,从 link_extractor 中获取到链接列表时,该函数将会调用,它主要用于过滤。
  • process_request,同样是指定处理函数,根据该 Rule 提取到每个 Request 时,该函数都会调用,对 Request 进行处理。该函数必须返回 Request 或者 None。

以上内容便是 CrawlSpider 中的核心 Rule 的基本用法。但这些内容可能还不足以完成一个 CrawlSpider 爬虫。下面我们利用 CrawlSpider 实现新闻网站的爬取实例,来更好地理解 Rule 的用法。

2. Item Loader

我们了解了利用 CrawlSpider 的 Rule 来定义页面的爬取逻辑,这是可配置化的一部分内容。但是,Rule 并没有对 Item 的提取方式做规则定义。对于 Item 的提取,我们需要借助另一个模块 Item Loader 来实现。

Item Loader 提供一种便捷的机制来帮助我们方便地提取 Item。它提供的一系列 API 可以分析原始数据对 Item 进行赋值。Item 提供的是保存抓取数据的容器,而 Item Loader 提供的是填充容器的机制。有了它,数据的提取会变得更加规则化。

Item Loader 的 API 如下所示:

class scrapy.loader.ItemLoader([item, selector, response,] **kwargs)

Item Loader 的 API 返回一个新的 Item Loader 来填充给定的 Item。如果没有给出 Item,则使用 default_item_class 中的类自动实例化。另外,它传入 selector 和 response 参数来使用选择器或响应参数实例化。

下面将依次说明 Item Loader 的 API 参数。

  • item,Item 对象,可以调用 add_xpath()、add_css() 或 add_value() 等方法来填充 Item 对象。
  • selector,Selector 对象,用来提取填充数据的选择器。
  • response,Response 对象,用于使用构造选择器的 Response。

一个比较典型的 Item Loader 实例如下:

from scrapy.loader import ItemLoader
from project.items import Product

def parse(self, response):
    loader = ItemLoader(item=Product(), response=response)
    loader.add_xpath('name', '//div[@class="product_name"]')
    loader.add_xpath('name', '//div[@class="product_title"]')
    loader.add_xpath('price', '//p[@id="price"]')
    loader.add_css('stock', 'p#stock]')
    loader.add_value('last_updated', 'today')
    return loader.load_item()

这里首先声明一个 Product Item,用该 Item 和 Response 对象实例化 ItemLoader,调用 add_xpath() 方法把来自两个不同位置的数据提取出来,分配给 name 属性,再用 add_xpath()、add_css()、add_value() 等方法对不同属性依次赋值,最后调用 load_item() 方法实现 Item 的解析。这种方式比较规则化,我们可以把一些参数和规则单独提取出来做成配置文件或存到数据库,即可实现可配置化。

另外,Item Loader 每个字段中都包含了一个 Input Processor(输入处理器)和一个 Output Processor(输出处理器)。Input Processor 收到数据时立刻提取数据,Input Processor 的结果被收集起来并且保存在 ItemLoader 内,但是不分配给 Item。收集到所有的数据后,load_item() 方法被调用来填充再生成 Item 对象。在调用时会先调用 Output Processor 来处理之前收集到的数据,然后再存入 Item 中,这样就生成了 Item。

下面将介绍一些内置的 Processor。

Identity

Identity 是最简单的 Processor,不进行任何处理,直接返回原来的数据。

TakeFirst

TakeFirst 返回列表的第一个非空值,类似 extract_first() 的功能,常用作 Output Processor,如下所示:

from scrapy.loader.processors import TakeFirst
processor = TakeFirst()
print(processor(['', 1, 2, 3]))

输出结果如下所示:

1

经过此 Processor 处理后的结果返回了第一个不为空的值。

Join

Join 方法相当于字符串的 join() 方法,可以把列表拼合成字符串,字符串默认使用空格分隔,如下所示:

from scrapy.loader.processors import Join
processor = Join()
print(processor(['one', 'two', 'three']))

输出结果如下所示:

one two three

它也可以通过参数更改默认的分隔符,例如改成逗号:

from scrapy.loader.processors import Join
processor = Join(',')
print(processor(['one', 'two', 'three']))

运行结果如下所示:

one,two,three

Compose

Compose 是用给定的多个函数的组合而构造的 Processor,每个输入值被传递到第一个函数,其输出再传递到第二个函数,依次类推,直到最后一个函数返回整个处理器的输出,如下所示:

from scrapy.loader.processors import Compose
processor = Compose(str.upper, lambda s: s.strip())
print(processor(' hello world'))

运行结果如下所示:

HELLO WORLD

在这里我们构造了一个 Compose Processor,传入一个开头带有空格的字符串。Compose Processor 的参数有两个:第一个是 str.upper,它可以将字母全部转为大写;第二个是一个匿名函数,它调用 strip() 方法去除头尾空白字符。Compose 会顺次调用两个参数,最后返回结果的字符串全部转化为大写并且去除了开头的空格。

MapCompose

与 Compose 类似,MapCompose 可以迭代处理一个列表输入值,如下所示:

from scrapy.loader.processors import MapCompose
processor = MapCompose(str.upper, lambda s: s.strip())
print(processor(['Hello', 'World', 'Python']))

运行结果如下所示:

['HELLO', 'WORLD', 'PYTHON']

被处理的内容是一个可迭代对象,MapCompose 会将该对象遍历然后依次处理。

SelectJmes

SelectJmes 可以查询 JSON,传入 Key,返回查询所得的 Value。不过需要先安装 jmespath 库才可以使用它,命令如下所示:

pip3 install jmespath

安装好 jmespath 之后,便可以使用这个 Processor 了,如下所示:

from scrapy.loader.processors import SelectJmes
proc = SelectJmes('foo')
processor = SelectJmes('foo')
print(processor({'foo': 'bar'}))

运行结果:

bar

以上内容便是一些常用的 Processor,在本节的实例中我们会使用 Processor 来进行数据的处理。

接下来,我们用一个实例来了解 Item Loader 的用法。

3. 本节目标

我们以中华网科技类新闻为例,来了解 CrawlSpider 和 Item Loader 的用法,再提取其可配置信息实现可配置化。官网链接为:http://tech.china.com/。我们需要爬取它的科技类新闻内容,链接为:http://tech.china.com/articles/,页面如图 13-19 所示。

我们要抓取新闻列表中的所有分页的新闻详情,包括标题、正文、时间、来源等信息。

图 13-19 爬取站点

4. 新建项目

首先新建一个 Scrapy 项目,名为 scrapyuniversal,如下所示:

scrapy startproject scrapyuniversal

创建一个 CrawlSpider,需要先制定一个模板。我们可以先看看有哪些可用模板,命令如下所示:

scrapy genspider -l

运行结果如下所示:

Available templates:
  basic
  crawl
  csvfeed
  xmlfeed

之前创建 Spider 的时候,我们默认使用了第一个模板 basic。这次要创建 CrawlSpider,就需要使用第二个模板 crawl,创建命令如下所示:

scrapy genspider -t crawl china tech.china.com

运行之后便会生成一个 CrawlSpider,其内容如下所示:

from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule

class ChinaSpider(CrawlSpider):
    name = 'china'
    allowed_domains = ['tech.china.com']
    start_urls = ['http://tech.china.com/']

    rules = (Rule(LinkExtractor(allow=r'Items/'), callback='parse_item', follow=True),
    )

    def parse_item(self, response):
        i = {}
        #i['domain_id'] = response.xpath('//input[@id="sid"]/@value').extract()
        #i['name'] = response.xpath('//div[@id="name"]').extract()
        #i['description'] = response.xpath('//div[@id="description"]').extract()
        return i

这次生成的 Spider 内容多了一个 rules 属性的定义。Rule 的第一个参数是 LinkExtractor,就是上文所说的 LxmlLinkExtractor,只是名称不同。同时,默认的回调函数也不再是 parse,而是 parse_item。

5. 定义 Rule

要实现新闻的爬取,我们需要做的就是定义好 Rule,然后实现解析函数。下面我们就来一步步实现这个过程。

首先将 start_urls 修改为起始链接,代码如下所示:

start_urls = ['http://tech.china.com/articles/']

之后,Spider 爬取 start_urls 里面的每一个链接。所以这里第一个爬取的页面就是我们刚才所定义的链接。得到 Response 之后,Spider 就会根据每一个 Rule 来提取这个页面内的超链接,去生成进一步的 Request。接下来,我们就需要定义 Rule 来指定提取哪些链接。

当前页面如图 13-20 所示:

图 13-20 页面内容

这是新闻的列表页,下一步自然就是将列表中的每条新闻详情的链接提取出来。这里直接指定这些链接所在区域即可。查看源代码,所有链接都在 ID 为 left_side 的节点内,具体来说是它内部的 class 为 con_item 的节点,如图 13-21 所示。

图 13-21 列表源码

此处我们可以用 LinkExtractor 的 restrict_xpaths 属性来指定,之后 Spider 就会从这个区域提取所有的超链接并生成 Request。但是,每篇文章的导航中可能还有一些其他的超链接标签,我们只想把需要的新闻链接提取出来。真正的新闻链接路径都是以 article 开头的,我们用一个正则表达式将其匹配出来再赋值给 allow 参数即可。另外,这些链接对应的页面其实就是对应的新闻详情页,而我们需要解析的就是新闻的详情信息,所以此处还需要指定一个回调函数 callback。

到现在我们就可以构造出一个 Rule 了,代码如下所示:

Rule(LinkExtractor(allow='article/.*.html', restrict_xpaths='//div[@id="left_side"]//div[@class="con_item"]'), callback='parse_item')

接下来,我们还要让当前页面实现分页功能,所以还需要提取下一页的链接。分析网页源码之后可以发现下一页链接是在 ID 为 pageStyle 的节点内,如图 13-22 所示。

图 13-22 分页源码

但是,下一页节点和其他分页链接区分度不高,要取出此链接我们可以直接用 XPath 的文本匹配方式,所以这里我们直接用 LinkExtractor 的 restrict_xpaths 属性来指定提取的链接即可。另外,我们不需要像新闻详情页一样去提取此分页链接对应的页面详情信息,也就是不需要生成 Item,所以不需要加 callback 参数。另外这下一页的页面如果请求成功了就需要继续像上述情况一样分析,所以它还需要加一个 follow 参数为 True,代表继续跟进匹配分析。其实,follow 参数也可以不加,因为当 callback 为空的时候,follow 默认为 True。此处 Rule 定义为如下所示:

Rule(LinkExtractor(restrict_xpaths='//div[@id="pageStyle"]//a[contains(., "下一页")]'))

所以现在 rules 就变成了:

rules = (Rule(LinkExtractor(allow='article/.*.html', restrict_xpaths='//div[@id="left_side"]//div[@class="con_item"]'), callback='parse_item'),
    Rule(LinkExtractor(restrict_xpaths='//div[@id="pageStyle"]//a[contains(., "下一页")]'))
)

接着我们运行一下代码,命令如下:

scrapy crawl china

现在已经实现页面的翻页和详情页的抓取了,我们仅仅通过定义了两个 Rule 即实现了这样的功能,运行效果如图 13-23 所示。

图 13-23 运行效果

6. 解析页面

接下来我们需要做的就是解析页面内容了,将标题、发布时间、正文、来源提取出来即可。首先定义一个 Item,如下所示:

from scrapy import Field, Item

class NewsItem(Item):
    title = Field()
    url = Field()
    text = Field()
    datetime = Field()
    source = Field()
    website = Field()

这里的字段分别指新闻标题、链接、正文、发布时间、来源、站点名称,其中站点名称直接赋值为中华网。因为既然是通用爬虫,肯定还有很多爬虫也来爬取同样结构的其他站点的新闻内容,所以需要一个字段来区分一下站点名称。

详情页的预览图如图 13-24 所示。

图 13-24 详情页面

如果像之前一样提取内容,就直接调用 response 变量的 xpath()、css() 等方法即可。这里 parse_item() 方法的实现如下所示:

def parse_item(self, response):
    item = NewsItem()
    item['title'] = response.xpath('//h1[@id="chan_newsTitle"]/text()').extract_first()
    item['url'] = response.url
    item['text'] = ''.join(response.xpath('//div[@id="chan_newsDetail"]//text()').extract()).strip()
    item['datetime'] = response.xpath('//div[@id="chan_newsInfo"]/text()').re_first('(d+-d+-d+sd+:d+:d+)')
    item['source'] = response.xpath('//div[@id="chan_newsInfo"]/text()').re_first(' 来源:(.*)').strip()
    item['website'] = ' 中华网 '
    yield item

这样我们就把每条新闻的信息提取形成了一个 NewsItem 对象。

这时实际上我们就已经完成了 Item 的提取。再运行一下 Spider,如下所示:

scrapy crawl china

输出内容如图 13-25 所示:

图 13-25 输出内容

现在我们就可以成功将每条新闻的信息提取出来。

不过我们发现这种提取方式非常不规整。下面我们再用 Item Loader,通过 add_xpath()、add_css()、add_value() 等方式实现配置化提取。我们可以改写 parse_item(),如下所示:

def parse_item(self, response):
    loader = ChinaLoader(item=NewsItem(), response=response)
    loader.add_xpath('title', '//h1[@id="chan_newsTitle"]/text()')
    loader.add_value('url', response.url)
    loader.add_xpath('text', '//div[@id="chan_newsDetail"]//text()')
    loader.add_xpath('datetime', '//div[@id="chan_newsInfo"]/text()', re='(d+-d+-d+sd+:d+:d+)')
    loader.add_xpath('source', '//div[@id="chan_newsInfo"]/text()', re=' 来源:(.*)')
    loader.add_value('website', ' 中华网 ')
    yield loader.load_item()

这里我们定义了一个 ItemLoader 的子类,名为 ChinaLoader,其实现如下所示:

from scrapy.loader import ItemLoader
from scrapy.loader.processors import TakeFirst, Join, Compose

class NewsLoader(ItemLoader):
    default_output_processor = TakeFirst()

class ChinaLoader(NewsLoader):
    text_out = Compose(Join(), lambda s: s.strip())
    source_out = Compose(Join(), lambda s: s.strip())

ChinaLoader 继承了 NewsLoader 类,其内定义了一个通用的 Out Processor 为 TakeFirst,这相当于之前所定义的 extract_first() 方法的功能。我们在 ChinaLoader 中定义了 text_out 和 source_out 字段。这里使用了一个 Compose Processor,它有两个参数:第一个参数 Join 也是一个 Processor,它可以把列表拼合成一个字符串;第二个参数是一个匿名函数,可以将字符串的头尾空白字符去掉。经过这一系列处理之后,我们就将列表形式的提取结果转化为去除头尾空白字符的字符串。

代码重新运行,提取效果是完全一样的。

至此,我们已经实现了爬虫的半通用化配置。

7. 通用配置抽取

为什么现在只做到了半通用化?如果我们需要扩展其他站点,仍然需要创建一个新的 CrawlSpider,定义这个站点的 Rule,单独实现 parse_item() 方法。还有很多代码是重复的,如 CrawlSpider 的变量、方法名几乎都是一样的。那么我们可不可以把多个类似的几个爬虫的代码共用,把完全不相同的地方抽离出来,做成可配置文件呢?

当然可以。那我们可以抽离出哪些部分?所有的变量都可以抽取,如 name、allowed_domains、start_urls、rules 等。这些变量在 CrawlSpider 初始化的时候赋值即可。我们就可以新建一个通用的 Spider 来实现这个功能,命令如下所示:

scrapy genspider -t crawl universal universal

这个全新的 Spider 名为 universal。接下来,我们将刚才所写的 Spider 内的属性抽离出来配置成一个 JSON,命名为 china.json,放到 configs 文件夹内,和 spiders 文件夹并列,代码如下所示:

{
  "spider": "universal",
  "website": "中华网科技",
  "type": "新闻",
  "index": "http://tech.china.com/",
  "settings": {"USER_AGENT": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"
  },
  "start_urls": ["http://tech.china.com/articles/"],
  "allowed_domains": ["tech.china.com"],
  "rules": "china"
}

第一个字段 spider 即 Spider 的名称,在这里是 universal。后面是站点的描述,比如站点名称、类型、首页等。随后的 settings 是该 Spider 特有的 settings 配置,如果要覆盖全局项目,settings.py 内的配置可以单独为其配置。随后是 Spider 的一些属性,如 start_urls、allowed_domains、rules 等。rules 也可以单独定义成一个 rules.py 文件,做成配置文件,实现 Rule 的分离,如下所示:

from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import Rule

rules = {
    'china': (Rule(LinkExtractor(allow='article/.*.html', restrict_xpaths='//div[@id="left_side"]//div[@class="con_item"]'),
             callback='parse_item'),
        Rule(LinkExtractor(restrict_xpaths='//div[@id="pageStyle"]//a[contains(., "下一页")]'))
    )
}

这样我们将基本的配置抽取出来。如果要启动爬虫,只需要从该配置文件中读取然后动态加载到 Spider 中即可。所以我们需要定义一个读取该 JSON 文件的方法,如下所示:

from os.path import realpath, dirname
import json
def get_config(name):
    path = dirname(realpath(__file__)) + '/configs/' + name + '.json'
    with open(path, 'r', encoding='utf-8') as f:
        return json.loads(f.read())

定义了 get_config() 方法之后,我们只需要向其传入 JSON 配置文件的名称即可获取此 JSON 配置信息。随后我们定义入口文件 run.py,把它放在项目根目录下,它的作用是启动 Spider,如下所示:

import sys
from scrapy.utils.project import get_project_settings
from scrapyuniversal.spiders.universal import UniversalSpider
from scrapyuniversal.utils import get_config
from scrapy.crawler import CrawlerProcess

def run():
    name = sys.argv[1]
    custom_settings = get_config(name)
    # 爬取使用的 Spider 名称
    spider = custom_settings.get('spider', 'universal')
    project_settings = get_project_settings()
    settings = dict(project_settings.copy())
    # 合并配置
    settings.update(custom_settings.get('settings'))
    process = CrawlerProcess(settings)
    # 启动爬虫
    process.crawl(spider, **{'name': name})
    process.start()

if __name__ == '__main__':
    run()

运行入口为 run()。首先获取命令行的参数并赋值为 name,name 就是 JSON 文件的名称,其实就是要爬取的目标网站的名称。我们首先利用 get_config() 方法,传入该名称读取刚才定义的配置文件。获取爬取使用的 spider 的名称、配置文件中的 settings 配置,然后将获取到的 settings 配置和项目全局的 settings 配置做了合并。新建一个 CrawlerProcess,传入爬取使用的配置。调用 crawl() 和 start() 方法即可启动爬取。

在 universal 中,我们新建一个init() 方法,进行初始化配置,实现如下所示:

from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from scrapyuniversal.utils import get_config
from scrapyuniversal.rules import rules

class UniversalSpider(CrawlSpider):
    name = 'universal'
    def __init__(self, name, *args, **kwargs):
        config = get_config(name)
        self.config = config
        self.rules = rules.get(config.get('rules'))
        self.start_urls = config.get('start_urls')
        self.allowed_domains = config.get('allowed_domains')
        super(UniversalSpider, self).__init__(*args, **kwargs)

    def parse_item(self, response):
        i = {}
        return i

在 __init__() 方法中,start_urls、allowed_domains、rules 等属性被赋值。其中,rules 属性另外读取了 rules.py 的配置,这样就成功实现爬虫的基础配置。

接下来,执行如下命令运行爬虫:

python3 run.py china

程序会首先读取 JSON 配置文件,将配置中的一些属性赋值给 Spider,然后启动爬取。运行效果完全相同,运行结果如图 13-26 所示。

图 13-26 运行结果

现在我们已经对 Spider 的基础属性实现了可配置化。剩下的解析部分同样需要实现可配置化,原来的解析函数如下所示:

def parse_item(self, response):
    loader = ChinaLoader(item=NewsItem(), response=response)
    loader.add_xpath('title', '//h1[@id="chan_newsTitle"]/text()')
    loader.add_value('url', response.url)
    loader.add_xpath('text', '//div[@id="chan_newsDetail"]//text()')
    loader.add_xpath('datetime', '//div[@id="chan_newsInfo"]/text()', re='(d+-d+-d+sd+:d+:d+)')
    loader.add_xpath('source', '//div[@id="chan_newsInfo"]/text()', re=' 来源:(.*)')
    loader.add_value('website', ' 中华网 ')
    yield loader.load_item()

我们需要将这些配置也抽离出来。这里的变量主要有 Item Loader 类的选用、Item 类的选用、Item Loader 方法参数的定义,我们可以在 JSON 文件中添加如下 item 的配置:

"item": {
  "class": "NewsItem",
  "loader": "ChinaLoader",
  "attrs": {
    "title": [
      {
        "method": "xpath",
        "args": ["//h1[@id='chan_newsTitle']/text()"]
      }
    ],
    "url": [
      {
        "method": "attr",
        "args": ["url"]
      }
    ],
    "text": [
      {
        "method": "xpath",
        "args": ["//div[@id='chan_newsDetail']//text()"]
      }
    ],
    "datetime": [
      {
        "method": "xpath",
        "args": ["//div[@id='chan_newsInfo']/text()"],
        "re": "(\d+-\d+-\d+\s\d+:\d+:\d+)"
      }
    ],
    "source": [
      {
        "method": "xpath",
        "args": ["//div[@id='chan_newsInfo']/text()"],
        "re": "来源:(.*)"
      }
    ],
    "website": [
      {
        "method": "value",
        "args": ["中华网"]
      }
    ]
  }
}

这里定义了 class 和 loader 属性,它们分别代表 Item 和 Item Loader 所使用的类。定义了 attrs 属性来定义每个字段的提取规则,例如,title 定义的每一项都包含一个 method 属性,它代表使用的提取方法,如 xpath 即代表调用 Item Loader 的 add_xpath() 方法。args 即参数,就是 add_xpath() 的第二个参数,即 XPath 表达式。针对 datetime 字段,我们还用了一次正则提取,所以这里还可以定义一个 re 参数来传递提取时所使用的正则表达式。

我们还要将这些配置之后动态加载到 parse_item() 方法里。最后,最重要的就是实现 parse_item() 方法,如下所示:

def parse_item(self, response):
    item = self.config.get('item')
    if item:
        cls = eval(item.get('class'))()
        loader = eval(item.get('loader'))(cls, response=response)
        # 动态获取属性配置
        for key, value in item.get('attrs').items():
            for extractor in value:
                if extractor.get('method') == 'xpath':
                    loader.add_xpath(key, *extractor.get('args'), **{'re': extractor.get('re')})
                if extractor.get('method') == 'css':
                    loader.add_css(key, *extractor.get('args'), **{'re': extractor.get('re')})
                if extractor.get('method') == 'value':
                    loader.add_value(key, *extractor.get('args'), **{'re': extractor.get('re')})
                if extractor.get('method') == 'attr':
                    loader.add_value(key, getattr(response, *extractor.get('args')))
        yield loader.load_item()

这里首先获取 Item 的配置信息,然后获取 class 的配置,将其初始化,初始化 Item Loader,遍历 Item 的各个属性依次进行提取。判断 method 字段,调用对应的处理方法进行处理。如 method 为 css,就调用 Item Loader 的 add_css() 方法进行提取。所有配置动态加载完毕之后,调用 load_item() 方法将 Item 提取出来。

重新运行程序,结果如图 13-27 所示。

图 13-27 运行结果

运行结果是完全相同的。

我们再回过头看一下 start_urls 的配置。这里 start_urls 只可以配置具体的链接。如果这些链接有 100 个、1000 个,我们总不能将所有的链接全部列出来吧?在某些情况下,start_urls 也需要动态配置。我们将 start_urls 分成两种,一种是直接配置 URL 列表,一种是调用方法生成,它们分别定义为 static 和 dynamic 类型。

本例中的 start_urls 很明显是 static 类型的,所以 start_urls 配置改写如下所示:

`json”start_urls”: {“type”:”static”,”value”: [“http://tech.china.com/articles/“]
}

如果 start_urls 是动态生成的,我们可以调用方法传参数,如下所示:
```json
"start_urls": {
  "type": "dynamic",
  "method": "china",
  "args": [5, 10]
}

这里 start_urls 定义为 dynamic 类型,指定方法为 urls_china(),然后传入参数 5 和 10,来生成第 5 到 10 页的链接。这样我们只需要实现该方法即可,统一新建一个 urls.py 文件,如下所示:

def china(start, end):
    for page in range(start, end + 1):
        yield 'http://tech.china.com/articles/index_' + str(page) + '.html'

其他站点可以自行配置。如某些链接需要用到时间戳,加密参数等,均可通过自定义方法实现。

接下来在 Spider 的 __init__() 方法中,start_urls 的配置改写如下所示:

from scrapyuniversal import urls

start_urls = config.get('start_urls')
if start_urls:
    if start_urls.get('type') == 'static':
        self.start_urls = start_urls.get('value')
    elif start_urls.get('type') == 'dynamic':
        self.start_urls = list(eval('urls.' + start_urls.get('method'))(*start_urls.get('args', [])))

这里通过判定 start_urls 的类型分别进行不同的处理,这样我们就可以实现 start_urls 的配置了。

至此,Spider 的设置、起始链接、属性、提取方法都已经实现了全部的可配置化。

综上所述,整个项目的配置包括如下内容。

  • spider,指定所使用的 Spider 的名称。
  • settings,可以专门为 Spider 定制配置信息,会覆盖项目级别的配置。
  • start_urls,指定爬虫爬取的起始链接。
  • allowed_domains,允许爬取的站点。
  • rules,站点的爬取规则。
  • item,数据的提取规则。

我们实现了 Scrapy 的通用爬虫,每个站点只需要修改 JSON 文件即可实现自由配置。

7. 本节代码

本节代码地址为:https://github.com/Python3WebSpider/ScrapyUniversal

8. 结语

本节介绍了 Scrapy 通用爬虫的实现。我们将所有配置抽离出来,每增加一个爬虫,就只需要增加一个 JSON 文件配置。之后我们只需要维护这些配置文件即可。如果要更加方便的管理,可以将规则存入数据库,再对接可视化管理页面即可。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.10–Scrapy 通用爬虫

]]>
/8413.html/feed 2
[Python3网络爬虫开发实战] 13.9–Scrapy 对接 Splash /8410.html /8410.html#respond Fri, 06 Dec 2019 01:27:03 +0000 /?p=8410 13.9 Scrapy 对接 Splash

在上一节我们实现了 Scrapy 对接 Selenium 抓取淘宝商品的过程,这是一种抓取 JavaScript 动态渲染页面的方式。除了 Selenium,Splash 也可以实现同样的功能。本节我们来了解 Scrapy 对接 Splash 来进行页面抓取的方式。

1. 准备工作

请确保 Splash 已经正确安装并正常运行,同时安装好 Scrapy-Splash 库,如果没有安装可以参考第 1 章的安装说明。

2. 新建项目

首先新建一个项目,名为 scrapysplashtest,命令如下所示:

scrapy startproject scrapysplashtest

新建一个 Spider,命令如下所示:

scrapy genspider taobao www.taobao.com

3. 添加配置

可以参考 Scrapy-Splash 的配置说明进行一步步的配置,链接如下:https://github.com/scrapy-plugins/scrapy-splash#configuration

修改 settings.py,配置 SPLASH_URL。在这里我们的 Splash 是在本地运行的,所以可以直接配置本地的地址:

SPLASH_URL = 'http://localhost:8050'

如果 Splash 是在远程服务器运行的,那此处就应该配置为远程的地址。例如运行在 IP 为 120.27.34.25 的服务器上,则此处应该配置为:

SPLASH_URL = 'http://120.27.34.25:8050'

还需要配置几个 Middleware,代码如下所示:

DOWNLOADER_MIDDLEWARES = {
    'scrapy_splash.SplashCookiesMiddleware': 723,
    'scrapy_splash.SplashMiddleware': 725,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 810,
}
SPIDER_MIDDLEWARES = {'scrapy_splash.SplashDeduplicateArgsMiddleware': 100,}

这里配置了三个 Downloader Middleware 和一个 Spider Middleware,这是 Scrapy-Splash 的核心部分。我们不再需要像对接 Selenium 那样实现一个 Downloader Middleware,Scrapy-Splash 库都为我们准备好了,直接配置即可。

还需要配置一个去重的类 DUPEFILTER_CLASS,代码如下所示:

DUPEFILTER_CLASS = 'scrapy_splash.SplashAwareDupeFilter'

最后配置一个 Cache 存储 HTTPCACHE_STORAGE,代码如下所示:

HTTPCACHE_STORAGE = 'scrapy_splash.SplashAwareFSCacheStorage'

4. 新建请求

配置完成之后,我们就可以利用 Splash 来抓取页面了。我们可以直接生成一个 SplashRequest 对象并传递相应的参数,Scrapy 会将此请求转发给 Splash,Splash 对页面进行渲染加载,然后再将渲染结果传递回来。此时 Response 的内容就是渲染完成的页面结果了,最后交给 Spider 解析即可。

我们来看一个示例,如下所示:

yield SplashRequest(url, self.parse_result,
    args={
        # optional; parameters passed to Splash HTTP API
        'wait': 0.5,
        # 'url' is prefilled from request url
        # 'http_method' is set to 'POST' for POST requests
        # 'body' is set to request body for POST requests
    },
    endpoint='render.json', # optional; default is render.html
    splash_url='<url>',     # optional; overrides SPLASH_URL
)

在这里构造了一个 SplashRequest 对象,前两个参数依然是请求的 URL 和回调函数,另外还可以通过 args 传递一些渲染参数,例如等待时间 wait 等,还可以根据 endpoint 参数指定渲染接口,另外还有更多的参数可以参考文档的说明:https://github.com/scrapy-plugins/scrapy-splash#requests

另外我们也可以生成 Request 对象,关于 Splash 的配置通过 meta 属性配置即可,代码如下:

yield scrapy.Request(url, self.parse_result, meta={
    'splash': {
        'args': {
            # set rendering arguments here
            'html': 1,
            'png': 1,
            # 'url' is prefilled from request url
            # 'http_method' is set to 'POST' for POST requests
            # 'body' is set to request body for POST requests
        },
        # optional parameters
        'endpoint': 'render.json',  # optional; default is render.json
        'splash_url': '<url>',      # optional; overrides SPLASH_URL
        'slot_policy': scrapy_splash.SlotPolicy.PER_DOMAIN,
        'splash_headers': {},       # optional; a dict with headers sent to Splash
        'dont_process_response': True, # optional, default is False
        'dont_send_headers': True,  # optional, default is False
        'magic_response': False,    # optional, default is True
    }
})

SplashRequest 对象通过 args 来配置和 Request 对象通过 meta 来配置,两种方式达到的效果是相同的。

本节我们要做的抓取是淘宝商品信息,涉及页面加载等待、模拟点击翻页等操作。我们可以首先定义一个 Lua 脚本,来实现页面加载、模拟点击翻页的功能,代码如下所示:

function main(splash, args)
  args = {
    url="https://s.taobao.com/search?q=iPad",
    wait=5,
    page=5
  }
  splash.images_enabled = false
  assert(splash:go(args.url))
  assert(splash:wait(args.wait))
  js = string.format("document.querySelector('#mainsrp-pager div.form> input').value=% d;document.querySelector('#mainsrp-pager div.form> span.btn.J_Submit').click()", args.page)
  splash:evaljs(js)
  assert(splash:wait(args.wait))
  return splash:png()
end

我们定义了三个参数:请求的链接 url、等待时间 wait、分页页码 page。然后禁用图片加载,请求淘宝的商品列表页面,通过 evaljs() 方法调用 JavaScript 代码,实现页码填充和翻页点击,最后返回页面截图。我们将脚本放到 Splash 中运行,正常获取到页面截图,如图 13-15 所示。

图 13-15 页面截图

翻页操作也成功实现,如图 13-16 所示即为当前页码,和我们传入的页码 page 参数是相同的。

图 13-16 翻页结果

我们只需要在 Spider 里用 SplashRequest 对接 Lua 脚本就好了,如下所示:

from scrapy import Spider
from urllib.parse import quote
from scrapysplashtest.items import ProductItem
from scrapy_splash import SplashRequest

script = """
function main(splash, args)
  splash.images_enabled = false
  assert(splash:go(args.url))
  assert(splash:wait(args.wait))
  js = string.format("document.querySelector('#mainsrp-pager div.form> input').value=% d;document.querySelector('#mainsrp-pager div.form> span.btn.J_Submit').click()", args.page)
  splash:evaljs(js)
  assert(splash:wait(args.wait))
  return splash:html()
end
"""

class TaobaoSpider(Spider):
    name = 'taobao'
    allowed_domains = ['www.taobao.com']
    base_url = 'https://s.taobao.com/search?q='

    def start_requests(self):
        for keyword in self.settings.get('KEYWORDS'):
            for page in range(1, self.settings.get('MAX_PAGE') + 1):
                url = self.base_url + quote(keyword)
                yield SplashRequest(url, callback=self.parse, endpoint='execute', args={'lua_source': script, 'page': page, 'wait': 7})

我们把 Lua 脚本定义成长字符串,通过 SplashRequest 的 args 来传递参数,接口修改为 execute。另外,args 参数里还有一个 lua_source 字段用于指定 Lua 脚本内容。这样我们就成功构造了一个 SplashRequest,对接 Splash 的工作就完成了。

其他的配置不需要更改,Item、Item Pipeline 等设置与上节对接 Selenium 的方式相同,parse() 回调函数也是完全一致的。

5. 运行

接下来,我们通过如下命令运行爬虫:

scrapy crawl taobao

运行结果如图 13-17 所示。

图 13-17 运行结果

由于 Splash 和 Scrapy 都支持异步处理,我们可以看到同时会有多个抓取成功的结果。在 Selenium 的对接过程中,每个页面渲染下载是在 Downloader Middleware 里完成的,所以整个过程是阻塞式的。Scrapy 会等待这个过程完成后再继续处理和调度其他请求,这影响了爬取效率。因此使用 Splash 的爬取效率比 Selenium 高很多。

最后我们再看看 MongoDB 的结果,如图 13-18 所示。

图 13-18 存储结果

结果同样正常保存到了 MongoDB 中。

6. 本节代码

本节代码地址:https://github.com/Python3WebSpider/ScrapySplashTest

7. 结语

在 Scrapy 中,建议使用 Splash 处理 JavaScript 动态渲染的页面。这样不会破坏 Scrapy 中的异步处理过程,会大大提高爬取效率。而且 Splash 的安装和配置比较简单,通过 API 调用的方式实现了模块分离,大规模爬取的部署也更加方便。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.9–Scrapy 对接 Splash

]]>
/8410.html/feed 0
[Python3网络爬虫开发实战] 13.8–Scrapy 对接 Selenium /8397.html /8397.html#comments Thu, 05 Dec 2019 01:30:23 +0000 /?p=8397 13.8 Scrapy 对接 Selenium

Scrapy 抓取页面的方式和 requests 库类似,都是直接模拟 HTTP 请求,而 Scrapy 也不能抓取 JavaScript 动态渲染的页面。在前文中抓取 JavaScript 渲染的页面有两种方式。一种是分析 Ajax 请求,找到其对应的接口抓取,Scrapy 同样可以用此种方式抓取。另一种是直接用 Selenium 或 Splash 模拟浏览器进行抓取,我们不需要关心页面后台发生的请求,也不需要分析渲染过程,只需要关心页面最终结果即可,可见即可爬。那么,如果 Scrapy 可以对接 Selenium,那 Scrapy 就可以处理任何网站的抓取了。

1. 本节目标

本节我们来看看 Scrapy 框架如何对接 Selenium,以 PhantomJS 进行演示。我们依然抓取淘宝商品信息,抓取逻辑和前文中用 Selenium 抓取淘宝商品完全相同。

2. 准备工作

请确保 PhantomJS 和 MongoDB 已经安装好并可以正常运行,安装好 Scrapy、Selenium、PyMongo 库,安装方式可以参考第 1 章的安装说明。

3. 新建项目

首先新建项目,名为 scrapyseleniumtest,命令如下所示:

scrapy startproject scrapyseleniumtest

新建一个 Spider,命令如下所示:

scrapy genspider taobao www.taobao.com

修改 ROBOTSTXT_OBEY 为 False,如下所示:

ROBOTSTXT_OBEY = False

4. 定义 Item

首先定义 Item 对象,名为 ProductItem,代码如下所示:

from scrapy import Item, Field

class ProductItem(Item):

    collection = 'products'
    image = Field()
    price = Field()
    deal = Field()
    title = Field()
    shop = Field()
    location = Field()

这里我们定义了 6 个 Field,也就是 6 个字段,跟之前的案例完全相同。然后定义了一个 collection 属性,即此 Item 保存到 MongoDB 的 Collection 名称。

初步实现 Spider 的 start_requests() 方法,如下所示:

from scrapy import Request, Spider
from urllib.parse import quote
from scrapyseleniumtest.items import ProductItem

class TaobaoSpider(Spider):
    name = 'taobao'
    allowed_domains = ['www.taobao.com']
    base_url = 'https://s.taobao.com/search?q='

    def start_requests(self):
        for keyword in self.settings.get('KEYWORDS'):
            for page in range(1, self.settings.get('MAX_PAGE') + 1):
                url = self.base_url + quote(keyword)
                yield Request(url=url, callback=self.parse, meta={'page': page}, dont_filter=True)

首先定义了一个 base_url,即商品列表的 URL,其后拼接一个搜索关键字就是该关键字在淘宝的搜索结果商品列表页面。

关键字用 KEYWORDS 标识,定义为一个列表。最大翻页页码用 MAX_PAGE 表示。它们统一定义在 setttings.py 里面,如下所示:

KEYWORDS = ['iPad']
MAX_PAGE = 100

在 start_requests() 方法里,我们首先遍历了关键字,遍历了分页页码,构造并生成 Request。由于每次搜索的 URL 是相同的,所以分页页码用 meta 参数来传递,同时设置 dont_filter 不去重。这样爬虫启动的时候,就会生成每个关键字对应的商品列表的每一页的请求了。

5. 对接 Selenium

接下来我们需要处理这些请求的抓取。这次我们对接 Selenium 进行抓取,采用 Downloader Middleware 来实现。在 Middleware 里面的 process_request() 方法里对每个抓取请求进行处理,启动浏览器并进行页面渲染,再将渲染后的结果构造一个 HtmlResponse 对象返回。代码实现如下所示:

from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.http import HtmlResponse
from logging import getLogger

class SeleniumMiddleware():
    def __init__(self, timeout=None, service_args=[]):
        self.logger = getLogger(__name__)
        self.timeout = timeout
        self.browser = webdriver.PhantomJS(service_args=service_args)
        self.browser.set_window_size(1400, 700)
        self.browser.set_page_load_timeout(self.timeout)
        self.wait = WebDriverWait(self.browser, self.timeout)

    def __del__(self):
        self.browser.close()

    def process_request(self, request, spider):
        """
        用 PhantomJS 抓取页面
        :param request: Request 对象
        :param spider: Spider 对象
        :return: HtmlResponse
        """
        self.logger.debug('PhantomJS is Starting')
        page = request.meta.get('page', 1)
        try:
            self.browser.get(request.url)
            if page > 1:
                input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '#mainsrp-pager div.form> input')))
                submit = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '#mainsrp-pager div.form> span.btn.J_Submit')))
                input.clear()
                input.send_keys(page)
                submit.click()
            self.wait.until(EC.text_to_be_present_in_element((By.CSS_SELECTOR, '#mainsrp-pager li.item.active> span'), str(page)))
            self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.m-itemlist .items .item')))
            return HtmlResponse(url=request.url, body=self.browser.page_source, request=request, encoding='utf-8', status=200)
        except TimeoutException:
            return HtmlResponse(url=request.url, status=500, request=request)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(timeout=crawler.settings.get('SELENIUM_TIMEOUT'),
                   service_args=crawler.settings.get('PHANTOMJS_SERVICE_ARGS'))

首先我们在 __init__() 里对一些对象进行初始化,包括 PhantomJS、WebDriverWait 等对象,同时设置页面大小和页面加载超时时间。在 process_request() 方法中,我们通过 Request 的 meta 属性获取当前需要爬取的页码,调用 PhantomJS 对象的 get() 方法访问 Request 的对应的 URL。这就相当于从 Request 对象里获取请求链接,然后再用 PhantomJS 加载,而不再使用 Scrapy 里的 Downloader。

随后的处理等待和翻页的方法在此不再赘述,和前文的原理完全相同。最后,页面加载完成之后,我们调用 PhantomJS 的 page_source 属性即可获取当前页面的源代码,然后用它来直接构造并返回一个 HtmlResponse 对象。构造这个对象的时候需要传入多个参数,如 url、body 等,这些参数实际上就是它的基础属性。可以在官方文档查看 HtmlResponse 对象的结构:https://doc.scrapy.org/en/latest/topics/request-response.html,这样我们就成功利用 PhantomJS 来代替 Scrapy 完成了页面的加载,最后将 Response 返回即可。

有人可能会纳闷:为什么实现这么一个 Downloader Middleware 就可以了?之前的 Request 对象怎么办?Scrapy 不再处理了吗?Response 返回后又传递给了谁?

是的,Request 对象到这里就不会再处理了,也不会再像以前一样交给 Downloader 下载。Response 会直接传给 Spider 进行解析。

我们需要回顾一下 Downloader Middleware 的 process_request() 方法的处理逻辑,内容如下所示:

当 process_request() 方法返回 Response 对象的时候,更低优先级的 Downloader Middleware 的 process_request() 和 process_exception() 方法就不会被继续调用了,转而开始执行每个 Downloader Middleware 的 process_response() 方法,调用完毕之后直接将 Response 对象发送给 Spider 来处理。

这里直接返回了一个 HtmlResponse 对象,它是 Response 的子类,返回之后便顺次调用每个 Downloader Middleware 的 process_response() 方法。而在 process_response() 中我们没有对其做特殊处理,它会被发送给 Spider,传给 Request 的回调函数进行解析。

到现在,我们应该能了解 Downloader Middleware 实现 Selenium 对接的原理了。

在 settings.py 里,我们设置调用刚才定义的 SeleniumMiddleware、设置等待超时变量 SELENIUM_TIMEOUT、设置 PhantomJS 配置参数 PHANTOMJS_SERVICE_ARGS,如下所示:

DOWNLOADER_MIDDLEWARES = {'scrapyseleniumtest.middlewares.SeleniumMiddleware': 543,}

6. 解析页面

Response 对象就会回传给 Spider 内的回调函数进行解析。所以下一步我们就实现其回调函数,对网页来进行解析,代码如下所示:

def parse(self, response):
    products = response.xpath('//div[@id="mainsrp-itemlist"]//div[@class="items"][1]//div[contains(@class, "item")]')
    for product in products:
        item = ProductItem()
        item['price'] = ''.join(product.xpath('.//div[contains(@class, "price")]//text()').extract()).strip()
        item['title'] = ''.join(product.xpath('.//div[contains(@class, "title")]//text()').extract()).strip()
        item['shop'] = ''.join(product.xpath('.//div[contains(@class, "shop")]//text()').extract()).strip()
        item['image'] = ''.join(product.xpath('.//div[@class="pic"]//img[contains(@class, "img")]/@data-src').extract()).strip()
        item['deal'] = product.xpath('.//div[contains(@class, "deal-cnt")]//text()').extract_first()
        item['location'] = product.xpath('.//div[contains(@class, "location")]//text()').extract_first()
        yield item

在这里我们使用 XPath 进行解析,调用 response 变量的 xpath() 方法即可。首先我们传递选取所有商品对应的 XPath,可以匹配所有商品,随后对结果进行遍历,依次选取每个商品的名称、价格、图片等内容,构造并返回一个 ProductItem 对象。

7. 存储结果

最后我们实现一个 Item Pipeline,将结果保存到 MongoDB,如下所示:

import pymongo

class MongoPipeline(object):
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DB'))

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        self.db[item.collection].insert(dict(item))
        return item

    def close_spider(self, spider):
        self.client.close()

此实现和前文中存储到 MongoDB 的方法完全一致,原理不再赘述。记得在 settings.py 中开启它的调用,如下所示:

ITEM_PIPELINES = {'scrapyseleniumtest.pipelines.MongoPipeline': 300,}

其中,MONGO_URI 和 MONGO_DB 的定义如下所示:

MONGO_URI = 'localhost'
MONGO_DB = 'taobao'

8. 运行

整个项目就完成了,执行如下命令启动抓取即可:

scrapy crawl taobao

运行结果如图 13-13 所示:

图 13-13 运行结果

再查看一下 MongoDB,结果如图 13-14 所示:

图 13-14 MongoDB 结果

这样我们便成功在 Scrapy 中对接 Selenium 并实现了淘宝商品的抓取。

9. 本节代码

本节代码地址为:https://github.com/Python3WebSpider/ScrapySeleniumTest

10. 结语

我们通过改写 Downloader Middleware 的方式实现了 Selenium 的对接。但这种方法其实是阻塞式的,也就是说这样就破坏了 Scrapy 异步处理的逻辑,速度会受到影响。为了不破坏其异步加载逻辑,我们可以使用 Splash 实现。下一节我们再来看看 Scrapy 对接 Splash 的方式。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.8–Scrapy 对接 Selenium

]]>
/8397.html/feed 2
[Python3网络爬虫开发实战] 13.7–Item Pipeline 的用法 /8394.html /8394.html#comments Thu, 05 Dec 2019 01:26:32 +0000 /?p=8394 13.7 Item Pipeline 的用法

Item Pipeline 是项目管道。在前面我们已经了解了 Item Pipeline 的基本用法,本节我们再作详细了解它的用法。

首先我们看看 Item Pipeline 在 Scrapy 中的架构,如图 13-1 所示。

图中的最左侧即为 Item Pipeline,它的调用发生在 Spider 产生 Item 之后。当 Spider 解析完 Response 之后,Item 就会传递到 Item Pipeline,被定义的 Item Pipeline 组件会顺次调用,完成一连串的处理过程,比如数据清洗、存储等。

它的主要功能有:

  • 清洗 HTML 数据
  • 验证爬取数据,检查爬取字段
  • 查重并丢弃重复内容
  • 将爬取结果储存到数据库

1. 核心方法

我们可以自定义 Item Pipeline,只需要实现指定的方法就好,其中必须要实现的一个方法是:

  • process_item(item, spider)

另外还有几个比较实用的方法,它们分别是:

  • open_spider(spider)
  • close_spider(spider)
  • from_crawler(cls, crawler)

下面我们对这几个方法的用法作下详细的介绍:

process_item(item, spider)

process_item() 是必须要实现的方法,被定义的 Item Pipeline 会默认调用这个方法对 Item 进行处理。比如,我们可以进行数据处理或者将数据写入到数据库等操作。它必须返回 Item 类型的值或者抛出一个 DropItem 异常。

process_item() 方法的参数有如下两个。

  • item,是 Item 对象,即被处理的 Item
  • spider,是 Spider 对象,即生成该 Item 的 Spider

下面对该方法的返回类型归纳如下:

  • 如果返回的是 Item 对象,那么此 Item 会接着被低优先级的 Item Pipeline 的 process_item() 方法进行处理,直到所有的方法被调用完毕。
  • 如果抛出的是 DropItem 异常,那么此 Item 就会被丢弃,不再进行处理。

open_spider(self, spider)

open_spider() 方法是在 Spider 开启的时候被自动调用的,在这里我们可以做一些初始化操作,如开启数据库连接等。其中参数 spider 就是被开启的 Spider 对象。

close_spider(spider)

close_spider() 方法是在 Spider 关闭的时候自动调用的,在这里我们可以做一些收尾工作,如关闭数据库连接等,其中参数 spider 就是被关闭的 Spider 对象。

from_crawler(cls, crawler)

from_crawler() 方法是一个类方法,用 @classmethod 标识,是一种依赖注入的方式。它的参数是 crawler,通过 crawler 对象,我们可以拿到 Scrapy 的所有核心组件,如全局配置的每个信息,然后创建一个 Pipeline 实例。参数 cls 就是 Class,最后返回一个 Class 实例。

下面我们用一个实例来加深对 Item Pipeline 用法的理解。

2. 本节目标

我们以爬取 360 摄影美图为例,来分别实现 MongoDB 存储、MySQL 存储、Image 图片存储的三个 Pipeline。

3. 准备工作

请确保已经安装好 MongoDB 和 MySQL 数据库,安装好 Python 的 PyMongo、PyMySQL、Scrapy 框架,另外需要安装 pillow 图像处理库,如没有安装可以参考第 1 章的安装说明。

4. 抓取分析

我们这次爬取的目标网站为:https://image.so.com。打开此页面,切换到摄影页面,网页中呈现了许许多多的摄影美图。我们打开浏览器开发者工具,过滤器切换到 XHR 选项,然后下拉页面,可以看到下面就会呈现许多 Ajax 请求,如图 13-6 所示。

图 13-6 请求列表

我们查看一个请求的详情,观察返回的数据结构,如图 13-7 所示。

图 13-7 返回结果

返回格式是 JSON。其中 list 字段就是一张张图片的详情信息,包含了 30 张图片的 ID、名称、链接、缩略图等信息。另外观察 Ajax 请求的参数信息,有一个参数 sn 一直在变化,这个参数很明显就是偏移量。当 sn 为 30 时,返回的是前 30 张图片,sn 为 60 时,返回的就是第 31~60 张图片。另外,ch 参数是摄影类别,listtype 是排序方式,temp 参数可以忽略。

所以我们抓取时只需要改变 sn 的数值就好了。

下面我们用 Scrapy 来实现图片的抓取,将图片的信息保存到 MongoDB、MySQL,同时将图片存储到本地。

5. 新建项目

首先新建一个项目,命令如下:

scrapy startproject images360

接下来新建一个 Spider,命令如下:

scrapy genspider images images.so.com

这样我们就成功创建了一个 Spider。

6. 构造请求

接下来定义爬取的页数。比如爬取 50 页、每页 30 张,也就是 1500 张图片,我们可以先在 settings.py 里面定义一个变量 MAX_PAGE,添加如下定义:

MAX_PAGE = 50

定义 start_requests() 方法,用来生成 50 次请求,如下所示:

def start_requests(self):
    data = {'ch': 'photography', 'listtype': 'new'}
    base_url = 'https://image.so.com/zj?'
    for page in range(1, self.settings.get('MAX_PAGE') + 1):
        data['sn'] = page * 30
        params = urlencode(data)
        url = base_url + params
        yield Request(url, self.parse)

在这里我们首先定义了初始的两个参数,sn 参数是遍历循环生成的。然后利用 urlencode() 方法将字典转化为 URL 的 GET 参数,构造出完整的 URL,构造并生成 Request。

还需要引入 scrapy.Request 和 urllib.parse 模块,如下所示:

from scrapy import Spider, Request
from urllib.parse import urlencode

再修改 settings.py 中的 ROBOTSTXT_OBEY 变量,将其设置为 False,否则无法抓取,如下所示:

ROBOTSTXT_OBEY = False

运行爬虫,即可以看到链接都请求成功,执行命令如下所示:

scrapy crawl images

运行示例结果如图 13-8 所示。

图 13-8 运行结果

所有请求的状态码都是 200,这就证明图片信息爬取成功了。

7. 提取信息

首先定义一个 Item,叫作 ImageItem,如下所示:

from scrapy import Item, Field
class ImageItem(Item):
    collection = table = 'images'
    id = Field()
    url = Field()
    title = Field()
    thumb = Field()

在这里我们定义了 4 个字段,包括图片的 ID、链接、标题、缩略图。另外还有两个属性 collection 和 table,都定义为 images 字符串,分别代表 MongoDB 存储的 Collection 名称和 MySQL 存储的表名称。

接下来我们提取 Spider 里有关信息,将 parse() 方法改写为如下所示:

def parse(self, response):
    result = json.loads(response.text)
    for image in result.get('list'):
        item = ImageItem()
        item['id'] = image.get('imageid')
        item['url'] = image.get('qhimg_url')
        item['title'] = image.get('group_title')
        item['thumb'] = image.get('qhimg_thumb_url')
        yield item

首先解析 JSON,遍历其 list 字段,取出一个个图片信息,然后再对 ImageItem 赋值,生成 Item 对象。

这样我们就完成了信息的提取。

8. 存储信息

接下来我们需要将图片的信息保存到 MongoDB、MySQL,同时将图片保存到本地。

MongoDB

首先确保 MongoDB 已经正常安装并且正常运行。

我们用一个 MongoPipeline 将信息保存到 MongoDB,在 pipelines.py 里添加如下类的实现:

import pymongo

class MongoPipeline(object):
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DB')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        self.db[item.collection].insert(dict(item))
        return item

    def close_spider(self, spider):
        self.client.close()

这里需要用到两个变量,MONGO_URI 和 MONGO_DB,即存储到 MongoDB 的链接地址和数据库名称。我们在 settings.py 里添加这两个变量,如下所示:

MONGO_URI = 'localhost'
MONGO_DB = 'images360'

这样一个保存到 MongoDB 的 Pipeline 的就创建好了。这里最主要的方法是 process_item() 方法,直接调用 Collection 对象的 insert() 方法即可完成数据的插入,最后返回 Item 对象。

MySQL

首先确保 MySQL 已经正确安装并且正常运行。

新建一个数据库,名字还是 images360,SQL 语句如下所示:

CREATE DATABASE images360 DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci

新建一个数据表,包含 id、url、title、thumb 四个字段,SQL 语句如下所示:

CREATE TABLE images (id VARCHAR(255) NULL PRIMARY KEY, url VARCHAR(255) NULL , title VARCHAR(255) NULL , thumb VARCHAR(255) NULL)

执行完 SQL 语句之后,我们就成功创建好了数据表。接下来就可以往表里存储数据了。

接下来我们实现一个 MySQLPipeline,代码如下所示:

import pymysql

class MysqlPipeline():
    def __init__(self, host, database, user, password, port):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port

    @classmethod
    def from_crawler(cls, crawler):
        return cls(host=crawler.settings.get('MYSQL_HOST'),
            database=crawler.settings.get('MYSQL_DATABASE'),
            user=crawler.settings.get('MYSQL_USER'),
            password=crawler.settings.get('MYSQL_PASSWORD'),
            port=crawler.settings.get('MYSQL_PORT'),
        )

    def open_spider(self, spider):
        self.db = pymysql.connect(self.host, self.user, self.password, self.database, charset='utf8', port=self.port)
        self.cursor = self.db.cursor()

    def close_spider(self, spider):
        self.db.close()

    def process_item(self, item, spider):
        data = dict(item)
        keys = ', '.join(data.keys())
        values = ', '.join(['% s'] * len(data))
        sql = 'insert into % s (% s) values (% s)' % (item.table, keys, values)
        self.cursor.execute(sql, tuple(data.values()))
        self.db.commit()
        return item

如前所述,这里用到的数据插入方法是一个动态构造 SQL 语句的方法。

这里又需要几个 MySQL 的配置,我们在 settings.py 里添加几个变量,如下所示:

MYSQL_HOST = 'localhost'
MYSQL_DATABASE = 'images360'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = '123456'

这里分别定义了 MySQL 的地址、数据库名称、端口、用户名、密码。

这样,MySQL Pipeline 就完成了。

Image Pipeline

Scrapy 提供了专门处理下载的 Pipeline,包括文件下载和图片下载。下载文件和图片的原理与抓取页面的原理一样,因此下载过程支持异步和多线程,下载十分高效。下面我们来看看具体的实现过程。

官方文档地址为:https://doc.scrapy.org/en/latest/topics/media-pipeline.html

首先定义存储文件的路径,需要定义一个 IMAGES_STORE 变量,在 settings.py 中添加如下代码:

IMAGES_STORE = './images'

在这里我们将路径定义为当前路径下的 images 子文件夹,即下载的图片都会保存到本项目的 images 文件夹中。

内置的 ImagesPipeline 会默认读取 Item 的 image_urls 字段,并认为该字段是一个列表形式,它会遍历 Item 的 image_urls 字段,然后取出每个 URL 进行图片下载。

但是现在生成的 Item 的图片链接字段并不是 image_urls 字段表示的,也不是列表形式,而是单个的 URL。所以为了实现下载,我们需要重新定义下载的部分逻辑,即要自定义 ImagePipeline,继承内置的 ImagesPipeline,重写几个方法。

我们定义 ImagePipeline,如下所示:

from scrapy import Request
from scrapy.exceptions import DropItem
from scrapy.pipelines.images import ImagesPipeline

class ImagePipeline(ImagesPipeline):
    def file_path(self, request, response=None, info=None):
        url = request.url
        file_name = url.split('/')[-1]
        return file_name

    def item_completed(self, results, item, info):
        image_paths = [x['path'] for ok, x in results if ok]
        if not image_paths:
            raise DropItem('Image Downloaded Failed')
        return item

    def get_media_requests(self, item, info):
        yield Request(item['url'])

在这里我们实现了 ImagePipeline,继承 Scrapy 内置的 ImagesPipeline,重写下面几个方法。

  • get_media_requests()。它的第一个参数 item 是爬取生成的 Item 对象。我们将它的 url 字段取出来,然后直接生成 Request 对象。此 Request 加入到调度队列,等待被调度,执行下载。
  • file_path()。它的第一个参数 request 就是当前下载对应的 Request 对象。这个方法用来返回保存的文件名,直接将图片链接的最后一部分当作文件名即可。它利用 split() 函数分割链接并提取最后一部分,返回结果。这样此图片下载之后保存的名称就是该函数返回的文件名。
  • item_completed(),它是当单个 Item 完成下载时的处理方法。因为并不是每张图片都会下载成功,所以我们需要分析下载结果并剔除下载失败的图片。如果某张图片下载失败,那么我们就不需保存此 Item 到数据库。该方法的第一个参数 results 就是该 Item 对应的下载结果,它是一个列表形式,列表每一个元素是一个元组,其中包含了下载成功或失败的信息。这里我们遍历下载结果找出所有成功的下载列表。如果列表为空,那么该 Item 对应的图片下载失败,随即抛出异常 DropItem,该 Item 忽略。否则返回该 Item,说明此 Item 有效。

现在为止,三个 Item Pipeline 的定义就完成了。最后只需要启用就可以了,修改 settings.py,设置 ITEM_PIPELINES,如下所示:

ITEM_PIPELINES = {
    'images360.pipelines.ImagePipeline': 300,
    'images360.pipelines.MongoPipeline': 301,
    'images360.pipelines.MysqlPipeline': 302,
}

这里注意调用的顺序。我们需要优先调用 ImagePipeline 对 Item 做下载后的筛选,下载失败的 Item 就直接忽略,它们就不会保存到 MongoDB 和 MySQL 里。随后再调用其他两个存储的 Pipeline,这样就能确保存入数据库的图片都是下载成功的。

接下来运行程序,执行爬取,如下所示:

scrapy crawl images

爬虫一边爬取一边下载,下载速度非常快,对应的输出日志如图 13-9 所示。

图 13-9 输出日志

查看本地 images 文件夹,发现图片都已经成功下载,如图 13-10 所示。

图 13-10 下载结果

查看 MySQL,下载成功的图片信息也已成功保存,如图 13-11 所示。

图 13-11 MySQL 结果

查看 MongoDB,下载成功的图片信息同样已成功保存,如图 13-12 所示。

图 13-12 MongoDB 结果

这样我们就可以成功实现图片的下载并把图片的信息存入数据库了。

9. 本节代码

本节代码地址为:https://github.com/Python3WebSpider/Images360

10. 结语

Item Pipeline 是 Scrapy 非常重要的组件,数据存储几乎都是通过此组件实现的。请读者认真掌握此内容。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.7–Item Pipeline 的用法

]]>
/8394.html/feed 1
[Python3网络爬虫开发实战] 13.6–Spider Middleware 的用法 /8385.html /8385.html#respond Wed, 04 Dec 2019 07:18:30 +0000 /?p=8385 13.6 Spider Middleware 的用法

Spider Middleware 是介入到 Scrapy 的 Spider 处理机制的钩子框架。我们首先来看看它的架构,如图 13-1 所示。

当 Downloader 生成 Response 之后,Response 会被发送给 Spider,在发送给 Spider 之前,Response 会首先经过 Spider Middleware 处理,当 Spider 处理生成 Item 和 Request 之后,Item 和 Request 还会经过 Spider Middleware 的处理。

Spider Middleware 有如下三个作用。

  • 我们可以在 Downloader 生成的 Response 发送给 Spider 之前,也就是在 Response 发送给 Spider 之前对 Response 进行处理。
  • 我们可以在 Spider 生成的 Request 发送给 Scheduler 之前,也就是在 Request 发送给 Scheduler 之前对 Request 进行处理。
  • 我们可以在 Spider 生成的 Item 发送给 Item Pipeline 之前,也就是在 Item 发送给 Item Pipeline 之前对 Item 进行处理。

1. 使用说明

需要说明的是,Scrapy 其实已经提供了许多 Spider Middleware,它们被 SPIDER_MIDDLEWARES_BASE 这个变量所定义。

SPIDER_MIDDLEWARES_BASE 变量的内容如下:

{
    'scrapy.spidermiddlewares.httperror.HttpErrorMiddleware': 50,
    'scrapy.spidermiddlewares.offsite.OffsiteMiddleware': 500,
    'scrapy.spidermiddlewares.referer.RefererMiddleware': 700,
    'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': 800,
    'scrapy.spidermiddlewares.depth.DepthMiddleware': 900,
}

和 Downloader Middleware 一样,Spider Middleware 首先加入到 SPIDER_MIDDLEWARES 设置中,该设置会和 Scrapy 中 SPIDER_MIDDLEWARES_BASE 定义的 Spider Middleware 合并。然后根据键值的数字优先级排序,得到一个有序列表。第一个 Middleware 是最靠近引擎的,最后一个 Middleware 是最靠近 Spider 的。

2. 核心方法

Scrapy 内置的 Spider Middleware 为 Scrapy 提供了基础的功能。如果我们想要扩展其功能,只需要实现某几个方法即可。

每个 Spider Middleware 都定义了以下一个或多个方法的类,核心方法有如下 4 个。

  • process_spider_input(response, spider)
  • process_spider_output(response, result, spider)
  • process_spider_exception(response, exception, spider)
  • process_start_requests(start_requests, spider)

只需要实现其中一个方法就可以定义一个 Spider Middleware。下面我们来看看这 4 个方法的详细用法。

process_spider_input(response, spider)

当 Response 通过 Spider Middleware 时,该方法被调用,处理该 Response。

方法的参数有两个:

  • response,即 Response 对象,即被处理的 Response
  • spider,即 Spider 对象,即该 response 对应的 Spider

process_spider_input() 应该返回 None 或者抛出一个异常。

  • 如果其返回 None ,Scrapy 将会继续处理该 Response,调用所有其他的 Spider Middleware 直到 Spider 处理该 Response。
  • 如果其抛出一个异常,Scrapy 将不会调用任何其他 Spider Middlewar e 的 process_spider_input() 方法,并调用 Request 的 errback() 方法。 errback 的输出将会以另一个方向被重新输入到中间件中,使用 process_spider_output() 方法来处理,当其抛出异常时则调用 process_spider_exception() 来处理。

process_spider_output(response, result, spider)

当 Spider 处理 Response 返回结果时,该方法被调用。

方法的参数有三个:

  • response,即 Response 对象,即生成该输出的 Response
  • result,包含 Request 或 Item 对象的可迭代对象,即 Spider 返回的结果
  • spider,即 Spider 对象,即其结果对应的 Spider

process_spider_output() 必须返回包含 Request 或 Item 对象的可迭代对象。

process_spider_exception(response, exception, spider)

当 Spider 或 Spider Middleware 的 process_spider_input() 方法抛出异常时, 该方法被调用。

方法的参数有三个:

  • response,即 Response 对象,即异常被抛出时被处理的 Response
  • exception,即 Exception 对象,被抛出的异常
  • spider,即 Spider 对象,即抛出该异常的 Spider

process_spider_exception() 必须要么返回 None , 要么返回一个包含 Response 或 Item 对象的可迭代对象。

  • 如果其返回 None ,Scrapy 将继续处理该异常,调用其他 Spider Middleware 中的 process_spider_exception() 方法,直到所有 Spider Middleware 都被调用。
  • 如果其返回一个可迭代对象,则其他 Spider Middleware 的 process_spider_output() 方法被调用, 其他的 process_spider_exception() 将不会被调用。

process_start_requests(start_requests, spider)

该方法以 Spider 启动的 Request 为参数被调用,执行的过程类似于 process_spider_output() ,只不过其没有相关联的 Response 并且必须返回 Request。

方法的参数有两个:

  • start_requests,即包含 Request 的可迭代对象,即 Start Requests
  • spider,即 Spider 对象,即 Start Requests 所属的 Spider

其必须返回另一个包含 Request 对象的可迭代对象。

3. 结语

本节介绍了 Spider Middleware 的基本原理和自定义 Spider Middleware 的方法。Spider Middleware 使用的频率不如 Downloader Middleware 的高,在必要的情况下它可以用来方便数据的处理。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.6–Spider Middleware 的用法

]]>
/8385.html/feed 0
[Python3网络爬虫开发实战] 13.5–Downloader Middleware 的用法 /8381.html /8381.html#respond Wed, 04 Dec 2019 07:11:33 +0000 /?p=8381 13.5 Downloader Middleware 的用法

Downloader Middleware 即下载中间件,它是处于 Scrapy 的 Request 和 Response 之间的处理模块。我们首先来看看它的架构,如图 13-1 所示。

Scheduler 从队列中拿出一个 Request 发送给 Downloader 执行下载,这个过程会经过 Downloader Middleware 的处理。另外,当 Downloader 将 Request 下载完成得到 Response 返回给 Spider 时会再次经过 Downloader Middleware 处理。

也就是说,Downloader Middleware 在整个架构中起作用的位置是以下两个。

  • 在 Scheduler 调度出队列的 Request 发送给 Downloader 下载之前,也就是我们可以在 Request 执行下载之前对其进行修改。
  • 在下载后生成的 Response 发送给 Spider 之前,也就是我们可以在生成 Resposne 被 Spider 解析之前对其进行修改。

Downloader Middleware 的功能十分强大,修改 User-Agent、处理重定向、设置代理、失败重试、设置 Cookies 等功能都需要借助它来实现。下面我们来了解一下 Downloader Middleware 的详细用法。

1. 使用说明

需要说明的是,Scrapy 其实已经提供了许多 Downloader Middleware,比如负责失败重试、自动重定向等功能的 Middleware,它们被 DOWNLOADER_MIDDLEWARES_BASE 变量所定义。

DOWNLOADER_MIDDLEWARES_BASE 变量的内容如下所示:

{
    'scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware': 100,
    'scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware': 300,
    'scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware': 350,
    'scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware': 400,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': 500,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': 550,
    'scrapy.downloadermiddlewares.ajaxcrawl.AjaxCrawlMiddleware': 560,
    'scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware': 580,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 590,
    'scrapy.downloadermiddlewares.redirect.RedirectMiddleware': 600,
    'scrapy.downloadermiddlewares.cookies.CookiesMiddleware': 700,
    'scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware': 750,
    'scrapy.downloadermiddlewares.stats.DownloaderStats': 850,
    'scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware': 900,
}

这是一个字典格式,字典的键名是 Scrapy 内置的 Downloader Middleware 的名称,键值代表了调用的优先级,优先级是一个数字,数字越小代表越靠近 Scrapy 引擎,数字越大代表越靠近 Downloader。每个 Downloader Middleware 都可以定义 process_request() 和 request_response() 方法来分别处理请求和响应,对于 process_request() 方法来说,优先级数字越小越先被调用,对于 process_response() 方法来说,优先级数字越大越先被调用。。

如果自己定义的 Downloader Middleware 要添加到项目里,DOWNLOADER_MIDDLEWARES_BASE 变量不能直接修改。Scrapy 提供了另外一个设置变量 DOWNLOADER_MIDDLEWARES,我们直接修改这个变量就可以添加自己定义的 Downloader Middleware,以及禁用 DOWNLOADER_MIDDLEWARES_BASE 里面定义的 Downloader Middleware。下面我们具体来看看 Downloader Middleware 的使用方法。

2. 核心方法

Scrapy 内置的 Downloader Middleware 为 Scrapy 提供了基础的功能,但在项目实战中我们往往需要单独定义 Downloader Middleware。不用担心,这个过程非常简单,我们只需要实现某几个方法即可。

每个 Downloader Middleware 都定义了一个或多个方法的类,核心的方法有如下三个。

  • process_request(request, spider)
  • process_response(request, response, spider)
  • process_exception(request, exception, spider)

我们只需要实现至少一个方法,就可以定义一个 Downloader Middleware。下面我们来看看这三个方法的详细用法。

process_request(request, spider)

Request 被 Scrapy 引擎调度给 Downloader 之前,process_request() 方法就会被调用,也就是在 Request 从队列里调度出来到 Downloader 下载执行之前,我们都可以用 process_request() 方法对 Request 进行处理。方法的返回值必须为 None、Response 对象、Request 对象之一,或者抛出 IgnoreRequest 异常。

process_request() 方法的参数有如下两个。

  • request,即 Request 对象,即被处理的 Request
  • spider,即 Spdier 对象,即此 Request 对应的 Spider

返回类型不同,产生的效果也不同。下面归纳一下不同的返回情况。

  • 当返回是 None 时,Scrapy 将继续处理该 Request,接着执行其他 Downloader Middleware 的 process_request() 方法,一直到 Downloader 把 Request 执行后得到 Response 才结束。这个过程其实就是修改 Request 的过程,不同的 Downloader Middleware 按照设置的优先级顺序依次对 Request 进行修改,最后送至 Downloader 执行。
  • 当返回为 Response 对象时,更低优先级的 Downloader Middleware 的 process_request() 和 process_exception() 方法就不会被继续调用,每个 Downloader Middleware 的 process_response() 方法转而被依次调用。调用完毕之后,直接将 Response 对象发送给 Spider 来处理。
  • 当返回为 Request 对象时,更低优先级的 Downloader Middleware 的 process_request() 方法会停止执行。这个 Request 会重新放到调度队列里,其实它就是一个全新的 Request,等待被调度。如果被 Scheduler 调度了,那么所有的 Downloader Middleware 的 process_request() 方法会被重新按照顺序执行。
  • 如果 IgnoreRequest 异常抛出,则所有的 Downloader Middleware 的 process_exception() 方法会依次执行。如果没有一个方法处理这个异常,那么 Request 的 errorback() 方法就会回调。如果该异常还没有被处理,那么它便会被忽略。

process_response(request, response, spider)

Downloader 执行 Request 下载之后,会得到对应的 Response。Scrapy 引擎便会将 Response 发送给 Spider 进行解析。在发送之前,我们都可以用 process_response() 方法来对 Response 进行处理。方法的返回值必须为 Request 对象、Response 对象之一,或者抛出 IgnoreRequest 异常。

process_response() 方法的参数有如下三个。

  • request,是 Request 对象,即此 Response 对应的 Request。
  • response,是 Response 对象,即此被处理的 Response。
  • spider,是 Spider 对象,即此 Response 对应的 Spider。

下面对不同的返回情况做一下归纳:

  • 当返回为 Request 对象时,更低优先级的 Downloader Middleware 的 process_response() 方法不会继续调用。该 Request 对象会重新放到调度队列里等待被调度,它相当于一个全新的 Request。然后,该 Request 会被 process_request() 方法顺次处理。
  • 当返回为 Response 对象时,更低优先级的 Downloader Middleware 的 process_response() 方法会继续调用,继续对该 Response 对象进行处理。
  • 如果 IgnoreRequest 异常抛出,则 Request 的 errorback() 方法会回调。如果该异常还没有被处理,那么它便会被忽略。

process_exception(request, exception, spider)

当 Downloader 或 process_request() 方法抛出异常时,例如抛出 IgnoreRequest 异常,process_exception() 方法就会被调用。方法的返回值必须为 None、Response 对象、Request 对象之一。

process_exception() 方法的参数有如下三个。

  • request,即 Request 对象,即产生异常的 Request
  • exception,即 Exception 对象,即抛出的异常
  • spdier,即 Spider 对象,即 Request 对应的 Spider

下面归纳一下不同的返回值。

  • 当返回为 None 时,更低优先级的 Downloader Middleware 的 process_exception() 会被继续顺次调用,直到所有的方法都被调度完毕。
  • 当返回为 Response 对象时,更低优先级的 Downloader Middleware 的 process_exception() 方法不再被继续调用,每个 Downloader Middleware 的 process_response() 方法转而被依次调用。
  • 当返回为 Request 对象时,更低优先级的 Downloader Middleware 的 process_exception() 也不再被继续调用,该 Request 对象会重新放到调度队列里面等待被调度,它相当于一个全新的 Request。然后,该 Request 又会被 process_request() 方法顺次处理。

以上内容便是这三个方法的详细使用逻辑。在使用它们之前,请先对这三个方法的返回值的处理情况有一个清晰的认识。在自定义 Downloader Middleware 的时候,也一定要注意每个方法的返回类型。

下面我们用一个案例实战来加深一下对 Downloader Middleware 用法的理解。

3. 项目实战

新建一个项目,命令如下所示:

scrapy startproject scrapydownloadertest

新建了一个 Scrapy 项目,名为 scrapydownloadertest。进入项目,新建一个 Spider,命令如下所示:

scrapy genspider httpbin httpbin.org

新建了一个 Spider,名为 httpbin,源代码如下所示:

import scrapy
class HttpbinSpider(scrapy.Spider):
    name = 'httpbin'
    allowed_domains = ['httpbin.org']
    start_urls = ['http://httpbin.org/']

    def parse(self, response):
        pass
```接下来我们修改 start_urls 为:`['http://httpbin.org/']`。随后将 parse() 方法添加一行日志输出,将 response 变量的 text 属性输出出来,这样我们便可以看到 Scrapy 发送的 Request 信息了。

修改 Spider 内容如下所示:

```python
import scrapy

class HttpbinSpider(scrapy.Spider):
    name = 'httpbin'
    allowed_domains = ['httpbin.org']
    start_urls = ['http://httpbin.org/get']

    def parse(self, response):
        self.logger.debug(response.text)

接下来运行此 Spider,执行如下命令:

scrapy crawl httpbin

Scrapy 运行结果包含 Scrapy 发送的 Request 信息,内容如下所示:

{"args": {}, 
  "headers": {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", 
    "Accept-Encoding": "gzip,deflate,br", 
    "Accept-Language": "en", 
    "Connection": "close", 
    "Host": "httpbin.org", 
    "User-Agent": "Scrapy/1.4.0 (+http://scrapy.org)"
  }, 
  "origin": "60.207.237.85", 
  "url": "http://httpbin.org/get"
}

我们观察一下 Headers,Scrapy 发送的 Request 使用的 User-Agent 是 Scrapy/1.4.0(+http://scrapy.org),这其实是由 Scrapy 内置的 UserAgentMiddleware 设置的,UserAgentMiddleware 的源码如下所示:

from scrapy import signals

class UserAgentMiddleware(object):
    def __init__(self, user_agent='Scrapy'):
        self.user_agent = user_agent

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.settings['USER_AGENT'])
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        return o

    def spider_opened(self, spider):
        self.user_agent = getattr(spider, 'user_agent', self.user_agent)

    def process_request(self, request, spider):
        if self.user_agent:
            request.headers.setdefault(b'User-Agent', self.user_agent)

在 from_crawler() 方法中,首先尝试获取 settings 里面 USER_AGENT,然后把 USER_AGENT 传递给init() 方法进行初始化,其参数就是 user_agent。如果没有传递 USER_AGENT 参数就默认设置为 Scrapy 字符串。我们新建的项目没有设置 USER_AGENT,所以这里的 user_agent 变量就是 Scrapy。接下来,在 process_request() 方法中,将 user-agent 变量设置为 headers 变量的一个属性,这样就成功设置了 User-Agent。因此,User-Agent 就是通过此 Downloader Middleware 的 process_request() 方法设置的。

修改请求时的 User-Agent 可以有两种方式:一是修改 settings 里面的 USER_AGENT 变量;二是通过 Downloader Middleware 的 process_request() 方法来修改。

第一种方法非常简单,我们只需要在 setting.py 里面加一行 USER_AGENT 的定义即可:

USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'

一般推荐使用此方法来设置。但是如果想设置得更灵活,比如设置随机的 User-Agent,那就需要借助 Downloader Middleware 了。所以接下来我们用 Downloader Middleware 实现一个随机 User-Agent 的设置。

在 middlewares.py 里面添加一个 RandomUserAgentMiddleware 的类,如下所示:

import random

class RandomUserAgentMiddleware():
    def __init__(self):
        self.user_agents = ['Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)',
            'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.2 (KHTML, like Gecko) Chrome/22.0.1216.0 Safari/537.2',
            'Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:15.0) Gecko/20100101 Firefox/15.0.1'
        ]

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agents)

我们首先在类的 __init__() 方法中定义了三个不同的 User-Agent,并用一个列表来表示。接下来实现了 process_request() 方法,它有一个参数 request,我们直接修改 request 的属性即可。在这里我们直接设置了 request 对象的 headers 属性的 User-Agent,设置内容是随机选择的 User-Agent,这样一个 Downloader Middleware 就写好了。

不过,要使之生效我们还需要再去调用这个 Downloader Middleware。在 settings.py 中,将 DOWNLOADER_MIDDLEWARES 取消注释,并设置成如下内容:

DOWNLOADER_MIDDLEWARES = {'scrapydownloadertest.middlewares.RandomUserAgentMiddleware': 543,}

接下来我们重新运行 Spider,就可以看到 User-Agent 被成功修改为列表中所定义的随机的一个 User-Agent 了:

{"args": {}, 
  "headers": {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", 
    "Accept-Encoding": "gzip,deflate,br", 
    "Accept-Language": "en", 
    "Connection": "close", 
    "Host": "httpbin.org", 
    "User-Agent": "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)"
  }, 
  "origin": "60.207.237.85", 
  "url": "http://httpbin.org/get"
}

我们就通过实现 Downloader Middleware 并利用 process_request() 方法成功设置了随机的 User-Agent。

另外,Downloader Middleware 还有 process_response() 方法。Downloader 对 Request 执行下载之后会得到 Response,随后 Scrapy 引擎会将 Response 发送回 Spider 进行处理。但是在 Response 被发送给 Spider 之前,我们同样可以使用 process_response() 方法对 Response 进行处理。比如这里修改一下 Response 的状态码,在 RandomUserAgentMiddleware 添加如下代码:

def process_response(self, request, response, spider):
    response.status = 201
    return response

我们将 response 对象的 status 属性修改为 201,随后将 response 返回,这个被修改后的 Response 就会被发送到 Spider。

我们再在 Spider 里面输出修改后的状态码,在 parse() 方法中添加如下的输出语句:

self.logger.debug('Status Code: ' + str(response.status))

重新运行之后,控制台输出了如下内容:

[httpbin] DEBUG: Status Code: 201

可以发现,Response 的状态码成功修改了。

因此要想对 Response 进行后处理,就可以借助于 process_response() 方法。

另外还有一个 process_exception() 方法,它是用来处理异常的方法。如果需要异常处理的话,我们可以调用此方法。不过这个方法的使用频率相对低一些,在此不用实例演示。

4. 本节代码

本节源代码为:https://github.com/Python3WebSpider/ScrapyDownloaderTest

5. 结语

本节讲解了 Downloader Middleware 的基本用法。此组件非常重要,是做异常处理和应对反爬处理的核心。后面我们会在实战中应用此组件来处理代理、Cookies 等内容。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.5–Downloader Middleware 的用法

]]>
/8381.html/feed 0
[Python3网络爬虫开发实战] 13.1–Scrapy 框架介绍 /8364.html /8364.html#respond Tue, 03 Dec 2019 03:42:17 +0000 /?p=8364 13.1 Scrapy 框架介绍

Scrapy 是一个基于 Twisted 的异步处理框架,是纯 Python 实现的爬虫框架,其架构清晰,模块之间的耦合程度低,可扩展性极强,可以灵活完成各种需求。我们只需要定制开发几个模块就可以轻松实现一个爬虫。

1. 架构介绍

首先我们来看下 Scrapy 框架的架构,如图 13-1 所示:

图 13-1 Scrapy 架构

它可以分为如下的几个部分。

  • Engine,引擎,用来处理整个系统的数据流处理,触发事务,是整个框架的核心。
  • Item,项目,它定义了爬取结果的数据结构,爬取的数据会被赋值成该对象。
  • Scheduler, 调度器,用来接受引擎发过来的请求并加入队列中,并在引擎再次请求的时候提供给引擎。
  • Downloader,下载器,用于下载网页内容,并将网页内容返回给蜘蛛。
  • Spiders,蜘蛛,其内定义了爬取的逻辑和网页的解析规则,它主要负责解析响应并生成提取结果和新的请求。
  • Item Pipeline,项目管道,负责处理由蜘蛛从网页中抽取的项目,它的主要任务是清洗、验证和存储数据。
  • Downloader Middlewares,下载器中间件,位于引擎和下载器之间的钩子框架,主要是处理引擎与下载器之间的请求及响应。
  • Spider Middlewares, 蜘蛛中间件,位于引擎和蜘蛛之间的钩子框架,主要工作是处理蜘蛛输入的响应和输出的结果及新的请求。

2. 数据流

Scrapy 中的数据流由引擎控制,其过程如下:

  • Engine 首先打开一个网站,找到处理该网站的 Spider 并向该 Spider 请求第一个要爬取的 URL。
  • Engine 从 Spider 中获取到第一个要爬取的 URL 并通过 Scheduler 以 Request 的形式调度。
  • Engine 向 Scheduler 请求下一个要爬取的 URL。
  • Scheduler 返回下一个要爬取的 URL 给 Engine,Engine 将 URL 通过 Downloader Middlewares 转发给 Downloader 下载。
  • 一旦页面下载完毕, Downloader 生成一个该页面的 Response,并将其通过 Downloader Middlewares 发送给 Engine。
  • Engine 从下载器中接收到 Response 并通过 Spider Middlewares 发送给 Spider 处理。
  • Spider 处理 Response 并返回爬取到的 Item 及新的 Request 给 Engine。
  • Engine 将 Spider 返回的 Item 给 Item Pipeline,将新的 Request 给 Scheduler。
  • 重复第二步到最后一步,直到 Scheduler 中没有更多的 Request,Engine 关闭该网站,爬取结束。

通过多个组件的相互协作、不同组件完成工作的不同、组件对异步处理的支持,Scrapy 最大限度地利用了网络带宽,大大提高了数据爬取和处理的效率。

3. 项目结构

Scrapy 框架和 pyspider 不同,它是通过命令行来创建项目的,代码的编写还是需要 IDE。项目创建之后,项目文件结构如下所示:

scrapy.cfg
project/
    __init__.py
    items.py
    pipelines.py
    settings.py
    middlewares.py
    spiders/
        __init__.py
        spider1.py
        spider2.py
        ...

在此要将各个文件的功能描述如下:

  • scrapy.cfg:它是 Scrapy 项目的配置文件,其内定义了项目的配置文件路径、部署相关信息等内容。
  • items.py:它定义 Item 数据结构,所有的 Item 的定义都可以放这里。
  • pipelines.py:它定义 Item Pipeline 的实现,所有的 Item Pipeline 的实现都可以放这里。
  • settings.py:它定义项目的全局配置。
  • middlewares.py:它定义 Spider Middlewares 和 Downloader Middlewares 的实现。
  • spiders:其内包含一个个 Spider 的实现,每个 Spider 都有一个文件。

4. 结语

本节介绍了 Scrapy 框架的基本架构、数据流过程以及项目结构。后面我们会详细了解 Scrapy 的用法,感受它的强大。

转载请注明:静觅 » [Python3网络爬虫开发实战] 13.1–Scrapy 框架介绍

]]>
/8364.html/feed 0