Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR introduces a web scraping script that extracts product information from an e-commerce website. The script asynchronously scrapes product listings and their detailed pages to collect titles, prices, links, and barcodes.
Key Changes
- Creates an async web scraper using aiohttp and BeautifulSoup
- Implements product data extraction with barcode lookup from individual product pages
- Outputs scraped data to a JSON file
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| with open("products.json", "w", encoding="utf-8") as f: | ||
| json.dump(products, f, ensure_ascii=False, indent=4) | ||
|
|
||
| print("✅ products.json дайын!") |
There was a problem hiding this comment.
[nitpick] The success message is in a non-English language while the rest of the code uses English. Consider using consistent language throughout the codebase.
| print("✅ products.json дайын!") | |
| print("✅ products.json is ready!") |
| "price": price, | ||
| "link": link, | ||
| "barcode": barcode | ||
| }) |
There was a problem hiding this comment.
No error handling for HTTP requests. Consider adding try-except blocks to handle potential network errors, timeouts, or HTTP error status codes.
| }) | |
| try: | |
| async with session.get(BASE_URL, headers=HEADERS) as response: | |
| if response.status != 200: | |
| print(f"Error: Failed to fetch main page. Status code: {response.status}") | |
| return | |
| html = await response.text() | |
| soup = BS(html, "html.parser") | |
| items = soup.find_all("div", {"class": "bx_catalog_item_container"}) | |
| for item in items: | |
| title_tag = item.find("div", {"class": "bx_catalog_item_title"}) | |
| title = title_tag.text.strip() if title_tag else "No title" | |
| link_tag = title_tag.find("a") if title_tag else None | |
| link = "https://ovdi.ru" + link_tag.get("href") if link_tag else None | |
| price_tag = item.find("div", {"class": "bx_catalog_item_price"}) | |
| price = price_tag.text.strip() if price_tag else "No price" | |
| barcode = "Штрихкод табылмады" | |
| if link: | |
| try: | |
| async with session.get(link, headers=HEADERS) as product_response: | |
| if product_response.status != 200: | |
| print(f"Warning: Failed to fetch product page {link}. Status code: {product_response.status}") | |
| else: | |
| product_html = await product_response.text() | |
| product_soup = BS(product_html, "html.parser") | |
| text = product_soup.get_text(" ", strip=True) | |
| match = re.search(r"Штрихкод:\s*(\d+)", text) | |
| if match: | |
| barcode = match.group(1) | |
| except (ClientError, TimeoutError) as e: | |
| print(f"Warning: Exception occurred while fetching product page {link}: {e}") | |
| products.append({ | |
| "title": title, | |
| "price": price, | |
| "link": link, | |
| "barcode": barcode | |
| }) | |
| except (ClientError, TimeoutError) as e: | |
| print(f"Error: Exception occurred while fetching main page: {e}") | |
| return |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
import asyncio
import aiohttp
from bs4 import BeautifulSoup as BS
from fake_useragent import UserAgent
import json
import re
# Listing page to scrape (the "malysham" / for-babies category of ovdi.ru).
BASE_URL = "https://ovdi.ru/shop/malysham/"
# Request headers with a randomly chosen browser User-Agent (fresh each run).
HEADERS = {"User-Agent": UserAgent().random}
async def main():
    """Scrape product listings from ovdi.ru and write them to products.json.

    For each product on the listing page, collect its title, price and link,
    then fetch the individual product page to look up its barcode. Network
    failures on a product page are reported and skipped; a failure on the
    main listing page aborts the run with a message.

    Side effects: writes products.json in the current directory and prints
    progress/error messages. Returns None.
    """
    products = []
    async with aiohttp.ClientSession() as session:
        # Fetch the listing page; bail out entirely if it cannot be loaded.
        try:
            async with session.get(BASE_URL, headers=HEADERS) as response:
                if response.status != 200:
                    print(f"Error: Failed to fetch main page. Status code: {response.status}")
                    return
                html = await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"Error: Exception occurred while fetching main page: {e}")
            return

        soup = BS(html, "html.parser")
        items = soup.find_all("div", {"class": "bx_catalog_item_container"})
        for item in items:
            title_tag = item.find("div", {"class": "bx_catalog_item_title"})
            title = title_tag.text.strip() if title_tag else "No title"
            link_tag = title_tag.find("a") if title_tag else None
            link = "https://ovdi.ru" + link_tag.get("href") if link_tag else None
            price_tag = item.find("div", {"class": "bx_catalog_item_price"})
            price = price_tag.text.strip() if price_tag else "No price"

            # Default used when the product page is unreachable or has no
            # "Штрихкод: <digits>" marker.
            barcode = "Barcode not found"
            if link:
                # Best-effort barcode lookup: a failing product page only
                # produces a warning, not an aborted run.
                try:
                    async with session.get(link, headers=HEADERS) as product_response:
                        if product_response.status != 200:
                            print(f"Warning: Failed to fetch product page {link}. Status code: {product_response.status}")
                        else:
                            product_html = await product_response.text()
                            product_soup = BS(product_html, "html.parser")
                            text = product_soup.get_text(" ", strip=True)
                            # The site labels barcodes in Russian ("Штрихкод:"),
                            # so the pattern must stay in Russian.
                            match = re.search(r"Штрихкод:\s*(\d+)", text)
                            if match:
                                barcode = match.group(1)
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    print(f"Warning: Exception occurred while fetching product page {link}: {e}")

            products.append({
                "title": title,
                "price": price,
                "link": link,
                "barcode": barcode,
            })

    # Persist results; ensure_ascii=False keeps non-Latin text readable.
    with open("products.json", "w", encoding="utf-8") as f:
        json.dump(products, f, ensure_ascii=False, indent=4)

    print("✅ products.json is ready!")


if __name__ == "__main__":
    asyncio.run(main())