Internals

This document describes the crawler’s architecture and internal structures.

Service Architecture

This diagram shows the components used to deploy Starbelly.

graph service_diagram {
    graph [bgcolor=transparent];
    node [shape=box];
    edge [fontsize=10];

    // Nodes
    app            [label="Starbelly"];
    web            [label="Web server"];
    database       [label="RethinkDB"];
    webclient      [label="Web client"];
    othclient      [label="Other client"];

    // Edges
    web -- app [label="Proxy WebSocket"];
    app -- database;
    webclient -- web [label="Static Assets\n& WebSocket"];
    othclient -- web [label="WebSocket"];
}

The web server serves static assets (such as the web client’s HTML, CSS, and images) and also proxies the WebSocket API to the crawler. The official deployment uses Nginx as its web server, but other servers like Apache could be used instead.

The web client is the standard client for end users to interact with the crawler. Other clients (e.g. a client you write in Python) connect over the same WebSocket API as the web client and have access to all of the same functionality. (See WebSocket API.)

The crawler itself is a single Python process that runs crawl jobs and services WebSocket API requests.

All configuration, metadata, and crawl data are stored in RethinkDB.

Asynchronous Framework

Starbelly is written in Python 3 using the Trio framework for most of the asynchronous I/O. Some parts of the implementation depend on asyncio libraries, which are invoked using the trio-asyncio bridge:

  • The downloader uses aiohttp and aiosocks, because there is not a mature Trio library for HTTP that supports SOCKS proxies. It could be ported to a Trio-native client such as asks, but mixing two different HTTP clients in the same project is undesirable, so this can be revisited when the downloader is updated. (A sketch of the bridge follows.)
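
The bridge amounts to running an asyncio event loop inside Trio and wrapping asyncio coroutines so that Trio code can await them. A minimal sketch, assuming the trio-asyncio and aiohttp packages are installed (this is not Starbelly's actual downloader code; the URL and function name are illustrative):

import trio
import trio_asyncio
import aiohttp

@trio_asyncio.aio_as_trio       # wrap an asyncio coroutine so Trio can await it
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return response.status, await response.read()

async def main():
    # Run an asyncio event loop inside the Trio run loop for this block.
    async with trio_asyncio.open_loop():
        status, body = await fetch('https://example.com/')
        print(status, len(body))

trio.run(main)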

Crawling Pipeline

Starbelly’s crawling operations consist of a pipeline of various components. The frontier is a component that keeps track of which URLs remain to be crawled. These URLs are fed to a rate limiter which acts as a bottleneck that prevents crawling sites too fast. The downloader fetches URLs from the rate limiter and downloads them over the network. The storage component saves the downloaded items in the database. The extractor finds URLs in the downloaded items and sends them back to the frontier so those URLs can be crawled, too.

Each crawl job creates its own crawling components (frontier, downloader, storage, extractor) and they run concurrently with all other jobs. The rate limiter is shared across all jobs to ensure that rate limits are enforced correctly even when two different jobs are crawling the same domain.

digraph crawl_pipeline {
    graph [bgcolor=transparent];
    node [shape=box, style=filled];

    frontier1 [label="Job #1 Frontier", fillcolor="#9ac2f9"]
    downloader1 [label="Downloader", fillcolor="#9ac2f9"]
    extractor1 [label="Extractor", fillcolor="#9ac2f9"]
    storage1 [label="Storage", fillcolor="#9ac2f9"]

    frontier2 [label="Job #2 Frontier", fillcolor="#9af9ad"];
    downloader2 [label="Downloader", fillcolor="#9af9ad"];
    extractor2 [label="Extractor", fillcolor="#9af9ad"];
    storage2 [label="Storage", fillcolor="#9af9ad"];

    rate_limiter [label="Rate Limiter", fillcolor=grey];

    frontier1 -> rate_limiter -> downloader1 -> storage1 -> extractor1;
    frontier2 -> rate_limiter -> downloader2 -> storage2 -> extractor2;
    frontier1 -> extractor1 [dir=back, style=dashed];
    frontier2 -> extractor2 [dir=back, style=dashed];
}

This diagram depicts the crawling pipeline for two concurrent jobs: Job #1 in blue and Job #2 in green. The solid lines indicate “channels”, i.e. in-memory communication between components. The dashed lines indicate data that is written to a database table by one component and later read back by another component.
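
The real components are the classes documented later on this page; the sketch below only illustrates the plumbing style, wiring three stand-in coroutines together with Trio memory channels (all names and delays here are illustrative, not Starbelly's actual implementation):

import trio

async def frontier(send_to_rate_limiter):
    # Stand-in frontier: emit a couple of pending URLs, then close the channel.
    async with send_to_rate_limiter:
        for url in ('https://example.com/a', 'https://example.com/b'):
            await send_to_rate_limiter.send(url)

async def rate_limiter(recv_from_frontier, send_to_downloader, delay=1.0):
    # Stand-in rate limiter: pass each item along after a fixed delay.
    async with recv_from_frontier, send_to_downloader:
        async for url in recv_from_frontier:
            await trio.sleep(delay)
            await send_to_downloader.send(url)

async def downloader(recv_from_rate_limiter):
    async with recv_from_rate_limiter:
        async for url in recv_from_rate_limiter:
            print('would download', url)

async def main():
    frontier_send, frontier_recv = trio.open_memory_channel(10)
    download_send, download_recv = trio.open_memory_channel(10)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(frontier, frontier_send)
        nursery.start_soon(rate_limiter, frontier_recv, download_send)
        nursery.start_soon(downloader, download_recv)

trio.run(main)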

System Components

Starbelly doesn’t just contain crawling components: it also contains components that provide management and introspection for the crawling system. This section briefly explains each high-level component, and the subsequent sections provide low-level details for each component.

API Server
The API server allows clients to interact with Starbelly by sending protobuf messages over a WebSocket connection. The server uses a simple request/response model for most API calls, and also has a subscription/event model when the client wants push updates.
CAPTCHA
Components that deal with CAPTCHA images.
Crawl Management
The crawl manager is responsible for the lifecycle of crawl jobs, such as starting/stopping jobs and keeping track of resource usage.
Downloader
Responsible for fetching items from the network. Although this seems simple, the downloader also enforces significant portions of the crawl policy, such as the proxy policy and MIME type rules.
Frontier
The crawl frontier keeps track of the URLs that are pending download.
Extractor
Parses response bodies to discover new URLs that may be added to the crawl frontier.
Login Manager
Automates the process of logging in to a site to perform an authenticated crawl.
Policy
Controls the crawler’s decision making, for example how to handle robots.txt exclusion rules, how to prioritize URLs, how long to run the crawl, etc.
Rate Limiter
Acts as a bottleneck between the jobs’ crawl frontiers and their downloaders. It prevents sites from being crawled too quickly, even when multiple jobs are crawling the same domain.
Resource Monitor
Provides introspection into other components to track things like how many items are currently being downloaded, how many items are queued in the rate limiter, etc.
Robots.txt Manager
Responsible for fetching robots.txt files as necessary, maintaining a local cache of robots.txt files, and making access control decisions, such as, “is job X allowed to access URL Y?”
Scheduler
Controls the crawling schedule. When a job needs to run, the scheduler will automatically start it.
Storage
The crawl storage saves downloaded items into the database.
Subscription
Handles API subscriptions such as syncing crawl data.

API Server

The main interaction point for Starbelly is through its WebSocket API.

exception starbelly.server.InvalidRequestException

Indicates a request is invalid.

class starbelly.server.Server(host, port, server_db, subscription_db, crawl_manager, rate_limiter, resource_monitor, stats_tracker, scheduler)

Handles websocket connections from clients and command dispatching.

await run(*, task_status=TASK_STATUS_IGNORED)

Run the websocket server.

To ensure that the server is ready, call await nursery.start(server.run).

Returns:Runs until cancelled.
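
A sketch of that startup pattern (constructing the Server and its dependencies is omitted here):

import trio

async def serve(server):
    async with trio.open_nursery() as nursery:
        # start() returns only after the server reports that it is ready.
        await nursery.start(server.run)
        # The server now accepts WebSocket connections until cancelled.
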
starbelly.server.api_handler(handler)

This decorator registers a function as a callable command through the API server.
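
As a purely hypothetical illustration (this command does not exist in Starbelly), a handler is an async function registered with the decorator; its parameters, such as command and response, are supplied by the server when the command is dispatched:

from starbelly.server import api_handler

@api_handler
async def ping(command, response):
    ''' Hypothetical command: acknowledge the request without doing any work.
    Real handlers, like those listed below, fill in fields on the protobuf
    response and may receive additional arguments such as server_db. '''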

await starbelly.server.captcha.delete_captcha_solver(command, server_db)

Delete a CAPTCHA solver.

await starbelly.server.captcha.get_captcha_solver(command, response, server_db)

Get a CAPTCHA solver.

await starbelly.server.captcha.list_captcha_solvers(command, response, server_db)

Return a list of CAPTCHA solvers.

await starbelly.server.captcha.set_captcha_solver(command, response, server_db)

Create or update a CAPTCHA solver.

await starbelly.server.job.delete_job(command, server_db, stats_tracker)

Delete a job.

await starbelly.server.job.get_job(command, response, server_db)

Get status for a single job.

await starbelly.server.job.get_job_items(command, response, server_db)

Get a page of items (crawl responses) from a job.

await starbelly.server.job.list_jobs(command, response, server_db)

Return a list of jobs.

await starbelly.server.job.set_job(command, crawl_manager, response)

Create or update job metadata.

await starbelly.server.login.delete_domain_login(command, server_db)

Delete a domain login and all of its users.

await starbelly.server.login.get_domain_login(command, response, server_db)

Get a domain login.

await starbelly.server.login.list_domain_logins(command, response, server_db)

Return a list of domain logins.

await starbelly.server.login.set_domain_login(command, server_db)

Create or update a domain login.

await starbelly.server.policy.delete_policy(command, server_db)

Delete a policy.

await starbelly.server.policy.get_policy(command, response, server_db)

Get a single policy.

await starbelly.server.policy.list_policies(command, response, server_db)

Get a list of policies.

await starbelly.server.policy.set_policy(command, response, server_db)

Create or update a single policy.

If the policy ID is set, then update the corresponding policy. Otherwise, create a new policy.

await starbelly.server.rate_limit.list_rate_limits(command, response, server_db)

Get a page of rate limits.

await starbelly.server.rate_limit.set_rate_limit(command, rate_limiter, server_db)

Set a rate limit.

await starbelly.server.schedule.delete_schedule(command, scheduler, server_db)

Delete a job schedule.

await starbelly.server.schedule.get_schedule(command, response, server_db)

Get metadata for a job schedule.

await starbelly.server.schedule.list_schedule_jobs(command, response, server_db)

Return a list of job schedules.

await starbelly.server.schedule.list_schedules(command, response, server_db)

Return a list of job schedules.

await starbelly.server.schedule.set_schedule(command, response, scheduler, server_db)

Create or update job schedule metadata.

await starbelly.server.subscription.subscribe_job_status(command, response, subscription_manager, stats_tracker)

Handle the subscribe crawl status command.

await starbelly.server.subscription.subscribe_job_sync(command, crawl_manager, response, subscription_manager)

Handle the subscribe crawl items command.

await starbelly.server.subscription.subscribe_resource_monitor(command, response, resource_monitor, subscription_manager)

Handle the subscribe resource monitor command.

await starbelly.server.subscription.subscribe_task_monitor(command, response, subscription_manager)

Handle the subscribe task monitor command.

await starbelly.server.subscription.unsubscribe(command, subscription_manager)

Handle an unsubscribe command.

await starbelly.server.system.performance_profile(command, response)

Run CPU profiler.

CAPTCHA

Starbelly supports passing CAPTCHA images to third-party solving services.

class starbelly.captcha.CaptchaSolver(doc)

An interface for a CAPTCHA solving service.

get_command(img_data)

Return a JSON API command.

Parameters:img_data (bytes) – The image data for the CAPTCHA.
Returns:A command that can be serialized to JSON.
Return type:dict
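
A usage sketch, assuming a solver document doc loaded from the database (its fields are not shown here):

import json
from starbelly.captcha import CaptchaSolver

def build_captcha_request(doc, img_data):
    ''' Build the JSON payload to send to the solving service. '''
    solver = CaptchaSolver(doc)
    command = solver.get_command(img_data)   # a dict, per the docs above
    return json.dumps(command)
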
starbelly.captcha.captcha_doc_to_pb(doc)

Convert CAPTCHA solver from database document to protobuf.

Parameters:doc (dict) – A database document.
Returns:A protobuf message.
starbelly.captcha.captcha_pb_to_doc(pb)

Convert CAPTCHA solver from protobuf to database document.

Parameters:pb – A protobuf message.
Returns:A database document.
Return type:dict

Crawl Management

Crawl management is implemented by two main classes. The first class, CrawlManager, acts as a registry and supervisor for all currently running jobs. All interactions with jobs (starting, pausing, etc.) are done through this class.

class starbelly.job.CrawlManager(rate_limiter, stats_tracker, robots_txt_manager, crawl_db, frontier_db, extractor_db, storage_db, login_db)

Manage crawl jobs and provide introspection into their states.

await cancel_job(job_id)

Stop the specified job if it is currently running and set it to cancelled in the database.

Parameters:job_id (str) –
get_job_state_channel(size=10)

Open a new job state channel.

JobStateEvent objects will be sent to this channel when a job changes state. When the channel is full, state events will not be sent, so consumers need to read events continually.

Parameters:size (int) – The size of the channel.
Return type:trio.ReceiveChannel
get_resource_usage()

Return statistics about crawler resource usage, i.e. concurrent downloads.

Return type:dict
await pause_job(job_id)

Stop the specified job and set it to paused in the database.

Parameters:job_id (str) –
await resume_job(job_id)

Resume a paused job: load it from the database and run it.

Parameters:job_id (str) – The ID of the job to resume.
await run(*, task_status=TASK_STATUS_IGNORED)

Run the crawl manager.

You should call await nursery.start(crawl_manager.run) to ensure that the crawl manager is ready before calling any of its job methods.

Returns:This function runs until cancelled.
await start_job(name, seeds, tags, policy_id, schedule_id=None)

Start a new job.

This adds the job to the database and runs it.

Parameters:
  • name (str) – The name of the new job.
  • seeds (list[str]) – A list of seeds.
  • tags (list[str]) – A list of tags.
  • policy_id (str) – The ID of the policy to apply to this job.
  • schedule_id (str) – (Optional) The schedule that started this job.
Returns:

The new job ID.

Return type:

str
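
A sketch of starting the manager and launching a job (constructing the CrawlManager and choosing a policy_id are not shown; the name, seeds, and tags are illustrative):

import trio

async def main(crawl_manager, policy_id):
    async with trio.open_nursery() as nursery:
        # Wait until the crawl manager is ready, as described above.
        await nursery.start(crawl_manager.run)
        job_id = await crawl_manager.start_job(
            name='Example crawl',
            seeds=['https://example.com/'],
            tags=['example'],
            policy_id=policy_id,
        )
        print('Started job', job_id)
        # The crawl manager keeps running in this nursery until cancelled.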

Each job also has a single object that acts as a supervisor for the rest of that job’s crawling components.

class starbelly.job.CrawlJob(name, job_id, schedule_id, policy, frontier, downloader, storage, extractor, terminator)

Manages job state.

id

The job’s unique ID.

Return type:str
old_urls

Returns a set of hashed URLs that the crawl has seen before.

await run()

Start all of the subcomponents of the job.

Returns:Runs until this job finishes.
await stop()

Stop the crawl job.

Downloader

The downloader is responsible for fetching resources over the network and sending them back to the crawl manager.

class starbelly.downloader.DownloadRequest(frontier_id: bytes, job_id: bytes, method: str, url: str, form_data: dict, cost: float)

Represents a resource that needs to be downloaded.

class starbelly.downloader.DownloadResponse(frontier_id: bytes, cost: float, url: str, canonical_url: str, content_type: str = None, body: bytes = None, started_at: datetime.datetime = None, completed_at: datetime.datetime = None, exception: str = None, status_code: int = None, headers: dict = None)

Represents the result of downloading a resource, which could contain a successful response body, an HTTP error, or an exception.

classmethod from_request(request)

Initialize a response from its corresponding request.

Parameters:request (DownloadRequest) – The request that generated this response.
set_exception(exception)

Indicate that an exception occurred while downloading this resource.

Parameters:exception (str) – Traceback of the exception.
set_response(http_response, body)

Update state from HTTP response.

start()

Called when the request has been sent and the response is being waited for.
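
Putting these methods together, a failed download could be recorded roughly as follows (a sketch, not the downloader’s actual error handling):

from starbelly.downloader import DownloadResponse

def response_for_failed_request(request, traceback_text):
    ''' Build a response for a request whose download raised an exception. '''
    response = DownloadResponse.from_request(request)
    response.start()                        # the request has been sent
    response.set_exception(traceback_text)  # record the traceback string
    return response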

class starbelly.downloader.Downloader(job_id, policy, send_channel, recv_channel, semaphore, rate_limiter_reset, stats)

This class is responsible for downloading resources. A new instance is created for each crawl job.

count

Return the number of downloads currently in progress.

Return type:int
await download(request, skip_mime=False)

Download a requested resource and return it.

Note: this is probably not the method you want! Most downloads should be sent through the request channel. This method is only for unusual cases where we want to download one item and return the response directly to the caller, such as a robots.txt file or a login page.

These responses are not included in job statistics and do not get stored in the database. The caller should apply their own timeout here.

Parameters:
  • request (DownloadRequest) –
  • skip_mime (bool) – If True, the MIME type will not be checked against the policy.
Return type:DownloadResponse
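
For example, a caller fetching a one-off page outside the normal pipeline might apply its own timeout like this (a sketch; the 30 second timeout is arbitrary):

import trio

async def fetch_one_page(downloader, request):
    ''' Download a single item directly, e.g. a login page. '''
    with trio.move_on_after(30):
        return await downloader.download(request, skip_mime=True)
    return None   # timed out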
 
await run()

Run the downloader, including all concurrent download tasks. When cancelled, all download tasks are also cancelled.

Returns:Runs until cancelled.
class starbelly.downloader.MimeNotAllowedError(mime)

Indicates that the MIME type of a response is not allowed by policy.

Extractor

The extractor component extracts URLs from downloaded resources and adds those that comply with the policy to the frontier.

class starbelly.extractor.CrawlExtractor(job_id, db, send_channel, receive_channel, policy, downloader, robots_txt_manager, old_urls, stats, batch_size=100)

Extract URLs from crawled items and add them to the frontier table.

await run()

Read responses from the extraction channel and add the extracted URLs to the frontier.

Returns:This function runs until cancelled.

Frontier

The frontier component keeps track of which URLs are pending to be crawled. The frontier issues download requests ordered by “crawl cost”, i.e. pending items with lower cost values are issued first.
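
For example, given two pending requests (constructed here with made-up IDs and no form data), the one with the lower cost is issued first:

from starbelly.downloader import DownloadRequest

pending = [
    DownloadRequest(frontier_id=b'\x01', job_id=b'\x02', method='GET',
                    url='https://example.com/deep/page', form_data=None, cost=2.0),
    DownloadRequest(frontier_id=b'\x03', job_id=b'\x02', method='GET',
                    url='https://example.com/', form_data=None, cost=1.0),
]
next_item = min(pending, key=lambda request: request.cost)   # the cost=1.0 seed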

class starbelly.frontier.CrawlFrontier(job_id, db, send_channel, login_manager, policy, stats)

Contains the logic for managing a crawl frontier, i.e. the URLs that have already been crawled and the URLs that remain to be crawled.

await run()

This task takes items off the frontier and sends them to the rate limiter.

Returns:This function runs until cancelled.

Login Manager

The login manager uses the Formasaurus library to find a login form on a page, fill it out, and submit it to get session cookies for authenticated crawling. Under the hood, Formasaurus uses a pre-built machine learning model (a conditional random field) to classify types of forms on a page and to classify the types of inputs in each form. If necessary, the login manager will also request a CAPTCHA solution from a CAPTCHA solver.

class starbelly.login.LoginManager(job_id, db, policy, downloader)
await login(domain)

Attempt a login for the given domain.

Parameters:domain (str) – The domain to log into.

Policy

Policy objects guide the crawler’s decision making, i.e. which links to follow, which resources to download, when to use a proxy, etc. The policy manager is responsible for saving and loading policies from the database.

A policy object is a container that includes various subpolicies.

class starbelly.policy.Policy(doc, version, seeds)

A container for subpolicies.

staticmethod convert_doc_to_pb(doc, pb)

Convert policy from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.Policy) – An empty protobuf.
staticmethod convert_pb_to_doc(pb)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.Policy) – A protobuf
Returns:Database document.
Return type:dict
replace_mime_type_rules(rules)

Return a shallow copy of this policy with new MIME type rules from doc.

Parameters:rules (list) – MIME type rules in database document form.
Returns:A new policy.
Return type:Policy
class starbelly.policy.PolicyAuthentication(doc)

Policy for authenticated crawling.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyAuthentication) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyAuthentication) – A protobuf
Returns:Database document.
Return type:dict
is_enabled()

Return True if authentication is enabled.

Return type:bool
class starbelly.policy.PolicyLimits(doc)

Limits on crawl size/duration.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyLimits) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyLimits) – A protobuf
Returns:Database document.
Return type:dict
exceeds_max_cost(cost)

Return true if cost is greater than the policy’s max cost.

Parameters:cost (float) –
Return type:bool
max_duration

The maximum duration that a crawl is allowed to run.

Return type:float or None
met_item_limit(items)

Return true if items is greater than or equal to the policy’s max item count.

Parameters:items (int) –
Return type:bool
class starbelly.policy.PolicyMimeTypeRules(docs)

Filter responses by MIME type.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyMimeTypeRules) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyMimeTypeRules) – A protobuf
Returns:Database document.
Return type:dict
should_save(mime_type)

Returns True if mime_type is approved by this policy.

If rules are valid, this method always returns True or False.

Parameters:mime_type (str) –
Return type:bool
class starbelly.policy.PolicyProxyRules(docs)

Modify which proxies are used for each request.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyProxyRules) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyProxyRules) – A protobuf
Returns:Database document.
Return type:dict
get_proxy_url(target_url)

Return a proxy (type, URL) tuple associated with target_url or (None, None) if no such proxy is defined.

Parameters:target_url (str) –
Return type:tuple[proxy_type,URL]
class starbelly.policy.PolicyRobotsTxt(doc)

Designate how robots.txt affects crawl behavior.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyRobotsTxt) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyRobotsTxt) – A protobuf
Returns:Database document.
Return type:dict
usage

OBEY, IGNORE, or INVERT

class starbelly.policy.PolicyValidationError

Custom error for policy validation.

class starbelly.policy.PolicyUrlNormalization(doc)

Customize URL normalization.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyUrlNormalization) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyUrlNormalization) – A protobuf
Returns:Database document.
Return type:dict
normalize(url)

Normalize url according to policy.

Parameters:url (str) – The URL to be normalized.
Returns:The normalized URL.
Return type:str
class starbelly.policy.PolicyUrlRules(docs, seeds)

Customize link priorities based on URL.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyUrlRules) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyUrlRules) – A protobuf
Returns:Database document.
Return type:dict
get_cost(parent_cost, url)

Return the cost for a URL.

Parameters:
  • parent_cost (float) – The cost of the resource which yielded this URL.
  • url (str) – The URL to compute cost for.
Returns:

Cost of url.

Return type:

float

class starbelly.policy.PolicyUserAgents(docs, version)

Specify user agent string to send in HTTP requests.

staticmethod convert_doc_to_pb(doc, pb)

Convert from database document to protobuf.

Parameters:
  • doc (dict) – Database document.
  • pb (starbelly.starbelly_pb2.PolicyUserAgents) – An empty protobuf.
staticmethod convert_pb_to_doc(pb, doc)

Convert protobuf to database document.

Parameters:pb (starbelly.starbelly_pb2.PolicyUserAgents) – A protobuf
Returns:Database document.
Return type:dict
get_first_user_agent()
Returns:The first user agent.
Return type:str
get_user_agent()
Returns:A randomly selected user agent string.
Return type:str

Rate Limiter

The rate limiter ensures that multiple requests to the same domain are not sent too quickly. It acts as a bottleneck between the crawl manager and the downloader: the crawl manager sends items to the rate limiter, and the rate limiter forwards those items to the downloader once the appropriate amount of time has passed.

class starbelly.rate_limiter.RateLimiter(capacity)

This class is responsible for enforcing rate limits.

The rate limiter acts as a bottleneck between the multiple crawl frontiers and the downloaders, enforcing the configured rate limits. Each crawl job sends download requests to the rate limiter ordered by download priority, and the rate limiter forwards those items to the appropriate downloader once enough time has elapsed. This ensures that rate limits are correctly enforced even if multiple crawl jobs are accessing the same domain.

In order to provide some flexibility, rate limits are not strictly tied to individual domains. Each URL is mapped to a “rate limit token”. URLs with the same token will share the same rate limit. This system will allow more flexible rate limiting policies in the future, such as applying a single rate limit to a set of domains.

Internally, a queue is maintained for each rate limit token that has pending requests. When a rate limit expires for a given token, the next URL in the corresponding queue is sent to the downloader. The rate limiter has a fixed capacity, which prevents its memory usage from growing without bound and applies backpressure to the crawl jobs that submit download requests to it.

add_job(job_id)

Add a job to the rate limiter. Returns a send channel that requests for this job will be sent to.

Parameters:job_id (str) – A job ID.
delete_rate_limit(token)

Remove a rate limit for a given token.

If token does not exist, then this has no effect.

Parameters:token (bytes) – A rate limit token.
get_request_channel()

Get a channel that can send requests to the rate limiter.

Return type:trio.SendChannel
get_reset_channel()

Get a channel that can send resets to the rate limiter.

Return type:trio.ReceiveChannel
item_count

The number of requests queued inside the rate limiter.

job_count

The number of jobs tracked by the rate limiter.

await remove_job(job_id)

Remove all download requests for the given job.

Parameters:job_id (str) –
await run()

Run the rate limiter.

set_rate_limit(token, delay)

Set the rate limit for the specified token.

Parameters:
  • token (str) – The rate limit token.
  • delay (float) – The delay between subsequent requests, in seconds.
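
For example, limiting a single domain to one request every two seconds (the capacity and delay values are illustrative; get_domain_token() is documented at the end of this section):

from starbelly.rate_limiter import RateLimiter, get_domain_token

rate_limiter = RateLimiter(capacity=100)
token = get_domain_token('example.com')
rate_limiter.set_rate_limit(token, 2.0)   # at most one request every 2 seconds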

The rate limiter uses the following class to store expiry information.

class starbelly.rate_limiter.Expiry(time: float, token: bytes)

Represents a rate limit token that will expire at a given time.

Expiries can be compared to each other, e.g. expiry1 < expiry2, or to a timestamp, e.g. expiry1 < trio.current_time().
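
For example, because expiries are ordered they can be kept in a heap so that the soonest expiry is always at the front (this only illustrates the ordering, not necessarily how the rate limiter stores them internally):

import heapq
from starbelly.rate_limiter import Expiry

heap = []
heapq.heappush(heap, Expiry(time=20.0, token=b'a'))
heapq.heappush(heap, Expiry(time=10.0, token=b'b'))
soonest = heapq.heappop(heap)   # the token that expires first (time=10.0)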

The following functions are used to determine the token to be used for a request.

starbelly.rate_limiter.get_domain_token(domain)

Get a token for a domain.

Parameters:domain (str) – The domain to generate a token for.
Returns:The token corresponding to the domain.
Return type:bytes

Resource Monitor

The resource monitor introspects various objects in the crawling pipeline in order to track resource consumption and usage, such as where items are in the pipeline, CPU utilization, memory usage, etc.

class starbelly.resource_monitor.ResourceMonitor(interval, buffer_size, crawl_resources_fn, rate_limiter)

Keep track of consumption and usage statistics for various resources.

get_channel(channel_size)

Get a statistics channel. The resource monitor will send measurements to this channel until the receive end is closed. Note that if the channel is full, the resource monitor does not block! It will drop messages instead.

Parameters:channel_size (int) – The size of the channel to create.
Returns:A channel that will receive resource statistics at regular intervals.
Return type:trio.ReceiveChannel
history(n=None)

Return the most recent n measurements.

Parameters:n (int) – The number of measurements to retrieve. If n is None or there are fewer than n measurements, return all measurements.
Return type:list
await run()

Run the resource monitor.

Returns:Runs until cancelled.
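
A sketch of a consumer for the statistics channel (the channel size and the print call are illustrative):

async def log_resource_usage(resource_monitor):
    # If this consumer falls behind, the monitor drops measurements instead
    # of blocking (see get_channel() above).
    channel = resource_monitor.get_channel(channel_size=10)
    async for measurement in channel:
        print(measurement)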

Robots.txt Manager

The Robots.txt manager is responsible for deciding when to download a robots.txt file and for making enforcement decisions for robots.txt policy.

class starbelly.robots.RobotsTxtManager(db_pool, max_age=86400, max_cache=1000.0)

Store and manage robots.txt files.

await is_allowed(url, policy, downloader)

Return True if url is allowed by the applicable robots.txt file.

This fetches the applicable robots.txt if we don’t have a recent copy of it cached in memory or in the database. The policy is used if a robots.txt file needs to be fetched from the network.

Parameters:
  • url (str) – Check this URL to see if the robots.txt and accompanying policy permit access to it.
  • policy (Policy) –
  • downloader (Downloader) –
Return type:

bool
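
For example, a caller could filter a batch of candidate URLs like this (a sketch; the policy and downloader come from the job requesting access):

async def filter_allowed(robots_txt_manager, urls, policy, downloader):
    ''' Keep only the URLs permitted by the applicable robots.txt files. '''
    allowed = []
    for url in urls:
        if await robots_txt_manager.is_allowed(url, policy, downloader):
            allowed.append(url)
    return allowed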

class starbelly.robots.RobotsTxt(robots_doc)

Wrapper around robots.txt parser that adds the date the file was fetched.

If the robots_file is None or cannot be parsed, then it’s treated as a highly permissive robots.txt.

is_allowed(user_agent, url)

Return True if url is allowed by this robots.txt file.

Parameters:
  • user_agent (str) – The user agent that wants to access the URL.
  • url (str) – The URL that the user agent wants to access.
Return type:

bool

is_older_than(age)

Return True if this robots file is older than age.

Parameters:age (datetime) – A timezone-aware datetime.
Return type:bool

Scheduler

The scheduler is responsible for ensuring that scheduled jobs run at appropriate times.

class starbelly.schedule.Scheduler(db, crawl_manager)

Starts jobs according to a schedule.

add_schedule(schedule_doc, latest_job_doc=None)

Add a new schedule.

Parameters:
  • schedule_doc (dict) –
  • latest_job_doc (dict) –
remove_schedule(schedule_id)

Remove pending events for the specified schedule and do not schedule additional events.

It is easier to copy all non-cancelled events into a new list than to remove an element from the middle of the list.

Parameters:schedule_id (bytes) – The ID of the schedule to disable.
await run()

Listen for changes in job status and check if a job needs to be rescheduled.

Returns:This method runs until cancelled.

The following model classes are used by the Scheduler.

class starbelly.schedule.Schedule(id: str, name: str, enabled: bool, created_at: datetime.datetime, updated_at: datetime.datetime, time_unit: str, num_units: int, timing: str, job_name: str, job_count: int, seeds: list, tags: list, policy_id: str)

A schedule for running a job periodically.

format_job_name(when)

Format a name for a new job.

Parameters:when (datetime) – The datetime when the job is starting.
Returns:A formatted job name.
Return type:str
classmethod from_doc(doc)

Create a schedule from a database document.

Parameters:doc (dict) –
classmethod from_pb(pb)

Create a schedule from a protobuf object.

Parameters:pb – A protobuf object.
to_doc()

Convert schedule to database document.

Returns:A database document.
Return type:dict
to_pb(pb)

Convert schedule to a protobuf object.

Parameters:pb – A schedule protobuf.
class starbelly.schedule.ScheduleEvent(schedule, due)

An instance of one event in a schedule.

The scheduler will instantiate one event for each schedule. That event corresponds to the next time that scheduled job needs to run. When a job starts and finishes, the scheduler will check if it needs to instantiate a new event and add it to the schedule.

This class implements comparison operators to make it easy to sort events into chronological order.

due
Returns:The timestamp when this event is due.
Return type:datetime
is_due
Returns:Indicates if this event is due.
Return type:bool
schedule
Returns:The schedule object.
Return type:Schedule
seconds_until_due
Returns:The number of seconds until this event is due.
Return type:float
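
For illustration, events can be sorted chronologically (a sketch that assumes the comparison depends only on the due time; a real Schedule object would be passed instead of the None stand-in):

from datetime import datetime, timedelta, timezone
from starbelly.schedule import ScheduleEvent

schedule = None   # stand-in for a Schedule object (see the class above)
now = datetime.now(timezone.utc)
events = [
    ScheduleEvent(schedule, now + timedelta(hours=2)),
    ScheduleEvent(schedule, now + timedelta(minutes=5)),
]
events.sort()     # chronological order: the event due in 5 minutes is first
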
class starbelly.schedule.ScheduleValidationError

Custom error for job schedule validation.

Storage

The storage component saves downloaded items into the database.

class starbelly.storage.CrawlStorage(job_id, db, send_channel, receive_channel, policy, sequence)

This class stores crawl items in the database.

await run()

Read items from the channel and save them to the database.

Returns:This function runs until cancelled.

Subscription

The API supports multiple types of subscriptions. Unlike the rest of the API, which consists of a simple request → response model, subscriptions push data to the client. Some subscriptions can be paused and resumed using a “sync token”.

class starbelly.subscription.SyncTokenError

A sync token is syntactically invalid or was used with an incompatible stream type.

class starbelly.subscription.SyncTokenInt

A sync token that stores a 64 bit integer. The first byte contains a literal 1 and the next 8 bytes contain the value.

classmethod decode(token)

Unpack a token and return the value contained in it.

Parameters:token (bytes) –
Return type:int
classmethod encode(val)

Encode a 64 bit integer value as a token.

Parameters:val (int) –
Return type:bytes
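
A round-trip example (the value 1024 is arbitrary):

from starbelly.subscription import SyncTokenInt

token = SyncTokenInt.encode(1024)         # 9 bytes: a literal 1, then the value
assert len(token) == 9
assert SyncTokenInt.decode(token) == 1024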

The following classes implement subscription behavior.

class starbelly.subscription.JobSyncSubscription(id_, websocket, job_id, subscription_db, compression_ok, job_state_recv, sync_token=None)

A subscription stream that allows a client to sync items from a specific crawl job.

This subscription includes a “sync token” that allows the subscription to be canceled and then resumed later. For example, if the network connection drops, the client may reconnect and resubscribe without missing any items or restarting the sync from the beginning.

cancel()

Cancel the subscription.

id_

Get this subscription’s ID.

Return type:int
await run()

Run the subscription.

Returns:This function returns when the sync is complete.
class starbelly.subscription.JobStatusSubscription(id_, websocket, stats_tracker, min_interval)

A subscription stream that emits updates about the status of all running jobs.

It only sends events when something about a job has changed.

cancel()

Cancel the subscription.

await run()

Start the subscription stream.

Returns:This function runs until cancel() is called.
class starbelly.subscription.ResourceMonitorSubscription(id_, websocket, resource_monitor, history=0)

Keep track of usage for various resources.

cancel()

Cancel the subscription.

await run()

Start the subscription stream.

Returns:This method runs until cancel is called.
class starbelly.subscription.TaskMonitorSubscription(id_, websocket, period, root_task)

Sends data showing what kinds of tasks are running and how many of each.

cancel()

Cancel the subscription.

await run()

Start the subscription stream.

Returns:This function runs until cancel() is called.