B
    b
,c6X                 @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZmZmZmZmZmZmZmZmZ ddlZddlmZmZmZ ddlmZ G d	d
 d
eZG dd dZG dd deejZ G dd dZ!edddZ"G dd deej#e Z$dS )z%Future-returning APIs for coroutines.    N)Future)deque)chain)Any	AwaitableCallableDictList
NamedTupleOptionalTupleTypeTypeVarUnioncastoverload)EVENTSPOLLINPOLLOUT)Literalc               @   s6   e Zd ZU eed< eed< eed< eed< eed< dS )_FutureEventfuturekindkwargsmsgtimerN)__name__
__module____qualname__r   __annotations__strr   r    r!   r!   *lib/python3.7/site-packages/zmq/_future.pyr      s
   
r   c               @   sV   e Zd ZU dZdZeed< ee ed< edddZ	eddd	Z
dddd
dZdS )_AsynczMixin for common async logicN_current_loop_Future)returnc             C   sL   | j dkr&|  | _ | | j  | j S |  }|| j k	rH|| _ | | |S )zGet event loop

        Notice if event loop has changed,
        and register init_io_state on activation of a new event loop
        N)r$   _default_loop_init_io_state)selfZcurrent_loopr!   r!   r"   	_get_loop6   s    



z_Async._get_loopc             C   s   t dd S )Nz!Must be implemented in a subclass)NotImplementedError)r)   r!   r!   r"   r'   G   s    z_Async._default_loopc             C   s   d S )Nr!   )r)   loopr!   r!   r"   r(   J   s    z_Async._init_io_state)N)r   r   r   __doc__r$   r   r   r   r   r*   r'   r(   r!   r!   r!   r"   r#   0   s   
r#   c                   s   e Zd ZU dZed ed< eed< eed< ee ed< eeee	ddd	d
Z
eeddddZdeeeeef   d fddZ  ZS )_AsyncPollerz:Poller that returns a Future on poll, instead of blocking._AsyncSocket_socket_class_READ_WRITEraw_socketsN)r,   socketevtfr&   c             C   s
   t  dS )z"Schedule callback for a raw socketN)r+   )r)   r,   r4   r5   r6   r!   r!   r"   _watch_raw_socketV   s    z_AsyncPoller._watch_raw_socket)r,   socketsr&   c             G   s
   t  dS )z$Unschedule callback for a raw socketN)r+   )r)   r,   r8   r!   r!   r"   _unwatch_raw_socketsZ   s    z!_AsyncPoller._unwatch_raw_sockets)r&   c          
      s     |dkr\yt d}W n, tk
rL } z | W dd}~X Y nX  |  S    g fdd}fdd xjD ]\}}t	|t
jrt	|jsƈj|}|t
j@ r|jdd |t
j@ r|jdd q| d}|t
j@ r|jO }|t
j@ r2|jO }||| qW  fd	d
}| |dk	r|dkrfdd}	d| |	fdd}
 |
 fdd} |  S )z Return a Future for a poll eventr   Nc                 s      s d  d S )N)done
set_result)args)watcherr!   r"   wake_rawr   s    z#_AsyncPoller.poll.<locals>.wake_rawc                s   j  f S )N)r9   )r6   )r,   r3   r)   r!   r"   <lambda>w       z#_AsyncPoller.poll.<locals>.<lambda>poll)r   c          
      s     rd S  r:y  W n tk
r4   Y nX d S  rR  nLyttd}W n, t	k
r } z| W d d }~X Y nX 
| d S )Nr   )r;   	cancelledcancelRuntimeError	exceptionset_exceptionsuperr.   rB   	Exceptionr<   )r6   resulte)	__class__r   r)   r>   r!   r"   on_poll_ready   s    z(_AsyncPoller.poll.<locals>.on_poll_readyc                  s      s d  d S )N)r;   r<   r!   )r>   r!   r"   trigger_timeout   s    z*_AsyncPoller.poll.<locals>.trigger_timeoutgMbP?c                s"   t dr  n
  d S )NrD   )hasattrrD   Zremove_timeout)r6   )r,   timeout_handler!   r"   cancel_timeout   s    

z)_AsyncPoller.poll.<locals>.cancel_timeoutc                s      s   d S )N)r;   rD   )r6   )r>   r!   r"   cancel_watcher   s    z)_AsyncPoller.poll.<locals>.cancel_watcher)r%   rH   rB   rI   rG   r<   r*   add_done_callbackr8   
isinstance_zmqSocketr0   from_socketr   _add_recv_eventr   _add_send_eventappendr1   r2   r7   
call_later)r)   timeoutrJ   rK   r?   r4   maskr5   rM   rN   rQ   rR   )rL   )r   r,   r3   r)   rP   r>   r"   rB   ^   sN    








