Toggle Light / Dark / Auto color theme
Toggle table of contents sidebar
Source code for slixmpp.plugins.xep_0013.offline
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import logging
from asyncio import Future
from typing import Iterable , Optional , Callable , List , Set , Union
from slixmpp import JID
from slixmpp.stanza import Message , Iq
from slixmpp.xmlstream.handler import Collector
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0013 import stanza
# Module-level logger, named after this module via ``__name__``.
log = logging . getLogger ( __name__ )
[docs]
class XEP_0013(BasePlugin):
    """
    XEP-0013 Flexible Offline Message Retrieval

    Every request is an Iq stanza carrying an ``offline`` payload. The
    methods that send an Iq return whatever ``iq.send()`` returns
    (annotated as a ``Future``) and accept an optional ``callback`` that
    is invoked with the response Iq.
    """

    name = 'xep_0013'
    description = 'XEP-0013: Flexible Offline Message Retrieval'
    dependencies = {'xep_0030'}
    stanza = stanza

    def plugin_init(self):
        """Register the ``Offline`` stanza plugin on Iq and Message."""
        register_stanza_plugin(Iq, stanza.Offline)
        register_stanza_plugin(Message, stanza.Offline)

    def get_count(self, **kwargs):
        """
        Run a XEP-0030 ``get_info`` query against the offline node.

        :param kwargs: Forwarded unchanged to ``xep_0030.get_info``.
        :return: Whatever ``xep_0030.get_info`` returns.
        """
        return self.xmpp['xep_0030'].get_info(
            node='http://jabber.org/protocol/offline',
            local=False,
            **kwargs)

    def get_headers(self, **kwargs):
        """
        Run a XEP-0030 ``get_items`` query against the offline node.

        :param kwargs: Forwarded unchanged to ``xep_0030.get_items``.
        :return: Whatever ``xep_0030.get_items`` returns.
        """
        return self.xmpp['xep_0030'].get_items(
            node='http://jabber.org/protocol/offline',
            local=False,
            **kwargs)

    def view(self, nodes: Iterable[str], ifrom: Optional[JID] = None,
             timeout: Optional[int] = None,
             callback: Optional[Callable] = None) -> Future:
        """
        Request the offline messages identified by ``nodes``.

        Messages matching ``message/offline`` that arrive before the Iq
        response are collected; when the response has type ``result``
        they are stored in ``iq['offline']['results']``.

        :param nodes: A single node name or an iterable of node names.
        :param ifrom: Value for the Iq ``from`` attribute.
        :param timeout: Forwarded to ``iq.send``.
        :param callback: Optional callable invoked with the response Iq.
        :return: The value returned by ``iq.send``.
        """
        iq = self._make_iq('get', ifrom)
        self._add_items(iq, nodes, 'view')
        return self._send_collecting(iq, timeout, callback)

    def remove(self, nodes: Union[List[str], Set[str], str],
               ifrom: Optional[JID] = None, timeout: Optional[int] = None,
               callback: Optional[Callable] = None) -> Future:
        """
        Request removal of the offline messages identified by ``nodes``.

        :param nodes: A single node name or an iterable of node names.
        :param ifrom: Value for the Iq ``from`` attribute.
        :param timeout: Forwarded to ``iq.send``.
        :param callback: Forwarded to ``iq.send``.
        :return: The value returned by ``iq.send``.
        """
        iq = self._make_iq('set', ifrom)
        self._add_items(iq, nodes, 'remove')
        return iq.send(timeout=timeout, callback=callback)

    def fetch(self, ifrom: Optional[JID] = None, timeout: Optional[int] = None,
              callback: Optional[Callable] = None) -> Future:
        """
        Send an Iq with the ``fetch`` flag set on its offline payload.

        Messages matching ``message/offline`` that arrive before the Iq
        response are collected; when the response has type ``result``
        they are stored in ``iq['offline']['results']``.

        :param ifrom: Value for the Iq ``from`` attribute.
        :param timeout: Forwarded to ``iq.send``.
        :param callback: Optional callable invoked with the response Iq.
        :return: The value returned by ``iq.send``.
        """
        iq = self._make_iq('set', ifrom)
        iq['offline']['fetch'] = True
        return self._send_collecting(iq, timeout, callback)

    def purge(self, ifrom: Optional[JID] = None, timeout: Optional[int] = None,
              callback: Optional[Callable] = None) -> Future:
        """
        Send an Iq with the ``purge`` flag set on its offline payload.

        :param ifrom: Value for the Iq ``from`` attribute.
        :param timeout: Forwarded to ``iq.send``.
        :param callback: Forwarded to ``iq.send``.
        :return: The value returned by ``iq.send``.
        """
        iq = self._make_iq('set', ifrom)
        iq['offline']['purge'] = True
        return iq.send(timeout=timeout, callback=callback)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _make_iq(self, iq_type, ifrom):
        """Create an Iq with its ``type`` and ``from`` fields filled in."""
        iq = self.xmpp.Iq()
        iq['type'] = iq_type
        iq['from'] = ifrom
        return iq

    @staticmethod
    def _add_items(iq, nodes, action):
        """Append one offline ``Item`` with ``action`` per node to ``iq``."""
        # A lone string (or any non-iterable value) denotes a single node.
        # Every other iterable -- list, set, tuple, generator -- is used
        # as given instead of being wrapped as one bogus node.
        if isinstance(nodes, (str, bytes)) or not isinstance(nodes, Iterable):
            nodes = [nodes]
        offline = iq['offline']
        for node in nodes:
            item = stanza.Item()
            item['node'] = node
            item['action'] = action
            offline.append(item)

    def _send_collecting(self, iq, timeout, callback):
        """Send ``iq`` while collecting ``message/offline`` stanzas."""
        # NOTE(review): the handler name is reproduced verbatim,
        # including the spaces around the %s placeholder.
        collector = Collector(
            'Offline_Results_ %s ' % iq['id'],
            StanzaPath('message/offline'))
        self.xmpp.register_handler(collector)

        def wrapped_cb(response):
            # Stop collecting as soon as the Iq response is delivered.
            results = collector.stop()
            if response['type'] == 'result':
                response['offline']['results'] = results
            # ``callback`` defaults to None; calling it unconditionally
            # would raise TypeError for callers that only await the
            # returned future.
            if callback is not None:
                callback(response)

        # NOTE(review): the collector is only stopped from wrapped_cb;
        # confirm whether iq.send invokes the callback on timeout,
        # otherwise the collector stays registered.
        return iq.send(timeout=timeout, callback=wrapped_cb)