python笔记 - python操作数据库
python 选修课 学业所迫
1. python 连接mysql
import pymysql
conn = pymysql.connect(host='', port=, user='', password='', database='')
curs = conn.cursor()
def select():
sql = "select * from "
try:
curs.execute(sql) # 执行sql语句
res = curs.fetchall() # 获取查询的所有记录
for row in res:
print(row)
except Exception as e:
raise e
def insert(sno1, sname1, sex1, birthday1, maths1, english1, os1):
data = {
'sno': sno1,
'sname': sname1,
'sex': sex1,
"birthday": birthday1,
"maths": maths1,
"english": english1,
"os": os1,
}
table = 'tb_grade'
keys = ', '.join(data.keys())
values = ', '.join(['%s'] * len(data))
sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table, keys=keys, values=values)
try:
curs.execute(sql, tuple(data.values()))
print('Successful')
conn.commit()
except:
print('Failed')
conn.rollback()
def delete(sno):
sql = "DELETE FROM tb_grade WHERE sno = " + sno
try:
curs.execute(sql)
conn.commit()
except:
conn.rollback() # 发生错误时回滚
def updategrade(sno, maths, english, os):
sql = "UPDATE tb_grade SET maths =" + maths + ",english=" + english + ",os=" + os + " WHERE sno=" + sno
try:
curs.execute(sql)
conn.commit()
except:
conn.rollback() # 发生错误时回滚
while 1:
s = input("请输入需要操作的命令:\n"
"1、修改数据\n"
"2、查询数据\n"
"3、插入数据\n"
"4、删除数据\n")
if s == "1":
print("您选择修改数据")
sno = input("请输入要修改的学生学号:")
maths = input("请输入数学成绩:")
english = input("请输入英语成绩:")
os = input("请输入操作系统成绩:")
updategrade(sno, maths, english, os)
print("修改结果如下")
select()
elif s == "2":
print("您选择查询数据")
select()
elif s == "3":
print("您选择插入数据")
sno = input("请输入的学生学号:")
sname = input("请输入的学生姓名:")
sex = input("请输入的学生性别:")
birthday = input("请输入的学生生日:")
maths = input("请输入数学成绩:")
english = input("请输入英语成绩:")
os = input("请输入操作系统成绩:")
insert(sno, sname, sex, birthday, maths, english, os)
print("插入结果如下")
select()
elif s == "4":
sno = input("请输入的要删除的学生学号:")
delete(sno)
print("删除结果如下")
select()
else:
conn.close()
break
2. python mondodb
import datetime
import pymongo
client = pymongo.MongoClient(host='localhost', port=27017)
db = client.manager # 数据库
emp = db.tb_emp # 表
profession = db.tb_profession
dept = db.tb_dept
# tb_emp = {"eid": 2, "name": "lisi", "sex": "男",
# "birthday": "2017-7-6", "intro": "i m lisi", "profession": 2, "dept": 2}
# tb_profession = {
# "id": 1,
# "name": "计算机",
# }
# tb_profession = {
# "id": 2,
# "name": "软工",
# }
# tb_dept = {
# "id": 1,
# "name": "科学部",
# }
# tb_dept = {
# "id": 2,
# "name": "活动部",
# }
# one_insert = emp.insert_one(document=tb_emp)
# one_insert = profession.insert_one(document=tb_profession)
# one_insert = dept.insert_one(document=tb_dept)
# one_result = emp.find_one({"eid": 1})
# find = emp.find()
# for result in find:
# print(result)
# print(one_result["sex"])
# print(one_result)
def selectAll():
find = emp.find()
for result in find:
profession_find_one = profession.find_one({ "id": result["profession"]})
dept_find_one = dept.find_one({ "id": result["dept"]})
result["profession"] = profession_find_one["name"]
result["dept"] = dept_find_one["name"]
print(result)
def selectone(name):
one_result = emp.find_one({ "name": name})
profession_find_one = profession.find_one({ "id": one_result["profession"]})
dept_find_one = dept.find_one({ "id": one_result["dept"]})
one_result["profession"] = profession_find_one["name"]
one_result["dept"] = dept_find_one["name"]
print(one_result)
def delete(eid):
// delete_one()也可
remove = emp.remove({ "eid": eid})
print(remove)
def insert(eid, name, sex, birthday, intro, professionname, deptName):
one_result_profession = profession.find_one({ "name": professionname})
if one_result_profession is None:
print("该专业不存在")
return
profession_id = one_result_profession["id"]
one_result_dept = dept.find_one({ "name": deptName})
if one_result_dept is None:
print("该部门不存在")
return
dept_id = one_result_dept["id"]
tb_emp = { "eid": eid, "name": name, "sex": sex,
"birthday": birthday, "intro": intro, "profession": profession_id, "dept": dept_id}
one_insert = emp.insert_one(document=tb_emp)
print(one_insert, "插入成功!")
def update(eid, updateemp):
conditon = { "eid": eid}
one = emp.find_one({ "eid": eid})
print(one)
for i in updateemp.keys():
one[i] = updateemp[i]
print(one)
result = emp.update(conditon, one)
print(result)
查找所有:
selectAll():
根据名字查找:
selectone(“zhansan”)
插入:
insert(3,“wangwu”,“女”,“2018-9-8”,“i m wangwu”,“计算机”,“科学部”)
selectAll()
update(1, {“name”: “lyq”})
selectAll()
insert(4,“待删除”,“女”,“2018-9-8”,“i m wangwu”,“计算机”,“科学部”)
selectAll()
delete(4)
selectAll()
3. python处理异常(自定义异常)
外加一个
class List_Queue:
def __init__(self):
self.items = []
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
return self.items.pop(0)
def empty(self):
return self.size() == 0
def size(self):
return len(self.items)
def remove(self, index):
if self.size() == 0:
raise List_Queue_Exception(index, "数组为空,无法移除")
elif self.size() < index:
raise List_Queue_Exception(index, "数组越界")
count = 0
for i in self.items:
if count == index:
return self.items.remove(self.items[index])
count = count + 1
return self.items
class List_Queue_Exception(BaseException):
def __init__(self, index, type):
self.message = "当前index" + str(index) + ": " + type
def getMessage(self):
return self.message
from homework4.List_queue import List_Queue_Exception, List_Queue
if __name__ == '__main__':
queue = List_Queue()
try:
queue.enqueue("123")
queue.enqueue("122")
queue.enqueue("121")
print(queue.size())
print(queue.items)
queue.remove(1)
print(queue.size())
print(queue.items)
except List_Queue_Exception as e:
print(e.getMessage())
出现异常:
queue.remove(5)
还没有评论,来说两句吧...