Source code for neuralogic.nn.module.gnn.tag

from neuralogic.core.constructs.metadata import Metadata
from neuralogic.core.constructs.function import Transformation, Aggregation
from neuralogic.core.constructs.factories import R, V
from neuralogic.nn.module.module import Module


class TAGConv(Module):
    r"""
    Topology Adaptive Graph Convolutional layer from
    `"Topology Adaptive Graph Convolutional Networks" <https://arxiv.org/abs/1710.10370>`_,
    which can be expressed as:

    .. math::

        \mathbf{x}^{\prime}_i = act(\sum_{k=0}^K \mathbf{W}_k \cdot {agg}_{j \in \mathcal{N}^k(i)}(\mathbf{x}_j))

    Where *act* is an activation function, *agg* is an aggregation function, :math:`\mathbf{W}_k` are learnable
    parameters and :math:`\mathcal{N}^k(i)` denotes nodes that are *k* hops away from the node *i*.

    This equation is translated into the logic form as:

    .. code:: logtalk

        (R.<output_name>(V.I0)[<W0>] <= R.<feature_name>(V.I0)) | [<aggregation>, Transformation.IDENTITY]
        (R.<output_name>(V.I0)[<W1>] <= (R.<feature_name>(V.I1), R.<edge_name>(V.I1, V.I0))) | [<aggregation>, Transformation.IDENTITY]
        (R.<output_name>(V.I0)[<W2>] <= (R.<feature_name>(V.I2), R.<edge_name>(V.I1, V.I0), R.<edge_name>(V.I2, V.I1))) | [<aggregation>, Transformation.IDENTITY]
        ...
        (R.<output_name>(V.I0)[<Wk>] <= (R.<feature_name>(V.I<k>), R.<edge_name>(V.I1, V.I0), ..., R.<edge_name>(V.I<k>, V.I<k-1>))) | [<aggregation>, Transformation.IDENTITY]
        R.<output_name> / 1 | [<activation>]

    Examples
    --------

    The whole computation of this module (parametrized as :code:`TAGConv(1, 2, "h1", "h0", "_edge")`) is as follows:

    .. code:: logtalk

        (R.h1(V.I0)[2, 1] <= R.h0(V.I0)) | [Aggregation.SUM, Transformation.IDENTITY]
        (R.h1(V.I0)[2, 1] <= (R.h0(V.I1), R._edge(V.I1, V.I0))) | [Aggregation.SUM, Transformation.IDENTITY]
        (R.h1(V.I0)[2, 1] <= (R.h0(V.I2), R._edge(V.I1, V.I0), R._edge(V.I2, V.I1))) | [Aggregation.SUM, Transformation.IDENTITY]
        R.h1 / 1 | [Transformation.IDENTITY]

    Module parametrized as :code:`TAGConv(1, 2, "h1", "h0", "_edge", 1)` translates into:

    .. code:: logtalk

        (R.h1(V.I0)[2, 1] <= R.h0(V.I0)) | [Aggregation.SUM, Transformation.IDENTITY]
        (R.h1(V.I0)[2, 1] <= (R.h0(V.I1), R._edge(V.I1, V.I0))) | [Aggregation.SUM, Transformation.IDENTITY]
        R.h1 / 1 | [Transformation.IDENTITY]

    Parameters
    ----------

    in_channels : int
        Input feature size.
    out_channels : int
        Output feature size.
    output_name : str
        Output (head) predicate name of the module.
    feature_name : str
        Feature predicate name to get features from.
    edge_name : str
        Edge predicate name to use for neighborhood relations.
    k : int
        Number of hops. Default: ``2``
    activation : Transformation
        Activation function of the output. Default: ``Transformation.IDENTITY``
    aggregation : Aggregation
        Aggregation function of nodes' neighbors. Default: ``Aggregation.SUM``
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        output_name: str,
        feature_name: str,
        edge_name: str,
        k: int = 2,
        activation: Transformation = Transformation.IDENTITY,
        aggregation: Aggregation = Aggregation.SUM,
    ):
        self.output_name = output_name
        self.feature_name = feature_name
        self.edge_name = edge_name

        self.in_channels = in_channels
        self.out_channels = out_channels

        self.k = k
        self.activation = activation
        self.aggregation = aggregation

    def __call__(self):
        metadata = Metadata(transformation=Transformation.IDENTITY, aggregation=self.aggregation)

        head = R.get(self.output_name)
        feature = R.get(self.feature_name)
        edge = R.get(self.edge_name)

        # One rule per hop distance 0..k: the i-th rule chains i edge relations
        # from the aggregated neighbor V.I<i> back to the head node V.I0.
        hop_rules = []
        for i in range(self.k + 1):
            hop_rules.append(
                (
                    head(V.I0)[self.out_channels, self.in_channels]
                    <= (
                        feature(f"I{i}"),
                        *(edge(f"I{b}", f"I{a}") for a, b in zip(range(i), range(1, i + 1))),
                    )
                )
                | metadata
            )

        return [
            *hop_rules,
            head / 1 | Metadata(transformation=self.activation),
        ]
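A minimal usage sketch, assuming the import path from the page title above. The module instance is callable, and calling it materializes the list of rules built in ``__call__`` (the ``k + 1`` hop rules plus the output activation rule); in practice these rules would be added to a template, but the template API is not shown on this page, so the sketch only prints them:

.. code:: python

    from neuralogic.nn.module.gnn.tag import TAGConv

    # Parametrization from the docstring example: 1 input channel, 2 output channels,
    # output predicate "h1", feature predicate "h0", edge predicate "_edge", k defaults to 2.
    conv = TAGConv(1, 2, "h1", "h0", "_edge")

    # __call__ returns the generated logic rules, one per hop plus the activation rule.
    for rule in conv():
        print(rule)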