爬取知乎千万用户信息

爬取知乎千万用户信息

写在前面:

之前看到有人爬取某乎用户信息进行数据分析,我也尝试了一下

/**
 * Spring component that owns one pooled HTTP connection manager and hands out
 * {@link CloseableHttpClient} instances backed by that shared pool.
 */
@Component
public class HttpConnectionManager {
    PoolingHttpClientConnectionManager connectionManager = null;

    /**
     * Builds the pooled connection manager with plain-HTTP and HTTPS socket
     * factories, capped at 50 connections in total and 10 per route.
     *
     * @throws IllegalStateException if the JVM default {@link SSLContext} cannot be obtained
     */
    @PostConstruct
    public void init() {
        LayeredConnectionSocketFactory sslsf;
        try {
            sslsf = new SSLConnectionSocketFactory(SSLContext.getDefault());
        } catch (NoSuchAlgorithmException e) {
            // Fail fast with the real cause instead of carrying on with a null HTTPS factory.
            throw new IllegalStateException("Default SSLContext is unavailable; cannot set up HTTPS connections", e);
        }

        Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("https", sslsf)
                .register("http", new PlainConnectionSocketFactory())
                .build();
        connectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
        connectionManager.setMaxTotal(50);
        connectionManager.setDefaultMaxPerRoute(10);
    }

    /**
     * Creates a client on top of the shared connection pool.
     *
     * @return a new client; closing it leaves the shared connection manager open
     */
    public CloseableHttpClient getHttpClient() {
        return HttpClients.custom()
                .setConnectionManager(connectionManager)
                // The pool is shared between clients (and threads), so a client must not shut it down on close
                .setConnectionManagerShared(true)
                .build();
    }
}

为了精确计算每个用户关系网的请求次数,请求每个用户的关系网之前需要获取其关注数和粉丝数

/**
 * Returns the number reported by {@code getAttentionOrFanNo} for the
 * followee-list endpoint of the given user, requested at offset 0.
 *
 * @param urlToken url_token of the user to look up
 * @return whatever {@code getAttentionOrFanNo} returns for that request
 */
public Integer getUserAttentionNo(String urlToken) {
        // Followee-list API, first page
        String followeeListUrl = url + urlToken + "关注列表api" + 0;
        return getAttentionOrFanNo(new HttpGet(followeeListUrl));
    }
/**
 * Executes the given list request and returns the total from the paging
 * section of the response (followee or follower count, depending on the URL).
 *
 * <p>On HTTP 403 the captcha handler is invoked and the same request is sent
 * again. The retry is a loop rather than recursion, and each response is closed
 * before the retry, so a long run of 403s neither deepens the call stack nor
 * keeps one pooled connection checked out per attempt.
 *
 * <p>NOTE(review): Spring's {@code @Async} is documented for {@code void} or
 * {@code Future} return types; confirm how this {@code Integer}-returning method
 * is actually invoked.
 *
 * @param httpGet prepared GET request for a followee/follower list page
 * @return the paging total, or 0 when the status is neither 200 nor 403, the
 *         total is missing from the response, or an I/O error occurs
 */
@Async
    public Integer getAttentionOrFanNo(HttpGet httpGet) {
        // NOTE(review): the ':'-prefixed names look like HTTP/2 pseudo-headers copied from
        // browser dev tools, and ':path' is hard-coded to one member; confirm they are needed.
        httpGet.setHeader("user-agent", userAgent);
        httpGet.setHeader(":authority", authority);
        httpGet.setHeader(":method", "GET");
        httpGet.setHeader(":path", "/api/v4/members/da-shen-35-43/followees?limit=20&offset=0");
        httpGet.setHeader(":scheme", "https");
        httpGet.setHeader("accept", accept);
        httpGet.setHeader("cookie", "替换为自己的cookie");

        // Client backed by the shared connection pool
        CloseableHttpClient httpClient = httpConnectionManager.getHttpClient();
        try {
            while (true) {
                CloseableHttpResponse response = null;
                try {
                    response = httpClient.execute(httpGet);
                    int statusCode = response.getStatusLine().getStatusCode();

                    if (statusCode == 200) {
                        String body = EntityUtils.toString(response.getEntity(), "UTF-8");
                        UserListVo userListVo = JSONObject.parseObject(body, UserListVo.class);
                        Paging paging = userListVo == null ? null : userListVo.getPaging();
                        Integer totals = paging == null ? null : paging.getTotals();
                        return totals == null ? 0 : totals;
                    }
                    if (statusCode != 403) {
                        // Any other status: give up on this request
                        return 0;
                    }

                    String forbiddenBody = EntityUtils.toString(response.getEntity(), "UTF-8");
                    ErrorVo errorVo = JSONObject.parseObject(forbiddenBody, ErrorVo.class);
                    // Fall back to the raw body when the 403 payload is not the expected error JSON
                    String errorMessage = (errorVo != null && errorVo.getError() != null)
                            ? String.valueOf(errorVo.getError().getMessage())
                            : forbiddenBody;
                    logger.info("获取关注数或粉丝数 403" +"errorMessage:" + errorMessage + "开始处理验证码...");
                } finally {
                    // Release the connection before the captcha is handled and the request retried
                    if (response != null) {
                        try {
                            response.close();
                        } catch (IOException e) {
                            logger.info("关闭response失败! IO异常" + e);
                        }
                    }
                }
                // Solve the captcha, then loop to send the same request again
                forbiddenService.calibrationImageCode();
            }
        } catch (IOException e) {
            logger.error("请求关注数或粉丝数失败! IO异常", e);
            return 0;
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.info("关闭httpClient失败! IO异常" + e);
            }
        }
    }

