您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
这篇文章主要讲解了“如何实现elasticsearch导入mysql数据”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何实现elasticsearch导入mysql数据”吧!
<!-- Build on the Spring Boot 2.3.2.RELEASE parent POM. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- None of the dependencies below declares a version; presumably they are resolved
     through the parent's dependency management (confirm with mvn dependency:tree). -->
<dependencies>
    <!-- Web starter: provides the @RestController / @RequestMapping support used by PositionController. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Elasticsearch starter: provides ElasticsearchRestTemplate and RestHighLevelClient used by PositionService. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    <!-- MySQL JDBC driver; runtime scope is enough because DBHelper loads the driver by class name. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
JDBC 连接工具类 DBHelper:负责加载 MySQL 驱动并创建数据库连接。
/**
 * Minimal JDBC helper that opens connections to the local {@code lagou_db} MySQL database.
 *
 * <p>All connection settings are hard-coded constants. The class is not thread-safe: every call
 * to {@link #getConn()} overwrites the shared static {@link #conn} field.
 */
public class DBHelper {

    /** JDBC URL of the source database. */
    public static final String url = "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";

    /** Fully qualified class name of the JDBC driver that is loaded before connecting. */
    public static final String name = "com.mysql.cj.jdbc.Driver";

    /** Database user name. */
    public static final String user = "root";

    /** Database password. */
    public static final String password = "root";

    /** Connection produced by the most recent {@link #getConn()} call, or {@code null} if that call failed. */
    public static Connection conn = null;

    /**
     * Loads the JDBC driver and opens a new connection.
     *
     * <p>The caller owns the returned connection and is responsible for closing it.
     *
     * @return a freshly opened connection, or {@code null} when the driver cannot be loaded or the
     *         connection cannot be established (the failure is printed to standard error)
     */
    public static Connection getConn() {
        // Reset first: a failed attempt must never hand back the connection left over from an
        // earlier call, which the previous caller has most likely closed already.
        conn = null;
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }
}
@Service("positionService") public class PositionService { @Autowired ElasticsearchRestTemplate elasticsearchTemplate; @Autowired RestHighLevelClient client; private static final String POSITIOIN_INDEX = "position"; public void importAll() throws IOException { writeMysqlDataToES(POSITIOIN_INDEX); } /** 讲数据批量写入ES中 */ private void writeMysqlDataToES(String tableName) { BulkProcessor bulkProcessor = getBulkProcessor(client); Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = DBHelper.getConn(); System.out.println("Start handle data :" + tableName); String sql = "SELECT * from " + tableName; ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // 根据自己需要 设置 ps.setFetchSize(20); rs = ps.executeQuery(); ResultSetMetaData colData = rs.getMetaData(); ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>(); // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的方式,故笔者在此将查出来的数据转换成hashMap方式 HashMap<String, String> map = null; int count = 0; String c = null; String v = null; while (rs.next()) { count++; map = new HashMap<String, String>(128); for (int i = 1; i <= colData.getColumnCount(); i++) { c = colData.getColumnName(i); v = rs.getString(c); map.put(c, v); } dataList.add(map); // 每1万条写一次,不足的批次的最后再一并提交 if (count % 10000 == 0) { System.out.println("Mysql handle data number : " + count); // 将数据添加到 bulkProcessor 中 for (HashMap<String, String> hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); } // 每提交一次便将map与list清空 map.clear(); dataList.clear(); } } // 处理未提交的数据 for (HashMap<String, String> hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); System.out.println(hashMap2); } System.out.println("-------------------------- Finally insert number total: " + count); // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的刷新时间 bulkProcessor.flush(); } catch (Exception e) { System.out.println(e.getMessage()); } 
finally { try { rs.close(); ps.close(); conn.close(); boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS); System.out.println(terminatedFlag); } catch (Exception e) { System.out.println(e.getMessage()); } } } private BulkProcessor getBulkProcessor(RestHighLevelClient client) { BulkProcessor bulkProcessor = null; try { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { System.out.println("Try to insert data number : " + request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId); } }; BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)); builder.setConcurrentRequests(10); builder.setFlushInterval(TimeValue.timeValueSeconds(100L)); builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // 注意点:让参数设置生效 bulkProcessor = builder.build(); } catch (Exception e) { e.printStackTrace(); try { bulkProcessor.awaitClose(100L, TimeUnit.SECONDS); } catch (Exception e1) { System.out.println(e1.getMessage()); } } return bulkProcessor; } }
/**
 * REST controller that exposes the {@code query} and {@code /importAll} endpoints and delegates
 * both to {@link PositionService}.
 */
