Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 101 additions & 41 deletions substack_scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.common.exceptions import SessionNotCreatedException, WebDriverException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import SessionNotCreatedException, TimeoutException, WebDriverException

from config import EMAIL, PASSWORD

Expand Down Expand Up @@ -881,68 +883,100 @@ def get_filename_from_url(url: str, filetype: str = ".md") -> str:
return url.split("/")[-1] + filetype

@staticmethod
def combine_metadata_and_content(title: str, subtitle: str, date: str, like_count: str, content) -> str:
"""Combines the title, subtitle, and content into a single string with Markdown format."""
def combine_metadata_and_content(title: str, subtitle: str, date: str, author: str, cover_image: str, content) -> str:
"""Combines the title, subtitle, and content into a single string with MDX frontmatter."""
if not isinstance(title, str):
raise ValueError("title must be a string")
if not isinstance(content, str):
raise ValueError("content must be a string")

metadata = f"# {title}\n\n"
if subtitle:
metadata += f"## {subtitle}\n\n"
metadata += f"**{date}**\n\n"
metadata += f"**Likes:** {like_count}\n\n"
safe_title = title.replace('"', '\\"')
safe_subtitle = subtitle.replace('"', '\\"') if subtitle else ""
safe_author = author.replace('"', '\\"') if author else ""

return metadata + content
frontmatter = '---\n'
frontmatter += f'title: "{safe_title}"\n'
if safe_subtitle:
frontmatter += f'subtitle: "{safe_subtitle}"\n'
frontmatter += f'date: "{date}"\n'
frontmatter += f'author: "{safe_author}"\n'
if cover_image:
frontmatter += f'image: "{cover_image}"\n'
frontmatter += '---\n\n'

def extract_post_data(self, soup: BeautifulSoup) -> Tuple[str, str, str, str, str]:
"""Converts a Substack post soup to markdown, returning metadata and content."""
return frontmatter + content

def extract_post_data(self, soup: BeautifulSoup, url: str = "") -> Tuple[str, str, str, str, str, str]:
    """Convert a Substack post soup to markdown with metadata.

    Args:
        soup: Parsed HTML of the post page.
        url: Original post URL; used only for diagnostics and debug dumps.

    Returns:
        Tuple of (title, subtitle, author, date, cover_image, md_content),
        where md_content is the frontmatter + markdown document.
    """
    # Title
    title_element = soup.select_one("h1.post-title, h2")
    title = title_element.text.strip() if title_element else "Untitled"
    title_found = title_element is not None

    # Subtitle
    subtitle_element = soup.select_one("h3.subtitle, div.subtitle-HEEcLo")
    subtitle = subtitle_element.text.strip() if subtitle_element else ""

    # Date, author, and cover image from ld+json (most reliable source).
    date = ""
    author = ""
    cover_image = ""
    script_tag = soup.find("script", {"type": "application/ld+json"})
    if script_tag and script_tag.string:
        try:
            ld_json = json.loads(script_tag.string)
            # The top-level payload may be a list of entities; use the first.
            if isinstance(ld_json, list):
                ld_json = ld_json[0] if ld_json else {}
            if "datePublished" in ld_json:
                date_str = ld_json["datePublished"]
                date_obj = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
                date = date_obj.strftime("%Y-%m-%d")
            if "author" in ld_json:
                authors = ld_json["author"]
                if isinstance(authors, list) and authors:
                    authors = authors[0]
                # schema.org allows author entries as objects OR bare strings.
                if isinstance(authors, dict):
                    author = authors.get("name", "")
                elif isinstance(authors, str):
                    author = authors
            if "image" in ld_json:
                images = ld_json["image"]
                if isinstance(images, list) and images:
                    images = images[0]
                if isinstance(images, dict):
                    cover_image = images.get("url", "")
                elif isinstance(images, str):
                    cover_image = images
        except (json.JSONDecodeError, ValueError, KeyError, AttributeError, TypeError):
            # Malformed or unexpectedly-shaped ld+json: fall through to defaults.
            pass

    if not date:
        date = "Date not found"

    # Content
    content_element = soup.select_one("div.available-content")
    content_html = str(content_element) if content_element else ""
    md = self.html_to_md(content_html)

    # Diagnostic: detect extraction failure (missing title or empty content) and dump page
    if not title_found or not content_element:
        paywall = soup.select_one("h2.paywall-title")
        ld_script = soup.find("script", {"type": "application/ld+json"})
        print(f"[EXTRACT FAIL] url={url}")
        print(f" title_found={title_found} title={title!r}")
        print(f" content_element_found={content_element is not None}")
        print(f" paywall_present={paywall is not None}")
        print(f" ld_json_present={ld_script is not None}")
        print(f" date={date!r} author={author!r}")
        try:
            debug_dir = os.path.join(os.path.dirname(self.md_save_dir), "_debug", self.writer_name)
            os.makedirs(debug_dir, exist_ok=True)
            slug = (get_post_slug(url) if url and is_post_url(url) else (url.rstrip('/').split('/')[-1] or "unknown"))
            debug_path = os.path.join(debug_dir, f"{slug}.html")
            with open(debug_path, "w", encoding="utf-8") as f:
                f.write(str(soup))
            print(f" dumped raw HTML -> {debug_path}")
        except Exception as dump_err:
            # Best-effort: never let the diagnostic dump break scraping.
            print(f" failed to dump debug HTML: {dump_err}")

    md_content = self.combine_metadata_and_content(title, subtitle, date, author, cover_image, md)

    return title, subtitle, author, date, cover_image, md_content

