# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import typing
import unittest

import numpy as np
import onnx_ir as ir
import parameterized
from onnx_ir.passes.common import onnx_checker

from onnxscript.rewriter import pattern as orp
from onnxscript.rewriter import testing
from onnxscript.rewriter.fuse_pad_into_conv import (
    fuse_pad_into_conv,
    fuse_pad_into_conv_rule_set,
)


def _clone_model(model: ir.Model) -> ir.Model:
    return ir.from_proto(ir.to_proto(model))

class FusePadConvBaseTest(unittest.TestCase):
    @property
    def rng(self):
        # A fresh generator with a fixed seed on every access keeps draws deterministic.
        return np.random.default_rng(20250522)

    def get_conv_weights(self, shape: typing.Sequence[int], tape: ir.tape.Tape | None = None):
        w = ir.tensor(self.rng.uniform(-0.5, 0.5, shape).astype("float32"), name="W")
        if tape is not None:
            w = tape.initializer(w)
        return w

    def build_model(
        self,
        input_shape: ir.Shape,
        weight_shape: typing.Sequence[int],
        pad_inputs: typing.Sequence[ir.TensorProtocol | ir.Value | float | None],
        pad_attributes: typing.Mapping[str, ir.Attr] | None = None,
        conv_attributes: typing.Mapping[str, ir.Attr] | None = None,
        opset_imports: typing.Mapping[str, int] | None = None,
    ) -> ir.Model:
        if opset_imports is None:
            # Avoid a mutable default argument; default to opset 20.
            opset_imports = {"": 20}
        tape = ir.tape.Tape()
        inputs = []
        # Keep the batch dim; the remaining dims stay dynamic since Pad and Conv
        # change the channel/spatial sizes.
        output_shape = ir.Shape((input_shape[0],) + ("?",) * (len(input_shape) - 1))

        # Convert pad_inputs to initializers or Constant nodes (if needed)
        pad_inputs = list(pad_inputs)
        for idx, x in enumerate(pad_inputs):
            if isinstance(x, ir.TensorProtocol):
                pad_inputs[idx] = tape.initializer(x)
            elif isinstance(x, ir.Value):
                inputs.append(x)
            elif isinstance(x, float):
                pad_inputs[idx] = tape.op("Constant", inputs=[], attributes={"value_float": x})
            elif x is not None:
                raise ValueError(f"Unsupported type for pad input ({x}): {type(x)}.")

        # Register operations in the tape
        x = ir.Input("X", shape=input_shape, type=ir.TensorType(ir.DataType.FLOAT))
        y = tape.op("Pad", inputs=[x, *pad_inputs], attributes=pad_attributes)
        y = tape.op(
            "Conv",
            inputs=[y, self.get_conv_weights(weight_shape, tape)],
            attributes=conv_attributes,
            output=ir.Input("Y", shape=output_shape, type=ir.TensorType(x.dtype)),
        )

        # Build the model
        ir_model = ir.Model(
            ir.Graph(
                inputs=[x, *inputs],
                outputs=[y],
                nodes=tape.nodes,
                initializers=tape.initializers,
                opset_imports=opset_imports,
                name="model",
            ),
            ir_version=9,
        )
        onnx_checker.CheckerPass(True)(ir_model)
        return ir_model


