Spring Batch批处理框架操作实例分析

发布时间:2022-07-21 09:52:42 作者:iii
来源:亿速云 阅读:157

这篇文章主要介绍“Spring Batch批处理框架操作实例分析”,在日常操作中,相信很多人在Spring Batch批处理框架操作实例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spring Batch批处理框架操作实例分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

简介

Spring Batch 是 Spring 提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。

这些业务运营包括:

Spring Batch 是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。

Spring Batch 构建了人们期望的 Spring Framework 特性(生产力,基于 POJO 的开发方法和一般易用性),同时使开发人员可以在必要时轻松访问和利用更高级的企业服务。Spring Batch 不是一个 schuedling 的框架。

Spring Batch 提供了可重用的功能,这些功能对于处理大量的数据至关重要,包括记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理。

它还提供更高级的技术服务和功能,通过优化和分区技术实现极高容量和高性能的批处理作业。

Spring Batch 可用于两种简单的用例(例如将文件读入数据库或运行存储过程)以及复杂的大量用例(例如在数据库之间移动大量数据,转换它等等) 上)。大批量批处理作业可以高度可扩展的方式利用该框架来处理大量信息。

Spring Batch 架构

一个典型的批处理应用程序大致如下:

其对应的示意图如下:

Spring Batch批处理框架操作实例分析

Spring Batch 的一个总体的架构如下:

Spring Batch批处理框架操作实例分析

在 Spring Batch 中一个 job 可以定义很多的步骤 step,在每一个 step 里面可以定义其专属的 ItemReader 用于读取数据。

ItemProcesseor 用于处理数据,ItemWriter 用于写数据,而每一个定义的 job 则都在 JobRepository 里面,我们可以通过 JobLauncher 来启动某一个 job。

Spring Batch 核心概念

下面是一些概念是 Spring Batch 框架中的核心概念。

什么是 Job

Job 和 Step 是 Spring Batch 执行批处理任务最为核心的两个概念。

其中 Job 是一个封装整个批处理过程的一个概念。Job 在 Spring Batch 的体系当中只是一个最顶层的一个抽象概念,体现在代码当中则它只是一个最上层的接口。

其代码如下:

/**
 * Batch domain object representing a job. Job is an explicit abstraction
 * representing the configuration of a job specified by a developer. It should
 * be noted that restart policy is applied to the job as a whole and not to a
 * step.
 */
public interface Job {
 String getName();
 boolean isRestartable();
 void execute(JobExecution execution);
 JobParametersIncrementer getJobParametersIncrementer();
 JobParametersValidator getJobParametersValidator();
}

在 Job 这个接口当中定义了五个方法,它的实现类主要有两种类型的 job,一个是 simplejob,另一个是 flowjob。

在 Spring Batch 当中,job 是最顶层的抽象,除 job 之外我们还有 JobInstance 以及 JobExecution 这两个更加底层的抽象。

一个 job 是我们运行的基本单位,它内部由 step 组成。job 本质上可以看成 step 的一个容器。

一个 job 可以按照指定的逻辑顺序组合 step,并提供了我们给所有 step 设置相同属性的方法,例如一些事件监听,跳过策略。

Spring Batch 以 SimpleJob 类的形式提供了 Job 接口的默认简单实现,它在 Job 之上创建了一些标准功能。

一个使用 java config 的例子代码如下:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

这个配置的意思是:首先给这个 job 起了一个名字叫 footballJob,接着指定了这个 job 的三个 step,他们分别由方法 playerLoad,gameLoad,playerSummarization 实现。

什么是 JobInstance

我们在上文已经提到了 JobInstance,他是 Job 的更加底层的一个抽象,他的定义如下:

public interface JobInstance {
 /**
  * Get unique id for this JobInstance.
  * @return instance id
  */
 public long getInstanceId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
}

他的方法很简单,一个是返回 Job 的 id,另一个是返回 Job 的名字。

JobInstance 指的是 job 运行当中,作业执行过程当中的概念。Instance 本就是实例的意思。

