Writing fast asynchronous SGD/AdaGrad with RcppParallel

Dmitriy Selivanov — written Jan 24, 2016 — source

Word embeddings

After Tomas Mikolov et al. released the word2vec tool, there was a boom of articles about word vector representations. One of the greatest is GloVe, which did a big service by explaining how such algorithms work. It also reformulates word2vec optimization as a special kind of factorization of the word co-occurrence matrix.

This post is divided into two main parts:

  1. Very brief introduction into GloVe algorithm.
  2. Details of implementation. I will show how to write fast parallel asynchronous SGD with an adaptive learning rate in C++, using Intel TBB and RcppParallel.

Introduction to GloVe algorithm

GloVe algorithm consists of the following steps:

  1. Collect word co-occurrence statistics in the form of a word co-occurrence matrix \(X\). Each element \(X_{ij}\) of this matrix represents how often word \(i\) appears in the context of word \(j\). Usually we scan our corpus in the following manner: for each term we look for context terms within some area - window_size before and window_size after. Also we give less weight to more distant words (usually \(decay = 1/offset\)).
  2. Define soft constraints for each word pair: \(w_i^T w_j + b_i + b_j = \log(X_{ij})\). Here \(w_i\) is the vector for the main word, \(w_j\) is the vector for the context word, and \(b_i\), \(b_j\) are scalar biases for the main and context words.
  3. Define the cost function \(J = \sum_{i=1}^V \sum_{j=1}^V \; f(X_{ij}) ( w_i^T w_j + b_i + b_j - \log X_{ij})^2\). Here \(f\) is a weighting function which helps us prevent learning only from extremely common word pairs. The GloVe authors choose the following function:
\[f(X_{ij}) = \begin{cases} \left(\frac{X_{ij}}{x_{max}}\right)^\alpha & \text{if } X_{ij} < x_{max} \\ 1 & \text{otherwise} \end{cases}\]
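
For illustration, here is a minimal sketch of this weighting function in C++ (the helper name weight is mine, not from the original code; x_max and alpha are the model hyperparameters above):

#include <cmath>

// f(X_ij): down-weights rare pairs, caps the weight of very frequent pairs at 1
inline double weight(double x, double x_max, double alpha) {
  return (x < x_max) ? std::pow(x / x_max, alpha) : 1.0;
}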

Implementation

The main challenges I faced during implementation:

  1. Efficient co-occurrence matrix creation.
  2. Implementation of efficient SGD for objective cost function minimization.

Co-occurrence matrix creation

There are a few main issues with the term co-occurrence matrix (tcm):

  1. tcm should be sparse. We should be able to construct a tcm for large vocabularies (> 100k words).
  2. Fast lookups/inserts.

To meet the sparsity requirement we need to store the data in an associative array. unordered_map is a good candidate because of its \(O(1)\) average lookup/insert complexity. I ended up with std::unordered_map< std::pair<uint32_t, uint32_t>, T > as the container for the sparse matrix in triplet form. The performance of unordered_map heavily depends on the underlying hash function. Fortunately, we can pack a pair<uint32_t, uint32_t> into a single uint64_t in a deterministic way without any collisions.
The hash function for std::pair<uint32_t, uint32_t> will look like:

namespace std {
  template <>
  struct hash<std::pair<uint32_t, uint32_t>>
  {
    inline uint64_t operator()(const std::pair<uint32_t, uint32_t>& k) const
    {
      // pack the two 32-bit indices into a single 64-bit value - collision-free
      return (uint64_t) k.first << 32 | k.second;
    }
  };
}

For details see this and this Stack Overflow question.

Also note that our co-occurrence matrix is symmetrical, so internally we will store only the elements above the main diagonal.
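
For illustration, here is a hedged sketch of how such a container could be filled (the increment helper and the typedef are mine, not the actual package code); it relies on the hash specialization above:

#include <unordered_map>
#include <utility>
#include <algorithm>
#include <cstdint>

typedef std::unordered_map< std::pair<uint32_t, uint32_t>, double > sparse_tcm;

inline void increment(sparse_tcm &tcm, uint32_t i, uint32_t j, double w) {
  // keep only the upper triangle: swap indices so that i <= j
  if (i > j) std::swap(i, j);
  tcm[std::make_pair(i, j)] += w;
}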

Stochastic gradient descent

Now we should implement efficient parallel asynchronous stochastic gradient descent for word co-occurrence matrix factorization, as proposed in the GloVe paper. An interesting thing: SGD is an inherently serial algorithm, but when your problem is sparse, you can do asynchronous updates without any locks and achieve a speedup proportional to the number of cores on your machine! If you haven't read HOGWILD!, I recommend doing that.

Let me recall the formulation of SGD. We try to move the parameters \(x_t\) in a minimizing direction, given by \(-g_t\), with a learning rate \(\alpha\):

\[x_{t+1} = x_t - \alpha g_t\]

So we have to calculate gradients for our cost function \(J = \sum_{i=1}^V \sum_{j=1}^V f(X_{ij}) ( w_i^T w_j + b_i + b_j - \log X_{ij} )^2\) (the constant factor of 2 from differentiating the square is absorbed into the learning rate):

\[\frac{\partial J}{\partial w_i} = f(X_{ij}) w_j ( w_i^T w_j + b_i + b_j - \log X_{ij})\]
\[\frac{\partial J}{\partial w_j} = f(X_{ij}) w_i ( w_i^T w_j + b_i + b_j - \log X_{ij})\]
\[\frac{\partial J}{\partial b_i} = f(X_{ij}) (w_i^T w_j + b_i + b_j - \log X_{ij})\]
\[\frac{\partial J}{\partial b_j} = f(X_{ij}) (w_i^T w_j + b_i + b_j - \log X_{ij})\]

AdaGrad

We will use a modification of SGD - the AdaGrad algorithm. It automatically determines a per-feature learning rate by tracking historical gradients, so that frequently occurring features get small learning rates and infrequent features get higher ones. For AdaGrad implementation details see the excellent Notes on AdaGrad by Chris Dyer.

The formulation of the AdaGrad update at step \(t\) for feature \(i\) is the following:

\[x_{t+1, i} = x_{t, i} - \frac{\alpha}{\sqrt{\sum_{\tau=1}^{t} g_{\tau,i}^2}} g_{t,i}\]

As we can see, at each iteration \(t\) we need to keep track of the sum of all historical squared gradients.
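
As a scalar illustration (the function and variable names are mine, not the package code), a single AdaGrad coordinate update could look like the following; note that reference GloVe implementations typically initialize the squared-gradient accumulators to 1 so the first step does not divide by zero:

#include <cmath>

// x: parameter, grad_sq: accumulated squared gradients (initialized to 1.0),
// grad: current gradient, alpha: initial learning rate
inline void adagrad_update(double &x, double &grad_sq, double grad, double alpha) {
  x -= (alpha / std::sqrt(grad_sq)) * grad;  // per-feature learning rate
  grad_sq += grad * grad;                    // update the historical sum
}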

Parallel asynchronous AdaGrad

Actually we will use a modification of AdaGrad - HOGWILD-style asynchronous AdaGrad :-) The main idea of the HOGWILD! algorithm is very simple - don't use any synchronization. If your problem is sparse, allow threads to overwrite each other! This works, and works fine. Again, see the HOGWILD! paper for details and a theoretical proof.

Code

Now let's put it all into code.

As seen from the analysis above, the GloveFit class should contain the following parameters:

  1. word vectors w_i, w_j (for main and context words).
  2. biases b_i, b_j.
  3. squared gradients of the word vectors grad_sq_w_i, grad_sq_w_j for the adaptive learning rates.
  4. squared gradients of the word biases grad_sq_b_i, grad_sq_b_j for the adaptive learning rates.
  5. learning_rate, max_cost and other scalar model parameters.

class GloveFit {
private:
  size_t vocab_size, word_vec_size;
  double x_max, learning_rate;
  // see https://github.com/maciejkula/glove-python/pull/9#issuecomment-68058795
  // clips the cost for numerical stability
  double max_cost;
  // the alpha exponent in the weighting function f
  double alpha;
  // word vectors
  vector<vector<double> > w_i, w_j;
  // word biases
  vector<double> b_i, b_j;
  // word vectors squared gradients
  vector<vector<double> > grad_sq_w_i, grad_sq_w_j;
  // word biases squared gradients
  vector<double> grad_sq_b_i, grad_sq_b_j;
};

Single iteration

Now we should initialize the parameters and perform one iteration of SGD:

// init cost
global_cost = 0
// assume tcm is a sparse matrix in triplet form - <i, j, x>
for_each (<i, j, x>) {
  // compute the cost function and add it to the global cost
  global_cost += J(x)
  // compute gradients for bias terms and perform adaptive updates for biases
  // compute gradients for word vector terms and perform adaptive updates for word vectors
  // update squared gradient sums for word vectors
  // update squared gradient sums for biases
}
return global_cost;
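
To make this pseudocode concrete, here is a hedged sketch of the update for a single <i, j, x> element, combining the gradients and AdaGrad steps above. It uses the member names of the GloveFit class and the weight() sketch from the introduction; the actual partial_fit code differs in details:

// assumes <cmath>, <numeric> and <algorithm> are included
double diff = std::inner_product(w_i[i].begin(), w_i[i].end(),
                                 w_j[j].begin(), 0.0)
              + b_i[i] + b_j[j] - std::log(x);
// weighted error, clipped to [-max_cost, max_cost] for numerical stability
double fdiff = std::min(std::max(weight(x, x_max, alpha) * diff, -max_cost), max_cost);
global_cost += fdiff * diff;

for (size_t k = 0; k < word_vec_size; k++) {
  double grad_i = fdiff * w_j[j][k];
  double grad_j = fdiff * w_i[i][k];
  // adaptive updates for word vectors
  w_i[i][k] -= learning_rate * grad_i / std::sqrt(grad_sq_w_i[i][k]);
  w_j[j][k] -= learning_rate * grad_j / std::sqrt(grad_sq_w_j[j][k]);
  // update squared gradient sums for word vectors
  grad_sq_w_i[i][k] += grad_i * grad_i;
  grad_sq_w_j[j][k] += grad_j * grad_j;
}
// adaptive updates and squared gradient sums for biases
b_i[i] -= learning_rate * fdiff / std::sqrt(grad_sq_b_i[i]);
b_j[j] -= learning_rate * fdiff / std::sqrt(grad_sq_b_j[j]);
grad_sq_b_i[i] += fdiff * fdiff;
grad_sq_b_j[j] += fdiff * fdiff;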

OpenMP

As discussed above, all these steps can be performed in a parallel loop (over all non-zero word co-occurrence scores). This can easily be done via OpenMP parallel for and reduction: #pragma omp parallel for reduction(+:global_cost). But there is one significant issue with this approach - it is very hard to make a portable R package with OpenMP support. By default it will work only on Linux distributions, because:

  1. the default clang on OS X doesn't support OpenMP (of course you can install clang-omp or gcc with brew, but this can also be tricky).
  2. Rtools began supporting OpenMP on Windows only in 2015, and even the modern implementation has substantial overhead.

For more details see the OpenMP support section of the Writing R Extensions manual.

Intel TBB

Luckily we have a better alternative - the Intel Threading Building Blocks library and the RcppParallel package, which provides the RVector and RMatrix wrapper classes for safe and convenient access to R data structures in a multi-threaded environment! Moreover, it just works on all main platforms - OS X, Windows, Linux.

Using TBB is a little bit trickier than writing simple OpenMP #pragma directives. You should implement a functor which operates on a chunk of data and call parallelReduce or parallelFor on the entire data collection. You can find useful (and simple) examples in the RcppParallel examples section.
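
For instance, a minimal parallelReduce worker - essentially the parallel vector sum example from the RcppParallel documentation - looks like this:

#include <Rcpp.h>
#include <RcppParallel.h>
#include <numeric>
using namespace RcppParallel;

struct Sum : public Worker {
  const RVector<double> input; // source vector (read-only)
  double value;                // accumulated value

  Sum(const Rcpp::NumericVector input) : input(input), value(0) {}
  // split constructor: each new worker starts its accumulator at zero
  Sum(const Sum& sum, Split) : input(sum.input), value(0) {}

  // accumulate just the elements of the assigned range
  void operator()(std::size_t begin, std::size_t end) {
    value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
  }
  // join my value with that of another worker
  void join(const Sum& rhs) { value += rhs.value; }
};

Calling parallelReduce(0, n, sum) (where n is the vector length) runs operator() on chunks of the index range in parallel and combines the partial sums via join. Our AdaGrad functor below follows exactly this pattern.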

Putting it all together

For now, suppose we have a partial_fit method in the GloveFit class with the following signature (see the actual code here):

double partial_fit( size_t begin, 
                    size_t end, 
                    const RVector<int> &x_irow, 
                    const RVector<int> &x_icol, 
                    const RVector<double> &x_val);

It takes

  1. the tcm in sparse triplet form <x_irow, x_icol, x_val>;
  2. begin and end indices of the range on which we want to perform our SGD.

Then it performs SGD steps over this range - updating word vectors, gradients, etc. It returns the value of the accumulated cost function. Note that internally this method modifies the input object.

Also note that the signature of partial_fit is very similar to what we have to implement in our TBB functor. Now we are ready to write it:

struct AdaGradIter : public Worker {
  RVector<int> x_irow, x_icol;
  RVector<double> x_val;
  RVector<int> iter_order;
  GloveFit &fit;

  int vocab_size, word_vec_size, num_iters, x_max;
  double learning_rate;

  // accumulated value
  double global_cost;

  // function to set global_cost = 0 between iterations
  void set_cost_zero() { global_cost = 0; };
  
  //init function to use between iterations
  void init(const IntegerVector &x_irowR,
            const IntegerVector &x_icolR,
            const NumericVector &x_valR,
            const IntegerVector &iter_orderR,
            GloveFit &fit) {
    x_irow = RVector<int>(x_irowR);
    x_icol = RVector<int>(x_icolR);
    x_val  = RVector<double> (x_valR);
    iter_order = RVector<int> (iter_orderR);
    // fit is a reference member bound at construction - no need to rebind it here
    global_cost = 0;
  }
  // dummy constructor
  // used at first initialization of GloveFitter
  AdaGradIter(GloveFit &fit):
    x_irow(IntegerVector(0)),
    x_icol(IntegerVector(0)),
    x_val(NumericVector(0)),
    iter_order(IntegerVector(0)),
    fit(fit) {};

  // constructors
  AdaGradIter(const IntegerVector &x_irowR,
              const IntegerVector &x_icolR,
              const NumericVector &x_valR,
              GloveFit &fit):
    x_irow(x_irowR), x_icol(x_icolR), x_val(x_valR),
    iter_order(IntegerVector(0)),
    fit(fit), global_cost(0) {}
    
  // constructor called at split
  AdaGradIter(const AdaGradIter& other, Split):
    x_irow(other.x_irow), x_icol(other.x_icol), x_val(other.x_val),
    iter_order(other.iter_order),
    fit(other.fit), global_cost(0) {}

  // process just the elements of the range
  void operator()(size_t begin, size_t end) {
    global_cost += this->fit.partial_fit(begin, end, x_irow, x_icol, x_val);
  }
  
  // join my value with that of another global_cost
  void join(const AdaGradIter& rhs) {
    global_cost += rhs.global_cost;
  }
};

As you can see, it is very similar to the example from the RcppParallel site. One difference - it has side effects. By calling partial_fit, it modifies the internal state of the input instance of the GloveFit class (which actually contains our GloVe model).

Now let's write the GloveFitter class, which will be callable from R via Rcpp modules. It acts as an interface for fitting our model and takes all input model parameters such as vocabulary size, desired word vector size, initial AdaGrad learning rate, etc. Also we want to track the cost between iterations and be able to perform some early stopping strategy between SGD iterations. For that purpose we keep our model in a C++ class, so we can modify it "in place" at each SGD iteration (which can be problematic in R).

class GloveFitter {
public:
  GloveFitter(size_t vocab_size,
            size_t word_vec_size,
            uint32_t x_max,
            double learning_rate,
            uint32_t grain_size,
            double max_cost,
            double alpha):
  grain_size(grain_size),
  gloveFit(vocab_size,  word_vec_size, learning_rate, x_max, max_cost, alpha),
  adaGradIter(gloveFit) {}
  
  // function to set cost to zero from R (used between SGD iterations)
  void set_cost_zero() {adaGradIter.set_cost_zero();};

  double fit_chunk(const IntegerVector x_irow,
                   const IntegerVector  x_icol,
                   const NumericVector x_val,
                   const IntegerVector iter_order) {
    
    this->adaGradIter.init(x_irow, x_icol, x_val, iter_order, gloveFit);
    // parallel reduce over all non-zero elements of the tcm
    parallelReduce(0, x_irow.size(), adaGradIter, grain_size);
    return (this->adaGradIter.global_cost);
  }
  // export word vectors to R
  List get_word_vectors() {
    return List::create(_["word_vectors"] = adaGradIter.fit.get_word_vectors());
  }

private:
  uint32_t grain_size;
  GloveFit gloveFit;
  AdaGradIter adaGradIter;
};

And create a wrapper with Rcpp modules:

RCPP_MODULE(GloveFitter) {
  class_< GloveFitter >( "GloveFitter" )
  //<vocab_size, word_vec_size, x_max, learning_rate, grain_size, max_cost, alpha>
  .constructor<size_t, size_t, uint32_t, double, uint32_t, double, double>()
  .method( "get_word_vectors", &GloveFitter::get_word_vectors, "returns word vectors")
  .method( "set_cost_zero", &GloveFitter::set_cost_zero, "sets cost to zero")
  .method( "fit_chunk", &GloveFitter::fit_chunk, "process TCM data chunk")
  ;
}

Now we can use the GloveFitter class from R:

fit <- new( GloveFitter, vocabulary_size, word_vectors_size, x_max, 
            learning_rate, grain_size, max_cost, alpha)
NUM_ITER <- 10
for(i in seq_len(NUM_ITER)) {
  # tcm is assumed to be a sparse matrix in triplet form (a dgTMatrix), so
  # tcm@i, tcm@j, tcm@x hold its row indices, column indices and values
  cost <- fit$fit_chunk(tcm@i, tcm@j, tcm@x, iter_order)
  print(cost)
  fit$set_cost_zero()
}

tags: parallel 
