[Java log] P3: Ghi log bất đồng bộ vào Database sử dụng ExecutorService
[Java log] P3: Ghi log bất đồng bộ vào Database sử dụng ExecutorService Tháng Tám 2, 2019 nguyenvanquan7826 TUT Java Log Leave a response Bài viết được đăng lại từ blog của Tùng Huynh, đã được sự đồng ý của tác giả. Chào các bạn, ở 2 ...
[Java log] P3: Ghi log bất đồng bộ vào Database sử dụng ExecutorService
Bài viết được đăng lại từ blog của Tùng Huynh, đã được sự đồng ý của tác giả.
Chào các bạn, ở 2 phần trước về ghi log
[Java log] P1: Tầm quan trọng của ghi log trong phát triển phần mềm
[Java log] P2: Tích hợp log4j vào phần mềm
mình đã đề cập về tầm quan trọng của ghi log, một số nguyên tắc khi ghi log, và hướng dẫn cách tích hợp log4j vào một chương trình java.
Bài này mình sẽ hướng dẫn các bạn tự tạo một tính năng ghi log bất đồng bộ vào database.
Để làm được tính năng ghi log vào database thì Log4j hoàn toàn có khả năng đáp ứng sẵn việc này. Nhưng ở phần này mình sẽ hướng dẫn cách tự tạo một module ghi log vào database để các bạn có thể hiểu rõ hơn về cơ chế ghi log bất đồng bộ.
Trước tiên, để ghi log vào database thì cần phải có table lưu log, và tạo một sequence để lấy value cho trường ID của bảng log
CREATE TABLE LOGS ( LOGS_ID NUMBER NOT NULL PRIMARY KEY, LEVEL_LOG VARCHAR2(10), CREATE_TIME DATE, CONTENT CLOB ); / CREATE SEQUENCE LOGS_SEQ;
Trên ứng dụng, ta sẽ tạo một entity tương ứng với table đã tạo để thuận tiện cho việc insert dữ liệu vào bảng log.
package net.tunghuynh.logging.core; public class Logs { private Integer logsId; private String levelLog; private String content; public Integer getLogsId(){ return logsId; } public void setLogsId(Integer logsId){ this.logsId = logsId; } public String getLevelLog() { return levelLog; } public void setLevelLog(String levelLog) { this.levelLog = levelLog; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
Nếu bình thường chỉ nhắm mục đích ghi log thành công thì các bạn hoàn toàn có thể thực hiện trực tiếp câu lệnh insert into dữ liệu vào bảng log theo cách như sau
public static void main(String[] args) { Logs logs = new Logs(); logs.setLevelLog("INFO"); logs.setContent("Log by insert into database"); new Main().save(logs); } public void save(Logs item) { PreparedStatement stmt = null; Connection con = null; try { con = DatabaseUtils.getConnection(); stmt = con.prepareStatement("INSERT INTO LOGS (LOGS_ID, LEVEL_LOG, CONTENT, CREATE_TIME)" + " VALUES(LOGS_SEQ.NEXTVAL, ?, ?, SYSDATE)"); stmt.setString(1, item.getLevelLog()); stmt.setString(2, item.getContent()); stmt.execute(); } catch (Exception ex) { logger.error(ex.getMessage(), ex); } finally { DatabaseUtils.closeObject(stmt); DatabaseUtils.closeObject(con); } }
Tư tưởng là như vậy, đoạn code trên mình đã tách các xử lý ngoài phạm vi bài viết như việc kết nối database, đóng connection,... vào 1 class khác đặt tên DatabaseUtils để tránh làm loãng bài viết. Các bạn có thể xem chi tiết các phần đó trong source full ở cuối bài.
Nhưng như bài trước mình đã đề cập đến nguyên tắc khi ghi log là không được làm ảnh hưởng đến thời gian xử lý của nghiệp vụ chính. Như vậy đoạn code trên đã phạm phải nguyên tắc này, vì có thể việc connect database và execute lệnh insert kia sẽ chiếm 1 phần thời gian nhất định.
Để khắc phục việc này ta cần xử lý việc connect database và execute insert trong 1 thread riêng. Đơn giản nhất là new Thread mới để thực thi Runnable.
Thread saveLog = new Thread(new Runnable() { @Override public void run() { new Main().save(item); } }); saveLog.start();
Nhưng cách này sẽ gặp một vấn đề khá nguy hiểm đó là khi chức năng nghiệp vụ bị gọi liên tục, dẫn đến việc phải tạo quá nhiều Thread trong ứng dụng mà không giải phóng được, việc này sẽ làm ảnh hưởng rất nhiều đến hiệu năng của ứng dụng và ảnh hưởng cả đến server. Do vậy các bạn nên sử dụng một dạng pool để quản lý và hạn chế việc sinh Thread vô tội vạ như trên. Ở đây mình thường sử dụng ExecutorService để đảm nhiệm công việc này, khi nào hay ở chỗ nào cần ghi log thì chỉ cần submit logvào rồi kệ cho thằng ExecutorService xử lý ghi log dần dần, như vậy sẽ không làm ảnh hưởng đến thời gian xử lý của nghiệp vụ chính.
Đầu tiên mình tạo một abstract class Task để quản lý danh sách các đối tượng log cần save vào database, abstract class Task được implements Callable để execute nghiệp vụ.
package net.tunghuynh.logging.core; import java.util.List; import java.util.concurrent.Callable; public abstract class Task<E> implements Callable<Integer> { private List<E> items; public void setItems(List<E> items){ this.items = items; } public List<E> getItems(){ return items; } }
Sau đó mình tạo một abstract class ThreadManager để khai báo và định nghĩa khung làm việc cho quá trình ghi log.
package net.tunghuynh.logging.core; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; public abstract class ThreadManager { public final int BATCH_SIZE = 10;//Chương trình sẽ thực hiện doProcess khi hàng đợi vượt quá 10 phần tử public final long WAIT_TIME_OUT = 1000; //ms public final long TIME_OUT = 2 * 1000; //ms. Chương trình sẽ thực hiện doProcess với chu kỳ 2 giây private final Logger logger = org.apache.logging.log4j.LogManager.getLogger(ThreadManager.class); private final BlockingQueue sourceQueue = new LinkedBlockingQueue(); protected ArrayList items = new ArrayList(BATCH_SIZE); protected AtomicBoolean shouldWork = new AtomicBoolean(true); protected AtomicBoolean isRunning = new AtomicBoolean(true); private boolean listening = false; private String name = "DB LOGGER"; protected ExecutorService executorService = Executors.newFixedThreadPool(5); private Thread mainThread; public ThreadManager() { logger.debug("Start task manager named: " + name); mainThread = new Thread(new Runnable() { @Override public void run() { logger.info("Queued job manager " + name + " is running and watching for queue... "); isRunning.set(true); int recNum = 0; long lgnStart = System.currentTimeMillis(); while (shouldWork.get()) { try { Object item = sourceQueue.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS); if (item != null) { items.add(item); recNum++; } if (recNum >= BATCH_SIZE || timedOut(lgnStart)) { if (items.size() > 0) { logger.info(String.format("Thread %s: %s submits %d item(s)", Thread.currentThread().getName(), name, items.size())); doProcess(items); items = new ArrayList(BATCH_SIZE); lgnStart = System.currentTimeMillis(); recNum = 0; } } } catch (Exception e) { logger.error(e.getMessage()); } isRunning.set(false); } logger.info("Taskmanager " + name + " is stopped!!"); } private boolean timedOut(Long startTime) { return System.currentTimeMillis() - startTime > TIME_OUT; } }); } /** * abstract method xử lý nghiệp vụ * @param items */ public abstract void doProcess(ArrayList items); /** * Bắt đầu lắng nghe dữ liệu cần xử lý từ hàng đợi */ public synchronized void listen() { if (!listening) { mainThread.start(); listening = true; } } public BlockingQueue getSourceQueue() { return sourceQueue; } public void stop() { logger.info(String.format("%s received a termination signal, stopping ... ", name)); this.shouldWork.set(false); int tryTime = 0; while (isRunning.get() && tryTime < 50) { try { Thread.currentThread().sleep(50L); } catch (Exception ex) { } tryTime++; } } /** * Submit một đối tượng cần xử lý vào hàng đợi * @param item */ public void submit(Object item) { sourceQueue.offer(item); } }
Phần này sẽ tương đối phức tạp đối với bạn nào chưa được tiếp xúc nhiều với xử lý đa luồng trong Java Core. Mình sẽ mô tả qua như sau:
– BlockingQueue sourceQueue: hàng đợi lưu các đối tượng log cần insert, bên ngoài thêm phần tử vào hàng đợi thông qua method submit()
– ExecutorService executorService: Service thực thi, đã được cấu hình cố định chỉ cho phép tạo ra 5 Threadđồng thời, dù có bị gọi nhiều lần thì cũng chỉ tối đa 5 thread được tạo ra, tránh việc tạo quá nhiều thread ảnh hưởng hiệu năng server
– ArrayList items: danh sách lưu các đối tượng được lấy ra từ hàng đợi sourceQueue truyền cho doProcess để insert vào database
– Luồng xử lý: ở đây mình có 1 thread được tạo ra và start 1 lần duy nhất bằng method listen(), thread này sẽ tồn tại trong suốt quá trình chạy của ứng dụng. Bên trong thread xử lý việc liên tục đọc dữ liệu từ hàng đợi sourceQueue để add vào danh sách items. Điều quan trọng ở đây là khi thoả mãn 1 trong 2 điều kiện: danh sách items vượt quá BATCH_SIZE hoặc các phần tử được lưu trong danh sách items quá TIME_OUT thì sẽ đẩy items đi insert. Điều kiện này để tránh việc insert quá nhiều bản ghi 1 lúc, và tránhviệc lưu dữ liệu trong items quá lâu mà không được insert. Sau khi đã gửi đi doProcess thì phải cleardanh sách items và reset biến đếm thời gian timeout. Các bạn có thể xem ảnh dưới để hình dung dễ hơn.
Sau khi tạo xong 2 abstract tổng quát ở trên, mình sẽ tạo các class xử lý việc ghi log.
Đầu tiên là class LogThread được extends Task, có nhiệm vụ insert 1 List đối tượng Logs vào database. Phần này tương tự như ví dụ trên nhưng khác là insert 1 list thôi.
package net.tunghuynh.logging.core; import net.tunghuynh.preparestatement.DatabaseUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.List; public class LogThread extends Task { private final Logger logger = LogManager.getLogger(LogThread.class); @Override public Integer call() throws Exception { List lstLog = getItems(); try { if (lstLog != null && !lstLog.isEmpty()) { save(lstLog); } } catch (Exception e) { logger.error(e.toString(), e); return 0; } return 1; } public void save(List<Logs> lst) { PreparedStatement stmt = null; Connection con = null; try { con = DatabaseUtils.getConnection(); stmt = con.prepareStatement("INSERT INTO AD_SCHEDULE_LOG " + " (LOGS_ID, LEVEL_LOG, CONTENT, CREATE_TIME)" + " VALUES(LOGS_SEQ.NEXTVAL, ?, ?, SYSDATE)"); for(Logs item : lst){ stmt.setString(1, item.getLevelLog()); stmt.setString(2, item.getContent()); stmt.execute(); } } catch (Exception ex) { logger.error(ex.getMessage(), ex); } finally { DatabaseUtils.closeObject(stmt); DatabaseUtils.closeObject(con); } } }
Tiếp theo mình sẽ tạo class LogManager được extends ThreadManager, nhiệm vụ là nhận danh sách đối tượng cần ghi log từ hàng đợi trong ThreadManager để submit vào ExecutorService.
Đoạn này có thể sẽ có 1 số bạn thắc mắc tại sao không dùng insert theo lô. Bởi vì ở ThreadManager đã đọc từ hàng đợi ra, và tối đa cũng chỉ có 10 phần tử được đẩy vào list để doProcess, nghĩa là bản chất đã là batch rồi không cần xử lý ở đây nữa.
package net.tunghuynh.logging.core; import java.util.ArrayList; public class LogManager extends ThreadManager { @Override public void doProcess(ArrayList items) { LogThread logThread = new LogThread(); logThread.setItems(items); executorService.submit(logThread); } }
Đến đây chắc các bạn cũng mường tượng ra, khi phát sinh thêm nhiều loại log khác cần ghi vào table khác, hoặc send qua FTP,… hoặc bất kỳ task vụ gì khác cần xử lý bất đồng bộ thì đều có thể tái sử dụng abstract class ThreadManager làm khung, chỉ cần định nghĩa lại các class khác tương tự như LogThread để xử lý nghiệp vụ chính và LogManager để đọc dữ liệu từ hàng đợi. Như vậy các bạn đã có được một khung tiến trình cơ bản.
Cuối cùng để sử dụng được bộ ghi log bất đồng bộ vào database đã tạo ở trên thì mình viết 1 đoạn testtrong Main.java
package net.tunghuynh.logging; import net.tunghuynh.logging.core.Logs; import net.tunghuynh.preparestatement.DatabaseUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.Date; import java.util.List; public class Main { static Logger logger = LogManager.getLogger(Main.class); public static void main(String[] args) { //Tạo đối tượng test để ghi log Logs logs = new Logs(); logs.setLevelLog("INFO"); logs.setContent("Content"); long start = new Date().getTime(); //Khởi tạo LogManager để ghi log bất đồng bộ net.tunghuynh.logging.core.LogManager logManager = new net.tunghuynh.logging.core.LogManager(); logManager.listen(); //Submit vào hàng đợi logManager.submit(logs); logger.info("Async: " + (new Date().getTime()-start) + "ms"); start = new Date().getTime(); //Ghi log không dùng thread new Main().save(logs); logger.info("No-async: " + (new Date().getTime()-start) + "ms"); } public void save(Logs item) { PreparedStatement stmt = null; Connection con = null; try { con = DatabaseUtils.getConnection(); stmt = con.prepareStatement("INSERT INTO LOGS (LOGS_ID, LEVEL_LOG, CONTENT, CREATE_TIME)" + " VALUES(LOGS_SEQ.NEXTVAL, ?, ?, SYSDATE)"); stmt.setString(1, item.getLevelLog()); stmt.setString(2, item.getContent()); stmt.execute(); } catch (Exception ex) { logger.error(ex.getMessage(), ex); } finally { DatabaseUtils.closeObject(stmt); DatabaseUtils.closeObject(con); } } }
Ở đoạn test này mình kết hợp cả cách ghi không dùng thread ban đầu và cách ghi bất đồng bộ mới dựng để đo thời gian xử lý
Thử chạy và xem kết quả
2019-07-12 16:34:58 PM INFO [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Queued job manager ACTION/API LOGGER is running and watching for queue... 2019-07-12 16:34:58 PM INFO [main] net.tunghuynh.logging.Main.main : Async: 4ms 2019-07-12 16:35:00 PM INFO [main] net.tunghuynh.logging.Main.main : No-async: 1898ms 2019-07-12 16:35:00 PM INFO [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Thread Thread-1: ACTION/API LOGGER submits 1 item(s) 2019-07-12 16:35:29 PM INFO [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Queued job manager ACTION/API LOGGER is running and watching for queue... 2019-07-12 16:35:29 PM INFO [main] net.tunghuynh.logging.Main.main : Async: 7ms 2019-07-12 16:35:31 PM INFO [main] net.tunghuynh.logging.Main.main : No-async: 1519ms 2019-07-12 16:35:31 PM INFO [Thread-1] net.tunghuynh.logging.core.ThreadManager.run : Thread Thread-1: ACTION/API LOGGER submits 1 item(s)
Nhìn kết quả ta thấy sự khác biệt rất lớn. Với ghi log bất đồng bộ thì thời gian xử lý chỉ dưới 10 mili giây, bởi vì cái gì chạy lâu thì nó để cho thằng khác xử lý rồi. Còn insert thông thường thì phải mất 1.5 đến 2 giây (lâu gấp 500 lần), rất đáng kể để sử dụng.
Ở đoạn test trên mình chỉ đưa hết vào hàm main để test cho dễ, các bạn có thể init cái ThreadManagertrong 1 khối static để sử dụng luôn hàm submit mà không phải khởi tạo lại nhiều lần, hoặc nếu các bạn dùng Spring thì nên tạo 1 Bean cho LogManager với hàm khởi tạo là listen() và destroy là stop() để sử dụng
@Bean(name = "logManager", initMethod = "listen", destroyMethod = "stop") public LogManager getLogManager() { return new LogManager(); }
Khi cần dùng Bean logManager này ở đâu thì chỉ cần Autowired như mọi Bean khác là có thể sử dụng được.
@Autowired LogManager logManager;
Tới demo này thì các bạn đã tạo được một module ghi log bất đồng bộ vào database đủ dùng và an toàn. Các bạn hoàn toàn có thể bổ sung các tính năng ghi log bằng cách gọi một API lưu log khác, gửi log sang server khác qua FTP,…. thay vì chỉ ghi vào database, tùy theo mục đích sử dụng mà không ảnh hưởng đến ứng dụng chính.
Chúc các bạn thành công
Download code LogDBExample.zip