绿色应用市场
当前位置:首页 > 学习教程 > 正文

使用python生成大量数据写入es数据库并查询操作(2)

发布时间:2022-09-22 11:22:27来源:周小白软件园编辑:本站整理

前言 :

上一篇文章:如何使用python生成大量数据写入es数据库并查询操作

模拟学生个人信息写入es数据库,包括姓名、性别、年龄、特点、科目、成绩,创建时间。

方案一

在写入数据时未提前创建索引mapping,而是每插入一条数据都包含了索引的信息。

示例代码:【多线程写入数据】【一次性写入10000*1000条数据】 【本人亲测耗时3266秒】

from elasticsearch import Elasticsearch from elasticsearch import helpers from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es)   names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自负,不以自我为中心',  '努力、积极、乐观、拼搏是我的人生信条',  '抗压能力强,能够快速适应周围环境',  '敢做敢拼,脚踏实地;做事认真负责,责任心强',  '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',  '主动性强,自学能力强,具有团队合作意识,有一定组织能力',  '忠实诚信,讲原则,说到做到,决不推卸责任',  '有自制力,做事情始终坚持有始有终,从不半途而废',  '肯学习,有问题不逃避,愿意虚心向他人学习',  '愿意以谦虚态度赞扬接纳优越者,权威者',  '会用100%的热情和精力投入到工作中;平易近人',  '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',  '有较强的团队精神,工作积极进取,态度认真'] subjects = ['语文', '数学', '英语', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')   def save_to_es(num): """ 批量写入数据到es数据库 :param num: :return: """ start = time.time() action = [ { "_index": "personal_info_10000000", "_type": "doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } } for i in range(10000 * num, 10000 * num + 10000) ] helpers.bulk(es, action) end = time.time() print(f"{num}耗时{end - start}s!")   def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num)  if __name__ == '__main__': start = time.time() queue = Queue() # 序号数据进队列 for num in range(1000): queue.put(num)   # 多线程执行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序执行完毕!花费时间:', end - start)

运行结果:

自动创建的索引mapping:

GET personal_info_10000000/_mapping {   "personal_info_10000000" : { "mappings" : {   "properties" : { "age" : {   "type" : "long" }, "character" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   } }, "create_time" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   } }, "grade" : {   "type" : "long" }, "id" : {   "type" : "long" }, "name" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   } }, "sex" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   } }, "subject" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   } }   } }   } }

方案二

1.顺序插入5000000条数据

先创建索引personal_info_5000000,确定好mapping后,再插入数据。

新建索引并设置mapping信息:

PUT personal_info_5000000 {   "settings": { "number_of_shards": 3, "number_of_replicas": 1   },   "mappings": { "properties": {   "id": { "type": "long"   },   "name": { "type": "text", "fields": {   "keyword": { "type": "keyword", "ignore_above": 32   } }   },   "sex": { "type": "text", "fields": {   "keyword": { "type": "keyword", "ignore_above": 8   } }   },   "age": { "type": "long"   },   "character": { "type": "text", "analyzer": "ik_smart", "fields": {   "keyword": { "type": "keyword", "ignore_above": 256   } }   },   "subject": { "type": "text", "fields": {   "keyword": { "type": "keyword", "ignore_above": 256   } }   },   "grade": { "type": "long"   },   "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"   } }   } }

查看新建索引信息:

