• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    python使用pywinauto驱动微信客户端实现公众号爬虫

    项目地址

    https://github.com/fancyerii/wechat-gongzhonghao-crawler

    pywinauto简介

    pywinauto是一个python的工具,可以用于控制Windows的GUI程序。详细的文档可以参考这里。

    WechatAutomator类

    自动化微信的代码封装在了类WechatAutomator里,完整的代码可以参考这里。这里简要的介绍一下其中的主要方法:

    init_window

    这个方法完成类的初始化,它的代码为:

        def init_window(self, exe_path=r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe",
                        turn_page_interval=3,
                        click_url_interval=1,
                        win_width=1000,
                        win_height=600):
            app = Application(backend="uia").connect(path=exe_path)
            self.main_win = app.window(title=u"微信", class_name="WeChatMainWndForPC")
            self.main_win.set_focus()
            self.app = app
            self.visible_top = 70
            self.turn_page_interval = turn_page_interval
            self.click_url_interval = click_url_interval
            self.browser = None
            self.win_width = win_width
            self.win_height = win_height
            # 为了让移动窗口,同时使用非uia的backend,这是pywinauto的uia的一个bug
            self.app2 = Application().connect(path=exe_path)
            self.move_window()

    我们首先来看函数的参数:

    这个函数的主要功能是构建Application对象从而通过pywinauto实现控制,这里使用的是uia的backend,然后设置窗口的大小并且把窗口移到最左上角。因为根据so文章,pywinauto 0.6.8存在bug,只能通过win32的backend来移到窗口,所以构造了self.app2然后调用move_window()函数把窗口移到最左上角。

    crawl_gongzhonghao

    这个函数实现了某个公众号的文章抓取。它的基本控制逻辑如下:

    第一个是通过locate_user函数实现,后面会介绍。第二个是通过process_page函数实现,后面也会介绍。判断是否继续抓取的逻辑为:

    所以我们通常会在第一次抓取的时候把max_pages设置的很大(比如100),然后通过latest_date来抓到指定的日期。而之后的抓取则设置max_pages为较小的值(比如默认的6),这样只要爬虫在两次抓取之间公众号的更新不超过6页,那么就不会漏掉文章。具体的逻辑可以参考main.py,它会把抓取的文章通过http请求发给Server,并且每次抓取的时候从Server查询抓取过的文章存放到states这个list里states[i][“url”]就存储了第i篇文章的url。

        def crawl_gongzhonghao(self, account_name, articles, states, detail,
                               max_pages=6, latest_date=None, no_item_retry=3):
            logger.debug(account_name)
            if not self.locate_user(account_name):
                return False
            last_visited_titles = set()
            visited_urls = set()
            self.turn_page_up(min(20, max_pages * 2))
    
            pagedown_retry = 0
            last_visited_titles = []
            for page in range(0, max_pages):
                items = []
                last_visited_titles = self.process_page(account_name, items, last_visited_titles, states, visited_urls, detail)
                articles.extend(items)
    
                if len(items) == 0:
                    pagedown_retry += 1
                    if pagedown_retry >= no_item_retry:
                        s = "break because of retry {}".format(pagedown_retry)
                        logger.debug(s)
                        WechatAutomator.add_to_detail(s, detail)
                        break
                else:
                    pagedown_retry = 0
    
                if len(items) > 0 and latest_date is not None:
                    html = items[-1][-1]
                    pub_date = WechatAutomator.get_pubdate(html)
                    if pub_date and pub_date  latest_date:
                        s = "stop because {}  {}".format(pub_date, latest_date)
                        logger.debug(s)
                        WechatAutomator.add_to_detail(s, detail)
                        break
    
                url_exist = False
                for item in items:
                    if WechatAutomator.url_in_states(item[0], states):
                        s = "stop because url exist {}".format(item[0])
                        logger.debug(s)
                        WechatAutomator.add_to_detail(s, detail)
                        url_exist = True
                        break
                if url_exist:
                    break
    
                self.click_right()
                self.main_win.type_keys("{PGDN}")
                time.sleep(self.turn_page_interval)
    
            self.turn_page_up(page * 2)
    
            return True

    locate_user

    locate_user函数的控制流程为:

        def locate_user(self, user, retry=5):
            if not self.main_win:
                raise RuntimeError("you should call init_window first")
    
            search_btn = self.main_win.child_window(title="搜索", control_type="Edit")
            self.click_center(search_btn)
    
            self.main_win.type_keys("^a")
            self.main_win.type_keys("{BACKSPACE}")
            self.main_win.type_keys(user)
            for i in range(retry):
                time.sleep(1)
                try:
                    search_list = self.main_win.child_window(title="搜索结果")
                    match_result = search_list.child_window(title=user, control_type="ListItem")
                    self.click_center(match_result)
                    return True
                except:
                    pass
    
            return False

    这里主要就是通过child_window函数进行定位,关于它的用法这里不介绍。关于怎么定位元素的方法可以使用Inspect.exe或者print_control_identifiers函数,具体参考这里。

    process_page

    这个函数是最主要的抓取代码,它处理当前一页的内容,它的控制流程如下:

    逻辑比较简单,但是有一些很trick的地方:

    与此类似的是右上角的黑色头部(不能滚到并且会遮挡)也有一定空间,如下图所示:

    因为这个框可能很窄(bottom-top很小)并且可能在很靠上或者靠下的位置。所以有如下代码:

        # 计算可见的高度
        visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top)
        # 太窄的不点击,希望下次翻页后能显示更多像素从而可以点击,
        # 但是如果微信的某个文章的框的高度小于10个像素,那么这篇文章就无法被点击
        # 不过作者目前为发现这么窄的文章
        if visible_height  10:
            continue
        
        # 如果某个文章的框太大,则抛出异常,目前为止为发现这样的问题。
        if rect.bottom - rect.top >= win_rect.bottom - self.visible_top:
            raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top,
                                                     win_rect.bottom, self.visible_top))
        # 如果下部部分可见,那么点击上方是比较”安全“的
        if rect.bottom >= win_rect.bottom:
            click_up = True
        # 如果下部完全可见,则点击下方是”安全“的
        else:
            click_up = False

    完整代码如下:

        def process_page(self, account_name, items, lastpage_clicked_titles, states, visited_urls, detail):
            clicked_titles = set()
            text = self.main_win.child_window(title=account_name, control_type="Text", found_index=0)
            parent = text
            while parent:
                parent = parent.parent()
                if '会话列表' == parent.element_info.name:
                    break
            paths = [0, 2, 0, 0, 0, 1, 0]
            for idx in paths:
                parent = parent.children()[idx]
    
            elems = []
            self.recursive_get(parent, elems)
            win_rect = self.main_win.rectangle()
            for elem in elems:
                rect = elem.rectangle()
    
                if elem.element_info.name in lastpage_clicked_titles:
                    continue
    
                if rect.top >= win_rect.bottom or rect.bottom = self.visible_top:
                    continue
    
                visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top)
                if visible_height  10:
                    continue
    
                if rect.bottom - rect.top >= win_rect.bottom - self.visible_top:
                    raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top,
                                                             win_rect.bottom, self.visible_top))
                if rect.bottom >= win_rect.bottom:
                    click_up = True
                else:
                    click_up = False
                if self.is_bad_elem(elem):
                    s = "not good elem {}".format(elem.element_info.name[0:10])
                    logger.debug(s)
                    WechatAutomator.add_to_detail(s, detail)
                    continue
    
                try:
                    self.click_url(rect, win_rect, click_up)
                    copy_btn = self.browser.child_window(title="复制链接地址")
                    self.click_center(copy_btn, click_main=False)
                    url = clipboard.GetData()
                    if elem.element_info.name != '图片':
                        clicked_titles.add(elem.element_info.name)
                    if url and not url in visited_urls:
                        visited_urls.add(url)
                        html = None
                        try:
                            html = requests.get(url).text
                        except:
                            s = "fail get {}".format(url)
                            logger.debug(s)
                            WechatAutomator.add_to_detail(s, detail)
    
                        items.append((url, rect, elem.element_info.name, html))
    
                except:
                    traceback.print_exc()
                    pass
                finally:
                    if self.browser:
                        try:
                            self.browser.close()
                        except:
                            pass
                        self.browser = None
    
                time.sleep(self.click_url_interval)
    
            return clicked_titles

    以上就是python使用pywinauto驱动微信客户端实现公众号爬虫的详细内容,更多关于python 公众号爬虫的资料请关注脚本之家其它相关文章!

    您可能感兴趣的文章:
    • 基于python爬取链家二手房信息代码示例
    • Python爬虫之爬取二手房信息
    • python爬取安居客二手房网站数据(实例讲解)
    • Requests什么的通通爬不了的Python超强反爬虫方案!
    • Python爬虫之m3u8文件里提取小视频的正确姿势
    • Python爬虫基础讲解之请求
    • Python爬虫之爬取最新更新的小说网站
    • python爬取链家二手房的数据
    上一篇:python基于tkinter实现gif录屏功能
    下一篇:python 如何将两个实数矩阵合并为一个复数矩阵
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    python使用pywinauto驱动微信客户端实现公众号爬虫 python,使用,pywinauto,驱动,