比如说现在有一个批处理的 job,它的功能是在一天结束时执行行一次。我们假定这个批处理 job 的名字为 'EndOfDay'。

在这个情况下,那么每天就会有一个逻辑意义上的 JobInstance,而我们必须记录 job 的每次运行的情况。

什么是 JobParameters

在上文当中我们提到了,同一个 job 每天运行一次的话,那么每天都有一个 jobIntsance,但他们的 job 定义都是一样的,那么我们怎么来区别一个 job 的不同 jobinstance 了。

不妨先做个猜想,虽然 jobinstance 的 job 定义一样,但是他们有的东西就不一样,例如运行时间。

Spring Batch 中提供的用来标识一个 jobinstance 的东西是:JobParameters。

JobParameters 对象包含一组用于启动批处理作业的参数,它可以在运行期间用于识别或甚至用作参考数据。我们假设的运行时间,就可以作为一个 JobParameters。

例如,我们前面的 'EndOfDay' 的 job 现在已经有了两个实例,一个产生于 1 月 1 日,另一个产生于 1 月 2 日,那么我们就可以定义两个 JobParameter 对象:一个的参数是 01-01,, 另一个的参数是 01-02。

因此,识别一个 JobInstance 的方法可以定义为:

Spring Batch批处理框架操作实例分析

因此,我么可以通过 Jobparameter 来操作正确的 JobInstance。

什么是 JobExecution

JobExecution 指的是单次尝试运行一个我们定义好的 Job 的代码层面的概念。job 的一次执行可能以失败也可能成功。只有当执行成功完成时,给定的与执行相对应的 JobInstance 才也被视为完成。

还是以前面描述的 EndOfDay 的 job 作为示例,假设第一次运行 01-01-2019 的 JobInstance 结果是失败。

那么此时如果使用与第一次运行相同的 Jobparameter 参数(即 01-01-2019)作业参数再次运行,那么就会创建一个对应于之前 jobInstance 的一个新的 JobExecution 实例,JobInstance 仍然只有一个。

JobExecution 的接口定义如下:

public interface JobExecution {
 /**
  * Get unique id for this JobExecution.
  * @return execution id
  */
 public long getExecutionId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
 /**
  * Get batch status of this execution.
  * @return batch status value.
  */
 public BatchStatus getBatchStatus();
 /**
  * Get time execution entered STARTED status. 
  * @return date (time)
  */
 public Date getStartTime();
 /**
  * Get time execution entered end status: COMPLETED, STOPPED, FAILED 
  * @return date (time)
  */
 public Date getEndTime();
 /**
  * Get execution exit status.
  * @return exit status.
  */
 public String getExitStatus();
 /**
  * Get time execution was created.
  * @return date (time)
  */
 public Date getCreateTime();
 /**
  * Get time execution was last updated updated.
  * @return date (time)
  */
 public Date getLastUpdatedTime();
 /**
  * Get job parameters for this execution.
  * @return job parameters  
  */
 public Properties getJobParameters();

}

每一个方法的注释已经解释的很清楚,这里不再多做解释。只提一下 BatchStatus,JobExecution 当中提供了一个方法 getBatchStatus 用于获取一个 job 某一次特地执行的一个状态。

BatchStatus 是一个代表 job 状态的枚举类,其定义如下:

public enum BatchStatus {STARTING, STARTED, STOPPING, 
   STOPPED, FAILED, COMPLETED, ABANDONED }

这些属性对于一个 job 的执行来说是非常关键的信息,并且 Spring Batch 会将他们持久到数据库当中。

在使用 Spring Batch 的过程当中 Spring Batch 会自动创建一些表用于存储一些 job 相关的信息,用于存储 JobExecution 的表为 batch_job_execution。

下面是一个从数据库当中截图的实例:

Spring Batch批处理框架操作实例分析

什么是 Step

每一个 Step 对象都封装了批处理作业的一个独立的阶段。事实上,每一个 Job 本质上都是由一个或多个步骤组成。每一个 step 包含定义和控制实际批处理所需的所有信息。

