Exploring Email Scraping with Python

Introduction:
In today's digital age, email remains a pivotal communication tool for both personal and professional purposes. The ability to gather email addresses from various sources can be immensely valuable for marketing campaigns, networking, or research purposes. Python, with its rich ecosystem of libraries, offers powerful tools for web scraping, making it an ideal choice for extracting email addresses from web pages.
Scope:
In this blog post, we'll delve into the process of scraping email addresses from web pages using Python. We'll walk through each step of the process, from fetching web pages to extracting email addresses, using a combination of libraries such as requests, BeautifulSoup (from the bs4 package), colorama, and Python's built-in re module. Additionally, we'll use threading to parallelize the crawling and extraction for better efficiency.
Requirements:
To follow along with the examples in this blog post, you'll need to have the following libraries installed in your Python environment. You can install them using pip and a requirements.txt file with the following contents:

requirements.txt
requests
bs4
colorama

Then run python3 -m pip install -r requirements.txt to install the dependencies. The other modules used below (re, argparse, threading, queue, and urllib) ship with Python's standard library.
Explanation of Source Code:
The provided source code comprises several components:
- Crawler Class: Responsible for fetching web pages and extracting links recursively.
- EmailSpider Class: Extracts email addresses from web pages.
- Helper Functions: Includes functions to validate email addresses and URLs.
- Main Script: Combines the above components to initiate the crawling and email extraction process.
Step-by-Step Guide:
- Initialization: Import necessary libraries and initialize settings.
- Crawling Web Pages: The Crawler class fetches web pages and extracts links.
- Email Extraction: The EmailSpider class extracts email addresses from the fetched pages.
- Threading: Utilize threading to parallelize the crawling and email extraction process for improved efficiency.
- Output: Save extracted email addresses to a file.
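Before diving into the full source, here is the whole flow in miniature: a short, self-contained sketch (using a placeholder URL and a deliberately simplified email pattern, not the stricter regex used later) that fetches a single page, collects its links, and pulls out any email addresses it finds:

import re
import requests
from bs4 import BeautifulSoup

url = "https://example.com"  # placeholder; replace with a page you are allowed to scrape
html = requests.get(url, timeout=10).text  # fetch the page
soup = BeautifulSoup(html, "html.parser")  # parse the HTML
links = [a.get("href") for a in soup.find_all("a") if a.get("href")]  # collect raw links
emails = set(re.findall(r"[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}", html, re.I))  # simplified email pattern
print(f"Found {len(links)} links and {len(emails)} email addresses")

The full script below does the same two jobs, crawling and extraction, but recursively, across many pages, and on multiple threads.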
Let's go through each line of the provided source code and explain its functionality:
import re # Importing the regular expression module for pattern matching
import argparse # Importing the argparse module for parsing command-line arguments
import threading # Importing the threading module for parallel execution
from urllib.parse import urlparse, urljoin # Importing functions for URL manipulation
from queue import Queue # Importing the Queue class for managing tasks
import time # Importing time module for delays and tracking execution time
import warnings # Importing warnings module to suppress unnecessary warnings
warnings.filterwarnings("ignore") # Ignoring warnings for cleaner output
import requests # Importing the requests module for making HTTP requests
from bs4 import BeautifulSoup # Importing BeautifulSoup for HTML parsing
import colorama # Importing colorama for colored terminal output
colorama.init() # Initializing colorama for colored terminal output
GREEN = colorama.Fore.GREEN # Defining green color for terminal output
GRAY = colorama.Fore.LIGHTBLACK_EX # Defining gray color for terminal output
RESET = colorama.Fore.RESET # Defining reset color for terminal output
YELLOW = colorama.Fore.YELLOW # Defining yellow color for terminal output
RED = colorama.Fore.RED # Defining red color for terminal output
EMAIL_REGEX = r"""(?:[a-z0-9!#$%&'*+=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f]){2,12})\])"""
# Regular expression for matching email addresses
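As a quick aside (not part of the script), you can sanity-check this pattern in an interactive session. Assuming EMAIL_REGEX is defined as above, the following should print both addresses:

sample = "write to support@example.com or sales@example.org for help"
for match in re.finditer(EMAIL_REGEX, sample):
    print(match.group())  # support@example.com, then sales@example.org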
FORBIDDEN_TLDS = ["js", "css", "jpg", "png", "svg", "webp", "gz", "zip", "webm", "mp3",
"wav", "mp4", "gif", "tar", "gz", "rar", "gzip", "tgz"]
# List of forbidden top-level domains
FORBIDDEN_EXTENSIONS = ["js", "css", "jpg", "png", "svg", "webp", "gz", "zip", "webm", "mp3",
"wav", "mp4", "gif", "tar", "gz", "rar", "gzip", "tgz"]
# List of forbidden file extensions
print_lock = threading.Lock() # Lock for thread-safe printing to console
file_lock = threading.Lock() # Lock for thread-safe file operations
def is_valid_email_address(email):
    """Verify whether `email` is a valid email address"""
    for forbidden_tld in FORBIDDEN_TLDS:
        if email.endswith(forbidden_tld):
            return False # Return False if email ends with forbidden TLDs
    if re.search(r"\..{1}$", email):
        return False # Return False if TLD has a length of 1
    elif re.search(r"\..*\d+.*$", email):
        return False # Return False if TLD contains numbers
    return True # Return True otherwise

def is_valid_url(url):
    """Checks whether `url` is a valid URL."""
    parsed = urlparse(url)
    return bool(parsed.netloc) and bool(parsed.scheme) # Return True if URL has both scheme and netloc

def is_text_url(url):
    """Returns False if the URL has forbidden extensions."""
    for extension in FORBIDDEN_EXTENSIONS:
        if url.endswith(extension):
            return False # Return False if URL ends with forbidden extension
    return True # Return True otherwise
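These helpers are easy to try in isolation. Assuming the definitions above, the following calls behave as the comments indicate:

print(is_valid_email_address("john.doe@example.com"))  # True
print(is_valid_email_address("icon@sprites.png"))       # False: ends with a forbidden extension
print(is_valid_url("https://example.com/contact"))      # True: has a scheme and a domain
print(is_valid_url("/contact"))                         # False: relative path, no scheme or domain
print(is_text_url("https://example.com/logo.png"))      # False: points at an image
print(is_text_url("https://example.com/about"))         # True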
class Crawler(threading.Thread):
    def __init__(self, first_url, delay, crawl_external_urls=False, max_crawl_urls=30):
        super().__init__()
        self.first_url = first_url # Starting URL for crawling
        self.delay = delay # Delay between requests
        self.crawl_external_urls = crawl_external_urls # Flag to crawl external URLs
        self.max_crawl_urls = max_crawl_urls # Maximum number of URLs to crawl
        self.visited_urls = {} # Dictionary to store visited URLs and their HTML content
        self.domain_name = urlparse(self.first_url).netloc # Domain name of the base URL
        self.internal_urls = set() # Set to store internal URLs
        self.external_urls = set() # Set to store external URLs
        self.urls_queue = Queue() # Queue to manage URLs to crawl
        self.urls_queue.put(self.first_url) # Add the starting URL to the queue
        self.total_urls_visited = 0 # Counter for total URLs visited

    def get_all_website_links(self, url):
        """Returns all URLs found on `url` belonging to the same website"""
        urls = set()
        res = requests.get(url, verify=False, timeout=10) # Make HTTP request
        soup = BeautifulSoup(res.text, "html.parser") # Parse HTML content
        self.visited_urls[url] = res.text # Store visited URL and its HTML content
        for a_tag in soup.findAll("a"): # Find all <a> tags in HTML
            href = a_tag.attrs.get("href") # Get href attribute of <a> tag
            if href == "" or href is None:
                continue # Skip if href is empty or None
            href = urljoin(url, href) # Join URL if it's relative
            parsed_href = urlparse(href) # Parse joined URL
            href = parsed_href.scheme + "://" + parsed_href.netloc + parsed_href.path # Remove URL parameters
            if not is_valid_url(href):
                continue # Skip if URL is not valid
            if href in self.internal_urls:
                continue # Skip if URL is already in internal URLs set
            if self.domain_name not in href:
                if href not in self.external_urls:
                    self.external_urls.add(href) # Add external URL to set
                    if self.crawl_external_urls:
                        self.urls_queue.put(href) # Add external URL to queue if crawling external URLs is enabled
                continue
            urls.add(href) # Add internal URL to set
            self.urls_queue.put(href) # Add internal URL to queue
            self.internal_urls.add(href) # Add internal URL to internal URLs set
        return urls # Return set of URLs
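The lines that rebuild href above deserve a closer look: urljoin resolves relative links against the current page, and reassembling the URL from scheme, netloc, and path drops query strings and fragments, so the same page isn't queued twice under different parameters. A quick standalone illustration with made-up URLs:

from urllib.parse import urljoin, urlparse

href = urljoin("https://example.com/blog/", "post?id=1#top")  # -> https://example.com/blog/post?id=1#top
parsed = urlparse(href)
clean = parsed.scheme + "://" + parsed.netloc + parsed.path
print(clean)  # https://example.com/blog/post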
    def crawl(self, url):
        """Crawls a web page and extracts all links."""
        if not is_text_url(url):
            return # Skip if URL is not a text file
        self.total_urls_visited += 1 # Increment total URLs visited counter
        with print_lock:
            print(f"{YELLOW}[*] Crawling: {url}{RESET}") # Print crawling message
        links = self.get_all_website_links(url) # Get all links from the URL
        for link in links:
            if self.total_urls_visited > self.max_crawl_urls:
                break # Break if maximum URLs limit is reached
            self.crawl(link) # Recursively crawl each link
            time.sleep(self.delay) # Add delay between requests

    def run(self):
        self.crawl(self.first_url) # Start crawling from the first URL
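Because Crawler subclasses threading.Thread, it can also be run on its own, which is handy for checking that link discovery works before adding the email extraction layer. A minimal sketch, assuming the class above is in scope and using a placeholder URL:

crawler = Crawler("https://example.com", delay=0.1, max_crawl_urls=5)
crawler.start()   # run() kicks off crawl() in a background thread
crawler.join()    # block until crawling finishes
print(f"Discovered {len(crawler.internal_urls)} internal and {len(crawler.external_urls)} external links")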
class EmailSpider:
    def __init__(self, crawler: Crawler, n_threads=20, output_file="extracted-emails.txt"):
        self.crawler = crawler # Initialize Crawler instance
        self.extracted_emails = set() # Set to store extracted email addresses
        self.n_threads = n_threads # Number of threads for email extraction
        self.output_file = output_file # Output file to store extracted email addresses

    def get_emails_from_url(self, url):
        """Extracts email addresses from the given URL."""
        if not is_text_url(url):
            return set() # Return empty set if URL is not a text file
        if url not in self.crawler.visited_urls:
            try:
                r = requests.get(url, verify=False, timeout=10) # Make HTTP request if URL is not visited
            except Exception as e:
                return set() # Return empty set if an exception occurs
            else:
                text = r.text
        else:
            text = self.crawler.visited_urls[url] # Use cached HTML content if URL is visited
        emails = set() # Set to store extracted email addresses
        try:
            for re_match in re.finditer(EMAIL_REGEX, text): # Find all email addresses using regex
                email = re_match.group()
                if is_valid_email_address(email): # Check if email address is valid
                    emails.add(email) # Add valid email address to set
        except Exception as e:
            return set() # Return empty set if an exception occurs
        return emails # Return set of email addresses

    def scan_urls(self):
        while True:
            url = self.crawler.urls_queue.get() # Get URL from the queue
            emails = self.get_emails_from_url(url) # Extract email addresses from the URL
            for email in emails:
                with print_lock:
                    print("[+] Got email:", email, "from url:", url) # Print extracted email address
                if email not in self.extracted_emails:
                    with file_lock:
                        with open(self.output_file, "a") as f:
                            print(email, file=f) # Write extracted email address to file
                    self.extracted_emails.add(email) # Add email address to set
            self.crawler.urls_queue.task_done() # Mark URL task as done

    def run(self):
        for t in range(self.n_threads):
            t = threading.Thread(target=self.scan_urls) # Create thread for email extraction
            t.daemon = True # Set thread as daemon
            t.start() # Start thread
        self.crawler.urls_queue.join() # Wait for all URL tasks to be done
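You don't have to run a full crawl to try the extraction logic: get_emails_from_url works on any single page. A small sketch, again with placeholder URLs:

crawler = Crawler("https://example.com", delay=0.1)  # not started; only provides the (empty) visited_urls cache
spider = EmailSpider(crawler)
print(spider.get_emails_from_url("https://example.com/contact"))  # set of addresses found on that page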
def track_stats(crawler: Crawler):
    """Prints statistics about the crawler and active threads."""
    while is_running:
        with print_lock:
            print(f"{RED}[+] Queue size: {crawler.urls_queue.qsize()}{RESET}") # Print queue size
            print(f"{GRAY}[+] Total Extracted External links: {len(crawler.external_urls)}{RESET}") # Print total external links
            print(f"{GREEN}[+] Total Extracted Internal links: {len(crawler.internal_urls)}{RESET}") # Print total internal links
            print(f"[*] Total threads running: {threading.active_count()}") # Print total active threads
        time.sleep(5) # Add delay for stats update
def start_stats_tracker(crawler: Crawler):
    """Starts the statistics tracker in a separate thread."""
    t = threading.Thread(target=track_stats, args=(crawler,))
    t.daemon = True
    t.start()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Advanced Email Spider")
parser.add_argument("url", help="URL to start crawling from & extracting email addresses")
parser.add_argument("-m", "--max-crawl-urls", help="The maximum number of URLs to crawl, default is 30.",
type=int, default=30)
parser.add_argument("-t", "--num-threads", help="The number of threads that runs extracting emails"
"from individual pages. Default is 10",
type=int, default=10)
parser.add_argument("--crawl-external-urls", help="Whether to crawl external URLs that the domain specified",
action="store_true")
parser.add_argument("--crawl-delay", help="The crawl delay in seconds, useful for not overloading web servers",
type=float, default=0.01)
args = parser.parse_args() # Parse command-line arguments
url = args.url
is_running = True # Flag to indicate whether the program is still running
crawler = Crawler(url, max_crawl_urls=args.max_crawl_urls, delay=args.crawl_delay,
crawl_external_urls=args.crawl_external_urls) # Initialize crawler
crawler.start() # Start crawling
time.sleep(5) # Wait for queue to fill up
start_stats_tracker(crawler) # Start statistics tracker
email_spider = EmailSpider(crawler, n_threads=args.num_threads) # Initialize email spider
email_spider.run() # Run email spider
is_running = False # Update flag to indicate program is no longer running
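To run the spider, save the script (the file name is up to you; email_spider.py is used here purely for illustration) and invoke it with a starting URL plus any of the optional flags defined above, for example:

python3 email_spider.py https://example.com
python3 email_spider.py https://example.com -m 50 -t 20 --crawl-delay 0.5 --crawl-external-urls

Extracted addresses are appended to extracted-emails.txt (the default output_file of EmailSpider) as the threads find them.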
Conclusion:
Scraping email addresses from web pages using Python can be a powerful technique for various purposes. However, it's essential to respect website policies and adhere to legal and ethical guidelines while scraping. By following the steps outlined in this blog post and understanding the provided source code, you can efficiently gather email addresses from web pages using Python.
Hope you find this helpful! Feel free to share your thoughts in the comments.