av一区二区在线观看_亚洲男人的天堂网站_日韩亚洲视频_在线成人免费_欧美日韩精品免费观看视频_久草视

您的位置:首頁技術文章
文章詳情頁

springboot整合rocketmq實現(xiàn)分布式事務

瀏覽:9日期:2023-03-08 14:54:37
目錄1 執(zhí)行流程2 工程2.1 pom2.2 application.yml2.3 TransactionListenerImpl2.4 SpringTransactionProducer2.5 SpringTxConsumer2.6 ProducerController2.7 RocketApplication3 測試3.1 正常消費測試3.2 回查代碼測試1 執(zhí)行流程

springboot整合rocketmq實現(xiàn)分布式事務

(1) 發(fā)送方向 MQ 服務端發(fā)送消息。(2) MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認消息已經(jīng)發(fā)送成功,此時消息為半消息。(3) 發(fā)送方開始執(zhí)行本地事務邏輯。(4) 發(fā)送方根據(jù)本地事務執(zhí)行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到Commit 狀態(tài)則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會接受該消息。(5) 在斷網(wǎng)或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經(jīng)過固定時間后MQ Server 將對該消息發(fā)起消息回查。(6) 發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結果。(7) 發(fā)送方根據(jù)檢查得到的本地事務的最終狀態(tài)再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。

2 工程

springboot整合rocketmq實現(xiàn)分布式事務

2.1 pom

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent> <properties><java.version>1.8</java.version> </properties> <dependencies><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope></dependency><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.2</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version></dependency><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version></dependency> </dependencies> <build><plugins> <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.0.RELEASE</version> </plugin> <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration> <source>1.8</source> <target>1.8</target></configuration> </plugin></plugins> </build>2.2 application.yml

rocketmq: name-server: 192.168.38.50:9876 producer: group: transcation-group2.3 TransactionListenerImpl

@RocketMQTransactionListener(txProducerGroup = 'transaction-producer-group')@Slf4jpublic class TransactionListenerImpl implements RocketMQLocalTransactionListener { private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>(); /** * 執(zhí)行業(yè)務邏輯 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try { System.out.println('用戶A賬戶減500元.'); System.out.println('用戶B賬戶加500元.'); STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) { e.printStackTrace();}STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.UNKNOWN; } /** * 回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);log.info('回查消息 -> transId ={} , state = {}', transId, STATE_MAP.get(transId));return STATE_MAP.get(transId); }}2.4 SpringTransactionProducer

@Component@Slf4jpublic class SpringTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發(fā)送消息 * */ public void sendMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.sendMessageInTransaction('transaction-producer-group', topic, message, null);log.info('發(fā)送成功'); }}2.5 SpringTxConsumer

@Component@RocketMQMessageListener(topic = 'pay_topic',consumerGroup = 'transaction-consumer-group',selectorExpression = '*')@Slf4jpublic class SpringTxConsumer implements RocketMQListener<String> { @Override public void onMessage(String msg) {log.info('接收到消息 -> {}', msg); }}2.6 ProducerController

@RestController@RequestMapping('/producer')public class ProducerController { @Autowired private SpringTransactionProducer springTransactionProducer; @GetMapping('/sendMsg') public String sendMsg() {springTransactionProducer.sendMsg('pay_topic', '用戶A賬戶減500元,用戶B賬戶加500元。');return '發(fā)送成功'; }}2.7 RocketApplication

@SpringBootApplicationpublic class RocketApplication { public static void main(String[] args) {SpringApplication.run(RocketApplication.class); }}3 測試3.1 正常消費測試

描述: 正常啟動及可。

springboot整合rocketmq實現(xiàn)分布式事務

springboot整合rocketmq實現(xiàn)分布式事務

3.2 回查代碼測試

描述: 執(zhí)行本地事務時添加異常,重啟測試,發(fā)現(xiàn)消費者沒有收到消息。

springboot整合rocketmq實現(xiàn)分布式事務

springboot整合rocketmq實現(xiàn)分布式事務

springboot整合rocketmq實現(xiàn)分布式事務

到此這篇關于springboot整合rocketmq實現(xiàn)分布式事務的文章就介紹到這了,更多相關springboot 分布式事務內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持好吧啦網(wǎng)!

標簽: Spring
相關文章:
主站蜘蛛池模板: www.中文字幕av | 国产精品永久免费视频 | 欧美性大战xxxxx久久久 | 久久久久亚洲精品国产 | 午夜一级做a爰片久久毛片 精品综合 | 亚洲精品视频网站在线观看 | 欧美日韩亚洲国产综合 | 亚洲一区二区三区在线视频 | 精品久久久久久 | 久久久久久国产精品久久 | 亚洲一区二区成人 | 91精品在线观看入口 | 色综合色综合网色综合 | 久久国产区 | 久久在线看 | wwwww在线观看| 成人网在线 | 日本五月婷婷 | 久久久久亚洲 | 中文字幕在线看人 | 中文在线a在线 | 99精品久久 | 97久久久久久 | 欧美日韩高清 | 99热在线免费| 亚洲精品一区二区三区中文字幕 | 欧美一级二级视频 | 一级毛片成人免费看a | 亚洲欧美日韩系列 | 国产一区二区在线免费播放 | 亚洲午夜精品在线观看 | 成人免费一级 | 午夜天堂精品久久久久 | 久草视频在 | 国产精品二区三区在线观看 | 日本精品久久久一区二区三区 | 日本高清精品 | 国产第一页在线观看 | 欧美视频在线播放 | 久久久国产一区二区三区 | 天天干视频 |