Java Source Code: org.archive.crawler.frontier.AbstractFrontier


   1: /* AbstractFrontier
   2:  *
   3:  * $Id: AbstractFrontier.java,v 1.60 2006/08/04 18:06:28 stack-sf Exp $
   4:  *
   5:  * Created on Aug 17, 2004
   6:  *
   7:  * Copyright (C) 2004 Internet Archive.
   8:  *
   9:  * This file is part of the Heritrix web crawler (crawler.archive.org).
  10:  *
  11:  * Heritrix is free software; you can redistribute it and/or modify
  12:  * it under the terms of the GNU Lesser Public License as published by
  13:  * the Free Software Foundation; either version 2.1 of the License, or
  14:  * any later version.
  15:  *
  16:  * Heritrix is distributed in the hope that it will be useful,
  17:  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  18:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  19:  * GNU Lesser Public License for more details.
  20:  *
  21:  * You should have received a copy of the GNU Lesser Public License
  22:  * along with Heritrix; if not, write to the Free Software
  23:  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  24:  */
  25: package org.archive.crawler.frontier;
  26: 
  27: import java.io.BufferedWriter;
  28: import java.io.File;
  29: import java.io.FileWriter;
  30: import java.io.IOException;
  31: import java.io.PrintWriter;
  32: import java.io.Serializable;
  33: import java.io.StringWriter;
  34: import java.io.Writer;
  35: import java.util.Iterator;
  36: import java.util.List;
  37: import java.util.logging.Level;
  38: import java.util.logging.Logger;
  39: import java.util.regex.Pattern;
  40: 
  41: import javax.management.AttributeNotFoundException;
  42: 
  43: import org.apache.commons.httpclient.HttpStatus;
  44: import org.archive.crawler.datamodel.CandidateURI;
  45: import org.archive.crawler.datamodel.CoreAttributeConstants;
  46: import org.archive.crawler.datamodel.CrawlHost;
  47: import org.archive.crawler.datamodel.CrawlOrder;
  48: import org.archive.crawler.datamodel.CrawlServer;
  49: import org.archive.crawler.datamodel.CrawlURI;
  50: import org.archive.crawler.datamodel.FetchStatusCodes;
  51: import org.archive.crawler.event.CrawlStatusListener;
  52: import org.archive.crawler.framework.CrawlController;
  53: import org.archive.crawler.framework.Frontier;
  54: import org.archive.crawler.framework.ToeThread;
  55: import org.archive.crawler.framework.exceptions.EndedException;
  56: import org.archive.crawler.framework.exceptions.FatalConfigurationException;
  57: import org.archive.crawler.settings.ModuleType;
  58: import org.archive.crawler.settings.RegularExpressionConstraint;
  59: import org.archive.crawler.settings.SimpleType;
  60: import org.archive.crawler.settings.Type;
  61: import org.archive.crawler.url.Canonicalizer;
  62: import org.archive.net.UURI;
  63: import org.archive.util.ArchiveUtils;
  64: 
  65: /**
  66:  * Shared facilities for Frontier implementations.
  67:  * 
  68:  * @author gojomo
  69:  */
  70: public abstract class AbstractFrontier extends ModuleType
  71: implements CrawlStatusListener, Frontier, FetchStatusCodes,
  72:	          CoreAttributeConstants, Serializable {
  73:    private static final Logger logger = Logger
  74:            .getLogger(AbstractFrontier.class.getName());
  75:
  76:    protected transient CrawlController controller;
  77:
  78:    /** ordinal numbers to assign to created CrawlURIs */
  79:    protected long nextOrdinal = 1;
  80:
  81:    /** should the frontier hold any threads asking for URIs? */
  82:    protected boolean shouldPause = false;
  83:
  84:    /**
  85:     * should the frontier send an EndedException to any threads asking for
  86:     * URIs?
  87:     */
  88:    protected transient boolean shouldTerminate = false;
  89:
  90:    /**
  91:     * how many multiples of last fetch elapsed time to wait before recontacting
  92:     * same server
  93:     */
  94:    public final static String ATTR_DELAY_FACTOR = "delay-factor";
  95:
  96:    protected final static Float DEFAULT_DELAY_FACTOR = new Float(5);
  97:
  98:    /**
  99:     * always wait this long after one completion before recontacting same
 100:     * server, regardless of multiple
 101:     */
 102:    public final static String ATTR_MIN_DELAY = "min-delay-ms";
 103:
 104:    // 3 secs.
 105:    protected final static Integer DEFAULT_MIN_DELAY = new Integer(3000);
 106:
 107:    /** never wait more than this long, regardless of multiple */
 108:    public final static String ATTR_MAX_DELAY = "max-delay-ms";
 109:
 110:    // 30 secs
 111:    protected final static Integer DEFAULT_MAX_DELAY = new Integer(30000);
 112:
 113:    /** number of hops of embeds (ERX) to bump to front of host queue */
 114:    public final static String ATTR_PREFERENCE_EMBED_HOPS =
 115:        "preference-embed-hops";
 116:
 117:    protected final static Integer DEFAULT_PREFERENCE_EMBED_HOPS =
 118:        new Integer(1);
 119:
 120:    /** maximum per-host bandwidth usage */
 121:    public final static String ATTR_MAX_HOST_BANDWIDTH_USAGE =
 122:        "max-per-host-bandwidth-usage-KB-sec";
 123:
 124:    protected final static Integer DEFAULT_MAX_HOST_BANDWIDTH_USAGE =
 125:        new Integer(0);
 126:
 127:    /** maximum overall bandwidth usage */
 128:    public final static String ATTR_MAX_OVERALL_BANDWIDTH_USAGE =
 129:        "total-bandwidth-usage-KB-sec";
 130:
 131:    protected final static Integer DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE =
 132:        new Integer(0);
 133:
 134:    /** for retryable problems, seconds to wait before a retry */
 135:    public final static String ATTR_RETRY_DELAY = "retry-delay-seconds";
 136:
 137:    // 15 mins
 138:    protected final static Long DEFAULT_RETRY_DELAY = new Long(900);
 139:
 140:    /** maximum times to emit a CrawlURI without final disposition */
 141:    public final static String ATTR_MAX_RETRIES = "max-retries";
 142:
 143:    protected final static Integer DEFAULT_MAX_RETRIES = new Integer(30);
 144:
 145:    public final static String ATTR_QUEUE_ASSIGNMENT_POLICY =
 146:        "queue-assignment-policy";
 147:
 148:    /** queue assignment to force onto CrawlURIs; intended to be overridden */
 149:    public final static String ATTR_FORCE_QUEUE = "force-queue-assignment";
 150:
 151:    protected final static String DEFAULT_FORCE_QUEUE = "";
 152:
 153:    // word chars, dash, period, comma, colon
 154:    protected final static String ACCEPTABLE_FORCE_QUEUE = "[-\\w\\.,:]*";
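    // For example, "blogspot.com" and "news,us:1" both match this pattern;
    // a value containing spaces or slashes would instead be rejected by the
    // WARNING-level RegularExpressionConstraint attached in the constructor
    // below.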
 155:        
 156:    /** whether to pause, rather than finish, when crawl appears done */
 157:    public final static String ATTR_PAUSE_AT_FINISH = "pause-at-finish";
 158:    // TODO: change default to true once well-tested
 159:    protected final static Boolean DEFAULT_PAUSE_AT_FINISH = Boolean.FALSE;
 160:    
 161:    /** whether to pause at crawl start */
 162:    public final static String ATTR_PAUSE_AT_START = "pause-at-start";
 163:    protected final static Boolean DEFAULT_PAUSE_AT_START = Boolean.FALSE;
 164:    
 165:    /** whether to tag seeds with their own URI as a heritable 'source' */
 166:    public final static String ATTR_SOURCE_TAG_SEEDS = "source-tag-seeds";
 167:    protected final static Boolean DEFAULT_SOURCE_TAG_SEEDS = Boolean.FALSE;
 168:
 169:    /**
 170:     * Recover log on or off attribute.
 171:     */
 172:    protected final static String ATTR_RECOVERY_ENABLED =
 173:        "recovery-log-enabled";
 174:    protected final static Boolean DEFAULT_ATTR_RECOVERY_ENABLED =
 175:        Boolean.TRUE;
 176:
 177:    // top-level stats
 178:    protected long queuedUriCount = 0; // total URIs queued to be visited
 179:
 180:    protected long succeededFetchCount = 0;
 181:
 182:    protected long failedFetchCount = 0;
 183:
 184:    protected long disregardedUriCount = 0; // URIs that are disregarded (for
 185:                                          // example because of robots.txt rules)
 186:
 187:    /**
 188:     * Used when bandwidth constraints are in effect.
 189:     */
 190:    protected long totalProcessedBytes = 0;
 191:
 192:    private transient long nextURIEmitTime = 0;
 193:
 194:    protected long processedBytesAfterLastEmittedURI = 0;
 195:    
 196:    protected int lastMaxBandwidthKB = 0;
 197:
 198:    /** Policy for assigning CrawlURIs to named queues */
 199:    protected transient QueueAssignmentPolicy queueAssignmentPolicy = null;
 200:
 201:    /**
 202:     * Crawl replay logger.
 203:     * 
 204:     * Currently captures Frontier/URI transitions.
 205:     * Can be null if the user chose not to keep a recovery.log.
 206:     */
 207:    private transient FrontierJournal recover = null;
 208:
 209:    /** file collecting report of ignored seed-file entries (if any) */
 210:    public static final String IGNORED_SEEDS_FILENAME = "seeds.ignored";
 211:
 212:    /**
 213:     * @param name Name of this frontier.
 214:     * @param description Description for this frontier.
 215:     */
 216:	      public AbstractFrontier(String name, String description) {
 217:        super(name, description);
 218:        addElementToDefinition(new SimpleType(ATTR_DELAY_FACTOR,
 219:                "How many multiples of last fetch elapsed time to wait before "
 220:                        + "recontacting same server", DEFAULT_DELAY_FACTOR));
 221:        addElementToDefinition(new SimpleType(ATTR_MAX_DELAY,
 222:                "Never wait more than this long.", DEFAULT_MAX_DELAY));
 223:        addElementToDefinition(new SimpleType(ATTR_MIN_DELAY,
 224:                "Always wait this long after one completion before recontacting "
 225:                        + "same server.", DEFAULT_MIN_DELAY));
 226:        addElementToDefinition(new SimpleType(ATTR_MAX_RETRIES,
 227:                "How often to retry fetching a URI that failed to be retrieved. "
 228:                        + "If zero, the crawler will get the robots.txt only.",
 229:                DEFAULT_MAX_RETRIES));
 230:        addElementToDefinition(new SimpleType(ATTR_RETRY_DELAY,
 231:                "How long to wait by default until we retry fetching a"
 232:                        + " URI that failed to be retrieved (seconds). ",
 233:                DEFAULT_RETRY_DELAY));
 234:        addElementToDefinition(new SimpleType(
 235:                ATTR_PREFERENCE_EMBED_HOPS,
 236:                "Number of embedded (or redirected) hops up to which "
 237:                + "a URI has higher priority scheduling. For example, if set "
 238:                + "to 1 (the default), items such as inline images (1-hop "
 239:                + "embedded resources) will be scheduled ahead of all regular "
 240:                + "links (or many-hop resources, like nested frames). If set to "
 241:                + "zero, no preferencing will occur, and embeds/redirects are "
 242:                + "scheduled the same as regular links.",
 243:                DEFAULT_PREFERENCE_EMBED_HOPS));
 244:        Type t;
 245:        t = addElementToDefinition(new SimpleType(
 246:                ATTR_MAX_OVERALL_BANDWIDTH_USAGE,
 247:                "The maximum average bandwidth the crawler is allowed to use. "
 248:                + "The actual read speed is not affected by this setting, it only "
 249:                + "holds back new URIs from being processed when the bandwidth "
 250:                + "usage has been to high. 0 means no bandwidth limitation.",
 251:                DEFAULT_MAX_OVERALL_BANDWIDTH_USAGE));
 252:        t.setOverrideable(false);
 253:        t = addElementToDefinition(new SimpleType(
 254:                ATTR_MAX_HOST_BANDWIDTH_USAGE,
 255:                "The maximum average bandwidth the crawler is allowed to use per "
 256:                + "host. The actual read speed is not affected by this setting, "
 257:                + "it only holds back new URIs from being processed when the "
 258:                + "bandwidth usage has been to high. 0 means no bandwidth "
 259:                + "limitation.", DEFAULT_MAX_HOST_BANDWIDTH_USAGE));
 260:        t.setExpertSetting(true);
 261:
 262:        // Read the list of permissible choices from heritrix.properties.
 263:        // It's a list of space- or comma-separated values.
 264:        String queueStr = System.getProperty(AbstractFrontier.class.getName() +
 265:                "." + ATTR_QUEUE_ASSIGNMENT_POLICY,
 266:                HostnameQueueAssignmentPolicy.class.getName() + " " +
 267:                IPQueueAssignmentPolicy.class.getName() + " " +
 268:                BucketQueueAssignmentPolicy.class.getName() + " " +
 269:                SurtAuthorityQueueAssignmentPolicy.class.getName());
 270:        Pattern p = Pattern.compile("\\s*,\\s*|\\s+");
 271:        String [] queues = p.split(queueStr);
 272:	          if (queues.length <= 0) {
 273:            throw new RuntimeException("Failed parse of " +
 274:                    "assignment queue policy string: " + queueStr);
 275:        }
 276:        t = addElementToDefinition(new SimpleType(ATTR_QUEUE_ASSIGNMENT_POLICY,
 277:                "Defines how to assign URIs to queues. Can assign by host, " +
 278:                "by ip, and into one of a fixed set of buckets (1k).",
 279:                queues[0], queues));
 280:        t.setExpertSetting(true);
 281:        t.setOverrideable(false);
 282:
 283:        t = addElementToDefinition(new SimpleType(
 284:                ATTR_FORCE_QUEUE,
 285:                "The queue name into which to force URIs. Should "
 286:                + "be left blank at global level.  Specify a "
 287:                + "per-domain/per-host override to force URIs into "
 288:                + "a particular named queue, regardless of the assignment "
 289:                + "policy in effect (domain or ip-based politeness). "
 290:                + "This could be used on domains known to all be from "
 291:                + "the same small set of IPs (eg blogspot, dailykos, etc.) "
 292:                + "to simulate IP-based politeness, or it could be used if "
 293:                + "you wanted to enforce politeness over a whole domain, even "
 294:                + "though the subdomains are split across many IPs.",
 295:                DEFAULT_FORCE_QUEUE));
 296:        t.setOverrideable(true);
 297:        t.setExpertSetting(true);
 298:        t.addConstraint(new RegularExpressionConstraint(ACCEPTABLE_FORCE_QUEUE,
 299:                Level.WARNING, "This field must contain only alphanumeric "
 300:                + "characters plus period, dash, comma, colon, or underscore."));
 301:        t = addElementToDefinition(new SimpleType(
 302:                ATTR_PAUSE_AT_START,
 303:                "Whether to pause when the crawl begins, before any URIs " +
 304:                "are tried. This gives the operator a chance to verify or " +
 305:                "adjust the crawl before actual work begins. " +
 306:                "Default is false.", DEFAULT_PAUSE_AT_START));
 307:        t = addElementToDefinition(new SimpleType(
 308:                ATTR_PAUSE_AT_FINISH,
 309:                "Whether to pause when the crawl appears finished, rather "
 310:                + "than immediately end the crawl. This gives the operator an "
 311:                + "opportunity to view crawl results, and possibly add URIs or "
 312:                + "adjust settings, while the crawl state is still available. "
 313:                + "Default is false.", DEFAULT_PAUSE_AT_FINISH));
 314:        t.setOverrideable(false);
 315:        
 316:        t = addElementToDefinition(new SimpleType(
 317:                ATTR_SOURCE_TAG_SEEDS,
 318:                "Whether to tag seeds with their own URI as a heritable " +
 319:                "'source' String, which will be carried-forward to all URIs " +
 320:                "discovered on paths originating from that seed. When " +
 321:                "present, such source tags appear in the second-to-last " +
 322:                "crawl.log field.", DEFAULT_SOURCE_TAG_SEEDS));
 323:        t.setOverrideable(false);
 324:        
 325:        t = addElementToDefinition(new SimpleType(ATTR_RECOVERY_ENABLED,
 326:                "Set to false to disable recovery log writing.  Do this if " +
 327:                "you you are using the checkpoint feature for recovering " +
 328:                "crashed crawls.", DEFAULT_ATTR_RECOVERY_ENABLED));
 329:        t.setExpertSetting(true);
 330:        // No sense in it being overrideable.
 331:        t.setOverrideable(false);
 332:    }
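    /*
     * Illustrative note on the queue-assignment-policy choices above: the
     * permissible values are read from a system property keyed on this
     * class name, so an operator could (hypothetically) extend the list in
     * heritrix.properties or on the command line, e.g.:
     *
     *   org.archive.crawler.frontier.AbstractFrontier.queue-assignment-policy=\
     *       org.archive.crawler.frontier.HostnameQueueAssignmentPolicy \
     *       org.archive.crawler.frontier.IPQueueAssignmentPolicy
     *
     * Entries may be space- or comma-separated; the first entry becomes the
     * default value of the setting.
     */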
 333:
 334:	      public void start() {
 335:        if (((Boolean)getUncheckedAttribute(null, ATTR_PAUSE_AT_START))
 336:	                  .booleanValue()) {
 337:            // trigger crawl-wide pause
 338:            controller.requestCrawlPause();
 339:        } else {
 340:            // simply begin
 341:            unpause(); 
 342:        }
 343:    }
 344:    
 345:	      synchronized public void pause() {
 346:        shouldPause = true;
 347:    }
 348:
 349:	      synchronized public void unpause() {
 350:        shouldPause = false;
 351:        notifyAll();
 352:    }
 353:
 354:    public void initialize(CrawlController c)
 355:	              throws FatalConfigurationException, IOException {
 356:        c.addCrawlStatusListener(this);
 357:        File logsDisk = null;
 358:	          try {
 359:            logsDisk = c.getSettingsDir(CrawlOrder.ATTR_LOGS_PATH);
 360:        } catch (AttributeNotFoundException e) {
 361:            logger.log(Level.SEVERE, "Failed to get logs directory", e);
 362:        }
 363:	          if (logsDisk != null) {
 364:            String logsPath = logsDisk.getAbsolutePath() + File.separatorChar;
 365:            if (((Boolean)getUncheckedAttribute(null, ATTR_RECOVERY_ENABLED))
 366:	                      .booleanValue()) {
 367:                this.recover = new RecoveryJournal(logsPath,
 368:                    FrontierJournal.LOGNAME_RECOVER);
 369:            }
 370:        }
 371:	          try {
 372:            final Class qapClass = Class.forName((String)getUncheckedAttribute(
 373:                    null, ATTR_QUEUE_ASSIGNMENT_POLICY));
 374:
 375:            queueAssignmentPolicy =
 376:                (QueueAssignmentPolicy)qapClass.newInstance();
 377:        } catch (Exception e) {
 378:            logger.log(Level.SEVERE, "Bad queue assignment policy class", e);
 379:            throw new FatalConfigurationException(e.getMessage());
 380:        }
 381:    }
 382:
 383:	      synchronized public void terminate() {
 384:        shouldTerminate = true;
 385:	          if (this.recover != null) {
 386:            this.recover.close();
 387:            this.recover = null;
 388:        }
 389:        unpause();
 390:    }
 391:
 392:	      protected void doJournalFinishedSuccess(CrawlURI c) {
 393:	          if (this.recover != null) {
 394:            this.recover.finishedSuccess(c);
 395:        }
 396:    }
 397:
 398:	      protected void doJournalAdded(CrawlURI c) {
 399:	          if (this.recover != null) {
 400:            this.recover.added(c);
 401:        }
 402:    }
 403:
 404:	      protected void doJournalRescheduled(CrawlURI c) {
 405:	          if (this.recover != null) {
 406:            this.recover.rescheduled(c);
 407:        }
 408:    }
 409:
 410:	      protected void doJournalFinishedFailure(CrawlURI c) {
 411:	          if (this.recover != null) {
 412:            this.recover.finishedFailure(c);
 413:        }
 414:    }
 415:
 416:	      protected void doJournalEmitted(CrawlURI c) {
 417:	          if (this.recover != null) {
 418:            this.recover.emitted(c);
 419:        }
 420:    }
 421:
 422:    /**
 423:     * Frontier is empty only if all queues are empty and no URIs are in-process
 424:     * 
 425:     * @return True if queues are empty.
 426:     */
 427:	      public synchronized boolean isEmpty() {
 428:        return queuedUriCount == 0;
 429:    }
 430:
 431:    /**
 432:     * Increment the running count of queued URIs. Synchronized because
 433:     * operations on longs are not atomic.
 434:     */
 435:	      protected synchronized void incrementQueuedUriCount() {
 436:        queuedUriCount++;
 437:    }
 438:
 439:    /**
 440:     * Increment the running count of queued URIs. Synchronized because
 441:     * operations on longs are not atomic.
 442:     * 
 443:     * @param increment
 444:     *            amount to increment the queued count
 445:     */
 446:	      protected synchronized void incrementQueuedUriCount(long increment) {
 447:        queuedUriCount += increment;
 448:    }
 449:
 450:    /**
 451:     * Note that a number of queued URIs have been deleted.
 452:     * 
 453:     * @param numberOfDeletes
 454:     */
 455:	      protected synchronized void decrementQueuedCount(long numberOfDeletes) {
 456:        queuedUriCount -= numberOfDeletes;
 457:    }
 458:
 459:    /**
 460:     * (non-Javadoc)
 461:     * 
 462:     * @see org.archive.crawler.framework.Frontier#queuedUriCount()
 463:     */
 464:	      public long queuedUriCount() {
 465:        return queuedUriCount;
 466:    }
 467:
 468:    /**
 469:     * (non-Javadoc)
 470:     * 
 471:     * @see org.archive.crawler.framework.Frontier#finishedUriCount()
 472:     */
 473:	      public long finishedUriCount() {
 474:        return succeededFetchCount + failedFetchCount + disregardedUriCount;
 475:    }
 476:
 477:    /**
 478:     * Increment the running count of successfully fetched URIs. Synchronized
 479:     * because operations on longs are not atomic.
 480:     */
 481:	      protected synchronized void incrementSucceededFetchCount() {
 482:        succeededFetchCount++;
 483:    }
 484:
 485:    /**
 486:     * (non-Javadoc)
 487:     * 
 488:     * @see org.archive.crawler.framework.Frontier#succeededFetchCount()
 489:     */
 490:	      public long succeededFetchCount() {
 491:        return succeededFetchCount;
 492:    }
 493:
 494:    /**
 495:     * Increment the running count of failed URIs. Synchronized because
 496:     * operations on longs are not atomic.
 497:     */
 498:	      protected synchronized void incrementFailedFetchCount() {
 499:        failedFetchCount++;
 500:    }
 501:
 502:    /**
 503:     * (non-Javadoc)
 504:     * 
 505:     * @see org.archive.crawler.framework.Frontier#failedFetchCount()
 506:     */
 507:	      public long failedFetchCount() {
 508:        return failedFetchCount;
 509:    }
 510:
 511:    /**
 512:     * Increment the running count of disregarded URIs. Synchronized because
 513:     * operations on longs are not atomic.
 514:     */
 515:	      protected synchronized void incrementDisregardedUriCount() {
 516:        disregardedUriCount++;
 517:    }
 518:
 519:	      public long disregardedUriCount() {
 520:        return disregardedUriCount;
 521:    }
 522:
 523:	      public long totalBytesWritten() {
 524:        return totalProcessedBytes;
 525:    }
 526:
 527:    /**
 528:     * Load up the seeds.
 529:     * 
 530:     * This method is called at initialization and by the CrawlController
 531:     * when it wants to force a reload of the configuration.
 532:     * 
 533:     * @see org.archive.crawler.framework.CrawlController#kickUpdate()
 534:     */
 535:	      public void loadSeeds() {
 536:        Writer ignoredWriter = new StringWriter();
 537:        logger.info("beginning");
 538:        // Get the seeds to refresh.
 539:        Iterator iter = this.controller.getScope().seedsIterator(ignoredWriter);
 540:        int count = 0; 
 541:	          while (iter.hasNext()) {
 542:            UURI u = (UURI)iter.next();
 543:            CandidateURI caUri = CandidateURI.createSeedCandidateURI(u);
 544:            caUri.setSchedulingDirective(CandidateURI.MEDIUM);
 545:            if (((Boolean)getUncheckedAttribute(null, ATTR_SOURCE_TAG_SEEDS))
 546:	                      .booleanValue()) {
 547:                caUri.putString(CoreAttributeConstants.A_SOURCE_TAG, caUri.toString());
 548:                caUri.makeHeritable(CoreAttributeConstants.A_SOURCE_TAG);
 549:            }
 550:            schedule(caUri);
 551:            count++;
 552:	              if(count%1000==0) {
 553:                logger.info(count+" seeds");
 554:            }
 555:        }
 556:        // save ignored items (if any) where they can be consulted later
 557:        saveIgnoredItems(ignoredWriter.toString(), controller.getDisk());
 558:        logger.info("finished");
 559:    }
 560:
 561:    /**
 562:     * Dump ignored seed items (if any) to disk; delete file otherwise.
 563:     * Static so that sibling classes (frontiers not yet subclassed here)
 564:     * can reuse it.
 565:     * 
 566:     * @param ignoredItems
 567:     * @param dir 
 568:     */
 569:	      public static void saveIgnoredItems(String ignoredItems, File dir) {
 570:        File ignoredFile = new File(dir, IGNORED_SEEDS_FILENAME);
 571:        if (ignoredItems != null && ignoredItems.length() > 0) {
 572:	              try {
 573:                BufferedWriter bw = new BufferedWriter(new FileWriter(ignoredFile));
 574:                bw.write(ignoredItems);
 575:                bw.close();
 576:            } catch (IOException e) {
 577:                // TODO make an alert?
 578:                e.printStackTrace();
 579:            }
 580:        } else {
 581:            // delete any older file (if any)
 582:            ignoredFile.delete();
 583:        }
 584:    }
 585:
 586:	      protected CrawlURI asCrawlUri(CandidateURI caUri) {
 587:        CrawlURI curi;
 588:	          if (caUri instanceof CrawlURI) {
 589:            curi = (CrawlURI)caUri;
 590:        } else {
 591:            curi = CrawlURI.from(caUri, nextOrdinal++);
 592:        }
 593:        curi.setClassKey(getClassKey(curi));
 594:        return curi;
 595:    }
 596:
 597:    /**
 598:     * @param now current time in milliseconds
 599:     * @throws InterruptedException
 600:     * @throws EndedException
 601:     */
 602:    protected synchronized void preNext(long now) throws InterruptedException,
 603:	              EndedException {
 604:	          if (this.controller == null) {
 605:            return;
 606:        }
 607:        
 608:        // Check completion conditions
 609:	          if (this.controller.atFinish()) {
 610:            if (((Boolean)getUncheckedAttribute(null, ATTR_PAUSE_AT_FINISH))
 611:	                      .booleanValue()) {
 612:                this.controller.requestCrawlPause();
 613:            } else {
 614:                this.controller.beginCrawlStop();
 615:            }
 616:        }
 617:
 618:        // enforce operator pause
 619:	          if (shouldPause) {
 620:	              while (shouldPause) {
 621:                this.controller.toePaused();
 622:                wait();
 623:            }
 624:            // exited pause; possibly finish regardless of pause-at-finish
 625:	              if (controller != null && controller.atFinish()) {
 626:                this.controller.beginCrawlStop();
 627:            }
 628:        }
 629:
 630:        // enforce operator terminate or thread retirement
 631:        if (shouldTerminate
 632:	                  || ((ToeThread)Thread.currentThread()).shouldRetire()) {
 633:            throw new EndedException("terminated");
 634:        }
 635:
 636:        enforceBandwidthThrottle(now);
 637:    }
 638:
 639:    /**
 640:     * Perform any special handling of the CrawlURI, such as promoting its URI
 641:     * to seed-status, or preferencing it because it is an embed.
 642:     * 
 643:     * @param curi
 644:     */
 645:	      protected void applySpecialHandling(CrawlURI curi) {
 646:        if (curi.isSeed() && curi.getVia() != null
 647:	                  && curi.flattenVia().length() > 0) {
 648:            // The only way a seed can have a non-empty via is if it is the
 649:            // result of a seed redirect. Add it to the seeds list.
 650:            //
 651:            // This is a feature. This is handling for case where a seed
 652:            // gets immediately redirected to another page. What we're doing is
 653:            // treating the immediate redirect target as a seed.
 654:            this.controller.getScope().addSeed(curi);
 655:            // And it needs rapid scheduling.
 656:            if (curi.getSchedulingDirective() == CandidateURI.NORMAL)
 657:                curi.setSchedulingDirective(CandidateURI.MEDIUM);
 658:        }
 659:
 660:        // optionally preferencing embeds up to MEDIUM
 661:        int prefHops = ((Integer)getUncheckedAttribute(curi,
 662:                ATTR_PREFERENCE_EMBED_HOPS)).intValue();
 663:	          if (prefHops > 0) {
 664:            int embedHops = curi.getTransHops();
 665:            if (embedHops > 0 && embedHops <= prefHops
 666:	                      && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
 667:                // number of embed hops falls within the preferenced range, and
 668:                // uri is not already MEDIUM -- so promote it
 669:                curi.setSchedulingDirective(CandidateURI.MEDIUM);
 670:            }
 671:        }
 672:    }
 673:
 674:    /**
 675:     * Perform fixups on a CrawlURI about to be returned via next().
 676:     * 
 677:     * @param curi
 678:     *            CrawlURI about to be returned by next()
 679:     * @param q
 680:     *            the queue from which the CrawlURI came
 681:     */
 682:	      protected void noteAboutToEmit(CrawlURI curi, WorkQueue q) {
 683:        curi.setHolder(q);
 684:	          // if (curi.getServer() == null) {
 685:        //    // TODO: perhaps short-circuit the emit here,
 686:        //    // because URI will be rejected as unfetchable
 687:        // }
 688:        doJournalEmitted(curi);
 689:    }
 690:
 691:    /**
 692:     * @param curi
 693:     * @return the CrawlServer to be associated with this CrawlURI
 694:     */
 695:	      protected CrawlServer getServer(CrawlURI curi) {
 696:        return this.controller.getServerCache().getServerFor(curi);
 697:    }
 698:
 699:    /**
 700:     * Return a suitable value to wait before retrying the given URI.
 701:     * 
 702:     * @param curi
 703:     *            CrawlURI to be retried
 704:     * @return seconds to wait before retry
 705:     */
 706:	      protected long retryDelayFor(CrawlURI curi) {
 707:        int status = curi.getFetchStatus();
 708:        return (status == S_CONNECT_FAILED || status == S_CONNECT_LOST ||
 709:                status == S_DOMAIN_UNRESOLVABLE)?
 710:            ((Long)getUncheckedAttribute(curi, ATTR_RETRY_DELAY)).longValue():
 711:            0; // no delay for most
 712:    }
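    // For example: a URI that failed with S_CONNECT_FAILED, S_CONNECT_LOST
    // or S_DOMAIN_UNRESOLVABLE waits retry-delay-seconds (900 by default,
    // i.e. 15 minutes) before its next attempt; any other failure status
    // is retried without delay.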
 713:
 714:    /**
 715:     * Update any scheduling structures with the new information in this
 716:     * CrawlURI. Chiefly means make necessary arrangements for no other URIs at
 717:     * the same host to be visited within the appropriate politeness window.
 718:     * 
 719:     * @param curi
 720:     *            The CrawlURI
 721:     * @return millisecond politeness delay
 722:     */
 723:	      protected long politenessDelayFor(CrawlURI curi) {
 724:        long durationToWait = 0;
 725:        if (curi.containsKey(A_FETCH_BEGAN_TIME)
 726:	                  && curi.containsKey(A_FETCH_COMPLETED_TIME)) {
 727:
 728:            long completeTime = curi.getLong(A_FETCH_COMPLETED_TIME);
 729:            long durationTaken = (completeTime - curi
 730:                    .getLong(A_FETCH_BEGAN_TIME));
 731:            durationToWait = (long)(((Float)getUncheckedAttribute(curi,
 732:                    ATTR_DELAY_FACTOR)).floatValue() * durationTaken);
 733:
 734:            long minDelay = ((Integer)getUncheckedAttribute(curi,
 735:                    ATTR_MIN_DELAY)).longValue();
 736:	              if (minDelay > durationToWait) {
 737:                // wait at least the minimum
 738:                durationToWait = minDelay;
 739:            }
 740:
 741:            long maxDelay = ((Integer)getUncheckedAttribute(curi,
 742:                    ATTR_MAX_DELAY)).longValue();
 743:	              if (durationToWait > maxDelay) {
 744:                // wait no more than the maximum
 745:                durationToWait = maxDelay;
 746:            }
 747:
 748:            long now = System.currentTimeMillis();
 749:            int maxBandwidthKB = ((Integer)getUncheckedAttribute(curi,
 750:                    ATTR_MAX_HOST_BANDWIDTH_USAGE)).intValue();
 751:	              if (maxBandwidthKB > 0) {
 752:                // Enforce bandwidth limit
 753:                CrawlHost host = controller.getServerCache().getHostFor(curi);
 754:                long minDurationToWait = host.getEarliestNextURIEmitTime()
 755:                        - now;
 756:                float maxBandwidth = maxBandwidthKB * 1.024F; // kilo factor
 757:                long processedBytes = curi.getContentSize();
 758:                host.setEarliestNextURIEmitTime(
 759:                        (long)(processedBytes / maxBandwidth)
 760:                                + now);
 761:
 762:	                  if (minDurationToWait > durationToWait) {
 763:                    durationToWait = minDurationToWait;
 764:                }
 765:            }
 766:        }
 767:        return durationToWait;
 768:    }
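    // Worked example, using the default settings declared above: a fetch
    // that took 400ms gives 5 * 400 = 2000ms, which is below the 3000ms
    // min-delay-ms floor, so the wait is 3000ms; a fetch that took 10s
    // gives 50s, which the 30000ms max-delay-ms ceiling caps at 30s.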
 769:
 770:    /**
 771:     * Ensure that any overall-bandwidth-usage limit is respected, by pausing as
 772:     * long as necessary.
 773:     * 
 774:     * @param now
 775:     * @throws InterruptedException
 776:     */
 777:	      private void enforceBandwidthThrottle(long now) throws InterruptedException {
 778:        int maxBandwidthKB = ((Integer)getUncheckedAttribute(null,
 779:                ATTR_MAX_OVERALL_BANDWIDTH_USAGE)).intValue();
 780:	          if (maxBandwidthKB > 0) {
 781:            // If the bandwidth setting changed, reset accounting so the
 781:            // new limit applies only from now on
 782:	              if (maxBandwidthKB != lastMaxBandwidthKB) {
 783:                lastMaxBandwidthKB = maxBandwidthKB;
 784:                processedBytesAfterLastEmittedURI = totalProcessedBytes;
 785:            }
 786:
 787:            // Enforce bandwidth limit
 788:            long sleepTime = nextURIEmitTime - now;
 789:            float maxBandwidth = maxBandwidthKB * 1.024F; // kilo factor
 790:            long processedBytes = totalProcessedBytes
 791:                    - processedBytesAfterLastEmittedURI;
 792:            long shouldHaveEmittedDiff = nextURIEmitTime == 0? 0
 793:                    : nextURIEmitTime - now;
 794:            nextURIEmitTime = (long)(processedBytes / maxBandwidth) + now
 795:                    + shouldHaveEmittedDiff;
 796:            processedBytesAfterLastEmittedURI = totalProcessedBytes;
 797:	              if (sleepTime > 0) {
 798:                long targetTime = now + sleepTime;
 799:                now = System.currentTimeMillis();
 800:	                  while (now < targetTime) {
 801:	                      synchronized (this) {
 802:	                          if (logger.isLoggable(Level.FINE)) {
 803:                            logger.fine("Frontier waits for: " + sleepTime
 804:                                    + "ms to respect bandwidth limit.");
 805:                        }
 806:                        // TODO: now that this is a wait(), frontier can
 807:                        // still schedule and finish items while waiting,
 808:                        // which is good, but multiple threads could all
 809:                        // wait for the same wakeTime, which somewhat
 810:                        // spoils the throttle... should be fixed.
 811:                        wait(targetTime - now);
 812:                    }
 813:                    now = System.currentTimeMillis();
 814:                }
 815:            }
 816:        }
 817:    }
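    // Worked example with hypothetical figures: at
    // total-bandwidth-usage-KB-sec = 100, maxBandwidth is 102.4 bytes/ms;
    // if 51200 bytes have been processed since the last emitted URI, the
    // next emit time is pushed 51200 / 102.4 = 500ms into the future, and
    // a thread arriving before then wait()s out the difference.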
 818:
 819:    /**
 820:     * Take note of any processor-local errors that have been entered into the
 821:     * CrawlURI.
 822:     * 
 823:     * @param curi
 824:     *  
 825:     */
 826:	      protected void logLocalizedErrors(CrawlURI curi) {
 827:	          if (curi.containsKey(A_LOCALIZED_ERRORS)) {
 828:            List localErrors = (List)curi.getObject(A_LOCALIZED_ERRORS);
 829:            Iterator iter = localErrors.iterator();
 830:	              while (iter.hasNext()) {
 831:                Object array[] = {curi, iter.next()};
 832:                controller.localErrors.log(Level.WARNING, curi.getUURI()
 833:                        .toString(), array);
 834:            }
 835:            // once logged, discard
 836:            curi.remove(A_LOCALIZED_ERRORS);
 837:        }
 838:    }
 839:
 840:    /**
 841:     * Utility method to return a scratch dir for the given key's temp files.
 842:     * Every key gets its own subdir. To avoid having any one directory with
 843:     * thousands of files, there are also two levels of enclosing directory
 844:     * named by the least-significant hex digits of the key string's java
 845:     * hashcode.
 846:     * 
 847:     * @param key
 848:     * @return File representing scratch directory
 849:     */
 850:	      protected File scratchDirFor(String key) {
 851:        String hex = Integer.toHexString(key.hashCode());
 852:	          while (hex.length() < 4) {
 853:            hex = "0" + hex;
 854:        }
 855:        int len = hex.length();
 856:        return new File(this.controller.getStateDisk(), hex.substring(len - 2,
 857:                len)
 858:                + File.separator
 859:                + hex.substring(len - 4, len - 2)
 860:                + File.separator + key);
 861:    }
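    // For example, a key whose hashCode ends in hex "5a3f" is placed under
    // <state-disk>/3f/5a/<key>: the last two hex digits name the outer
    // directory, the preceding two the inner one.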
 862:
 863:	      protected boolean overMaxRetries(CrawlURI curi) {
 864:        // never retry more than the max number of times
 865:        if (curi.getFetchAttempts() >= ((Integer)getUncheckedAttribute(curi,
 866:	                  ATTR_MAX_RETRIES)).intValue()) {
 867:            return true;
 868:        }
 869:        return false;
 870:    }
 871:
 872:    public void importRecoverLog(String pathToLog, boolean retainFailures)
 873:	              throws IOException {
 874:        File source = new File(pathToLog);
 875:	          if (!source.isAbsolute()) {
 876:            source = new File(getSettingsHandler().getOrder().getController()
 877:                    .getDisk(), pathToLog);
 878:        }
 879:        RecoveryJournal.importRecoverLog(source, this, retainFailures);
 880:    }
 881:
 882:    /*
 883:     * (non-Javadoc)
 884:     * 
 885:     * @see org.archive.crawler.framework.URIFrontier#kickUpdate()
 886:     */
 887:	      public void kickUpdate() {
 888:        // by default, do nothing
 889:        // (scope will loadSeeds, if appropriate)
 890:    }
 891:
 892:    /**
 893:     * Log to the main crawl.log
 894:     * 
 895:     * @param curi
 896:     */
 897:	      protected void log(CrawlURI curi) {
 898:        curi.aboutToLog();
 899:        Object array[] = {curi};
 900:        this.controller.uriProcessing.log(Level.INFO,
 901:                curi.getUURI().toString(), array);
 902:    }
 903:
 904:	      protected boolean isDisregarded(CrawlURI curi) {
 905:	          switch (curi.getFetchStatus()) {
 906:        case S_ROBOTS_PRECLUDED: // they don't want us to have it
 907:        case S_BLOCKED_BY_CUSTOM_PROCESSOR:
 908:        case S_OUT_OF_SCOPE: // filtered out by scope
 909:        case S_BLOCKED_BY_USER: // filtered out by user
 910:        case S_TOO_MANY_EMBED_HOPS: // too far from last true link
 911:        case S_TOO_MANY_LINK_HOPS: // too far from seeds
 912:        case S_DELETED_BY_USER: // user deleted
 913:            return true;
 914:        default:
 915:            return false;
 916:        }
 917:    }
 918:
 919:    /**
 920:     * Checks if a recently completed CrawlURI that did not finish successfully
 921:     * needs to be retried (processed again after some time elapses)
 922:     * 
 923:     * @param curi
 924:     *            The CrawlURI to check
 925:     * @return True if we need to retry.
 926:     */
 927:	      protected boolean needsRetrying(CrawlURI curi) {
 928:	          if (overMaxRetries(curi)) {
 929:            return false;
 930:        }
 931:
 932:	          switch (curi.getFetchStatus()) {
 933:        case HttpStatus.SC_UNAUTHORIZED:
 934:            // We can get here even though a positive status code usually
 935:            // means success: we arrive here if rfc2617 credential data is
 936:            // loaded and we're supposed to go around again. Check whether
 937:            // any rfc2617 credential is present; if so, assume it was
 938:            // loaded in FetchHTTP on the expectation that we're to go
 939:            // around again. If none is loaded, we should not be here.
 940:            boolean loaded = curi.hasRfc2617CredentialAvatar();
 941:	              if (!loaded && logger.isLoggable(Level.INFO)) {
 942:                logger.info("Have 401 but no creds loaded " + curi);
 943:            }
 944:            return loaded;
 945:        case S_DEFERRED:
 946:        case S_CONNECT_FAILED:
 947:        case S_CONNECT_LOST:
 948:        case S_DOMAIN_UNRESOLVABLE:
 949:            // these are all worth a retry
 950:            // TODO: consider if any others (S_TIMEOUT in some cases?) deserve
 951:            // retry
 952:            return true;
 953:        default:
 954:            return false;
 955:        }
 956:    }
 957:
 958:    /**
 959:     * Canonicalize passed uuri. It would be sweeter if this canonicalize
 960:     * function were encapsulated by that which it canonicalizes, but because
 961:     * settings change with context -- i.e. there may be overrides in operation
 962:     * for a particular URI -- it's not so easy; each CandidateURI would need a
 963:     * reference to the settings system. That's awkward to pass in.
 964:     * 
 965:     * @param uuri Candidate URI to canonicalize.
 966:     * @return Canonicalized version of passed <code>uuri</code>.
 967:     */
 968:	      protected String canonicalize(UURI uuri) {
 969:        return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
 970:    }
 971:
 972:    /**
 973:     * Canonicalize passed CandidateURI. This method differs from
 974:     * {@link #canonicalize(UURI)} in that it takes a look at
 975:     * the CandidateURI context, possibly overriding any canonicalization effect
 976:     * that could make us miss content. If canonicalization produces a URL that
 977:     * was 'alreadyseen', but the entry in the 'alreadyseen' database did
 978:     * nothing but redirect to the current URL, we won't get the current URL;
 979:     * we'll think we've already seen it. Examples would be archive.org
 980:     * redirecting to www.archive.org or the inverse, www.netarkivet.net
 981:     * redirecting to netarkivet.net (assuming the stripWWW rule is enabled).
 982:     * <p>Note: under some circumstances this method sets the forceFetch flag.
 983:     *
 984:     * @param cauri CandidateURI to examine.
 985:     * @return Canonicalized <code>cauri</code>.
 986:     */
 987:	      protected String canonicalize(CandidateURI cauri) {
 988:        String canon = canonicalize(cauri.getUURI());
 989:	          if (cauri.isLocation()) {
 990:            // If the via is not the same as where we're being redirected (i.e.
 991:            // we're not being redirected back to the same page), AND the
 992:            // canonicalization of the via equals that of the current cauri,
 993:            // THEN force-fetch (so there's no chance of our not crawling
 994:            // content because the alreadyseen check thinks it's seen the URL before).
 995:            // An example of a URL that redirects to itself is:
 996:            // http://bridalelegance.com/images/buttons3/tuxedos-off.gif.
 997:            // An example of a URL whose canonicalization equals its via's
 998:            // canonicalization, and where we want to fetch content at the
 999:            // redirection (i.e. need to set forcefetch), is netarkivet.dk.
1000:            if (!cauri.toString().equals(cauri.getVia().toString()) &&
1001:	                      canonicalize(cauri.getVia()).equals(canon)) {
1002:                cauri.setForceFetch(true);
1003:            }
1004:        }
1005:        return canon;
1006:    }
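    // Illustrative trace, using the javadoc's netarkivet example: with a
    // strip-www rule in effect, a redirect (via) of http://www.netarkivet.net/
    // to http://netarkivet.net/ canonicalizes both URIs to the same string;
    // the two URIs themselves differ, so forceFetch is set and the redirect
    // target is fetched despite matching the 'alreadyseen' entry.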
1007:
1008:    /**
1009:     * @param cauri CrawlURI we're to get a key for.
1010:     * @return a String token representing a queue
1011:     */
1012:	      public String getClassKey(CandidateURI cauri) {
1013:        String queueKey = (String)getUncheckedAttribute(cauri,
1014:            ATTR_FORCE_QUEUE);
1015:	          if ("".equals(queueKey)) {
1016:            // Typical case, barring overrides
1017:            queueKey =
1018:                queueAssignmentPolicy.getClassKey(this.controller, cauri);
1019:        }
1020:        return queueKey;
1021:    }
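    // Example: with force-queue-assignment left blank (the default), the
    // configured QueueAssignmentPolicy picks the key -- for
    // HostnameQueueAssignmentPolicy, typically the URI's hostname. A
    // per-domain override naming a queue, say "blogspot", would instead
    // route every matching URI into that single queue.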
1022:
1023:    /**
1024:     * @return RecoveryJournal instance.  May be null.
1025:     */
1026:	      public FrontierJournal getFrontierJournal() {
1027:        return this.recover;
1028:    }
1029:
1030:	      public void crawlEnding(String sExitMessage) {
1031:        // TODO Auto-generated method stub
1032:    }
1033:
1034:	      public void crawlEnded(String sExitMessage) {
1035:	          if (logger.isLoggable(Level.INFO)) {
1036:            logger.info("Closing with " + Long.toString(queuedUriCount()) +
1037:                " urls still in queue.");
1038:        }
1039:    }
1040:
1041:	      public void crawlStarted(String message) {
1042:        // TODO Auto-generated method stub
1043:    }
1044:
1045:	      public void crawlPausing(String statusMessage) {
1046:        // TODO Auto-generated method stub
1047:    }
1048:
1049:	      public void crawlPaused(String statusMessage) {
1050:        // TODO Auto-generated method stub
1051:    }
1052:
1053:	      public void crawlResuming(String statusMessage) {
1054:        // TODO Auto-generated method stub
1055:    }
1056:    
1057:    public void crawlCheckpoint(File checkpointDir)
1058:	      throws Exception {
1059:	          if (this.recover == null) {
1060:            return;
1061:        }
1062:        this.recover.checkpoint(checkpointDir);
1063:    }
1064:    
1065:    //
1066:    // Reporter implementation
1067:    // 
1068:	      public String singleLineReport() {
1069:        return ArchiveUtils.singleLineReport(this);
1070:    }
1071:
1072:	      public void reportTo(PrintWriter writer) {
1073:        reportTo(null, writer);
1074:    }
1075:}