av一区二区在线观看_亚洲男人的天堂网站_日韩亚洲视频_在线成人免费_欧美日韩精品免费观看视频_久草视

您的位置:首頁技術文章
文章詳情頁

springboot整合rocketmq實現分布式事務

瀏覽:8日期:2023-03-08 14:54:37
目錄1 執行流程2 工程2.1 pom2.2 application.yml2.3 TransactionListenerImpl2.4 SpringTransactionProducer2.5 SpringTxConsumer2.6 ProducerController2.7 RocketApplication3 測試3.1 正常消費測試3.2 回查代碼測試1 執行流程

springboot整合rocketmq實現分布式事務

(1) 發送方向 MQ 服務端發送消息。(2) MQ Server 將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。(3) 發送方開始執行本地事務邏輯。(4) 發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。(5) 在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間后MQ Server 將對該消息發起消息回查。(6) 發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。(7) 發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。

2 工程

springboot整合rocketmq實現分布式事務

2.1 pom

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent> <properties><java.version>1.8</java.version> </properties> <dependencies><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope></dependency><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.2</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version></dependency><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version></dependency> </dependencies> <build><plugins> <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.0.RELEASE</version> </plugin> <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration> <source>1.8</source> <target>1.8</target></configuration> </plugin></plugins> </build>2.2 application.yml

rocketmq: name-server: 192.168.38.50:9876 producer: group: transcation-group2.3 TransactionListenerImpl

@RocketMQTransactionListener(txProducerGroup = 'transaction-producer-group')@Slf4jpublic class TransactionListenerImpl implements RocketMQLocalTransactionListener { private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>(); /** * 執行業務邏輯 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try { System.out.println('用戶A賬戶減500元.'); System.out.println('用戶B賬戶加500元.'); STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) { e.printStackTrace();}STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.UNKNOWN; } /** * 回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);log.info('回查消息 -> transId ={} , state = {}', transId, STATE_MAP.get(transId));return STATE_MAP.get(transId); }}2.4 SpringTransactionProducer

@Component@Slf4jpublic class SpringTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發送消息 * */ public void sendMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.sendMessageInTransaction('transaction-producer-group', topic, message, null);log.info('發送成功'); }}2.5 SpringTxConsumer

@Component@RocketMQMessageListener(topic = 'pay_topic',consumerGroup = 'transaction-consumer-group',selectorExpression = '*')@Slf4jpublic class SpringTxConsumer implements RocketMQListener<String> { @Override public void onMessage(String msg) {log.info('接收到消息 -> {}', msg); }}2.6 ProducerController

@RestController@RequestMapping('/producer')public class ProducerController { @Autowired private SpringTransactionProducer springTransactionProducer; @GetMapping('/sendMsg') public String sendMsg() {springTransactionProducer.sendMsg('pay_topic', '用戶A賬戶減500元,用戶B賬戶加500元。');return '發送成功'; }}2.7 RocketApplication

@SpringBootApplicationpublic class RocketApplication { public static void main(String[] args) {SpringApplication.run(RocketApplication.class); }}3 測試3.1 正常消費測試

描述: 正常啟動及可。

springboot整合rocketmq實現分布式事務

springboot整合rocketmq實現分布式事務

3.2 回查代碼測試

描述: 執行本地事務時添加異常,重啟測試,發現消費者沒有收到消息。

springboot整合rocketmq實現分布式事務

springboot整合rocketmq實現分布式事務

springboot整合rocketmq實現分布式事務

到此這篇關于springboot整合rocketmq實現分布式事務的文章就介紹到這了,更多相關springboot 分布式事務內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
主站蜘蛛池模板: 日韩免费视频一区二区 | 亚洲区中文字幕 | 91免费观看国产 | 成人午夜毛片 | 日日夜夜天天 | 日韩欧美一级精品久久 | 精品无码三级在线观看视频 | 一级在线免费观看 | 国产精品欧美精品日韩精品 | 精品美女视频在线观看免费软件 | 欧美激情综合 | 狠狠干五月天 | 国产成人综合一区二区三区 | aa级毛片毛片免费观看久 | 亚洲国产成人精品女人久久久 | 国产在线一区二区 | 国产91av视频 | 国产一区二区三区视频免费观看 | 欧美久久久久久 | 狠狠躁18三区二区一区 | 国产在线中文字幕 | 亚洲第一在线 | 中文天堂在线观看 | 九九久久这里只有精品 | 古典武侠第一页久久777 | 好好的日在线视频 | 国产高清视频在线播放 | 一区二区三区免费 | 午夜免费福利电影 | 精品日韩一区二区 | 精品久久精品 | www.青青草| 久久国产亚洲 | 国产精品久久久久久久久免费 | 美国av片在线观看 | 97精品国产97久久久久久免费 | 一级片免费在线观看 | 国产精品揄拍一区二区 | 精品视频一区二区三区在线观看 | 中文字幕一区二区三区四区五区 | 99视频在线免费观看 |