GET personal_info_5000000   {   "personal_info_5000000" : { "aliases" : { }, "mappings" : {   "properties" : { "age" : {   "type" : "long" }, "character" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   },   "analyzer" : "ik_smart" }, "create_time" : {   "type" : "date",   "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : {   "type" : "long" }, "id" : {   "type" : "long" }, "name" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 32 }   } }, "sex" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 8 }   } }, "subject" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   } }   } }, "settings" : {   "index" : { "routing" : {   "allocation" : { "include" : {   "_tier_preference" : "data_content" }   } }, "number_of_shards" : "3", "provided_name" : "personal_info_50000000", "creation_date" : "1663471072176", "number_of_replicas" : "1", "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ", "version" : {   "created" : "7170699" }   } }   } }

开始插入数据:

示例代码: 【单线程写入数据】【一次性写入10000*500条数据】 【本人亲测耗时7916秒】

from elasticsearch import Elasticsearch from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自负,不以自我为中心',  '努力、积极、乐观、拼搏是我的人生信条',  '抗压能力强,能够快速适应周围环境',  '敢做敢拼,脚踏实地;做事认真负责,责任心强',  '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',  '主动性强,自学能力强,具有团队合作意识,有一定组织能力',  '忠实诚信,讲原则,说到做到,决不推卸责任',  '有自制力,做事情始终坚持有始有终,从不半途而废',  '肯学习,有问题不逃避,愿意虚心向他人学习',  '愿意以谦虚态度赞扬接纳优越者,权威者',  '会用100%的热情和精力投入到工作中;平易近人',  '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',  '有较强的团队精神,工作积极进取,态度认真'] subjects = ['语文', '数学', '英语', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')   # 添加程序耗时的功能 def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start)) return res   return wrapper   @timer def save_to_es(num): """ 顺序写入数据到es数据库 :param num: :return: """ body = { "id": num, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } # 此时若索引不存在时会新建 es.index(index="personal_info_5000000", id=num, doc_type="_doc", document=body)   def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num)   if __name__ == '__main__': start = time.time() queue = Queue() # 序号数据进队列 for num in range(5000000): queue.put(num)   # 多线程执行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序执行完毕!花费时间:', end - start)

运行结果:

2.批量插入5000000条数据

先创建索引personal_info_5000000_v2,确定好mapping后,再插入数据。

新建索引并设置mapping信息:

PUT personal_info_5000000_v2 {   "settings": { "number_of_shards": 3, "number_of_replicas": 1   },   "mappings": { "properties": {   "id": { "type": "long"   },   "name": { "type": "text", "fields": {   "keyword": { "type": "keyword", "ignore_above": 32   } }   },   "sex": { "type": "text", "fields": {   "keyword": { "type": "keyword", "ignore_above": 8   } }   },   "age": { "type": "long"   },   "character": { "type": "text", "analyzer": "ik_smart", "fields": {   "keyword": { "type": "keyword", "ignore_above": 256   } }   },   "subject": { "type": "text", "fields": {   "keyword": { "type": "keyword", "ignore_above": 256   } }   },   "grade": { "type": "long"   },   "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"   } }   } }

查看新建索引信息:

GET personal_info_5000000_v2   {   "personal_info_5000000_v2" : { "aliases" : { }, "mappings" : {   "properties" : { "age" : {   "type" : "long" }, "character" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   },   "analyzer" : "ik_smart" }, "create_time" : {   "type" : "date",   "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : {   "type" : "long" }, "id" : {   "type" : "long" }, "name" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 32 }   } }, "sex" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 8 }   } }, "subject" : {   "type" : "text",   "fields" : { "keyword" : {   "type" : "keyword",   "ignore_above" : 256 }   } }   } }, "settings" : {   "index" : { "routing" : {   "allocation" : { "include" : {   "_tier_preference" : "data_content" }   } }, "number_of_shards" : "3", "provided_name" : "personal_info_5000000_v2", "creation_date" : "1663485323617", "number_of_replicas" : "1", "uuid" : "XBPaDn_gREmAoJmdRyBMAA", "version" : {   "created" : "7170699" }   } }   } }

批量插入数据:

通过elasticsearch模块导入helper,通过helper.bulk来批量处理大量的数据。首先将所有的数据定义成字典形式,各字段含义如下:

_index对应索引名称,并且该索引必须存在。_type对应类型名称。_source对应的字典内,每一篇文档的字段和值,可有有多个字段。

示例代码: 【程序中途异常,写入4714000条数据】

from elasticsearch import Elasticsearch from elasticsearch import helpers from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自负,不以自我为中心',  '努力、积极、乐观、拼搏是我的人生信条',  '抗压能力强,能够快速适应周围环境',  '敢做敢拼,脚踏实地;做事认真负责,责任心强',  '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',  '主动性强,自学能力强,具有团队合作意识,有一定组织能力',  '忠实诚信,讲原则,说到做到,决不推卸责任',  '有自制力,做事情始终坚持有始有终,从不半途而废',  '肯学习,有问题不逃避,愿意虚心向他人学习',  '愿意以谦虚态度赞扬接纳优越者,权威者',  '会用100%的热情和精力投入到工作中;平易近人',  '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',  '有较强的团队精神,工作积极进取,态度认真'] subjects = ['语文', '数学', '英语', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 添加程序耗时的功能 def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start)) return res   return wrapper     @timer def save_to_es(num): """ 批量写入数据到es数据库 :param num: :return: """ action = [ { "_index": "personal_info_5000000_v2", "_type": "_doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } } for i in range(10000 * num, 10000 * num + 10000) ] helpers.bulk(es, action) def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num) if __name__ == '__main__': start = time.time() queue = Queue() # 序号数据进队列 for num in range(500): queue.put(num)   # 多线程执行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序执行完毕!花费时间:', end - start)

运行结果:

3.批量插入50000000条数据

先创建索引personal_info_5000000_v2,确定好mapping后,再插入数据。

此过程是在上面批量插入的前提下进行优化,采用python生成器。

建立索引和mapping同上,直接上代码:

示例代码:【程序中途异常,写入3688000条数据】

from elasticsearch import Elasticsearch from elasticsearch import helpers from datetime import datetime from queue import Queue import random import time import threading es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es)   names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十'] sexs = ['男', '女'] age = [25, 28, 29, 32, 31, 26, 27, 30] character = ['自信但不自负,不以自我为中心',  '努力、积极、乐观、拼搏是我的人生信条',  '抗压能力强,能够快速适应周围环境',  '敢做敢拼,脚踏实地;做事认真负责,责任心强',  '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',  '主动性强,自学能力强,具有团队合作意识,有一定组织能力',  '忠实诚信,讲原则,说到做到,决不推卸责任',  '有自制力,做事情始终坚持有始有终,从不半途而废',  '肯学习,有问题不逃避,愿意虚心向他人学习',  '愿意以谦虚态度赞扬接纳优越者,权威者',  '会用100%的热情和精力投入到工作中;平易近人',  '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',  '有较强的团队精神,工作积极进取,态度认真'] subjects = ['语文', '数学', '英语', '生物', '地理'] grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86] create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')   # 添加程序耗时的功能 def timer(func): def wrapper(*args, **kwargs): start = time.time() res = func(*args, **kwargs) end = time.time() print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start)) return res   return wrapper @timer def save_to_es(num): """ 使用生成器批量写入数据到es数据库 :param num: :return: """ action = ( { "_index": "personal_info_5000000_v3", "_type": "_doc", "_id": i, "_source": { "id": i, "name": random.choice(names), "sex": random.choice(sexs), "age": random.choice(age), "character": random.choice(character), "subject": random.choice(subjects), "grade": random.choice(grades), "create_time": create_time } } for i in range(10000 * num, 10000 * num + 10000) ) helpers.bulk(es, action)   def run(): global queue while queue.qsize() > 0: num = queue.get() print(num) save_to_es(num)   if __name__ == '__main__': start = time.time() queue = Queue() # 序号数据进队列 for num in range(500): queue.put(num)   # 多线程执行程序 consumer_lst = [] for _ in range(10): thread = threading.Thread(target=run) thread.start() consumer_lst.append(thread) for consumer in consumer_lst: consumer.join() end = time.time() print('程序执行完毕!花费时间:', end - start)

运行结果:

到此这篇关于使用python生成大量数据写入es数据库并查询操作(2)的文章就介绍到这了,更多相关python生成 数据 内容请搜索云初冀北以前的文章或继续浏览下面的相关文章希望大家以后多多支持云初冀北!

相关推荐