SpringFramework/Spring

추상클래스를 사용한 통합 배치 관리 프로그램

lovineff 2021. 7. 6. 16:21

배치 병렬 처리를 위한 쓰레드 빈 생성 클래스

@Configuration
@EnableAsync
public class ThreadConfig {
    @Bean(name="jobExecutor")
    public Executor jobExecutor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(30);
        taskExecutor.setThreadNamePrefix("jobThead-");
        taskExecutor.initialize();
        return taskExecutor;
    }
}

 

메인 추상 클래스

@Setter
@Slf4j
public abstract class AbstractJob extends AbstractJobSub{
    private Long id;
    private String code;                // 코드
    private boolean isActive = false;   // 활성/비활성화
    private String cronRegExp;          // 크론 표현식
    private JobStatusEnum status;       // 잡 상태값
    private Long startWorkTime;         // 잡 시작 시간
    private Long endWorkTime;           // 잡 종료 시간

    /**
     * Job 실행전 수행
     */
    private void start(boolean isActive){
        jobIsWorking = true;
        startWorkTime = System.currentTimeMillis();

        changeJobStatus(id, JobStatusEnum.ING);
        makeJogLogs(id);

        if(isActive){
            logs("{} 수동 시작되었습니다.", code);
        }else{
            logs("{} 시작되었습니다.", code);
        }
    }

    /**
     * Job 종료 후 수행
     */
    private void end(){
        jobIsWorking = false;
        endWorkTime = System.currentTimeMillis();

        changeJobStatus(id, JobStatusEnum.WAIT);
        logs("{} 종료되었습니다.\t{}ms 소요되었습니다.", code, (endWorkTime - startWorkTime));
        jobIsSuccess();
    }

    /**
     * Job 종료 후 수행
     */
    private void interruptedExceptionEnd(){
        jobIsWorking = false;
        endWorkTime = System.currentTimeMillis();

        changeJobStatus(id, JobStatusEnum.WAIT);
        logs("{} 강제 종료되었습니다.\t{}ms 소요되었습니다.", code, (endWorkTime - startWorkTime));
        jobIsSuccess();
    }

    /**
     * Job 수행중 에러 발생시 처리
     * @param e: Exception
     */
    private void error(Exception e){
        jobIsWorking = false;
        changeJobStatus(id, JobStatusEnum.FAIL);
        logs("{} 오류가 발생하였습니다.", code);
        logs("{} / " + e.toString(), code);
        jobIsFail();
    }

    /**
     * Job 활성화 상태 여부 검사
     * @return : 잡 활성화 상태 여부
     */
    private boolean checkActive(){
        if(jobIsWorking || status.equals(JobStatusEnum.ING)){
            log.info("{} 실행중 상태로 중복실행을 방지합니다.", code);
            return false;
        }

        // JOB 상태 검사
        if(!isActive){
            log.info("{} 실행 가능한 상태가 아닙니다", code);
            return false;
        }

        // 크론식 검사
        boolean validExpression = CronSequenceGenerator.isValidExpression(cronRegExp);
        if(!validExpression){
            log.info("{} cron 표현식이 잘못되었습니다. > {}", code, cronRegExp);
            return false;
        }

        return true;
    }

    /**
     * Job 실행 시간 검사, true인 경우 잡 수행
     * @return : 잡 수행 가능 여부
     */
    private boolean checkIsWorkTime(){
        // 1초전 날짜로 다음 실행시간을 구하고, 현재 시간과 비교하여 실행여부를 판단한다.
        cronSequenceGenerator = new CronSequenceGenerator(cronRegExp);
        Date nextJobDate = cronSequenceGenerator.next(nowDateMinusOneMinute.getTime());

        Calendar nextJobCalendar = Calendar.getInstance();
        nextJobCalendar.setTime(nextJobDate);
        nextJobCalendar.set(Calendar.MILLISECOND, 0);

        // 같은 시간이라면 실행한다.
        if(nowDate.compareTo(nextJobCalendar) == 0){
            return true;
//        }else{
//            log.info("{} 실행 가능한 시간이 아닙니다. {} : {}", code, simpleDateFormat.format(nowDate.getTime()), simpleDateFormat.format(nextJobCalendar.getTime()));
        }

        return false;
    }

    /**
     * 초단위 설정은 0으로 하드코딩
     * @param exp
     */
    public void setCronRegExp(String exp){
        cronRegExp = "0 " + exp;
    }

    /**
     * 실제 Job 수행 전체 로직
     * 활성화 여부 검사, 잡 실행시간 검사 후 통과된 경우 Job 실행
     */
    @Async("jobExecutor")
    public void run(){
        try {
            if (checkActive() && checkIsWorkTime()) {
                start(false);
                job();
                end();
            }
        }catch (InterruptedException interruptedException){
            // 강제 종료시 일반 종료 처리
            interruptedExceptionEnd();
        }catch(Exception e){
            error(e);
            e.printStackTrace();
        }
    }

    /**
     * 즉시 실행 처리
     */
    @Async("jobExecutor")
    public void doRun(){
        try {
            if (checkActive()){
                start(true);
                job();
                end();
            }
        }catch (InterruptedException interruptedException){
            // 강제 종료시 일반 종료 처리
            interruptedExceptionEnd();
        }catch(Exception e){
            error(e);
            e.printStackTrace();
        }
    }

    /**
     * 실제 수행되는 Job 구현부
     * @throws Exception
     */
    public abstract void job() throws Exception;
}

 

추상클래스 상속용 클래스(로그 적재 등 작업)