class FusePadConvTest(FusePadConvBaseTest):
    @parameterized.parameterized.expand(
        [
            (pad_pads, const_value, axes, conv_pads)
            for pad_pads, axes, conv_pads in [
                ([0, 0, 2, 2, 0, 0, 2, 2], None, None),
                ([0, 2, 2, 0, 2, 2], ir.tensor([1, -2, -1], name="axes"), [2, 0, 2, 0]),
                ([1, 1, 1, 1], ir.tensor([-2, 3], name="axes"), [0, 1, 0, 1]),
            ]
            for const_value in [None, 0.0]
        ]
    )
    def test_fuse_pad_into_conv(self, pad_pads, const_value, axes, conv_pads):
        # Pad's optional inputs are positional: keep a None placeholder for
        # constant_value when only axes is provided.
        pad_inputs = [ir.tensor(pad_pads, name="pads")]
        if const_value is not None or axes is not None:
            pad_inputs.append(const_value)
        if axes is not None:
            pad_inputs.append(axes)
        base_model = self.build_model(
            input_shape=ir.Shape(("N", 32, 14, 16)),
            weight_shape=(10, 32, 3, 3),
            pad_inputs=pad_inputs,
            conv_attributes={"pads": conv_pads},
        )
        updated_model = _clone_model(base_model)

        # Apply rule
        count = fuse_pad_into_conv_rule_set().apply_to_model(updated_model)

        # Check that Pad was fused
        self.assertEqual(count, 1)
        self.assertEqual(updated_model.graph.num_nodes(), 1)
        onnx_checker.CheckerPass(True)(updated_model)

        # Check inference
        inputs = self.rng.random((1, 32, 14, 16), dtype="float32")
        testing.assert_numerically_equal(base_model, updated_model, (inputs,), atol=0, rtol=0)

    @parameterized.parameterized.expand(
        [
            (
                "constant",
                ir.tensor([1] * 10, name="pads"),
                ir.tensor([0.0], name="const_value"),
                None,
                "NOTSET",
                "must be zero in non-spatial dimensions",
            ),
            (
                "constant",
                ir.tensor([0, 0, 0, 0], name="pads"),
                ir.tensor([1.0], name="const_value"),
                ir.tensor([0, -1], name="axes"),
                "NOTSET",
                "must be equal to 0.",
            ),
            (
                "edge",
                ir.tensor([0, 0, 0, 0], name="pads"),
                ir.tensor([0.0], name="const_value"),
                ir.tensor([0, -1], name="axes"),
                "NOTSET",
                "mode must be 'constant'.",
            ),
            (
                "constant",
                ir.Value(
                    name="pads", shape=ir.Shape([4]), type=ir.TensorType(ir.DataType.INT64)
                ),
                None,
                ir.tensor([0, -1], name="axes"),
                "NOTSET",
                "pads is not a constant/initializer.",
            ),
            (
                "constant",
                ir.tensor([0] * 10, name="pads"),
                ir.Value(
                    name="cval", shape=ir.Shape([1]), type=ir.TensorType(ir.DataType.FLOAT)
                ),
                None,
                "NOTSET",
                "cval is not a constant",
            ),
            (
                "constant",
                ir.tensor([0, 0, 0, 0], name="pads"),
                None,
                ir.Value(
                    name="axes", shape=ir.Shape([2]), type=ir.TensorType(ir.DataType.INT64)
                ),
                "NOTSET",
                "axes is not a constant",
            ),
        ]
    )
    def test_unsupported_fuse_pad_into_conv(
        self, mode, pads, const_value, axes, auto_pad, err_msg
    ):
        # Use a rank-5 input (3 spatial dims) so the length-10 pads above are valid.
        base_model = self.build_model(
            input_shape=ir.Shape(("N", 32, 14, 16, 12)),
            weight_shape=(10, 32, 3, 4, 5),
            pad_inputs=[pads, const_value, axes],
            pad_attributes={"mode": mode},
            conv_attributes={"auto_pad": auto_pad},
        )

        # Apply rule and check that it was not applied
        tracer = orp.MatchingTracer()
        count = fuse_pad_into_conv.apply_to_model(base_model, tracer=tracer)
        self.assertEqual(count, 0)

        # Check that the failure reason is the expected one
        tracer_match = tracer.best_matches_map[fuse_pad_into_conv][0]
        self.assertEqual(tracer_match.status.value, orp.MatchStatus.CONDITION_FAILED)
        self.assertRegex(tracer_match.match_result.reason, err_msg)


if __name__ == "__main__":
    unittest.main()