Hướng dẫn python elasticache redis example - ví dụ về trăn đàn hồi redis

Trong hướng dẫn này, bạn sử dụng SDK AWS cho Python (BOTO3) để viết các chương trình đơn giản để thực hiện các hoạt động đàn hồi sau:

  • Tạo các cụm đàn hồi (bật chế độ cụm và chế độ cụm bị vô hiệu hóa)

  • Kiểm tra xem người dùng hoặc nhóm người dùng có tồn tại không, nếu không hãy tạo chúng (chỉ Redis 6.0 trở đi)

  • Kết nối với đàn hồi

  • Thực hiện các hoạt động như cài đặt và nhận chuỗi, đọc và viết cho Steams và xuất bản và đăng ký từ Pub/Sub Channel.

Khi bạn làm việc thông qua hướng dẫn này, bạn có thể tham khảo tài liệu AWS SDK cho Python (BOTO). Phần sau đây là đặc trưng cho đàn hồi:

Ứng dụng khách cấp thấp đàn hồi

Điều kiện tiên quyết hướng dẫn

  • Thiết lập khóa truy cập AWS để sử dụng SDK AWS. Để biết thêm thông tin, xem thiết lập.

  • Cài đặt Python 3.0 trở lên. Để biết thêm thông tin, xem https://www.python.org/doads. Để được hướng dẫn, hãy xem QuickStart trong tài liệu BOTO 3.https://www.python.org/downloads. For instructions, see Quickstart in the Boto 3 documentation.

Các ví dụ sau đây sử dụng SDK BOTO3 để đàn hồi để hoạt động với sự giãn nở.

Chủ đề

  • Đặt và nhận chuỗi
  • Đặt và nhận băm với nhiều mục
  • Xuất bản (Viết) và Đăng ký (đọc) từ Pub/Sub Channel
  • Viết và đọc từ một luồng

Đặt và nhận chuỗi

Đặt và nhận băm với nhiều mục

import time
import logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s')

keyName='mykey'
currTime=time.ctime(time.time())

# Set the key 'mykey' with the current date and time as value. 
# The Key will expire and removed from cache in 60 seconds.
redis.set(keyName, currTime, ex=60)

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieve the key value and current TTL
keyValue=redis.get(keyName)
keyTTL=redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValue, keyTTL))

Xuất bản (Viết) và Đăng ký (đọc) từ Pub/Sub Channel

Viết và đọc từ một luồng

Đặt và nhận băm với nhiều mục

Xuất bản (Viết) và Đăng ký (đọc) từ Pub/Sub Channel

import logging
import time

logging.basicConfig(level=logging.INFO,format='%(asctime)s: %(message)s')

keyName='mykey'
keyValues={'datetime': time.ctime(time.time()), 'epochtime': time.time()}

# Set the hash 'mykey' with the current date and time in human readable format (datetime field) and epoch number (epochtime field). 
redis.hset(keyName, mapping=keyValues)

# Set the key to expire and removed from cache in 60 seconds.
redis.expire(keyName, 60)

# Sleep just for better illustration of TTL (expiration) value
time.sleep(5)

# Retrieves all the fields and current TTL
keyValues=redis.hgetall(keyName)
keyTTL=redis.ttl(keyName)

logging.info("Key {} was set at {} and has {} seconds until expired".format(keyName, keyValues, keyTTL))

Xuất bản (Viết) và Đăng ký (đọc) từ Pub/Sub Channel

Viết và đọc từ một luồng

Xuất bản (Viết) và Đăng ký (đọc) từ Pub/Sub Channel

Viết và đọc từ một luồng

import logging
import time

def handlerFunction(message):
    """Prints message got from PubSub channel to the log output

    Return None
    :param message: message to log
    """
    logging.info(message)

logging.basicConfig(level=logging.INFO)
redis = Redis(host="redis202104053.tihewd.ng.0001.use1.cache.amazonaws.com", port=6379, decode_responses=True)


# Creates the subscriber connection on "mychannel"
subscriber = redis.pubsub()
subscriber.subscribe(**{'mychannel': handlerFunction})

# Creates a new thread to watch for messages while the main process continues with its routines
thread = subscriber.run_in_thread(sleep_time=0.01)

# Creates publisher connection on "mychannel"
redis.publish('mychannel', 'My message')

# Publishes several messages. Subscriber thread will read and print on log.
while True:
    redis.publish('mychannel',time.ctime(time.time()))
    time.sleep(1)

Xuất bản (Viết) và Đăng ký (đọc) từ Pub/Sub Channel

Viết và đọc từ một luồng

Viết và đọc từ một luồng

Sao chép chương trình sau và dán nó vào một tệp có tên readwriteSream.py.

from redis import Redis
import redis.exceptions as exceptions
import logging
import time
import threading

logging.basicConfig(level=logging.INFO)

def writeMessage(streamName):
    """Starts a loop writting the current time and thread name to 'streamName'

    :param streamName: Stream (key) name to write messages.
    """
    fieldsDict={'writerId':threading.currentThread().getName(),'myvalue':None}
    while True:
        fieldsDict['myvalue'] = time.ctime(time.time())
        redis.xadd(streamName,fieldsDict)
        time.sleep(1)

def readMessage(groupName=None,streamName=None):
    """Starts a loop reading from 'streamName'
    Multiple threads will read from the same stream consumer group. Consumer group is used to coordinate data distribution.
    Once a thread acknowleges the message, it won't be provided again. If message wasn't acknowledged, it can be served to another thread.

    :param groupName: stream group were multiple threads will read.
    :param streamName: Stream (key) name where messages will be read.
    """

    readerID=threading.currentThread().getName()
    while True:
        try:
            # Check if the stream has any message
            if redis.xlen(streamName)>0:
                # Check if if the messages are new (not acknowledged) or not (already processed)
                streamData=redis.xreadgroup(groupName,readerID,{streamName:'>'},count=1)
                if len(streamData) > 0:
                    msgId,message = streamData[0][1][0]
                    logging.info("{}: Got {} from ID {}".format(readerID,message,msgId))
                    #Do some processing here. If the message has been processed sucessfuly, acknowledge it and (optional) delete the message.
                    redis.xack(streamName,groupName,msgId)
                    logging.info("Stream message ID {} read and processed successfuly by {}".format(msgId,readerID))
                    redis.xdel(streamName,msgId)
            else:
                pass
        except:
            raise
            
        time.sleep(0.5)

# Creates the stream 'mystream' and consumer group 'myworkergroup' where multiple threads will write/read.
try:
    redis.xgroup_create('mystream','myworkergroup',mkstream=True)
except exceptions.ResponseError as e:
    logging.info("Consumer group already exists. Will continue despite the error: {}".format(e))
except:
    raise

# Starts 5 writer threads.
for writer_no in range(5):
    writerThread = threading.Thread(target=writeMessage, name='writer-'+str(writer_no), args=('mystream',),daemon=True)
    writerThread.start()

# Starts 10 reader threads
for reader_no in range(10):
    readerThread = threading.Thread(target=readMessage, name='reader-'+str(reader_no), args=('myworkergroup','mystream',),daemon=True)
    readerThread.daemon = True
    readerThread.start()

# Keep the code running for 30 seconds
time.sleep(30)

Để chạy chương trình, hãy nhập lệnh sau:

python ReadWriteStream.py