Là thợ sửa ống nước dữ liệu được tôn vinh, chúng tôi thường được giao nhiệm vụ tải dữ liệu được tìm nạp từ một nguồn từ xa vào hệ thống của chúng tôi. Nếu chúng tôi may mắn, dữ liệu được tuần tự hóa dưới dạng JSON hoặc YAML. Khi kém may mắn hơn, chúng tôi nhận được một bảng tính Excel hoặc tệp CSV luôn bị hỏng theo một cách nào đó, không thể giải thích được
Dữ liệu từ các công ty lớn hoặc hệ thống cũ bằng cách nào đó luôn được mã hóa theo cách kỳ lạ và Quản trị viên hệ thống luôn nghĩ rằng họ giúp chúng tôi bằng cách nén các tệp [vui lòng gzip] hoặc chia chúng thành các tệp nhỏ hơn với tên ngẫu nhiên
Các dịch vụ hiện đại có thể cung cấp một API phù hợp, nhưng thường thì chúng ta không cần tìm nạp tệp từ FTP, SFTP, S3 hoặc một số kho tiền độc quyền chỉ hoạt động trên Windows
Trong bài viết này, chúng ta khám phá cách tốt nhất để nhập dữ liệu lộn xộn từ nguồn từ xa vào PostgreSQL
Để cung cấp một giải pháp thực tế, khả thi, chúng tôi đặt các vai trò cơ bản sau
- Dữ liệu được lấy từ một nguồn từ xa
- Dữ liệu bị bẩn và cần được chuyển đổi
- Dữ liệu lớn
Tôi đã tìm thấy API công khai tuyệt vời này dành cho bia, vì vậy chúng tôi sẽ nhập dữ liệu vào bảng bia trong cơ sở dữ liệu
Một loại bia từ API trông như thế này
$ curl //api.punkapi.com/v2/beers/?per_page=1&page=1 [ { "id": 1, "name": "Buzz", "tagline": "A Real Bitter Experience.", "first_brewed": "09/2007", "description": "A light, crisp and bitter IPA ...", "image_url": "//images.punkapi.com/v2/keg.png", "abv": 4.5, "ibu": 60, "target_fg": 1010, "target_og": 1044, "ebc": 20, "srm": 10, "ph": 4.4, "attenuation_level": 75, "volume": { "value": 20, "unit": "litres" }, "contributed_by": "Sam Mason " "brewers_tips": "The earthy and floral aromas from...", "boil_volume": {}, "method": {}, "ingredients": {}, "food_pairing": [], } ]
Tôi đã cắt bớt đầu ra cho ngắn gọn, nhưng có rất nhiều thông tin về các loại bia ở đây. Trong bài viết này, chúng tôi muốn nhập tất cả các trường trước
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 17 vào một bảng trong cơ sở dữ liệu
Trường
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 18 được lồng vào nhau. Chúng tôi chỉ muốn trích xuất
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 19 từ trường và lưu nó vào trường có tên là
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 18 trong bảng
volume = beer['volume']['value']
Trường
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,1 chỉ chứa năm và tháng, và trong một số trường hợp, chỉ chứa năm. Chúng tôi muốn chuyển đổi giá trị thành một ngày hợp lệ. Ví dụ: giá trị
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,2 sẽ được chuyển thành ngày
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,3. Giá trị
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,4 sẽ được chuyển thành ngày
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,5
Hãy viết một hàm đơn giản để chuyển đổi giá trị văn bản trong trường, sang Python
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,6
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'
Hãy nhanh chóng đảm bảo rằng nó hoạt động
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]
Trong cuộc sống thực, các phép biến đổi có thể phức tạp hơn nhiều. Nhưng với mục đích của chúng tôi, điều này là quá đủ
API cung cấp kết quả được phân trang. Để đóng gói phân trang, chúng tôi tạo một trình tạo tạo ra từng loại bia một
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 1
Và để sử dụng hàm tạo, chúng ta gọi và lặp lại nó
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,
Bạn sẽ nhận thấy rằng kết quả đầu tiên của mỗi trang mất nhiều thời gian hơn một chút. Điều này là do mạng yêu cầu tìm nạp trang
Bước tiếp theo là tạo một bảng trong cơ sở dữ liệu để nhập dữ liệu vào
Tạo cơ sở dữ liệu
$ createdb -O haki testload
Thay đổi
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,7 trong ví dụ thành người dùng cục bộ của bạn
Để kết nối từ Python với cơ sở dữ liệu PostgreSQL, chúng tôi sử dụng psycopg
$ python -m pip install psycopg2
Sử dụng psycopg, tạo kết nối đến cơ sở dữ liệu
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True
Chúng tôi đặt để mọi lệnh chúng tôi thực hiện sẽ có hiệu lực ngay lập tức. Đối với mục đích của bài viết này, điều này là tốt
Bây giờ chúng ta có một kết nối, chúng ta có thể viết một hàm để tạo một bảng
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]
Hàm nhận một con trỏ và tạo một bảng chưa đăng nhập có tên là
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,9
BẢNG CHƯA ĐĂNG NHẬP
Dữ liệu được ghi vào một sẽ không được ghi vào nhật ký ghi trước [WAL], làm cho nó trở nên lý tưởng cho các bảng trung gian. Lưu ý rằng bảng
$ createdb -O haki testload
0 sẽ không được khôi phục trong trường hợp xảy ra sự cố và sẽ không được sao chépSử dụng kết nối mà chúng ta đã tạo trước đó, đây là cách chức năng được sử dụng
volume = beer['volume']['value']0
Bây giờ chúng ta đã sẵn sàng để chuyển sang phần tiếp theo
Trong suốt bài viết này, chúng tôi quan tâm đến hai số liệu chính. thời gian và ký ức
Để đo thời gian cho từng phương pháp, chúng tôi sử dụng mô-đun
$ createdb -O haki testload
1 tích hợpvolume = beer['volume']['value']1
Chức năng này cung cấp cho đồng hồ độ phân giải cao nhất hiện có, khiến nó trở nên lý tưởng cho các mục đích của chúng tôi
Để đo mức tiêu thụ bộ nhớ, chúng tôi sẽ sử dụng gói memory-profiler
volume = beer['volume']['value']2
Gói này cung cấp mức sử dụng bộ nhớ và mức sử dụng bộ nhớ gia tăng cho mỗi dòng trong mã. Điều này rất hữu ích khi tối ưu hóa bộ nhớ. Để minh họa, đây là ví dụ được cung cấp trong PyPI
volume = beer['volume']['value']3
Phần thú vị là cột
$ createdb -O haki testload
3 hiển thị bộ nhớ bổ sung được phân bổ bởi mã trong mỗi dòngTrong bài viết này, chúng tôi quan tâm đến bộ nhớ cao nhất được sử dụng bởi chức năng. Bộ nhớ cao nhất là sự khác biệt giữa giá trị bắt đầu của cột "Mem usage" và giá trị cao nhất [còn được gọi là "high watermark"]
Để lấy danh sách "Mem sử dụng" ta dùng hàm
$ createdb -O haki testload
4 từ $ createdb -O haki testload
5volume = beer['volume']['value']4
Khi được sử dụng như thế này, hàm
$ createdb -O haki testload
4 sẽ thực thi hàm $ createdb -O haki testload
7 với $ createdb -O haki testload
8 và $ createdb -O haki testload
9 được cung cấp, nhưng cũng khởi chạy một quy trình khác trong nền để giám sát việc sử dụng bộ nhớ mỗi __18_______0 giâyĐối với các hoạt động rất nhanh, chức năng
$ createdb -O haki testload
7 có thể được thực hiện nhiều lần. Bằng cách đặt $ python -m pip install psycopg2
0 thành một giá trị, chúng tôi buộc nó chỉ thực thi một lầnĐối số
$ python -m pip install psycopg2
3 yêu cầu hàm trả về kết quả của $ createdb -O haki testload
7Để kết hợp tất cả lại với nhau, chúng tôi tạo trình trang trí sau để đo và báo cáo thời gian và bộ nhớ
volume = beer['volume']['value']5
Để loại bỏ ảnh hưởng lẫn nhau của thời gian trên bộ nhớ và ngược lại, chúng tôi thực hiện chức năng hai lần. Đầu tiên là tính thời gian, thứ hai là đo mức sử dụng bộ nhớ
Trình trang trí sẽ in tên hàm và bất kỳ đối số từ khóa nào, đồng thời báo cáo thời gian và bộ nhớ đã sử dụng
volume = beer['volume']['value']6
Chỉ các đối số từ khóa được in. Điều này là có chủ ý, chúng tôi sẽ sử dụng nó trong các thử nghiệm tham số hóa
Tại thời điểm viết bài, API bia chỉ chứa 325 loại bia. Để làm việc trên một tập dữ liệu lớn, chúng tôi sao chép nó 100 lần và lưu trữ trong bộ nhớ. Tập dữ liệu kết quả chứa 32.500 loại bia
volume = beer['volume']['value']7
Để bắt chước API từ xa, các chức năng của chúng tôi sẽ chấp nhận các trình vòng lặp tương tự như giá trị trả về của
$ python -m pip install psycopg2
6volume = beer['volume']['value']8
Đối với điểm chuẩn, chúng tôi sẽ nhập dữ liệu bia vào cơ sở dữ liệu. Để loại bỏ các ảnh hưởng bên ngoài chẳng hạn như mạng, chúng tôi tìm nạp dữ liệu từ API trước và phân phát cục bộ
Để có được thời gian chính xác, chúng tôi "giả mạo" API từ xa
volume = beer['volume']['value']9
Trong một tình huống thực tế, bạn sẽ sử dụng trực tiếp hàm
$ python -m pip install psycopg2
6import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'0
Bây giờ chúng tôi đã sẵn sàng để bắt đầu
Để thiết lập đường cơ sở, chúng tôi bắt đầu với cách tiếp cận đơn giản nhất, chèn từng hàng một
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'1
Lưu ý rằng khi chúng tôi lặp lại các loại bia, chúng tôi chuyển đổi
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,1 thành
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,6 và trích xuất giá trị âm lượng từ trường
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 18 lồng nhau
Chạy chức năng này tạo ra đầu ra sau
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'2
Hàm mất 129 giây để nhập 32K hàng. Trình lược tả bộ nhớ cho thấy chức năng này tiêu tốn rất ít bộ nhớ
Theo trực giác, việc chèn từng hàng một nghe có vẻ không hiệu quả lắm. Việc chuyển đổi bối cảnh liên tục giữa chương trình và cơ sở dữ liệu phải làm chậm nó lại
Psycopg2 cung cấp một cách để chèn nhiều hàng cùng một lúc bằng cách sử dụng. Từ các tài liệu
Thực thi một thao tác cơ sở dữ liệu [truy vấn hoặc lệnh] đối với tất cả các bộ tham số hoặc ánh xạ được tìm thấy trong chuỗi vars_list
Âm thanh đầy hứa hẹn
Hãy thử nhập dữ liệu bằng cách sử dụng
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True1
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'3
Hàm trông rất giống với hàm trước đó và các phép biến đổi cũng giống như vậy. Sự khác biệt chính ở đây là trước tiên chúng tôi chuyển đổi tất cả dữ liệu trong bộ nhớ và chỉ sau đó nhập dữ liệu đó vào cơ sở dữ liệu
Chạy chức năng này tạo ra đầu ra sau
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'4
Điều này thật đáng thất vọng. Thời gian chỉ tốt hơn một chút, nhưng chức năng hiện tiêu tốn 2. 7MB bộ nhớ
Để xem xét mức sử dụng bộ nhớ, một tệp JSON chỉ chứa dữ liệu chúng tôi nhập nặng 25 MB trên đĩa. Xem xét tỷ lệ, sử dụng phương pháp này để nhập tệp 1GB sẽ cần 110MB bộ nhớ
Phương pháp trước đó tiêu tốn rất nhiều bộ nhớ vì dữ liệu đã chuyển đổi được lưu trữ trong bộ nhớ trước khi được psycopg xử lý
Hãy xem liệu chúng ta có thể sử dụng trình vòng lặp để tránh lưu trữ dữ liệu trong bộ nhớ không
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'5
Sự khác biệt ở đây là dữ liệu được chuyển đổi được "truyền trực tuyến" vào
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True1 bằng cách sử dụng trình vòng lặp
Hàm này tạo ra kết quả sau
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'6
Giải pháp "phát trực tuyến" của chúng tôi hoạt động như mong đợi và chúng tôi đã cố gắng đưa bộ nhớ về 0. Tuy nhiên, thời gian vẫn gần như giống nhau, thậm chí so với phương pháp từng người một
Tài liệu về psycopg có một lưu ý rất thú vị về
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True1 trong
Việc triển khai execmany[] hiện tại là [sử dụng một cách nói cực kỳ từ thiện] không hoạt động đặc biệt. Các chức năng này có thể được sử dụng để tăng tốc độ thực hiện lặp lại một câu lệnh đối với một tập hợp các tham số. Bằng cách giảm số lượng vòng lặp của máy chủ, hiệu suất có thể tốt hơn so với việc sử dụng execmany[]
Vì vậy, chúng tôi đã làm điều đó sai tất cả cùng
Chức năng ngay bên dưới phần này là
Thực hiện các nhóm câu lệnh trong ít vòng lặp máy chủ hơn
Hãy triển khai chức năng tải bằng cách sử dụng
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'7
Thực thi chức năng
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'8
Ồ. Đó là một bước nhảy vọt. Chức năng hoàn thành chỉ trong chưa đầy 4 giây. Nhanh hơn ~33 lần so với 129 giây mà chúng tôi đã bắt đầu
Hàm
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5 sử dụng ít bộ nhớ hơn hàm
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True1 cho cùng một dữ liệu. Hãy cố gắng loại bỏ bộ nhớ bằng cách "truyền" dữ liệu vào
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5 bằng cách sử dụng trình vòng lặp
import datetime def parse_first_brewed[text: str] -> datetime.date: parts = text.split['/'] if len[parts] == 2: return datetime.date[int[parts[1]], int[parts[0]], 1] elif len[parts] == 1: return datetime.date[int[parts[0]], 1, 1] else: assert False, 'Unknown date format'9
Thực thi chức năng
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]0
Chúng tôi có khoảng thời gian gần giống nhau, nhưng với ít bộ nhớ hơn
Tuy nhiên, khi đọc, lập luận
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]1 đã thu hút sự chú ý của tôi
page_size – số lượng mục danh sách đối số tối đa để đưa vào mỗi câu lệnh. Nếu có nhiều mục hơn, hàm sẽ thực thi nhiều hơn một câu lệnh
Tài liệu trước đây đã tuyên bố rằng chức năng này hoạt động tốt hơn vì nó thực hiện ít vòng lặp hơn đối với cơ sở dữ liệu. Nếu đó là trường hợp, kích thước trang lớn hơn sẽ giảm số vòng lặp và dẫn đến thời gian tải nhanh hơn
Hãy thêm một đối số cho kích thước trang vào chức năng của chúng tôi để chúng tôi có thể thử nghiệm
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]1
Kích thước trang mặc định là 100. Hãy đánh giá các giá trị khác nhau và so sánh kết quả
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]2
Chúng tôi đã nhận được một số kết quả thú vị, hãy phá vỡ nó
- 1. Các kết quả tương tự như kết quả chúng tôi đã chèn từng hàng một
- 100. Đây là
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]
1 mặc định, vì vậy kết quả tương tự như điểm chuẩn trước đây của chúng tôi - 1000. Thời gian ở đây nhanh hơn khoảng 40% và bộ nhớ thấp
- 10000. Thời gian không nhanh hơn nhiều so với kích thước trang 1000, nhưng bộ nhớ cao hơn đáng kể
Kết quả cho thấy có sự đánh đổi giữa bộ nhớ và tốc độ. Trong trường hợp này, có vẻ như điểm hấp dẫn là kích thước trang là 1000
Những viên ngọc quý trong tài liệu của psycopg không kết thúc bằng
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5. Trong khi lướt qua tài liệu, một chức năng khác có tên đã thu hút sự chú ý của tôi
Thực thi một câu lệnh sử dụng GIÁ TRỊ với một chuỗi các tham số
Hàm
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]4 hoạt động bằng cách tạo một danh sách GIÁ TRỊ khổng lồ cho truy vấn
Hãy cho nó một spin
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]3
Nhập bia bằng chức năng
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]4
Vì vậy, ngay lập tức, chúng tôi đã tăng tốc một chút so với
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5. Tuy nhiên bộ nhớ hơi cao
Giống như chúng tôi đã làm trước đây, để giảm mức tiêu thụ bộ nhớ, chúng tôi cố gắng tránh lưu trữ dữ liệu trong bộ nhớ bằng cách sử dụng trình vòng lặp thay vì danh sách
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]5
Thực hiện chức năng tạo ra các kết quả sau
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]6
Vì vậy, thời gian gần như giống nhau, nhưng bộ nhớ trở về số không
Giống như
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5, hàm
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]4 cũng chấp nhận đối số
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]1
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]7
Thực thi với các kích thước trang khác nhau
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]8
Cũng giống như
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5, chúng tôi nhận thấy sự đánh đổi giữa bộ nhớ và tốc độ. Ở đây cũng vậy, điểm hấp dẫn là khoảng kích thước trang 1000. Tuy nhiên, sử dụng
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]4, chúng tôi nhận được kết quả nhanh hơn ~20% so với cùng kích thước trang khi sử dụng
import psycopg2 connection = psycopg2.connect[ host="localhost", database="testload", user="haki", password=None, ] connection.autocommit = True5
Tài liệu chính thức cho PostgreSQL có toàn bộ phần về. Theo tài liệu, cách tốt nhất để tải dữ liệu vào cơ sở dữ liệu là sử dụng lệnh
volume = beer['volume']['value']03
Để sử dụng
volume = beer['volume']['value']03 từ Python, psycopg cung cấp một chức năng đặc biệt gọi là. Lệnh
volume = beer['volume']['value']03 yêu cầu tệp CSV. Hãy xem liệu chúng ta có thể chuyển đổi dữ liệu của mình thành CSV và tải dữ liệu đó vào cơ sở dữ liệu bằng cách sử dụng
volume = beer['volume']['value']05
>>> parse_first_brewed['09/2007'] datetime.date[2007, 9, 1] >>> parse_first_brewed['2006'] datetime.date[2006, 1, 1]9
Hãy phá vỡ nó
volume = beer['volume']['value']
08. Chuyển đổi một giá trị duy nhất- Thoát dòng mới. một số trường văn bản bao gồm các dòng mới, vì vậy chúng tôi thoát khỏi
volume = beer['volume']['value']
09 ->volume = beer['volume']['value']
10 - Các giá trị trống được chuyển đổi thành
volume = beer['volume']['value']
11. Chuỗivolume = beer['volume']['value']
12 là chuỗi mặc định được PostgreSQL sử dụng để biểu thị NULL trong COPY [điều này có thể được thay đổi bằng cách sử dụng tùy chọnvolume = beer['volume']['value']
13]
- Thoát dòng mới. một số trường văn bản bao gồm các dòng mới, vì vậy chúng tôi thoát khỏi
volume = beer['volume']['value']
14. Tạo một tệp giống như đối tượng bằng cách sử dụng. Một đối tượngvolume = beer['volume']['value']
16 chứa một chuỗi có thể được sử dụng như một tệp. Trong trường hợp của chúng tôi, tệp CSVvolume = beer['volume']['value']
17. Chuyển đổi bia thành hàng CSV- Chuyển đổi dữ liệu. các phép biến đổi trên
>>> beers = iter_beers_from_api[] >>> next[beers] {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': '//images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next[beers] {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': '//images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,
1 vàfrom typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 1
8 được thực hiện tại đây - Chọn một dấu phân cách. Một số trường trong tập dữ liệu chứa văn bản tự do có dấu phẩy. Để tránh xung đột, chúng tôi chọn ". " làm dấu phân cách [một tùy chọn khác là sử dụng
volume = beer['volume']['value']
20]
- Chuyển đổi dữ liệu. các phép biến đổi trên
Bây giờ hãy xem nếu tất cả công việc khó khăn này được đền đáp
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 10
Lệnh
volume = beer['volume']['value']03 là lệnh nhanh nhất mà chúng tôi từng thấy cho đến nay. Sử dụng
volume = beer['volume']['value']22, quy trình hoàn tất trong chưa đầy một giây. Tuy nhiên, có vẻ như phương pháp này lãng phí hơn nhiều về việc sử dụng bộ nhớ. Hàm này tiêu tốn 99 MB, lớn hơn gấp đôi kích thước tệp JSON của chúng tôi trên đĩa
Một trong những nhược điểm chính của việc sử dụng bản sao với
volume = beer['volume']['value']16 là toàn bộ tệp được tạo trong bộ nhớ. Điều gì sẽ xảy ra nếu thay vì tạo toàn bộ tệp trong bộ nhớ, chúng tôi tạo một đối tượng giống như tệp sẽ hoạt động như một vùng đệm giữa nguồn từ xa và lệnh
volume = beer['volume']['value']22. Bộ đệm sẽ sử dụng JSON thông qua trình vòng lặp, làm sạch và chuyển đổi dữ liệu, đồng thời xuất ra CSV sạch
Lấy cảm hứng từ câu trả lời tràn ngăn xếp này, chúng tôi đã tạo một đối tượng cung cấp một trình vòng lặp và cung cấp một giao diện giống như tệp
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 11
Để minh họa cách thức hoạt động của tính năng này, đây là cách một đối tượng giống như tệp CSV có thể được tạo từ danh sách các số
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 12
Lưu ý rằng chúng tôi đã sử dụng
volume = beer['volume']['value']25 như một tệp. Bên trong, nó chỉ tìm nạp các hàng từ
volume = beer['volume']['value']26 khi bộ đệm dòng bên trong của nó trống
Chức năng tải bằng cách sử dụng
volume = beer['volume']['value']27 trông như thế này
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 13
Sự khác biệt chính là tệp CSV của bia được sử dụng theo yêu cầu và dữ liệu không được lưu trữ trong bộ nhớ sau khi được sử dụng
Hãy thực hiện chức năng và xem kết quả
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 14
Tuyệt. Thời gian thấp và bộ nhớ trở về 0
Trong nỗ lực giảm hiệu suất cuối cùng, chúng tôi nhận thấy rằng giống như
def create_staging_table[cursor] -> None: cursor.execute[""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers [ id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ]; """]1, lệnh
volume = beer['volume']['value']03 cũng chấp nhận một đối số tương tự có tên là
volume = beer['volume']['value']30
kích thước - kích thước của bộ đệm được sử dụng để đọc từ tệp
Hãy thêm một đối số
volume = beer['volume']['value']30 vào hàm
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 15
Giá trị mặc định cho kích thước là 8192, tức là
volume = beer['volume']['value']32, vì vậy chúng tôi sẽ giữ kích thước ở lũy thừa 2
from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api[page_size: int = 5] -> Iterator[Dict[str, Any]]: session = requests.Session[] page = 1 while True: response = session.get['//api.punkapi.com/v2/beers?' + urlencode[{ 'page': page, 'per_page': page_size }]] response.raise_for_status[] data = response.json[] if not data: break yield from data page += 16
Không giống như các ví dụ trước, có vẻ như không có sự đánh đổi giữa tốc độ và bộ nhớ. Điều này có ý nghĩa bởi vì phương pháp này được thiết kế để không tiêu tốn bộ nhớ. Tuy nhiên, chúng tôi nhận được thời gian khác nhau khi thay đổi kích thước trang. Đối với tập dữ liệu của chúng tôi, 8192 mặc định là điểm hấp dẫn
Tóm tắt kết quả
Chức năngThời gian [giây]Bộ nhớ [MB]______5_______33128. 80. 08203125____5_______34124. 72. 765625____5_______35129. 30. 0____5_______363. 9172. 50390625____5_______37130. 20. 0____5_______384. 3330. 0____5_______392. 5370. 2265625____5_______402. 58525. 4453125____5_______413. 6664. 50390625____5_______42127. 40. 0____5_______433. 6770. 0____5_______441. 4680. 0____5_______451. 5032. 25____5_______460. 627499. 109375____5_______470. 45360. 0____5_______480. 45960. 0____5_______490. 46490. 0____5_______500. 61710. 0
Câu hỏi lớn bây giờ là Tôi nên sử dụng cái gì?
Mỗi phương pháp đều có ưu nhược điểm riêng và phù hợp với từng trường hợp khác nhau.
Mua mang về
Ưu tiên các phương pháp tích hợp sẵn cho các loại dữ liệu phức tạp
Thực thi nhiều, thực thi các giá trị và xử lý hàng loạt việc chuyển đổi giữa các loại dữ liệu Python sang các loại cơ sở dữ liệu. Phương pháp tiếp cận CSV yêu cầu thoát
Mua mang về
Thích các phương pháp tích hợp sẵn cho khối lượng dữ liệu nhỏ
Các phương pháp tích hợp dễ đọc hơn và ít có khả năng bị hỏng trong tương lai. Nếu bộ nhớ và thời gian không phải là vấn đề, hãy giữ cho nó đơn giản