public void executeBatch(Connection connection) throws SQLException {
long start = System.currentTimeMillis();
if (rows.size() != 0) {
try {
rows.forEach(row -> {
try {
JDBCUtils.setRecordToStatement(this.statement, this.fieldTypes, (Row) row);
this.statement.addBatch();
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
int[] ints = this.statement.executeBatch();
connection.commit();
int j = ints.length;
if (rows.size() != j) {
throw new RuntimeException(String.format(“入库数量不一致, Rows.size():%d, 执行成功条数,批量执行返回int数组长度:%s”, rows.size(), j));
}
long s = System.currentTimeMillis() - start;
LOG.info(“批量入库{}条数据,用时{}ms, 实际入库{}条数据”, rows.size(), s, j);
rows.clear();
} catch (Exception e) {
LOG.info(“批量入库失败,转为单条入库…{}, rows:{}”, e, rows);
connection.rollback();
connection.commit();
this.statement.clearBatch();
executeUpdate(connection);
}
}
}
/**
 * Writes the buffered rows one at a time, committing after each successful row.
 *
 * <p>A row counts as failed when binding or executing it throws, when the statement
 * reports zero (or fewer) affected rows, or when its commit throws. A failed row is
 * rolled back, logged together with its cause, and skipped so the remaining rows are
 * still attempted. After all rows are processed a summary is logged and {@code rows}
 * is cleared.
 *
 * @param connection connection used to commit or roll back each row; presumably the
 *                   connection {@code statement} was prepared on, with auto-commit
 *                   disabled (TODO confirm against the caller)
 * @throws RuntimeException if rolling back a failed row itself fails; the remaining
 *                          rows are then not processed and {@code rows} is not cleared
 */
public synchronized void executeUpdate(Connection connection) {
    long start = System.currentTimeMillis();
    // Atomic counters only because the lambda below needs effectively-final captures.
    AtomicInteger success = new AtomicInteger();
    AtomicInteger failed = new AtomicInteger();
    rows.forEach(row -> {
        try {
            JDBCUtils.setRecordToStatement(statement, fieldTypes, row);
            int affected = statement.executeUpdate();
            if (affected <= 0) {
                throw new RuntimeException("执行executeUpdate失败,影响" + affected + "行");
            }
            connection.commit();
            // Count records, not affected rows, so success + failed equals the total.
            success.getAndIncrement();
        } catch (Exception e) {
            try {
                // Undo the failed row's work instead of committing it.
                connection.rollback();
            } catch (SQLException rollbackFailure) {
                // Keep the original failure attached to the rollback failure.
                rollbackFailure.addSuppressed(e);
                throw new RuntimeException(rollbackFailure);
            }
            failed.getAndIncrement();
            // One record carrying both the offending row and the stack trace.
            LOG.error("Insert failed , data: {{}}", row, e);
        }
    });
    long elapsed = System.currentTimeMillis() - start;
    LOG.info("execute update 完成,共{}条,成功入库{}条,失败{}条,用时{}ms", rows.size(), success.get(), failed.get(), elapsed);
    rows.clear();
}