取得关注数和粉丝数之后,计算好请求次数,页码,偏移量,开始请求此用户的关系网

/**
 * Requests one page of the user's followee list and returns the url_tokens
 * that are in neither {@code urlTokenSet1} nor {@code toUrlTokenList}.
 *
 * @param urlToken url_token of the user whose followees are requested
 * @param start    value appended to the followee-list URL (the page offset)
 * @return url_tokens from this page that are not yet known; empty if none
 */
public List<String> getUserAttentionTokenList(String urlToken, Integer start) {
        List<String> freshTokens = new ArrayList<>();
        // Followee-list API
        HttpGet followeePageRequest = new HttpGet(url + urlToken + attentionUrl + start);
        for (UserData followee : getAttentionOrFanUserList(followeePageRequest)) {
            // Known means already processed or already queued for processing
            boolean alreadyProcessed = urlTokenSet1.contains(followee.getUrl_token());
            boolean alreadyQueued = toUrlTokenList.contains(followee.getUrl_token());
            if (!alreadyProcessed && !alreadyQueued) {
                freshTokens.add(followee.getUrl_token());
            }
        }
        return freshTokens;
    }
/**
 * Executes the given list request and returns the users contained in the
 * response page.
 *
 * <p>On HTTP 403 the captcha handler is invoked and the same request is sent
 * again. The retry is a loop rather than recursion, and each response is closed
 * before the retry, so repeated 403s neither deepen the call stack nor keep one
 * pooled connection checked out per attempt.
 *
 * <p>NOTE(review): Spring's {@code @Async} is documented for {@code void} or
 * {@code Future} return types; confirm how this list-returning method is
 * actually invoked.
 *
 * @param httpGet prepared GET request for a followee/follower list page
 * @return the users on the page; an empty list when the status is neither 200
 *         nor 403, the response carries no data, or an I/O error occurs
 */
@Async
    public List<UserData> getAttentionOrFanUserList(HttpGet httpGet) {
        // NOTE(review): the ':'-prefixed names look like HTTP/2 pseudo-headers copied from
        // browser dev tools, and ':path' is hard-coded to one member; confirm they are needed.
        httpGet.setHeader("user-agent", userAgent);
        httpGet.setHeader(":authority", authority);
        httpGet.setHeader(":method", "GET");
        httpGet.setHeader(":path", "/api/v4/members/da-shen-35-43/followees?limit=20&offset=0");
        httpGet.setHeader(":scheme", "https");
        httpGet.setHeader("accept", accept);
        httpGet.setHeader("cookie", cookie);
        List<UserData> userDataList = new ArrayList<>();
        // Client backed by the shared connection pool
        CloseableHttpClient httpClient = httpConnectionManager.getHttpClient();
        try {
            while (true) {
                CloseableHttpResponse response = null;
                try {
                    response = httpClient.execute(httpGet);
                    int statusCode = response.getStatusLine().getStatusCode();

                    if (statusCode == 200) {
                        String body = EntityUtils.toString(response.getEntity(), "UTF-8");
                        UserListVo userListVo = JSONObject.parseObject(body, UserListVo.class);
                        if (userListVo != null && userListVo.getData() != null) {
                            userDataList.addAll(userListVo.getData());
                        }
                        return userDataList;
                    }
                    if (statusCode != 403) {
                        // Any other status: give up on this page
                        return userDataList;
                    }

                    String forbiddenBody = EntityUtils.toString(response.getEntity(), "UTF-8");
                    ErrorVo errorVo = JSONObject.parseObject(forbiddenBody, ErrorVo.class);
                    // Fall back to the raw body when the 403 payload is not the expected error JSON
                    String errorMessage = (errorVo != null && errorVo.getError() != null)
                            ? String.valueOf(errorVo.getError().getMessage())
                            : forbiddenBody;
                    logger.info("获取关注列表或粉丝列表 403" + "errorMessage:" + errorMessage + "开始处理验证码...");
                } finally {
                    // Release the connection before the captcha is handled and the request retried
                    if (response != null) {
                        try {
                            response.close();
                        } catch (IOException e) {
                            logger.info("关闭response失败! IO异常" + e);
                        }
                    }
                }
                // Solve the captcha, then loop to send the same request again
                forbiddenService.calibrationImageCode();
            }
        } catch (IOException e) {
            logger.error("请求关注列表或粉丝列表失败! IO异常", e);
            return userDataList;
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.info("关闭httpClient失败! IO异常" + e);
            }
        }
    }
