Viewing File: /home/ubuntu/combine_ai/combine/lib/python3.10/site-packages/numba/core/rvsdg_frontend/bcinterp.py
"""
This file contains logic to convert RVSDG into Numba IR.
Key APIs:
- function `run_frontend()`
- function `rvsdg_to_ir()`
"""
import dis
from contextlib import contextmanager
import builtins
import operator
from typing import Iterator
from functools import reduce
from numba.core import (
ir,
bytecode,
ir_utils,
utils,
)
from numba.core.utils import (
BINOPS_TO_OPERATORS,
INPLACE_BINOPS_TO_OPERATORS,
)
from .rvsdg.bc2rvsdg import (
build_rvsdg,
SCFG,
BasicBlock,
RegionBlock,
DDGBlock,
DDGControlVariable,
DDGBranch,
DDGRegion,
Op,
ValueState,
DEBUG_GRAPH,
)
from .rvsdg.regionpasses import RegionVisitor
def run_frontend(func):
sig = utils.pySignature.from_callable(func)
argnames = tuple(sig.parameters)
rvsdg = build_rvsdg(func.__code__, argnames)
func_id = bytecode.FunctionIdentity.from_function(func)
func_ir = rvsdg_to_ir(func_id, rvsdg)
return func_ir
def _get_first_bytecode(ops: list[Op]) -> dis.Instruction | None:
for bc in (op.bc_inst for op in ops if op.bc_inst is not None):
return bc
return None
def _innermost_exiting(blk: RegionBlock) -> BasicBlock:
# This will iterate through the RegionBlocks until it finds an exiting
# attribute that is not a RegionBlock.
while isinstance(blk, RegionBlock):
blk = blk.subregion.graph[blk.exiting]
return blk
_noop = {"var.incoming", "start"}
_Data = dict[str, ir.Var]
class RVSDG2IR(RegionVisitor[_Data]):
blocks: dict[int, ir.Block]
func_id: bytecode.FunctionIdentity
local_scope: ir.Scope
global_scope: ir.Scope
vsmap: dict[ValueState, ir.Var]
_current_block: ir.Block | None
last_block_label: int | None
branch_predicate: ir.Var | None
_label_map: dict[str, int]
_emit_debug_print = False
def __init__(self, func_id):
self.func_id = func_id
self.loc = self.first_loc = ir.Loc.from_function_id(func_id)
self.global_scope = ir.Scope(parent=None, loc=self.loc)
self.local_scope = ir.Scope(parent=self.global_scope, loc=self.loc)
self.blocks = {}
self.vsmap = {}
self._current_block = None
self.last_block_label = None
self._label_map = {}
@property
def current_block(self) -> ir.Block:
out = self._current_block
assert out is not None
return out
@property
def last_block(self) -> ir.Block:
assert self.last_block_label is not None # for typing
return self.blocks[self.last_block_label]
def _get_phi_name(self, varname: str, label: str) -> str:
suffix = str(self._get_label(label))
return f"$phi.{varname}.{suffix}"
def _get_cp_name(self, cpname: str) -> str:
return f"$.cp.{cpname}"
def _get_label(self, label: str) -> int:
num = self._label_map.setdefault(f"block.{label}", len(self._label_map))
return num
def _get_temp_label(self) -> int:
num = len(self._label_map)
assert num not in self._label_map
self._label_map[f"annoy.{num}"] = num
return num
def initialize(self) -> _Data:
label = self._get_temp_label()
with self.set_block(label, ir.Block(scope=self.local_scope,
loc=self.loc)):
data: _Data = {}
for i, k in enumerate(self.func_id.arg_names): # type: ignore
val = ir.Arg(index=i, name=k, loc=self.loc)
data[f"var.{k}"] = self.store(val, str(k))
return data
def finalize(self):
if self.last_block_label is not None:
last_block = self.blocks[self.last_block_label]
if not last_block.is_terminated:
last_block.append(
ir.StaticRaise(AssertionError, (), loc=self.loc)
)
def visit_block(self, block: BasicBlock, data: _Data) -> _Data:
if isinstance(block, DDGBlock):
# Prepare incoming variables
for k, vs in block.in_vars.items():
self.vsmap[vs] = data[k]
# Emit instruction body
ops = block.get_toposorted_ops()
firstbc: dis.Instruction | None = _get_first_bytecode(ops)
if firstbc is not None:
assert firstbc.positions is not None
self.loc = self.loc.with_lineno(
firstbc.positions.lineno,
firstbc.positions.col_offset,
)
with self.set_block(
self._get_label(block.name),
ir.Block(scope=self.local_scope, loc=self.loc),
):
for op in ops:
if op.opname in _noop:
pass
elif op.bc_inst is not None:
self.interpret_bytecode(op)
elif op.opname in {"stack.export", "stack.incoming"}:
[arg] = op.inputs
[res] = op.outputs
self.vsmap[res] = self.store(
self.vsmap[arg], f"${res.name}"
)
else:
raise NotImplementedError(op.opname, op)
if len(block._jump_targets) > 1:
assert self.branch_predicate is not None
truebr = self._get_label(block._jump_targets[1])
falsebr = self._get_label(block._jump_targets[0])
br = ir.Branch(
cond=self.branch_predicate,
truebr=truebr,
falsebr=falsebr,
loc=self.loc,
)
self.current_block.append(br)
# Prepare outgoing variables
data = {k: self.vsmap[vs] for k, vs in block.out_vars.items()}
return data
elif isinstance(block, DDGControlVariable):
# Emit body
with self.set_block(
self._get_label(block.name),
ir.Block(scope=self.local_scope, loc=self.loc),
):
for cp, v in block.variable_assignment.items():
const = ir.Const(v, loc=self.loc, use_literal_type=False)
self.store(const, self._get_cp_name(cp), redefine=False)
return data
elif isinstance(block, DDGBranch):
# Emit body
if len(block.branch_value_table) == 2:
self._emit_two_way_switch(block)
else:
self._emit_n_way_switch(block)
return data
else:
raise NotImplementedError(block.name, type(block))
def _emit_two_way_switch(self, block):
with self.set_block(
self._get_label(block.name),
ir.Block(scope=self.local_scope, loc=self.loc),
):
# Handle simple two-way branch
assert set(block.branch_value_table.keys()) == {0, 1}
cp = block.variable
cpvar = self.local_scope.get_exact(self._get_cp_name(cp))
truebr = self._get_label(block.branch_value_table[1])
falsebr = self._get_label(block.branch_value_table[0])
br = ir.Branch(
cond=cpvar,
truebr=truebr,
falsebr=falsebr,
loc=self.loc,
)
self.current_block.append(br)
def _emit_n_way_switch(self, block):
"""
This handles emitting a switch block with N cases. It does the
following:
- The branch value table in the block provides information about the
case index and the case label in this switch.
- Emit a block that unconditionally jumps to the first case block.
- In each case block:
- Compare the control variable to the expected case index for
that case.
- Branch to the target label on true, or the next case on false.
- There is no default case. The control variable must match a case index
┌───────────────────────┐
│ current block │
└───────────┬───────────┘
│
└─────────┐
▼
┌───────────────────┐
│ case 0 │
└─────────┬─────────┘
│
│
▼
┌───────────────────┐
│ case 1 │
└─────────┬─────────┘
│
│
▼
┌───────────────────┐
│ case N-1 │
└─────────┬─────────┘
│
┌─────────┘
│
▼
┌───────────────────────┐
│ subsequent blocks │
└───────────────────────┘
"""
# with self.set_block(
# self._get_label(block.name),
# ir.Block(scope=self.local_scope, loc=self.loc)):
# # Handle simple two-way branch
# # assert set(block.branch_value_table.keys()) == {0, 1}
# cp = block.variable
# cpvar = self.local_scope.get_exact(f"$.cp.{cp}")
# falsebr = self._get_label(block.branch_value_table[1])
# truebr = self._get_label(block.branch_value_table[0])
# const = self.store(ir.Const(0, loc=self.loc), "$.const")
# cmp = ir.Expr.binop(operator.eq, const, cpvar,
# loc=self.loc)
# pred = self.store(cmp, "$.pred")
# br = ir.Branch(
# cond=pred,
# truebr=truebr,
# falsebr=falsebr,
# loc=self.loc,
# )
# self.current_block.append(br)
bvt = block.branch_value_table
# The control variable
cp = block.variable
cpvar = self.local_scope.get_exact(self._get_cp_name(cp))
labels = [(k, self._get_label(v)) for k, v in bvt.items()]
blocks = []
# For all but last labels
for _ in range(len(labels) - 1):
blocks.append(
(
self._get_temp_label(),
ir.Block(scope=self.local_scope, loc=self.loc),
)
)
# Jump into the first block
with self.set_block(
self._get_label(block.name),
ir.Block(scope=self.local_scope, loc=self.loc),
):
self.current_block.append(ir.Jump(blocks[-1][0], loc=self.loc))
# Handle jump tree
while blocks:
cp_expect, cp_label = labels.pop()
cur_label, cur_block = blocks.pop()
with self.set_block(cur_label, cur_block):
const = self.store(ir.Const(cp_expect, loc=self.loc), "$.const")
cmp = ir.Expr.binop(operator.eq, const, cpvar, loc=self.loc)
pred = self.store(cmp, "$.cmp")
if not blocks:
_, falsebr = labels.pop()
else:
falsebr, _ = blocks[-1]
br = ir.Branch(
cond=pred,
truebr=cp_label,
falsebr=falsebr,
loc=self.loc,
)
self.current_block.append(br)
def visit_loop(self, region: RegionBlock, data: _Data) -> _Data:
assert isinstance(region, DDGRegion)
# Prepare incoming states
inner_data: _Data = {}
for k in region.incoming_states:
inner_data[k] = self.store(
data[k],
self._get_phi_name(k, region.name),
redefine=False,
block=self.last_block,
)
# Emit loop body
out_data = self.visit_linear(region, inner_data)
# Prepare outgoing states
exit_data = {}
for k in region.outgoing_states:
exit_data[k] = self.store(
out_data[k],
self._get_phi_name(k, region.name),
redefine=False,
block=self.last_block,
)
return exit_data
def visit_switch(self, region: RegionBlock, data: _Data) -> _Data:
# Emit header
header = region.header
header_block = region.subregion[header]
assert header_block.kind == "head"
self.branch_predicate = None
data_at_head = self.visit_linear(header_block, data)
# NOTE: This check is needed due to the mess that the responsibility of
# terminating the block is unclear. Some rvsdg block is
# fallthrough but some have jumps.
if not self.last_block.is_terminated:
assert self.branch_predicate is not None # for typing
# Jump-target 1 is when a the jump is taken.
# Jump-target 0 is when the jump fallthrough
truebr = self._get_label(header_block.jump_targets[1])
falsebr = self._get_label(header_block.jump_targets[0])
br = ir.Branch(self.branch_predicate, truebr, falsebr, loc=self.loc)
self.last_block.append(br)
# Emit branches
data_for_branches = []
branch_blocks = []
for blk in region.subregion.graph.values():
if blk.kind == "branch":
branch_blocks.append(_innermost_exiting(blk))
data_for_branches.append(self.visit_linear(blk, data_at_head))
# Add jump to tail
if not self.last_block.is_terminated:
[target] = blk.jump_targets
self.last_block.append(
ir.Jump(self._get_label(target), loc=self.loc)
)
# handle outgoing values from the branches
names: set[str] = reduce(
operator.or_, map(set, data_for_branches)) # type: ignore
for blk, branch_data in zip(
branch_blocks, data_for_branches, strict=True
):
for k in names:
# Set undefined variable to None
# (It should be a "zeroinitializer" but ir.Expr.null doesn't
# work)
rhs = branch_data.get(k, ir.Const(None, loc=self.loc))
# Insert stores to export
phiname = self._get_phi_name(k, region.name)
self.store(
rhs,
phiname,
redefine=False,
block=self.blocks[self._get_label(blk.name)],
)
# Emit tail
data_after_branches = {
k: self.local_scope.get_exact(self._get_phi_name(k, region.name))
for k in names
}
exiting = region.exiting
exiting_block = region.subregion[exiting]
assert exiting_block.kind == "tail"
data_at_tail = self.visit_linear(exiting_block, data_after_branches)
return data_at_tail
def visit_linear(self, region: RegionBlock, data: _Data) -> _Data:
with self.set_block(
self._get_label(region.name),
ir.Block(scope=self.local_scope, loc=self.loc),
):
# Ensures there's a block for all regions
pass
return super().visit_linear(region, data)
@contextmanager
def set_block(self, label: int, block: ir.Block) -> Iterator[ir.Block]:
"""A context manager that set the current block for other IR building
methods.
In addition,
- It closes any existing block in ``last_block_label`` by jumping to the
new block.
- If there is a existing block, it will be restored as the current block
after the context manager.
"""
if self.last_block_label is not None:
last_block = self.blocks[self.last_block_label]
if not last_block.is_terminated:
last_block.append(ir.Jump(label, loc=self.loc))
if self._emit_debug_print:
print("begin dump last blk".center(80, "-"))
last_block.dump()
print("end dump last blk".center(80, "="))
self.blocks[label] = block
old = self._current_block
self._current_block = block
try:
yield block
finally:
self.last_block_label = label
self._current_block = old
# dump
if self._emit_debug_print:
print(f"begin dump blk: {label}".center(80, "-"))
block.dump()
print("end dump blk".center(80, "="))
def store(self, value, name, *, redefine=True, block=None) -> ir.Var:
target: ir.Var
if redefine:
target = self.local_scope.redefine(name, loc=self.loc)
else:
target = self.local_scope.get_or_define(name, loc=self.loc)
stmt = ir.Assign(value=value, target=target, loc=self.loc)
self.append(stmt, block=block)
return target
def store_vsmap(self, val, vs):
self.vsmap[vs] = self.store(val, f"${vs.name}")
def append(self, stmt: ir.Stmt, block=None):
if block is None:
block = self.current_block
if block.is_terminated:
block.insert_before_terminator(stmt)
else:
block.append(stmt)
def get_global_value(self, name):
"""THIS IS COPIED from interpreter.py
Get a global value from the func_global (first) or
as a builtins (second). If both failed, return a ir.UNDEFINED.
"""
try:
return self.func_id.func.__globals__[name]
except KeyError:
return getattr(builtins, name, ir.UNDEFINED)
def get_closure_value(self, index):
"""
Get a value from the cell contained in this function's closure.
If not set, return a ir.UNDEFINED.
"""
cell = self.func_id.func.__closure__[index]
try:
return cell.cell_contents
except ValueError:
return ir.UNDEFINED
def debug_print(self, msg: str, *args):
msg_const = self.store(ir.Const(msg, loc=self.loc), "$.debug.msg")
fn = self.store(ir.Const(print, loc=self.loc), "$.debug.print")
res = ir.Expr.call(fn, (msg_const, *args), (), loc=self.loc)
self.store(res, "$.debug.res")
def interpret_bytecode(self, op: Op):
"""Interpret a single Op containing bytecode instructions.
Internally, it dispatches to methods with names following the pattern
`op_<opname>`.
"""
assert op.bc_inst is not None
pos = op.bc_inst.positions
assert pos is not None
self.loc = self.loc.with_lineno(pos.lineno, pos.col_offset)
# debug print
if self._emit_debug_print:
where = f"{op.bc_inst.offset:3}:({pos.lineno:3}:{pos.col_offset:3})"
msg = f"[{where}] {op.bc_inst.opname}({op.bc_inst.argrepr}) "
self.debug_print(msg)
for k, vs in op.input_ports.items():
val = self.vsmap.get(vs, None)
if val is None:
self.debug_print(f" in {k:>6}: <undef>")
else:
self.debug_print(f" in {k:>6}:", val)
# dispatch
fn = getattr(self, f"op_{op.bc_inst.opname}")
fn(op, op.bc_inst)
# debug print
if self._emit_debug_print:
for k, vs in op.output_ports.items():
val = self.vsmap.get(vs, None)
if val is None:
self.debug_print(f" out {k:>6}: <undef>")
else:
self.debug_print(f" out {k:>6}:", val)
def op_PUSH_NULL(self, op: Op, bc: dis.Instruction):
pass
def op_LOAD_CONST(self, op: Op, bc: dis.Instruction):
assert not op.inputs
[vs] = op.outputs
# TODO: handle non scalar
value = ir.Const(bc.argval, loc=self.loc)
self.store_vsmap(value, vs)
def op_LOAD_GLOBAL(self, op: Op, bc: dis.Instruction):
# intentionally ignoring the nil
[_nil, res] = op.outputs
value = self.get_global_value(bc.argval)
# TODO: handle non scalar
const = ir.Global(bc.argval, value, loc=self.loc)
self.store_vsmap(const, res)
def op_LOAD_ATTR(self, op: Op, bc: dis.Instruction):
[res] = op.outputs
[item] = op.inputs
getattr = ir.Expr.getattr(self.vsmap[item], bc.argval, loc=self.loc)
self.store_vsmap(getattr, res)
def op_LOAD_METHOD(self, op: Op, bc: dis.Instruction):
[_nil, res] = op.outputs
[item] = op.inputs
getattr = ir.Expr.getattr(self.vsmap[item], bc.argval, loc=self.loc)
self.store_vsmap(getattr, res)
def op_LOAD_DEREF(self, op: Op, bc: dis.Instruction):
[out] = op.outputs
code = self.func_id.code # type: ignore
name = bc.argval
if name in code.co_cellvars:
raise NotImplementedError
gl = self.get(name)
elif name in code.co_freevars:
idx = code.co_freevars.index(name)
value = self.get_closure_value(idx)
gl = ir.FreeVar(idx, name, value, loc=self.loc)
self.store_vsmap(gl, out)
def op_STORE_FAST(self, op: Op, bc: dis.Instruction):
[incvar] = op.inputs
[res] = op.outputs
var = self.vsmap[incvar]
self.vsmap[res] = self.store(var, bc.argval)
def op_KW_NAMES(self, op: Op, bc: dis.Instruction):
pass # do nothing
def op_CALL(self, op: Op, bc: dis.Instruction):
[_env, callee_or_null, arg0_or_callee, *args] = op.inputs
[_env, res] = op.outputs
if callee_or_null.name != "null":
raise NotImplementedError
callee = self.vsmap[arg0_or_callee]
if op.opname == "call.kw":
# If this is a keyword call, the last value-state in `args` has
# the names of the keyword arguments. This corresponds to the
# `kw_names` special state in the CPython interpreter.
kw_names_op = args[-1].parent
assert kw_names_op is not None # for typing
assert kw_names_op.bc_inst is not None # for typing
assert kw_names_op.opname == "kw_names"
# Now that we handled the `kw_names`, remove it from the list of
# actual arguments. The last `len(kw_names)` values in it will go
# in the `kwargs`. All values before that is the positional args.
args = args[:-1]
co = self.func_id.code # type: ignore
names = co.co_consts[kw_names_op.bc_inst.arg]
argvars = [self.vsmap[vs] for vs in args]
kwargs = tuple(zip(names, args[-len(names) :]))
argvars = argvars[: -len(names)]
else:
assert op.opname == "call"
argvars = [self.vsmap[vs] for vs in args]
kwargs = ()
expr = ir.Expr.call(callee, argvars, kwargs, loc=self.loc)
self.store_vsmap(expr, res)
def op_COMPARE_OP(self, op: Op, bc: dis.Instruction):
[_env, lhs, rhs] = op.inputs
[_env, out] = op.outputs
operator = bc.argrepr
op = BINOPS_TO_OPERATORS[operator] # type: ignore
lhs_var = self.vsmap[lhs]
rhs_var = self.vsmap[rhs]
expr = ir.Expr.binop(op, lhs=lhs_var, rhs=rhs_var, loc=self.loc)
self.store_vsmap(expr, out)
def _binop(self, operator, op):
[_env, lhs, rhs] = op.inputs
[_env, out] = op.outputs
if "=" in operator:
immuop = BINOPS_TO_OPERATORS[operator[:-1]]
op = INPLACE_BINOPS_TO_OPERATORS[operator]
expr = ir.Expr.inplace_binop(
op,
immuop,
lhs=self.vsmap[lhs],
rhs=self.vsmap[rhs],
loc=self.loc,
)
else:
op = BINOPS_TO_OPERATORS[operator]
lhs = self.vsmap[lhs]
rhs = self.vsmap[rhs]
expr = ir.Expr.binop(op, lhs=lhs, rhs=rhs, loc=self.loc)
self.store_vsmap(expr, out)
def op_BINARY_OP(self, op: Op, bc: dis.Instruction):
self._binop(bc.argrepr, op)
def op_IS_OP(self, op: Op, bc: dis.Instruction):
opname = 'is not' if bc.argval == 1 else 'is'
self._binop(opname, op)
def op_UNARY_NOT(self, op: Op, bc: dis.Instruction):
[val] = op.inputs
[out] = op.outputs
expr = ir.Expr.unary("not", value=self.vsmap[val], loc=self.loc)
self.store_vsmap(expr, out)
def op_BINARY_SUBSCR(self, op: Op, bc: dis.Instruction):
[_env, index, target] = op.inputs
[_env, out] = op.outputs
index_var = self.vsmap[index]
target_var = self.vsmap[target]
expr = ir.Expr.getitem(target_var, index=index_var, loc=self.loc)
self.store_vsmap(expr, out)
def op_STORE_SUBSCR(self, op: Op, bc: dis.Instruction):
[_env, index, target, value] = op.inputs
index_var = self.vsmap[index]
target_var = self.vsmap[target]
value_var = self.vsmap[value]
stmt = ir.SetItem(
target=target_var, index=index_var, value=value_var, loc=self.loc
)
self.append(stmt)
def op_BUILD_TUPLE(self, op: Op, bc: dis.Instruction):
items = op.inputs
[out] = op.outputs
expr = ir.Expr.build_tuple(
items=[self.vsmap[it] for it in items], loc=self.loc
)
self.store_vsmap(expr, out)
def op_BUILD_SLICE(self, op: Op, bc: dis.Instruction):
args = tuple([self.vsmap[v] for v in op.inputs])
[out] = op.outputs
assert len(args) in (2, 3), "expected (start, stop, [step])"
slicegv = ir.Global("slice", slice, loc=self.loc)
slicevar = self.store(value=slicegv, name="$slicevar", redefine=True)
sliceinst = ir.Expr.call(slicevar, args, (), loc=self.loc)
self.store_vsmap(sliceinst, out)
def op_GET_ITER(self, op: Op, bc: dis.Instruction):
[arg] = op.inputs
[res] = op.outputs
expr = ir.Expr.getiter(value=self.vsmap[arg], loc=self.loc)
self.store_vsmap(expr, res)
def op_FOR_ITER(self, op: Op, bc: dis.Instruction):
[iterator] = op.inputs
[res] = op.outputs
# Emit code
pairval = ir.Expr.iternext(value=self.vsmap[iterator], loc=self.loc)
pair = self.store(pairval, "$foriter")
iternext = ir.Expr.pair_first(value=pair, loc=self.loc)
indval = self.store(iternext, "$foriter.indval")
self.vsmap[res] = indval
isvalid = ir.Expr.pair_second(value=pair, loc=self.loc)
pred = self.store(isvalid, "$foriter.isvalid")
not_fn = ir.Const(operator.not_, loc=self.loc)
res = ir.Expr.call(
self.store(not_fn, "$not"), (pred,), (), loc=self.loc
)
self.branch_predicate = self.store(res, "$for_iter")
def _jump_if_not(self, pred):
"""Emit code for jump if predicate is false."""
not_fn = ir.Const(operator.not_, loc=self.loc)
res = ir.Expr.call(
self.store(not_fn, "$not"), (self.vsmap[pred],), (), loc=self.loc
)
self.branch_predicate = self.store(res, "$jump_if")
def _jump_if(self, pred):
"""Emit code for jump if predicate is true."""
self.branch_predicate = self.store(self.vsmap[pred], "$jump_if")
def op_JUMP_IF_FALSE_OR_POP(self, op: Op, bc: dis.Instruction):
[_env, pred] = op.inputs
[_env] = op.outputs
self._jump_if_not(pred)
def op_JUMP_IF_TRUE_OR_POP(self, op: Op, bc: dis.Instruction):
[_env, pred] = op.inputs
[_env] = op.outputs
self._jump_if(pred)
def op_POP_JUMP_FORWARD_IF_FALSE(self, op: Op, bc: dis.Instruction):
[_env, pred] = op.inputs
[_env] = op.outputs
self._jump_if_not(pred)
def op_POP_JUMP_FORWARD_IF_TRUE(self, op: Op, bc: dis.Instruction):
[_env, pred] = op.inputs
[_env] = op.outputs
self._jump_if(pred)
def _test_none_and_jump(self, pred, bc: dis.Instruction, invert: bool):
test = "is not" if invert else "is"
op = BINOPS_TO_OPERATORS[test] # type: ignore
none = self.store(
value=ir.Const(None, loc=self.loc), name=f"$constNone{bc.offset}"
)
isnone = ir.Expr.binop(op, lhs=self.vsmap[pred], rhs=none, loc=self.loc)
self.branch_predicate = self.store(isnone, "$jump_if")
def op_POP_JUMP_FORWARD_IF_NONE(self, op: Op, bc: dis.Instruction):
[_env, pred] = op.inputs
[_env] = op.outputs
self._test_none_and_jump(pred, bc, invert=False)
def op_POP_JUMP_FORWARD_IF_NOT_NONE(self, op: Op, bc: dis.Instruction):
[_env, pred] = op.inputs
[_env] = op.outputs
self._test_none_and_jump(pred, bc, invert=True)
op_POP_JUMP_BACKWARD_IF_TRUE = op_POP_JUMP_FORWARD_IF_TRUE
def op_RETURN_VALUE(self, op: Op, bc: dis.Instruction):
[_env, retval] = op.inputs
self.append(ir.Return(self.vsmap[retval], loc=self.loc))
assert self.current_block.is_terminated
def op_RAISE_VARARGS(self, op: Op, bc: dis.Instruction):
[_env, exc] = op.inputs
# XXX: temporary implementation
self.append(ir.Raise(exception=self.vsmap[exc], loc=self.loc))
assert self.current_block.is_terminated
def rvsdg_to_ir(
func_id: bytecode.FunctionIdentity, rvsdg: SCFG
) -> ir.FunctionIR:
rvsdg2ir = RVSDG2IR(func_id)
data = rvsdg2ir.initialize()
rvsdg2ir.visit_graph(rvsdg, data)
rvsdg2ir.finalize()
for blk in rvsdg2ir.blocks.values():
blk.verify()
defs = ir_utils.build_definitions(rvsdg2ir.blocks)
fir = ir.FunctionIR(
blocks=rvsdg2ir.blocks,
is_generator=False,
func_id=func_id,
loc=rvsdg2ir.first_loc,
definitions=defs,
arg_count=len(func_id.arg_names), # type: ignore
arg_names=func_id.arg_names, # type: ignore
)
# fir.dump()
if DEBUG_GRAPH:
fir.render_dot().view()
return fir
Back to Directory
File Manager