# Slixmpp: The Slick XMPP Library# This file is part of Slixmpp# See the file LICENSE for copying permissionimportasyncioimportsocketimportloggingfromtypingimport(Optional,IO,Union,)fromslixmppimportJIDfromslixmpp.stanzaimportIq,Messagefromslixmpp.exceptionsimportXMPPError,IqTimeoutlog=logging.getLogger(__name__)
[docs]classIBBytestream(object):"""XEP-0047 Stream abstraction. Created by the ibb plugin automatically. Provides send methods and triggers :term:`ibb_stream_data` events. """def__init__(self,xmpp,sid:str,block_size:int,jid:JID,peer:JID,use_messages:bool=False):self.xmpp=xmppself.sid=sidself.block_size=block_sizeself.use_messages=use_messagesifjidisNone:jid=xmpp.boundjidself.self_jid=jidself.peer_jid=peerself.send_seq=-1self.recv_seq=-1self.stream_started=Falseself.stream_in_closed=Falseself.stream_out_closed=Falseself.recv_queue=asyncio.Queue()
[docs]asyncdefsend(self,data:bytes,timeout:Optional[int]=None)->int:"""Send a single block of data. :param data: Data to send (will be truncated if above block size). :returns: Number of bytes sent. """ifnotself.stream_startedorself.stream_out_closed:raisesocket.erroriflen(data)>self.block_size:data=data[:self.block_size]self.send_seq=(self.send_seq+1)%65536seq=self.send_seqifself.use_messages:msg=self.xmpp.Message()msg['to']=self.peer_jidmsg['from']=self.self_jidmsg['id']=self.xmpp.new_id()msg['ibb_data']['sid']=self.sidmsg['ibb_data']['seq']=seqmsg['ibb_data']['data']=datamsg.send()else:iq=self.xmpp.Iq()iq['type']='set'iq['to']=self.peer_jidiq['from']=self.self_jidiq['ibb_data']['sid']=self.sidiq['ibb_data']['seq']=seqiq['ibb_data']['data']=dataawaitiq.send(timeout=timeout)returnlen(data)
[docs]asyncdefsendall(self,data:bytes,timeout:Optional[int]=None):"""Send all the contents of ``data`` in chunks. :param data: Raw data to send. """sent_len=0whilesent_len<len(data):sent_len+=awaitself.send(data[sent_len:sent_len+self.block_size],timeout=timeout)
[docs]asyncdefgather(self,max_data:Optional[int]=None,timeout:int=3600)->bytes:"""Gather all data sent on a stream until it is closed, and return it. .. versionadded:: 1.8.0 :param max_data: Max number of bytes to receive. (received data may be over this limit depending on block_size) :param timeout: Timeout after which an error will be raised. :raises .IqTimeout: If the timeout is reached. :returns: All bytes accumulated in the stream. """result=b''whilenotself.recv_queue.empty():result+=self.recv_queue.get_nowait()ifmax_dataandlen(result)>max_data:returnresultifself.stream_in_closed:returnresultend_future=asyncio.Future()defon_close(stream):ifstreamisself:end_future.set_result(True)defon_data(stream):nonlocalresultifstreamisself:result+=stream.read()ifmax_dataandlen(result)>max_data:end_future.set_result(True)self.xmpp.add_event_handler('ibb_stream_end',on_close)self.xmpp.add_event_handler('ibb_stream_data',on_data)try:awaitasyncio.wait_for(end_future,timeout)exceptasyncio.TimeoutError:raiseIqTimeout(result)finally:self.xmpp.del_event_handler('ibb_stream_end',on_close)self.xmpp.del_event_handler('ibb_stream_data',on_data)returnresult
[docs]asyncdefsendfile(self,file:IO[bytes],timeout:Optional[int]=None):"""Send the contents of a file over the wire, in chunks. :param file: The opened file (or file-like) object, in bytes mode."""whileTrue:data=file.read(self.block_size)ifnotdata:breakawaitself.send(data,timeout=timeout)
[docs]defclose(self,timeout:Optional[int]=None)->asyncio.Future:"""Close the stream."""iq=self.xmpp.Iq()iq['type']='set'iq['to']=self.peer_jidiq['from']=self.self_jidiq['ibb_close']['sid']=self.sidself.stream_out_closed=Truedef_close_stream(_):self.stream_in_closed=Truefuture=iq.send(timeout=timeout,callback=_close_stream)self.xmpp.event('ibb_stream_end',self)returnfuture