Truyền tệp tới S3 Python

Lưu trữ, truy xuất và sử dụng tệp trong S3 là hoạt động diễn ra thường xuyên nên sẽ dễ dàng. Nó cũng nên

  • truyền dữ liệu
  • có một api giống như tệp python-io
  • xử lý một số công cụ khử mỡ và nén vì tại sao không

Cài đặt

pip install s3-streaming

Truyền các đối tượng S3 như các tệp thông thường

Những thứ cơ bản

Mở và đọc các đối tượng S3 tương tự như python io thông thường. Sự khác biệt duy nhất là bạn cần cung cấp một phiên bản boto3.session.Session để xử lý quyền truy cập bộ chứa

import boto3
from s3streaming import s3_open


with s3_open('s3://bucket/key', boto_session=boto3.session.Session()) as f:
    for next_line in f:
        print(next_line)

Tiêm khử lưu huỳnh và xử lý nén trong luồng

Xem xét một tệp được nén ____23_______ và chứa các dòng ____24_______. Có một số bản tóm tắt trong việc giải quyết vấn đề đó, nhưng tại sao phải bận tâm?

AWS S3 là dịch vụ lưu trữ đối tượng hàng đầu trong ngành. Chúng tôi có xu hướng lưu trữ nhiều tệp dữ liệu trên S3 và đôi khi yêu cầu xử lý các tệp này. Nếu kích thước của tệp mà chúng tôi đang xử lý nhỏ, về cơ bản, chúng tôi có thể thực hiện quy trình xử lý tệp truyền thống, trong đó chúng tôi tìm nạp tệp từ S3 và sau đó xử lý tệp theo cấp độ hàng. Nhưng câu hỏi đặt ra, điều gì sẽ xảy ra nếu tệp có kích thước lớn hơn. > 1GB?

Nhập (đọc) một tệp lớn dẫn đến lỗi Out of Memory. Nó cũng có thể dẫn đến một sự kiện sụp đổ hệ thống. Có thư viện viz. Gấu trúc, Dask, v.v. rất tốt trong việc xử lý các tệp lớn nhưng một lần nữa, tệp sẽ có mặt tại địa phương tôi. e. chúng tôi sẽ phải nhập nó từ S3 vào máy cục bộ của mình. Nhưng nếu chúng tôi không muốn tìm nạp và lưu trữ cục bộ toàn bộ tệp S3 thì sao?

📜 Hãy xem xét một số trường hợp sử dụng

  • Chúng tôi muốn xử lý một tệp CSV S3 lớn (~2GB) mỗi ngày. Nó phải được xử lý trong một khung thời gian nhất định (e. g. trong 4 giờ)
  • Chúng tôi được yêu cầu xử lý các tệp S3 lớn thường xuyên từ máy chủ FTP. Các tệp mới xuất hiện trong khoảng thời gian nhất định và được xử lý tuần tự. e. tệp cũ phải được xử lý trước khi bắt đầu xử lý các tệp mới hơn

Đây là một số tình huống rất hay trong đó quá trình xử lý cục bộ có thể ảnh hưởng đến luồng tổng thể của hệ thống. Ngoài ra, nếu chúng tôi đang chạy các đơn vị xử lý tệp này trong vùng chứa, thì chúng tôi có dung lượng đĩa hạn chế để làm việc với. Do đó, cần có một luồng phát trực tuyến trên đám mây (cũng có thể _______ của cùng một tệp bằng cách truyền phát các đoạn khác nhau của cùng một tệp trong các luồng/quy trình song song). Đây là nơi tôi bắt gặp tính năng AWS S3 Select. 😎

📝 Bài đăng này tập trung vào việc truyền một tệp lớn thành các phần nhỏ hơn có thể quản lý được (tuần tự). Cách tiếp cận này sau đó có thể được sử dụng để xử lý song song bằng cách chạy trong các luồng/quy trình đồng thời. Kiểm tra bài viết tiếp theo của tôi về điều này


S3 Chọn

Với Amazon S3 Select, bạn có thể sử dụng các câu lệnh ngôn ngữ truy vấn có cấu trúc (SQL) đơn giản để lọc nội dung của các đối tượng Amazon S3 và chỉ truy xuất tập hợp con dữ liệu mà bạn cần. Sử dụng Amazon S3 Select để lọc dữ liệu này, bạn có thể giảm lượng dữ liệu mà Amazon S3 truyền, giảm chi phí và độ trễ để truy xuất dữ liệu này

Amazon S3 Select hoạt động trên các đối tượng được lưu trữ ở định dạng CSV, JSON hoặc Apache Parquet. Nó cũng hoạt động với các đối tượng được nén bằng GZIP hoặc BZIP2 (chỉ dành cho các đối tượng CSV và JSON) và các đối tượng được mã hóa phía máy chủ. Bạn có thể chỉ định định dạng của kết quả là CSV hoặc JSON và bạn có thể xác định cách phân tách các bản ghi trong kết quả