任何特定的内容都由编写 Job 的开发人员自行决定。一个 step 可以非常简单也可以非常复杂。

例如,一个 step 的功能是将文件中的数据加载到数据库中,那么基于现在 Spring Batch 的支持则几乎不需要写代码。更复杂的 step 可能具有复杂的业务逻辑,这些逻辑作为处理的一部分。

与 Job 一样,Step 具有与 JobExecution 类似的 StepExecution,如下图所示:

Spring Batch批处理框架操作实例分析

什么是 StepExecution

StepExecution 表示一次执行 Step,每次运行一个 Step 时都会创建一个新的 StepExecution,类似于 JobExecution。

但是,某个步骤可能由于其之前的步骤失败而无法执行。且仅当 Step 实际启动时才会创建 StepExecution。

一次 step 执行的实例由 StepExecution 类的对象表示。每个 StepExecution 都包含对其相应步骤的引用以及 JobExecution 和事务相关的数据,例如提交和回滚计数以及开始和结束时间。

此外,每个步骤执行都包含一个 ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动所需的统计信息或状态信息。

下面是一个从数据库当中截图的实例:

Spring Batch批处理框架操作实例分析

什么是 ExecutionContext

ExecutionContext 即每一个 StepExecution 的执行环境。它包含一系列的键值对。

我们可以用如下代码获取 ExecutionContext:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

什么是 JobRepository

JobRepository 是一个用于将上述 job,step 等概念进行持久化的一个类。它同时给 Job 和 Step 以及下文会提到的 JobLauncher 实现提供 CRUD 操作。

首次启动 Job 时,将从 repository 中获取 JobExecution,并且在执行批处理的过程中,StepExecution 和 JobExecution 将被存储到 repository 当中。

@EnableBatchProcessing 注解可以为 JobRepository 提供自动配置。

什么是 JobLauncher

JobLauncher 这个接口的功能非常简单,它是用于启动指定了 JobParameters 的 Job,为什么这里要强调指定了 JobParameter,原因其实我们在前面已经提到了,jobparameter 和 job 一起才能组成一次 job 的执行。

下面是代码实例:

public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

上面 run 方法实现的功能是根据传入的 job 以及 jobparamaters 从 JobRepository 获取一个 JobExecution 并执行 Job。

什么是 Item Reader

ItemReader 是一个读数据的抽象,它的功能是为每一个 Step 提供数据输入。当 ItemReader 以及读完所有数据时,它会返回 null 来告诉后续操作数据已经读完。

Spring Batch 为 ItemReader 提供了非常多的有用的实现类,比如 JdbcPagingItemReader,JdbcCursorItemReader 等等。

ItemReader 支持的读入的数据源也是非常丰富的,包括各种类型的数据库,文件,数据流,等等。几乎涵盖了我们的所有场景。

下面是一个 JdbcPagingItemReader 的例子代码:

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
               .name("creditReader")
               .dataSource(dataSource)
               .queryProvider(queryProvider)
               .parameterValues(parameterValues)
               .rowMapper(customerCreditMapper())
               .pageSize(1000)
               .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
}

JdbcPagingItemReader 必须指定一个 PagingQueryProvider,负责提供 SQL 查询语句来按分页返回数据。

下面是一个 JdbcCursorItemReader 的例子代码:

private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
            String tenant) {

        JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        itemReader.setSql("sql here");
        itemReader.setRowMapper(new RowMapper());
        return itemReader;
    }

什么是 Item Writer

既然 ItemReader 是读数据的一个抽象,那么 ItemWriter 自然就是一个写数据的抽象,它是为每一个 step 提供数据写出的功能。

写的单位是可以配置的,我们可以一次写一条数据,也可以一次写一个 chunk 的数据,关于 chunk 下文会有专门的介绍。ItemWriter 对于读入的数据是不能做任何操作的。

Spring Batch 为 ItemWriter 也提供了非常多的有用的实现类,当然我们也可以去实现自己的 writer 功能。

什么是 Item Processor