关于递归爬取用户关系网的过程

由于用户量巨大,使用递归会因调用层级过深导致内存溢出(调用栈溢出或堆内存耗尽),因此改用遍历用户列表的方式进行爬取,并且动态地将每次爬取到的关系网去重后添加到用户列表中

关于去重

可以使用布隆过滤器,也可以使用hashset(占用内存较大)

关于爬取过程中可能出现的意外中断

本次爬取过程中将每次循环后的用户列表和已爬取过的用户列表写入文件中,一旦出现网络异常或其他情况,重新启动时可以从断点继续,不会丢失数据

关于爬取过程中某乎的反爬机制:

带cookie请求的情况下,大约每10万次http请求会被要求输入一次验证码;在输入验证码之前,此cookie对应的账户在任何设备上都无法使用,输入验证码后即可恢复正常。这一块接入了打码平台:遇到验证码时,取验证码的base64图片进行识别,再将识别结果提交即可
关于本次使用的打码平台

public String getImageCode(String imageBase64) {
        logger.info("请求打码平台,获取验证码内容......");
        String imageCode = "";
        HttpPost httpPost = new HttpPost(base64Api);
        CloseableHttpClient httpClient = httpConnectionManager.getHttpClient();
        CloseableHttpResponse response = null;

        // 设置要提交的表单参数
        ArrayList<NameValuePair> nameValuePairs = new ArrayList<>();
        nameValuePairs.add(new BasicNameValuePair("username", base64ApiUserName));
        nameValuePairs.add(new BasicNameValuePair("password", base64ApiPassword));
        nameValuePairs.add(new BasicNameValuePair("softid", base64ApiSoftid));
        nameValuePairs.add(new BasicNameValuePair(\"image\", imageBase64));
        try {
            // 创建一个form表单
            UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(nameValuePairs);
            httpPost.setEntity(formEntity);
            // 发起请求
            response = httpClient.execute(httpPost);
            if (response.getStatusLine().getStatusCode() == 200) {
                // 获取响应内容
                String s = EntityUtils.toString(response.getEntity());
                // 解析为对象
                ImageCodeVo imageCodeVo = JSONObject.parseObject(s, ImageCodeVo.class);
                logger.info("验证码识别结果: " + imageCodeVo.getMessage());
                // 识别成功
                if (imageCodeVo.isSuccess() == true) {
                    imageCode = imageCodeVo.getData().getResult();
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (ClientProtocolException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return imageCode;
    }

由于某乎提交验证码机制比较复杂,所以采用大杀器selenium + chrome驱动的方式完全模拟用户操作,设置cookie信息获取验证码,请求打码平台接口获取验证码值,然后模拟用户操作提交表单
关于selenium详细用法
初始化chrome浏览器,并设置cookie

/**
 * Starts a Chrome session through Selenium, opens the Zhihu home page,
 * replaces the session's cookies with the configured ones and reloads the page.
 */
public MyDownloader() {
        // Location of the chromedriver executable used by Selenium
        System.setProperty("webdriver.chrome.driver", "C:/test/chromedriver.exe");
        // Headless mode (addArguments("--headless")) is left disabled here, so a browser window opens
        ChromeOptions options = new ChromeOptions();
        remoteWebDriver = new ChromeDriver(options);
        // Load the site once before the cookies are replaced
        remoteWebDriver.get("https://www.zhihu.com");

        // Drop every existing cookie, then add the replacement ones
        // (the original note says the site needs 11 cookies in total; three are shown)
        remoteWebDriver.manage().deleteAllCookies();
        Cookie[] sessionCookies = {
                new Cookie("_zap", "替换"),
                new Cookie("d_c0", "替换"),
                new Cookie("__gads", "替换")
        };
        for (Cookie sessionCookie : sessionCookies) {
            remoteWebDriver.manage().addCookie(sessionCookie);
        }
        // Reload now that the cookies are in place
        remoteWebDriver.get("https://www.zhihu.com");
    }

模拟用户输入验证码

/**
 * Types the captcha text held in {@code code} into the captcha form of the
 * currently loaded page, clicks the submit button and closes the browser window.
 *
 * <p>The window is closed in a {@code finally} block so it does not stay open
 * when a step fails, and an interrupt received while waiting is restored on the
 * current thread instead of being swallowed.
 *
 * @param request not used by this implementation
 * @param task    not used by this implementation
 * @return always {@code Page.fail()}
 */
public Page download(Request request, Task task) {
        try {
            System.out.println("验证码内容为: " + code);
            // Locate the captcha input box and type the text
            WebElement inputText = remoteWebDriver.findElement(By.cssSelector("#root > div > div.Unhuman > section > div > div > input"));
            inputText.sendKeys(code);
            // Pause so the form is not submitted too quickly
            Thread.sleep(3000);
            // Click the submit button
            WebElement sendButton = remoteWebDriver.findElement(By.cssSelector("#root > div > div.Unhuman > section > button"));
            sendButton.click();
            // Wait one second before the window is closed
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                remoteWebDriver.close();
            } catch (Exception e) {
                // Best effort: the window may already be gone
                e.printStackTrace();
            }
        }
        return Page.fail();
    }
关于selenium + chrome驱动取验证码base64值出现的%0A问题

取到的src属性中,换行符被转义成了%0A,直接使用会导致base64值异常,解决方式:

/**
 * Reads the captcha image's {@code src} data URI from the current page and
 * returns its base64 payload with percent-escapes decoded and all whitespace
 * (including the line breaks that arrive as {@code %0A}) removed.
 *
 * @return the cleaned base64 payload, or an empty string when the element has
 *         no {@code src} or decoding fails
 */
public String getBase64Image() {
        WebElement element = remoteWebDriver.findElement(By.cssSelector("#root > div > div.Unhuman > section > div > img"));
        String src = element.getAttribute("src");
        if (src == null) {
            return "";
        }
        // The payload starts after the first comma ("data:image/jpg;base64," is 22 characters,
        // but other media types make the header longer or shorter)
        int headerEnd = src.indexOf(',');
        String base64Image = headerEnd >= 0 ? src.substring(headerEnd + 1) : src;
        try {
            // URLDecoder turns '+' into a space, and '+' is a valid base64 character,
            // so protect it before decoding the percent-escapes
            String decoded = URLDecoder.decode(base64Image.replace("+", "%2B"), "utf-8");
            // %0A has become a real line break by now; strip all whitespace.
            // Strings are immutable, so the result of the replacement must be used.
            return decoded.replaceAll("\\s", "");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return "";
        }
    }

到这里为止,关于爬取用户关系网的部分已经全部解决,开始根据写入文件中的关系网爬取用户信息,由于用户量巨大(爬取了400万用户的url_token), 需配置线程池提高效率

@Configuration\npublic class ThreadPoolConfig {
    @Bean
    public ThreadPoolTaskExecutor defaultThreadPool(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("知乎线程");
        // 当线程池的所有线程都已使用完毕时,不在新线程中执行任务,交由调用者所在线程执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setKeepAliveSeconds(60);
        executor.initialize();
        return executor;
    }
}

开始爬取用户信息

    /**
     * Loads the url_token list and submits one task per token to the thread
     * pool; each task fetches that user's profile and stores it.
     */
    @Override
    public void getUserInfo() {
        List<String> url_token_list = getUrl_token_list();
        // Strictly less than size(): the last valid index is size() - 1
        for (int i = 0; i < url_token_list.size(); i++) {
            String thisUrl_token = url_token_list.get(i);
            Integer thisNo = i;
            // Fetch and persist this user's profile on a pool thread
            threadPoolTaskExecutor.execute(() -> saveUserInfo(thisUrl_token, thisNo));
        }
    }

    /**
     * Reads the url_token file at {@code to_url_token_url} into a list, one
     * token per line, skipping empty lines.
     *
     * @return the tokens read; whatever was read before an I/O error, or an
     *         empty list if the file could not be opened
     */
    public List<String> getUrl_token_list() {
        List<String> url_token_list = new ArrayList<>();
        // try-with-resources closes the reader on every path and copes with a failed open
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(to_url_token_url))) {
            String thisUrl_token;
            while ((thisUrl_token = bufferedReader.readLine()) != null) {
                // Skip every empty line, not just the first, so no request is issued for an empty token
                if (!thisUrl_token.isEmpty()) {
                    url_token_list.add(thisUrl_token);
                }
            }
        } catch (IOException e) {
            logger.error("读取url_token文件失败! IO异常", e);
        }
        return url_token_list;
    }

    /**
     * Requests the profile page of one user and stores the result.
     *
     * <p>Status handling: 200 parses and saves the profile; 410 saves a record
     * carrying only the url_token (the original note says 410 means the account
     * was deleted); 403 runs the captcha handler, waits 20 seconds and sends the
     * request again; any other status is ignored.
     *
     * <p>The 403 retry is a loop rather than recursion, and each response is
     * closed before the wait, so repeated 403s neither deepen the call stack nor
     * keep one pooled connection checked out per attempt.
     *
     * @param url_token url_token of the user to fetch
     * @param i         position of the user in the work list; used only in log output
     */
    // @Async
    public void saveUserInfo(String url_token, Integer i) {
        HttpGet httpGet = new HttpGet(userInfo + url_token);
        // NOTE(review): the ':'-prefixed names look like HTTP/2 pseudo-headers copied from
        // browser dev tools; confirm they are needed.
        httpGet.setHeader(":authority", authority);
        httpGet.setHeader(":path", "/people/" + url_token);
        httpGet.setHeader("cookie", cookie);
        httpGet.setHeader("user-agent", userAgent);
        httpGet.setHeader("content-type", contentType);
        // Client backed by the shared connection pool
        CloseableHttpClient httpClient = httpConnectionManager.getHttpClient();
        try {
            while (true) {
                CloseableHttpResponse response = null;
                try {
                    response = httpClient.execute(httpGet);
                    int statusCode = response.getStatusLine().getStatusCode();

                    if (statusCode == 200) {
                        String body = EntityUtils.toString(response.getEntity(), "UTF-8");
                        UserInfoVo userInfoVo = JSONObject.parseObject(body, UserInfoVo.class);
                        // Convert to the persistent entity (named so it does not shadow the userInfo URL prefix)
                        UserInfo userRecord = UserInfoVo2UserInfo(userInfoVo);
                        userInfoDao.save(userRecord);
                        return;
                    }
                    if (statusCode == 410) {
                        // Store a placeholder carrying only the url_token
                        UserInfo emptyRecord = new UserInfo();
                        emptyRecord.setUrl_token(url_token);
                        userInfoDao.save(emptyRecord);
                        return;
                    }
                    if (statusCode != 403) {
                        return;
                    }
                    logger.info("403 Forbidden " + i);
                } finally {
                    // Release the connection before the captcha handling and the wait
                    if (response != null) {
                        try {
                            response.close();
                        } catch (IOException e) {
                            logger.info("关闭response失败! IO异常" + e);
                        }
                    }
                }
                // Solve the captcha, wait, then loop to send the same request again
                forbiddenService.calibrationImageCode();
                logger.info("验证码处理完成");
                Thread.sleep(20 * 1000);
            }
        } catch (IOException e) {
            logger.error("请求用户信息失败! IO异常 " + url_token, e);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the pool can stop this worker
            Thread.currentThread().interrupt();
            logger.error("等待重试时线程被中断 " + url_token, e);
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                logger.info("关闭httpClient失败! IO异常" + e);
            }
        }
    }

去重后爬取了700万用户信息,再继续爬也没意义,其中爬取关系网由于没有启用多线程,效率较低, 爬取用户详细信息使用10个线程每小时大约能爬取到30万条, 由于cookie的使用,避开了某乎反爬封ip,只需要处理验证码即可
关于用户信息的处理:mysql数据库性能有限,用来检索不太靠谱,可以使用elasticsearch + kibana + logstash进行统计处理。附上300万条数据时的性别比例信息: