Detecting Roots in a Graph, and a challenge

How would you detect roots in a simple directed graph? I was recently asked this question, with varying parameters, and there’s an aspect of the problem for which I have no satisfying solution.
A root is a node which points to other nodes, but has no nodes pointing to it; in our simple problem, a root has to point to something, so we don’t have to worry about handling nodes hanging out in space. Each edge will be described as a (root, leaf) pair, where the leaf is the node being “pointed to” in that edge; a node can therefore be the root of one edge and the leaf of another.
We can write the edges as pairs like this:

0,1
0,2
1,2
3,1
4,3

Graphed, it would look like this:
[Figure: simple directed graph]
Our root nodes would be 0 and 4 – the only nodes for which there are no inbound edges.
Without digging too much into rationales, let’s create a simple interface with default behaviors that accomplish our task very simply:

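import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public interface GraphUtility {
  // Boxes an int[] into a Set<Integer> so it can be compared with a result.
  public default Set<Integer> arrayToSet(int[] data) {
    Set<Integer> set = new HashSet<>();
    Arrays.stream(data).forEach(set::add);
    return set;
  }
  // Each edge is {root, leaf}; a true root never appears as a leaf.
  public default Set<Integer> findRoots(int[][] graph) {
    Set<Integer> roots = new HashSet<>();
    Set<Integer> leaves = new HashSet<>();
    Arrays.stream(graph).forEach(d -> {
      roots.add(d[0]);
      leaves.add(d[1]);
    });
    roots.removeAll(leaves);
    return roots;
  }
}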

We’re also going to write a wrapper that keeps track of elapsed time. It’s not great code – multithreading with an instance would be horrible – but it’ll do for what we want.

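import java.util.Set;
import java.util.function.Function;

public class TimingWrapper {
  long elapsedTime=0;
  // Applies findRoots to the graph and records the elapsed nanoseconds.
  public Set<Integer> time(Function<int[][], Set<Integer>> findRoots, int[][] graph) {
    Set<Integer> set;
    long start=System.nanoTime();
    set=findRoots.apply(graph);
    long end=System.nanoTime();
    elapsedTime=end-start;
    return set;
  }
  @Override
  public String toString() {
    return "TimingWrapper{" +
        "elapsedTime=" + elapsedTime +
        '}';
  }
}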

Now let’s build our test. We’re using an anonymous implementation of GraphUtility because the interface has our desired functionality. The wrapper’s time is output on System.out, and isn’t part of the actual test; the numbers would vary based on the actual runtime platform, so we don’t want to rely on the runtime as a metric for success.

GraphUtility util = new GraphUtility() {
};
@Test
public void firstTest() {
  TimingWrapper wrapper = new TimingWrapper();
  int[][] graph = {{0, 1}, {0, 2}, {1, 2}, {3, 1}, {4, 3}};
  Set<Integer> knownRoots = util.arrayToSet(new int[]{0, 4});
  Set<Integer> roots = wrapper.time(util::findRoots, graph);
  assertEquals(roots, knownRoots);
  System.out.println(wrapper);
}

Invoke this method enough times, and you’ll see some interesting fluctuations in runtime; on my system, the runtime was around 2500 nanoseconds, which is simply amazing. (There were some outliers, largely based on garbage collection.)
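Since single measurements fluctuate this much, one way to get a steadier number is to repeat the call and report the median rather than a single run. The following is only a sketch: `RepeatedTiming` and `medianNanos` are hypothetical names that are not part of the original code, the warm-up and run counts are arbitrary, and `findRoots` is inlined from `GraphUtility` to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper, not part of the original code: repeats a timed call
// and reports the median so that JIT warm-up and GC pauses matter less.
final class RepeatedTiming {
  // Same algorithm as GraphUtility.findRoots, inlined for self-containment.
  static Set<Integer> findRoots(int[][] graph) {
    Set<Integer> roots = new HashSet<>();
    Set<Integer> leaves = new HashSet<>();
    for (int[] edge : graph) {
      roots.add(edge[0]);
      leaves.add(edge[1]);
    }
    roots.removeAll(leaves);
    return roots;
  }

  // Runs fn `warmups` times untimed, then `runs` times timed,
  // and returns the median elapsed time in nanoseconds.
  static long medianNanos(Function<int[][], Set<Integer>> fn,
                          int[][] graph, int warmups, int runs) {
    for (int i = 0; i < warmups; i++) {
      fn.apply(graph);
    }
    long[] samples = new long[runs];
    for (int i = 0; i < runs; i++) {
      long start = System.nanoTime();
      fn.apply(graph);
      samples[i] = System.nanoTime() - start;
    }
    Arrays.sort(samples);
    return samples[runs / 2];
  }

  public static void main(String[] args) {
    int[][] graph = {{0, 1}, {0, 2}, {1, 2}, {3, 1}, {4, 3}};
    long median = medianNanos(RepeatedTiming::findRoots, graph, 1_000, 101);
    System.out.println("median: " + median + " ns");
  }
}
```

The median is less sensitive to the occasional garbage-collection outlier than the mean would be, though the numbers will still depend on the platform.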
However, that’s a tiny, tiny graph. What happens when we store more edges? Let’s try it with ... oh, 62,000 edges:

// Fields used by the test; their declarations are assumed here,
// since the listing does not show them.
Random r = new Random();
long sum = 0;
@Test
public void secondTest() {
  TimingWrapper wrapper = new TimingWrapper();
  int[][] graph = new int[62000][];
  for (int i = 0; i < 62000; i++) {
    graph[i] = new int[]{r.nextInt(7000), r.nextInt(7000)};
  }
  //Set knownRoots = util.arrayToSet(new int[]{0, 4});
  Set<Integer> roots = wrapper.time(util::findRoots, graph);
  sum += roots.size();
  System.out.println(wrapper);
}

Repeating this test yielded more valuable numbers, and more variable ones, too: we’re now looking at about four million nanoseconds per invocation, so we’re still able to pick out the root nodes in four milliseconds, which isn’t too bad at all.
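Because the node ids in this test are small non-negative integers (below 7,000), the same set subtraction can be sketched with `java.util.BitSet` instead of `HashSet<Integer>`, which avoids boxing every id. This is an alternative under that assumption, not the approach used in the rest of this post; `BitSetRoots` is a hypothetical name, and it would not suit graphs with negative or very large ids.

```java
import java.util.BitSet;

// Hypothetical variant: assumes node ids are small non-negative ints.
final class BitSetRoots {
  static BitSet findRoots(int[][] graph) {
    BitSet sources = new BitSet(); // nodes with at least one outbound edge
    BitSet targets = new BitSet(); // nodes with at least one inbound edge
    for (int[] edge : graph) {
      sources.set(edge[0]);
      targets.set(edge[1]);
    }
    sources.andNot(targets); // keep only sources that are never targets
    return sources;
  }

  public static void main(String[] args) {
    int[][] graph = {{0, 1}, {0, 2}, {1, 2}, {3, 1}, {4, 3}};
    System.out.println(findRoots(graph)); // prints {0, 4}
  }
}
```

Whether this is actually faster than the `HashSet` version on a given machine is something to measure rather than assume.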
We can do more with this, too, before we even get to our challenge. Let’s see what happens if we use a database. (If you want to skip ahead to the challenge, feel free to do so; this next section fell out of the sky just because I was interested in seeing how well databases might handle the process.)
We’re going to use a constant graph – built using a Random with a consistent seed – that happens to have ten root nodes in it. First, the code that builds the graph:

public class ConstantGraph {
  int[][] dataSet;
  private ConstantGraph() {
    // DO NOT CHANGE THIS SEED!
    Random random = new Random(1029);
    dataSet = new int[62000][];
    for (int i = 0; i < dataSet.length; i++) {
      int[] datum = new int[2];
      datum[0] = random.nextInt(9000);
      datum[1] = random.nextInt(9000);
      dataSet[i] = datum;
    }
  }
  public int[][] getDataSet() {
    return dataSet;
  }
  public final static ConstantGraph instance = new ConstantGraph();
}

Now we'll build an enum that has all of our SQL isolated for three configurations: embedded Derby, embedded H2, and external Derby. This is really ugly, so we're going to leave it up to the GitHub repository to hold the actual source and we'll only echo the structure here:

public enum DatabaseInterface {
  DERBYEMBEDDED(...),
  DERBY(...),
  H2(...);
  final String url;
  final String[] ddl;
  final String insert;
  final String dbRootSelection;
  final String dbRowSelection;
}

Now let’s build a test that walks through the enum values, testing two different mechanisms of determining the root nodes.
The simple mechanism mirrors what we’ve already seen, simply retrieving the roots and leaves and storing them in two Sets; it then removes the leaves from the set of roots. Therefore, it’s mostly testing how quickly the database can retrieve the data.
The other method is a little more interesting; it does everything in SQL. Here’s the query (which happens to be the same for H2 and Derby):

select distinct root from graph g
  where root not in (select distinct leaf from graph)

Exciting stuff, and chances are that a good DBA can optimize this (I’d love to see what a DBA would suggest). It works pretty well so far, although it’s not quick, as we’ll see.
Here’s our test:

public class ITDatabaseGraphTest {
  ConstantGraph graph = ConstantGraph.instance;
  @Test
  public void testAgainstDatabaseSimple() throws SQLException {
    System.out.println("Simple query test----------------------");
    for (DatabaseInterface database : DatabaseInterface.values()) {
      System.out.println("Testing against "+database.url);
      populateDatabase(database);
      try (Connection conn = DriverManager.getConnection(database.url, "sa", "sa")) {
        try (PreparedStatement ps = conn.prepareStatement(database.dbRowSelection)) {
          Set<Integer> roots=new HashSet<>();
          Set<Integer> leaves=new HashSet<>();
          long start = System.nanoTime();
          ResultSet rs=ps.executeQuery();
          while(rs.next()) {
            roots.add(rs.getInt(1));
            leaves.add(rs.getInt(2));
          }
          roots.removeAll(leaves);
          assertEquals(roots, graph.arrayToSet(graph.getRoots()));
          long end = System.nanoTime();
          // Dividing nanoseconds by 10^9 yields seconds, so label it as such.
          System.out.println((end-start)+" nanoseconds ("+(end-start)/1000000000.0+" s)");
        }
      }
    }
  }
  @Test
  public void testAgainstDatabaseComplex() throws SQLException {
    System.out.println("Complex query test---------------------");
    for (DatabaseInterface database : DatabaseInterface.values()) {
      System.out.println("Testing against "+database.url);
      populateDatabase(database);
      try (Connection conn = DriverManager.getConnection(database.url, "sa", "sa")) {
        try (PreparedStatement ps = conn.prepareStatement(database.dbRootSelection)) {
          Set<Integer> roots=new HashSet<>();
          long start = System.nanoTime();
          ResultSet rs=ps.executeQuery();
          while(rs.next()) {
            roots.add(rs.getInt(1));
          }
          assertEquals(roots, graph.arrayToSet(graph.getRoots()));
          long end = System.nanoTime();
          // Dividing nanoseconds by 10^9 yields seconds, so label it as such.
          System.out.println((end-start)+" nanoseconds ("+(end-start)/1000000000.0+" s)");
        }
      }
    }
  }
  private void populateDatabase(DatabaseInterface database) throws SQLException {
    try (Connection conn = DriverManager.getConnection(database.url, "sa", "sa")) {
      conn.setAutoCommit(false);
      for (String ddl : database.ddl) {
        try (PreparedStatement ps = conn.prepareStatement(ddl)) {
          ps.executeUpdate();
        } catch (SQLException ignored) {
          System.out.println(ignored.getMessage());
        }
      }
      int counter=0;
      try (PreparedStatement ps = conn.prepareStatement(database.insert)) {
        for (int[] data : graph.getDataSet()) {
          ps.setInt(1, data[0]);
          ps.setInt(2, data[1]);
          try {
            ps.executeUpdate();
            counter++;
            if(counter%250==0) {
              conn.commit();
            }
          } catch(SQLException ignored) {
            // we know we have 27 duplicates.
          }
        }
      } catch (SQLException ignored) {
        System.out.println(ignored.getMessage());
      }
      conn.setAutoCommit(true);
    }
  }
}

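On my system, I got this output, with the insert process taking roughly 40 seconds. Note that the figure in parentheses is the nanosecond count divided by 10^9, so it is really in seconds rather than the “ms” printed here: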

Complex query test---------------------
Testing against jdbc:derby:foo;create=true
576525951 nanoseconds (0.576525951 ms)
Testing against jdbc:derby://localhost:1527/foo;create=true
527347131 nanoseconds (0.527347131 ms)
Testing against jdbc:h2:./foo
3659439329 nanoseconds (3.659439329 ms)
Simple query test----------------------
Testing against jdbc:derby:foo;create=true
38152214 nanoseconds (0.038152214 ms)
Testing against jdbc:derby://localhost:1527/foo;create=true
67817224 nanoseconds (0.067817224 ms)
Testing against jdbc:h2:./foo
115023709 nanoseconds (0.115023709 ms)

The Challenge

So what we were able to see is that it’s fairly easy to process 62,000 edges with a straight linear process, even factoring in the time a database adds to the mechanism. However, how would you make the root detection multithreaded? What would be a good way to get the detection time down, based on how many cores you could throw at it?
At 620,000 edges, the simple test runs in around 27,000,000 nanoseconds (or 27ms); could you shrink that down? What about 6.2 million edges? What would happen if you tried to add map/reduce? Could you do so?
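As one possible starting point for the multithreading question, here is a sketch that uses a parallel stream: each worker collects the sources and targets for its own slice of the edge array, the partial sets are merged, and only then are the targets subtracted. The subtraction has to wait until after the merge, because a node that looks like a root within one slice may be pointed to by an edge in another. `ParallelRoots` and `Partial` are hypothetical names, and nothing here claims a speedup; the cost of merging the sets may well outweigh the gain.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a parallel root finder; not a tuned solution.
final class ParallelRoots {
  // Partial result for one slice of the edge array.
  static final class Partial {
    final Set<Integer> sources = new HashSet<>();
    final Set<Integer> targets = new HashSet<>();

    void accept(int[] edge) {
      sources.add(edge[0]);
      targets.add(edge[1]);
    }

    void merge(Partial other) {
      sources.addAll(other.sources);
      targets.addAll(other.targets);
    }
  }

  static Set<Integer> findRoots(int[][] graph) {
    // collect() gives each worker its own Partial, then merges them.
    Partial all = Arrays.stream(graph)
        .parallel()
        .collect(Partial::new, Partial::accept, Partial::merge);
    // Subtract only after merging: a target may live in another slice.
    all.sources.removeAll(all.targets);
    return all.sources;
  }

  public static void main(String[] args) {
    int[][] graph = {{0, 1}, {0, 2}, {1, 2}, {3, 1}, {4, 3}};
    System.out.println(findRoots(graph));
  }
}
```

The same split, collect, merge, subtract shape is roughly what a map/reduce version would look like, with the per-slice collection as the map step and the merge as the reduce step.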