详解多线程与Spring事务

审校 | 梁策 孙淑娟

我们注重客户提出的每个要求,我们充分考虑每一个细节,我们积极的做好网站设计制作、成都网站设计服务,我们努力开拓更好的视野,通过不懈的努力,创新互联公司赢得了业内的良好声誉,这一切,也不断的激励着我们更好的服务客户。 主要业务:网站建设,网站制作,网站设计,小程序开发,网站开发,技术开发实力,DIV+CSS,PHP及ASP,ASP.Net,SQL数据库的技术开发工程师。

作为开发人员,我们习惯于通过在public方法上添加@Transactional 注解来实现事务管理。大多数情况下,把事务的启动、提交或者回滚全部交给Spring框架操作非常便捷,但如果认为这就是事务管理的全部,那就有失偏颇了。

Spring的确可负责事务管理的所有底层实现细节,而且不管你用的是什么持久层框架,如Hibernate、MyBatis,即便是JDBC也都提供了统一的事务模型,确保数据访问方式的变更不会影响到代码实现层面。事务管理的良好封装,一方面提升了开发效率,但同时也要注意到其降低了开发者了解底层原理的动机和意愿。扪心自问,我们真正了解在多线程环境中事务运行的机制吗?例如在一个事务里面是否可以支持多个线程同时进行数据写入?针对这个问题,网上很多论坛给出了确定的答案,但也不乏反馈@Transaction失效的声音。

究其背后的根源是Spring实现事务通过ThreadLocal把事务和当前线程进行了绑定。ThreadLocal作为本地线程变量载体,保存了当前线程的变量,并确保所有变量是线程安全的。这些封闭隔离的变量中就包含了数据库连接,Session管理的对象以及当前事务运行的其他必要信息,而开启的新线程是获取不到这些变量和对象的。不了解这些,事务内部冒然启用多线程,受限于业务场景,大多数情况下是不会有问题的,但是作为严谨的开发万不能忽视其潜在的风险。问题主要集中在两个方面:一方面导致事务失效,看似是提高了处理效率,但是一旦有异常相关数据将不会回滚,就会破坏业务的完整性。另一方面还会增加死锁的概率,无计划的并发处理,增加资源争抢的概率,其后果就是死锁,产生的异常进一步破坏业务的完整性,得不偿失。

难道就没有提升事务内处理性能的方法了?非也!虽然不能通过事务内,发起多线程处理。我们可以通过合理分块后,再启用多线程处理,通过类似分布式事务方式达到异曲同工的效果。

假设我们要并行处理一个大的对象列表,然后将它们存储到数据库中。我们先将这些对象分组,将每个块传递给不同线程分别去调用加了事务的处理方法,最后将每个线程中处理的结果收集汇总。这样通过事务的传播机制既确保了业务的完整性,也通过并行处理提升了处理效率。下面通过具体的示例,逐步演示如何实现。

第一步:定义一个负责对象处理逻辑的服务接口。

/**

* Service interface defining the contract for object identifiers processing

*/

public interface ProcessingService {

/**

* Processes the list of objects identified by id and returns a an identifiers

* list of the successfully processed objects

*

* @param objectIds List of object identifiers

*

* @return identifiers list of the successfully processed objects

*/

List processObjects(List objectIds);

}

第二步:针对上述对象处理的接口的一个简单实现。

/**
* Service implementation for database related ids processing
*/
@Service("ProcessingDBService")
public class ProcessingDBService implements ProcessingService {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Transactional
@Override
public List processObjects(List objectIds) {
// Process and save to DB

logger.info("Running in thread " + Thread.currentThread().getName() + " with object ids " + objectIds.toString());

return objectIds.stream().collect(Collectors.toList());
}
}

第三步:也是最核心的一步,通过分块然后进行并行处理。当然为了保持代码的整洁性和隔离性,我们将在后续具体实现中使用Decorator修饰模式。

/**
* Service implementation for parallel chunk processing
*/
@Service
@Primary
@ConditionalOnProperty(prefix = "service", name = "parallel", havingValue = "true")
public class ProcessingServiceParallelRunDecorator implements ProcessingService {

private ProcessingService delegate;

public ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
this.delegate = delegate;
}

/**
* In a real scenario it should be an external configuration
*/
private int batchSize = 10;

@Override
public List processObjects(List objectIds) {
List> chuncks = getBatches(objectIds, batchSize);
List> processedObjectIds = chuncks.parallelStream().map(delegate::processObjects)
.collect(Collectors.toList());

return flatList(processedObjectIds);
}
private List> getBatches(List collection, int batchSize) {

return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)

.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))

.collect(Collectors.toList());

}

private List flatList(List> listOfLists) {

return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);

}

最后,我们通过一个简单的单元测试验证一下执行的结果。

private List> getBatches(List collection, int batchSize) {

return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)

.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))

.collect(Collectors.toList());

}

private List flatList(List> listOfLists) {

return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);

}
}

通过输出日志,我们看到如下的执行结果:

ProcessingDBService: Running in thread ForkJoinPool.commonPool-worker-3 with object ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ProcessingDBService: Running in thread main with object ids [11, 12]

执行结果也是符合预期目标的。List对象分组后,除了主线程又通过ForkJoin启动另外线程进行并行处理。ProcessingServiceParallelRunDecorator 的parallelStream().map的并行处理提升了处理性能,而ProcessingDBService中processObjects这个public方法上@Transactional的注解保证了业务完整性,问题得以完美解决。

译者介绍

胥磊,社区编辑,某头部电商技术副总监,关注Java后端开发,技术管理,架构优化,分布式开发等领域。

原文标题:​​Multi-Threading and Spring Transactions​​,作者:Daniela Kolarova

分享名称:详解多线程与Spring事务
文章源于:http://www.shufengxianlan.com/qtweb/news26/55426.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联