Skip to content

psycopg2

Usage

python
import os

import psycopg2
from psycopg2.extras import RealDictCursor


### init postgres
# Connection settings are read from the environment. The required ones use
# os.environ[...] so a missing variable fails immediately with a KeyError
# naming it, instead of surfacing later as a connection error.
conn = psycopg2.connect(
    host=os.environ["DB_HOSTNAME"],
    # Optional, to match the bulk-write example below; when DB_PORT is unset
    # this is None and the driver's default port applies.
    port=os.environ.get("DB_PORT"),
    dbname=os.environ["DB_NAME"],
    user=os.environ["DB_USERNAME"],
    password=os.environ["DB_PASSWORD"],
)
# Alternative: pass one libpq connection URI instead of keyword arguments, e.g.
# `psycopg2.connect(f"postgresql:///{DB_NAME}?host={DB_HOSTNAME}&user={DB_USERNAME}&password={DB_PASSWORD}&port={DB_PORT}&sslmode=disable")`

# Module-level cursor shared by query(); RealDictCursor makes each fetched row
# addressable by column name.
cur = conn.cursor(cursor_factory=RealDictCursor)


def query(QUERY: str, params=None) -> list:
    """Run a SQL statement on the module-level cursor and return all rows.

    Args:
        QUERY: SQL text. Put ``%s`` / ``%(name)s`` placeholders where values
            belong rather than formatting the values into the string.
        params: Optional sequence or mapping of values for the placeholders;
            it is handed to ``cur.execute`` so the driver does the quoting.
            The default ``None`` keeps the original one-argument behaviour.

    Returns:
        A list containing one plain ``dict`` per fetched row.

    Note:
        ``fetchall()`` is called unconditionally, so this helper is intended
        for statements that produce a result set (e.g. ``SELECT``).
    """
    cur.execute(QUERY, params)

    return [dict(row) for row in cur.fetchall()]

Bulk write

python
import json
import os

import dotenv
import psycopg2
import requests
from psycopg2.extras import execute_values
from retrying import retry

# Load settings from a .env file (if one is found) into the process
# environment so the os.getenv(...) lookups further down can read them.
dotenv.load_dotenv()

##############
# fetch data
##############
# Progress marker for the fetch phase of the script.
print("fetching data...")


@retry(
    wait_exponential_multiplier=1000,
    wait_exponential_max=10000,
    stop_max_attempt_number=5,
)
def make_request(timeout=30):
    """POST the JSON payload to the endpoint and return the decoded JSON body.

    The ``@retry`` decorator re-invokes this function when it raises, using
    exponential backoff and giving up after 5 attempts.

    Args:
        timeout: Seconds to wait for the server before giving up on a single
            attempt. Passed straight to ``requests``.

    Returns:
        The response body parsed from JSON.

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts,
            or a 4xx/5xx status (via ``raise_for_status``).
    """
    url = "https://example.com"

    payload = json.dumps({"flagload": "F"})
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        # Token comes from the environment; os.getenv yields None when unset.
        "Authorization": f'Bearer {os.getenv("BEARER_ACCESS_TOKEN")}',
    }

    # requests has no default timeout: without one a stalled server blocks
    # forever, no exception is raised, and the retry policy never kicks in.
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    response.raise_for_status()  # Raise an HTTPError for bad responses (4xx and 5xx status codes)

    return response.json()


# Everything after this point needs `response`, so a failed request must stop
# the script here rather than fall through to a NameError further down.
try:
    response = make_request()
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
    raise SystemExit(1) from e
else:
    print("Request successful")


# ## temp
# import pickle
# with open('data.pickle', 'wb') as f:
#     pickle.dump(response, f)

##############
# write to db
##############
# ## temp
# import pickle
# with open('data.pickle' , 'rb') as f:
#     response = pickle.load(f)

TABLE_NAME = "employee"
COLUMN_NAMES = (
    "foo",
    "bar",
)

# Column whose unique/primary-key constraint decides insert vs. update.
ON_CONFLICT_COLUMN = "foo"

# CREATE TABLE employee (
#     foo VARCHAR(255) PRIMARY KEY,
#     bar VARCHAR(255)
# );

# response = response[:2] # debug
# NOTE(review): assumes `response` is a sequence of row sequences whose values
# are ordered like COLUMN_NAMES — confirm against the API's actual payload.
data = response


connection_params = {
    "host": os.getenv("DB_HOSTNAME"),
    "user": os.getenv("DB_USERNAME"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "port": os.getenv("DB_PORT"),
}

# Spell out each updated column instead of `ROW(excluded.*)`: `excluded.*`
# expands to every column of the table, so it only lines up when the table has
# exactly the columns in COLUMN_NAMES. The conflict column already matches by
# definition, so it is left out of the SET list.
update_columns = [col for col in COLUMN_NAMES if col != ON_CONFLICT_COLUMN]
if update_columns:
    conflict_action = "DO UPDATE SET " + ", ".join(
        f"{col} = EXCLUDED.{col}" for col in update_columns
    )
else:
    # Nothing besides the key is written, so there is nothing to update.
    conflict_action = "DO NOTHING"

sql_query = f"""
    INSERT INTO {TABLE_NAME} ({', '.join(COLUMN_NAMES)})
    VALUES %s
    ON CONFLICT ({ON_CONFLICT_COLUMN}) {conflict_action}
"""

# One positional placeholder per column, e.g. "(%s, %s)".
row_template = f"({', '.join(['%s'] * len(COLUMN_NAMES))})"

print("inserting...")
conn = psycopg2.connect(**connection_params)
try:
    # `with conn` commits on success and rolls back on error, but it does NOT
    # close the connection — hence the explicit close() in `finally`.
    with conn:
        with conn.cursor() as cursor:
            print(sql_query)

            # Use execute_values to perform bulk insert
            execute_values(cursor, sql_query, data, template=row_template)
finally:
    conn.close()

print("successfully inserted")