Source code for cogdl.models.nn.pyg_grand

import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from torch_scatter import scatter_add, scatter
from .. import BaseModel, register_model


class MLPLayer(nn.Module):
    """A single fully-connected layer: a linear transform with an optional bias."""

    def __init__(self, in_features, out_features, bias=True):
        super(MLPLayer, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight = Parameter(torch.FloatTensor(in_features, out_features))
        if bias:
            self.bias = Parameter(torch.FloatTensor(out_features))
        else:
            self.register_parameter('bias', None)
        self.reset_parameters()
    def reset_parameters(self):
        stdv = 1. / math.sqrt(self.weight.size(1))
        self.weight.data.normal_(-stdv, stdv)
        if self.bias is not None:
            self.bias.data.normal_(-stdv, stdv)
    def forward(self, x):
        output = torch.mm(x, self.weight)
        if self.bias is not None:
            return output + self.bias
        else:
            return output
    def __repr__(self):
        return self.__class__.__name__ + ' (' \
            + str(self.in_features) + ' -> ' \
            + str(self.out_features) + ')'
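
# Illustrative usage (not part of the original cogdl module): a minimal sketch of
# MLPLayer on random inputs; the shapes below are assumptions chosen only for
# illustration.
#
#   >>> layer = MLPLayer(in_features=16, out_features=8)
#   >>> out = layer(torch.randn(4, 16))
#   >>> out.shape
#   torch.Size([4, 8])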

@register_model("grand")
class Grand(BaseModel):
    """GRAND (Graph Random Neural Network): random propagation (DropNode plus
    mixed-order feature propagation) followed by an MLP classifier, trained with
    a consistency regularization loss over multiple stochastic augmentations."""

    @staticmethod
    def add_args(parser):
        """Add model-specific arguments to the parser."""
        # fmt: off
        parser.add_argument("--num-features", type=int)
        parser.add_argument("--num-classes", type=int)
        parser.add_argument("--hidden-size", type=int, default=32)
        parser.add_argument("--hidden_dropout", type=float, default=0.5)
        parser.add_argument("--input_dropout", type=float, default=0.5)
        parser.add_argument("--bn", type=bool, default=False)
        parser.add_argument("--dropnode_rate", type=float, default=0.5)
        parser.add_argument("--order", type=int, default=5)
        parser.add_argument("--tem", type=float, default=0.5)
        parser.add_argument("--lam", type=float, default=0.5)
        parser.add_argument("--sample", type=int, default=2)
        parser.add_argument("--alpha", type=float, default=0.2)
        # fmt: on

    @classmethod
    def build_model_from_args(cls, args):
        return cls(
            args.num_features,
            args.hidden_size,
            args.num_classes,
            args.input_dropout,
            args.hidden_dropout,
            args.bn,
            args.dropnode_rate,
            args.tem,
            args.lam,
            args.order,
            args.sample,
            args.alpha,
        )
    def __init__(self, nfeat, nhid, nclass, input_droprate, hidden_droprate,
                 use_bn, dropnode_rate, tem, lam, order, sample, alpha):
        super(Grand, self).__init__()
        self.layer1 = MLPLayer(nfeat, nhid)
        self.layer2 = MLPLayer(nhid, nclass)
        self.input_droprate = input_droprate
        self.hidden_droprate = hidden_droprate
        self.bn1 = nn.BatchNorm1d(nfeat)
        self.bn2 = nn.BatchNorm1d(nhid)
        self.use_bn = use_bn
        self.tem = tem
        self.lam = lam
        self.order = order
        self.dropnode_rate = dropnode_rate
        self.sample = sample
        self.alpha = alpha
    def dropNode(self, x):
        """DropNode: zero out entire node feature rows at random during training;
        at inference, scale all rows by the keep probability instead."""
        n = x.shape[0]
        drop_rates = torch.ones(n) * self.dropnode_rate
        if self.training:
            masks = torch.bernoulli(1. - drop_rates).unsqueeze(1)
            x = masks.to(x.device) * x
        else:
            x = x * (1. - self.dropnode_rate)
        return x
    def normalize_adj(self, edge_index, edge_weight, num_nodes):
        """Symmetric normalization of edge weights:
        w_uv <- w_uv / (sqrt(deg(u)) * sqrt(deg(v)))."""
        row, col = edge_index[0], edge_index[1]
        deg = scatter_add(edge_weight, row, dim=0, dim_size=num_nodes)
        deg_inv_sqrt = deg.pow_(-0.5)
        deg_inv_sqrt.masked_fill_(deg_inv_sqrt == float('inf'), 0)
        edge_weight = deg_inv_sqrt[row] * edge_weight * deg_inv_sqrt[col]
        return edge_weight
    def rand_prop(self, x, edge_index, edge_weight):
        """Random propagation: apply DropNode, then average the features over
        propagation orders 0..order under the symmetrically normalized adjacency."""
        edge_weight = self.normalize_adj(edge_index, edge_weight, x.shape[0])
        row, col = edge_index[0], edge_index[1]

        x = self.dropNode(x)

        y = x
        for i in range(self.order):
            # One propagation step x <- A_hat x, computed as a weighted
            # scatter-sum of source-node features onto their target nodes.
            x_source = x[col]
            x = scatter(x_source * edge_weight[:, None], row[:, None], dim=0,
                        dim_size=x.shape[0], reduce='sum').detach_()
            y.add_(x)
        return y.div_(self.order + 1.0).detach_()
    def consis_loss(self, logps, train_mask):
        """Consistency regularization: sharpen the average prediction on
        unlabeled nodes and penalize each augmentation's squared distance
        to the sharpened target."""
        temp = self.tem
        ps = [torch.exp(p)[~train_mask] for p in logps]
        sum_p = 0.
        for p in ps:
            sum_p = sum_p + p
        avg_p = sum_p / len(ps)
        # Temperature sharpening: p_c <- p_c^(1/T) / sum_c' p_c'^(1/T)
        sharp_p = (torch.pow(avg_p, 1. / temp)
                   / torch.sum(torch.pow(avg_p, 1. / temp), dim=1, keepdim=True)).detach()

        loss = 0.
        for p in ps:
            loss += torch.mean((p - sharp_p).pow(2).sum(1))
        loss = loss / len(ps)
        return self.lam * loss
    def normalize_x(self, x):
        """Row-normalize the feature matrix so that each node's features sum to 1."""
        row_sum = x.sum(1)
        row_inv = row_sum.pow_(-1)
        row_inv.masked_fill_(row_inv == float('inf'), 0)
        x = x * row_inv[:, None]
        return x
    def forward(self, x, edge_index):
        """One stochastic forward pass: row-normalize the features, run random
        propagation, then classify with a two-layer MLP and return log-probabilities."""
        x = self.normalize_x(x)
        edge_weight = torch.ones(edge_index.shape[1], dtype=torch.float32).to(x.device)
        x = self.rand_prop(x, edge_index, edge_weight)

        if self.use_bn:
            x = self.bn1(x)
        x = F.dropout(x, self.input_droprate, training=self.training)
        x = F.relu(self.layer1(x))

        if self.use_bn:
            x = self.bn2(x)
        x = F.dropout(x, self.hidden_droprate, training=self.training)
        x = self.layer2(x)
        return F.log_softmax(x, dim=-1)
    def loss(self, data):
        """Training loss: the supervised NLL averaged over `sample` stochastic
        forward passes, plus the consistency regularizer on unlabeled nodes."""
        output_list = []
        for i in range(self.sample):
            output_list.append(self.forward(data.x, data.edge_index))

        loss_train = 0.
        for output in output_list:
            loss_train += F.nll_loss(output[data.train_mask], data.y[data.train_mask])
        loss_train = loss_train / self.sample
        loss_consis = self.consis_loss(output_list, data.train_mask)
        return loss_train + loss_consis
    def predict(self, data):
        return self.forward(data.x, data.edge_index)
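
# Illustrative smoke test (not part of the original cogdl module): a minimal sketch
# of a single forward pass on a toy graph. The node/feature/class counts, the edge
# list, and the hyperparameter values are assumptions chosen only to make the shapes
# line up; in cogdl the model is normally constructed via build_model_from_args.
#
#   >>> model = Grand(nfeat=16, nhid=32, nclass=3, input_droprate=0.5,
#   ...               hidden_droprate=0.5, use_bn=False, dropnode_rate=0.5,
#   ...               tem=0.5, lam=0.5, order=5, sample=2, alpha=0.2)
#   >>> _ = model.eval()                                          # deterministic DropNode scaling
#   >>> x = torch.rand(4, 16)                                     # 4 nodes, 16 features each
#   >>> edge_index = torch.tensor([[0, 1, 2, 3], [1, 0, 3, 2]])   # two undirected edges
#   >>> log_probs = model(x, edge_index)
#   >>> log_probs.shape
#   torch.Size([4, 3])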