/* * IndexBuilder.java */ package org.ngbw.utils; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * * @author Paul Hoover * */ public class IndexBuilder { /** * */ private static class UsageException extends Exception { static final long serialVersionUID = -4447650232273507293L; public UsageException() { super("usage: IndexBuilder [ -n jobs ] datadir indexdir index..."); } } /** * */ private class StopException extends RuntimeException { static final long serialVersionUID = -4543276601240307967L; } /** * */ private class StopWork implements Task { public void run() { throw new StopException(); } public void runInChildProcess() { throw new StopException(); } } /** * */ private class UncaughtWorkerException implements Thread.UncaughtExceptionHandler { public void uncaughtException(Thread worker, Throwable err) { System.err.println("Worker thread " + String.valueOf(worker.getId()) + " threw an uncaught exception: "); err.printStackTrace(System.err); } } /** * */ private class WorkerThread extends Thread { public WorkerThread() { setUncaughtExceptionHandler(new UncaughtWorkerException()); } @Override public void run() { while (true) { try { Task task = getTask(); task.runInChildProcess(); } catch (StopException stopErr) { break; } catch (InterruptedException interruptErr) { // do nothing } } } } private enum IndexType { FASTA, GENBANK, PDB, PDBSEQ, UNIPROT } private final int m_numWorkers; private final Queue m_taskQueue = new LinkedList(); private final Lock m_queueLock = new ReentrantLock(); private final Condition m_workAvailable = m_queueLock.newCondition(); private final Condition m_allWorkersIdle = m_queueLock.newCondition(); private int m_numIdleWorkers; public IndexBuilder() { this(Runtime.getRuntime().availableProcessors()); } public IndexBuilder(int numThreads) { m_numWorkers = numThreads; } public void buildIndices(IndexTask[] tasks) throws InterruptedException { WorkerThread[] workers = new WorkerThread[m_numWorkers]; for (int i = 0 ; i < workers.length ; i += 1) workers[i] = new WorkerThread(); m_numIdleWorkers = 0; m_queueLock.lock(); try { for (int i = 0 ; i < tasks.length ; i += 1) m_taskQueue.offer(tasks[i]); for (int i = 0 ; i < workers.length ; i += 1) workers[i].start(); m_allWorkersIdle.await(); for (int i = 0 ; i < workers.length ; i += 1) m_taskQueue.offer(new StopWork()); m_workAvailable.signalAll(); } finally { m_queueLock.unlock(); } for (int i = 0 ; i < workers.length ; i += 1) workers[i].join(); } /** * * @param args */ public static void main(String[] args) { try { if (args.length < 3) throw new UsageException(); int offset; int numJobs; String dataDir; String indexDir; if (args[0].charAt(0) == '-') { if (!args[0].equals("-n")) throw new UsageException(); offset = 4; numJobs = Integer.parseInt(args[1]); dataDir = args[2]; indexDir = args[3]; } else { offset = 2; numJobs = Runtime.getRuntime().availableProcessors(); dataDir = args[0]; indexDir = args[1]; } IndexTask[] tasks = new IndexTask[args.length - offset]; for (int i = 0 ; i < tasks.length ; i += 1) { String[] subStrings = args[i + offset].split(":"); if (subStrings.length != 3) throw new Exception("unexpected index format: " + args[i + offset]); switch (IndexType.valueOf(subStrings[2])) { case FASTA: tasks[i] = new IndexFasta(subStrings[0], dataDir, indexDir, subStrings[1]); break; case GENBANK: tasks[i] = new IndexGenbank(subStrings[0], dataDir, indexDir, subStrings[1]); break; case PDB: tasks[i] = new IndexPdbStructures(subStrings[0], dataDir, indexDir, subStrings[1]); break; case PDBSEQ: tasks[i] = new IndexPdbSequences(subStrings[0], dataDir, indexDir, subStrings[1]); break; case UNIPROT: tasks[i] = new IndexUniprot(subStrings[0], dataDir, indexDir, subStrings[1]); } } (new IndexBuilder(numJobs)).buildIndices(tasks); } catch (Exception err) { err.printStackTrace(System.err); System.exit(-1); } } /** * * @return * @throws InterruptedException */ private Task getTask() throws InterruptedException { m_queueLock.lock(); try { while (true) { Task task = m_taskQueue.poll(); if (task != null) return task; m_numIdleWorkers += 1; if (m_numIdleWorkers == m_numWorkers) m_allWorkersIdle.signal(); m_workAvailable.await(); m_numIdleWorkers -= 1; } } finally { m_queueLock.unlock(); } } }