abc-master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
pyabc_split._splitter Class Reference

Inherits object.

Public Member Functions

def __init__
 
def is_done
 
def kill
 
def cleanup
 
def child
 
def fork_one
 
def fork_all
 
def communicate
 
def get_next_result
 
def __iter__
 

Data Fields

 pids
 
 fds
 
 buffers
 
 results
 

Private Member Functions

def _kill
 
def _fork_one
 

Detailed Description

Definition at line 110 of file pyabc_split.py.

Constructor & Destructor Documentation

def pyabc_split._splitter.__init__ (   self)

Definition at line 112 of file pyabc_split.py.

def __init__(self):
    """Create a splitter that is not yet tracking any child process."""
    # pid -> None for children that were killed (filled in by _kill)
    self.results = {}
    # read fd -> in-memory buffer accumulating that child's pipe output
    self.buffers = {}
    # pid -> (index into self.pids, read end of the child's pipe)
    self.fds = {}
    # child pids in fork order; a slot is overwritten with -1 once the
    # child has been reaped or killed
    self.pids = []

Member Function Documentation

def pyabc_split._splitter.__iter__ (   self)

Definition at line 267 of file pyabc_split.py.

def __iter__(self):
    """Iterate lazily over the results of the outstanding children.

    Each step yields whatever ``self.get_next_result()`` returns, and the
    iteration ends as soon as ``self.is_done()`` reports true. Nothing is
    evaluated until the first item is requested.
    """
    while not self.is_done():
        yield self.get_next_result()
273 
@contextmanager
def pyabc_split._splitter._fork_one (   self,
  f 
)
private

Definition at line 171 of file pyabc_split.py.

def _fork_one(self, f):
    """Fork a child process that runs ``f`` and reports back through a pipe.

    In the parent, returns ``(pid, pr)``: the child's pid and the
    non-blocking read end of the pipe. The child never returns from this
    method: it exits with the value returned by ``self.child(pw, f)``, or
    with status 1 if anything raised along the way.

    Raises whatever ``fcntl.fcntl`` or ``os.fork`` raises; in that case both
    pipe ends are closed before the exception propagates.
    """

    # create a pipe to communicate with the child process
    pr, pw = os.pipe()

    parentpid = os.getpid()
    rc = 1

    try:

        try:
            # set pr to be non-blocking
            fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK)

            # create child process
            pid = os.fork()
        except Exception:
            # No child was created, so we are still in the parent: release
            # both pipe ends instead of leaking them.
            os.close(pr)
            os.close(pw)
            raise

        if pid == 0:
            # child process: only the write end is needed
            os.close(pr)
            pyabc.close_on_fork(pw)

            rc = self.child(pw, f)
            os._exit(rc)
        else:
            # parent process: only the read end is needed
            os.close(pw)
            return (pid, pr)

    finally:
        # Safety net: a child must never fall back into the parent's code
        # path, even if an exception escaped above.
        if os.getpid() != parentpid:
            os._exit(rc)
def pyabc_split._splitter._kill (   self,
  pids 
)
private

Definition at line 121 of file pyabc_split.py.

def _kill(self, pids):
    """Close the pipes of the given children, SIGINT them and reap them.

    ``pids`` is an iterable of child pids; entries equal to -1 (slots whose
    child is already gone) are ignored. For every other pid the bookkeeping
    in ``self.fds`` / ``self.buffers`` is dropped, its slot in ``self.pids``
    is set to -1 and ``self.results[pid]`` is set to None.

    Raises KeyError if a pid is not tracked in ``self.fds``, and re-raises
    any exception from ``os.kill`` after logging it to stderr.
    """

    # pids actually signalled; only these may be waited on afterwards
    signalled = []

    # close pipes and kill child processes
    # (snapshot first: callers may pass a live view of self.fds, which is
    # mutated inside the loop)
    for pid in list(pids):

        if pid == -1:
            continue

        i, fd = self.fds[pid]

        del self.buffers[fd]
        del self.fds[pid]

        self.pids[i] = -1
        self.results[pid] = None

        os.close(fd)

        try:
            os.kill(pid, signal.SIGINT)
        except Exception as e:
            print >>sys.stderr, 'exception while trying to kill pid=%d: '%pid, e
            raise

        signalled.append(pid)

    # wait for termination of the children that were signalled; waiting on a
    # skipped -1 entry would mean "any child" to waitpid and could reap (or
    # block on) an unrelated process
    for pid in signalled:
        os.waitpid(pid, 0)
def pyabc_split._splitter.child (   self,
  fdw,
  f 
)

Definition at line 156 of file pyabc_split.py.

def child(self, fdw, f):
    """Run ``f`` and write its pickled return value to the descriptor ``fdw``.

    Returns 0 on success. If ``f`` raises, the traceback is printed and the
    exception is re-raised without anything being written. ``fdw`` is closed
    once the result has been written.
    """

    # call function
    try:
        res = f()
    except:
        # deliberately catches everything (including KeyboardInterrupt) so
        # the traceback is printed before the exception propagates
        traceback.print_exc()
        raise

    # write return value into pipe; a pickle stream is binary data, so the
    # descriptor is wrapped in binary mode
    with os.fdopen(fdw, "wb") as fout:
        pickle.dump(res, fout)

    return 0
