Load data from PostgreSQL into Redis using Python (loop, pipeline, multiprocessing, and a combination of both)
This article shows four different methods of transferring data from PostgreSQL to Redis using Python. The choice of method depends on the amount of data we're working with. The methods are: a basic loop, a loop with the pipeline feature, Python multiprocessing, and the combined use of a pipeline and multiprocessing.
Create a table and generate sample data with 10,000 records.
-- Schema for the sample workload: one row per player.
CREATE TABLE players (
player_id integer,
name text,
score integer,
current_level integer,
won_prizes integer[],
register_date date
);
-- Generate random data for the players table
INSERT INTO players (player_id, name, score, current_level, won_prizes, register_date)
SELECT
(floor(random() * 900000) + 100000)::integer,  -- player_id in 100000..999999 (may repeat: no uniqueness constraint)
(md5(random()::text))::text,  -- name: random 32-char hex string
floor(random() * 101),  -- score in 0..100
floor(random() * 10) + 1,  -- current_level in 1..10
ARRAY[ floor(random() * 21) + 1, floor(random() * 21) + 1, floor(random() * 21) + 1 ],  -- three prize ids, each in 1..21
'2022-01-01'::date + (floor(random() * 365))::integer  -- random date within 2022
FROM generate_series(1, 10000);
Python libraries
# For measuring execution time of each load method.
from datetime import datetime
import redis
import json
# Redis connection configuration (host/port/password redacted for publication).
r = redis.Redis(host='198.', port=1, db=0, password='***')
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
# Connect to Postgres via SQLAlchemy (full URI redacted).
db_uri = 'postgresql://...'
engine = create_engine(db_uri)
# Multiprocessing is imported only to probe the core count below.
import multiprocessing
# For asynchronously executing functions using threads or processes.
import concurrent.futures
Save the data into a dataframe
# Pull the whole players table into a pandas DataFrame and peek at it.
query = "SELECT * FROM backgammon.players"
df = pd.read_sql(query, engine)
df.head()
Method 1 - simple loop
In all of the methods I use the Redis Hash data type and the hset function.
In this script I loop over the dataframe and build a key and value for each row:
# Method 1: plain loop — one HSET round trip to Redis per row.
for _, row in chunk.iterrows():
    # Redis key: fixed prefix + player_id (positional column 0).
    r_key = 'bg_pl:' + str(row[0])
    # Build the hash fields directly as a dict. The original code formatted
    # the values into a JSON f-string and then json.loads()-ed it back —
    # that round trip raises json.JSONDecodeError as soon as any value
    # contains a double quote or backslash, and buys nothing.
    r_value_fields = {
        "username": str(row[1]),
        "score": str(row[2]),
        "current_level": str(row[3]),
        "won_prizes": str(row[4]),
        "register_date": str(row[5]),
    }
    # hset with mapping= writes all fields of the hash in one command;
    # returns the number of fields newly added.
    res1 = r.hset(
        r_key,
        mapping=r_value_fields,
    )
Method 2 - Pipeline
# Method 2: pipeline — queue every command locally, then flush them to
# Redis in a single network round trip.
pipe = r.pipeline()
for _, row in chunk.iterrows():
    # Redis key: fixed prefix + player_id (positional column 0).
    r_key = 'bg_pl:' + str(row[0])
    r_value_fields = {
        "username": str(row[1]),
        "score": str(row[2]),
        "current_level": str(row[3]),
        "won_prizes": str(row[4]),
        "register_date": str(row[5])
    }
    # One HSET with mapping= writes all five fields in a single command.
    # The original queued five single-field HSETs per row, which made the
    # pipeline five times longer for the same end state.
    pipe.hset(r_key, mapping=r_value_fields)
