Truyền tệp tới S3 Python

Lưu trữ, truy xuất và sử dụng tệp trong S3 là hoạt động diễn ra thường xuyên nên sẽ dễ dàng. Nó cũng nên

  • truyền dữ liệu
  • có một api giống như tệp python-io
  • xử lý một số công cụ khử mỡ và nén vì tại sao không

Cài đặt

pip install s3-streaming

Truyền các đối tượng S3 như các tệp thông thường

Những thứ cơ bản

Mở và đọc các đối tượng S3 tương tự như python io thông thường. Sự khác biệt duy nhất là bạn cần cung cấp một phiên bản boto3.session.Session để xử lý quyền truy cập bộ chứa

import boto3
from s3streaming import s3_open


with s3_open['s3://bucket/key', boto_session=boto3.session.Session[]] as f:
    for next_line in f:
        print[next_line]

Tiêm khử lưu huỳnh và xử lý nén trong luồng

Xem xét một tệp được nén ____23_______ và chứa các dòng ____24_______. Có một số bản tóm tắt trong việc giải quyết vấn đề đó, nhưng tại sao phải bận tâm?

AWS S3 là dịch vụ lưu trữ đối tượng hàng đầu trong ngành. Chúng tôi có xu hướng lưu trữ nhiều tệp dữ liệu trên S3 và đôi khi yêu cầu xử lý các tệp này. Nếu kích thước của tệp mà chúng tôi đang xử lý nhỏ, về cơ bản, chúng tôi có thể thực hiện quy trình xử lý tệp truyền thống, trong đó chúng tôi tìm nạp tệp từ S3 và sau đó xử lý tệp theo cấp độ hàng. Nhưng câu hỏi đặt ra, điều gì sẽ xảy ra nếu tệp có kích thước lớn hơn. > 1GB?

Nhập [đọc] một tệp lớn dẫn đến lỗi Out of Memory. Nó cũng có thể dẫn đến một sự kiện sụp đổ hệ thống. Có thư viện viz. Gấu trúc, Dask, v.v. rất tốt trong việc xử lý các tệp lớn nhưng một lần nữa, tệp sẽ có mặt tại địa phương tôi. e. chúng tôi sẽ phải nhập nó từ S3 vào máy cục bộ của mình. Nhưng nếu chúng tôi không muốn tìm nạp và lưu trữ cục bộ toàn bộ tệp S3 thì sao?

📜 Hãy xem xét một số trường hợp sử dụng

  • Chúng tôi muốn xử lý một tệp CSV S3 lớn [~2GB] mỗi ngày. Nó phải được xử lý trong một khung thời gian nhất định [e. g. trong 4 giờ]
  • Chúng tôi được yêu cầu xử lý các tệp S3 lớn thường xuyên từ máy chủ FTP. Các tệp mới xuất hiện trong khoảng thời gian nhất định và được xử lý tuần tự. e. tệp cũ phải được xử lý trước khi bắt đầu xử lý các tệp mới hơn

Đây là một số tình huống rất hay trong đó quá trình xử lý cục bộ có thể ảnh hưởng đến luồng tổng thể của hệ thống. Ngoài ra, nếu chúng tôi đang chạy các đơn vị xử lý tệp này trong vùng chứa, thì chúng tôi có dung lượng đĩa hạn chế để làm việc với. Do đó, cần có một luồng phát trực tuyến trên đám mây [cũng có thể _______ của cùng một tệp bằng cách truyền phát các đoạn khác nhau của cùng một tệp trong các luồng/quy trình song song]. Đây là nơi tôi bắt gặp tính năng AWS S3 Select. 😎

📝 Bài đăng này tập trung vào việc truyền một tệp lớn thành các phần nhỏ hơn có thể quản lý được [tuần tự]. Cách tiếp cận này sau đó có thể được sử dụng để xử lý song song bằng cách chạy trong các luồng/quy trình đồng thời. Kiểm tra bài viết tiếp theo của tôi về điều này

S3 Chọn

Với Amazon S3 Select, bạn có thể sử dụng các câu lệnh ngôn ngữ truy vấn có cấu trúc [SQL] đơn giản để lọc nội dung của các đối tượng Amazon S3 và chỉ truy xuất tập hợp con dữ liệu mà bạn cần. Sử dụng Amazon S3 Select để lọc dữ liệu này, bạn có thể giảm lượng dữ liệu mà Amazon S3 truyền, giảm chi phí và độ trễ để truy xuất dữ liệu này

