springCloud学习5(Spring-Cloud-Stream事件驱动) (2)

  首先在 organization 服务中引入 spring cloud stream 和 kafka 的依赖。

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>

  然后在 events 类中编写SimpleSouce类,用于组织数据修改,产生一条消息到队列中。代码如下:

@EnableBinding(Source.class) public class SimpleSource { private Logger logger = LoggerFactory.getLogger(SimpleSource.class); private Source source; @Autowired public SimpleSource(Source source) { this.source = source; } public void publishOrChange(String action, String orgId) { logger.info("在请求:{}中,发送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId); OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id); source.output().send(MessageBuilder.withPayload(change).build()); } }

这里使用的是默认通道,Source 类定义的 output 通道发消息。后面通过 Sink 定义的 input 通道收消息。

  然后在OrganizationController类中定义一个 delete 方法,并注入 SimpleSouce 类,代码如下:

@Autowired private SimpleSource simpleSource; @DeleteMapping(value = "/organization/{orgId}") public void deleteOne(@PathVariable("orgId") String id) { logger.debug("删除了组织:{}", id); simpleSource.publishOrChange("delete", id); }

  最后在配置文件中加入消息队列的配置:

# 省略了其他配置 spring: cloud: stream: bindings: output: destination: orgChangeTopic content-type: application/json kafka: binder: # 替换为部署kafka的ip和端口 zk-nodes: 192.168.226.5:2181 brokers: 192.168.226.5:9092

  现在我们可以测试下访问localhost:5555/apis/org/organization/12,可以看到控制台打印消息生成的日志。

在许可证服务中编写消息消费者

  集成 redis 的方法,参看。这里不作说明。

  首先引入依赖,依赖项同上面组织服务。

  然后在 event 包下创建OrgChange的类,代码如下:

@EnableBinding(Sink.class) //使用Sink接口中定义的通道来监听传入消息 public class OrgChange { private Logger logger = LoggerFactory.getLogger(OrgChange.class); @StreamListener(Sink.INPUT) public void loggerSink(OrganizationChange change){ logger.info("收到一个消息,组织id为:{},关联id为:{}",change.getOrgId(),change.getId()); //删除失效缓存 RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId())); } } //下面两个都在util包下 //RedisKeyUtils.java代码如下 public class RedisKeyUtils { private static final String ORG_CACHE_PREFIX = "orgCache_"; public static String getOrgCacheKey(String orgId){ return ORG_CACHE_PREFIX+orgId; } } //RedisUtils.java代码如下 @Component @SuppressWarnings("all") public class RedisUtils { public static RedisTemplate redisTemplate; @Autowired public void setRedisTemplate(RedisTemplate redisTemplate) { RedisUtils.redisTemplate = redisTemplate; } public static boolean setObj(String key,Object value){ return setObj(key,value,0); } /** * Description: * * @author fanxb * @date 2019/2/21 15:21 * @param key 键 * @param value 值 * @param time 过期时间,单位ms * @return boolean 是否成功 */ public static boolean setObj(String key,Object value,long time){ try{ if(time<=0){ redisTemplate.opsForValue().set(key,value); }else{ redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS); } return true; }catch (Exception e){ e.printStackTrace(); return false; } } public static Object get(String key){ if(key==null){ return null; } try{ Object obj = redisTemplate.opsForValue().get(key); return obj; }catch (Exception e){ e.printStackTrace(); return null; } } public static void del(String... key){ if(key!=null && key.length>0){ redisTemplate.delete(CollectionUtils.arrayToList(key)); } } }

  上面用到的是 Sink.INPUT 通道,这个和之前的 Source.OUTPUT 通道刚好一队,一个负责收,一个负责发。

  然后修改OrganizationByRibbonService.java文件中的getOrganizationWithRibbon方法:

public Organization getOrganizationWithRibbon(String id) { String key = RedisKeyUtils.getOrgCacheKey(id); //先从redis缓存取数据 Object res = RedisUtils.get(key); if (res == null) { logger.info("当前数据无缓存:{}", id); try{ ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}", HttpMethod.GET, null, Organization.class, id); res = responseEntity.getBody(); RedisUtils.setObj(key, res); }catch (Exception e){ e.printStackTrace(); } } else { logger.info("当前数据为缓存数据:{}", id); } return (Organization) res; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwsdw.html