Scrapy TSV скачать файл. как конвертировать файл в паркет перед загрузкой на s3

У меня есть рабочий проект Scrapy, который загружает файлы TSV и сохраняет их в s3.

Я использую собственный конвейер для сохранения исходных имен файлов с датами.

Меня интересует, можно ли конвертировать tsv файлы в паркет перед загрузкой на s3. Если да, то как мне это сделать в Scrapy?

Должен отметить, что я могу конвертировать файлы локально (последний блок кода), но хотел бы сделать это встроенным, прежде чем они будут загружены в s3.

Это то, над чем я сейчас работаю....

##items
class DownfilesItem(scrapy.Item): 
    file_urls = scrapy.Field() 
    files = scrapy.Field() 
    original_file_name = scrapy.Field()
    date = scrapy.Field()
##pipeline to save original file names with dates
class OriginalNameFilesPipeline(FilesPipeline):
    def file_path(self, request, response=None, info=None):
        test = request
        file_name_xml = request.url.split(" = ")[-1]
        file_name: str = file_name_xml.removesuffix('.tsv') + '_' + datetime.today().strftime("%Y%m%d") + '.' +  file_name_xml.split(".")[-1]
        return file_name
##in my scraper
def parse_all_items(self, response):
        all_urls = [bunch or urls]
        
        for url in all_urls:
            item = DownfilesItem()
            item['file_urls'] = [url]
            item['original_file_name'] = url.split(" = ")[-1] 
            yield item
##converting tsv to parquet locally 
parse_options = csv.ParseOptions(delimiter = "\t")

for name in os.listdir(src_dir):
    localpath = os.path.join(src_dir, name)
    print(localpath)
    if ".tsv" in localpath:
        table = csv.read_csv(localpath, parse_options=parse_options)
        pq.write_table(table, localpath.replace('tsv', 'parquet'))

🤔 А знаете ли вы, что...
С Python можно создавать кросс-платформенные приложения для Windows, macOS и Linux.


81
3

Ответы:

У меня недостаточно информации, чтобы предоставить полное рабочее решение, но, возможно, что-то в этом роде сработает?

import re
from collections import defaultdict

import pandas as pd


class ParquetPipeline:
    def open_spider(self, spider):
        self.items = defaultdict(lambda: [])

    def close_spider(self, spider):
        # Iterate over pages, writing data for each to a parquet file.
        #
        for page, data in self.items.items():
            df = pd.DataFrame(data)
            df.to_parquet(f"page-{page}.parquet", index=False)

    def process_item(self, item, spider):
        # Extract page number from URL.
        #
        # ⚠️ You'd probably want to replace the page number with the file name here.
        #
        page = int(re.search(r"/page/(\d+)/", item["file_urls"]).group(1))

        self.items[page].append(dict(item))
        return item

Я интегрировал этот конвейер в пример учебного пособия по сканеру Scrapy для очистки котировок.

Это общий принцип:

  1. Паук перемещается по нескольким страницам цитат.
  2. Для каждой страницы паук выдает серию элементов с полем file_urls.
  3. Конвейер обрабатывает каждый элемент, добавляя его в словарь. В словаре указан номер страницы, который соответствует списку элементов, извлеченных для этой страницы.
  4. По завершении вызывается метод close_spider(), который перебирает пары ключ/значение в словаре, записывая данные для каждой страницы в отдельный файл.

Я бы посоветовал вам применить аналогичный подход, но использовать для ввода словаря исходное имя файла, а не номер страницы.

При таком подходе все данные хранятся в памяти до конца сканирования, а затем все они записываются в файл. Этого должно быть достаточно, чтобы получить работающее решение, однако вы, вероятно, захотите найти способ перенести данные на диск раньше, чтобы не сталкиваться с ограничениями памяти при больших обходах.


В свой FilesPipeline вы можете добавить функцию

def item_completed(self, results, item, info):

который выполняется при загрузке файла, а затем вы можете его преобразовать.

(Док: FilesPipeline.item_completed , Исходный код: FilesPipeline.item_completed)

Что-то вроде этого.

class OriginalNameFilesPipeline(FilesPipeline):
    
    # ... code ...

    def item_completed(self, results, item, info):

        # filter results - to work only with correctly downloaded files
        results = [x for ok, x in results if ok]

        item['parquet'] = []

        # iterate over downloaded files
        for result in results:
            
            source_path = result['path']
            target_path = source_path.replace('.tsv', '.parquet')
            
            parse_options = csv.ParseOptions(delimiter = "\t")
            table = csv.read_csv(source_path, parse_options=parse_options)
            pq.write_table(table, target_path)
            
            item['parquet'].append(target_path)

        return item

Полный рабочий код, который я тестировал на некоторых .csv на странице https://onlinetestcase.com/csv-file/

Этот код не требует создания проекта.
Вы можете поместить весь код в один файл и запустить его как python script.py.

import scrapy
from scrapy.pipelines.files import FilesPipeline
import pandas as pd

#from contextlib import suppress
#from itemadapter import ItemAdapter

class MySpider(scrapy.Spider):

    name = 'myspider'

    start_urls = ['https://onlinetestcase.com/csv-file/']

    def parse(self, response):
        print('>>> url:', response.url)

        for url in response.css('a::attr(href)').extract():
            if url.endswith('.csv'):
                print('>>> csv url:', url)
                url = response.urljoin(url)
                #yield {'file_urls': [url], 'other_folder': 'hello_world'}
                yield {'file_urls': [url]}


class ConvertFilesPipeline(FilesPipeline):
    
    def item_completed(self, results, item, info):

        #print('>>> results:', results)
        #print('>>> item:', item)
        #print('>>> info:', info)
        
        # filter results - code from original `item_completed()
        #with suppress(KeyError):
        #    ItemAdapter(item)[self.files_result_field] = [x for ok, x in results if ok]

        # filter results - to work only with correctly downloaded files
        results = [name for ok, name in results if ok]

        #item[self.files_result_field] = []  # it should create `item['files']`
        item['parquet'] = []

        # iterate over downloaded files
        for result in results:
            #print('>>> result:', result)
            
            source_path = result['path']
            target_path = source_path.replace('.csv', '.parquet')
            
            df = pd.read_csv(source_path)
            df.to_parquet(target_path)
            
            #result['parquet'] = target_path
            item['parquet'].append(target_path)

        #print('<<< item:', item)
        
        return item

# --- run without project and save in `output.csv` ---

from scrapy.crawler import CrawlerProcess

c = CrawlerProcess({
    #'USER_AGENT': 'Mozilla/5.0',
    'FEEDS': {'output.csv': {'format': 'csv'}},  # save in file CSV, JSON or XML
    'ITEM_PIPELINES': {'__main__.ConvertFilesPipeline': 1}, # using Pipeline created in current file (needs __main___)
    'FILES_STORE': '.',  # this folder has to exist before downloading
})
c.crawl(MySpider)
c.start() 

Решено

Используя тот же пример сайта, что и в обновленном вопросе, вот паук:

import scrapy
import pandas as pd
from io import StringIO


class QuotesSpider(scrapy.Spider):
    name = "parquet"
    start_urls = ["https://onlinetestcase.com/csv-file/"]

    def parse(self, response):
        for url in response.css('a::attr(href)').extract():
            if url.endswith('.csv'):
                url = response.urljoin(url)

                yield scrapy.http.Request(
                    url,
                    callback=self.parse_csv,
                    dont_filter=True
                )

    def parse_csv(self, response):
        yield {
            "url": response.url,
            # Convert CSV text into Data Frame.
            "data": pd.read_csv(StringIO(response.text))
        }

Он идентифицирует ссылки CSV, затем следует по каждой ссылке и извлекает данные, преобразуя их в фрейм данных. Фрейм данных предоставляется вместе с исходным URL-адресом.

Следующий конвейер затем обрабатывает все полученные объекты, преобразуя имя файла CSV в имя файла Parquet и выгружая фрейм данных.

import re
from collections import defaultdict


class ParquetPipeline:
    def open_spider(self, spider):
        self.items = defaultdict(lambda: [])

    def close_spider(self, spider):
        # Iterate over items, writing each to Parquet.
        #
        for name, df in self.items.items():
            df.to_parquet(name, index=False)

    def process_item(self, item, spider):
        # Get CSV filename.
        csv = re.search("[^/]+$", item["url"]).group(0)
        # Create Parquet filename.
        parquet = re.sub("\.csv", ".parquet", csv)

        self.items[parquet] = item["data"]

⚠️ Примечание. Чтобы этот пример работал, вам необходимо установить ROBOTSTXT_OBEY = False в settings.py.