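Publish-Subscribe (PubSub, XEP-0060) is a powerful XMPP protocol extension. A user subscribes to a topic (called a node in XMPP) and is notified whenever that node is updated: the XMPP server pushes the notification to the user as a message stanza.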
Node types:
- Leaf node: contains the published items.
- Collection node: a container node that holds leaf nodes.
Note: you cannot subscribe to a whole Collection node, only to a Leaf node.
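Access and Publisher Models
- Open: anyone may subscribe.
- Authorize: subscription requests must be approved by the owner; only approved users can subscribe.
- Whitelist: only users on the whitelist may subscribe.
- Presence: only users who receive the presence of the publisher (the owner) may subscribe.
- Roster: only users in the owner's roster, or in one of the specified roster groups, receive notifications.
In Openfire, Whitelist is configured as follows:
Publisher models:
- Open: anyone may publish items to the node (the most permissive setting).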
- Publishers: owners and publishers are allowed to publish items to the node.
- Subscribers: owners, publishers and subscribers are allowed to publish items to the node.
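The access and publisher models above can be read as two decision rules: one applied when a subscription request arrives, one applied when somebody tries to publish. The following is a simplified sketch in plain Java, not Openfire or Smack code. The enum constants mirror the model names listed above; the `PubSubModels` class, the `User` type, and its fields are hypothetical names introduced only for illustration, and roster groups are collapsed into a single flag.

```java
import java.util.Set;

/** Simplified, hypothetical model of the access and publisher rules. */
final class PubSubModels {
    enum AccessModel { OPEN, AUTHORIZE, WHITELIST, PRESENCE, ROSTER }
    enum PublishModel { OPEN, PUBLISHERS, SUBSCRIBERS }
    enum Affiliation { OWNER, PUBLISHER, NONE }

    /** Hypothetical view of one user as seen by the node. */
    static final class User {
        final String jid;
        Affiliation affiliation = Affiliation.NONE;
        boolean approvedByOwner;       // Authorize: the owner accepted the request
        boolean receivesOwnerPresence; // Presence: user receives the owner's presence
        boolean inOwnerRoster;         // Roster: user is in the roster (or an allowed group)
        boolean subscribed;            // user already holds a subscription

        User(String jid) { this.jid = jid; }
    }

    /** Decides whether a subscription request is accepted under the given access model. */
    static boolean maySubscribe(AccessModel model, User user, Set<String> whitelist) {
        switch (model) {
            case OPEN:      return true;
            case AUTHORIZE: return user.approvedByOwner;
            case WHITELIST: return whitelist.contains(user.jid);
            case PRESENCE:  return user.receivesOwnerPresence;
            case ROSTER:    return user.inOwnerRoster;
            default:        throw new IllegalArgumentException("unknown model: " + model);
        }
    }

    /** Decides whether a user may publish under the given publisher model. */
    static boolean mayPublish(PublishModel model, User user) {
        boolean ownerOrPublisher = user.affiliation == Affiliation.OWNER
                || user.affiliation == Affiliation.PUBLISHER;
        switch (model) {
            case OPEN:        return true;
            case PUBLISHERS:  return ownerOrPublisher;
            case SUBSCRIBERS: return ownerOrPublisher || user.subscribed;
            default:          throw new IllegalArgumentException("unknown model: " + model);
        }
    }
}
```

Read this way, each publisher model further down the list is a superset of the one before it only in the order Publishers, Subscribers, Open, which is why Open is described as the most permissive.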
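The publish-subscribe flow: a publisher publishes to a leaf node, and subscribers receive a notification message.
The subscription flow in XMPP
1. First, confirm that your server supports the pubsub feature
1.1 Query all services offered by the XMPP server
Response:
1.2 Query one XMPP subdomain, for example pubsub
Response:
1.3 Query a persistent leaf node in the pubsub service
Response: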
http://jabber.org/protocol/pubsub#meta-data leaf 1 0 1 1 1 1 0 open publishers English test17@im owner 1 0 -1 5120
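The three discovery steps above correspond to XEP-0030 service discovery queries: `disco#items` against the server, `disco#info` against the pubsub subdomain, and `disco#info` with a `node` attribute against the pubsub service. The sketch below only builds the request stanzas as strings in plain Java so they can be inspected or pasted into a debug console. It assumes the XMPP domain is `im`, the pubsub service is `pubsub.im`, and the node is `NodeID_003`, as in the test code of this article; the `DiscoQueries` class, its method names, and the `id` values are hypothetical.

```java
/** Builds XEP-0030 discovery requests as plain strings (illustrative sketch only). */
final class DiscoQueries {
    static final String DISCO_ITEMS = "http://jabber.org/protocol/disco#items";
    static final String DISCO_INFO = "http://jabber.org/protocol/disco#info";

    /** Step 1.1: list the services (items) hosted by the server. */
    static String listServices(String domain) {
        return iq("disco1", domain, DISCO_ITEMS, null);
    }

    /** Step 1.2: ask one service, such as the pubsub subdomain, for its features. */
    static String serviceInfo(String service) {
        return iq("disco2", service, DISCO_INFO, null);
    }

    /** Step 1.3: ask the pubsub service for the meta-data of one node. */
    static String nodeInfo(String service, String nodeId) {
        return iq("disco3", service, DISCO_INFO, nodeId);
    }

    private static String iq(String id, String to, String namespace, String node) {
        StringBuilder sb = new StringBuilder();
        sb.append("<iq type='get' id='").append(id)
          .append("' to='").append(escape(to)).append("'>");
        sb.append("<query xmlns='").append(namespace).append("'");
        if (node != null) {
            sb.append(" node='").append(escape(node)).append("'");
        }
        sb.append("/></iq>");
        return sb.toString();
    }

    /** Escapes the XML special characters; '&' must be replaced first. */
    private static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
                .replace("'", "&apos;").replace("\"", "&quot;");
    }

    public static void main(String[] args) {
        System.out.println(listServices("im"));
        System.out.println(serviceInfo("pubsub.im"));
        System.out.println(nodeInfo("pubsub.im", "NodeID_003"));
    }
}
```

A service that supports pubsub advertises the feature `http://jabber.org/protocol/pubsub` in its `disco#info` response, and the node query returns a data form whose type is `http://jabber.org/protocol/pubsub#meta-data`, which is the first value in the response shown above.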
Test: implementing publish-subscribe with smackx and the Spark IM client
Publisher:
import java.util.Date;

import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smackx.pubsub.AccessModel;
import org.jivesoftware.smackx.pubsub.ConfigureForm;
import org.jivesoftware.smackx.pubsub.FormType;
import org.jivesoftware.smackx.pubsub.LeafNode;
import org.jivesoftware.smackx.pubsub.PayloadItem;
import org.jivesoftware.smackx.pubsub.PubSubManager;
import org.jivesoftware.smackx.pubsub.PublishModel;
import org.jivesoftware.smackx.pubsub.SimplePayload;

public class Publisher {
    private static XMPPConnection connection = new XMPPConnection("im.cvte.cn");
    private static String USER_NAME = "test17";
    private static String PASSWORD = "password";
    private static String nodeId = "NodeID_003";

    static {
        try {
            connection.connect();
            connection.login(USER_NAME, PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            PubSubManager manager = new PubSubManager(connection, "pubsub.im");
            LeafNode myNode = null;
            try {
                // Look up the leaf node; an exception here usually means it does not exist yet
                myNode = manager.getNode(nodeId);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (myNode == null) {
                // Create the leaf node with the default configuration
                myNode = manager.createNode(nodeId);
            }

            String id1 = "1001";
            // The wrapper tags are rebuilt from the element name and namespace arguments
            // (assumed; they were missing from the listing).
            // "消息发布" means "message published".
            SimplePayload payload1 = new SimplePayload("message", "pubsub:cvtalk",
                    "<message xmlns='pubsub:cvtalk'>" + id1 + ":消息发布:"
                            + new Date().toString() + "</message>");

            // Leaf node settings. Currently these have no effect: the form is only
            // used by the commented-out createNode call below.
            ConfigureForm f = new ConfigureForm(FormType.submit);
            f.setPersistentItems(true);   // persist published items
            f.setDeliverPayloads(true);
            f.setAccessModel(AccessModel.open);
            f.setPublishModel(PublishModel.publishers);
            //f.setSubscribe(true);
            // Create the leaf node with these settings
            //myNode = (LeafNode) manager.createNode(nodeId, f);

            PayloadItem<SimplePayload> item1 = new PayloadItem<SimplePayload>(id1, payload1);
            // A PayloadItem without an item ID works as well
            //PayloadItem<SimplePayload> item1 = new PayloadItem<SimplePayload>(payload1);
            myNode.publish(item1);
            System.out.println("-----publish item1-----------");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Subscriber: put the following code in the login() method of Spark's LoginDialog:
private boolean login() {....... connection.login(..............
PubSubManager manager = new PubSubManager(connection, "pubsub.im");
Node eventNode = manager.getNode("NodeID_003");
eventNode.addItemEventListener(new ItemEventListener() {
    public void handlePublishedItems(ItemPublishEvent evt) {
        System.out.println("Number of payload items received=" + evt.getItems().size());
        for (Object obj : evt.getItems()) {
            PayloadItem item = (PayloadItem) obj;
            System.out.println("Subscribed item=" + item.getPayload().toString());
        }
    }
});
// Subscribe the logged-in user to the node
eventNode.subscribe(connection.getUser());
......
The message that arrives at the subscriber
n4Ch63 1001:消息发布:Tue Dec 08 15:36:59 CST 2015 GP00jOONb9Lg2PRr0K0T01xunpquPmVC2q7QhjYg
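A notification like the one above is delivered as a message stanza that carries an `event` element in the `http://jabber.org/protocol/pubsub#event` namespace, with one `item` per published entry. The sketch below parses such a stanza with the JDK's DOM parser, independent of Smack. The sample XML is an assumed reconstruction following the XEP-0060 shape: the message id `n4Ch63`, item id `1001`, node name, and payload namespace come from this article, while the addresses and the payload text are placeholders. `EventNotificationParser` is a hypothetical name.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Extracts "itemId=payload text" pairs from a pubsub event notification (sketch). */
final class EventNotificationParser {
    static final String EVENT_NS = "http://jabber.org/protocol/pubsub#event";

    // Assumed reconstruction of a notification; addresses and payload text are placeholders.
    static final String SAMPLE =
            "<message from='pubsub.im' to='subscriber@im' id='n4Ch63'>"
          + "<event xmlns='http://jabber.org/protocol/pubsub#event'>"
          + "<items node='NodeID_003'>"
          + "<item id='1001'><message xmlns='pubsub:cvtalk'>1001:hello</message></item>"
          + "</items></event></message>";

    static List<String> publishedItems(String stanzaXml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // needed for namespace-based lookup
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(stanzaXml)));
            // <item> inherits the default namespace declared on <event>
            NodeList items = doc.getElementsByTagNameNS(EVENT_NS, "item");
            List<String> result = new ArrayList<>();
            for (int i = 0; i < items.getLength(); i++) {
                Element item = (Element) items.item(i);
                result.add(item.getAttribute("id") + "=" + item.getTextContent().trim());
            }
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("not a parsable stanza", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(publishedItems(SAMPLE));
    }
}
```

In a real client Smack does this parsing and hands the result to the `ItemEventListener`; the sketch is only meant to show where the item id and payload sit inside the stanza.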
Other pubsub operations in smack
Get the default node configuration
public ConfigureForm getDefaultConfiguration() throws XMPPException {
    // Errors will cause exceptions in getReply, so it only returns
    // on success.
    PubSub reply = (PubSub) sendPubsubPacket(Type.GET,
            new NodeExtension(PubSubElementType.DEFAULT),
            PubSubElementType.DEFAULT.getNamespace());
    return NodeUtils.getFormFromPacket(reply, PubSubElementType.DEFAULT);
}
Delete a node
public void deleteNode(String nodeId) throws XMPPException {
    sendPubsubPacket(Type.SET,
            new NodeExtension(PubSubElementType.DELETE, nodeId),
            PubSubElementType.DELETE.getNamespace());
    nodeMap.remove(nodeId);
}
Listeners
There are three listeners:
- ItemDeleteListener
- ItemEventListener
- NodeConfigListener
ItemEventListener takes a generic type parameter bounded by org.jivesoftware.smackx.pubsub.Item:
public interface ItemEventListener<T extends Item> {
    /**
     * Called whenever an item is published to the node the listener
     * is registered with.
     *
     * @param items The publishing details.
     */
    void handlePublishedItems(ItemPublishEvent<T> items);
}
In addition, the Personal Eventing Protocol (PEP, XEP-0163) is also built on publish-subscribe, and its stanza structure is very similar. Publishing code:
PEPManager pepManager = new PEPManager(smackConnection);
pepManager.addPEPListener(new PEPListener() {
    public void eventReceived(String inFrom, PEPEvent inEvent) {
        LOGGER.debug("Event received: " + inEvent);
    }
});

PEPProvider pepProvider = new PEPProvider();
pepProvider.registerPEPParserExtension("http://jabber.org/protocol/tune", new TuneProvider());
ProviderManager.getInstance().addExtensionProvider("event", "http://jabber.org/protocol/pubsub#event", pepProvider);

Tune tune = new Tune("jeff", "1", "CD", "My Title", "My Track");
pepManager.publish(tune);
The receiving listener:
public interface PEPListener {
    /**
     * Called when PEP events are received as part of a presence subscribe or message filter.
     *
     * @param from the user that sent the entries.
     * @param event the event contained in the message.
     */
    public void eventReceived(String from, PEPEvent event);
}
One last question: where does Openfire persist new items published on a leaf node?
In the PubSubPersistenceManager class, writePendingItems is responsible for writing them to the database
private static void writePendingItems(Connection con, LinkedListNode<RetryWrapper> addItem, boolean batch) throws SQLException
Yet after each publish no new record shows up in the database. The code below explains why: published items are first committed to an in-memory cache and are only written out when that cache is flushed.
writePendingItems(Connection con, LinkedList<RetryWrapper> addList, LinkedList<PublishedItem> delList) writes the items in addList to the database and deletes the records in delList from it.
/**
 * Flush the cache(s) of items to be persisted (itemsToAdd) and deleted (itemsToDelete).
 * @param sendToCluster If true, delegate to cluster members, otherwise local only
 */
public static void flushPendingItems(boolean sendToCluster) {
    // forward to other cluster members and wait for response
    if (sendToCluster) {
        CacheFactory.doSynchronousClusterTask(new FlushTask(), false);
    }

    if (itemsToAdd.getFirst() == null && itemsToDelete.getFirst() == null) {
        return; // nothing to do for this cluster member
    }

    Connection con = null;
    boolean rollback = false;
    LinkedList<RetryWrapper> addList = null;
    LinkedList<PublishedItem> delList = null;

    // Swap pending items so we can parse and save the contents from this point in time
    // while not blocking new entries from being cached.
    synchronized (itemsPending) {
        addList = itemsToAdd;
        delList = itemsToDelete;

        itemsToAdd = new LinkedList<RetryWrapper>();
        itemsToDelete = new LinkedList<PublishedItem>();

        // Ensure pending items are available via the item read cache;
        // this allows the item(s) to be fetched by other request threads
        // while being written to the DB from this thread
        int copied = 0;
        for (String key : itemsPending.keySet()) {
            if (!itemCache.containsKey(key)) {
                itemCache.put(key, (((RetryWrapper) itemsPending.get(key).object)).get());
                copied++;
            }
        }
        if (log.isDebugEnabled() && copied > 0) {
            log.debug("Added " + copied + " pending items to published item cache");
        }
        itemsPending.clear();
    }

    // Note that we now make multiple attempts to write cached items to the DB:
    //   1) insert all pending items in a single batch
    //   2) if the batch insert fails, retry by inserting each item separately
    //   3) if a given item cannot be written, return it to the pending write cache
    // By default step 3 will be tried once per item, but this can be configured
    // (or disabled) using the "xmpp.pubsub.item.retry" property. In the event of
    // a transaction rollback, items that could not be written to the database
    // will be returned to the pending item write cache.
    try {
        con = DbConnectionManager.getTransactionConnection();
        writePendingItems(con, addList, delList);
    } catch (SQLException se) {
        log.error("Failed to flush pending items; initiating rollback", se);
        // return new items to the write cache
        LinkedListNode<RetryWrapper> node = addList.getLast();
        while (node != null) {
            savePublishedItem(node.object);
            node.remove();
            node = addList.getLast();
        }
        rollback = true;
    } finally {
        DbConnectionManager.closeTransactionConnection(con, rollback);
    }
}
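The flush logic above follows a write-behind pattern: published items queue up in memory, a flush swaps the queue for an empty one under a lock, writes the swapped-out batch, and puts the batch back if the write fails. The sketch below shows only that core idea in plain Java. It is not Openfire code: `PendingItemBuffer` and `Store` are hypothetical names, items are plain strings, and deletes, per-item retries, the read cache, and clustering are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal write-behind buffer: publish() is cheap, flush() does the slow write. */
final class PendingItemBuffer {
    /** Hypothetical stand-in for the database write; it may fail. */
    interface Store {
        void writeBatch(List<String> items) throws Exception;
    }

    private final Object lock = new Object();
    private List<String> pending = new ArrayList<>();

    /** Queues an item in memory only; nothing reaches the store yet. */
    void publish(String item) {
        synchronized (lock) {
            pending.add(item);
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pending.size();
        }
    }

    /** Writes all queued items; returns false and re-queues them if the write fails. */
    boolean flush(Store store) {
        List<String> batch;
        // Swap the queue so publishers are not blocked while the batch is written.
        synchronized (lock) {
            if (pending.isEmpty()) {
                return true; // nothing to do
            }
            batch = pending;
            pending = new ArrayList<>();
        }
        try {
            store.writeBatch(batch);
            return true;
        } catch (Exception e) {
            // Return the failed batch to the queue, ahead of items published meanwhile.
            synchronized (lock) {
                batch.addAll(pending);
                pending = batch;
            }
            return false;
        }
    }
}
```

With this shape, a record becomes visible in the store only after a successful `flush`, which matches the observation that nothing appears in the database immediately after a publish.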
References:
http://xmpp.org/extensions/xep-0060.html
http://xmpp.org/extensions/xep-0163.html
https://community.igniterealtime.org/thread/38433
http://www.igniterealtime.org/support/articles/pubsub.jsp
http://blog.csdn.net/u011163195/article/details/17683741