Internals
This document describes the crawler’s architecture and internal structures.
Service Architecture
This diagram shows the components used to deploy Starbelly.
The web server serves static assets (such as the web client’s HTML, CSS, and images) and also proxies the WebSocket API to the crawler. The official deployment uses Nginx as its web server, but other servers like Apache could be used instead.
The web client is the standard client for end users to interact with the crawler. Other clients (e.g. a client you write in Python) connect over the same WebSocket API as the web client and have access to all of the same functionality. (See WebSocket API.)
The crawler itself is a single Python process that runs crawl jobs and services WebSocket API requests.
All configuration, metadata, and crawl data are stored in RethinkDB.
Asynchronous Framework
Starbelly is written in Python 3 using the Trio framework for most of the asynchronous I/O. Some parts of the implementation depend on asyncio libraries, which are invoked using the trio-asyncio bridge:
- The downloader uses aiohttp and aiosocks, because there is not a mature Trio library for HTTP that supports SOCKS proxies.
- Other code that makes HTTP requests also uses aiohttp. It could be ported to asks, but I didn’t want to mix two different HTTP clients in the same project. When the downloader is updated, this code can be updated as well.
Crawling Pipeline
Starbelly’s crawling operations consist of a pipeline of various components. The frontier is a component that keeps track of which URLs remain to be crawled. These URLs are fed to a rate limiter which acts as a bottleneck that prevents crawling sites too fast. The downloader fetches URLs from the rate limiter and downloads them over the network. The storage component saves the downloaded items in the database. The extractor finds URLs in the downloaded items and sends them back to the frontier so those URLs can be crawled, too.
Each crawl job creates its own crawling components (frontier, downloader, storage, extractor) and they run concurrently with all other jobs. The rate limiter is shared across all jobs to ensure that rate limits are enforced correctly even when two different jobs are crawling the same domain.
This diagram depicts the crawling pipeline for two concurrent jobs: Job #1 in blue and Job #2 in green. The solid lines indicate “channels”, i.e. in-memory communication between components. The dashed lines indicate data that is written to a database table by one component and later read back by another component.
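To make the data flow concrete, here is a minimal synchronous sketch of one job's pipeline loop. It is not Starbelly's code: the real components are concurrent Trio tasks connected by channels, whereas this sketch uses plain function calls, a hypothetical in-memory FAKE_WEB dict in place of the network, a regular expression in place of a real link extractor, and no rate limiter delays.

```python
import heapq
import re
from urllib.parse import urljoin

# Hypothetical stand-in for the network: URL -> HTML body.
FAKE_WEB = {
    "http://example.com/": '<a href="/a">A</a> <a href="/b">B</a>',
    "http://example.com/a": '<a href="/b">B</a>',
    "http://example.com/b": "no links here",
}

def crawl(seed):
    """Run one job's pipeline to completion and return what storage saved."""
    frontier = [(0.0, seed)]  # (cost, url) heap: lowest cost is issued first
    seen = {seed}             # URLs already queued, so nothing is crawled twice
    storage = {}              # storage stand-in: url -> body
    while frontier:
        # Frontier -> (rate limiter omitted) -> downloader.
        cost, url = heapq.heappop(frontier)
        body = FAKE_WEB.get(url)
        if body is None:
            continue  # a real downloader would record an error response
        storage[url] = body  # storage saves the downloaded item
        # Extractor finds links and feeds new ones back to the frontier.
        for href in re.findall(r'href="([^"]+)"', body):
            child = urljoin(url, href)
            if child not in seen:
                seen.add(child)
                heapq.heappush(frontier, (cost + 1.0, child))
    return storage
```

Calling crawl("http://example.com/") visits all three fake pages, each exactly once, because the extractor only feeds unseen URLs back to the frontier.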
System Components
Starbelly doesn’t just contain crawling components: it also contains components that provide management and introspection for the crawling system. This section briefly explains each high-level component, and the subsequent sections provide low-level details for each component.
- API Server: The API server allows clients to interact with Starbelly by sending protobuf messages over a WebSocket connection. The server uses a simple request/response model for most API calls, and also has a subscription/event model when the client wants push updates.
- CAPTCHA: Components that deal with CAPTCHA images.
- Crawl Management: The crawl manager is responsible for the lifecycle of crawl jobs, such as starting/stopping jobs and keeping track of resource usage.
- Downloader: Responsible for fetching items from the network. Although this is a seemingly simple responsibility, the downloader is responsible for significant portions of the crawling policy, such as enforcing proxy policy and MIME type rules.
- Frontier: The crawl frontier keeps track of the URLs that are pending download.
- Extractor: Parses response bodies to discover new URLs that may be added to the crawl frontier.
- Login Manager: Automates the process of logging in to a site to perform an authenticated crawl.
- Policy: Controls the crawler’s decision making, for example how to handle robots.txt exclusion rules, how to prioritize URLs, how long to run the crawl, etc.
- Rate Limiter: Acts as a bottleneck between the jobs’ crawl frontiers and the jobs’ downloaders. It prevents sites from being crawled too quickly, even when multiple jobs are crawling the same domain.
- Resource Monitor: Provides introspection into other components to track things like how many items are currently being downloaded, how many items are queued in the rate limiter, etc.
- Robots.txt Manager: Responsible for fetching robots.txt files as necessary, maintaining a local cache of robots.txt files, and making access control decisions, such as “is job X allowed to access URL Y?”
- Scheduler: Controls the crawling schedule. When a job needs to run, the scheduler will automatically start it.
- Storage: The crawl storage saves downloaded items into the database.
- Subscription: Handles API subscriptions such as syncing crawl data.
API Server
The main interaction point for Starbelly is through its WebSocket API.
- exception starbelly.server.InvalidRequestException
  Indicates a request is invalid.
- class starbelly.server.Server(host, port, server_db, subscription_db, crawl_manager, rate_limiter, resource_monitor, stats_tracker, scheduler)
  Handles websocket connections from clients and command dispatching.
  - await run(*, task_status=TASK_STATUS_IGNORED)
    Run the websocket server.
    To ensure that the server is ready, call await nursery.start(server.run).
    Returns: Runs until cancelled.
- starbelly.server.api_handler(handler)
  This decorator registers a function as a callable command through the API server.
- await starbelly.server.captcha.delete_captcha_solver(command, server_db)
  Delete a CAPTCHA solver.
- await starbelly.server.captcha.get_captcha_solver(command, response, server_db)
  Get a CAPTCHA solver.
- await starbelly.server.captcha.list_captcha_solvers(command, response, server_db)
  Return a list of CAPTCHA solvers.
- await starbelly.server.captcha.set_captcha_solver(command, response, server_db)
  Create or update a CAPTCHA solver.
- await starbelly.server.job.delete_job(command, server_db, stats_tracker)
  Delete a job.
- await starbelly.server.job.get_job(command, response, server_db)
  Get status for a single job.
- await starbelly.server.job.get_job_items(command, response, server_db)
  Get a page of items (crawl responses) from a job.
- await starbelly.server.job.list_jobs(command, response, server_db)
  Return a list of jobs.
- await starbelly.server.job.set_job(command, crawl_manager, response)
  Create or update job metadata.
- await starbelly.server.login.delete_domain_login(command, server_db)
  Delete a domain login and all of its users.
- await starbelly.server.login.get_domain_login(command, response, server_db)
  Get a domain login.
- await starbelly.server.login.list_domain_logins(command, response, server_db)
  Return a list of domain logins.
- await starbelly.server.login.set_domain_login(command, server_db)
  Create or update a domain login.
- await starbelly.server.policy.delete_policy(command, server_db)
  Delete a policy.
- await starbelly.server.policy.get_policy(command, response, server_db)
  Get a single policy.
- await starbelly.server.policy.list_policies(command, response, server_db)
  Get a list of policies.
- await starbelly.server.policy.set_policy(command, response, server_db)
  Create or update a single policy.
  If the policy ID is set, then update the corresponding policy. Otherwise, create a new policy.
- await starbelly.server.rate_limit.list_rate_limits(command, response, server_db)
  Get a page of rate limits.
- await starbelly.server.rate_limit.set_rate_limit(command, rate_limiter, server_db)
  Set a rate limit.
- await starbelly.server.schedule.delete_schedule(command, scheduler, server_db)
  Delete a job schedule.
- await starbelly.server.schedule.get_schedule(command, response, server_db)
  Get metadata for a job schedule.
- await starbelly.server.schedule.list_schedule_jobs(command, response, server_db)
  Return a list of job schedules.
- await starbelly.server.schedule.list_schedules(command, response, server_db)
  Return a list of job schedules.
- await starbelly.server.schedule.set_schedule(command, response, scheduler, server_db)
  Create or update job schedule metadata.
- await starbelly.server.subscription.subscribe_job_status(command, response, subscription_manager, stats_tracker)
  Handle the subscribe crawl status command.
- await starbelly.server.subscription.subscribe_job_sync(command, crawl_manager, response, subscription_manager)
  Handle the subscribe crawl items command.
- await starbelly.server.subscription.subscribe_resource_monitor(command, response, resource_monitor, subscription_manager)
  Handle the subscribe resource monitor command.
- await starbelly.server.subscription.subscribe_task_monitor(command, response, subscription_manager)
  Handle the subscribe task monitor command.
- await starbelly.server.subscription.unsubscribe(command, subscription_manager)
  Handle an unsubscribe command.
- await starbelly.server.system.performance_profile(command, response)
  Run CPU profiler.
CAPTCHA
Starbelly supports passing CAPTCHA images to third-party solving services.
- class starbelly.captcha.CaptchaSolver(doc)
  An interface for a CAPTCHA solving service.
  - get_command(img_data)
    Return a JSON API command.
    Parameters: img_data (bytes) – The image data for the CAPTCHA.
    Returns: A command that can be serialized to JSON.
    Return type: dict
- starbelly.captcha.captcha_doc_to_pb(doc)
  Convert CAPTCHA solver from database document to protobuf.
  Parameters: doc (dict) – A database document.
  Returns: A protobuf message.
- starbelly.captcha.captcha_pb_to_doc(pb)
  Convert CAPTCHA solver from protobuf to database document.
  Parameters: pb – A protobuf message.
  Returns: A database document.
  Return type: dict
Crawl Management
Crawl management is implemented by two main classes. The first class, CrawlManager, acts as a registry and supervisor for all currently running jobs. All interactions with jobs (starting, pausing, etc.) are done through this class.
- class starbelly.job.CrawlManager(rate_limiter, stats_tracker, robots_txt_manager, crawl_db, frontier_db, extractor_db, storage_db, login_db)
  Manage crawl jobs and provide introspection into their states.
  - await cancel_job(job_id)
    Stop the specified job if it is currently running and set it to cancelled in the database.
    Parameters: job_id (str) – The ID of the job to cancel.
  - get_job_state_channel(size=10)
    Open a new job state channel.
    JobStateEvent objects will be sent to this channel when a job changes state. When the channel is full, state events will not be sent, so consumers need to read events continually.
    Parameters: size (int) – The size of the channel.
    Return type: trio.ReceiveChannel
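The drop-when-full behavior described for get_job_state_channel (and later for ResourceMonitor.get_channel) can be sketched with the standard library. This uses queue.Queue only so the example runs without Trio; in Trio the analogous call is MemorySendChannel.send_nowait(), which raises trio.WouldBlock when the buffer is full. The broadcast helper is hypothetical.

```python
import queue

def broadcast(event, channels):
    """Send ``event`` to every subscriber without blocking.

    Returns how many subscribers dropped the event because their buffer was full.
    """
    dropped = 0
    for ch in channels:
        try:
            ch.put_nowait(event)  # never wait for a slow consumer
        except queue.Full:
            dropped += 1          # consumer fell behind: this event is lost for it
    return dropped
```

The producer never stalls on a slow reader, which is why the documentation warns consumers to read events continually.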
  - get_resource_usage()
    Return statistics about crawler resource usage, i.e. concurrent downloads.
    Return type: dict
  - await pause_job(job_id)
    Stop the specified job and set it to paused in the database.
    Parameters: job_id (str) – The ID of the job to pause.
  - await resume_job(job_id)
    Resume a paused job: load it from the database and run it.
    Parameters: job_id (str) – The ID of the job to resume.
  - await run(*, task_status=TASK_STATUS_IGNORED)
    Run the crawl manager.
    You should call await nursery.start(crawl_manager.run) to ensure that the crawl manager is ready before calling any of its job methods.
    Returns: This function runs until cancelled.
  - await start_job(name, seeds, tags, policy_id, schedule_id=None)
    Start a new job.
    This adds the job to the database and runs it.
    Parameters:
    - name (str) – The name of the new job.
    - seeds (list[str]) – A list of seeds.
    - tags (list[str]) – A list of tags.
    - policy_id (str) – The ID of the policy to apply to this job.
    - schedule_id (str) – (Optional) The schedule that started this job.
    Returns: The new job ID.
    Return type: str
Each job also has a single CrawlJob object that acts as a supervisor for all of the other crawling components in that job.
- class starbelly.job.CrawlJob(name, job_id, schedule_id, policy, frontier, downloader, storage, extractor, terminator)
  Manages job state.
  - id
    The job’s unique ID.
    Return type: str
  - old_urls
    Returns a set of hashed URLs that the crawl has seen before.
  - await run()
    Start all of the subcomponents of the job.
    Returns: Runs until this job finishes.
  - await stop()
    Stop the crawl job.
Downloader
The downloader is responsible for fetching resources over the network and sending them back to the crawl manager.
- class starbelly.downloader.DownloadRequest(frontier_id: bytes, job_id: bytes, method: str, url: str, form_data: dict, cost: float)
  Represents a resource that needs to be downloaded.
- class starbelly.downloader.DownloadResponse(frontier_id: bytes, cost: float, url: str, canonical_url: str, content_type: str = None, body: bytes = None, started_at: datetime.datetime = None, completed_at: datetime.datetime = None, exception: str = None, status_code: int = None, headers: dict = None)
  Represents the result of downloading a resource, which could contain a successful response body, an HTTP error, or an exception.
  - classmethod from_request(request)
    Initialize a response from its corresponding request.
    Parameters: request (DownloadRequest) – The request that generated this response.
  - set_exception(exception)
    Indicate that an exception occurred while downloading this resource.
    Parameters: exception (str) – Traceback of the exception.
  - set_response(http_response, body)
    Update state from HTTP response.
  - start()
    Called when the request has been sent and the response is being waited for.
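A minimal sketch of the request/response pair as plain dataclasses, using the field names and types from the signatures above. These are simplified stand-ins, not the classes in starbelly.downloader, and the method bodies are assumptions: for example, that canonical_url starts out equal to the requested URL and that set_exception() also records a completion time. set_response() is omitted because it depends on the HTTP client.

```python
import dataclasses
import datetime
from typing import Optional

def _now():
    return datetime.datetime.now(datetime.timezone.utc)

@dataclasses.dataclass
class DownloadRequest:
    frontier_id: bytes
    job_id: bytes
    method: str
    url: str
    form_data: dict
    cost: float

@dataclasses.dataclass
class DownloadResponse:
    frontier_id: bytes
    cost: float
    url: str
    canonical_url: str
    content_type: Optional[str] = None
    body: Optional[bytes] = None
    started_at: Optional[datetime.datetime] = None
    completed_at: Optional[datetime.datetime] = None
    exception: Optional[str] = None
    status_code: Optional[int] = None
    headers: Optional[dict] = None

    @classmethod
    def from_request(cls, request):
        # Assumption: canonical_url starts out equal to the requested URL.
        return cls(frontier_id=request.frontier_id, cost=request.cost,
                   url=request.url, canonical_url=request.url)

    def start(self):
        # Request sent; now waiting for the response.
        self.started_at = _now()

    def set_exception(self, exception):
        # Assumption: a failed download is also considered completed.
        self.exception = exception
        self.completed_at = _now()
```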
- class starbelly.downloader.Downloader(job_id, policy, send_channel, recv_channel, semaphore, rate_limiter_reset, stats)
  This class is responsible for downloading resources. A new instance is created for each crawl job.
  - count
    Return the number of downloads currently in progress.
    Return type: int
  - await download(request, skip_mime=False)
    Download a requested resource and return it.
    Note: this is probably not the method you want! Most downloads should be sent through the request channel. This method is only for unusual cases where we want to download one item and return the response directly to the caller, such as a robots.txt file or a login page.
    These responses are not included in job statistics and do not get stored in the database. The caller should apply their own timeout here.
    Parameters:
    - request (DownloadRequest) – The resource to download.
    - skip_mime (bool) – If True, the MIME type will not be checked against the policy.
    Return type: DownloadResponse
  - await run()
    Run the downloader, including all concurrent download tasks. When cancelled, all download tasks are also cancelled.
    Returns: Runs until cancelled.
- class starbelly.downloader.MimeNotAllowedError(mime)
  Indicates that the MIME type of a response is not allowed by policy.
Extractor
The extractor component extracts URLs from downloaded resources and adds the URLs that comply with the policy to the frontier.
- class starbelly.extractor.CrawlExtractor(job_id, db, send_channel, receive_channel, policy, downloader, robots_txt_manager, old_urls, stats, batch_size=100)
  Extract URLs from crawled items and add them to the frontier table.
  - await run()
    Read responses from the extraction channel and add the URLs they contain to the frontier.
    Returns: This function runs until cancelled.
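The extractor's core loop (find links, resolve them against the page URL, skip URLs the job has already seen) can be sketched with the standard library. This assumes old_urls holds URL digests, as the CrawlJob.old_urls description ("hashed URLs") suggests; the choice of SHA-256, the fragment stripping, and the helper names are assumptions, and policy checks such as URL rules and robots.txt are left out.

```python
import hashlib
from html.parser import HTMLParser
from urllib.parse import urldefrag, urljoin

class LinkParser(HTMLParser):
    """Collect href values from <a> tags."""

    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.hrefs.append(value)

def url_hash(url):
    # Assumption: SHA-256 of the UTF-8 URL; Starbelly's real hash may differ.
    return hashlib.sha256(url.encode("utf8")).digest()

def new_urls(base_url, html, old_urls):
    """Return absolute, de-fragmented URLs not seen before and record their hashes."""
    parser = LinkParser()
    parser.feed(html)
    parser.close()
    found = []
    for href in parser.hrefs:
        url, _fragment = urldefrag(urljoin(base_url, href))
        digest = url_hash(url)
        if digest not in old_urls:
            old_urls.add(digest)
            found.append(url)
    return found
```

Storing fixed-size digests rather than full URLs keeps the seen-set's memory use predictable for long crawls.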
Frontier
The frontier component keeps track of which URLs are still waiting to be crawled. The frontier issues download requests ordered by “crawl cost”, i.e. pending items with lower cost values are issued first.
- class starbelly.frontier.CrawlFrontier(job_id, db, send_channel, login_manager, policy, stats)
  Contains the logic for managing a crawl frontier, i.e. the URLs that have already been crawled and the URLs that are remaining to be crawled.
  - await run()
    This task takes items off the frontier and sends them to the rate limiter.
    Returns: This function runs until cancelled.
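In Starbelly the frontier lives in a database table, but its ordering rule, lowest cost first, can be illustrated with an in-memory heap. MemoryFrontier is hypothetical, and breaking ties between equal costs in insertion order is an assumption.

```python
import heapq
import itertools

class MemoryFrontier:
    """In-memory stand-in for the frontier table: pop the lowest cost first."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker: FIFO among equal costs

    def add(self, url, cost):
        heapq.heappush(self._heap, (cost, next(self._counter), url))

    def pop(self):
        cost, _, url = heapq.heappop(self._heap)
        return url, cost

    def __len__(self):
        return len(self._heap)
```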
Login Manager
The login manager uses the Formasaurus library to find a login form on a page, fill it out, and submit it to get session cookies for authenticated crawling. Under the hood, Formasaurus uses a pre-built machine learning model (a conditional random field) to classify types of forms on a page and to classify the types of inputs in each form. If necessary, the login manager will also request a CAPTCHA solution from a CAPTCHA solver.
Policy
Policy objects guide the crawler’s decision making, e.g. which links to follow, which resources to download, when to use a proxy, etc. The policy manager is responsible for saving and loading policies from the database.
A policy object is a container for various subpolicies.
- class starbelly.policy.Policy(doc, version, seeds)
  A container for subpolicies.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert policy from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.Policy) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.Policy) – A protobuf.
    Returns: Database document.
    Return type: dict
- class starbelly.policy.PolicyAuthentication(doc)
  Policy for authenticated crawling.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyAuthentication) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyAuthentication) – A protobuf.
    Returns: Database document.
    Return type: dict
  - is_enabled()
    Return True if authentication is enabled.
    Return type: bool
- class starbelly.policy.PolicyLimits(doc)
  Limits on crawl size/duration.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyLimits) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyLimits) – A protobuf.
    Returns: Database document.
    Return type: dict
  - exceeds_max_cost(cost)
    Return True if cost is greater than the policy’s max cost.
    Parameters: cost (float)
    Return type: bool
  - max_duration
    The maximum duration that a crawl is allowed to run.
    Return type: float or None
  - met_item_limit(items)
    Return True if items is greater than or equal to the policy’s max item count.
    Parameters: items (int)
    Return type: bool
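The boundary conditions in exceeds_max_cost (strictly greater than) and met_item_limit (greater than or equal to) are easy to mix up, so here is a small sketch. The Limits class and its attribute names are hypothetical, and treating a missing limit (None) as unlimited is an assumption.

```python
from typing import Optional

class Limits:
    """Hypothetical sketch of the PolicyLimits checks."""

    def __init__(self, max_cost: Optional[float] = None,
                 max_items: Optional[int] = None,
                 max_duration: Optional[float] = None):
        self.max_cost = max_cost
        self.max_items = max_items
        self.max_duration = max_duration  # seconds, or None for no limit

    def exceeds_max_cost(self, cost: float) -> bool:
        # Strictly greater than: a URL whose cost equals the limit is still crawled.
        return self.max_cost is not None and cost > self.max_cost

    def met_item_limit(self, items: int) -> bool:
        # Greater than or equal to: the crawl stops once it has max_items items.
        return self.max_items is not None and items >= self.max_items
```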
- class starbelly.policy.PolicyMimeTypeRules(docs)
  Filter responses by MIME type.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyMimeTypeRules) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyMimeTypeRules) – A protobuf.
    Returns: Database document.
    Return type: dict
  - should_save(mime_type)
    Returns True if mime_type is approved by this policy. If rules are valid, this method always returns True or False.
    Parameters: mime_type (str)
    Return type: bool
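One plausible reading of “If rules are valid, this method always returns True or False” is an ordered rule list that ends with a catch-all rule. The sketch below assumes that rules are (regex, save) pairs evaluated first-match-wins; this rule format is hypothetical, since the real PolicyMimeTypeRules document format is not shown here.

```python
import re

def should_save(mime_type, rules):
    """Return the ``save`` flag of the first rule whose pattern matches.

    ``rules`` is a list of (pattern, save) pairs (hypothetical format). A valid
    list ends with a catch-all such as ("", False), so a bool is always
    returned; without one, an unmatched MIME type yields None.
    """
    for pattern, save in rules:
        if re.search(pattern, mime_type):
            return save
    return None
```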
- class starbelly.policy.PolicyProxyRules(docs)
  Modify which proxies are used for each request.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyProxyRules) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyProxyRules) – A protobuf.
    Returns: Database document.
    Return type: dict
  - get_proxy_url(target_url)
    Return a proxy (type, URL) tuple associated with target_url, or (None, None) if no such proxy is defined.
    Parameters: target_url (str)
    Return type: tuple[proxy_type, URL]
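A similar first-match-wins sketch for get_proxy_url, returning (None, None) when no rule matches. The (pattern, proxy_type, proxy_url) rule format and the SOCKS5 example values are assumptions.

```python
import re

def get_proxy_url(target_url, rules):
    """Return (proxy_type, proxy_url) from the first matching rule, else (None, None).

    ``rules`` is a list of (pattern, proxy_type, proxy_url) tuples (hypothetical format).
    """
    for pattern, proxy_type, proxy_url in rules:
        if re.search(pattern, target_url):
            return proxy_type, proxy_url
    return None, None
```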
- class starbelly.policy.PolicyRobotsTxt(doc)
  Designate how robots.txt affects crawl behavior.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyRobotsTxt) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyRobotsTxt) – A protobuf.
    Returns: Database document.
    Return type: dict
  - usage
    OBEY, IGNORE, or INVERT
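The three usage values could be combined with a robots.txt verdict as follows. The names OBEY, IGNORE, and INVERT come from the usage attribute above; the Usage enum and apply_usage function are hypothetical, and the meaning given to INVERT (crawl only what robots.txt disallows) is an assumption based on the name.

```python
from enum import Enum

class Usage(Enum):
    OBEY = "OBEY"
    IGNORE = "IGNORE"
    INVERT = "INVERT"

def apply_usage(robots_allows: bool, usage: Usage) -> bool:
    """Combine a robots.txt verdict with the policy's usage setting."""
    if usage is Usage.IGNORE:
        return True               # crawl regardless of robots.txt
    if usage is Usage.INVERT:
        return not robots_allows  # assumption: crawl only what robots.txt disallows
    return robots_allows          # OBEY
```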
- class starbelly.policy.PolicyValidationError
  Custom error for policy validation.
- class starbelly.policy.PolicyUrlNormalization(doc)
  Customize URL normalization.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyUrlNormalization) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyUrlNormalization) – A protobuf.
    Returns: Database document.
    Return type: dict
  - normalize(url)
    Normalize url according to policy.
    Parameters: url (str) – The URL to be normalized.
    Returns: The normalized URL.
    Return type: str
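A sketch of what policy-driven normalization might involve, using urllib.parse: lower-casing the scheme and host, dropping the fragment, and removing configured query parameters. The strip_parameters argument is hypothetical, and the actual steps PolicyUrlNormalization performs are not described here.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

def normalize(url, strip_parameters=()):
    """Lower-case scheme and host, drop the fragment, remove named query params."""
    parts = urlsplit(url)  # urlsplit already lower-cases the scheme
    # Lower-case only host[:port]; any user:password part stays case-sensitive.
    userinfo, at, hostport = parts.netloc.rpartition("@")
    netloc = userinfo + at + hostport.lower()
    query = [(k, v) for k, v in parse_qsl(parts.query, keep_blank_values=True)
             if k not in strip_parameters]
    # An empty path becomes "/", and the query string is re-encoded.
    return urlunsplit((parts.scheme, netloc, parts.path or "/", urlencode(query), ""))
```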
- class starbelly.policy.PolicyUrlRules(docs, seeds)
  Customize link priorities based on URL.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyUrlRules) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyUrlRules) – A protobuf.
    Returns: Database document.
    Return type: dict
  - get_cost(parent_cost, url)
    Return the cost for a URL.
    Parameters:
    - parent_cost (float) – The cost of the resource which yielded this URL.
    - url (str) – The URL to compute cost for.
    Returns: Cost of url.
    Return type: float
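Because the frontier issues lower-cost URLs first, get_cost effectively sets crawl priority. This sketch assumes rules are (regex, action, amount) tuples where the first matching rule either adds to or multiplies the parent's cost, with a default of parent_cost + 1 when nothing matches; the rule format and default are illustrative assumptions, not the documented behavior.

```python
import re

def get_cost(parent_cost, url, rules):
    """Cost of ``url`` from the first matching rule (hypothetical rule format).

    ``rules`` is a list of (pattern, action, amount) with action "ADD" or "MULTIPLY".
    """
    for pattern, action, amount in rules:
        if re.search(pattern, url):
            if action == "ADD":
                return parent_cost + amount
            if action == "MULTIPLY":
                return parent_cost * amount
            raise ValueError(f"unknown action: {action}")
    return parent_cost + 1.0  # assumption: default is one more than the parent
```

With these rules, a PDF linked from a page of cost 2.0 gets cost 6.0 and is therefore crawled after most ordinary links.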
- class starbelly.policy.PolicyUserAgents(docs, version)
  Specify user agent strings to send in HTTP requests.
  - staticmethod convert_doc_to_pb(doc, pb)
    Convert from database document to protobuf.
    Parameters:
    - doc (dict) – Database document.
    - pb (starbelly.starbelly_pb2.PolicyUserAgents) – An empty protobuf.
  - staticmethod convert_pb_to_doc(pb, doc)
    Convert protobuf to database document.
    Parameters: pb (starbelly.starbelly_pb2.PolicyUserAgents) – A protobuf.
    Returns: Database document.
    Return type: dict
  - get_first_user_agent()
    Returns: The first user agent.
    Return type: str
  - get_user_agent()
    Returns: A randomly selected user agent string.
    Return type: str
Rate Limiter
The rate limiter ensures that multiple requests to the same domain are not sent too quickly. The rate limiter acts as a bottleneck between the crawl frontiers and the downloaders. The frontiers send items to the rate limiter, and the rate limiter forwards those items to the downloaders when the appropriate amount of time has passed.
- class starbelly.rate_limiter.RateLimiter(capacity)
  This class is responsible for enforcing rate limits.
  The rate limiter acts as a bottleneck between the multiple crawl frontiers and the downloaders, enforcing the configured rate limits. Each crawl job sends download requests to the rate limiter ordered by priority of download, then the rate limiter forwards those items to the appropriate downloader when the appropriate amount of time has elapsed. This ensures that rate limits are correctly enforced even if multiple crawl jobs are accessing the same domain.
  In order to provide some flexibility, rate limits are not strictly tied to individual domains. Each URL is mapped to a “rate limit token”. URLs with the same token will share the same rate limit. This system will allow more flexible rate limiting policies in the future, such as applying a single rate limit to a set of domains.
  Internally, a queue is maintained for each rate limit token that has pending requests to it. When a rate limit expires for a given token, the next URL in the corresponding queue is sent to the downloader. The rate limiter has a fixed capacity. This prevents the rate limiter’s memory usage from growing without bounds and applies backpressure to the crawl jobs that are submitting download requests to the rate limiter.
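The queue-per-token design above can be modeled in a single thread with a dictionary of deques plus a heap of expiry times. This is a simplified, hypothetical model, not the RateLimiter class: time is passed in explicitly instead of coming from trio.current_time(), add() returns False instead of blocking when the limiter is full, and ready() stands in for the task that forwards requests to downloaders.

```python
import heapq
from collections import deque

class RateLimiterSketch:
    """Single-threaded model of per-token queues plus an expiry heap."""

    def __init__(self, capacity, default_delay=1.0):
        self.capacity = capacity
        self.default_delay = default_delay
        self.delays = {}        # token -> delay in seconds (cf. set_rate_limit)
        self.queues = {}        # token -> deque of pending requests
        self.next_allowed = {}  # token -> earliest time the token may send again
        self.expiries = []      # heap of (time, token) for tokens with pending requests
        self.item_count = 0

    def add(self, token, request, now):
        """Queue a request; return False when full so the caller must back off."""
        if self.item_count >= self.capacity:
            return False
        if token not in self.queues:
            self.queues[token] = deque()
            due = max(now, self.next_allowed.get(token, now))
            heapq.heappush(self.expiries, (due, token))
        self.queues[token].append(request)
        self.item_count += 1
        return True

    def ready(self, now):
        """Release the next request for each token whose delay has elapsed."""
        released = []
        while self.expiries and self.expiries[0][0] <= now:
            _, token = heapq.heappop(self.expiries)
            released.append(self.queues[token].popleft())
            self.item_count -= 1
            self.next_allowed[token] = now + self.delays.get(token, self.default_delay)
            if self.queues[token]:
                heapq.heappush(self.expiries, (self.next_allowed[token], token))
            else:
                del self.queues[token]  # keep next_allowed so the delay still applies
        return released
```

Remembering next_allowed after a token's queue empties matters: without it, a request arriving just after the previous one was released would bypass the delay.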
  - add_job(job_id)
    Add a job to the rate limiter. Returns a send channel that requests for this job will be sent to.
    Parameters: job_id (str) – A job ID.
  - delete_rate_limit(token)
    Remove a rate limit for a given token. If token does not exist, then this has no effect.
    Parameters: token (bytes) – A rate limit token.
  - get_request_channel()
    Get a channel that can send requests to the rate limiter.
    Return type: trio.SendChannel
  - get_reset_channel()
    Get a channel that can send resets to the rate limiter.
    Return type: trio.ReceiveChannel
  - item_count
    The number of requests queued inside the rate limiter.
  - job_count
    The number of jobs tracked by the rate limiter.
  - await remove_job(job_id)
    Remove all download requests for the given job.
    Parameters: job_id (str) – A job ID.
  - await run()
    Run the rate limiter.
  - set_rate_limit(token, delay)
    Set the rate limit for the specified token.
    Parameters:
    - token (str) – The rate limit token.
    - delay (float) – The delay between subsequent requests, in seconds.
The rate limiter uses the following class to store expiry information.
- class starbelly.rate_limiter.Expiry(time: float, token: bytes)
  Represents a rate limit token that will expire at a given time.
  Expiries can be compared to each other, e.g. expiry1 < expiry2, or to a timestamp, e.g. expiry1 < trio.current_time().
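The comparisons described for Expiry, against another Expiry or against a bare timestamp, can be implemented with functools.total_ordering. This is a hypothetical re-creation using the documented time and token fields; as a side effect, such objects can be stored directly in a heapq heap.

```python
import functools
from dataclasses import dataclass

@functools.total_ordering
@dataclass(eq=False)
class Expiry:
    """Hypothetical re-creation: ``token`` may be used again at ``time``."""
    time: float
    token: bytes

    @staticmethod
    def _time_of(other):
        # Compare against another Expiry's time, or against a bare timestamp.
        return other.time if isinstance(other, Expiry) else other

    def __eq__(self, other):
        return self.time == self._time_of(other)

    def __lt__(self, other):
        return self.time < self._time_of(other)
```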
The following functions are used to determine the token to be used for a request.
- starbelly.rate_limiter.get_domain_token(domain)
  Get a token for a domain.
  Parameters: domain (str) – The domain to generate a token for.
  Returns: The token corresponding to the domain.
  Return type: bytes
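A token only needs to be a stable bytes value, so one plausible implementation hashes the domain name. The hash algorithm, the lower-casing, and the DOMAIN_GROUPS table are all assumptions; the table illustrates the future idea mentioned above of sharing one rate limit across a set of domains.

```python
import hashlib

# Hypothetical grouping: requests to these domains share example.com's rate limit.
DOMAIN_GROUPS = {"cdn.example.com": "example.com"}

def get_domain_token(domain):
    """Map a domain to a rate limit token (a stable bytes digest)."""
    key = domain.lower()
    key = DOMAIN_GROUPS.get(key, key)
    return hashlib.sha256(key.encode("utf8")).digest()
```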
Resource Monitor
The resource monitor introspects various objects in the crawling pipeline in order to keep track of consumption and usage of various resources, such as where items are in the crawling pipeline, CPU utilization, memory usage, etc.
- class starbelly.resource_monitor.ResourceMonitor(interval, buffer_size, crawl_resources_fn, rate_limiter)
  Keep track of consumption and usage statistics for various resources.
  - get_channel(channel_size)
    Get a statistics channel. The resource monitor will send measurements to this channel until the receive end is closed. Note that if the channel is full, the resource monitor does not block! It will drop messages instead.
    Parameters: channel_size (int) – The size of the channel to create.
    Returns: A channel that will receive resource statistics at regular intervals.
    Return type: trio.ReceiveChannel
  - history(n=None)
    Return the most recent n measurements.
    Parameters: n (int) – The number of measurements to retrieve. If n is None or there are fewer than n measurements, return all measurements.
    Return type: list
  - await run()
    Run the resource monitor.
    Returns: Runs until cancelled.
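A bounded history like the one history() describes maps naturally onto collections.deque with maxlen, which discards the oldest measurement automatically. MeasurementHistory is hypothetical; sizing it from buffer_size and returning measurements oldest first are assumptions.

```python
from collections import deque

class MeasurementHistory:
    """Keep only the last ``buffer_size`` measurements."""

    def __init__(self, buffer_size):
        self._buffer = deque(maxlen=buffer_size)  # oldest entries fall off automatically

    def add(self, measurement):
        self._buffer.append(measurement)

    def history(self, n=None):
        """Return the most recent n measurements, oldest first (all if n is None or too big)."""
        items = list(self._buffer)
        if n is None or n >= len(items):
            return items
        return items[len(items) - n:]  # slicing from len - n also handles n == 0
```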
Robots.txt Manager
The robots.txt manager is responsible for deciding when to download a robots.txt file and for making enforcement decisions for robots.txt policy.
- class starbelly.robots.RobotsTxtManager(db_pool, max_age=86400, max_cache=1000.0)
  Store and manage robots.txt files.
  - await is_allowed(url, policy, downloader)
    Return True if url is allowed by the applicable robots.txt file.
    This fetches the applicable robots.txt if we don’t have a recent copy of it cached in memory or in the database. The policy is used if a robots.txt file needs to be fetched from the network.
    Parameters:
    - url (str) – Check this URL to see if the robots.txt and accompanying policy permit access to it.
    - policy (Policy)
    - downloader (Downloader)
    Return type: bool
- class starbelly.robots.RobotsTxt(robots_doc)
  Wrapper around robots.txt parser that adds the date the file was fetched.
  If the robots.txt file is None or cannot be parsed, then it is treated as a highly permissive robots.txt.
  - is_allowed(user_agent, url)
    Return True if url is allowed by this robots.txt file.
    Parameters:
    - user_agent (str) – The user agent that wants to access the URL.
    - url (str) – The URL that the user agent wants to access.
    Return type: bool
  - is_older_than(age)
    Return True if this robots file is older than age.
    Parameters: age (datetime) – A timezone-aware datetime.
    Return type: bool
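Python's standard library includes a robots.txt parser, urllib.robotparser, which is enough to sketch a wrapper with the documented behavior: a fetch timestamp, permissive handling of a missing file, and an is_older_than() check against a timezone-aware cutoff. RobotsTxtSketch is hypothetical, and Starbelly may use a different parser.

```python
import datetime
import urllib.robotparser

class RobotsTxtSketch:
    """Wrap urllib.robotparser and remember when the file was fetched."""

    def __init__(self, text, fetched_at):
        self.fetched_at = fetched_at  # timezone-aware datetime
        self._parser = urllib.robotparser.RobotFileParser()
        # A missing file (None) parses as an empty file, which allows everything.
        self._parser.parse((text or "").splitlines())
        # can_fetch() refuses everything until the parser is marked as checked.
        self._parser.modified()

    def is_allowed(self, user_agent, url):
        return self._parser.can_fetch(user_agent, url)

    def is_older_than(self, age):
        """True if the file was fetched before ``age`` (a timezone-aware datetime)."""
        return self.fetched_at < age
```

With the manager's default max_age=86400, a cached file would presumably be refetched once is_older_than(now - timedelta(seconds=86400)) returns True.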
Scheduler
The scheduler is responsible for ensuring that scheduled jobs run at appropriate times.
- class starbelly.schedule.Scheduler(db, crawl_manager)
  Starts jobs according to a schedule.
  - add_schedule(schedule_doc, latest_job_doc=None)
    Add a new schedule.
    Parameters:
    - schedule_doc (dict)
    - latest_job_doc (dict)
  - remove_schedule(schedule_id)
    Remove pending events for the specified schedule and do not schedule additional events.
    It is easier to copy all non-cancelled events into a new list than to remove an element from the middle of the list.
    Parameters: schedule_id (bytes) – The ID of the schedule to disable.
  - await run()
    Listen for changes in job status and check if a job needs to be rescheduled.
    Returns: This method runs until cancelled.
The following model classes are used by the Scheduler.
- class starbelly.schedule.Schedule(id: str, name: str, enabled: bool, created_at: datetime.datetime, updated_at: datetime.datetime, time_unit: str, num_units: int, timing: str, job_name: str, job_count: int, seeds: list, tags: list, policy_id: str)
  A schedule for running a job periodically.
  - format_job_name(when)
    Format a name for a new job.
    Parameters: when (datetime) – The datetime when the job is starting.
    Returns: A formatted job name.
    Return type: str
  - classmethod from_doc(doc)
    Create a schedule from a database document.
    Parameters: doc (dict)
  - classmethod from_pb(pb)
    Create a schedule from a protobuf object.
    Parameters: pb – A schedule protobuf.
  - to_doc()
    Convert schedule to database document.
    Returns: A database document.
    Return type: dict
  - to_pb(pb)
    Convert schedule to a protobuf object.
    Parameters: pb – A schedule protobuf.
- class starbelly.schedule.ScheduleEvent(schedule, due)
  An instance of one event in a schedule.
  The scheduler will instantiate one event for each schedule. That event corresponds to the next time that scheduled job needs to run. When a job starts and finishes, the scheduler will check if it needs to instantiate a new event and add it to the schedule.
  This class implements comparison operators to make it easy to sort events into chronological order.
  - due
    Returns: The timestamp when this event is due.
    Return type: datetime
  - is_due
    Returns: Indicates if this event is due.
    Return type: bool
  - seconds_until_due
    Returns: The number of seconds until this event is due.
    Return type: float
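ScheduleEvent's comparison operators let pending events live in a heap ordered by due time, and the remark under remove_schedule about copying events into a new list corresponds to filtering the heap and re-heapifying it. In this hypothetical sketch, the Event class and remove_schedule helper are stand-ins, and seconds_until_due takes the current time as an argument (the documented property does not) so that the example is deterministic.

```python
import datetime
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class Event:
    due: datetime.datetime                  # events compare by due time only
    schedule_id: str = field(compare=False)

    def seconds_until_due(self, now):
        return (self.due - now).total_seconds()

def remove_schedule(events, schedule_id):
    """Rebuild the heap without one schedule's events instead of deleting mid-heap."""
    kept = [e for e in events if e.schedule_id != schedule_id]
    heapq.heapify(kept)
    return kept
```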
- class starbelly.schedule.ScheduleValidationError
  Custom error for job schedule validation.
Storage
The storage component saves downloaded items into the database.
Subscription
The API supports multiple types of subscriptions. Unlike the rest of the API, which consists of a simple request → response model, subscriptions push data to the client. Some subscriptions can be paused and resumed using a “sync token”.
- class starbelly.subscription.SyncTokenError
  A sync token is syntactically invalid or was used with an incompatible stream type.
- class starbelly.subscription.SyncTokenInt
  A sync token that stores a 64-bit integer. The first byte contains a literal 1 and the next 8 bytes contain the value.
  - classmethod decode(token)
    Unpack a token and return the value contained in it.
    Parameters: token (bytes)
    Return type: int
  - classmethod encode(val)
    Encode a 64-bit integer value as a token.
    Parameters: val (int)
    Return type: bytes
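The documented layout, one type byte with value 1 followed by 8 bytes of payload, fits in a 9-byte struct. SyncTokenIntSketch is hypothetical; big-endian byte order, unsigned encoding, and the validation performed in decode() are assumptions.

```python
import struct

class SyncTokenError(Exception):
    """A sync token is syntactically invalid or has the wrong type byte."""

class SyncTokenIntSketch:
    TYPE = 1
    FORMAT = "!BQ"  # assumption: type byte + big-endian unsigned 64-bit value (9 bytes)

    @classmethod
    def encode(cls, val):
        return struct.pack(cls.FORMAT, cls.TYPE, val)

    @classmethod
    def decode(cls, token):
        try:
            type_, val = struct.unpack(cls.FORMAT, token)
        except struct.error as exc:
            raise SyncTokenError("token must be exactly 9 bytes") from exc
        if type_ != cls.TYPE:
            raise SyncTokenError(f"unexpected token type {type_}")
        return val
```

Prefixing a type byte lets the server reject a token that was issued for a different kind of stream, which is the situation SyncTokenError describes.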
The following classes implement subscription behavior.
- class starbelly.subscription.JobSyncSubscription(id_, websocket, job_id, subscription_db, compression_ok, job_state_recv, sync_token=None)
  A subscription stream that allows a client to sync items from a specific crawl job.
  This subscription includes a “sync token” that allows the subscription to be cancelled and then resumed later. For example, if the network connection drops, the client may reconnect and resubscribe without missing any items or restarting the sync from the beginning.
  - cancel()
    Cancel the subscription.
  - id_
    Get this subscription’s ID.
    Return type: int
  - await run()
    Run the subscription.
    Returns: This function returns when the sync is complete.
- class starbelly.subscription.JobStatusSubscription(id_, websocket, stats_tracker, min_interval)
  A subscription stream that emits updates about the status of all running jobs.
  It only sends events when something about a job has changed.
  - cancel()
    Cancel the subscription.
  - await run()
    Start the subscription stream.
    Returns: This function runs until cancel() is called.