class Channel(object):
Communication channel between two possibly remote threads of code.
methods:
def __init__(self, gateway, id):
*no docstring available*
arguments:
- self: <UNKNOWN>
- gateway: <UNKNOWN>
- id: <UNKNOWN>
return value:
<UNKNOWN>
source: execnet/channel.py
|
def __init__(self, gateway, id): |
assert isinstance(id, int) |
self.gateway = gateway |
self.id = id |
self._items = Queue.Queue() |
self._closed = False |
self._receiveclosed = threading.Event() |
self._remoteerrors = [] | |
def close(self, error=None):
close down this channel on both sides.
arguments:
- self: <UNKNOWN>
- error: <UNKNOWN>
return value:
<UNKNOWN>
source: execnet/channel.py
115 |
116 |
117 |
118 |
119 |
120 |
121 |
122 |
123 |
124 |
125 |
126 |
127 |
128 |
129 |
130 |
131 |
132 |
133 | |
def close(self, error=None): |
""" close down this channel on both sides. """ |
if not self._closed: |
|
|
|
put = self.gateway._send |
if error is not None: |
put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error))) |
else: |
put(Message.CHANNEL_CLOSE(self.id)) |
if isinstance(error, RemoteError): |
self._remoteerrors.append(error) |
self._closed = True |
self._receiveclosed.set() |
queue = self._items |
if queue is not None: |
queue.put(ENDMARKER) |
self.gateway._channelfactory._no_longer_opened(self.id) | |
def isclosed(self):
return True if the channel is closed. A closed
channel may still hold items.
arguments:
return value:
<UNKNOWN>
source: execnet/channel.py
|
def isclosed(self): |
""" return True if the channel is closed. A closed |
channel may still hold items. |
""" |
return self._closed | |
def makefile(self, mode='w', proxyclose=False):
return a file-like object. Only supported mode right
now is 'w' for binary writes. If you want to have
a subsequent file.close() mean to close the channel
as well, then pass proxyclose=True.
arguments:
- self: <UNKNOWN>
- mode: <UNKNOWN>
- proxyclose: <UNKNOWN>
return value:
<UNKNOWN>
source: execnet/channel.py
106 |
107 |
108 |
109 |
110 |
111 |
112 |
113 | |
def makefile(self, mode='w', proxyclose=False): |
""" return a file-like object. Only supported mode right |
now is 'w' for binary writes. If you want to have |
a subsequent file.close() mean to close the channel |
as well, then pass proxyclose=True. |
""" |
assert mode == 'w', "mode %r not availabe" %(mode,) |
return ChannelFile(channel=self, proxyclose=proxyclose) | |
def next(self):
*no docstring available*
arguments:
return value:
<UNKNOWN>
def receive(self):
receives an item that was sent from the other side,
possibly blocking if there is none.
Note that exceptions from the other side will be
reraised as channel.RemoteError exceptions containing
a textual representation of the remote traceback.
arguments:
return value:
<UNKNOWN>
source: execnet/channel.py
163 |
164 |
165 |
166 |
167 |
168 |
169 |
170 |
171 |
172 |
173 |
174 |
175 |
176 |
177 |
178 | |
def receive(self): |
"""receives an item that was sent from the other side, |
possibly blocking if there is none. |
Note that exceptions from the other side will be |
reraised as channel.RemoteError exceptions containing |
a textual representation of the remote traceback. |
""" |
queue = self._items |
if queue is None: |
raise IOError("calling receive() on channel with receiver callback") |
x = queue.get() |
if x is ENDMARKER: |
queue.put(x) |
raise self._getremoteerror() or EOFError() |
else: |
return x | |
def send(self, item):
sends the given item to the other side of the channel,
possibly blocking if the sender queue is full.
Note that an item needs to be marshallable.
arguments:
- self: <UNKNOWN>
- item: <UNKNOWN>
return value:
<UNKNOWN>
source: execnet/channel.py
150 |
151 |
152 |
153 |
154 |
155 |
156 |
157 |
158 |
159 |
160 |
161 | |
def send(self, item): |
"""sends the given item to the other side of the channel, |
possibly blocking if the sender queue is full. |
Note that an item needs to be marshallable. |
""" |
if self.isclosed(): |
raise IOError, "cannot send to %r" %(self,) |
if isinstance(item, Channel): |
data = Message.CHANNEL_NEW(self.id, item.id) |
else: |
data = Message.CHANNEL_DATA(self.id, item) |
self.gateway._send(data) | |
def setcallback(self, callback, endmarker=<object object at 0x4003c550>):
*no docstring available*
arguments:
- self: <UNKNOWN>
- callback: <UNKNOWN>
- endmarker: <UNKNOWN>
return value:
<UNKNOWN>
source: execnet/channel.py
39 |
40 |
41 |
42 |
43 |
44 |
45 |
46 |
47 |
48 |
49 |
50 |
51 |
52 |
53 |
54 |
55 |
56 |
57 |
58 |
59 |
60 |
61 |
62 |
63 |
64 | |
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED): |
queue = self._items |
lock = self.gateway._channelfactory._receivelock |
lock.acquire() |
try: |
_callbacks = self.gateway._channelfactory._callbacks |
dictvalue = (callback, endmarker) |
if _callbacks.setdefault(self.id, dictvalue) != dictvalue: |
raise IOError("%r has callback already registered" %(self,)) |
self._items = None |
while 1: |
try: |
olditem = queue.get(block=False) |
except Queue.Empty: |
break |
else: |
if olditem is ENDMARKER: |
queue.put(olditem) |
break |
else: |
callback(olditem) |
if self._closed or self._receiveclosed.isSet(): |
|
self.gateway._channelfactory._close_callback(self.id) |
finally: |
lock.release() | |
def waitclose(self, timeout=None):
wait until this channel is closed (or the remote side
otherwise signalled that no more data was being sent).
The channel may still hold receiveable items, but not receive
more. waitclose() reraises exceptions from executing code on
the other side as channel.RemoteErrors containing a a textual
representation of the remote traceback.
arguments:
- self: <UNKNOWN>
- timeout: <UNKNOWN>
return value:
<UNKNOWN>
source: execnet/channel.py
135 |
136 |
137 |
138 |
139 |
140 |
141 |
142 |
143 |
144 |
145 |
146 |
147 |
148 | |
def waitclose(self, timeout=None): |
""" wait until this channel is closed (or the remote side |
otherwise signalled that no more data was being sent). |
The channel may still hold receiveable items, but not receive |
more. waitclose() reraises exceptions from executing code on |
the other side as channel.RemoteErrors containing a a textual |
representation of the remote traceback. |
""" |
self._receiveclosed.wait(timeout=timeout) |
if not self._receiveclosed.isSet(): |
raise IOError, "Timeout" |
error = self._getremoteerror() |
if error: |
raise error | |
def __del__(self):
*no docstring available*
arguments:
return value:
<UNKNOWN>
source: execnet/channel.py
70 |
71 |
72 |
73 |
74 |
75 |
76 |
77 |
78 |
79 |
80 |
81 |
82 |
83 |
84 |
85 |
86 |
87 |
88 |
89 | |
def __del__(self): |
if self.gateway is None: |
return |
self.gateway._trace("Channel(%d).__del__" % self.id) |
|
if self._closed: |
|
for error in self._remoteerrors: |
error.warn() |
elif self._receiveclosed.isSet(): |
|
|
pass |
else: |
|
if self._items is None: |
Msg = Message.CHANNEL_LAST_MESSAGE |
else: |
Msg = Message.CHANNEL_CLOSE |
self.gateway._send(Msg(self.id)) | |
def __iter__(self):
*no docstring available*
arguments:
return value:
<UNKNOWN>
def __repr__(self):
*no docstring available*
arguments:
return value:
<UNKNOWN>
source: execnet/channel.py
|
def __repr__(self): |
flag = self.isclosed() and "closed" or "open" |
return "<Channel id=%d %s>" % (self.id, flag) | |