z_AsyncPoller.poll)r:   )r   r   r   r-   r   r   intr	   r   r   r7   r9   r   r   rB   __classcell__r!   r!   )rL   r"   r.   N   s   
r.   c               @   s   e Zd Zedd ZdS )_NoTimerc               C   s   d S )Nr!   r!   r!   r!   r"   rD      s    z_NoTimer.cancelN)r   r   r   staticmethodrD   r!   r!   r!   r"   r`      s   r`   Tr/   )Zboundc            
       s  e Zd ZU dZdZdZded< eZdZ	dKe
d dd fddZedLee deed	d
dZdMe
e dd fddZejjje_ fddZejjje_edNddeeeee  dddZedOddeed eeee  dddZedPddeed eeeej  dddZedQeeeeeee eej f  dddZdReeeeeee eej f  dddZdSeeeeeeejf  dddZ dTeeeee
ej!  ddd Z"dUeeeeeee
ej!  d!d"d#Z#d$d% Z$dej%fee d&d'd(Z&ee' d& fd)d*Z(dVe'ee'ed d, fd-d.Z)d/d0 Z*d1d2 Z+e,d3d4 Z-dWd5d6Z.dXd7d8Z/d9d: Z0d;d< Z1dYd=d>Z2dZd?d@Z3dAdB Z4dCdD Z5dEdF Z6d[dGdHZ7dIdJ Z8  Z9S )\r/   Nr   z_zmq.Socket_shadow_sockr:   )_from_socketr&   c                s   t |tjrd | }}|d k	r6t j|jd || _n"t j||f| tj| j| _|d k	rztj	| j
j dtdd t | _t | _d| _| jj| _d S )N)shadowz^(io_loop) argument is deprecated in pyzmq 22.2. The currently active loop will always be used.   )
stacklevelr   )rT   rU   rV   rH   __init__Z
underlyingrc   re   warningswarnrL   r   DeprecationWarningr   _recv_futures_send_futures_stateZFD_fd)r)   contextZsocket_typeio_looprd   r   )rL   r!   r"   rh      s     
z_AsyncSocket.__init__)clsr4   rq   r&   c             C   s   | ||dS )z.Create an async socket from an existing Socket)rd   rq   r!   )rr   r4   rq   r!   r!   r"   rW      s    z_AsyncSocket.from_socket)lingerr&   c          	      s   | j sn| jd k	rntt| jpg | jp$g }x:|D ]2}|j s0y|j  W q0 t	k
r`   Y q0X q0W | 
  t j|d d S )N)rs   )closedro   listr   rl   rm   r   r;   rD   rE   _clear_io_staterH   close)r)   rs   
event_listevent)rL   r!   r"   rw      s    


z_AsyncSocket.closec                s"   t  |}|tkr| | |S )N)rH   getr   _schedule_remaining_events)r)   keyrJ   )rL   r!   r"   rz     s    
z_AsyncSocket.getF)track)flagsr}   r&   c            C   s   d S )Nr!   )r)   r~   r}   r!   r!   r"   recv_multipart
  s    z_AsyncSocket.recv_multipartT)r~   copyr}   r&   c            C   s   d S )Nr!   )r)   r~   r   r}   r!   r!   r"   r     s    c            C   s   d S )Nr!   )r)   r~   r   r}   r!   r!   r"   r     s    c             C   s   d S )Nr!   )r)   r~   r   r}   r!   r!   r"   r     s    c             C   s   |  dt|||dS )zvReceive a complete multipart zmq message.

        Returns a Future whose result will be a multipart message.
        r   )r~   r   r}   )rX   dict)r)   r~   r   r}   r!   r!   r"   r   "  s    c             C   s   |  dt|||dS )zReceive a single zmq frame.

        Returns a Future, whose result will be the received frame.

        Recommend using recv_multipart instead.
        recv)r~   r   r}   )rX   r   )r)   r~   r   r}   r!   r!   r"   r   -  s    	z_AsyncSocket.recv)	msg_partsr~   r   r&   c             K   s(   ||d< ||d< ||d< | j d||dS )zqSend a complete multipart zmq message.

        Returns a Future that resolves when sending is complete.
        r~   r   r}   send_multipart)r   r   )rY   )r)   r   r~   r   r}   r   r!   r!   r"   r   8  s    z_AsyncSocket.send_multipart)datar~   r   r}   r   r&   c             K   s<   ||d< ||d< ||d< | t|||d | jd||dS )zSend a single zmq frame.

        Returns a Future that resolves when sending is complete.

        Recommend using send_multipart instead.
        r~   r   r}   )r~   r   r}   send)r   r   )updater   rY   )r)   r   r~   r   r}   r   r!   r!   r"   r   D  s
    z_AsyncSocket.sendc                s>   |     fdd}|  fdd} |  S )zDeserialize with Futuresc          
      st      rdS  r$   nL }y|}W n, tk
rd } z | W dd}~X Y nX  | dS )z+Chain result through serialization to recvdN)r;   rF   rG   rJ   rI   r<   )_ZbufZloadedrK   )r6   loadrecvdr!   r"   _chain\  s    z)_AsyncSocket._deserialize.<locals>._chainc                s      rdS   r  dS )z"Chain cancellation from f to recvdN)r;   rC   rD   )r   )r6   r   r!   r"   _chain_cancelm  s    z0_AsyncSocket._deserialize.<locals>._chain_cancel)r%   rS   )r)   r   r   r   r   r!   )r6   r   r   r"   _deserializeX  s    

z_AsyncSocket._deserialize)r&   c                sl   j rttj }|| tt||}	   fdd}|
 r^|| n
||  S )zSpoll the socket for events

        returns a Future for the poll results.
        c                st      rd S |  r:y   W n tk
r4   Y nX d S |  rR |   nt|  } |	d d S )Nr   )
r;   rC   rD   rE   rF   rG   r   rJ   r<   rz   )r6   Zevts)r   r)   r!   r"   unwrap_result  s    z(_AsyncSocket.poll.<locals>.unwrap_result)rt   rU   ZZMQErrorZENOTSUP_poller_classregisterr   r   rB   r%   r;   rS   )r)   r\   r~   pr6   r   r!   )r   r)   r"   rB   x  s    

z_AsyncSocket.pollc                s   t  j||S )N)rH   recv_string)r)   r=   r   )rL   r!   r"   r     s    z_AsyncSocket.recv_stringutf-8)sr~   encodingr&   c                s   t  j|||dS )N)r~   r   )rH   send_string)r)   r   r~   r   )rL   r!   r"   r     s    z_AsyncSocket.send_stringc                s    fdd}|  ||S )z'Add a timeout for a send or recv Futurec                  s      rd S  t  d S )N)r;   rG   rU   Againr!   )r   r!   r"   future_timeout  s    z1_AsyncSocket._add_timeout.<locals>.future_timeout)_call_later)r)   r   r\   r   r!   )r   r"   _add_timeout  s    z_AsyncSocket._add_timeoutc             C   s   |   ||S )zSchedule a function to be called later

        Override for different IOLoop implementations

        Tornado and asyncio happen to both have ioloop.call_later
        with the same signature.
        )r*   r[   )r)   Zdelaycallbackr!   r!   r"   r     s    z_AsyncSocket._call_laterc             C   s8   x$t |D ]\}}|j| kr
P q
W dS |||  dS )zMake sure that futures are removed from the event list when they resolve

        Avoids delaying cleanup until the next send/recv event,
        which may never come.
        N)	enumerater   remove)r   rx   Zf_idxry   r!   r!   r"   _remove_finished_future  s
    
z$_AsyncSocket._remove_finished_futurec       
   
      s  |p
   }|dr~|ddtj@ r~t j|}y|f |}W n, tk
rn } z|| W dd}~X Y nX |	| |S t
}ttdr jj}	|	dkr ||	d } jt|||d|d | fdd	  jtt@ r    jr t |S )
z4Add a recv event, returning the corresponding Futurer   r~   r   NZRCVTIMEOgMbP?)r   r   c                s     |  jS )N)r   rl   )r6   )r)   r!   r"   r@     rA   z._AsyncSocket._add_recv_event.<locals>.<lambda>)r%   
startswithrz   rU   DONTWAITgetattrrc   rI   rG   r<   r`   rO   Zrcvtimeor   rl   rZ   r   rS   r   r   _handle_recv_add_io_state)
r)   r   r   r   r6   r   rrK   r   
timeout_msr!   )r)   r"   rX     s,    


