#
# Copyright (C) Stanislaw Adaszewski, 2020
# License: GPLv3
#


import logging
import numpy as np
import torch
import torch.utils.data
from typing import List, \
    Union, \
    Tuple
from .data import Data, \
    EdgeType
from .cumcount import cumcount
import time


logger = logging.getLogger(__name__)


def _check_candidates_available(true_classes: torch.Tensor,
        num_repeats: torch.Tensor,
        weights: torch.Tensor) -> None:
    """Raise ValueError if rejection sampling could never finish.

    A row that needs at least one sample must have at least one class
    with positive weight that is not among its true classes; otherwise
    every draw for that row would be rejected forever.
    """
    needs_samples = num_repeats > 0
    if not bool(needs_samples.any()):
        return

    num_classes = weights.shape[0]
    drawable = weights > 0
    num_drawable = int(drawable.sum())
    if num_drawable == 0 or true_classes.shape[1] == 0:
        # An all-zero distribution is rejected by torch.multinomial
        # itself; with no true classes nothing can ever be rejected.
        return

    # Count, per row, the *distinct* true classes that can actually be
    # drawn. Sorting makes duplicates adjacent so they are counted once.
    ordered, _ = torch.sort(true_classes[needs_samples], dim=1)
    first_occurrence = torch.ones_like(ordered, dtype=torch.bool)
    first_occurrence[:, 1:] = ordered[:, 1:] != ordered[:, :-1]
    in_range = (ordered >= 0) & (ordered < num_classes)
    excluded = first_occurrence & in_range & \
        drawable[ordered.clamp(0, num_classes - 1)]

    if bool((excluded.sum(dim=1) >= num_drawable).any()):
        raise ValueError('Not enough classes to choose from')


def fixed_unigram_candidate_sampler(
        true_classes: torch.Tensor,
        num_repeats: torch.Tensor,
        unigrams: torch.Tensor,
        distortion: float = 1.) -> torch.Tensor:
    """Draw negative classes by rejection sampling from a unigram distribution.

    For row ``i`` of ``true_classes``, ``num_repeats[i]`` classes are drawn
    (with replacement) with probability proportional to
    ``unigrams ** distortion``. A draw that equals any entry of
    ``true_classes[i]`` is rejected and redrawn. Negative entries in
    ``true_classes`` (e.g. ``-1`` padding) never match a drawn class.

    Args:
        true_classes: 2D integer tensor of shape (num_rows, num_true).
        num_repeats: 1D integer tensor with the number of samples per row.
        unigrams: 1D tensor of non-negative class weights.
        distortion: exponent applied to ``unigrams`` before sampling.

    Returns:
        1D ``torch.long`` CPU tensor of length ``num_repeats.sum()``; the
        samples of row 0 come first, then those of row 1, and so on.

    Raises:
        ValueError: if an argument has the wrong rank, or if some row that
            needs samples has no admissible class left to draw.
    """
    if len(true_classes.shape) != 2:
        raise ValueError('true_classes must be a 2D matrix with shape (num_samples, num_true)')

    if len(num_repeats.shape) != 1:
        raise ValueError('num_repeats must be 1D')

    # All bookkeeping below happens on the CPU, which is also where the
    # result has always been returned.
    true_classes = true_classes.cpu()
    num_repeats = num_repeats.cpu()
    weights = unigrams.cpu().to(torch.float64)
    if distortion != 1.:
        weights = weights ** distortion

    num_rows = true_classes.shape[0]

    # rows[k] is the row that output slot slots[k] belongs to. The two are
    # tracked separately: indexing the result by row id would make repeated
    # rows overwrite each other and leave later slots unfilled.
    rows = torch.repeat_interleave(torch.arange(num_rows), num_repeats)
    slots = torch.arange(len(rows))
    result = torch.zeros(len(rows), dtype=torch.long)

    _check_candidates_available(true_classes, num_repeats, weights)

    while len(slots) > 0:
        candidates = torch.multinomial(weights, len(slots), replacement=True)
        result[slots] = candidates
        # Keep only the slots whose candidate collides with a true class
        # of its row; those are redrawn in the next iteration.
        collides = (candidates.view(-1, 1) == true_classes[rows, :]).any(dim=1)
        slots = slots[collides]
        rows = rows[collides]

    return result


def get_edges_and_degrees(adj_mat: torch.Tensor) -> \
        Tuple[torch.Tensor, torch.Tensor]:
    """Return the edge list and per-column degrees of an adjacency matrix.

    Args:
        adj_mat: sparse or dense 2D adjacency matrix.

    Returns:
        ``(edges_pos, degrees)``. ``edges_pos`` has one ``(row, column)``
        pair per edge. For a sparse input ``degrees`` counts the stored
        entries in each column (int64); for a dense input it is the sum
        of each column.
    """
    if adj_mat.is_sparse:
        adj_mat = adj_mat.coalesce()
        columns = adj_mat.indices()[1]
        degrees = torch.zeros(adj_mat.shape[1], dtype=torch.int64,
            device=adj_mat.device)
        degrees = degrees.index_add(0, columns,
            torch.ones(columns.shape[0], dtype=torch.int64,
                device=adj_mat.device))
        edges_pos = adj_mat.indices().transpose(0, 1)
    else:
        degrees = adj_mat.sum(0)
        edges_pos = torch.nonzero(adj_mat, as_tuple=False)
    return edges_pos, degrees