Amazon S3 Select hoạt động trên các đối tượng được lưu trữ ở định dạng CSV, JSON hoặc Apache Parquet. Nó cũng hoạt động với các đối tượng được nén bằng GZIP hoặc BZIP2 [chỉ dành cho các đối tượng CSV và JSON] và các đối tượng được mã hóa phía máy chủ. Bạn có thể chỉ định định dạng của kết quả là CSV hoặc JSON và bạn có thể xác định cách phân tách các bản ghi trong kết quả

📝 Chúng tôi sẽ sử dụng boto3 của Python để hoàn thành mục tiêu cuối cùng của mình

🧱 Xây dựng biểu thức SQL

Để làm việc với

def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
0, boto3 cung cấp chức năng truy vấn S3. Bạn chuyển các biểu thức SQL tới Amazon S3 trong yêu cầu. Amazon S3 Select hỗ trợ một tập hợp con của SQL. Kiểm tra liên kết này để biết thêm thông tin về điều này

response = s3_client.select_object_content[
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={
        'CSV': {
            'FileHeaderInfo': 'USE',
            'FieldDelimiter': ',',
            'RecordDelimiter': '\n'
        }
    },
    OutputSerialization={
        'JSON': {
            'RecordDelimiter': ','
        }
    }
]

Vào chế độ toàn màn hình Thoát chế độ toàn màn hình

Trong yêu cầu trên,

def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
3 xác định loại tệp S3 và các thuộc tính liên quan, trong khi
def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
4 xác định
def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
5 mà chúng tôi nhận được từ
def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
6 này

🌫️ Đoạn phát trực tuyến

Bây giờ, vì chúng ta đã có một số ý tưởng về cách hoạt động của

def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
0, hãy thử hoàn thành trường hợp sử dụng của chúng ta về phát trực tuyến các đoạn [tập hợp con] của một tệp lớn giống như cách một
def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
8. 😋

def get_s3_file_size[bucket: str, key: str] -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
0 hỗ trợ tham số
import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file[bucket: str, key: str, file_size: int, chunk_bytes=5000] -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range  int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
0 yêu cầu cho một loạt phạm vi quét không chồng chéo. Phạm vi quét không cần phải được căn chỉnh với ranh giới bản ghi. Một bản ghi bắt đầu trong phạm vi quét được chỉ định nhưng mở rộng ra ngoài phạm vi quét sẽ được xử lý bởi truy vấn. Điều đó có nghĩa là hàng sẽ được tìm nạp trong phạm vi quét và nó có thể mở rộng để tìm nạp toàn bộ hàng. Nó
import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file[bucket: str, key: str, file_size: int, chunk_bytes=5000] -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range  tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range  int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size

Vào chế độ toàn màn hình Thoát chế độ toàn màn hình

2. Tạo một trình tạo để truyền các khối

Bây giờ, logic là tạo ra các đoạn luồng byte của tệp S3 cho đến khi chúng tôi đạt đến kích thước tệp. Hãy yên tâm, phạm vi quét liên tục này sẽ không dẫn đến chồng chéo các hàng trong phản hồi 😉 [kiểm tra hình ảnh đầu ra / repo GitHub]. Đủ đơn giản phải không?

import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file[bucket: str, key: str, file_size: int, chunk_bytes=5000] -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range  tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range  int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    file_size = 0
    try:
        response = s3_client.head_object[Bucket=bucket, Key=key]
        if response:
            file_size = int[response.get['ResponseMetadata'].get['HTTPHeaders'].get['content-length']]
    except ClientError:
        logger.exception[f'Client error reading S3 file {bucket} : {key}']
    return file_size
0 không hỗ trợ
import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file[bucket: str, key: str, file_size: int, chunk_bytes=5000] -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range  tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range  tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket [str]: S3 bucket
        key [str]: S3 object path
        chunk_bytes [int]: Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get['AWS_PROFILE_NAME']
    s3_client = boto3.session.Session[profile_name=aws_profile].client['s3']
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min[chunk_bytes, file_size]
    while start_range 

Chủ Đề