From 6c759bf2dd966d0826eef9e2ebbed6b0fcb05c18 Mon Sep 17 00:00:00 2001 From: angelotc Date: Mon, 13 Apr 2026 22:11:04 -0700 Subject: [PATCH] add metadata for mdx, and add explicit wait --- substack_scraper.py | 142 +++++++++++++++++++++++++++++++------------- 1 file changed, 101 insertions(+), 41 deletions(-) diff --git a/substack_scraper.py b/substack_scraper.py index ee9fae8..21b89fa 100644 --- a/substack_scraper.py +++ b/substack_scraper.py @@ -28,7 +28,9 @@ from selenium.webdriver.edge.options import Options as EdgeOptions from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.edge.service import Service as EdgeService -from selenium.common.exceptions import SessionNotCreatedException, WebDriverException +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.common.exceptions import SessionNotCreatedException, TimeoutException, WebDriverException from config import EMAIL, PASSWORD @@ -881,68 +883,100 @@ def get_filename_from_url(url: str, filetype: str = ".md") -> str: return url.split("/")[-1] + filetype @staticmethod - def combine_metadata_and_content(title: str, subtitle: str, date: str, like_count: str, content) -> str: - """Combines the title, subtitle, and content into a single string with Markdown format.""" + def combine_metadata_and_content(title: str, subtitle: str, date: str, author: str, cover_image: str, content) -> str: + """Combines the title, subtitle, and content into a single string with MDX frontmatter.""" if not isinstance(title, str): raise ValueError("title must be a string") if not isinstance(content, str): raise ValueError("content must be a string") - metadata = f"# {title}\n\n" - if subtitle: - metadata += f"## {subtitle}\n\n" - metadata += f"**{date}**\n\n" - metadata += f"**Likes:** {like_count}\n\n" + safe_title = title.replace('"', '\\"') + safe_subtitle = subtitle.replace('"', '\\"') if subtitle else "" + safe_author = author.replace('"', '\\"') if author else "" - return metadata + content + frontmatter = '---\n' + frontmatter += f'title: "{safe_title}"\n' + if safe_subtitle: + frontmatter += f'subtitle: "{safe_subtitle}"\n' + frontmatter += f'date: "{date}"\n' + frontmatter += f'author: "{safe_author}"\n' + if cover_image: + frontmatter += f'image: "{cover_image}"\n' + frontmatter += '---\n\n' - def extract_post_data(self, soup: BeautifulSoup) -> Tuple[str, str, str, str, str]: - """Converts a Substack post soup to markdown, returning metadata and content.""" + return frontmatter + content + + def extract_post_data(self, soup: BeautifulSoup, url: str = "") -> Tuple[str, str, str, str, str, str]: + """Converts a Substack post soup to markdown, returning (title, subtitle, author, date, cover_image, md_content).""" # Title title_element = soup.select_one("h1.post-title, h2") title = title_element.text.strip() if title_element else "Untitled" + title_found = title_element is not None # Subtitle subtitle_element = soup.select_one("h3.subtitle, div.subtitle-HEEcLo") subtitle = subtitle_element.text.strip() if subtitle_element else "" - # Date + # Date, Author, and Cover Image from ld+json (most reliable source) date = "" - date_element = soup.select_one("div.meta-EgzBVA") - if date_element and date_element.text.strip(): - date = date_element.text.strip() - - if not date: - script_tag = soup.find("script", {"type": "application/ld+json"}) - if script_tag and script_tag.string: - try: - metadata = json.loads(script_tag.string) - if "datePublished" in metadata: - date_str = metadata["datePublished"] - date_obj = datetime.fromisoformat(date_str.replace("Z", "+00:00")) - date = date_obj.strftime("%b %d, %Y") - except (json.JSONDecodeError, ValueError, KeyError): - pass + author = "" + cover_image = "" + script_tag = soup.find("script", {"type": "application/ld+json"}) + if script_tag and script_tag.string: + try: + ld_json = json.loads(script_tag.string) + if "datePublished" in ld_json: + date_str = ld_json["datePublished"] + date_obj = datetime.fromisoformat(date_str.replace("Z", "+00:00")) + date = date_obj.strftime("%Y-%m-%d") + if "author" in ld_json: + authors = ld_json["author"] + if isinstance(authors, list) and authors: + author = authors[0].get("name", "") + elif isinstance(authors, dict): + author = authors.get("name", "") + if "image" in ld_json: + images = ld_json["image"] + if isinstance(images, list) and images: + img = images[0] + cover_image = img.get("url", "") if isinstance(img, dict) else str(img) + elif isinstance(images, dict): + cover_image = images.get("url", "") + except (json.JSONDecodeError, ValueError, KeyError): + pass if not date: date = "Date not found" - # Like count - like_count_element = soup.select_one('div.like-button-container button div.label') - like_count = ( - like_count_element.text.strip() - if like_count_element and like_count_element.text.strip().isdigit() - else "0" - ) - # Content content_element = soup.select_one("div.available-content") content_html = str(content_element) if content_element else "" md = self.html_to_md(content_html) - md_content = self.combine_metadata_and_content(title, subtitle, date, like_count, md) + # Diagnostic: detect extraction failure (missing title or empty content) and dump page + if not title_found or not content_element: + paywall = soup.select_one("h2.paywall-title") + ld_script = soup.find("script", {"type": "application/ld+json"}) + print(f"[EXTRACT FAIL] url={url}") + print(f" title_found={title_found} title={title!r}") + print(f" content_element_found={content_element is not None}") + print(f" paywall_present={paywall is not None}") + print(f" ld_json_present={ld_script is not None}") + print(f" date={date!r} author={author!r}") + try: + debug_dir = os.path.join(os.path.dirname(self.md_save_dir), "_debug", self.writer_name) + os.makedirs(debug_dir, exist_ok=True) + slug = (get_post_slug(url) if url and is_post_url(url) else (url.rstrip('/').split('/')[-1] or "unknown")) + debug_path = os.path.join(debug_dir, f"{slug}.html") + with open(debug_path, "w", encoding="utf-8") as f: + f.write(str(soup)) + print(f" dumped raw HTML -> {debug_path}") + except Exception as dump_err: + print(f" failed to dump debug HTML: {dump_err}") + + md_content = self.combine_metadata_and_content(title, subtitle, date, author, cover_image, md) - return title, subtitle, like_count, date, md_content + return title, subtitle, author, date, cover_image, md_content @abstractmethod def get_url_soup(self, url: str) -> str: @@ -983,7 +1017,17 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: pbar.refresh() continue - title, subtitle, like_count, date, md = self.extract_post_data(soup) + title, subtitle, author, date, cover_image, md = self.extract_post_data(soup, url) + + # Skip writing if extraction clearly failed — leaves no stale file so reruns retry. + content_element = soup.select_one("div.available-content") + if title == "Untitled" or content_element is None: + pbar.write(f"[SKIP] Extraction failed for {url} (title={title!r}, content_present={content_element is not None}). See _debug dump.") + count += 1 + pbar.update(1) + if num_posts_to_scrape != 0 and count == num_posts_to_scrape: + break + continue if self.download_images: total_images = count_images_in_markdown(md) @@ -1002,8 +1046,9 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: essays_data.append({ "title": title, "subtitle": subtitle, - "like_count": like_count, + "author": author, "date": date, + "cover_image": cover_image, "file_link": md_filepath, "html_link": html_filepath }) @@ -1163,12 +1208,23 @@ def is_login_failed(self) -> bool: error_container = self.driver.find_elements(By.ID, 'error-container') return len(error_container) > 0 and error_container[0].is_displayed() - def get_url_soup(self, url: str, max_attempts: int = 5) -> BeautifulSoup: + def get_url_soup(self, url: str, max_attempts: int = 5) -> Optional[BeautifulSoup]: """Gets soup from URL using logged-in Selenium driver, with retry on rate limiting.""" for attempt in range(1, max_attempts + 1): try: self.driver.get(url) - sleep(2) # Small delay to ensure page loads + + # Wait up to 20s for the post body (or a paywall marker) to appear, instead of a fixed sleep. + try: + WebDriverWait(self.driver, 20).until( + lambda d: d.find_elements(By.CSS_SELECTOR, "div.available-content") + or d.find_elements(By.CSS_SELECTOR, "h1.post-title") + or d.find_elements(By.CSS_SELECTOR, "h2.paywall-title") + or d.find_elements(By.CSS_SELECTOR, "body > pre") + ) + except TimeoutException: + print(f"[WARN] Timeout waiting for post content to render: {url}") + soup = BeautifulSoup(self.driver.page_source, "html.parser") pre = soup.select_one("body > pre") @@ -1181,6 +1237,10 @@ def get_url_soup(self, url: str, max_attempts: int = 5) -> BeautifulSoup: sleep(delay) continue + if soup.find("h2", class_="paywall-title"): + print(f"Skipping premium article (no access): {url}") + return None + return soup except RuntimeError: raise