四时宝库

程序员的知识宝库

mybatis使用load data local infile实现导入数据到mysql数据库

背景:

项目框架为:dubbo+zookeeper+ssm 数据库为mysql

最近有个新的需求,要在代码里实现往数据库插入大批量数据,每次插入的数据量从10万~50万条不等,而且每条数据有80多个字段,大概估算了一下,一条数据大小差不多是1kb,那么每次插入的数据量大小应该在100M~500M之间。这个数据量还是很大的。

想来想去,我就先按照从开发到调试所碰到问题的顺序来写好了,到后面我再贴出代码,供同行们参考。

1、碰到的第一个问题是:

Packet for query is too large (1139736> 1048576). You can change this value on the server by setting

the 'max_allowed_packet' variable.

刚接到需求时,根本没有考虑到数据量的问题,就按照平时开发那样,直接往数据库里执行insert,就报了上面这个错,原来因为mysql有一个max_allowed_packet变量,可以控制其通信缓冲区的最大长度,所以当缓冲区的大小太小的时候,导致某些查询和插入操作报错。

解决办法:

数据库执行命令 show VARIABLES like '%max_allowed_packet%'; 查看数据库max_allowed_packet变量配置时多少,显示的结果为

+--------------------+---------+ | Variable_name | Value | +---------

-----------+---------+ | max_allowed_packet | 1048576 | +--------------------+---------+

这说明当前的配置时1M,我们需要将其设置大一些。

数据库执行命令 set global max_allowed_packet = 4*1024*1024*10,将值设置为40M,,执行完后,关掉数据库可视化界面,重新打开,要是命令行进行的就重启mysql(不重启的话是不行的,切记重启mysql),然后接着执行命令show VARIABLES like '%max_allowed_packet%';看看有没有设置成功。一般都是可行的!至此,第一个问题解决。

2、碰到的第二个问题是:

com.alibaba.dubbo.remoting.transport.AbstractCodec.checkPayload() ERROR Data length too large: 11557050, max payload: 8388608 java.io.IOException: Data length too large: 11557050, max payload: 838860

这个错是dubbo相关的,为什么会报这个错呢?想来想去,可能是service服务端读取到的数据量太大,服务端提供给web客户端的数据量就过大,超过了dubbo的默认值8M,错误信息如上所示,天哪,第一次碰到这样的报错,各种查资料,最终还是解决了。

解决办法:

方法1、 修改提供方的dubbo配置,

在dubbo.properties 中增加如下

dubbo.protocol.dubbo.payload=41943040(默认为8M,即8388608)

方法2、

在dubbo-provider.xml文件配置如下

<dubbo:provider id="payload" payload="41943040"/>

如上两种方法都是将值修改为40M。

3、碰到的第三个问题:

使用mysql的load data local infile往数据库导数据时,英文和数字都正常导入,但是,中文要么不显示,要么就是乱码,真的是搞不懂了,怎么会这样呢?以下是导入部分代码:

[java] view plain copy

  1. public void batchInsert(List<BqLoan> bqLoanList) throws ClassNotFoundException, SQLException {
  2. //1000条一提交
  3. int COMMIT_SIZE=1000;
  4. //一共多少条
  5. int COUNT=bqLoanList.size();
  6. Connection conn= null;
  7. try {
  8. Class.forName("com.mysql.jdbc.Driver");
  9. String url = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.url","CONF_HOME");
  10. String user = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.username","CONF_HOME");
  11. String password = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.password","CONF_HOME");
  12. conn= DriverManager.getConnection(url,user,password);
  13. conn.setAutoCommit(false);
  14. String exectuteSql = "load data local infile ''into table bq_loan fields terminated by ','";
  15. PreparedStatement pstmt = conn.prepareStatement(exectuteSql);
  16. StringBuilder sb = new StringBuilder();
  17. for (int i = 0; i < COUNT; i++) {
  18. sb.append(getTestDataInputStream(bqLoanList.get(i)));
  19. if (i % COMMIT_SIZE == 0) {
  20. InputStream is = null;
  21. try {
  22. is = new ByteArrayInputStream(sb.toString().getBytes());
  23. ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
  24. pstmt.execute();
  25. conn.commit();
  26. sb.setLength(0);
  27. } catch (UnsupportedEncodingException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. InputStream is = null;
  33. try {
  34. is = new ByteArrayInputStream(sb.toString().getBytes());
  35. ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
  36. pstmt.execute();
  37. conn.commit();
  38. } catch (UnsupportedEncodingException e) {
  39. e.printStackTrace();
  40. }
  41. } catch (SQLException e) {
  42. e.printStackTrace();
  43. }finally{
  44. conn.close();
  45. }
  46. }
  47. }

上面代码就是导入部分的一个方法,怎么导中文都不显示,还有些字段中文为乱码,我想肯定是字符集的问题,首先查了下数据库字符集,(查询命令为:show variables like '%char%';),然后看看代码,查询结果显示数据库字符集为utf8,然后百度发现这个导入代码得加上编码格式:

[java] view plain copy

  1. "load data local infile ''into table bq_loan fields terminated by ','";

这个加上红色部分编码格式设置后如下,

"load data local infile ''into table bq_loan character set utf8 fields terminated by ','";

修改完后再次导入,还是一样,中文不显示,有些字段中文乱码,这就头疼了啊,仔细检查,加上各种百度,才发现代码里自己还挖了个坑,

[java] view plain copy

  1. is = new ByteArrayInputStream(sb.toString().getBytes());

这个将字节数组转换为输入流时,括号里将字符串转换成字节数组时,并没有给定转换后的字节数组的编码格式,所以采用的就是默认的编码格式,我们知道不同编码格式,单个中英文多对应的字节数是不一样的。所以我猜测是这个地方没有设置,导致生成的字节数组编码格式和数据库编码格式不一致,最终导致导数据时中文不显示以及乱码。然后给getBytes()方法加上编码格式,代码如下。

[java] view plain copy

  1. is = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));

加上后再进行导入数据,一切顺利,数据一点儿不差的导到库里。

到这里导数就顺利进行了,但是想到以后业务发展壮大时,设置的dubbo的服务端给消费端提供数据量最大值还会不够用,所以就就决定改一下代码,最后和同事讨论,建议采取分批插入,就是调用service服务端时进行分页处理,每页数据量设置为dubbo允许服务端给消费端提供数据量最大值的范围内,然后每次插入数据时,就会进行分批插入,只不过和数据库交互次数相对多几次而已,影响不大。

还有一个,使用"load data local infile"导数据时,我是直接将查询出来的结果(list集合)进行数据的组合,即每条数据的每个字段间使用“,”隔开,每条数据之间使用“/n”换行隔开,最终将每条数据拼接成一个字符串,然后将字符串转换成字节数组并转换成输入流,然后再执行导入操作,再往后就比较简单了。由于我不是通过文件进行导数操作,所以 load data local infile '' into table bq_loan character set utf8 fields terminated by ','" 中红色部分的文件名地址我就不写。

最终的结果是:

导入1万条数据,用时5.5秒左右

导入2.6万数据, 用时17.8秒左右

......

导入35万条数据,用时210秒左右

导入50万条数据,用时305秒左右

我这个每条的数据量比较大,一条大概是1kb,所以,感觉速度还行吧,能实现我的需求。

好了,下面贴出部分代码,供大家对照参考。

