# Slixmpp: The Slick XMPP Library# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net># This file is part of Slixmpp.# See the file LICENSE for copying permission.fromtypingimport(List,Optional,Set,Tuple,)fromslixmppimportJID,Iqfromslixmpp.exceptionsimportIqError,IqTimeoutfromslixmpp.pluginsimportBasePluginfromslixmpp.stanza.rosterimportRosterItemfromslixmpp.plugins.xep_0405importstanzafromslixmpp.plugins.xep_0369importstanzaasmix_stanzaBASE_NODES=['urn:xmpp:mix:nodes:messages','urn:xmpp:mix:nodes:participants','urn:xmpp:mix:nodes:info',]
async def check_server_capability(self) -> bool:
    """Check if the server is MIX-PAM capable.

    Requests disco info (through the ``xep_0030`` plugin) for our own
    bare JID and looks for the ``stanza.NS`` namespace among the
    features in the reply.

    :rtype: bool
    :return: True when ``stanza.NS`` is among the returned features
    """
    disco = self.xmpp.plugin['xep_0030']
    own_bare_jid = self.xmpp.boundjid.bare
    info = await disco.get_info(jid=own_bare_jid)
    advertised = info['disco_info']['features']
    return stanza.NS in advertised
async def join_channel(self, room: JID, nick: str,
                       subscribe: Optional[Set[str]] = None, *,
                       ito: Optional[JID] = None,
                       ifrom: Optional[JID] = None,
                       **iqkwargs) -> Set[str]:
    """
    Join a MIX channel.

    :param JID room: JID of the MIX channel
    :param str nick: Desired nickname on that channel
    :param Set[str] subscribe: Set of nodes to subscribe to when joining.
                               If None (the default), every node listed
                               in ``BASE_NODES`` is requested.
    :param JID ito: Recipient of the IQ; when None, our own bare JID
                    is used.
    :param JID ifrom: Sender passed through to the IQ.
    :param iqkwargs: Extra keyword arguments forwarded to ``iq.send()``.
                     Errors raised by ``iq.send()`` are not caught here.
    :rtype: Set[str]
    :return: The nodes that failed to subscribe, if any (requested
             nodes that are absent from the result)
    """
    if subscribe is None:
        subscribe = set(BASE_NODES)
    recipient = self.xmpp.boundjid.bare if ito is None else ito

    iq = self.xmpp.make_iq_set(ito=recipient, ifrom=ifrom)
    client_join = iq['client_join']
    client_join['channel'] = room
    mix_join = client_join['mix_join']
    mix_join['nick'] = nick

    # One <subscribe/> child per requested node.
    for wanted in subscribe:
        entry = mix_stanza.Subscribe()
        entry['node'] = wanted
        mix_join.append(entry)

    reply = await iq.send(**iqkwargs)

    # Collect the nodes the reply lists, then report the ones we asked
    # for that are missing from it.
    granted = set()
    for confirmed in reply['client_join']['mix_join']:
        granted.add(confirmed['node'])
    return subscribe.difference(granted)
async def leave_channel(self, room: JID, *,
                        ito: Optional[JID] = None,
                        ifrom: Optional[JID] = None,
                        **iqkwargs) -> Iq:
    """
    Leave a MIX channel.

    :param JID room: JID of the channel to leave
    :param JID ito: Recipient of the IQ; when None, our own bare JID
                    is used.
    :param JID ifrom: Sender passed through to the IQ.
    :param iqkwargs: Extra keyword arguments forwarded to ``iq.send()``.
                     Errors raised by ``iq.send()`` are not caught here.
    :rtype: Iq
    :return: The value returned by ``iq.send()``
    """
    recipient = self.xmpp.boundjid.bare if ito is None else ito
    iq = self.xmpp.make_iq_set(ito=recipient, ifrom=ifrom)

    client_leave = iq['client_leave']
    client_leave['channel'] = room
    client_leave.enable('mix_leave')

    reply = await iq.send(**iqkwargs)
    return reply
async def get_mix_roster(self, *,
                         ito: Optional[JID] = None,
                         ifrom: Optional[JID] = None,
                         **iqkwargs) -> Tuple[List[RosterItem], List[RosterItem]]:
    """
    Get the annotated roster, with MIX channels.

    Sends a roster get IQ with the ``annotate`` element enabled, fires
    the ``roster_update`` event with the result, then splits the roster
    items on whether they carry a ``channel`` plugin.

    :param JID ito: Recipient passed through to the IQ.
    :param JID ifrom: Sender passed through to the IQ.
    :param iqkwargs: Extra keyword arguments forwarded to ``iq.send()``.
                     Errors raised by ``iq.send()`` are not caught here.
    :return: A tuple of (contacts, mix channels) as RosterItem elements
    """
    request = self.xmpp.make_iq_get(ito=ito, ifrom=ifrom)
    request['roster'].enable('annotate')
    reply = await request.send(**iqkwargs)
    self.xmpp.event("roster_update", reply)

    contacts = []
    channels = []
    for entry in reply['roster']:
        # check=True: only look the plugin up, do not create it.
        is_channel = entry.get_plugin('channel', check=True)
        bucket = channels if is_channel else contacts
        bucket.append(entry)
    return (contacts, channels)