Usage Examples¶
Practical examples for using pycubrid — from basic CRUD to advanced features.
Table of Contents¶
- Basic Connection
- CRUD Operations
- Create Table
- Insert Rows
- Select Rows
- Update Rows
- Delete Rows
- Transactions
- Manual Commit/Rollback
- Context Manager
- Autocommit Mode
- Parameterized Queries
- Batch Operations
- executemany
- executemany_batch
- Fetching Strategies
- Cursor as Iterator
- Column Metadata
- LOB Handling
- Schema Introspection
- Stored Procedures
- Date and Time
- Error Handling
- SQLAlchemy Integration
- Connection Pooling Pattern
Basic Connection¶
import pycubrid

# Connect to CUBRID on localhost:33000 as user "dba" with an empty password
conn = pycubrid.connect(
    host="localhost",
    port=33000,
    database="testdb",
    user="dba",
    password="",
)

# Report the version string the server returns
server_version = conn.get_server_version()
print(f"Server: {server_version}")

# Run a trivial query to confirm the connection works
cur = conn.cursor()
cur.execute("SELECT 1 + 1")
first_row = cur.fetchone()
print(first_row)  # (2,)

# Cleanup: release the cursor, then the connection
cur.close()
conn.close()
Tip
Keep one connection open for related operations instead of repeatedly opening/closing per statement.
CRUD Operations¶
mermaid
flowchart LR
A[CREATE TABLE] --> B[INSERT]
B --> C[SELECT]
C --> D[UPDATE]
D --> E[DELETE]
E --> F[COMMIT]
Create Table¶
import pycubrid
# Connect by database name only; no other connect() arguments are passed here
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()
# Create the cookbook_users table unless it already exists
cur.execute("""
CREATE TABLE IF NOT EXISTS cookbook_users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(200) UNIQUE,
age INT DEFAULT 0,
created_at DATETIME DEFAULT SYS_DATETIME
)
""")
# Commit, then release the cursor and the connection
conn.commit()
cur.close()
conn.close()
Insert Rows¶
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()

# Both the single and the multi-row insert use the same statement text
insert_sql = "INSERT INTO cookbook_users (name, email, age) VALUES (?, ?, ?)"

# Single insert: values are bound positionally to the ? placeholders
cur.execute(insert_sql, ["Alice", "alice@example.com", 30])
print(f"Inserted ID: {cur.lastrowid}")

# Multiple inserts: one parameter list per row
users = [
    ["Bob", "bob@example.com", 25],
    ["Carol", "carol@example.com", 28],
    ["Dave", "dave@example.com", 35],
]
cur.executemany(insert_sql, users)
print(f"Inserted {cur.rowcount} rows")

conn.commit()
cur.close()
conn.close()
Warning
Do not use string interpolation for SQL. Always use ? placeholders with parameter lists or tuples.
Select Rows¶
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()

# All rows: unpack each row tuple in SELECT-list order
cur.execute("SELECT id, name, email, age FROM cookbook_users ORDER BY id")
for user_id, name, email, age in cur.fetchall():
    print(f" {user_id}: {name} ({email}) age={age}")

# Filtered query: the age threshold is bound through the ? placeholder
cur.execute("SELECT name, age FROM cookbook_users WHERE age > ?", [27])
print("\nUsers older than 27:")
# The cursor itself is iterable and yields one row per step
for name, age in cur:
    print(f" {name}: {age}")

cur.close()
conn.close()
Update Rows¶
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()

# The new age and the target name are bound positionally, in that order
update_sql = "UPDATE cookbook_users SET age = ? WHERE name = ?"
cur.execute(update_sql, [31, "Alice"])
print(f"Updated {cur.rowcount} row(s)")

# Commit the change, then release the cursor and the connection
conn.commit()
cur.close()
conn.close()
Delete Rows¶
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()
# Delete by name; the value is bound through the ? placeholder
cur.execute("DELETE FROM cookbook_users WHERE name = ?", ["Dave"])
# rowcount is read right after execute() to report the number of rows removed
print(f"Deleted {cur.rowcount} row(s)")
# Commit the delete, then release the cursor and the connection
conn.commit()
cur.close()
conn.close()
Transactions¶
Manual Commit/Rollback¶
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()

insert_sql = "INSERT INTO cookbook_users (name, email) VALUES (?, ?)"
new_users = [
    ["Eve", "eve@example.com"],
    ["Frank", "frank@example.com"],
]

try:
    # Both inserts form one transaction: commit only after both succeed
    for params in new_users:
        cur.execute(insert_sql, params)
    conn.commit()
    print("Transaction committed")
except pycubrid.Error as e:
    # Any driver error undoes whatever part of the transaction already ran
    conn.rollback()
    print(f"Transaction rolled back: {e}")
finally:
    # Runs on both paths so the cursor and connection are always released
    cur.close()
    conn.close()
Context Manager¶
The connection context manager auto-commits on success and auto-rolls back on exception:
with pycubrid.connect(database="testdb") as conn:
    cur = conn.cursor()
    try:
        cur.execute(
            "INSERT INTO cookbook_users (name, email) VALUES (?, ?)",
            ["Grace", "grace@example.com"],
        )
    finally:
        # Close the cursor explicitly, as the other examples on this page do
        cur.close()
# Leaving the `with` block without an exception commits automatically.
# If an exception is raised inside the block, the transaction is rolled back.
Note
With context manager usage, explicit conn.commit() is unnecessary on the success path.
Autocommit Mode¶
# Explicitly enable autocommit (default is False)
conn = pycubrid.connect(database="testdb", autocommit=True)
cur = conn.cursor()
# Each statement commits immediately — no explicit commit needed
cur.execute("INSERT INTO cookbook_users (name) VALUES (?)", ["Heidi"])
cur.execute("INSERT INTO cookbook_users (name) VALUES (?)", ["Ivan"])
# Autocommit can also be switched off on an already-open connection
conn.autocommit = False
cur.execute("INSERT INTO cookbook_users (name) VALUES (?)", ["Judy"])
conn.commit() # Required now that autocommit is off
cur.close()
conn.close()
Parameterized Queries¶
pycubrid uses qmark parameter style — ? placeholders:
import datetime
from decimal import Decimal

cur = conn.cursor()

# Positional parameters (list or tuple)
cur.execute("SELECT * FROM cookbook_users WHERE name = ? AND age > ?", ["Alice", 25])

# Supported types
# NOTE: no example on this page creates cookbook_products; create a table with
# these four columns before running this statement.
cur.execute("""
INSERT INTO cookbook_products (name, price, available, launch_date)
VALUES (?, ?, ?, ?)
""", [
    "Widget",                    # str → 'Widget'
    Decimal("19.99"),            # Decimal → 19.99
    True,                        # bool → 1
    datetime.date(2025, 6, 15),  # date → DATE'2025-06-15'
])

# None maps to NULL
cur.execute(
    "INSERT INTO cookbook_users (name, email) VALUES (?, ?)",
    ["Nobody", None],
)
Parameter binding reference¶
| Parameter shape | Example | Notes |
|---|---|---|
| `list` / `tuple` | `cur.execute(sql, ["Alice", 25])` | Recommended for positional clarity |
| `None` | `cur.execute(sql, [None])` | Encoded as SQL NULL |
Danger
Passing a scalar or mapping instead of a non-string sequence (for example params="Alice"
or params={"name": "Alice"}) raises ProgrammingError.
Batch Operations¶
executemany¶
Execute the same SQL with different parameter sets:
cur = conn.cursor()

# One statement, executed once per parameter tuple
insert_sql = "INSERT INTO cookbook_users (name, age) VALUES (?, ?)"
users = [("Alice", 30), ("Bob", 25), ("Carol", 28)]

cur.executemany(insert_sql, users)
print(f"Inserted {cur.rowcount} rows")  # 3
conn.commit()
executemany_batch¶
Execute different SQL statements in a single server round-trip:
cur = conn.cursor()

# Complete SQL statements (no placeholders) submitted together as one batch
statements = [
    "INSERT INTO cookbook_users (name, age) VALUES ('Xena', 40)",
    "INSERT INTO cookbook_users (name, age) VALUES ('Yuri', 22)",
    "UPDATE cookbook_users SET age = 26 WHERE name = 'Bob'",
]
results = cur.executemany_batch(statements)

# Each result entry unpacks to a statement type and an affected-row count
for stmt_type, count in results:
    print(f"Statement type {stmt_type}: affected {count} row(s)")

conn.commit()
Fetching Strategies¶
cur = conn.cursor()
cur.execute("SELECT id, name FROM cookbook_users ORDER BY id")
# fetchone — the next single row of the result set
row = cur.fetchone()
print(f"First: {row}")
# fetchmany(3) — up to three more rows, continuing after the row fetched above
batch = cur.fetchmany(3)
print(f"Next 3: {batch}")
# fetchall — every row not consumed by the calls above
rest = cur.fetchall()
print(f"Remaining: {len(rest)} rows")
Array Size¶
Control the default batch size for fetchmany():
# arraysize sets how many rows fetchmany() returns when called without an argument
cur.arraysize = 50
cur.execute("SELECT * FROM cookbook_users")
batch = cur.fetchmany() # Fetches up to 50 rows
Cursor as Iterator¶
cur = conn.cursor()
cur.execute("SELECT name, age FROM cookbook_users")

# Iterating the cursor yields one row at a time, in SELECT-list order
for row in cur:
    name, age = row
    print(f"{name} is {age} years old")
Column Metadata¶
cur = conn.cursor()
cur.execute("SELECT id, name, email, age FROM cookbook_users")

print("Columns:")
for col in cur.description:
    # Positions read here: 0 = column name, 1 = type code, 4 = precision, 6 = nullable flag
    col_name, type_code, precision, nullable = col[0], col[1], col[4], col[6]
    print(f" {col_name:15s} type={type_code:3d} precision={precision} nullable={nullable}")

# Output:
# id type= 8 precision=10 nullable=False
# name type= 2 precision=100 nullable=False
# email type= 2 precision=200 nullable=True
# age type= 8 precision=10 nullable=True
LOB Handling¶
Inserting LOB Data¶
For most use cases, insert strings or bytes directly:
cur = conn.cursor()
# Table with a CLOB column that holds the document body
cur.execute("""
CREATE TABLE IF NOT EXISTS cookbook_documents (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(100),
content CLOB
)
""")
conn.commit()
# CLOB — bind a plain Python str directly to the CLOB column
cur.execute(
"INSERT INTO cookbook_documents (title, content) VALUES (?, ?)",
["Report", "This is a large text document..."],
)
conn.commit()
Reading LOB Data¶
Selecting a LOB column returns a dict of metadata about the LOB, not the LOB content itself:
# Fetch the row inserted in the previous example (id = 1)
cur.execute("SELECT title, content FROM cookbook_documents WHERE id = 1")
row = cur.fetchone()
title = row[0] # "Report"
lob_info = row[1] # dict describing the LOB
# The keys read below are the ones this example relies on
print(f"LOB type: {lob_info['lob_type']}") # 24 (CLOB)
print(f"LOB length: {lob_info['lob_length']}") # byte length
print(f"Locator: {lob_info['file_locator']}") # server file path
Using the Lob Class¶
For fine-grained LOB control:
from pycubrid.constants import CUBRIDDataType
# Create a LOB handle on the server; the type constant selects CLOB
lob = conn.create_lob(CUBRIDDataType.CLOB) # 24
# Write data (passed as bytes)
lob.write(b"Hello, CUBRID LOB!")
# Read data back: up to 1024 bytes starting at offset 0
data = lob.read(length=1024, offset=0)
print(data) # b"Hello, CUBRID LOB!"
Schema Introspection¶
from pycubrid.constants import CCISchemaType
# List all tables; the returned packet exposes the number of entries as tuple_count
packet = conn.get_schema_info(CCISchemaType.CLASS)
print(f"Found {packet.tuple_count} tables")
# List columns of a specific table, selected with table_name
packet = conn.get_schema_info(CCISchemaType.ATTRIBUTE, table_name="cookbook_users")
print(f"Table has {packet.tuple_count} columns")
# Get primary key info for the same table
packet = conn.get_schema_info(CCISchemaType.PRIMARY_KEY, table_name="cookbook_users")
print(f"Primary key entries: {packet.tuple_count}")
Stored Procedures¶
cur = conn.cursor()
# Create a stored procedure that is bound to a Java method by its NAME clause
cur.execute("""
CREATE OR REPLACE PROCEDURE cookbook_greet(name VARCHAR)
AS LANGUAGE JAVA
NAME 'com.example.Greet.greet(java.lang.String)'
""")
conn.commit()
# Call it by name, passing the single argument as a parameter list
cur.callproc("cookbook_greet", ["World"])
Note: CUBRID stored procedures are Java-based. Ensure the Java class is registered on the server.
Date and Time¶
import datetime
import pycubrid
conn = pycubrid.connect(database="testdb")
cur = conn.cursor()
# Table with one column per temporal type used below: DATE, TIME and DATETIME
cur.execute("""
CREATE TABLE IF NOT EXISTS cookbook_events (
id INT AUTO_INCREMENT PRIMARY KEY,
event_name VARCHAR(100),
event_date DATE,
event_time TIME,
event_ts DATETIME
)
""")
conn.commit()
# Insert with Python datetime objects, bound positionally to the four placeholders
cur.execute(
"INSERT INTO cookbook_events (event_name, event_date, event_time, event_ts) VALUES (?, ?, ?, ?)",
[
"Launch Party",
datetime.date(2025, 6, 15),
datetime.time(14, 30, 0),
datetime.datetime(2025, 6, 15, 14, 30, 0),
],
)
conn.commit()
# Read back — returns Python datetime objects
cur.execute("SELECT event_name, event_date, event_time, event_ts FROM cookbook_events")
row = cur.fetchone()
print(f"Event: {row[0]}")
print(f"Date: {row[1]}") # row[1] is datetime.date(2025, 6, 15)
print(f"Time: {row[2]}") # row[2] is datetime.time(14, 30)
print(f"TS: {row[3]}") # row[3] is datetime.datetime(2025, 6, 15, 14, 30)
# Using PEP 249 constructors exposed at module level
d = pycubrid.Date(2025, 1, 1)
t = pycubrid.Time(12, 0, 0)
ts = pycubrid.Timestamp(2025, 1, 1, 12, 0, 0)
cur.close()
conn.close()
Error Handling¶
Catching Specific Errors¶
import pycubrid

conn = pycubrid.connect(database="testdb")
cur = conn.cursor()

# `name` is NOT NULL in cookbook_users, so it must be supplied; otherwise the
# first insert fails before the duplicate-email case is ever reached.
insert_sql = "INSERT INTO cookbook_users (name, email) VALUES (?, ?)"

try:
    # Same email twice: the second insert is expected to violate the UNIQUE
    # constraint on `email`.
    cur.execute(insert_sql, ["Kim", "duplicate@example.com"])
    cur.execute(insert_sql, ["Lee", "duplicate@example.com"])
    conn.commit()
except pycubrid.IntegrityError as e:
    print(f"Duplicate key: {e.msg}")
    conn.rollback()
except pycubrid.ProgrammingError as e:
    print(f"SQL error: {e.msg}")
    conn.rollback()
except pycubrid.OperationalError as e:
    # No rollback here, unlike the other handlers.
    # NOTE(review): presumably because the connection may be unusable — confirm.
    print(f"Connection error: {e.msg}")
except pycubrid.Error as e:
    # Catch-all for any other driver error; listed last so the specific handlers win
    print(f"Database error: {e.msg} (code={e.code})")
    conn.rollback()
finally:
    cur.close()
    conn.close()
Checking Closed State¶
conn = pycubrid.connect(database="testdb")
conn.close()

try:
    # Requesting a cursor from the closed connection is the call under test
    cur = conn.cursor()
except pycubrid.InterfaceError as e:
    print(f"Expected: {e.msg}")  # "connection is closed"
SQLAlchemy Integration¶
pycubrid works as a driver for sqlalchemy-cubrid:
from sqlalchemy import create_engine, select, text, Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, Session

# Connect using pycubrid driver
engine = create_engine("cubrid+pycubrid://dba@localhost:33000/testdb")

# Raw SQL
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1 + 1"))
    print(result.scalar())  # 2


# ORM
class Base(DeclarativeBase):
    """Declarative base shared by the example models."""


class User(Base):
    """Example model mapped to the cookbook_sa_users table."""

    __tablename__ = "cookbook_sa_users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100))


# Create the mapped table(s) on the target database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(name="Alice"))
    session.commit()

    # 2.0-style query: select() + Session.scalars() instead of the legacy session.query()
    users = session.scalars(select(User)).all()
    for u in users:
        print(f"{u.id}: {u.name}")
Connection Pooling Pattern¶
pycubrid itself does not include a connection pool, but you can use a simple pattern or SQLAlchemy's built-in pool:
Simple Pool with queue¶
import queue
import pycubrid
class ConnectionPool:
    """Minimal fixed-size pool of pycubrid connections backed by ``queue.Queue``.

    All ``size`` connections are opened eagerly in the constructor. Borrow one
    with :meth:`get` and hand it back with :meth:`put`.
    """

    def __init__(self, size: int = 5, **connect_kwargs):
        """Open ``size`` connections using ``pycubrid.connect(**connect_kwargs)``.

        Raises:
            ValueError: if ``size`` is smaller than 1. ``queue.Queue`` treats a
                non-positive ``maxsize`` as "unbounded", which would create an
                empty pool whose ``get()`` can never succeed.
        """
        if size < 1:
            raise ValueError("size must be at least 1")
        self._pool: queue.Queue = queue.Queue(maxsize=size)
        self._connect_kwargs = connect_kwargs
        try:
            for _ in range(size):
                self._pool.put(pycubrid.connect(**connect_kwargs))
        except Exception:
            # Do not leak the connections opened before the failure
            self.close_all()
            raise

    def get(self, timeout: "float | None" = None) -> "pycubrid.connection.Connection":
        """Borrow a connection, waiting until one is available.

        Args:
            timeout: maximum number of seconds to wait; ``None`` (the default)
                waits indefinitely.

        Raises:
            queue.Empty: if ``timeout`` elapses before a connection is returned
                to the pool.
        """
        return self._pool.get(timeout=timeout)

    def put(self, conn) -> None:
        """Return a previously borrowed connection to the pool."""
        self._pool.put(conn)

    def close_all(self) -> None:
        """Close every connection currently sitting idle in the pool.

        Connections that are borrowed at the time of the call are not in the
        queue and are therefore not closed here.
        """
        while True:
            try:
                # EAFP: an empty() check followed by get_nowait() can race
                # with another consumer draining the queue in between.
                conn = self._pool.get_nowait()
            except queue.Empty:
                return
            conn.close()
# Usage
pool = ConnectionPool(size=3, database="testdb")

conn = pool.get()
try:
    cur = conn.cursor()
    cur.execute("SELECT COUNT(*) FROM cookbook_users")
    count_row = cur.fetchone()
    print(count_row)
    cur.close()
finally:
    # Hand the connection back whether or not the query succeeded
    pool.put(conn)

# Close the idle connections once the pool is no longer needed
pool.close_all()