abc-master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
pyabc_split.py
Go to the documentation of this file.
1 """
2 module pyabc_split
3 
4 Executes python functions and their arguments as separate processes and returns their return values through pickling. This module offers a single function:
5 
6 Function: split_all(funcs)
7 
8 The function returns a generator object that allows iteration over the results.
9 
10 Arguments:
11 
12 funcs: a list of tuples (f, args) where f is a python function and args is a collection of arguments for f.
13 
14 Caveats:
15 
16 1. Global variables in the parent process are not affected by the child processes.
17 2. The functions can only return simple types, see the pickle module for details
18 
19 Usage:
20 
21 Assume you would like to run the function f_1(1), f_2(1,2), f_3(1,2,3) in different processes.
22 
23 def f_1(i):
24  return i+1
25 
26 def f_2(i,j):
27  return i*10+j+1
28 
29 def f_3(i,j,k):
30  return i*100+j*10+k+1
31 
32 Construct a tuple of the function and arguments for each function
33 
34 t_1 = (f_1, [1])
35 t_2 = (f_2, [1,2])
36 t_3 = (f_3, [1,2,3])
37 
38 Create a list containing these tuples:
39 
40 funcs = [t_1, t_2, t_3]
41 
42 Use the function split_all() to run these functions in separate processes:
43 
44 for res in split_all(funcs):
45  print res
46 
47 The output will be:
48 
49 2
50 13
51 124
52 
53 (The order may be different, except that in this case the processes are so fast that they terminate before the next one is created)
54 
55 Alternatively, you may quit in the middle, say after the first process returns:
56 
57 for res in split_all(funcs):
58  print res
59  break
60 
61 This will kill all processes not yet finished.
62 
63 To run ABC operations that require saving the child process state and restoring it at the parent, use abc_split_all().
64 
65  import pyabc
66 
67  def abc_f(truth):
68  import os
69  print "pid=%d, abc_f(%s)"%(os.getpid(), truth)
70  pyabc.run_command('read_truth %s'%truth)
71  pyabc.run_command('strash')
72 
73  funcs = [
74  defer(abc_f)("1000"),
75  defer(abc_f)("0001")
76  ]
77 
78  for _ in abc_split_all(funcs):
79  pyabc.run_command('write_verilog /dev/stdout')
80 
81 Author: Baruch Sterin <sterin@berkeley.edu>
82 """
83 
84 import os
85 import select
86 import fcntl
87 import errno
88 import sys
89 import cPickle as pickle
90 import signal
91 import cStringIO
92 
93 import traceback
94 
95 from contextlib import contextmanager
96 
97 import pyabc
98 
def _retry_select(rlist):
    """Wait until at least one descriptor in rlist is readable and return them.

    Calls select.select() with no timeout, watching rlist for readability
    only.  A call interrupted by a signal (EINTR) is simply retried; any
    other select error propagates to the caller.
    """
    while True:
        try:
            readable = select.select(rlist, [], [])[0]
        except select.error as err:
            if err.args[0] != errno.EINTR:
                raise
            # interrupted by a signal: try again
            continue
        if readable:
            return readable
