Source code for cogdl.models.nn.gat

import torch
import torch.nn as nn
import torch.nn.functional as F
import math

from .. import BaseModel, register_model
from cogdl.utils import mul_edge_softmax, spmm, mh_spmm, check_mh_spmm


class GATLayer(nn.Module):
    """
    Sparse version GAT layer, similar to https://arxiv.org/abs/1710.10903
    """

    def __init__(self, in_features, out_features, nhead=1, alpha=0.2, dropout=0.6, concat=True, residual=False):
        super(GATLayer, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.alpha = alpha
        self.concat = concat
        self.nhead = nhead

        self.W = nn.Parameter(torch.FloatTensor(in_features, out_features * nhead))

        self.a_l = nn.Parameter(torch.zeros(size=(1, nhead, out_features)))
        self.a_r = nn.Parameter(torch.zeros(size=(1, nhead, out_features)))

        self.dropout = nn.Dropout(dropout)
        self.leakyrelu = nn.LeakyReLU(self.alpha)

        if residual:
            out_features = out_features * nhead if concat else out_features
            self.residual = nn.Linear(in_features, out_features)
        else:
            self.register_buffer("residual", None)
        self.reset_parameters()

    def reset_parameters(self):
        def reset(tensor):
            stdv = math.sqrt(6.0 / (tensor.size(-2) + tensor.size(-1)))
            tensor.data.uniform_(-stdv, stdv)

        reset(self.a_l)
        reset(self.a_r)
        reset(self.W)

        # nn.init.xavier_uniform_(self.W.data, gain=1.414)
        # nn.init.xavier_uniform_(self.a_r.data, gain=1.414)
        # nn.init.xavier_uniform_(self.a_l.data, gain=1.414)

    def forward(self, graph, x):
        h = torch.matmul(x, self.W).view(-1, self.nhead, self.out_features)
        h[torch.isnan(h)] = 0.0

        row, col = graph.edge_index
        # Self-attention on the nodes - Shared attention mechanism
        h_l = (self.a_l * h).sum(dim=-1)[row]
        h_r = (self.a_r * h).sum(dim=-1)[col]
        edge_attention = self.leakyrelu(h_l + h_r)
        # edge_attention: E * H
        edge_attention = mul_edge_softmax(graph, edge_attention)
        edge_attention = self.dropout(edge_attention)

        if check_mh_spmm() and next(self.parameters()).device.type != "cpu":
            if self.nhead > 1:
                h_prime = mh_spmm(graph, edge_attention, h)
                out = h_prime.view(h_prime.shape[0], -1)
            else:
                edge_weight = edge_attention.view(-1)
                with graph.local_graph():
                    graph.edge_weight = edge_weight
                    out = spmm(graph, h.squeeze(1))
        else:
            with graph.local_graph():
                h_prime = []
                h = h.permute(1, 0, 2).contiguous()
                for i in range(self.nhead):
                    edge_weight = edge_attention[:, i]
                    graph.edge_weight = edge_weight
                    hidden = h[i]
                    assert not torch.isnan(hidden).any()
                    h_prime.append(spmm(graph, hidden))
            out = torch.cat(h_prime, dim=1)

        if self.residual:
            res = self.residual(x)
            out += res
        return out

    def __repr__(self):
        return self.__class__.__name__ + " (" + str(self.in_features) + " -> " + str(self.out_features) + ")"


[docs]@register_model("gat") class GAT(BaseModel): r"""The GAT model from the `"Graph Attention Networks" <https://arxiv.org/abs/1710.10903>`_ paper Args: num_features (int) : Number of input features. num_classes (int) : Number of classes. hidden_size (int) : The dimension of node representation. dropout (float) : Dropout rate for model training. alpha (float) : Coefficient of leaky_relu. nheads (int) : Number of attention heads. """
[docs] @staticmethod def add_args(parser): """Add model-specific arguments to the parser.""" # fmt: off parser.add_argument("--num-features", type=int) parser.add_argument("--num-layers", type=int, default=2) parser.add_argument("--residual", action="store_true") parser.add_argument("--num-classes", type=int) parser.add_argument("--hidden-size", type=int, default=8) parser.add_argument("--dropout", type=float, default=0.6) parser.add_argument("--alpha", type=float, default=0.2) parser.add_argument("--nhead", type=int, default=8) parser.add_argument("--last-nhead", type=int, default=1)
# fmt: on
[docs] @classmethod def build_model_from_args(cls, args): return cls( args.num_features, args.hidden_size, args.num_classes, args.num_layers, args.dropout, args.alpha, args.nhead, args.residual, args.last_nhead, )
def __init__( self, in_feats, hidden_size, out_features, num_layers, dropout, alpha, nhead, residual, last_nhead, ): """Sparse version of GAT.""" super(GAT, self).__init__() self.dropout = dropout self.attentions = nn.ModuleList() self.attentions.append( GATLayer( in_feats, hidden_size, nhead=nhead, dropout=dropout, alpha=alpha, concat=True, residual=residual, ) ) for i in range(num_layers - 2): self.attentions.append( GATLayer( hidden_size * nhead, hidden_size, nhead=nhead, dropout=dropout, alpha=alpha, concat=True, residual=residual, ) ) self.attentions.append( GATLayer( hidden_size * nhead, out_features, dropout=dropout, alpha=alpha, concat=False, nhead=last_nhead, residual=False, ) ) self.num_layers = num_layers self.last_nhead = last_nhead self.residual = residual
[docs] def forward(self, graph): x = graph.x for i, layer in enumerate(self.attentions): x = F.dropout(x, p=self.dropout, training=self.training) x = layer(graph, x) if i != self.num_layers - 1: x = F.elu(x) return x
[docs] def predict(self, graph): return self.forward(graph)