배치 병렬 처리를 위한 쓰레드 빈 생성 클래스
@Configuration
@EnableAsync
public class ThreadConfig {
@Bean(name="jobExecutor")
public Executor jobExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(30);
taskExecutor.setThreadNamePrefix("jobThead-");
taskExecutor.initialize();
return taskExecutor;
}
}
메인 추상 클래스
@Setter
@Slf4j
public abstract class AbstractJob extends AbstractJobSub{
private Long id;
private String code; // 코드
private boolean isActive = false; // 활성/비활성화
private String cronRegExp; // 크론 표현식
private JobStatusEnum status; // 잡 상태값
private Long startWorkTime; // 잡 시작 시간
private Long endWorkTime; // 잡 종료 시간
/**
* Job 실행전 수행
*/
private void start(boolean isActive){
jobIsWorking = true;
startWorkTime = System.currentTimeMillis();
changeJobStatus(id, JobStatusEnum.ING);
makeJogLogs(id);
if(isActive){
logs("{} 수동 시작되었습니다.", code);
}else{
logs("{} 시작되었습니다.", code);
}
}
/**
* Job 종료 후 수행
*/
private void end(){
jobIsWorking = false;
endWorkTime = System.currentTimeMillis();
changeJobStatus(id, JobStatusEnum.WAIT);
logs("{} 종료되었습니다.\t{}ms 소요되었습니다.", code, (endWorkTime - startWorkTime));
jobIsSuccess();
}
/**
* Job 종료 후 수행
*/
private void interruptedExceptionEnd(){
jobIsWorking = false;
endWorkTime = System.currentTimeMillis();
changeJobStatus(id, JobStatusEnum.WAIT);
logs("{} 강제 종료되었습니다.\t{}ms 소요되었습니다.", code, (endWorkTime - startWorkTime));
jobIsSuccess();
}
/**
* Job 수행중 에러 발생시 처리
* @param e: Exception
*/
private void error(Exception e){
jobIsWorking = false;
changeJobStatus(id, JobStatusEnum.FAIL);
logs("{} 오류가 발생하였습니다.", code);
logs("{} / " + e.toString(), code);
jobIsFail();
}
/**
* Job 활성화 상태 여부 검사
* @return : 잡 활성화 상태 여부
*/
private boolean checkActive(){
if(jobIsWorking || status.equals(JobStatusEnum.ING)){
log.info("{} 실행중 상태로 중복실행을 방지합니다.", code);
return false;
}
// JOB 상태 검사
if(!isActive){
log.info("{} 실행 가능한 상태가 아닙니다", code);
return false;
}
// 크론식 검사
boolean validExpression = CronSequenceGenerator.isValidExpression(cronRegExp);
if(!validExpression){
log.info("{} cron 표현식이 잘못되었습니다. > {}", code, cronRegExp);
return false;
}
return true;
}
/**
* Job 실행 시간 검사, true인 경우 잡 수행
* @return : 잡 수행 가능 여부
*/
private boolean checkIsWorkTime(){
// 1초전 날짜로 다음 실행시간을 구하고, 현재 시간과 비교하여 실행여부를 판단한다.
cronSequenceGenerator = new CronSequenceGenerator(cronRegExp);
Date nextJobDate = cronSequenceGenerator.next(nowDateMinusOneMinute.getTime());
Calendar nextJobCalendar = Calendar.getInstance();
nextJobCalendar.setTime(nextJobDate);
nextJobCalendar.set(Calendar.MILLISECOND, 0);
// 같은 시간이라면 실행한다.
if(nowDate.compareTo(nextJobCalendar) == 0){
return true;
// }else{
// log.info("{} 실행 가능한 시간이 아닙니다. {} : {}", code, simpleDateFormat.format(nowDate.getTime()), simpleDateFormat.format(nextJobCalendar.getTime()));
}
return false;
}
/**
* 초단위 설정은 0으로 하드코딩
* @param exp
*/
public void setCronRegExp(String exp){
cronRegExp = "0 " + exp;
}
/**
* 실제 Job 수행 전체 로직
* 활성화 여부 검사, 잡 실행시간 검사 후 통과된 경우 Job 실행
*/
@Async("jobExecutor")
public void run(){
try {
if (checkActive() && checkIsWorkTime()) {
start(false);
job();
end();
}
}catch (InterruptedException interruptedException){
// 강제 종료시 일반 종료 처리
interruptedExceptionEnd();
}catch(Exception e){
error(e);
e.printStackTrace();
}
}
/**
* 즉시 실행 처리
*/
@Async("jobExecutor")
public void doRun(){
try {
if (checkActive()){
start(true);
job();
end();
}
}catch (InterruptedException interruptedException){
// 강제 종료시 일반 종료 처리
interruptedExceptionEnd();
}catch(Exception e){
error(e);
e.printStackTrace();
}
}
/**
* 실제 수행되는 Job 구현부
* @throws Exception
*/
public abstract void job() throws Exception;
}
추상클래스 상속용 클래스(로그 적재 등 작업)
@Setter
@Slf4j
public class AbstractJobSub {
// Job 실행 상태 컨트롤용 변수
public CronSequenceGenerator cronSequenceGenerator;
public boolean jobIsWorking = false;
public Calendar nowDateMinusOneMinute = Calendar.getInstance();
public Calendar nowDate = Calendar.getInstance();
// JobLogs 테이블 데이터 입력용 변수
public Long jobLogsId;
private StringBuilder logText = new StringBuilder();
@Autowired
private JobService jobService;
/**
* 현재 시간 설정 및 -1분 시간 설정
* @param date
*/
public void setNowDate(Calendar date){
date.set(Calendar.MILLISECOND, 0); // ms 제거
nowDate.setTime(date.getTime());
nowDateMinusOneMinute.setTime(date.getTime());
nowDateMinusOneMinute.add(Calendar.SECOND, -1); // -1분 처리
}
/**
* Job 실행 상태 수정
* @param id : Job Id
* @param jobStatusEnum : Job Status
*/
public void changeJobStatus(Long id, JobStatusEnum jobStatusEnum){
jobService.updateStatus(id, jobStatusEnum);
}
/**
* 현재 실행중인 Job 즉시 중지(thread.interrupt 에러 발생)
*/
public void emergencyStop(){
Thread thread = Thread.currentThread();
thread.interrupt();
}
/**
* JobLogs 1로우 생성
* @param id
*/
public void makeJogLogs(Long id){
this.jobLogsId = jobService.makeJobLogs(id);
resetLogText();
}
/**
* Job 성공시 JobLogs 상태값, Text 수정
*/
public void jobIsSuccess(){
jobService.jobIsSuccess(jobLogsId);
updateLogText();
}
/**
* Job 실패시 JobLogs 상태값, Text 수정
*/
public void jobIsFail(){
jobService.jobIsFail(jobLogsId);
updateLogText();
}
/**
* 빈 로그 텍스트 초기화
*/
private void resetLogText(){
this.logText.setLength(0);
}
/**
* 잡 실행 로그를 DB에 적재
* @param text
*/
public void logs(String text, Object... arguments){
LocalDateTime now = LocalDateTime.now();
logText.append(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))
.append("\t")
.append(Thread.currentThread().getName())
.append("\t")
.append(makeLogText(text, arguments))
.append("\n");
log.info(text, arguments);
// 2000자 이상의 텍스트가 적재 대기중인 경우 적재 후 초기화
if(logText.length() > 2000){
updateLogText();
resetLogText();
}
}
/**
* log_text 컬럼 update
*/
private void updateLogText(){
if(logText.length() > 0){
jobService.updateLogText(jobLogsId, logText.toString());
}
}
private String makeLogText(String text, Object ...args){
if(StringUtils.isEmpty(text)){
return "";
}
if(!text.contains("{}")){
return text;
}
StringBuilder sb = new StringBuilder();
String[] split = text.split("\\{}");
for (int i = 0; i < split.length; i++) {
if(StringUtils.isEmpty(split[i].replace("{}", "").trim())){
sb.append(args.length < i + 1 ? "" : args[i]);
}else{
sb.append(split[i].replace("{}", "")).append(args.length < i + 1 ? "" : args[i]);
}
}
return sb.toString();
}
}
JOB 배치 구현 클래스
@Slf4j
@Component
public class TestJob1 extends AbstractJob {
@Override
public void job() throws Exception {
for (int i = 0; i < 10; i++) {
logs("testjob1 is working");
Thread.sleep(500);
if(i == 5){
emergencyStop();
}
}
}
}
등록된 JOB을 실행시키는 클래스
@Slf4j
@Component
@RequiredArgsConstructor
public class MainJob {
private final ApplicationContext applicationContext;
private final JobService jobService;
private final String hostName = SystemUtils.getLocalHostname();
/**
* 1분마다 실행되어 조건에 맞는 잡을 실행한다.
* @throws Exception
*/
@Scheduled(cron = "0 * * * * *")
public void mainJob() throws Exception {
// 실행 전 활성화 상태 대상 중 기간이 벗어난 목록 수정
jobService.findExpiredAndActiveJobAndSetNotActive(hostName);
// 실행 대상 job 목록 조회
List<JobsModel> allJobsHostEquals = jobService.findWorkJobs(hostName);
log.info("{} job size > {}", hostName, allJobsHostEquals.size());
Calendar nowDate = Calendar.getInstance();
// 아래 for문은 병렬 처리가 된다.
for (JobsModel jobsModel : allJobsHostEquals) {
// 실행 중 상태이면 수행하지 않음.
if(jobsModel.getStatus().equals(JobStatusEnum.ING)){
continue;
}
try{
AbstractJob jobBean = (AbstractJob) applicationContext.getBean(jobsModel.getClassName());
jobBean.setId(jobsModel.getId());
jobBean.setNowDate(nowDate);
jobBean.setCode(jobsModel.getCode());
jobBean.setActive(jobsModel.getIsActive());
jobBean.setCronRegExp(jobsModel.getCronRegExp());
jobBean.setStatus(jobsModel.getStatus());
// 잡 실행
jobBean.run();
}catch(NoSuchBeanDefinitionException noSuchBeanDefinitionException){
log.error("존재하지 않는 Bean 이름 입니다. > {}", jobsModel.getClassName());
}
}
}
}
'SpringFramework > Spring' 카테고리의 다른 글
동적 빈 생성 (0) | 2021.07.07 |
---|---|
Cron 정규식 검사 (0) | 2021.07.06 |
String 중괄호 매핑 함수 (log.info 대신 사용) (0) | 2021.07.06 |
Spring Boot 어플리케이션 설정 공식 문서 (0) | 2021.05.04 |
SpringBoot CORS (WebMvcConfigurer 사용) 적용 방법 (0) | 2021.04.13 |