def _edge_indices(adj_mat: torch.Tensor) -> torch.Tensor:
    """Return the (2, num_edges) index tensor of a sparse or dense matrix."""
    if adj_mat.is_sparse:
        return adj_mat.coalesce().indices()
    return torch.nonzero(adj_mat, as_tuple=False).transpose(0, 1)


def get_true_classes(adj_mat: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
    """Collect, for every row, the columns it is connected to.

    Args:
        adj_mat: sparse or dense 2D adjacency matrix.

    Returns:
        ``(true_classes, row_count)``, both ``torch.long`` CPU tensors.
        ``true_classes`` has shape (num_rows, max edges per row) and is
        padded with ``-1``; ``row_count[i]`` is the number of edges in
        row ``i``.
    """
    # Work on the CPU: the per-row counter below runs on a NumPy array.
    indices = _edge_indices(adj_mat).cpu()
    num_rows = adj_mat.shape[0]

    row_count = torch.zeros(num_rows, dtype=torch.long)
    row_count = row_count.index_add(0, indices[0],
        torch.ones(indices.shape[1], dtype=torch.long))

    # torch.max() raises on an empty tensor, i.e. for a matrix with no rows.
    max_true_classes = int(row_count.max()) if num_rows > 0 else 0
    true_classes = torch.full((num_rows, max_true_classes), -1,
        dtype=torch.long)

    # cumcount() is used as the slot of each edge within its row -
    # presumably the number of earlier edges with the same row index;
    # confirm against the cumcount module.
    started = time.time()
    within_row = torch.tensor(cumcount(indices[0].numpy()))
    logger.debug('cumcount() took: %s', time.time() - started)

    started = time.time()
    true_classes[indices[0], within_row] = indices[1]
    logger.debug('assignment took: %s', time.time() - started)

    return true_classes, row_count


def negative_sample_adj_mat(adj_mat: torch.Tensor) -> torch.Tensor:
    """Build a negative-edge adjacency matrix for ``adj_mat``.

    For every edge of ``adj_mat`` one column is sampled, with probability
    proportional to ``degree ** 0.75``, among the columns the edge's row
    is not connected to. Duplicate negative edges are merged.

    Args:
        adj_mat: sparse or dense 2D adjacency matrix.

    Returns:
        Coalesced sparse COO tensor with the shape, dtype and device of
        ``adj_mat`` and value 1 at every sampled negative edge.

    Raises:
        ValueError: if ``adj_mat`` is not a ``torch.Tensor``, or if some
            row has no column left to sample from.
    """
    if not isinstance(adj_mat, torch.Tensor):
        raise ValueError('adj_mat must be a torch.Tensor, got: %s' % adj_mat.__class__.__name__)

    edges_pos, degrees = get_edges_and_degrees(adj_mat)
    true_classes, row_count = get_true_classes(adj_mat)

    neg_neighbors = fixed_unigram_candidate_sampler(
        true_classes, row_count, degrees, 0.75).to(adj_mat.device)
    logger.debug('neg_neighbors: %s', neg_neighbors)

    # Samples are grouped by row in the same order as the edge list, so
    # the i-th sample pairs with the row of the i-th positive edge.
    edges_neg = torch.stack([edges_pos[:, 0], neg_neighbors], dim=0)

    # First pass: coalescing merges duplicate edges (summing their values),
    # which is used here only to obtain the unique index set.
    adj_mat_neg = torch.sparse_coo_tensor(indices=edges_neg,
        values=torch.ones(edges_neg.shape[1]), size=adj_mat.shape,
        dtype=adj_mat.dtype, device=adj_mat.device)
    indices = adj_mat_neg.coalesce().indices()

    # Second pass: rebuild with every value reset to 1.
    adj_mat_neg = torch.sparse_coo_tensor(indices,
        torch.ones(indices.shape[1]), adj_mat.shape,
        dtype=adj_mat.dtype, device=adj_mat.device)
    return adj_mat_neg.coalesce()


def negative_sample_data(data: Data) -> Data:
    """Return a copy of ``data`` whose edges are negatively sampled.

    The result is a new ``Data(target_value=0)`` with the same vertex
    types and, for every edge type, adjacency matrices replaced by the
    output of ``negative_sample_adj_mat``.
    """
    res = Data(target_value=0)
    for vt in data.vertex_types:
        res.add_vertex_type(vt.name, vt.count)

    for et in data.edge_types.values():
        adjacency_matrices_neg = [negative_sample_adj_mat(adj_mat)
            for adj_mat in et.adjacency_matrices]
        res.add_edge_type(et.name, et.vertex_type_row,
            et.vertex_type_column, adjacency_matrices_neg,
            et.decoder_factory)

    return res