📝 Chúng tôi sẽ sử dụng boto3 của Python để hoàn thành mục tiêu cuối cùng của mình

🧱 Xây dựng biểu thức SQL

Để làm việc với

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
0, boto3 cung cấp chức năng truy vấn S3. Bạn chuyển các biểu thức SQL tới Amazon S3 trong yêu cầu. Amazon S3 Select hỗ trợ một tập hợp con của SQL. Kiểm tra liên kết này để biết thêm thông tin về điều này

response = s3_client.select_object_content(
    Bucket=bucket,
    Key=key,
    ExpressionType='SQL',
    Expression='SELECT * FROM S3Object',
    InputSerialization={
        'CSV': {
            'FileHeaderInfo': 'USE',
            'FieldDelimiter': ',',
            'RecordDelimiter': '\n'
        }
    },
    OutputSerialization={
        'JSON': {
            'RecordDelimiter': ','
        }
    }
)

Vào chế độ toàn màn hình Thoát chế độ toàn màn hình

Trong yêu cầu trên,

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
3 xác định loại tệp S3 và các thuộc tính liên quan, trong khi
def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
4 xác định
def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
5 mà chúng tôi nhận được từ
def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
6 này

🌫️ Đoạn phát trực tuyến

Bây giờ, vì chúng ta đã có một số ý tưởng về cách hoạt động của

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
0, hãy thử hoàn thành trường hợp sử dụng của chúng ta về phát trực tuyến các đoạn (tập hợp con) của một tệp lớn giống như cách một
def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
8. 😋

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
0 hỗ trợ tham số
import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
0 giúp chúng tôi truyền một tập hợp con của một đối tượng bằng cách chỉ định một phạm vi byte để truy vấn.
def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
0 yêu cầu cho một loạt phạm vi quét không chồng chéo. Phạm vi quét không cần phải được căn chỉnh với ranh giới bản ghi. Một bản ghi bắt đầu trong phạm vi quét được chỉ định nhưng mở rộng ra ngoài phạm vi quét sẽ được xử lý bởi truy vấn. Điều đó có nghĩa là hàng sẽ được tìm nạp trong phạm vi quét và nó có thể mở rộng để tìm nạp toàn bộ hàng. Nó
import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
2, toàn bộ hàng được tìm nạp hoặc nó bị bỏ qua (được tìm nạp trong phạm vi quét khác)

Hãy cố gắng đạt được điều này trong 2 bước đơn giản

1. Tìm tổng số byte của tệp S3

Đoạn mã sau giới thiệu chức năng sẽ thực hiện yêu cầu

import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
3 trên tệp S3 của chúng tôi và xác định kích thước tệp theo byte

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size

Vào chế độ toàn màn hình Thoát chế độ toàn màn hình

2. Tạo một trình tạo để truyền các khối

Bây giờ, logic là tạo ra các đoạn luồng byte của tệp S3 cho đến khi chúng tôi đạt đến kích thước tệp. Hãy yên tâm, phạm vi quét liên tục này sẽ không dẫn đến chồng chéo các hàng trong phản hồi 😉 (kiểm tra hình ảnh đầu ra / repo GitHub). Đủ đơn giản phải không?

import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')

Vào chế độ toàn màn hình Thoát chế độ toàn màn hình

Truyền tệp tới S3 Python

Xin chúc mừng. 👏 Chúng tôi đã giải quyết thành công một trong những thách thức chính khi xử lý tệp S3 lớn mà không làm hỏng hệ thống của chúng tôi. 🤘

📌 Bạn có thể xem kho lưu trữ GitHub của tôi để biết ví dụ hoạt động hoàn chỉnh của phương pháp này 👇

idris-rampurawala / s3-select-demo

Dự án này giới thiệu tính năng AWS S3 Select phong phú để truyền tệp dữ liệu lớn theo kiểu phân trang

AWS S3 Chọn Demo

Dự án này giới thiệu tính năng AWS S3 Select phong phú để truyền một tệp dữ liệu lớn trong một

import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
5

Hiện tại,

