先看官方文档步骤:
需要一个编解码器,看源码:
可见内置了需要数据类型的实现,所以发送其他消息可以发送,但是如果发送自定义对象就需要自己实现编解码逻辑了
一 自定义编解码器
/** * 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象 */ public class CustomizeMessageCodec implements MessageCodec<OrderMessage, OrderMessage> { /** * 将消息实体封装到Buffer用于传输 * 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer */ @Override public void encodeToWire(Buffer buffer, OrderMessage orderMessage) { final ByteArrayOutputStream b = new ByteArrayOutputStream(); try (ObjectOutputStream o = new ObjectOutputStream(b)){ o.writeObject(orderMessage); o.close(); buffer.appendBytes(b.toByteArray()); } catch (IOException e) { e.printStackTrace(); } } //从Buffer中获取消息对象 @Override public OrderMessage decodeFromWire(int pos, Buffer buffer) { final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes()); OrderMessage msg = null; try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return msg; } //消息转换 @Override public OrderMessage transform(OrderMessage orderMessage) { System.out.println("消息转换---");//可对接受消息进行转换,比如转换成另一个对象等 orderMessage.setName("姚振"); return orderMessage; } @Override public String name() { return "myCodec"; } //识别是否是用户自定义编解码器,通常为-1 @Override public byte systemCodecID() { return -1; } public static MessageCodec create() { return new CustomizeMessageCodec(); } }
这里有一个点要注意,nam方法是必须的,且发送的时候一定要指明name
二 发送消息编写
public class ProducerVerticle extends AbstractVerticle { @Override public void start() throws Exception { EventBus eventBus = vertx.eventBus(); //发布消息(群发) eventBus.publish("com.hou", "群发祝福!"); //发送消息(单发),只会发送注册此地址的一个,采用不严格的轮询算法选择 DeliveryOptions options = new DeliveryOptions();//设置消息头等 options.addHeader("some-header", "some-value"); eventBus.send("com.hou", "单发消息",options,ar->{ if(ar.succeeded()) System.out.println("收到消费者确认信息:"+ar.result().body()); }); //发送自定义对象,需要编解码器 eventBus.registerCodec(CustomizeMessageCodec.create());//注册编码器 DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必须指定名字 OrderMessage orderMessage = new OrderMessage(); orderMessage.setName("侯征"); eventBus.send("com.hou", orderMessage, options1); } }
三 接受消息Verticle编写
public class ConsumerVerticle extends AbstractVerticle { @Override public void start() throws Exception { //每个Vertx实例默认是单例 EventBus eb = vertx.eventBus(); //注册处理器,消费com.hou发送的消息 MessageConsumer<Object> consumer = eb.consumer("com.hou");//订阅地址 consumer.handler(message -> {//消息处理器 if(message.body() instanceof OrderMessage){ System.out.println("接受到对象: " + ((OrderMessage) message.body()).getName()); } System.out.println("我是普通消费者: " + message.body()); message.reply("收到了!"); // 回复生产者,send才能接受 }).completionHandler(res -> {//注册完成后通知事件,适用于集群中比较慢的情况下 System.out.println("注册处理器结果"+res.succeeded()); }); //撤销处理器 //consumer.unregister(); } }
四 注册部署Verticcle
vertx.deployVerticle(ConsumerVerticle.class.getName()); TimeUnit.SECONDS.sleep(1); vertx.deployVerticle(ProducerVerticle.class.getName());
五 测试
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
暂无评论...
更新日志
2025年01月08日
2025年01月08日
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]