109 
class _splitter(object):
    """Fork callables into child processes and collect their pickled results.

    Every child gets its own pipe: the child pickles the return value of its
    callable into the write end, and the parent accumulates the bytes from
    the (non-blocking) read end until the child has been reaped.

    Bookkeeping attributes:
      pids    -- list indexed by job id; the child's pid, or -1 once that job
                 has been reaped or killed
      fds     -- dict pid -> (job id, read fd) for children still outstanding
      buffers -- dict read fd -> buffer holding the bytes received so far
      results -- dict pid -> None, recorded for every child that was killed
    """

    def __init__(self):
        self.pids = []
        self.fds = {}
        self.buffers = {}
        self.results = {}

    def is_done(self):
        """Return True when no forked child is outstanding any more."""
        return len(self.fds) == 0

    def _kill(self, pids):
        """Close the pipes of the given children, send them SIGINT and reap them.

        Entries equal to -1 (jobs that already finished or were killed) are
        ignored.  If os.kill() fails, the error is reported on stderr and
        re-raised.
        """
        signalled = []

        # close pipes and kill child processes
        for pid in pids:

            if pid == -1:
                continue

            i, fd = self.fds[pid]

            del self.buffers[fd]
            del self.fds[pid]

            self.pids[i] = -1
            self.results[pid] = None

            os.close(fd)

            try:
                os.kill(pid, signal.SIGINT)
            except Exception as e:
                # same text the former `print >>sys.stderr, msg, e` produced
                message = 'exception while trying to kill pid=%d: ' % pid
                sys.stderr.write(message + ' %s\n' % (e,))
                raise

            signalled.append(pid)

        # Wait for termination.  Only the children signalled above are
        # reaped: passing a -1 placeholder to waitpid() would wait for (and
        # consume the exit status of) an arbitrary other child.
        for pid in signalled:
            os.waitpid(pid, 0)

    def kill(self, ids):
        """Kill the children belonging to the given job ids."""
        self._kill([self.pids[i] for i in ids])

    def cleanup(self):
        """Kill and reap every child that is still outstanding."""
        # snapshot the keys: _kill() deletes entries from self.fds as it goes
        self._kill(list(self.fds))

    def child(self, fdw, f):
        """Run f() and pickle its return value into the descriptor fdw.

        Runs in the child process.  Returns 0 (used as the exit code) on
        success; an exception raised by f is printed and re-raised.
        """
        # call function
        try:
            res = f()
        except:
            # deliberately catches everything: print the traceback, then let
            # the exception continue to the caller
            traceback.print_exc()
            raise

        # write return value into pipe (binary mode: a pickle is a byte stream)
        with os.fdopen(fdw, "wb") as fout:
            pickle.dump(res, fout)

        return 0

    def _fork_one(self, f):
        """Fork a child that runs f; return (pid, read fd) in the parent.

        The child never returns from this method: it leaves through
        os._exit(), with the value returned by child() or with 1 if anything
        went wrong.
        """
        # create a pipe to communicate with the child process
        pr, pw = os.pipe()

        # set pr to be non-blocking
        fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK)

        parentpid = os.getpid()
        rc = 1

        try:

            # create child process
            pid = os.fork()

            if pid == 0:
                # child process:
                os.close(pr)
                # NOTE(review): presumably registers pw to be closed in
                # processes forked later -- confirm against pyabc
                pyabc.close_on_fork(pw)

                rc = self.child(pw, f)
                os._exit(rc)
            else:
                # parent process:
                os.close(pw)
                return (pid, pr)

        finally:
            # whatever happened above, a child must not unwind into the
            # caller's code: exit right here
            if os.getpid() != parentpid:
                os._exit(rc)

    def fork_one(self, func):
        """Fork one child for func, register it and return its job id."""
        pid, fd = self._fork_one(func)
        i = len(self.pids)
        self.pids.append(pid)
        self.fds[pid] = (i, fd)
        self.buffers[fd] = cStringIO.StringIO()
        return i

    def fork_all(self, funcs):
        """Fork one child per callable in funcs; return the list of job ids."""
        return [self.fork_one(f) for f in funcs]

    def communicate(self):
        """Drain the children's pipes until pyabc.wait_fd becomes readable.

        NOTE(review): pyabc.wait_fd presumably becomes readable when a child
        terminates -- confirm against pyabc.
        """
        rlist = [fd for _, fd in self.fds.values()]
        rlist.append(pyabc.wait_fd)

        stop = False

        while not stop:

            rrdy = _retry_select(rlist)

            for fd in rrdy:

                if fd == pyabc.wait_fd:
                    stop = True
                    continue

                data = os.read(fd, 16384)

                if data:
                    self.buffers[fd].write(data)
                else:
                    # EOF: the writer closed its end.  A pipe at EOF is
                    # reported readable forever, so stop polling it instead
                    # of spinning until wait_fd fires.
                    rlist.remove(fd)

    def get_next_result(self):
        """Reap the next finished child and return (job id, result).

        The result is the unpickled return value of the child's callable, or
        None when the child did not deliver a complete pickle (for example
        because it died before writing one).
        """
        # read from the pipes as needed, while waiting for the next child process to terminate
        self.communicate()

        # wait for the next child process to terminate
        pid, _ = os.wait()
        assert pid in self.fds

        # retrieve the pipe file descriptor
        i, fd = self.fds[pid]
        del self.fds[pid]

        # remove the pid
        self.pids[i] = -1

        # retrieve the buffer
        buf = self.buffers[fd]
        del self.buffers[fd]

        # fill the buffer with whatever is still left in the pipe
        while True:
            s = os.read(fd, 16384)
            if not s:
                break
            buf.write(s)

        os.close(fd)

        try:
            return (i, pickle.loads(buf.getvalue()))
        except (EOFError, pickle.UnpicklingError):
            # must be a tuple: `except A, B:` would catch only A and bind the
            # exception instance to B
            return (i, None)

    def __iter__(self):
        """Yield (job id, result) pairs until every child has been reaped."""
        while not self.is_done():
            yield self.get_next_result()