ItemProcessor 对项目的业务逻辑处理的一个抽象,当 ItemReader 读取到一条记录之后,ItemWriter 还未写入这条记录之前,I 我们可以借助 temProcessor 提供一个处理业务逻辑的功能,并对数据进行相应操作。

如果我们在 ItemProcessor 发现一条数据不应该被写入,可以通过返回 null 来表示。

ItemProcessor 和 ItemReader 以及 ItemWriter 可以非常好的结合在一起工作,他们之间的数据传输也非常方便。我们直接使用即可。

chunk 处理流程

Spring Batch 提供了让我们按照 chunk 处理数据的能力,一个 chunk 的示意图如下:

Spring Batch批处理框架操作实例分析

它的意思就和图示的一样,由于我们一次batch的任务可能会有很多的数据读写操作,因此一条一条的处理并向数据库提交的话效率不会很高。

因此 Spring Batch 提供了 chunk 这个概念,我们可以设定一个 chunk size,spring batch 将一条一条处理数据,但不提交到数据库,只有当处理的数据数量达到 chunk size 设定的值得时候,才一起去 commit。

java 的实例定义代码如下:

Spring Batch批处理框架操作实例分析

在上面这个 step 里面,chunk size 被设为了 10,当 ItemReader 读的数据数量达到 10 的时候,这一批次的数据就一起被传到 itemWriter,同时 transaction 被提交。

skip 策略和失败处理

一个 batch 的 job 的 step,可能会处理非常大数量的数据,难免会遇到出错的情况,出错的情况虽出现的概率较小,但是我们不得不考虑这些情况,因为我们做数据迁移最重要的是要保证数据的最终一致性。

Spring Batch 当然也考虑到了这种情况,并且为我们提供了相关的技术支持,请看如下 bean 的配置:

Spring Batch批处理框架操作实例分析

我们需要留意这三个方法,分别是 skipLimit(),skip(),noSkip()。

skipLimit 方法的意思是我们可以设定一个我们允许的这个 step 可以跳过的异常数量,假如我们设定为 10,则当这个 step 运行时,只要出现的异常数目不超过 10,整个 step 都不会 fail。

注意,若不设定 skipLimit,则其默认值是 0。

skip 方法我们可以指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的。

noSkip 方法的意思则是指出现这个异常我们不想跳过,也就是从 skip 的所以 exception 当中排除这个 exception。

从上面的例子来说,也就是跳过所有除 FileNotFoundException 的 exception。

那么对于这个 step 来说,FileNotFoundException 就是一个 fatal 的 exception,抛出这个 exception 的时候 step 就会直接 fail。

批处理操作指南

本部分是一些使用 Spring Batch 时的值得注意的点。

批处理原则

在构建批处理解决方案时,应考虑以下关键原则和注意事项:

如何默认不启动 job

在使用 java config 使用 Spring Batch 的 job 时,如果不做任何配置,项目在启动时就会默认去跑我们定义好的批处理 job。那么如何让项目在启动时不自动去跑 job 呢?

Spring Batch 的 job 会在项目启动时自动 run,如果我们不想让他在启动时 run 的话,可以在 application.properties 中添加如下属性:

spring.batch.job.enabled=false

在读数据时内存不够

在使用 Spring Batch 做数据迁移时,发现在 job 启动后,执行到一定时间点时就卡在一个地方不动了,且 log 也不再打印,等待一段时间之后,得到如下错误:

Spring Batch批处理框架操作实例分析

红字的信息为:Resource exhaustion event:the JVM was unable to allocate memory from the heap.

翻译过来的意思就是项目发出了一个资源耗尽的事件,告诉我们 java 虚拟机无法再为堆分配内存。

造成这个错误的原因是:这个项目里的 batch job 的 reader 是一次性拿回了数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。

解决的办法有两个:

到此,关于“Spring Batch批处理框架操作实例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. Spring Batch介绍
  2. 大数据批处理框架Spring Batch 的全面解析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spring batch

上一篇:Node异步和事件循环实例分析

下一篇:Flutter iOS开发OC混编Swift动态库和静态库问题怎么解决

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》