@abstractmethod
def get_url_soup(self, url: str) -> str:
Expand Down Expand Up @@ -983,7 +1017,17 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None:
pbar.refresh()
continue

title, subtitle, like_count, date, md = self.extract_post_data(soup)
title, subtitle, author, date, cover_image, md = self.extract_post_data(soup, url)

# Skip writing if extraction clearly failed — leaves no stale file so reruns retry.
content_element = soup.select_one("div.available-content")
if title == "Untitled" or content_element is None:
pbar.write(f"[SKIP] Extraction failed for {url} (title={title!r}, content_present={content_element is not None}). See _debug dump.")
count += 1
pbar.update(1)
if num_posts_to_scrape != 0 and count == num_posts_to_scrape:
break
continue

if self.download_images:
total_images = count_images_in_markdown(md)
Expand All @@ -1002,8 +1046,9 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None:
essays_data.append({
"title": title,
"subtitle": subtitle,
"like_count": like_count,
"author": author,
"date": date,
"cover_image": cover_image,
"file_link": md_filepath,
"html_link": html_filepath
})
Expand Down Expand Up @@ -1163,12 +1208,23 @@ def is_login_failed(self) -> bool:
error_container = self.driver.find_elements(By.ID, 'error-container')
return len(error_container) > 0 and error_container[0].is_displayed()

def get_url_soup(self, url: str, max_attempts: int = 5) -> BeautifulSoup:
def get_url_soup(self, url: str, max_attempts: int = 5) -> Optional[BeautifulSoup]:
"""Gets soup from URL using logged-in Selenium driver, with retry on rate limiting."""
for attempt in range(1, max_attempts + 1):
try:
self.driver.get(url)
sleep(2) # Small delay to ensure page loads

# Wait up to 20s for the post body (or a paywall marker) to appear, instead of a fixed sleep.
try:
WebDriverWait(self.driver, 20).until(
lambda d: d.find_elements(By.CSS_SELECTOR, "div.available-content")
or d.find_elements(By.CSS_SELECTOR, "h1.post-title")
or d.find_elements(By.CSS_SELECTOR, "h2.paywall-title")
or d.find_elements(By.CSS_SELECTOR, "body > pre")
)
except TimeoutException:
print(f"[WARN] Timeout waiting for post content to render: {url}")

soup = BeautifulSoup(self.driver.page_source, "html.parser")

pre = soup.select_one("body > pre")
Expand All @@ -1181,6 +1237,10 @@ def get_url_soup(self, url: str, max_attempts: int = 5) -> BeautifulSoup:
sleep(delay)
continue

if soup.find("h2", class_="paywall-title"):
print(f"Skipping premium article (no access): {url}")
return None

return soup
except RuntimeError:
raise
Expand Down