def pyabc_split._splitter.cleanup (   self)

Definition at line 153 of file pyabc_split.py.

def cleanup(self):
    """Kill every child process that is still being tracked."""
    # take a snapshot of the tracked pids; _kill removes entries from
    # self.fds as it goes
    outstanding = list(self.fds)
    self._kill(outstanding)
def pyabc_split._splitter.communicate (   self)

Definition at line 214 of file pyabc_split.py.

def communicate(self):
    """Drain the children's pipes into their buffers until woken up.

    Repeatedly selects on the read end of every tracked child plus
    ``pyabc.wait_fd``. Data from a child's pipe is appended to that pipe's
    buffer; the method returns after the first select round in which
    ``pyabc.wait_fd`` was reported ready.
    NOTE(review): presumably wait_fd becomes readable when a child
    terminates -- confirm against pyabc.
    """

    watched = [entry[1] for entry in self.fds.values()]
    watched.append(pyabc.wait_fd)

    while True:

        woken = False

        for ready in _retry_select(watched):

            if ready == pyabc.wait_fd:
                # finish handling the other ready descriptors of this
                # round, then leave
                woken = True
            else:
                self.buffers[ready].write(os.read(ready, 16384))

        if woken:
            return
def _retry_select
Definition: pyabc_split.py:99
def pyabc_split._splitter.fork_all (   self,
  funcs 
)

Definition at line 211 of file pyabc_split.py.

def fork_all(self, funcs):
    """Fork one child per callable in ``funcs``.

    Returns the list of values returned by ``self.fork_one`` for each
    callable, in the same order as ``funcs``.
    """
    indices = []
    for func in funcs:
        indices.append(self.fork_one(func))
    return indices
def pyabc_split._splitter.fork_one (   self,
  func 
)

Definition at line 203 of file pyabc_split.py.

def fork_one(self, func):
    """Fork a child running ``func`` and start tracking it.

    Returns the child's index, i.e. its position in ``self.pids``.
    """
    child_pid, read_fd = self._fork_one(func)

    # register the child: its slot is the next free position in self.pids
    self.pids.append(child_pid)
    slot = len(self.pids) - 1

    self.fds[child_pid] = (slot, read_fd)
    # output read from the child's pipe is accumulated here
    self.buffers[read_fd] = cStringIO.StringIO()

    return slot
def pyabc_split._splitter.get_next_result (   self)

Definition at line 233 of file pyabc_split.py.

def get_next_result(self):
    """Wait for the next child to terminate and return its result.

    Returns ``(i, value)`` where ``i`` is the child's index in ``self.pids``
    and ``value`` is the object unpickled from everything the child wrote to
    its pipe, or None if that data is empty or not a valid pickle. The child
    is removed from ``self.fds`` / ``self.buffers``, its slot in
    ``self.pids`` is set to -1 and its pipe is closed.
    """

    # read from the pipes as needed, while waiting for the next child process to terminate
    self.communicate()

    # wait for the next child process to terminate
    pid, _ = os.wait()
    assert pid in self.fds

    # retrieve the pipe file descriptor and stop tracking the pid
    i, fd = self.fds.pop(pid)
    self.pids[i] = -1

    # retrieve the buffer
    buf = self.buffers.pop(fd)

    # fill the buffer with whatever is still left in the pipe; the pipe is
    # closed even if reading fails
    try:
        while True:
            s = os.read(fd, 16384)
            if not s:
                break
            buf.write(s)
    finally:
        os.close(fd)

    # NOTE: the data comes from a child forked by this process; pickle must
    # not be used this way on untrusted input.
    try:
        return (i, pickle.loads(buf.getvalue()))
    except (EOFError, pickle.UnpicklingError):
        # the tuple is required: "except A, B" would catch only A and bind
        # the exception to B
        return (i, None)
def pyabc_split._splitter.is_done (   self)

Definition at line 118 of file pyabc_split.py.

def is_done(self):
    """Return True when no child process is being tracked any more."""
    return not self.fds
def pyabc_split._splitter.kill (   self,
  ids 
)

Definition at line 149 of file pyabc_split.py.

def kill(self, ids):
    """Kill the children identified by their indices into ``self.pids``.

    The indices are translated to pids and handed to ``self._kill``.
    """

    targets = []
    for index in ids:
        targets.append(self.pids[index])

    self._kill(targets)

Field Documentation

pyabc_split._splitter.buffers

Definition at line 115 of file pyabc_split.py.

pyabc_split._splitter.fds

Definition at line 114 of file pyabc_split.py.

pyabc_split._splitter.pids

Definition at line 113 of file pyabc_split.py.

pyabc_split._splitter.results

Definition at line 116 of file pyabc_split.py.


The documentation for this class was generated from the following file: