宁波网站推广高手,软件下载大全网站,安阳网站建设,移动应用开发介绍文章目录 1 第三方库2 爬取2.1 初始化函数2.2 结束时关闭数据库2.3 生成header2.4 获取请求body2.5 解析异步json数据2.6 使用BS4的find方法解析2.7 写入口函数2.8 调用 3 完整代码 1 第三方库
获取随机UA
pip install fake-useragent连接数据库
$ pip3 install PyMySQL发起… 文章目录 1 第三方库2 爬取2.1 初始化函数2.2 结束时关闭数据库2.3 生成header2.4 获取请求body2.5 解析异步json数据2.6 使用BS4的find方法解析2.7 写入口函数2.8 调用 3 完整代码 1 第三方库
获取随机UA
pip install fake-useragent连接数据库
$ pip3 install PyMySQL发起请求
pip install requests解析页面
pip install beautifulsoup4进度条
pip install tqdm2 爬取
2.1 初始化函数
新建爬虫类
class mySpider:创建数据库连接和初始化url
# 初始化urldef __init__(self, url):self.url url# 计数请求一个页面的次数初始值为1self.count 1# 数据库连接对象self.db pymysql.connect(hostlocalhost,port3306,userroot,password123456,databasetest)# 创建游标对象self.cursor self.db.cursor()2.2 结束时关闭数据库
关闭数据库释放资源方法运行完后调用。
# 结束断开数据库连接def __del__(self):self.cursor.close()self.db.close()print(关闭数据库)2.3 生成header
使用第三方库fake-useragent生成随机UA
# 获取一个headerdef getHeader(self):# 实例化ua对象ua UserAgent()# 随机获取一个uaheaders {User-Agent: ua.random}return headers2.4 获取请求body
注意有返回值的递归要把返回值返回回调时加return
def getBody(self, url, send_type, data):# 每次请求都随机停顿一些时间# time.sleep(random.randint(1, 2))# 在超时时间内对于失败页面尝试请求三次if self.count 3:try:if send_type get:res requests.get(urlurl, headersself.getHeader(), paramsdata, timeout2)elif send_type post:res requests.post(urlurl, headersself.getHeader(), datadata, timeout2)else:print(未输入send_type直接返回None)res Nonereturn resexcept Exception as e:print(e)self.count 1print(f第{self.count}次发起请求)# 再次调用自己,并把值返回注意要加returnreturn self.getBody(url, send_type, data)2.5 解析异步json数据
解析异步json数据
def parseData(self, dataList):# 循环查看详情for row in tqdm(dataList, desc爬取进度):# 请求详情页urlurlDetail fhttps://www.baidu.com/CTMDS/pub/PUB010100.do?methodhandle04compId{row[companyId]}# 发起请求# 每次请求都初始化一次self.countself.count 1res self.getBody(urlurlDetail, send_typeget, data{})if res is not None:# 解析htmlself.parseHtml(rowrow, htmlTextres.text)else:print(f{urlDetail}请求失败)2.6 使用BS4的find方法解析
find_all() 方法用来搜索当前 tag 的所有子节点并判断这些节点是否符合过滤条件最后以列表形式将符合条件的内容返回语法格式如下 find_all( name , attrs , recursive , text , limit )参数说明name查找所有名字为 name 的 tag 标签字符串对象会被自动忽略。attrs按照属性名和属性值搜索 tag 标签注意由于 class 是 Python 的关键字吗所以要使用 “class_”。recursivefind_all() 会搜索 tag 的所有子孙节点设置 recursiveFalse 可以只搜索 tag 的直接子节点。text用来搜文档中的字符串内容该参数可以接受字符串 、正则表达式 、列表、True。limit由于 find_all() 会返回所有的搜索结果这样会影响执行效率通过 limit 参数可以限制返回结果的数量。
def parseHtml(self, row, htmlText):soup BeautifulSoup(htmlText, html.parser)# 获取备案信息divList soup.find_all(div, class_[col-md-8])divtextList [re.sub(r\s, , div.text) for div in divList]# 获取其他机构地址divListOther soup.find_all(div, class_[col-sm-8])divtextListOther [re.sub(r\s, , div.text) for div in divListOther]otherOrgAdd ,.join(divtextListOther)# 插入数据库companyId row[companyId]linkTel row[linkTel]recordNo row[recordNo]areaName row[areaName]linkMan row[linkMan]address row[address]compName row[compName]recordStatus row[recordStatus]cancelRecordTime row.get(cancelRecordTime, )compLevel divtextList[2]recordTime divtextList[6]sql1 insert INTO medical_register(company_id,area_name,record_no,comp_name,address,link_man,link_tel,record_status,comp_level,record_time,cancel_record_time,other_org_add) sql2 fvalues({companyId},{areaName},{recordNo},{compName},{address},{linkMan},{linkTel},{recordStatus},{compLevel},{recordTime},{cancelRecordTime},{otherOrgAdd})sql3 sql1 sql2# 执行sqlself.cursor.execute(sql3)# 提交self.db.commit()# 获取备案专业和主要研究者信息tbody soup.find(tbody)trList tbody.find_all(tr)# 对tr循环获取tdfor tr in trList:tdList tr.find_all(td)tdTextList [td.text for td in tdList]tdTextList.insert(0, companyId)# print(tdTextList)# 插入数据库sql4 insert into medical_register_sub (company_id,professional_name,principal_investigator,job_title) values(%s,%s,%s,%s)self.cursor.execute(sql4, tdTextList)# 提交到数据库self.db.commit()2.7 写入口函数
这里pageSize直接干到最大懂的都懂
def run(self):try:# 拿第一页的数据data {pageSize: 1350, curPage: 1}# 每次请求都初始化一次self.countself.count 1res self.getBody(urlself.url, send_typepost, datadata)if res is not None:# 加载为jsonjsonRes json.loads(res.text)# 查看响应状态码status jsonRes[success]# 如果状态为Trueif status True:# 获取数据dataList jsonRes[data]# 处理数据self.parseData(dataListdataList)else:print(f{self.url}请求失败)except Exception as e:print(发生错误, e)2.8 调用
调用
if __name__ __main__:spider mySpider(https://www.baidu.com/CTMDS/pub/PUB010100.do?methodhandle05)spider.run()3 完整代码
完整代码
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import time
import random
import json
import re
import pymysql
from tqdm import tqdmclass mySpider:# 初始化urldef __init__(self, url):self.url url# 计数请求一个页面的次数初始值为1self.count 1# 数据库连接对象self.db pymysql.connect(hostlocalhost,port3306,userroot,passwordlogicfeng,databasetest2)# 创建游标对象self.cursor self.db.cursor()# 结束断开数据库连接def __del__(self):self.cursor.close()self.db.close()print(关闭数据库)# 获取一个headerdef getHeader(self):# 实例化ua对象ua UserAgent()# 随机获取一个uaheaders {User-Agent: ua.random}return headers# 获取请求bodydef getBody(self, url, send_type, data):# 每次请求都随机停顿一些时间# time.sleep(random.randint(1, 2))# 在超时时间内对于失败页面尝试请求三次if self.count 3:try:if send_type get:res requests.get(urlurl, headersself.getHeader(), paramsdata, timeout2)elif send_type post:res requests.post(urlurl, headersself.getHeader(), datadata, timeout2)else:print(未输入send_type直接返回None)res Nonereturn resexcept Exception as e:print(e)self.count 1print(f第{self.count}次发起请求)# 再次调用自己,并把值返回注意要加returnreturn self.getBody(url, send_type, data)# 解析bodydef parseData(self, dataList):# 循环查看详情for row in tqdm(dataList, desc爬取进度):# 请求详情页urlurlDetail fhttps://www.baidu.com/CTMDS/pub/PUB010100.do?methodhandle04compId{row[companyId]}# 发起请求# 每次请求都初始化一次self.countself.count 1res self.getBody(urlurlDetail, send_typeget, data{})if res is not None:# 解析htmlself.parseHtml(rowrow, htmlTextres.text)else:print(f{urlDetail}请求失败)# 解析页面def parseHtml(self, row, htmlText):soup BeautifulSoup(htmlText, html.parser)# 获取备案信息divList soup.find_all(div, class_[col-md-8])divtextList [re.sub(r\s, , div.text) for div in divList]# 获取其他机构地址divListOther soup.find_all(div, class_[col-sm-8])divtextListOther [re.sub(r\s, , div.text) for div in divListOther]otherOrgAdd ,.join(divtextListOther)# 插入数据库companyId row[companyId]linkTel row[linkTel]recordNo row[recordNo]areaName row[areaName]linkMan row[linkMan]address row[address]compName row[compName]recordStatus row[recordStatus]cancelRecordTime row.get(cancelRecordTime, )compLevel divtextList[2]recordTime divtextList[6]sql1 insert INTO medical_register(company_id,area_name,record_no,comp_name,address,link_man,link_tel,record_status,comp_level,record_time,cancel_record_time,other_org_add) sql2 fvalues({companyId},{areaName},{recordNo},{compName},{address},{linkMan},{linkTel},{recordStatus},{compLevel},{recordTime},{cancelRecordTime},{otherOrgAdd})sql3 sql1 sql2# 执行sqlself.cursor.execute(sql3)# 提交self.db.commit()# 获取备案专业和主要研究者信息tbody soup.find(tbody)trList tbody.find_all(tr)# 对tr循环获取tdfor tr in trList:tdList tr.find_all(td)tdTextList [td.text for td in tdList]tdTextList.insert(0, companyId)# print(tdTextList)# 插入数据库sql4 insert into medical_register_sub (company_id,professional_name,principal_investigator,job_title) values(%s,%s,%s,%s)self.cursor.execute(sql4, tdTextList)# 提交到数据库self.db.commit()# 入口函数def run(self):try:# 拿第一页的数据data {pageSize: 1350, curPage: 1}# 每次请求都初始化一次self.countself.count 1res self.getBody(urlself.url, send_typepost, datadata)if res is not None:# 加载为jsonjsonRes json.loads(res.text)# 查看响应状态码status jsonRes[success]# 如果状态为Trueif status True:# 获取数据dataList jsonRes[data]# 处理数据self.parseData(dataListdataList)else:print(f{self.url}请求失败)except Exception as e:print(发生错误, e)if __name__ __main__:spider mySpider(https://www.百度.com/CTMDS/pub/PUB010100.do?methodhandle05)spider.run()