У меня есть рабочий проект Scrapy, который загружает файлы TSV и сохраняет их в s3.
Я использую собственный конвейер для сохранения исходных имен файлов с датами.
Меня интересует, можно ли конвертировать tsv файлы в паркет перед загрузкой на s3. Если да, то как мне это сделать в Scrapy?
Должен отметить, что я могу конвертировать файлы локально (последний блок кода), но хотел бы сделать это встроенным, прежде чем они будут загружены в s3.
Это то, над чем я сейчас работаю....
##items
class DownfilesItem(scrapy.Item):
file_urls = scrapy.Field()
files = scrapy.Field()
original_file_name = scrapy.Field()
date = scrapy.Field()
##pipeline to save original file names with dates
class OriginalNameFilesPipeline(FilesPipeline):
def file_path(self, request, response=None, info=None):
test = request
file_name_xml = request.url.split(" = ")[-1]
file_name: str = file_name_xml.removesuffix('.tsv') + '_' + datetime.today().strftime("%Y%m%d") + '.' + file_name_xml.split(".")[-1]
return file_name
##in my scraper
def parse_all_items(self, response):
all_urls = [bunch or urls]
for url in all_urls:
item = DownfilesItem()
item['file_urls'] = [url]
item['original_file_name'] = url.split(" = ")[-1]
yield item
##converting tsv to parquet locally
parse_options = csv.ParseOptions(delimiter = "\t")
for name in os.listdir(src_dir):
localpath = os.path.join(src_dir, name)
print(localpath)
if ".tsv" in localpath:
table = csv.read_csv(localpath, parse_options=parse_options)
pq.write_table(table, localpath.replace('tsv', 'parquet'))
🤔 А знаете ли вы, что...
С Python можно создавать кросс-платформенные приложения для Windows, macOS и Linux.
У меня недостаточно информации, чтобы предоставить полное рабочее решение, но, возможно, что-то в этом роде сработает?
import re
from collections import defaultdict
import pandas as pd
class ParquetPipeline:
def open_spider(self, spider):
self.items = defaultdict(lambda: [])
def close_spider(self, spider):
# Iterate over pages, writing data for each to a parquet file.
#
for page, data in self.items.items():
df = pd.DataFrame(data)
df.to_parquet(f"page-{page}.parquet", index=False)
def process_item(self, item, spider):
# Extract page number from URL.
#
# ⚠️ You'd probably want to replace the page number with the file name here.
#
page = int(re.search(r"/page/(\d+)/", item["file_urls"]).group(1))
self.items[page].append(dict(item))
return item
Я интегрировал этот конвейер в пример учебного пособия по сканеру Scrapy для очистки котировок.
Это общий принцип:
file_urls
.close_spider()
, который перебирает пары ключ/значение в словаре, записывая данные для каждой страницы в отдельный файл.Я бы посоветовал вам применить аналогичный подход, но использовать для ввода словаря исходное имя файла, а не номер страницы.
При таком подходе все данные хранятся в памяти до конца сканирования, а затем все они записываются в файл. Этого должно быть достаточно, чтобы получить работающее решение, однако вы, вероятно, захотите найти способ перенести данные на диск раньше, чтобы не сталкиваться с ограничениями памяти при больших обходах.
В свой FilesPipeline
вы можете добавить функцию
def item_completed(self, results, item, info):
который выполняется при загрузке файла, а затем вы можете его преобразовать.
(Док: FilesPipeline.item_completed , Исходный код: FilesPipeline.item_completed)
Что-то вроде этого.
class OriginalNameFilesPipeline(FilesPipeline):
# ... code ...
def item_completed(self, results, item, info):
# filter results - to work only with correctly downloaded files
results = [x for ok, x in results if ok]
item['parquet'] = []
# iterate over downloaded files
for result in results:
source_path = result['path']
target_path = source_path.replace('.tsv', '.parquet')
parse_options = csv.ParseOptions(delimiter = "\t")
table = csv.read_csv(source_path, parse_options=parse_options)
pq.write_table(table, target_path)
item['parquet'].append(target_path)
return item
Полный рабочий код, который я тестировал на некоторых .csv
на странице https://onlinetestcase.com/csv-file/
Этот код не требует создания проекта.
Вы можете поместить весь код в один файл и запустить его как python script.py
.
import scrapy
from scrapy.pipelines.files import FilesPipeline
import pandas as pd
#from contextlib import suppress
#from itemadapter import ItemAdapter
class MySpider(scrapy.Spider):
name = 'myspider'
start_urls = ['https://onlinetestcase.com/csv-file/']
def parse(self, response):
print('>>> url:', response.url)
for url in response.css('a::attr(href)').extract():
if url.endswith('.csv'):
print('>>> csv url:', url)
url = response.urljoin(url)
#yield {'file_urls': [url], 'other_folder': 'hello_world'}
yield {'file_urls': [url]}
class ConvertFilesPipeline(FilesPipeline):
def item_completed(self, results, item, info):
#print('>>> results:', results)
#print('>>> item:', item)
#print('>>> info:', info)
# filter results - code from original `item_completed()
#with suppress(KeyError):
# ItemAdapter(item)[self.files_result_field] = [x for ok, x in results if ok]
# filter results - to work only with correctly downloaded files
results = [name for ok, name in results if ok]
#item[self.files_result_field] = [] # it should create `item['files']`
item['parquet'] = []
# iterate over downloaded files
for result in results:
#print('>>> result:', result)
source_path = result['path']
target_path = source_path.replace('.csv', '.parquet')
df = pd.read_csv(source_path)
df.to_parquet(target_path)
#result['parquet'] = target_path
item['parquet'].append(target_path)
#print('<<< item:', item)
return item
# --- run without project and save in `output.csv` ---
from scrapy.crawler import CrawlerProcess
c = CrawlerProcess({
#'USER_AGENT': 'Mozilla/5.0',
'FEEDS': {'output.csv': {'format': 'csv'}}, # save in file CSV, JSON or XML
'ITEM_PIPELINES': {'__main__.ConvertFilesPipeline': 1}, # using Pipeline created in current file (needs __main___)
'FILES_STORE': '.', # this folder has to exist before downloading
})
c.crawl(MySpider)
c.start()
Используя тот же пример сайта, что и в обновленном вопросе, вот паук:
import scrapy
import pandas as pd
from io import StringIO
class QuotesSpider(scrapy.Spider):
name = "parquet"
start_urls = ["https://onlinetestcase.com/csv-file/"]
def parse(self, response):
for url in response.css('a::attr(href)').extract():
if url.endswith('.csv'):
url = response.urljoin(url)
yield scrapy.http.Request(
url,
callback=self.parse_csv,
dont_filter=True
)
def parse_csv(self, response):
yield {
"url": response.url,
# Convert CSV text into Data Frame.
"data": pd.read_csv(StringIO(response.text))
}
Он идентифицирует ссылки CSV, затем следует по каждой ссылке и извлекает данные, преобразуя их в фрейм данных. Фрейм данных предоставляется вместе с исходным URL-адресом.
Следующий конвейер затем обрабатывает все полученные объекты, преобразуя имя файла CSV в имя файла Parquet и выгружая фрейм данных.
import re
from collections import defaultdict
class ParquetPipeline:
def open_spider(self, spider):
self.items = defaultdict(lambda: [])
def close_spider(self, spider):
# Iterate over items, writing each to Parquet.
#
for name, df in self.items.items():
df.to_parquet(name, index=False)
def process_item(self, item, spider):
# Get CSV filename.
csv = re.search("[^/]+$", item["url"]).group(0)
# Create Parquet filename.
parquet = re.sub("\.csv", ".parquet", csv)
self.items[parquet] = item["data"]
⚠️ Примечание. Чтобы этот пример работал, вам необходимо установить ROBOTSTXT_OBEY = False
в settings.py
.