272 
@contextmanager
def make_splitter():
    """Context manager that yields a fresh _splitter.

    On exit -- normal or through an exception -- the splitter's cleanup() is
    called, which kills and reaps the child processes still outstanding.
    """
    # ensure cleanup of child processes
    s = _splitter()
    try:
        yield s
    finally:
        s.cleanup()
281 
def split_all_full(funcs):
    """Run every callable in funcs in its own child process.

    Generator: yields one (index, result) pair per child as the children
    terminate, where index is the position of the callable in funcs.  The
    splitter is held in a context manager, so children still running when
    the generator is closed or fails are cleaned up.
    """
    with make_splitter() as splitter:

        splitter.fork_all(funcs)

        for outcome in splitter:
            yield outcome
290 
def defer(f):
    """Return a binder for f.

    Calling the binder with arguments does not call f; it returns a
    zero-argument callable that, when invoked, calls f with those arguments
    and returns the result.
    """
    def bind(*args, **kwargs):
        def call():
            return f(*args, **kwargs)
        return call
    return bind
293 
def split_all(funcs):
    """Run each (f, args) pair of funcs in its own child process.

    Generator: yields the return values only, in the order in which
    split_all_full() delivers them; the job index is dropped.
    """
    deferred = (defer(f)(*args) for f, args in funcs)
    for _index, value in split_all_full(deferred):
        yield value
297 
298 import tempfile
299 
@contextmanager
def temp_file_names(suffixes):
    """Context manager yielding the names of freshly created temporary files.

    One file is created per entry of suffixes, in order, and closed right
    away; only its name is kept.  On exit every file created so far is
    removed.
    """
    created = []
    try:
        for ext in suffixes:
            # delete=False: the file must outlive the handle, only the name is used
            with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as handle:
                created.append(handle.name)
        yield created
    finally:
        for path in created:
            os.unlink(path)