z_AsyncSocket._add_recv_eventc          
      sX  |p
   }|dkr js|dd}| }|tjB |d< t j|}d}	y||f|}
W nh tjk
r } z|tj@ r|	| nd}	W dd}~X Y n6 t
k
r } z|	| W dd}~X Y nX ||
 |	r jr   |S t}ttdr jtj}|dkr ||d } jt|||||d	 | fd
d  t |S )z4Add a send event, returning the corresponding Future)r   r   r~   r   TFNSNDTIMEOgMbP?)r   r   r   c                s     |  jS )N)r   rm   )r6   )r)   r!   r"   r@   "  rA   z._AsyncSocket._add_send_event.<locals>.<lambda>)r%   rm   rz   r   rU   r   r   rc   r   rG   rI   r<   rl   r{   r`   rO   r   r   rZ   r   rS   r   r   )r)   r   r   r   r   r6   r~   Znowait_kwargsr   Zfinish_earlyr   rK   r   r   r!   )r)   r"   rY     s>    



z_AsyncSocket._add_send_eventc       	   
   C   s  | j tt@ sdS d}x.| jrF| j \}}}}}| rBd}qP qW | jsX| t |dkrddS |  |dkr|	d dS |dkr| j j
}n|dkr| j j}ntd| |d  tjO  < y|f |}W n. tk
r  } z|| W dd}~X Y nX |	| dS )zHandle recv eventsNrB   r   r   zUnhandled recv event type: %rr~   )rc   rz   r   r   rl   popleftr;   _drop_io_staterD   r<   r   r   
ValueErrorrU   r   rI   rG   )	r)   r6   r   r   r   r   r   rJ   rK   r!   r!   r"   r   (  s6    



z_AsyncSocket._handle_recvc       	   
   C   s  | j tt@ sd S d }x.| jrF| j \}}}}}| rBd }qP qW | jsX| t |d krdd S |  |dkr|	d  d S |dkr| j j
}n|dkr| j j}ntd| |d  tjO  < y||f|}W n. tk
r } z|| W d d }~X Y nX |	| d S )NrB   r   r   zUnhandled send event type: %rr~   )rc   rz   r   r   rm   r   r;   r   rD   r<   r   r   r   rU   r   rI   rG   )	r)   r6   r   r   r   r   r   rJ   rK   r!   r!   r"   _handle_sendQ  s6    



z_AsyncSocket._handle_sendc             C   s<   | j t}|tj@ r|   |tj@ r0|   |   dS )z(Dispatch IO events to _handle_recv, etc.N)	rc   rz   r   rU   r   r   r   r   r{   )r)   fdeventsZ
zmq_eventsr!   r!   r"   _handle_eventsz  s    

z_AsyncSocket._handle_eventsc             C   s>   | j dkrdS |dkr"| jt}|| j @ r:| d| j dS )zkSchedule a call to handle_events next loop iteration

        If there are still events to handle.
        r   N)rn   rc   rz   r   r   r   )r)   r   r!   r!   r"   r{     s    

z'_AsyncSocket._schedule_remaining_eventsc             C   s*   | j |kr| j |B  }| _ | | j  dS )zAdd io_state to poller.N)rn   _update_handler)r)   stater!   r!   r"   r     s    
z_AsyncSocket._add_io_statec             C   s(   | j |@ r| j | @ | _ | | j  dS )z&Stop poller from watching an io_state.N)rn   r   )r)   r   r!   r!   r"   r     s    
z_AsyncSocket._drop_io_statec             C   s   |r|    |   dS )zOUpdate IOLoop handler with state.

        zmq FD is always read-only.
        N)r*   r{   )r)   r   r!   r!   r"   r     s    z_AsyncSocket._update_handlerc             C   s6   |dkr|   }|| j| j| j | d| j dS )z#initialize the ioloop event handlerNr   )r*   Zadd_handlerrc   r   r1   r   )r)   r,   r!   r!   r"   r(     s    z_AsyncSocket._init_io_statec             C   s.   | j }| j jr| j}| jdk	r*| j| dS )zNunregister the ioloop event handler

        called once during close
        N)rc   rt   ro   r$   Zremove_handler)r)   r   r!   r!   r"   rv     s
    
z_AsyncSocket._clear_io_state)Nr:   NN)N)N)r   )r   )r   )r   TF)r   TF)r   TF)r   TF)r   TF)r   r   )NN)NNN)r   r   )N)N):r   r   r   rl   rm   rn   r   r.   r   ro   r   rh   classmethodr   rb   r   rW   r^   rw   rU   rV   r-   rz   r   boolr   r	   bytesr   r   ZFramer   r   ZMessageTrackerr   r   r   r   rB   r    r   r   r   r   ra   r   rX   rY   r   r   r   r{   r   r   r   r(   rv   r_   r!   r!   )rL   r"   r/      st   
   &$

  
 '

$
8))
	

)%r-   ri   Zasyncior   collectionsr   	itertoolsr   typingr   r   r   r   r	   r
   r   r   r   r   r   r   r   ZzmqrU   r   r   r   Zzmq._typingr   r   r#   ZPollerr.   r`   rb   rV   r/   r!   r!   r!   r"   <module>   s   <m