def get_s3_file_size(bucket: str, key: str) -> int:
    """Gets the file size of S3 object by a HEAD request

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path

    Returns:
        int: File size in bytes. Defaults to 0 if any error.
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    file_size = 0
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        if response:
            file_size = int(response.get('ResponseMetadata').get('HTTPHeaders').get('content-length'))
    except ClientError:
        logger.exception(f'Client error reading S3 file {bucket} : {key}')
    return file_size
0 không hỗ trợ
import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
7 và do đó chúng tôi không thể đánh số trang kết quả của truy vấn. Do đó, chúng tôi sử dụng tính năng
import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
8 để truyền phát nội dung của tệp S3

Tiểu sử

Nhập (đọc) một tệp lớn dẫn đến lỗi Out of Memory. Nó cũng có thể dẫn đến một sự kiện sụp đổ hệ thống. Có thư viện viz. Gấu trúc, Dask, v.v. rất tốt trong việc xử lý các tệp lớn nhưng một lần nữa, tệp sẽ có mặt tại địa phương tôi. e. chúng tôi sẽ phải nhập nó từ S3 vào máy cục bộ của mình. Nhưng nếu chúng ta không muốn tìm nạp và lưu trữ cục bộ toàn bộ tệp S3 cùng một lúc thì sao?🤔

Chà, chúng ta có thể sử dụng AWS S3 Select để truyền một tệp lớn thông qua tham số

import ast
import boto3
from botocore.exceptions import ClientError

def stream_s3_file(bucket: str, key: str, file_size: int, chunk_bytes=5000) -> tuple[dict]:
    """Streams a S3 file via a generator.

    Args:
        bucket (str): S3 bucket
        key (str): S3 object path
        chunk_bytes (int): Chunk size in bytes. Defaults to 5000
    Returns:
        tuple[dict]: Returns a tuple of dictionary containing rows of file content
    """
    aws_profile = current_app.config.get('AWS_PROFILE_NAME')
    s3_client = boto3.session.Session(profile_name=aws_profile).client('s3')
    expression = 'SELECT * FROM S3Object'
    start_range = 0
    end_range = min(chunk_bytes, file_size)
    while start_range < file_size:
        response = s3_client.select_object_content(
            Bucket=bucket,
            Key=key,
            ExpressionType='SQL',
            Expression=expression,
            InputSerialization={
                'CSV': {
                    'FileHeaderInfo': 'USE',
                    'FieldDelimiter': ',',
                    'RecordDelimiter': '\n'
                }
            },
            OutputSerialization={
                'JSON': {
                    'RecordDelimiter': ','
                }
            },
            ScanRange={
                'Start': start_range,
                'End': end_range
            },
        )

        """
        select_object_content() response is an event stream that can be looped to concatenate the overall result set
        Hence, we are joining the results of the stream in a string before converting it to a tuple of dict
        """
        result_stream = []
        for event in response['Payload']:
            if records := event.get('Records'):
                result_stream.append(records['Payload'].decode('utf-8'))
        yield ast.literal_eval(''.join(result_stream))
        start_range = end_range
        end_range = end_range + min(chunk_bytes, file_size - end_range)


def s3_file_processing():
    bucket = ''
    key = ''
    file_size = get_s3_file_size(bucket=bucket, key=key)
    logger.debug(f'Initiating streaming file of {file_size} bytes')
    chunk_size = 524288  # 512KB or 0.5MB
    for file_chunk in stream_s3_file(bucket=bucket, key=key,
                                     file_size=file_size, chunk_bytes=chunk_size):
        logger.info(f'\n{30 * "*"} New chunk {30 * "*"}')
        id_set = set()
        for row in file_chunk:
            # perform any other processing here
            id_set.add(int(row.get('id')))
        logger.info(f'{min(id_set)} --> {max(id_set)}')
0 của nó. Cách tiếp cận này…

Chúng tôi có thể truyền dữ liệu đến S3 không?

Bạn có thể thiết lập Kinesis Stream thành S3 để bắt đầu truyền dữ liệu của mình tới các bộ chứa Amazon S3 bằng các bước sau. Bước 1. Đăng nhập vào Bảng điều khiển AWS cho Amazon Kinesis. Bước 2. Định cấu hình Luồng phân phối. Bước 3. Chuyển đổi bản ghi bằng Hàm Lambda.

Lambda có thể tải tệp lên S3 không?

Trong bài viết ngắn này, tôi sẽ hướng dẫn bạn cách tải tệp lên AWS S3 bằng AWS Lambda. Chúng tôi sẽ sử dụng thư viện boto3 của Python để tải tệp lên bộ chứa . Sau khi tệp được tải lên S3, chúng tôi sẽ tạo URL GET được ký trước và gửi lại cho khách hàng.

Làm cách nào để tải tệp lên một thư mục cụ thể trong S3 bằng Python boto3?

upload_file .
upload_file(Tên tệp, Nhóm, Khóa, ExtraArgs=Không, Gọi lại=Không, Cấu hình=Không)
nhập máy khách boto3 = boto3. máy khách ("s3") máy khách. .
upload_fileobj(Fileobj, Bucket, Key, ExtraArgs=None, Callback=None, Config=None)
fo = io. .
với open('filename', 'rb') làm dữ liệu. khách hàng. .
với open(filename, "rb") là f. khách hàng