Load data from PostgreSQL to Redis using Python (loop, pipeline, multiprocessing, and both combined)

Irakli DD
4 min read · Oct 19, 2023

This article shows four different methods of transferring data from PostgreSQL to Redis using Python. The choice of method depends on the amount of data you're working with. The methods are: a basic loop, a loop with a Redis pipeline, concurrent execution with a thread pool, and the combined use of pipelining and concurrent execution.

Create a table and generate sample data with 10 000 records.

CREATE TABLE players (
    player_id integer,
    name text,
    score integer,
    current_level integer,
    won_prizes integer[],
    register_date date
);

-- Generate random data for the players table
INSERT INTO players (player_id, name, score, current_level, won_prizes, register_date)
SELECT
    (floor(random() * 900000) + 100000)::integer,
    md5(random()::text),
    floor(random() * 101),
    floor(random() * 10) + 1,
    ARRAY[floor(random() * 21) + 1, floor(random() * 21) + 1, floor(random() * 21) + 1],
    '2022-01-01'::date + (floor(random() * 365))::integer
FROM generate_series(1, 10000);

Python libraries

# for measuring execution time
from datetime import datetime

import redis
import json

# Redis connection configuration
r = redis.Redis(host='198.', port=1, db=0, password='***')

import pandas as pd
import numpy as np
from sqlalchemy import create_engine

# connect to PostgreSQL
db_uri = 'postgresql://...'
engine = create_engine(db_uri)

# for creating multiple processes
import multiprocessing
# for asynchronously executing functions using threads or processes
import concurrent.futures

Save the data into a dataframe (the players table lives in the backgammon schema):

# load all players into a pandas dataframe
sql = "SELECT * FROM backgammon.players"
df = pd.read_sql(sql, engine)
df.head()
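The execution times quoted under each method were measured by sampling rows from the dataframe and wrapping the load in datetime.now() calls, collecting the results in a numpy array. A minimal sketch of that harness, assuming the exec_time array referenced later in Method 3:

exec_time = np.array([])   # collects one formatted timing per run; reused in Method 3

chunk = df.sample(1000)    # sample 10, 100, 1000 or 10 000 rows per run

start_time = datetime.now()
# ... run one of the four load methods over `chunk` here ...
end_time = datetime.now()

execution_time = (end_time - start_time).total_seconds()
exec_time = np.append(exec_time, "{:.2f}".format(execution_time))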

Method 1 - simple loop

https://redis.io/docs/data-types/hashes/

In all of the methods I use the Redis Hash data type and the hset command.

In this script I loop over the dataframe rows and build a key and a value for each one:

for index, row in chunk.iterrows():

    # generating key and value of redis hash type
    r_key = 'bg_pl:' + str(row[0])
    r_value_json = f'{{"username": "{row[1]}", "score": "{row[2]}", "current_level": "{row[3]}", "won_prizes": "{row[4]}", "register_date": "{row[5]}"}}'
    r_value_fields = json.loads(r_value_json)

    # one hset call per row, i.e. one network round trip per row
    res1 = r.hset(
        r_key,
        mapping=r_value_fields,
    )
Execution time for 10, 100 and 1000 records, in seconds.
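To confirm rows landed in Redis, you can read one of the hashes back. A quick check, assuming some player_id taken from df.head() (123456 below is a placeholder):

# read back one hash to verify the load (123456 is a placeholder player_id)
print(r.hgetall('bg_pl:123456'))
# returns the stored fields as a dict of byte strings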

Method 2 - Pipeline

https://redis.io/docs/manual/pipelining/

A pipeline queues commands on the client and sends them to Redis in a single request, so the network round trip is paid once per batch instead of once per command:

pipe = r.pipeline()

for index, row in chunk.iterrows():

    # generating key and value of redis hash type
    r_key = 'bg_pl:' + str(row[0])
    r_value_fields = {
        "username": str(row[1]),
        "score": str(row[2]),
        "current_level": str(row[3]),
        "won_prizes": str(row[4]),
        "register_date": str(row[5])
    }

    # queue one hset per field; nothing is sent yet
    for field, value in r_value_fields.items():
        pipe.hset(r_key, field, value)

# send all queued commands in one round trip
results = pipe.execute()
pipe.reset()
Execution time for 10, 100, 1000 and 10 000 records, in seconds.
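For very large dataframes it can help to flush the pipeline in batches instead of queueing everything before a single execute(). A sketch of that variation (the batch size of 1000 is an arbitrary assumption to tune):

pipe = r.pipeline()
batch_size = 1000  # arbitrary; tune for your row count and payload size

for i, (index, row) in enumerate(chunk.iterrows(), start=1):
    r_key = 'bg_pl:' + str(row[0])
    pipe.hset(r_key, mapping={
        "username": str(row[1]),
        "score": str(row[2]),
        "current_level": str(row[3]),
        "won_prizes": str(row[4]),
        "register_date": str(row[5])
    })

    # flush every batch_size rows to keep the client-side buffer bounded
    if i % batch_size == 0:
        pipe.execute()

pipe.execute()  # flush the remainder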

Method 3 - Multi Processing

Despite the name, the script below uses a thread pool from concurrent.futures rather than separate processes; for I/O-bound work like network calls to Redis, threads are the simpler fit.

Check how many CPU cores are available:

multiprocessing.cpu_count()

# Function to process a single row
def process_row(row):
    # Generating key and value for Redis hash
    r_key = 'bg_pl:' + str(row[0])
    r_value_json = f'{{"username": "{row[1]}", "score": "{row[2]}", "current_level": "{row[3]}", "won_prizes": "{row[4]}", "register_date": "{row[5]}"}}'
    r_value_fields = json.loads(r_value_json)

    # Set the values in Redis (the redis-py client is thread-safe)
    r.hset(r_key, mapping=r_value_fields)


start_time = datetime.now()

chunk = df.sample(10000)
max_threads = 4  # Adjust as needed

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:

    future_to_row = {executor.submit(process_row, row): row for _, row in chunk.iterrows()}

    # Wait for the tasks to complete
    for future in concurrent.futures.as_completed(future_to_row):
        row = future_to_row[future]
        try:
            future.result()
        except Exception as e:
            print(f"An error occurred for row {row}: {e}")

end_time = datetime.now()

execution_time = (end_time - start_time).total_seconds()
formatted_time = "{:.2f}".format(execution_time)

exec_time = np.append(exec_time, formatted_time)
Execution time for 10, 100, 1000 and 10 000 records, in seconds.
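If you want true multiprocessing (for example, when per-row Python work rather than network I/O dominates), each worker process needs its own Redis connection, since a connection can't be shared safely across processes. A minimal sketch of that variant; the init_worker and load_rows names are hypothetical:

# hypothetical true-multiprocessing variant: one Redis connection per worker process
# (run as a script so spawned workers can import these functions)
def init_worker():
    global worker_r
    worker_r = redis.Redis(host='198.', port=1, db=0, password='***')

def load_rows(rows):
    for row in rows:
        worker_r.hset('bg_pl:' + str(row[0]), mapping={
            "username": str(row[1]),
            "score": str(row[2]),
            "current_level": str(row[3]),
            "won_prizes": str(row[4]),
            "register_date": str(row[5])
        })

# plain tuples so the rows pickle cleanly across process boundaries
rows = [tuple(t) for t in df.sample(10000).itertuples(index=False)]
chunks = [rows[i:i + 1000] for i in range(0, len(rows), 1000)]

with multiprocessing.Pool(processes=4, initializer=init_worker) as pool:
    pool.map(load_rows, chunks)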

Method 4 - Multiple threads with Pipeline

def process_row(row):
    try:
        # Generating key and value for Redis hash
        r_key = 'bg_pl:' + str(row[0])
        r_value_fields = {
            "username": str(row[1]),
            "score": str(row[2]),
            "current_level": str(row[3]),
            "won_prizes": json.dumps(row[4]),  # Serialize won_prizes as JSON
            "register_date": str(row[5])
        }

        # each thread queues its fields on its own pipeline and sends them in one round trip
        pipe = r.pipeline()
        for field, value in r_value_fields.items():
            pipe.hset(r_key, field, value)
        results = pipe.execute()
    except Exception as e:
        print(f"An error occurred: {e}")

chunk = df.sample(100)
max_threads = 4  # Adjust as needed

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    future_to_row = {executor.submit(process_row, row): row for _, row in chunk.iterrows()}

    # Wait for the tasks to complete
    for future in concurrent.futures.as_completed(future_to_row):
        row = future_to_row[future]
        try:
            future.result()
        except Exception as e:
            print(f"An error occurred for row {row}: {e}")
Execution time for 10, 100, 1000 and 10 000 records, in seconds.
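A pipeline holding a single row's fields saves little, so the combination pays off more when each thread pipelines a whole chunk of rows. A sketch of that variation; process_chunk and the chunk size of 500 are assumptions:

# hypothetical variation: one pipeline per chunk of rows instead of per row
def process_chunk(rows):
    pipe = r.pipeline()
    for row in rows:
        pipe.hset('bg_pl:' + str(row[0]), mapping={
            "username": str(row[1]),
            "score": str(row[2]),
            "current_level": str(row[3]),
            "won_prizes": str(row[4]),
            "register_date": str(row[5])
        })
    pipe.execute()

rows = [row for _, row in df.sample(10000).iterrows()]
chunks = [rows[i:i + 500] for i in range(0, len(rows), 500)]

with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    list(executor.map(process_chunk, chunks))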
Result

Which method performs best depends on the volume of data: for a handful of records the simple loop is enough, while pipelining and concurrent execution pay off as the record count grows into the thousands.
