package org.ngbw.utils; import java.util.concurrent.*; import java.util.List; import org.ngbw.sdk.Workbench; import org.ngbw.sdk.database.RunningTask; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.NDC; import java.util.Vector; import org.ngbw.sdk.database.ConnectionManager; import org.ngbw.sdk.database.DriverConnectionSource; /** Stand alone program to retrieve the results for all running_tasks that are done but that haven't had their results stored yet. System Properties: submitter = get results for this submitter id only (see submitter column in db, normally the url of the web site that submitted) recover = if "true" tries to recover results for RunningTask entries with status ERROR_GETTING_RESULTS. Only runs one pass thru RunningTasks and then exits. local = if non null, this is the path to a local directory that will be used as the parent of the working dir to recover or load results. Only runs one pass, like recover. age = only relevant if recover is true. This is the maximum number days since submission. Won't recover results for older jobs. */ public class LoadResults { private static final Log log = LogFactory.getLog(LoadResults.class.getName()); private static ThreadPoolExecutor threadPool = null; Vector inProgressList = new Vector(); private static String m_submitter; private static String m_default_submitter; private static long m_poll_interval; private static int m_pool_size; private static boolean m_recover = false; private static String m_local; private static String m_status; private static int m_age = 0; // this number is kind of arbitrary, see use of threshold in the code below. private static int threshold; public LoadResults() throws Exception { threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(m_pool_size); } public static void main(String[] args) { try { Workbench wb = Workbench.getInstance(); // This overrides db connection pool establised by Workbench. // c3p0 threadpool acted buggy under heavy load testing. ConnectionManager.getConnectionSource().close(); ConnectionManager.setConnectionSource(new DriverConnectionSource()); m_default_submitter = wb.getProperties().getProperty("application.instance"); String p; p = wb.getProperties().getProperty("loadResults.poll.seconds"); if (p == null) { throw new Exception("Missing workbench property: loadResults.poll.seconds"); } m_poll_interval = new Long(p); p = wb.getProperties().getProperty("loadResults.pool.size"); if (p == null) { throw new Exception("Missing workbench property: loadResults.pool.size"); } m_pool_size = new Integer(p); //threshold = thread_pool_size * 4; threshold = 0; m_submitter = System.getProperty("submitter"); if (m_submitter == null) { throw new Exception("Missing system property submitter"); } // Properties that control behavior when used in RecoverResults mode. // Todo: not using m_local yet. Also need to add ability to specify just a single task or resource // to process. m_recover = Boolean.getBoolean("recover"); m_local = System.getProperty("local"); m_status = (m_recover ? RunningTask.STATUS_ERROR_GETTING_RESULTS : RunningTask.STATUS_DONE); Integer i = Integer.getInteger("age"); m_age = (i == null) ? 0 : i; LoadResults lr = new LoadResults(); log.debug("LOAD RESULTS: for submitter=" + m_submitter + ", poll_interval in seconds=" + m_poll_interval + ", thread pool size=" + m_pool_size + ", max jobs queued = " + threshold + (m_recover ? ", Recovery Mode" : ", Normal Mode")); lr.keepWorking(); threadPool.shutdown(); // arbitrarily setting a 7 day timeout for any threads we started to finish. threadPool.awaitTermination(60 * 60 * 24 * 7, java.util.concurrent.TimeUnit.SECONDS); log.debug("LOAD RESULTS: exitting."); } catch (Exception e) { log.error("Caught Exception", e); return; } } private void keepWorking() throws Exception { while(true) { /* log.debug("Threads busy=" + threadPool.getActiveCount() + ", jobs in Q=" + threadPool.getQueue().size() + ", taskCount=" + threadPool.getTaskCount()); */ int jobsQueued; if ((threshold == 0) || (jobsQueued = threadPool.getQueue().size()) < threshold) { int jobCount = scanAndProcess(); } else { log.warn("Thread pool has " + jobsQueued + " jobs queued, not queuing more until queue drains."); } // In recovery mode we only scan the database once. Otherwise if a resource is down we would keep // processing the same records over and over. if (m_recover || (m_local != null)) { return; } Thread.sleep(1000 * m_poll_interval); } } private int scanAndProcess() throws Exception { // select tasks with specified submitter and status that aren't locked. List list = RunningTask.findRunningTaskBySubmitterAndStatus(m_submitter, m_status, m_age); if (list.size() > 0) { String tmp = ""; for (RunningTask rt : list) { tmp += rt.getRunningTaskId() + "-" + rt.getStatus() + "-" + (rt.getLocked() == null ? "" : rt.getLocked().toString()) + ", "; } log.debug("Found " + list.size() + " tasks to process: " + tmp); } // At least on triton, if we try to read stderr and stdout right after we're notified that job completed // the read will fail, but later it's ok. Thread.sleep(10 * 1000); for (RunningTask rt : list) { if (!inProgressList.contains(new Long(rt.getRunningTaskId()))) { threadPool.execute(this.new ProcessRunningTask(rt.getRunningTaskId())); inProgressList.add(new Long(rt.getRunningTaskId())); } } return list.size(); } private class ProcessRunningTask implements Runnable { long m_running_task_id; long m_taskId; String m_jobHandle; ProcessRunningTask(long running_task_id) throws Exception { m_running_task_id = running_task_id; } public void run() { long startTime = System.currentTimeMillis(); NDC.push("[running_task_id=" + m_running_task_id + "]"); boolean gotLock = false; try { gotLock = RunningTask.lock(m_running_task_id); if (gotLock) { RunningTask rt = new RunningTask(m_running_task_id); m_taskId = rt.getTaskId(); m_jobHandle = rt.getJobhandle(); NDC.pop(); NDC.push("[task=" + m_taskId +", job=" + m_jobHandle + ", running_task_id=" + m_running_task_id + "]"); if (!rt.getStatus().equals(m_status)) { log.debug("Skipping " + m_running_task_id + ". Status isn't " + m_status + ", it's " + rt.getStatus()); } else { log.debug("Loading Results for running_task_id " + m_running_task_id); StoreRTaskOutput.processTask(m_running_task_id, m_recover, m_local); } } else { log.debug("Skipping " + m_running_task_id + ". Already locked."); } } catch(Exception e) { log.error("", e); } finally { try { if (gotLock) { RunningTask.unlock(m_running_task_id); long elapsedTime = System.currentTimeMillis() - startTime; log.debug("LoadResults for " + m_running_task_id + " took " + elapsedTime + " ms, or " + elapsedTime/1000 + " seconds."); } } catch(Exception e) { log.debug("", e); } inProgressList.remove(new Long(m_running_task_id)); NDC.pop(); NDC.remove(); } } } }