[java] view plain copy

  1. public void insertLoanInfo (Map<String,Object> msg) {
  2. try {
  3. long startTime = DateOperation.currentTimeMills();
  4. List<AssetPkgRel> loanList = (List<AssetPkgRel>)msg.get("loanList");
  5. String pkgName = (String) msg.get("pkgName");
  6. String pkgCde = (String) msg.get("pkgCde");
  7. // 备份时间
  8. String bkTime = DateOperation.convertToDateStr1(DateOperation.currentTimeMills());
  9. msg.put("bkTime",bkTime);
  10. if (IS_ONE_KEY_ASSOCIATED.getCode().equals(msg.get("isOneKeyAssociated"))) {
  11. BqLoanService.deleteByPkgCde(pkgCde);
  12. }
  13. List<String> bkList = BqLoanService.selectNumByLoanNo(loanList);
  14. // 总共的页数
  15. double totalPage = Math.ceil(bkList.size()/25000.0);
  16. Map<String,Object> map = new HashMap<String,Object>();
  17. map.put("loanList",loanList);
  18. List<BqLoan> list = null;
  19. for (int i = 1;i <= totalPage;i++) {
  20. List<BqLoan> bqLoanList = new ArrayList<>();
  21. map.put("page",i);
  22. PageInfo<BqLoan> pageInfo = BqLoanService.selectByLoanNo(map);
  23. list = pageInfo.getList();
  24. for (int j= 0;j < list.size();j++) {
  25. BqLoan BqLoan = list.get(j);
  26. BqLoan.setPkgCde(pkgCde);
  27. BqLoan.setPkgName(pkgName);
  28. BqLoan.setArchTm(bkTime);
  29. bqLoanList.add(BqLoan);
  30. }
  31. // 将当前页数据插入数据库
  32. batchInsert(bqLoanList);
  33. // 当前页插入完之后清空list
  34. bqLoanList.clear();
  35. }
  36. long endTime = DateOperation.currentTimeMills();
  37. System.out.println("===============插入总时间:"+(endTime-startTime));
  38. } catch (BusinessException e) {
  39. logger.error("插入数据异常 "+e.getMessage());
  40. } catch (SQLException e) {
  41. e.printStackTrace();
  42. } catch (ClassNotFoundException e) {
  43. e.printStackTrace();
  44. }
  45. }

[java] view plain copy

  1. public void batchInsert(List<BqLoan> bqLoanList) throws ClassNotFoundException, SQLException {
  2. //1000条一提交
  3. int COMMIT_SIZE=1000;
  4. //一共多少条
  5. int COUNT=bqLoanList.size();
  6. Connection conn= null;
  7. try {
  8. Class.forName("com.mysql.jdbc.Driver");
  9. String url = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.url","CONF_HOME");
  10. String user = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.username","CONF_HOME");
  11. String password = GetResourceFromProperties.GetResourceFromPropertiesFromfiles("/jdbc.properties","jdbc.password","CONF_HOME");
  12. conn= DriverManager.getConnection(url,user,password);
  13. conn.setAutoCommit(false);
  14. String exectuteSql = "load data local infile ''into table bq_loan character set utf8 fields terminated by ','";
  15. PreparedStatement pstmt = conn.prepareStatement(exectuteSql);
  16. StringBuilder sb = new StringBuilder();
  17. for (int i = 0; i < COUNT; i++) {
  18. sb.append(getTestDataInputStream(bqLoanList.get(i)));
  19. if (i % COMMIT_SIZE == 0) {
  20. InputStream is = null;
  21. try {
  22. is = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
  23. ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
  24. pstmt.execute();
  25. conn.commit();
  26. sb.setLength(0);
  27. } catch (UnsupportedEncodingException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. InputStream is = null;
  33. try {
  34. is = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
  35. ((com.mysql.jdbc.Statement) pstmt).setLocalInfileInputStream(is);
  36. pstmt.execute();
  37. conn.commit();
  38. } catch (UnsupportedEncodingException e) {
  39. e.printStackTrace();
  40. }
  41. } catch (SQLException e) {
  42. e.printStackTrace();
  43. }finally{
  44. conn.close();
  45. }
  46. }
  47. }

[java] view plain copy

  1. /**
  2. * 组装需要插入的数据,字段间以","隔开,每条数据间以"/n"隔开
  3. */
  4. public static StringBuilder getTestDataInputStream(BqLoan BqLoan) {
  5. StringBuilder builder = new StringBuilder();
  6. builder.append(BqLoan.getSeq());
  7. builder.append(",");
  8. builder.append(BqLoan.getLoanNumber());
  9. builder.append(",");
  10. builder.append(BqLoan.gettPkgCde());
  11. builder.append(",");
  12. builder.append(BqLoan.getPkgName());
  13. builder.append(",");
  14. builder.append(BqLoan.getCustemerSeq());
  15. builder.append(",");
  16. builder.append(BqLoan.getCustemerName());
  17. builder.append(",");
  18. builder.append(BqLoan.getIdType());
  19. builder.append(",");
  20. builder.append(BqLoan.getIdNo());
  21. builder.append(",");
  22. builder.append(BqLoan.getPhoneNo());
  23. builder.append(",");
  24. builder.append("\n");
  25. return builder;
  26. }

到此结束,如果有遇到这些问题,然后这篇文章还不能够帮助到你,可以一起再探讨,欢迎骚扰。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接