# Send all queued commands at once. execute() also resets the pipeline's
# command stack, so the explicit pipe.reset() the original called
# afterwards was redundant.
results = pipe.execute()
Method 3 - Multi Processing
Count the available CPU cores:
# Probe how many CPU cores are available (bare expression — its value is
# only displayed when run interactively, e.g. in a notebook).
multiprocessing.cpu_count()
# Thread-pool size used by the drivers below; tune to the machine.
max_threads = 4 # Adjust as needed
# Worker used by the thread-pool drivers below.
def process_row(row):
    """Write one player row into Redis as a hash keyed by its player_id.

    ``row`` is a positional pandas Series: (player_id, name, score,
    current_level, won_prizes, register_date). Uses the module-level
    Redis client ``r``.
    """
    # Redis key: fixed prefix + player_id.
    r_key = 'bg_pl:' + str(row[0])
    # Build the mapping directly. The original formatted the values into a
    # JSON f-string and json.loads()-ed it back, which raises
    # json.JSONDecodeError whenever a value contains '"' or '\'.
    r_value_fields = {
        "username": str(row[1]),
        "score": str(row[2]),
        "current_level": str(row[3]),
        "won_prizes": str(row[4]),
        "register_date": str(row[5]),
    }
    # Set all fields of the hash in one HSET command.
    r.hset(r_key, mapping=r_value_fields)
# --- Method 3 driver: fan the rows out over a thread pool. ---
# NOTE(review): despite the "Multi Processing" heading this uses a
# ThreadPoolExecutor; for network-bound Redis writes threads are the
# appropriate choice anyway (the GIL is released during I/O).
start_time = datetime.now()
chunk = df.sample(10000)
max_threads = 4  # Adjust as needed (duplicates the earlier assignment on purpose so this cell stands alone)
# Create a ThreadPoolExecutor and submit one task per row, keeping a
# future -> row map so failures can be attributed to their source row.
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    future_to_row = {executor.submit(process_row, row): row for _, row in chunk.iterrows()}
    # Wait for the tasks to complete, surfacing any per-row errors.
    for future in concurrent.futures.as_completed(future_to_row):
        row = future_to_row[future]
        try:
            future.result()
        except Exception as e:
            print(f"An error occurred for row {row}: {e}")
end_time = datetime.now()
execution_time = (end_time - start_time).total_seconds()
formatted_time = "{:.2f}".format(execution_time)
# BUG FIX: exec_time was never initialized anywhere in this script, so the
# original np.append(exec_time, ...) raised NameError on first run.
# Create the accumulator on demand, then append this run's timing.
try:
    exec_time
except NameError:
    exec_time = np.array([])
exec_time = np.append(exec_time, formatted_time)
Method 4 - Multiple threads with Pipeline
def process_row(row):
    """Write one player row into Redis through a pipeline.

    ``row`` is a positional pandas Series: (player_id, name, score,
    current_level, won_prizes, register_date). Uses the module-level
    Redis client ``r``. Errors are reported and swallowed so one bad row
    does not kill its worker thread.
    """
    try:
        # Redis key: fixed prefix + player_id.
        r_key = 'bg_pl:' + str(row[0])
        r_value_fields = {
            "username": str(row[1]),
            "score": str(row[2]),
            "current_level": str(row[3]),
            "won_prizes": json.dumps(row[4]),  # Serialize won_prizes as JSON
            "register_date": str(row[5])
        }
        # One pipelined HSET with mapping= writes all five fields in a
        # single command; the original queued five single-field HSETs,
        # making the pipeline five times longer for the same end state.
        pipe = r.pipeline()
        pipe.hset(r_key, mapping=r_value_fields)
        results = pipe.execute()
    except Exception as e:
        # Thread-boundary handler: report and continue.
        print(f"An error occurred: {e}")
# --- Method 4 driver: thread pool + pipelined worker over a small sample. ---
chunk = df.sample(100)
max_threads = 4 # Adjust as needed
# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    # Submit one task per row, remembering which row each future belongs
    # to so errors can be attributed back to their source.
    pending = {}
    for _, player_row in chunk.iterrows():
        pending[executor.submit(process_row, player_row)] = player_row
    # Drain the futures as they finish, surfacing any failures.
    for done in concurrent.futures.as_completed(pending):
        row = pending[done]
        try:
            done.result()
        except Exception as e:
            print(f"An error occurred for row {row}: {e}")