python多线程数据库操作_python多线程 + 批量插入 数据库 健壮你的小爬虫
参考链接:获取 url,标题以及时间。。。。(很简单的奥,xpath一下子就可以提取到了)(这次主要是练习多线程和批处理存入数据库的,所以只是简单的解析,获取页面元素)重要思路: 开启多线程时1)首先将爬取的url 放入到数据结构的队列里,保证数据安全。2)将爬取到的结果,全部存入到一个结果集队列里,进行下一步的操作。3)队列里的get和put方法不要混淆,put是向队列里添加元素,get是取出或
参考链接:
获取 url,标题以及时间。。。。(很简单的奥,xpath一下子就可以提取到了)
(这次主要是练习多线程和批处理存入数据库的,所以只是简单的解析,获取页面元素)
重要思路: 开启多线程时
1)首先将爬取的url 放入到数据结构的队列里,保证数据安全。
2)将爬取到的结果,全部存入到一个结果集队列里,进行下一步的操作。
3)队列里的get和put方法不要混淆,put是向队列里添加元素,get是取出或者是踢出并返回这个元素!!!!
第一步: 创建线程以及存储队列:
def main():
start_url = Queue.Queue() # 存放url的队列
result_queue = Queue.Queue() # 结果集队列
for i in range(1, 3):# 网站分页
page_url = 'http://data.stcn.com/list/djsj_%s.shtml' % i
start_url.put(page_url) # 将值添加到start_url队列中
# 构建线程
thread_list = [] # 存放线程的容器
for n in range(4): # 一次运行4 个线程
# 创建线程,target调用get_news_url方法,args传入参数
t_t = threading.Thread(target=get_news_url, args=(start_url, result_queue))
thread_list.append(t_t)
for t in thread_list:
t.start() # 启动线程
第二步:解析网页,获取目标元素(不多介绍了哈)
def get_news_url(start_url, result_queue): # 在main方法里传入参数
result = []
while start_url.qsize():
page_url = start_url.get() # 从队列中取出并返回这个数据
try:
response = requests.get(page_url)
except Exception as e:
print "抓取网页错误,错误为:%s" % e
return None
if response.status_code == 200:
selector = etree.HTML(response.text)
web_content = selector.xpath('//p[@class="tit"]')
for news in web_content:
item_result = {}
item_result['href'] = news.xpath('a/@href')[0]
item_result['title'] = news.xpath('a/text()')[0]
item_result['date_news'] = news.xpath('span/text()')[0]
result.append(item_result)
if len(result) > 0:#如果result里有数据的话,
result_queue.put(result) # put是向结果集 队列里添加元素result
start_url.task_done() #是指这个任务结束
else:
time.sleep(5)
第三步: 存,批量插入Mysql数据库
到这一步我们获取得到的数据结构是形如这样的[{},{},{},{}]:
而插入数据库的关键,就是获取插入的值!!!这里遍历出是个字典格式的数据,所以需要用dict的方法获取元素!!!
核心代码就两行!!!!
data = [item.values() for item in result] #遍历得到每个{}里的values值
cur.executemany(sql2, tuple(data)) #记得一定要转化成元组
print 'insert sucessful'
多条记录的插入,需要用executemany(速度哦,真的比之前快好多好多~~~)
不要忘记大前提
链接数据库 和 创建数据库 和表(这部分 可以手动创建 也可以代码创建)
代码链接数据库 和创建 数据表
def save_news_mysql(result):
con = MySQLdb.connect(host= '127.0.0.1', user= 'root', passwd= '123456' ,charset='utf8',port = 3306)
cur = con.cursor()
sql = 'create database if not exists cstn_database default charset utf8'
cur.execute(sql)
con.select_db('cstn_database') # 以上是连接和创建 数据库
sql = 'create table if not exists news_cstn'+"(id int auto_increment, href varchar(255), title varchar(255),\date_news varchar(255), primary key(ID))"
cur.execute(sql) # 创建表结构 这部分代码以后直接在mysql里创建就可以了
sql2 = 'insert into news_cstn (href,title,date_news) VALUES (%s,%s,%s)'
data = [item.values() for item in result]
cur.executemany(sql2, tuple(data))
print 'insert sucessful'
数据库名:cstn_database
表名:cstn
字段:id,href,title,data_news
看似简单,其实虐我好久丫!!!
敬上完整代码:
# -*- coding: UTF-8 -*-
import requests
from lxml import etree
import csv
import MySQLdb
import xlwt
import Queue
import threading
import time
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
# from util.crawler import Header, Proxy 代理 请求头 我放在另个文件夹
# from database.db import Database
#
# con = Database.getConnection() # 连接数据库
# cur = con.cursor() # 游标对象
def get_news_url(start_url, result_queue):
result = []
while start_url.qsize():
page_url = start_url.get() # 从队列中移除并返回这个数据
try:
response = requests.get(page_url)
except Exception as e:
print "抓取网页错误,错误为:%s" % e
return None
if response.status_code == 200:
selector = etree.HTML(response.text)
web_content = selector.xpath('//p[@class="tit"]')
for news in web_content:
item_result = {}
item_result['href'] = news.xpath('a/@href')[0]
item_result['title'] = news.xpath('a/text()')[0]
item_result['date_news'] = news.xpath('span/text()')[0]
result.append(item_result)
if len(result) > 0:
result_queue.put(result) # put是向结果集 队列里添加元素
start_url.task_done()
else:
time.sleep(5)
def save_to_excel(result):
workbook = xlwt.Workbook()
sheet = workbook.add_sheet('result3')
title = ['href','title','date']
for i ,item in enumerate(title):
sheet.write(0, i, item)
data = [item.values() for item in result]
print data
for row, item in enumerate(data):
for i, info in enumerate(item):
print row+1, i ,info
sheet.write(row+1 , i , info)
workbook.save('Myresult.xls')
def save_news_mysql(result):
con = MySQLdb.connect(host= '127.0.0.1', user= 'root', passwd= '123456' ,charset='utf8',port = 3306)
cur = con.cursor()
sql = 'create database if not exists cstn_database default charset utf8'
cur.execute(sql)
con.select_db('cstn_database')
sql = 'create table if not exists news_cstn'+"(id int auto_increment, href varchar(255), title varchar(255), \
date_news varchar(255), primary key(ID))"
cur.execute(sql) # 创建表结构 这部分代码以后直接在mysql里创建就可以了
sql2 = 'insert into news_cstn (href,title,date_news) VALUES (%s,%s,%s)'
data = [item.values() for item in result]
cur.executemany(sql2, tuple(data))
print 'insert sucessful'
# for elem in result: # 单条插入
# sql = 'insert into news_cstn (href,title,date_news) VALUES (\'%s\',\'%s\',\'%s\')' % (elem['href'],elem['title'],elem['date_news'])
# cur.execute(sql)
con.commit()
cur.close()
con.close()
def main():
start_url = Queue.Queue() # 存放url的队列
result_queue = Queue.Queue() # 结果集队列
for i in range(1, 3):
page_url = 'http://data.stcn.com/list/djsj_%s.shtml' % i
start_url.put(page_url) # 将值插入队列中
# 构建线程
thread_list = []
for n in range(4): # 创建4 个线程
t_t = threading.Thread(target=get_news_url, args=(start_url, result_queue)) # 创建线程,调用get_news_url方法,args传入参数
thread_list.append(t_t)
for t in thread_list:
t.start()
start_url.join() # 就是当所有的url全部获取完,放入到结果集里才开始存入数据库,防止出现 插入数据库报错的情况
while result_queue.qsize(): # 返回队列的大小
save_news_mysql(result_queue.get()) # 将结果存入数据库中
if __name__ == "__main__":
main()
更多推荐
所有评论(0)