@Setter
@Slf4j
public class AbstractJobSub {
    // Job 실행 상태 컨트롤용 변수
    public CronSequenceGenerator cronSequenceGenerator;
    public boolean jobIsWorking = false;
    public Calendar nowDateMinusOneMinute = Calendar.getInstance();
    public Calendar nowDate = Calendar.getInstance();

    // JobLogs 테이블 데이터 입력용 변수
    public Long jobLogsId;
    private StringBuilder logText = new StringBuilder();

    @Autowired
    private JobService jobService;

    /**
     * 현재 시간 설정 및 -1분 시간 설정
     * @param date
     */
    public void setNowDate(Calendar date){
        date.set(Calendar.MILLISECOND, 0);  // ms 제거

        nowDate.setTime(date.getTime());
        nowDateMinusOneMinute.setTime(date.getTime());
        nowDateMinusOneMinute.add(Calendar.SECOND, -1); // -1분 처리
    }

    /**
     * Job 실행 상태 수정
     * @param id : Job Id
     * @param jobStatusEnum : Job Status
     */
    public void changeJobStatus(Long id, JobStatusEnum jobStatusEnum){
        jobService.updateStatus(id, jobStatusEnum);
    }

    /**
     * 현재 실행중인 Job 즉시 중지(thread.interrupt 에러 발생)
     */
    public void emergencyStop(){
        Thread thread = Thread.currentThread();
        thread.interrupt();
    }

    /**
     * JobLogs 1로우 생성
     * @param id
     */
    public void makeJogLogs(Long id){
        this.jobLogsId = jobService.makeJobLogs(id);
        resetLogText();
    }

    /**
     * Job 성공시 JobLogs 상태값, Text 수정
     */
    public void jobIsSuccess(){
        jobService.jobIsSuccess(jobLogsId);
        updateLogText();
    }

    /**
     * Job 실패시 JobLogs 상태값, Text 수정
     */
    public void jobIsFail(){
        jobService.jobIsFail(jobLogsId);
        updateLogText();
    }

    /**
     * 빈 로그 텍스트 초기화
     */
    private void resetLogText(){
        this.logText.setLength(0);
    }

    /**
     * 잡 실행 로그를 DB에 적재
     * @param text
     */
    public void logs(String text, Object... arguments){
        LocalDateTime now = LocalDateTime.now();

        logText.append(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))
                .append("\t")
                .append(Thread.currentThread().getName())
                .append("\t")
                .append(makeLogText(text, arguments))
                .append("\n");

        log.info(text, arguments);

        // 2000자 이상의 텍스트가 적재 대기중인 경우 적재 후 초기화
        if(logText.length() > 2000){
            updateLogText();
            resetLogText();
        }
    }

    /**
     * log_text 컬럼 update
     */
    private void updateLogText(){
        if(logText.length() > 0){
            jobService.updateLogText(jobLogsId, logText.toString());
        }
    }

    private String makeLogText(String text, Object ...args){
        if(StringUtils.isEmpty(text)){
            return "";
        }

        if(!text.contains("{}")){
            return text;
        }

        StringBuilder sb = new StringBuilder();
        String[] split = text.split("\\{}");

        for (int i = 0; i < split.length; i++) {
            if(StringUtils.isEmpty(split[i].replace("{}", "").trim())){
                sb.append(args.length < i + 1 ? "" : args[i]);
            }else{
                sb.append(split[i].replace("{}", "")).append(args.length < i + 1 ? "" : args[i]);
            }
        }

        return sb.toString();
    }
}​

 

JOB 배치 구현 클래스

@Slf4j
@Component
public class TestJob1 extends AbstractJob {
    @Override
    public void job() throws Exception {
        for (int i = 0; i < 10; i++) {
            logs("testjob1 is working");
            Thread.sleep(500);

            if(i == 5){
                emergencyStop();
            }
        }
    }
}

 

등록된 JOB을 실행시키는 클래스

@Slf4j
@Component
@RequiredArgsConstructor
public class MainJob {
    private final ApplicationContext applicationContext;
    private final JobService jobService;
    private final String hostName = SystemUtils.getLocalHostname();

    /**
     * 1분마다 실행되어 조건에 맞는 잡을 실행한다.
     * @throws Exception
     */
    @Scheduled(cron = "0 * * * * *")
    public void mainJob() throws Exception {
        // 실행 전 활성화 상태 대상 중 기간이 벗어난 목록 수정
        jobService.findExpiredAndActiveJobAndSetNotActive(hostName);

        // 실행 대상 job 목록 조회
        List<JobsModel> allJobsHostEquals = jobService.findWorkJobs(hostName);

        log.info("{} job size > {}", hostName, allJobsHostEquals.size());

        Calendar nowDate = Calendar.getInstance();

		// 아래 for문은 병렬 처리가 된다.
        for (JobsModel jobsModel : allJobsHostEquals) {
            // 실행 중 상태이면 수행하지 않음.
            if(jobsModel.getStatus().equals(JobStatusEnum.ING)){
                continue;
            }

            try{
                AbstractJob jobBean = (AbstractJob) applicationContext.getBean(jobsModel.getClassName());
                jobBean.setId(jobsModel.getId());
                jobBean.setNowDate(nowDate);
                jobBean.setCode(jobsModel.getCode());
                jobBean.setActive(jobsModel.getIsActive());
                jobBean.setCronRegExp(jobsModel.getCronRegExp());
                jobBean.setStatus(jobsModel.getStatus());

				// 잡 실행
                jobBean.run();
            }catch(NoSuchBeanDefinitionException noSuchBeanDefinitionException){
                log.error("존재하지 않는 Bean 이름 입니다. > {}", jobsModel.getClassName());
            }
        }
    }
}