@RestController
public class PositionController {

    @Autowired
    PositionService positionService;

    /**
     * Looks up positions by name through {@code positionService.queryPositions}.
     *
     * <p>NOTE(review): {@code queryPositions} is not defined in the PositionService listing that
     * accompanies this class; confirm that the method exists.
     *
     * @param positionName the name to search for; may be {@code null}
     * @return the service result, or {@code null} when no name was supplied
     */
    @RequestMapping("query")
    public List<Map> query(String positionName) {
        return positionName != null ? positionService.queryPositions(positionName) : null;
    }

    /**
     * Triggers the import and always answers {@code "success"}; an {@link IOException} from the
     * service is only printed.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/importAll")
    public String importAll() {
        try {
            positionService.importAll();
        } catch (IOException importFailure) {
            importFailure.printStackTrace();
        }
        return "success";
    }
}
/**
 * Serializable data holder for one job position.
 *
 * <p>Only the fields are declared in this listing; no constructors or accessors are shown.
 */
public class Position implements Serializable {
    // Primary key
    private String id;
    // Company name
    private String companyName;
    // Position (job title) name
    private String positionName;
    // Perks advertised for the position
    private String positionAdvantage;
    // Salary as text
    private String salary;
    // Lower salary bound
    private int salaryMin;
    // Upper salary bound
    private int salaryMax;
    // Required education
    private String education;
    // Required years of work experience
    private String workYear;
    // Publish time
    private String publishTime;
    // City of the job
    private String city;
    // Work address
    private String workAddress;
    // Creation time, formatted as "yyyy-MM-dd HH:mm:ss"
    // (the original comment repeated "publish time"; the field name indicates creation time)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    // Nature of the job (original comment: "work mode")
    private String jobNature;
}
除上述 Java 代码方式外,也可以使用 Logstash 导入。前提:已安装好 Logstash,并已准备好下面配置中 jdbc_driver_library 所指向的 MySQL JDBC 驱动 jar 包。
# Logstash pipeline: read rows from MySQL through the jdbc input and index them into the
# Elasticsearch "position" index.
input {
  # Events typed on standard input are accepted as well.
  stdin { }
  jdbc {
    # Same database URL and credentials as the Java DBHelper class.
    jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "root"
    # Local path of the MySQL connector jar and the driver class to load from it.
    jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Fetch the result in pages of 1000 rows.
    jdbc_paging_enabled => "true"
    jdbc_page_size => "1000"
    # File that contains the SQL statement to execute.
    statement_filepath => "D:/import.sql"
  }
}
filter {
  # Parse the "message" field as JSON and drop the raw field afterwards.
  # NOTE(review): presumably only stdin events carry a "message" field - confirm for jdbc rows.
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "position"
    # NOTE(review): document_type is deprecated in newer versions of the elasticsearch output
    # plugin - confirm against the installed Logstash version.
    document_type => "_doc"
  }
  # Also print every event to the console, one JSON document per line.
  stdout { codec => json_lines }
}
-- Presumably the content of D:/import.sql referenced by statement_filepath: export every row of the position table.
SELECT * FROM position
logstash --path.config ../import.conf
感谢各位的阅读,以上就是“如何实现elasticsearch导入mysql数据”的内容了,经过本文的学习后,相信大家对如何实现elasticsearch导入mysql数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。