311 
class abc_state(object):
    """Saved copy of the current ABC state, kept in two temporary files.

    Creating an instance issues the ABC commands 'write_status' and
    'write_aiger' on two new temporary files (self.log and self.aig);
    restore() reads them back.  The files are removed when the object is
    deleted.
    """

    def __init__(self):
        # create the files and close them at once: only the names are needed
        with tempfile.NamedTemporaryFile(delete=False, suffix='.aig') as aig_file:
            self.aig = aig_file.name
        with tempfile.NamedTemporaryFile(delete=False, suffix='.log') as log_file:
            self.log = log_file.name
        pyabc.run_command(r'write_status %s' % self.log)
        pyabc.run_command(r'write_aiger %s' % self.aig)

    def __del__(self):
        # remove the .aig file first, then the .log file
        for attr in ('aig', 'log'):
            os.unlink(getattr(self, attr))

    def restore(self):
        """Load the saved state back into ABC: the AIG first, then the status."""
        for template, attr in ((r'read_aiger %s', 'aig'), (r'read_status %s', 'log')):
            pyabc.run_command(template % getattr(self, attr))
328 
def abc_split_all(funcs):
    """Run callables in child processes and carry their ABC state back.

    funcs is an iterable of zero-argument callables (see defer()).  Each one
    runs in its own child process; afterwards the child writes its ABC status
    and AIG to a pair of temporary files.  For every child that finishes, the
    parent reads those files back into its own ABC and then yields
    (index, result), where index is the position of the callable in funcs.

    The temporary files exist only while the generator is running.
    """
    import pyabc

    def child(f, aig, log):
        # runs in the child: call f, then save the resulting ABC state
        res = f()
        pyabc.run_command(r'write_status %s'%log)
        pyabc.run_command(r'write_aiger %s'%aig)
        return res

    def parent(res, aig, log):
        # runs in the parent: load the state the child saved
        pyabc.run_command(r'read_aiger %s'%aig)
        pyabc.run_command(r'read_status %s'%log)
        return res

    # materialize once so that generators and other one-shot iterables are
    # accepted too: the number of jobs is needed before they are started
    funcs = list(funcs)

    # two temporary files per job: tmp[2*i] is the .aig, tmp[2*i+1] the .log
    with temp_file_names( ['.aig','.log']*len(funcs) ) as tmp:

        jobs = [ defer(child)(f, tmp[2*i],tmp[2*i+1]) for i,f in enumerate(funcs) ]

        for i, res in split_all_full(jobs):
            yield i, parent(res, tmp[2*i],tmp[2*i+1])
349 
if __name__ == "__main__":

    # a few toy functions to run in child processes

    def f_1(i):
        return i+1

    def f_2(i,j):
        return i*10+j+1

    def f_3(i,j,k):
        return i*100+j*10+k+1

    # pair every function with the list of arguments it is to be called with

    t_1, t_2, t_3 = (f_1, [1]), (f_2, [1,2]), (f_3, [1,2,3])

    # collect the pairs in a list

    funcs = [t_1, t_2, t_3]

    # run all of them in separate processes and print each result

    for res in split_all(funcs):
        print(res)

    # same again, but stop as soon as the first process has returned

    for res in split_all(funcs):
        print(res)
        break

    # operations on ABC whose state is saved in the child and restored in the parent

    import pyabc

    def abc_f(truth):
        import os
        print("pid=%d, abc_f(%s)"%(os.getpid(), truth))
        pyabc.run_command('read_truth %s'%truth)
        pyabc.run_command('strash')
        return 100

    funcs = [
        defer(abc_f)("1000"),
        defer(abc_f)("0001"),
    ]

    best = None

    for i, res in abc_split_all(funcs):
        print("%s %s" % (i, res))
        if best is None:
            # keep the state delivered by the first child to finish
            best = abc_state()
        # NOTE(review): nesting reconstructed from the module docstring
        # example (once per result, not only when the state is saved) -- confirm
        pyabc.run_command('write_verilog /dev/stdout')

    # if a state was saved, bring it back and print it

    if best is not None:
        best.restore()
        pyabc.run_command('write_verilog /dev/stdout')
def abc_split_all
Definition: pyabc_split.py:329
def make_splitter
Definition: pyabc_split.py:274
def _retry_select
Definition: pyabc_split.py:99
def split_all_full
Definition: pyabc_split.py:282
def temp_file_names
Definition: pyabc_split.py:301