`
mizhao1984
  • 浏览: 88061 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

利用jgroup做负载

阅读更多

public class JavaGroupBroadcastingManager implements NotificationBus.Consumer {
 
 /**
  * 共享数据:未被激活的时间片信息集合(离线通话信息)
  */
 private static Map<String,TimeSlice> voiceMessageMap = new HashMap<String, TimeSlice>();
 
 /**
  * 共享数据:短信信息
  */
 private static Map<String,SmsCcrParameter> smsMessageIdMap = new HashMap<String,SmsCcrParameter>();

 private static final Log log = LogFactory
   .getLog(JavaGroupBroadcastingManager.class);
 private static final String BUS_NAME = "bus.name";
 private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
 private NotificationBus bus;
 private static JavaGroupBroadcastingManager manager = null;

 public static JavaGroupBroadcastingManager getInstance(){
  if(manager == null){
   manager = new JavaGroupBroadcastingManager();
   try {
    manager.initialize();
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
  return manager;
 }
 
 /**
  * 初始化成员
  */
 public synchronized void initialize()
   throws Exception {
  Properties properties = new Properties();
  String filePath = System.getProperty("com.sntele.surfing.conf") + File.separator + "ocsgroup.properties";
  log.info(filePath);
  properties.load(new FileInputStream(filePath));
  String channelProperties = properties.getProperty(CHANNEL_PROPERTIES);
  String busName = properties.getProperty(BUS_NAME);
  try {
   bus = new NotificationBus(busName, channelProperties);
   bus.start();
   bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
   bus.setConsumer(this);
   log.info("JavaGroups clustering support started successfully");
  } catch (Exception e) {
   throw new Exception("Initialization failed: " + e);
  }
 }

 /**
  * 关闭成员
  */
 public synchronized void finialize() throws Exception {
  bus.stop();
  bus = null;
 }

 /**
  * 发送信息
  */
 public void sendNotification(Serializable serializable) {
  bus.sendNotification(serializable);
  if(serializable instanceof SmsCcrParameter){
   SmsCcrParameter smsCcrParameter = (SmsCcrParameter)serializable;
   if(smsCcrParameter.getMemoryAction() == 0){
    smsMessageIdMap.put(smsCcrParameter.getMessageId(), smsCcrParameter);
   }else{
    smsMessageIdMap.remove(smsCcrParameter.getMessageId());
   }
  }else if(serializable instanceof TimeSlice){
   TimeSlice timeSlice = (TimeSlice)serializable;
   if(timeSlice.getMemoryAction() == 0){
    voiceMessageMap.put(timeSlice.getInitCcrParameter().getCallerNumber(),timeSlice);
   }else{
    TimeSliceControlPool.getInstince().getTimeSliceControl(timeSlice.getInitCcrParameter().getProvinceCode()).removeData(timeSlice.getInitCcrParameter().getCallerNumber());
    voiceMessageMap.remove(timeSlice.getInitCcrParameter().getCallerNumber());
   }
  }
 }

 /**
  * 成员发送信息
  */
 public void handleNotification(Serializable serializable) {
  if(serializable instanceof SmsCcrParameter){
   SmsCcrParameter smsCcrParameter = (SmsCcrParameter)serializable;
   if(smsCcrParameter.getMemoryAction() == 0){
    smsMessageIdMap.put(smsCcrParameter.getMessageId(), smsCcrParameter);
   }else{
    smsMessageIdMap.remove(smsCcrParameter.getMessageId());
   }
  }else if(serializable instanceof TimeSlice){
   TimeSlice timeSlice = (TimeSlice)serializable;
   if(timeSlice.getMemoryAction() == 0){
    voiceMessageMap.put(timeSlice.getInitCcrParameter().getCallerNumber(),timeSlice);
   }else{
    TimeSliceControlPool.getInstince().getTimeSliceControl(timeSlice.getInitCcrParameter().getProvinceCode()).removeData(timeSlice.getInitCcrParameter().getCallerNumber());
    voiceMessageMap.remove(timeSlice.getInitCcrParameter().getCallerNumber());
   }
  }
 }

 /**
  * 成员地址
  */
 public Serializable getCache() {
  if (log.isInfoEnabled()) {
   log.info("成员本地地址: " + bus.getLocalAddress().toString());
  }
  return bus.getLocalAddress();
 }

 /**
  * 新成员加入
  */
 public void memberJoined(Address address) {
  if (log.isInfoEnabled()) {
   log.info("新成员加入:" + address);
  }
  Iterator<?> smsIt = smsMessageIdMap.entrySet().iterator();
  log.info("发送缓存中已经存在的短信信息给新成员:" + smsMessageIdMap.size() + "......start ");
  while (smsIt.hasNext()) {
   Map.Entry entry = (Map.Entry) smsIt.next();
   SmsCcrParameter value = (SmsCcrParameter)entry.getValue();
   sendNotification(value);
  }
  Iterator<?> voiceIt = voiceMessageMap.entrySet().iterator();
  log.info("发送缓存中已经存在的语音信息给新成员:" + voiceMessageMap.size() + "......start ");
  while (voiceIt.hasNext()) {
   Map.Entry entry = (Map.Entry) voiceIt.next();
   TimeSlice value = (TimeSlice)entry.getValue();
   sendNotification(value);
  }
 }

 /**
  * 成员离开
  */
 public void memberLeft(Address address) {
  if (log.isInfoEnabled()) {
   log.info("成员离开:" + address);
  }
  //将检测语音的是否到时的信息放到其他服务器上
  Iterator<?> voiceIt = voiceMessageMap.entrySet().iterator();
  log.info("添加离开成员的语音到时检索信息到通话时间片控制中:" + voiceMessageMap.size() + "......start ");
  while (voiceIt.hasNext()) {
   Map.Entry entry = (Map.Entry) voiceIt.next();
   TimeSlice value = (TimeSlice)entry.getValue();
   TimeSliceControlPool.getInstince().getTimeSliceControl(value.getInitCcrParameter().getProvinceCode()).addTimeSlice(value.getInitCcrParameter().getCallerNumber(),value);
  }
 }
 
 public Map<String, TimeSlice> getVoiceMessageMap() {
  return voiceMessageMap;
 }

 public Map<String, SmsCcrParameter> getSmsMessageIdMap() {
  return smsMessageIdMap;
 }

}

 

测试方法:

JavaGroupBroadcastingManager manager = JavaGroupBroadcastingManager.getInstance();
manager.sendNotification(requestSession);

分享到:
评论

相关推荐

    jgroup master

    jgroup详细介绍

    jgroup代码

    jgroup代码

    jgroup手册

    Reliable group communication with JGroups 3.x Preface This is the JGroups manual. It provides information about: 1. Installation and configuration 2. Using JGroups (the API) 3. Configuration of the ...

    Jgroup学习总结

    NULL 博文链接:https://8366.iteye.com/blog/921760

    EHCACHE集群配置-JGroup篇

    EHCAHCE基于JGROUP的集群配置方案,内含相关配置文件,及配置说明

    jgroup笔记.doc

    jgroup笔记.

    jgroup使用实例

    jroup是一个比较优秀的集群通讯开源软件,本实例展示如何用jgroup进行不同机器之间的通讯

    java SWT编写JGroup局域网聊天程序

    找不到对方在调试的时候发现只要是使用了SWT的类的地方会出现线程错误,于是我想是不是出现了线程同步的问题经询问别人后得知在SWT中使用JGroup应该要使线程同步,应该使用Display类的syncExec(Runnable r)方法于是...

    《jgroup in action》

    最强大得UUP开源组件,用于底层通讯,以被JBOSS采用

    jboss jdbc json jgroup.jar

    jboss jdbc json jgroup.jar

    encache+jgroups集群缓存共享

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其工作模式基于IP多播,但可以在可靠性和群组成员管理上进行扩展。其结构上设计灵活,提供了一种灵活兼容多种协议的协议栈。

    jgroup-3.0.1

    jgroup 源码 HIBERNATE 二级缓存 集群

    jgroup配置[收集].pdf

    jgroup配置[收集].pdf

    jgroup配置[归类].pdf

    jgroup配置[归类].pdf

    GroupData:使用JGroup实现分布式数据结构(堆栈和集合)

    使用JGroup实现分布式数据结构(堆栈和集合) 介绍 [什么是JGroups?]( ) [JGroup入门]( ) JGroups是完全用Java编写的可靠的组通信工具包。 它基于IP多播(也支持TCP),但是有一些特殊功能,例如可靠性和组...

    Jgroups-all.jar

    JGroup功能十分强大,通过配置各种参数就可以充分利用它所提供的各项功能。JGroup最大的特点就是支持协议栈的可配置性,它本是实现了基本的Java的协议栈实现,也就是最基本的消息广播的基础,同时支持附加协议栈的...

    MemCached 缓存系统配置说明

    其实回顾一下集中式的构架,无非两种情况:一是节点均衡的网状(JBoss Tree Cache),利用JGroup的多播通信机制来同步数据;二是Master-Slaves模式(分布式文件系统),由Master来管理Slave,比如如何选择Slave,...

    MemCached Cache Java Client封装优化历程

    节点均衡的网状(JBoss Tree Cache),利用JGroup的多播通信机制来同步数据。2.Master-Slaves模式(分布式文件系统),由Master来管理Slave,如何选择Slave,如何迁移数据,都是由Master来完成,但是Master本身也...

    jgroups官方帮助文档html格式打包2.X版本

    jgroups官方帮助文档html格式2.X版本

Global site tag (gtag.js) - Google Analytics