Commit 62d43afa authored by jackfrued's avatar jackfrued

更新了文档和静态资源

parent ea9adc68
"""
查找 - 顺序查找和二分查找
算法:解决问题的方法(步骤)
评价一个算法的好坏主要有两个指标:渐近时间复杂度和渐近空间复杂度,通常一个算法很难做到时间复杂度和空间复杂度都很低(因为时间和空间是不可调和的矛盾)
表示渐近时间复杂度通常使用大O标记
O(c):常量时间复杂度 - 哈希存储 / 布隆过滤器
O(log_2 n):对数时间复杂度 - 折半查找
O(n):线性时间复杂度 - 顺序查找
O(n * log_2 n):- 对数线性时间复杂度 - 高级排序算法(归并排序、快速排序)
O(n ** 2):平方时间复杂度 - 简单排序算法(冒泡排序、选择排序、插入排序)
O(n ** 3):立方时间复杂度 - Floyd算法 / 矩阵乘法运算
也称为多项式时间复杂度
O(2 ** n):几何级数时间复杂度 - 汉诺塔
O(3 ** n):几何级数时间复杂度
也称为指数时间复杂度
O(n!):阶乘时间复杂度 - 旅行经销商问题 - NP
"""
from math import log2, factorial
from matplotlib import pyplot
import numpy
def seq_search(items: list, elem) -> int:
"""顺序查找"""
for index, item in enumerate(items):
if elem == item:
return index
return -1
def bin_search(items, elem):
"""二分查找"""
start, end = 0, len(items) - 1
while start <= end:
mid = (start + end) // 2
if elem > items[mid]:
start = mid + 1
elif elem < items[mid]:
end = mid - 1
else:
return mid
return -1
def main():
"""主函数(程序入口)"""
num = 6
styles = ['r-.', 'g-*', 'b-o', 'y-x', 'c-^', 'm-+', 'k-d']
legends = ['对数', '线性', '线性对数', '平方', '立方', '几何级数', '阶乘']
x_data = [x for x in range(1, num + 1)]
y_data1 = [log2(y) for y in range(1, num + 1)]
y_data2 = [y for y in range(1, num + 1)]
y_data3 = [y * log2(y) for y in range(1, num + 1)]
y_data4 = [y ** 2 for y in range(1, num + 1)]
y_data5 = [y ** 3 for y in range(1, num + 1)]
y_data6 = [3 ** y for y in range(1, num + 1)]
y_data7 = [factorial(y) for y in range(1, num + 1)]
y_datas = [y_data1, y_data2, y_data3, y_data4, y_data5, y_data6, y_data7]
for index, y_data in enumerate(y_datas):
pyplot.plot(x_data, y_data, styles[index])
pyplot.legend(legends)
pyplot.xticks(numpy.arange(1, 7, step=1))
pyplot.yticks(numpy.arange(0, 751, step=50))
pyplot.show()
if __name__ == '__main__':
main()
"""
排序 - 冒泡排序、选择排序、归并排序、快速排序
冒泡排序 - O(n ** 2):两两比较,大的下沉
35, 97, 12, 68, 55, 73, 81, 40
35, 12, 68, 55, 73, 81, 40, [97]
12, 35, 55, 68, 73, 40, [81]
12, 35, 55, 68, 40, [73]
...
选择排序 - O(n ** 2):每次从剩下元素中选择最小
-----------------------------------------
归并排序 - O(n * log_2 n) - 高级排序算法
35, 97, 12, 68, 55, 73, 81, 40
[35, 97, 12, 68], [55, 73, 81, 40]
[35, 97], [12, 68], [55, 73], [81, 40]
[35], [97], [12], [68], [55], [73], [81], [40]
[35, 97], [12, 68], [55, 73], [40, 81]
[12, 35, 68, 97], [40, 55, 73, 81]
[12, 35, 40, 55, 68, 73, 81, 97]
-----------------------------------------
快速排序 - 以枢轴为界将列表中的元素划分为两个部分,左边都比枢轴小,右边都比枢轴大
35, 97, 12, 68, 55, 73, 81, 40
35, 12, [40], 68, 55, 73, 81, 97
[12], 35, [40], 68, 55, 73, 81, [97]
[12], 35, [40], 55, [68], 73, 81, [97]
[12], 35, [40], 55, [68], 73, [81], [97]
"""
class Person(object):
"""人"""
def __init__(self, name, age):
self.name = name
self.age = age
# def __gt__(self, other):
# return self.name > other.name
def __str__(self):
return f'{self.name}: {self.age}'
def __repr__(self):
return self.__str__()
def select_sort(origin_items, comp=lambda x, y: x < y):
"""简单选择排序"""
items = origin_items[:]
for i in range(len(items) - 1):
min_index = i
for j in range(i + 1, len(items)):
if comp(items[j], items[min_index]):
min_index = j
items[i], items[min_index] = items[min_index], items[i]
return items
# 函数的设计要尽量做到无副作用(不影响调用者)
# 9 1 2 3 4 5 6 7 8
# 9 2 3 4 5 6 7 8 1
# *前面的参数叫位置参数,传参时只需要对号入座即可
# *后面的参数叫命名关键字参数,传参时必须给出参数名和参数值
# *args - 可变参数 - 元组
# **kwargs - 关键字参数 - 字典
def bubble_sort(origin_items, *, comp=lambda x, y: x > y):
"""冒泡排序"""
items = origin_items[:]
for i in range(1, len(items)):
swapped = False
for j in range(i - 1, len(items) - i):
if comp(items[j], items[j + 1]):
items[j], items[j + 1] = items[j + 1], items[j]
swapped = True
if swapped:
swapped = False
for j in range(len(items) - i - 1, i - 1, -1):
if comp(items[j - 1], items[j]):
items[j], items[j - 1] = items[j - 1], items[j]
swapped = True
if not swapped:
break
return items
def merge_sort(items, comp=lambda x, y: x <= y):
"""归并排序"""
if len(items) < 2:
return items[:]
mid = len(items) // 2
left = merge_sort(items[:mid], comp)
right = merge_sort(items[mid:], comp)
return merge(left, right, comp)
def merge(items1, items2, comp=lambda x, y: x <= y):
"""合并(将两个有序列表合并成一个新的有序列表)"""
items = []
index1, index2 = 0, 0
while index1 < len(items1) and index2 < len(items2):
if comp(items1[index1], items2[index2]):
items.append(items1[index1])
index1 += 1
else:
items.append(items2[index2])
index2 += 1
items += items1[index1:]
items += items2[index2:]
return items
def quick_sort(origin_items, comp=lambda x, y: x <= y):
"""快速排序"""
items = origin_items[:]
_quick_sort(items, 0, len(items) - 1, comp)
return items
def _quick_sort(items, start, end, comp):
"""递归调用划分和排序"""
if start < end:
pos = _partition(items, start, end, comp)
_quick_sort(items, start, pos - 1, comp)
_quick_sort(items, pos + 1, end, comp)
def _partition(items, start, end, comp):
"""划分"""
pivot = items[end]
i = start - 1
for j in range(start, end):
if comp(items[j], pivot):
i += 1
items[i], items[j] = items[j], items[i]
items[i + 1], items[end] = items[end], items[i + 1]
return i + 1
def main():
"""主函数"""
items = [35, 97, 12, 68, 55, 73, 81, 40]
# print(bubble_sort(items))
# print(select_sort(items))
# print(merge_sort(items))
print(quick_sort(items))
items2 = [
Person('Wang', 25), Person('Luo', 39),
Person('Zhang', 50), Person('He', 20)
]
# print(bubble_sort(items2, comp=lambda p1, p2: p1.age > p2.age))
# print(select_sort(items2, comp=lambda p1, p2: p1.name < p2.name))
# print(merge_sort(items2, comp=lambda p1, p2: p1.age <= p2.age))
print(quick_sort(items2, comp=lambda p1, p2: p1.age <= p2.age))
items3 = ['apple', 'orange', 'watermelon', 'durian', 'pear']
# print(bubble_sort(items3))
# print(bubble_sort(items3, comp=lambda x, y: len(x) > len(y)))
# print(merge_sort(items3))
print(merge_sort(items3))
if __name__ == '__main__':
main()
"""
函数递归调用 - 函数直接或者间接的调用了自身
1. 收敛条件
2. 递归公式
n! = n * (n-1)!
f(n) = f(n-1) + f(n-2)
1 1 2 3 5 8 13 21 34 55 ...
"""
from contextlib import contextmanager
from time import perf_counter
def fac(num):
"""求阶乘"""
assert num >= 0
if num in (0, 1):
return 1
return num * fac(num - 1)
def fib2(num):
"""普通函数"""
a, b = 1, 1
for _ in range(num - 1):
a, b = b, a + b
return a
def fib3(num):
"""生成器"""
a, b = 0, 1
for _ in range(num):
a, b = b, a + b
yield a
# 动态规划 - 保存可能进行重复运算的中间结果(空间换时间)
def fib(num, results={}):
"""斐波拉切数"""
assert num > 0
if num in (1, 2):
return 1
try:
return results[num]
except KeyError:
results[num] = fib(num - 1) + fib(num - 2)
return results[num]
@contextmanager
def timer():
try:
start = perf_counter()
yield
finally:
end = perf_counter()
print(f'{end - start}秒')
def main():
"""主函数"""
# for val in fib3(20):
# print(val)
# gen = fib3(20)
# for _ in range(10):
# print(next(gen))
for num in range(1, 121):
with timer():
print(f'{num}: {fib(num)}')
# print(fac(5))
# print(fac(-5))
if __name__ == '__main__':
main()
"""
贪婪法:在对问题求解时,总是做出在当前看来是最好的选择,
不追求最优解,快速找到满意解。
"""
class Thing(object):
"""物品"""
def __init__(self, name, price, weight):
self.name = name
self.price = price
self.weight = weight
@property
def value(self):
"""价格重量比"""
return self.price / self.weight
def input_thing():
"""输入物品信息"""
name_str, price_str, weight_str = input().split()
return name_str, int(price_str), int(weight_str)
def main():
"""主函数"""
max_weight, num_of_things = map(int, input().split())
all_things = []
for _ in range(num_of_things):
all_things.append(Thing(*input_thing()))
all_things.sort(key=lambda x: x.value, reverse=True)
total_weight = 0
total_price = 0
for thing in all_things:
if total_weight + thing.weight <= max_weight:
print(f'小偷拿走了{thing.name}')
total_weight += thing.weight
total_price += thing.price
print(f'总价值: {total_price}美元')
if __name__ == '__main__':
main()
"""
递归回溯法:叫称为试探法,按选优条件向前搜索,当搜索到某一步,
发现原先选择并不优或达不到目标时,就退回一步重新选择。
经典问题:骑士巡逻
"""
import os
import sys
import time
SIZE = 5
total = 0
def print_board(board):
# os.system('clear')
for row in board:
for col in row:
print(str(col).center(4), end='')
print()
def patrol(board, row, col, step=1):
if row >= 0 and row < SIZE and \
col >= 0 and col < SIZE and \
board[row][col] == 0:
board[row][col] = step
if step == SIZE * SIZE:
global total
total += 1
print(f'第{total}种走法: ')
print_board(board)
patrol(board, row - 2, col - 1, step + 1)
patrol(board, row - 1, col - 2, step + 1)
patrol(board, row + 1, col - 2, step + 1)
patrol(board, row + 2, col - 1, step + 1)
patrol(board, row + 2, col + 1, step + 1)
patrol(board, row + 1, col + 2, step + 1)
patrol(board, row - 1, col + 2, step + 1)
patrol(board, row - 2, col + 1, step + 1)
board[row][col] = 0
def main():
board = [[0] * SIZE for _ in range(SIZE)]
patrol(board, SIZE - 1, SIZE - 1)
if __name__ == '__main__':
main()
"""
编码和解码 - BASE64
0-9A-Za-z+/
1100 0101 1001 0011 0111 0110
00110001 00011001 00001101 00110110
base64
b64encode / b64decode
-------------------------------------
序列化和反序列化
序列化 - 将对象变成字节序列(bytes)或者字符序列(str) - 串行化/腌咸菜
反序列化 - 把字节序列或者字符序列还原成对象
Python标准库对序列化的支持:
json - 字符形式的序列化
pickle - 字节形式的序列化
dumps / loads
"""
import base64
import json
import redis
from example02 import Person
class PersonJsonEncoder(json.JSONEncoder):
def default(self, o):
return o.__dict__
def main():
cli = redis.StrictRedis(host='120.77.222.217', port=6379,
password='123123')
data = base64.b64decode(cli.get('guido'))
with open('guido2.jpg', 'wb') as file_stream:
file_stream.write(data)
# with open('guido.jpg', 'rb') as file_stream:
# result = base64.b64encode(file_stream.read())
# cli.set('guido', result)
# persons = [
# Person('骆昊', 39), Person('王大锤', 18),
# Person('白元芳', 25), Person('狄仁杰', 37)
# ]
# persons = json.loads(cli.get('persons'))
# print(persons)
# cli.set('persons', json.dumps(persons, cls=PersonJsonEncoder))
if __name__ == '__main__':
main()
"""
哈希摘要 - 数字签名/指纹 - 单向哈希函数(没有反函数不可逆)
应用领域:
1. 数据库中的用户敏感信息保存成哈希摘要
2. 给数据生成签名验证数据没有被恶意篡改
3. 云存储服务的秒传功能(去重功能)
"""
class StreamHasher():
"""摘要生成器"""
def __init__(self, algorithm='md5', size=4096):
"""初始化方法
@params:
algorithm - 哈希摘要算法
size - 每次读取数据的大小
"""
self.size = size
cls = getattr(__import__('hashlib'), algorithm.lower())
self.hasher = cls()
def digest(self, file_stream):
"""生成十六进制的摘要字符串"""
# data = file_stream.read(self.size)
# while data:
# self.hasher.update(data)
# data = file_stream.read(self.size)
for data in iter(lambda: file_stream.read(self.size), b''):
self.hasher.update(data)
return self.hasher.hexdigest()
def __call__(self, file_stream):
return self.digest(file_stream)
def main():
"""主函数"""
hasher1 = StreamHasher()
hasher2 = StreamHasher('sha1')
hasher3 = StreamHasher('sha256')
with open('Python-3.7.2.tar.xz', 'rb') as file_stream:
print(hasher1.digest(file_stream))
file_stream.seek(0, 0)
print(hasher2.digest(file_stream))
file_stream.seek(0, 0)
print(hasher3(file_stream))
if __name__ == '__main__':
main()
"""
加密和解密
对称加密 - 加密和解密是同一个密钥 - DES / AES
非对称加密 - 加密和解密是不同的密钥 - RSA
pip install pycrypto
"""
import base64
from hashlib import md5
from Crypto.Cipher import AES
from Crypto import Random
from Crypto.PublicKey import RSA
# # AES加密的密钥(长度32个字节)
# key = md5(b'1qaz2wsx').hexdigest()
# # AES加密的初始向量(随机生成)
# iv = Random.new().read(AES.block_size)
def main():
"""主函数"""
# 生成密钥对
key_pair = RSA.generate(1024)
# 导入公钥
pub_key = RSA.importKey(key_pair.publickey().exportKey())
# 导入私钥
pri_key = RSA.importKey(key_pair.exportKey())
message1 = 'hello, world!'
# 加密数据
data = pub_key.encrypt(message1.encode(), None)
# 对加密数据进行BASE64编码
message2 = base64.b64encode(data[0])
print(message2)
# 对加密数据进行BASE64解码
data = base64.b64decode(message2)
# 解密数据
message3 = pri_key.decrypt(data)
print(message3.decode())
# # AES - 对称加密
# str1 = '我爱你们!'
# cipher = AES.new(key, AES.MODE_CFB, iv)
# # 加密
# str2 = cipher.encrypt(str1)
# print(str2)
# # 解密
# cipher = AES.new(key, AES.MODE_CFB, iv)
# str3 = cipher.decrypt(str2)
# print(str3.decode())
if __name__ == '__main__':
main()
"""
装饰器 - 装饰器中放置的通常都是横切关注(cross-concern)功能
所谓横切关注功能就是很多地方都会用到但跟正常业务又逻辑没有必然联系的功能
装饰器实际上是实现了设计模式中的代理模式 - AOP(面向切面编程)
"""
from functools import wraps
from random import randint
from time import time, sleep
import pymysql
def record(output):
def decorate(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time()
ret_value = func(*args, **kwargs)
output(func.__name__, time() - start)
return ret_value
return wrapper
return decorate
def output_to_console(fname, duration):
print('%s: %.3f秒' % (fname, duration))
def output_to_file(fname, duration):
with open('log.txt', 'a') as file_stream:
file_stream.write('%s: %.3f秒\n' % (fname, duration))
def output_to_db(fname, duration):
con = pymysql.connect(host='localhost', port=3306,
database='test', charset='utf8',
user='root', password='123456',
autocommit=True)
try:
with con.cursor() as cursor:
cursor.execute('insert into tb_record values (default, %s, %s)',
(fname, '%.3f' % duration))
finally:
con.close()
@record(output_to_console)
def random_delay(min, max):
sleep(randint(min, max))
def main():
for _ in range(3):
# print(random_delay.__name__)
random_delay(3, 5)
# for _ in range(3):
# # 取消掉装饰器
# random_delay.__wrapped__(3, 5)
if __name__ == '__main__':
main()
"""
装饰类的装饰器 - 单例模式 - 一个类只能创建出唯一的对象
上下文语法:
__enter__ / __exit__
"""
import threading
from functools import wraps
def singleton(cls):
"""单例装饰器"""
instances = {}
lock = threading.Lock()
@wraps(cls)
def wrapper(*args, **kwargs):
if cls not in instances:
with lock:
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return wrapper
@singleton
class President():
def __init__(self, name, country):
self.name = name
self.country = country
def __str__(self):
return f'{self.country}: {self.name}'
def main():
print(President.__name__)
p1 = President('特朗普', '美国')
p2 = President('奥巴马', '美国')
print(p1 == p2)
print(p1)
print(p2)
if __name__ == '__main__':
main()
"""
变量的作用域以及Python搜索变量的顺序
LEGB: Local --> Embedded --> Global --> Built-in
global - 声明或定义全局变量(要么直接使用现有的全局作用域的变量,要么定义一个变量放到全局作用域)
nonlocal - 声明使用嵌套作用域的变量(如果嵌套作用域没有对应的变量直接报错)
"""
x = 100
def foo():
global x
x = 200
def bar():
x = 300
print(x)
bar()
print(x)
foo()
print(x)
"""
面向对象的三大支柱:封装、继承、多态
面向对象的设计原则:SOLID原则
面向对象的设计模式:GoF设计模式(单例、工厂、代理、策略、迭代器)
月薪结算系统 - 部门经理每月15000 程序员每小时200 销售员1800底薪加销售额5%提成
"""
from abc import ABCMeta, abstractmethod
class Employee(metaclass=ABCMeta):
"""员工(抽象类)"""
def __init__(self, name):
self.name = name
@abstractmethod
def get_salary(self):
"""结算月薪(抽象方法)"""
pass
class Manager(Employee):
"""部门经理"""
def get_salary(self):
return 15000.0
class Programmer(Employee):
"""程序员"""
def __init__(self, name, working_hour=0):
self.working_hour = working_hour
super().__init__(name)
def get_salary(self):
return 200.0 * self.working_hour
class Salesman(Employee):
"""销售员"""
def __init__(self, name, sales=0.0):
self.sales = sales
super().__init__(name)
def get_salary(self):
return 1800.0 + self.sales * 0.05
class EmployeeFactory():
"""创建员工的工厂(工厂模式 - 通过工厂实现对象使用者和对象之间的解耦合)"""
@staticmethod
def create(emp_type, *args, **kwargs):
"""创建员工"""
emp_type = emp_type.upper()
emp = None
if emp_type == 'M':
emp = Manager(*args, **kwargs)
elif emp_type == 'P':
emp = Programmer(*args, **kwargs)
elif emp_type == 'S':
emp = Salesman(*args, **kwargs)
return emp
from example12 import EmployeeFactory
def main():
"""主函数"""
emps = [
EmployeeFactory.create('M', '曹操'),
EmployeeFactory.create('P', '荀彧', 120),
EmployeeFactory.create('P', '郭嘉', 85),
EmployeeFactory.create('S', '典韦', 123000),
]
for emp in emps:
print('%s: %.2f元' % (emp.name, emp.get_salary()))
if __name__ == '__main__':
main()
"""
面向对象
枚举 - 一个变量的值只有有限个选择,最适合的类型就是枚举
通过枚举我们可以定义符号常量,符号常量优于字面常量
"""
from enum import Enum, unique
import random
@unique
class Suite(Enum):
"""花色(枚举)"""
SPADE = 0
HEART = 1
CLUB = 2
DIAMOND = 3
def __lt__(self, other):
return self.value < other.value
class Card():
"""牌"""
def __init__(self, suite, face):
self.suite = suite
self.face = face
def __repr__(self):
return self.__str__()
def __str__(self):
suites = ('♠️', '♥️', '♣️', '♦️')
faces = ('', 'A', '2', '3', '4', '5', '6',
'7', '8', '9', '10', 'J', 'Q', 'K')
return f'{suites[self.suite.value]} {faces[self.face]}'
class Poker():
"""扑克"""
def __init__(self):
self.index = 0
self.cards = [Card(suite, face)
for suite in Suite
for face in range(1, 14)]
def shuffle(self):
"""洗牌"""
self.index = 0
random.shuffle(self.cards)
def deal(self):
"""发牌"""
card = self.cards[self.index]
self.index += 1
return card
@property
def has_more(self):
"""是否有更多的牌"""
return self.index < len(self.cards)
class Player():
"""玩家"""
def __init__(self, name):
self.name = name
self.cards = []
def get_card(self, card):
"""摸牌"""
self.cards.append(card)
def arrange(self):
"""整理手上的牌"""
self.cards.sort(key=lambda card: (card.suite, card.face))
def main():
"""主函数"""
poker = Poker()
poker.shuffle()
players = [
Player('东邪'), Player('西毒'),
Player('南帝'), Player('北丐')
]
while poker.has_more:
for player in players:
player.get_card(poker.deal())
for player in players:
player.arrange()
print(player.name, end=': ')
print(player.cards)
if __name__ == '__main__':
main()
"""
迭代器 - __iter__ / __next__
itertools - 生成可迭代序列的工具模块
"""
import itertools
from math import sqrt
def is_prime(num):
"""判断素数"""
for factor in range(2, int(sqrt(num)) + 1):
if num % factor == 0:
return False
return True
class PrimeIter(object):
"""素数迭代器"""
def __init__(self, min_value, max_value):
assert 2 <= min_value <= max_value
self.min_value = min_value - 1
self.max_value = max_value
def __iter__(self):
return self
def __next__(self):
self.min_value += 1
while self.min_value <= self.max_value:
if is_prime(self.min_value):
return self.min_value
self.min_value += 1
raise StopIteration()
class FibIter(object):
"""斐波那契数迭代器"""
def __init__(self, num):
self.num = num
self.a, self.b = 0, 1
self.idx = 0
def __iter__(self):
return self
def __next__(self):
if self.idx < self.num:
self.a, self.b = self.b, self.a + self.b
self.idx += 1
return self.a
raise StopIteration()
def main():
# for val in itertools.permutations('ABCD'):
# print(val)
# for val in itertools.combinations('ABCDE', 3):
# print(val)
# for val in itertools.product('黑红梅方', range(1, 14)):
# print(val)
# fib_iter = FibIter(20)
# print('===>', next(fib_iter))
# print('===>', next(fib_iter))
# for val in fib_iter:
# print(val)
prime_iter = PrimeIter(2, 100000)
for val in prime_iter:
print(val)
if __name__ == '__main__':
main()
"""
魔术方法
如果要把自定义对象放到set或者用作dict的键
那么必须要重写__hash__和__eq__两个魔术方法
前者用来计算对象的哈希码,后者用来判断两个对象是否相同
哈希码不同的对象一定是不同的对象,但哈希码相同未必是相同的对象(哈希码冲撞)
所以在哈希码相同的时候还要通过__eq__来判定对象是否相同
"""
class Student():
__slots__ = ('stuid', 'name', 'gender')
def __init__(self, stuid, name):
self.stuid = stuid
self.name = name
def __hash__(self):
return hash(self.stuid) + hash(self.name)
def __eq__(self, other):
return self.stuid == other.stuid and \
self.name == other.name
def __str__(self):
return f'{self.stuid}: {self.name}'
def __repr__(self):
return self.__str__()
class School():
def __init__(self, name):
self.name = name
self.students = {}
def __setitem__(self, key, student):
self.students[key] = student
def __getitem__(self, key):
return self.students[key]
def main():
# students = set()
# students.add(Student(1001, '王大锤'))
# students.add(Student(1001, '王大锤'))
# students.add(Student(1001, '白元芳'))
# print(len(students))
# print(students)
stu = Student(1234, '骆昊')
stu.gender = 'Male'
# stu.birth = '1980-11-28'
print(stu.name, stu.birth)
school = School('千锋教育')
school[1001] = Student(1001, '王大锤')
school[1002] = Student(1002, '白元芳')
school[1003] = Student(1003, '白洁')
print(school[1002])
print(school[1003])
if __name__ == '__main__':
main()
"""
多重继承 - 一个类有两个或者两个以上的父类
MRO - 方法解析顺序 - Method Resolution Order
当出现菱形继承(钻石继承)的时候,子类到底继承哪个父类的方法
Python 2.x - 深度优先搜索
Python 3.x - C3算法 - 类似于广度优先搜索
"""
class A():
def say_hello(self):
print('Hello, A')
class B(A):
pass
class C(A):
def say_hello(self):
print('Hello, C')
class D(B, C):
pass
class SetOnceMappingMixin():
"""自定义混入类"""
__slots__ = ()
def __setitem__(self, key, value):
if key in self:
raise KeyError(str(key) + ' already set')
return super().__setitem__(key, value)
class SetOnceDict(SetOnceMappingMixin, dict):
"""自定义字典"""
pass
def main():
print(D.mro())
# print(D.__mro__)
D().say_hello()
print(SetOnceDict.__mro__)
my_dict= SetOnceDict()
my_dict['username'] = 'jackfrued'
my_dict['username'] = 'hellokitty'
if __name__ == '__main__':
main()
"""
元 - meta
元数据 - 描述数据的数据 - metadata
元类 - 描述类的类 - metaclass - 继承自type
"""
import threading
class SingletonMeta(type):
"""自定义元类"""
def __init__(cls, *args, **kwargs):
cls.__instance = None
cls.lock = threading.Lock()
super().__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs):
if cls.__instance is None:
with cls.lock:
if cls.__instance is None:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance
class President(metaclass=SingletonMeta):
"""总统(单例类)"""
def __init__(self, name, country):
self.name = name
self.country = country
def __str__(self):
return f'{self.country}: {self.name}'
def main():
"""主函数"""
p1 = President('特朗普', '美国')
p2 = President('奥巴马', '美国')
p3 = President.__call__('克林顿', '美国')
print(p1 == p2)
print(p1 == p3)
print(p1, p2, p3, sep='\n')
if __name__ == '__main__':
main()
"""
扩展性系统性能
- 垂直扩展 - 增加单节点处理能力
- 水平扩展 - 将单节点变成多节点(读写分离/分布式集群)
并发编程 - 加速程序执行 / 改善用户体验
耗时间的任务都尽可能独立的执行,不要阻塞代码的其他部分
- 多线程
1. 创建Thread对象指定target和args属性并通过start方法启动线程
2. 继承Thread类并重写run方法来定义线程执行的任务
3. 创建线程池对象ThreadPoolExecutor并通过submit来提交要执行的任务
第3种方式可以通过Future对象的result方法在将来获得线程的执行结果
也可以通过done方法判定线程是否执行结束
- 多进程
- 异步I/O
"""
import glob
import os
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from PIL import Image
# class ThumbnailThread(Thread):
# def __init__(self, infile):
# self.infile = infile
# super().__init__()
# def run(self):
# file, ext = os.path.splitext(self.infile)
# filename = file[file.rfind('/') + 1:]
# for size in (32, 64, 128):
# outfile = f'thumbnails/{filename}_{size}_{size}.png'
# image = Image.open(self.infile)
# image.thumbnail((size, size))
# image.save(outfile, format='PNG')
def gen_thumbnail(infile):
file, ext = os.path.splitext(infile)
filename = file[file.rfind('/') + 1:]
for size in (32, 64, 128):
outfile = f'thumbnails/{filename}_{size}_{size}.png'
image = Image.open(infile)
image.thumbnail((size, size))
image.save(outfile, format='PNG')
# def main():
# start = time.time()
# threads = []
# for infile in glob.glob('images/*'):
# # t = Thread(target=gen_thumbnail, args=(infile, ))
# t = ThumbnailThread(infile)
# t.start()
# threads.append(t)
# for t in threads:
# t.join()
# end = time.time()
# print(f'耗时: {end - start}秒')
def main():
pool = ThreadPoolExecutor(max_workers=30)
futures = []
start = time.time()
for infile in glob.glob('images/*'):
# submit方法是非阻塞式的方法
# 即便工作线程数已经用完,submit方法也会接受提交的任务
future = pool.submit(gen_thumbnail, infile)
futures.append(future)
for future in futures:
# result方法是一个阻塞式的方法 如果线程还没有结束
# 暂时取不到线程的执行结果 代码就会在此处阻塞
future.result()
end = time.time()
print(f'耗时: {end - start}秒')
# shutdown也是非阻塞式的方法 但是如果已经提交的任务还没有执行完
# 线程池是不会停止工作的 shutdown之后再提交任务就不会执行而且会产生异常
pool.shutdown()
if __name__ == '__main__':
main()
"""
线程间通信(共享数据)非常简单因为可以共享同一个进程的内存
进程间通信(共享数据)比较麻烦因为操作系统会保护分配给进程的内存
要实现多进程间的通信通常可以用系统管道、套接字、三方服务来实现
multiprocessing.Queue
守护线程 - daemon thread
守护进程 - firewalld / httpd / mysqld
在系统停机的时候不保留的进程 - 不会因为进程还没有执行结束而阻碍系统停止
"""
from threading import Thread
from time import sleep
def output(content):
while True:
print(content, end='')
def main():
Thread(target=output, args=('Ping', ), daemon=True).start()
Thread(target=output, args=('Pong', ), daemon=True).start()
sleep(5)
print('bye!')
if __name__ == '__main__':
main()
"""
多个线程竞争一个资源 - 保护临界资源 - 锁(Lock/RLock)
多个线程竞争多个资源(线程数>资源数) - 信号量(Semaphore)
多个线程的调度 - 暂停线程执行/唤醒等待中的线程 - Condition
"""
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep
import threading
class Account():
"""银行账户"""
def __init__(self, balance=0):
self.balance = balance
lock = threading.Lock()
self.condition = threading.Condition(lock)
def withdraw(self, money):
"""取钱"""
with self.condition:
while money > self.balance:
self.condition.wait()
new_balance = self.balance - money
sleep(0.001)
self.balance = new_balance
def deposit(self, money):
"""存钱"""
with self.condition:
new_balance = self.balance + money
sleep(0.001)
self.balance = new_balance
self.condition.notify_all()
def add_money(account):
while True:
money = randint(5, 10)
account.deposit(money)
print(threading.current_thread().name,
':', money, '====>', account.balance)
sleep(0.5)
def sub_money(account):
while True:
money = randint(10, 30)
account.withdraw(money)
print(threading.current_thread().name,
':', money, '<====', account.balance)
sleep(1)
def main():
account = Account()
with ThreadPoolExecutor(max_workers=10) as pool:
for _ in range(5):
pool.submit(add_money, account)
pool.submit(sub_money, account)
if __name__ == '__main__':
main()
"""
多进程和进程池的使用
多线程因为GIL的存在不能够发挥CPU的多核特性
对于计算密集型任务应该考虑使用多进程
time python3 example22.py
real 0m11.512s
user 0m39.319s
sys 0m0.169s
"""
import concurrent.futures
import math
PRIMES = [
1116281,
1297337,
104395303,
472882027,
533000389,
817504243,
982451653,
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419
] * 5
def is_prime(n):
"""判断素数"""
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
"""主函数"""
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
"""
协程(coroutine)- 可以在需要时进行切换的相互协作的子程序
"""
import asyncio
from example15 import is_prime
def num_generator(m, n):
"""指定范围的数字生成器"""
yield from range(m, n + 1)
async def prime_filter(m, n):
"""素数过滤器"""
primes = []
for i in num_generator(m, n):
if is_prime(i):
print('Prime =>', i)
primes.append(i)
await asyncio.sleep(0.001)
return tuple(primes)
async def square_mapper(m, n):
"""平方映射器"""
squares = []
for i in num_generator(m, n):
print('Square =>', i * i)
squares.append(i * i)
await asyncio.sleep(0.001)
return squares
def main():
"""主函数"""
loop = asyncio.get_event_loop()
future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100))
future.add_done_callback(lambda x: print(x.result()))
loop.run_until_complete(future)
loop.close()
if __name__ == '__main__':
main()
"""
aiohttp - 异步HTTP网络访问
异步I/O(异步编程)- 只使用一个线程(单线程)来处理用户请求
用户请求一旦被接纳,剩下的都是I/O操作,通过多路I/O复用也可以达到并发的效果
这种做法与多线程相比可以让CPU利用率更高,因为没有线程切换的开销
Redis/Node.js - 单线程 + 异步I/O
Celery - 将要执行的耗时间的任务异步化处理
异步I/O事件循环 - uvloop
"""
import asyncio
import re
import aiohttp
async def fetch(session, url):
async with session.get(url, ssl=False) as resp:
return await resp.text()
async def main():
pattern = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')
urls = ('https://www.python.org/',
'https://git-scm.com/',
'https://www.jd.com/',
'https://www.taobao.com/',
'https://www.douban.com/')
async with aiohttp.ClientSession() as session:
for url in urls:
html = await fetch(session, url)
print(pattern.search(html).group('title'))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
装饰器的应用
"""
from functools import wraps
from random import randint
from time import sleep
class Retry():
"""让函数可以重试执行的装饰器"""
def __init__(self, times=3, max_wait=0, errors=(Exception, )):
self.times = times
self.max_wait = max_wait
self.errors = errors
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
for _ in range(self.times):
try:
return func(*args, **kwargs)
except self.errors:
sleep(randint(self.max_wait))
return wrapper
def retry(*, times=3, max_wait=0, errors=(Exception, )):
"""让函数重试执行的装饰器函数"""
def decorate(func):
@wraps(func)
def wrapper(*args, **kwargs):
for _ in range(times):
try:
return func(*args, **kwargs)
except errors:
sleep(randint(max_wait))
return wrapper
return decorate
# @Retry(max_wait=5)
@retry(max_wait=5)
def get_data_from_url(url):
pass
"""
模拟面试编程题
"""
def second_max(items: list, gt=lambda x, y: x > y):
"""从列表中找出第二大元素"""
assert len(items) >= 2
max1, max2 = (items[0], items[1]) \
if gt(items[0], items[1]) else (items[1], items[0])
for i in range(2, len(items)):
if gt(max1, items[i]) and gt(items[i], max2):
max2 = items[i]
elif gt(items[i], max1):
max1, max2 = items[i], max1
return max2
def list_depth(items: list) -> int:
"""计算嵌套列表的嵌套深度"""
if isinstance(items, list):
max_depth = 1
for item in items:
max_depth = max(list_depth(item) + 1, max_depth)
return max_depth
return 0
def main():
"""主函数"""
one_set = {1}
pos, off = 1, 1
while pos <= 100000000:
pos += off
one_set.add(pos)
off += 1
num, *poses = map(int, input().split())
for pos in poses:
print(1 if pos in one_set else 0, end=' ')
# items1 = [38, 95, 27, 95, 88, 73, 61, 50]
# print(second_max(items1))
# items2 = [[1], [[[2]]],[[3]], 4, [[[[[5, [6]]]]]]]
# print(list_depth(items1))
# print(list_depth(items2))
if __name__ == '__main__':
main()
"""
单元测试 - 针对程序中最小的功能模块(函数和方法)的测试
测试方法:
- 白盒测试:程序自己写的测试
- 黑盒测试:测试人员或QA,不知道代码实现细节,只关注功能
编写Python单元测试 - 定义类继承TestCase,写测试方法(test_开头)
执行单元测试:
- unittest.main()
- python3 -m unittest test_example01.py
第三方库 - nose2 / pytest
pip install pytest pytest-cov
pytest -v --cov
------------------------------
pip install nose2 cov-core
nose2 -v -C
"""
from unittest import TestCase
from example01 import seq_search, bin_search
class TestExample01(TestCase):
"""测试查找函数的测试用例"""
# 执行每个测试函数之前要执行的方法
def setUp(self):
self.data1 = [35, 97, 12, 68, 55, 73, 81, 40]
self.data2 = [12, 35, 40, 55, 68, 73, 81, 97]
# 执行每个测试函数之后要执行的方法
def tearDown(self):
pass
def test_seq_search(self):
"""测试顺序查找"""
self.assertEqual(0, seq_search(self.data1, 35))
self.assertEqual(2, seq_search(self.data1, 12))
self.assertEqual(6, seq_search(self.data1, 81))
self.assertEqual(7, seq_search(self.data1, 40))
self.assertEqual(-1, seq_search(self.data1, 99))
self.assertEqual(-1, seq_search(self.data1, 7))
def test_bin_search(self):
"""测试二分查找"""
self.assertEqual(1, bin_search(self.data2, 35))
self.assertEqual(0, bin_search(self.data2, 12))
self.assertEqual(6, bin_search(self.data2, 81))
self.assertEqual(2, bin_search(self.data2, 40))
self.assertEqual(7, bin_search(self.data2, 97))
self.assertEqual(-1, bin_search(self.data2, 7))
self.assertEqual(-1, bin_search(self.data2, 99))
from unittest import TestCase
from example02 import select_sort, merge
class TestExample02(TestCase):
"""测试排序函数的测试用例"""
def setUp(self):
self.data1 = [35, 97, 12, 68, 55, 73, 81, 40]
self.items1 = [12, 35, 68, 97]
self.items2 = [40, 55, 73, 81]
def test_merge(self):
items = merge(self.items1, self.items2)
for i in range(len(items) - 1):
self.assertLessEqual(items[i], items[i + 1])
def test_select_sort(self):
"""测试顺序查找"""
items = select_sort(self.data1)
for i in range(len(items) - 1):
self.assertLessEqual(items[i], items[i + 1])
\ No newline at end of file
## Django知识点概述
### Web应用
问题1:描述一个Web应用的工作流程。
![s](./res/web-application.png)
问题2:描述项目的物理架构。(上图中补充负载均衡(反向代理)服务器、数据库服务器、文件服务器、邮件服务器、缓存服务器、防火墙等,而且每个节点都有可能是多节点构成的集群,如下图所示)
![](./res/05.django_massive_cluster.png)
问题3:描述Django项目的工作流程。(如下图所示)
![](./res/django_request_response_cycle.png)
### MVC架构模式
问题1:为什么要使用MVC架构模式?(模型和视图解耦合)
问题2:MVC架构中每个部分的作用?(如下图所示)
![](./res/mvc.png)
### HTTP请求和响应
#### HTTP请求 = 请求行+请求头+空行+[消息体]
![](./res/http-request.png)
#### HTTP响应 = 响应行+响应头+空行+消息体
![](./res/http-response.png)
1. `HTTPRequest`对象的属性和方法:
- `method` - 获取请求方法
- `path` / `get_full_path()` - 获取请求路径/带查询字符串的路径
- `scheme` / `is_secure()` / `get_host()` / `get_port()` - 获取请求的协议/主机/端口
- `META` / `COOKIES` - 获取请求头/Cookie信息
- `GET` / `POST` / `FILES` - 获取GET或POST请求参数/上传的文件
- `get_signed_cookie()` - 获取带签名的Cookie
- `is_ajax()` - 是不是Ajax异步请求
- `body` / `content_type` / `encoding` - 获取请求的消息体(bytes流)/MIME类型/编码
2. 中间件添加的属性:
- `session` / `user` / `site`
3. `HttpResponse`对象的属性和方法:
- `set_cookie()` / `set_signed_cookie()` / `delete_cookie()` - 添加/删除Cookie
- `__setitem__` / `__getitem__` / `__delitem__` - 添加/获取/删除响应头
- `charset` / `content` / `status_code` - 响应的字符集/消息体(bytes流)/状态码
- 1xx:请求已经收到,继续处理
- 2xx(成功):请求已经成功收到、理解和接收。
- 3xx(重定向):为完成请求要继续执行后续的操作。
- 4xx(客户端错误):请求不正确或不能够被受理。
- 5xx(服务器错误):服务器处理请求失败。
4. `JsonResponse``HttpResponse`的子类型)对象
```Python
>>> from django.http import HttpResponse, JsonResponse
>>>
>>> response = JsonResponse({'foo': 'bar'})
>>> response.content
>>>
>>> response = JsonResponse([1, 2, 3], safe=False)
>>> response.content
>>>
>>> response = HttpResponse(b'...')
>>> response['cotent-type'] = 'application/pdf';
>>> response['content-disposition'] = 'inline; filename="xyz.pdf"'
>>> response['content-disposition'] = 'attachment; filename="xyz.pdf"'
>>>
>>> response.set_signed_cookie('foo', 'bar', salt='')
>>> response.status_code = 200
```
### 数据模型(Model)
问题1:关系型数据库表的设计应该注意哪些问题(范式理论和逆范式)?如何通过表来创建模型类(反向工程)?如何通过模型类来创建表(正向工程)?
```Shell
python manage.py makemigrations <appname>
python manage.py migrate
python manage.py inspectdb > <appname>/models.py
```
问题2:关系型数据库中数据完整性指的是什么?什么时候需要牺牲数据完整性?(实体完整性/参照完整性/域完整性)
问题3:ORM是什么以及解决了什么问题?(对象模型-关系模型双向转换)
1. `Field`及其子类的属性:
- 通用选项:
- `db_column` / `db_tablespace`
- `null` / `blank` / `default`
- `primary_key`
- `db_index` / `unqiue`
- `choices` / `help_text` / `error_message` / `editable` / `hidden`
- 其他选项:
- `CharField`: `max_length`
- `DateField`: `auto_now` / `auto_now_add`
- `DecimalField`: `max_digits` / `decimal_places`
- `FileField`: `storage` / `upload_to`
- `ImageField`: `height_field` / `width_field`
2. `ForeignKey`的属性:
- 重要属性:
- `db_constraint`(提升性能或者数据分片的情况可能需要设置为`False`
- `on_delete`
* `CASCADE`:级联删除。
- `PROTECT`:抛出`ProtectedError`异常,阻止删除引用的对象。
- `SET_NULL`:把外键设置为`null`,当`null`属性被设置为`True`时才能这么做。
- `SET_DEFAULT`:把外键设置为默认值,提供了默认值才能这么做。
- `related_name`
```Python
class Dept(models.Model):
pass
class Emp(models.Model):
dept = models.ForeignKey(related_name='+', ...)
Dept.objects.get(no=10).emp_set.all()
Emp.objects.filter(dept__no=10)
```
> 说明:`related_name`设置为`'+'`,可以防止一对多外键关联从“一”的一方查询“多”的一方。
- 其他属性:
- `to_field` / `limit_choices_to` / `swappable`
3. `Model`的属性和方法
- `objects` / `pk`
- `save()` / `delete()`
- `clean()` / `validate_unique()` / `full_clean()`
4. `QuerySet`的方法
- `get()` / `all()` / `values()`
> 说明:`values()`返回的`QuerySet`中不是模型对象而是字典
- `count()` / `order_by()` / `exists()` / `reverse()`
- `filter()` / `exclude()`
- `exact` / `iexact`:精确匹配/忽略大小写的精确匹配查询
- `contains` / `icontains` / `startswith / istartswith / endswith / iendswith`:基于`like`的模糊查询
- `in`:集合运算
- `gt` / `gte` / `lt` / `lte`:大于/大于等于/小于/小于等于关系运算
- `range`:指定范围查询(SQL中的`between…and…`)
- `year` / `month` / `day` / `week_day` / `hour` / `minute` / `second`:查询时间日期
- `isnull`:查询空值(`True`)或非空值(`False`)
- `search`:基于全文索引的全文检索
- `regex` / `iregex`:基于正则表达式的模糊匹配查询
- `aggregate()` / `annotate()`
- `Avg` / `Count` / `Sum` / `Max` / `Min`
```Python
>>> from django.db.models import Avg
>>> Emp.objects.aggregate(avg_sal=Avg('sal'))
(0.001) SELECT AVG(`TbEmp`.`sal`) AS `avg_sal` FROM `TbEmp`; args=()
{'avg_sal': 3521.4286}
```
```Python
>>> Emp.objects.values('dept').annotate(total=Count('dept'))
(0.001) SELECT `TbEmp`.`dno`, COUNT(`TbEmp`.`dno`) AS `total` FROM `TbEmp` GROUP BY `TbEmp`.`dno` ORDER BY NULL LIMIT 21; args=()
<QuerySet [{'dept': 10, 'total': 4}, {'dept': 20, 'total': 7}, {'dept': 30, 'total': 3}]
```
- `first()` / `last()`
> 说明:调用`first()`方法相当于用`[0]`对`QuerySet`进行切片。
- `only()` / `defer()`
```Python
>>> Emp.objects.filter(pk=7800).only('name', 'sal')
(0.001) SELECT `TbEmp`.`empno`, `TbEmp`.`ename`, `TbEmp`.`sal` FROM `TbEmp` WHERE `TbEmp`.`empno` = 7800 LIMIT 21; args=(7800,)
<QuerySet [<Emp: Emp object (7800)>]>
>>> Emp.objects.filter(pk=7800).defer('name', 'sal')
(0.001) SELECT `TbEmp`.`empno`, `TbEmp`.`job`, `TbEmp`.`mgr`, `TbEmp`.`comm`, `TbEmp`.`dno` FROM `TbEmp` WHERE `TbEmp`.`empno` = 7800 LIMIT 21; args=(7800,)
<QuerySet [<Emp: Emp object (7800)>]>
```
- `create()` / `update()` / `raw()`
```Python
>>> Emp.objects.filter(dept__no=20).update(sal=F('sal') + 100)
(0.011) UPDATE `TbEmp` SET `sal` = (`TbEmp`.`sal` + 100) WHERE `TbEmp`.`dno` = 20; args=(100, 20)
>>>
>>> Emp.objects.raw('select empno, ename, job from TbEmp where dno=10')
<RawQuerySet: select empno, ename, job from TbEmp where dno=10>
```
5. `Q`对象和`F`对象
> 说明:Q对象主要用来解决多条件组合的复杂查询;F对象主要用于更新数据。
```Python
>>> from django.db.models import Q
>>> Emp.objects.filter(
... Q(name__startswith='张'),
... Q(sal__lte=5000) | Q(comm__gte=1000)
... ) # 查询名字以“张”开头且工资小于等于5000或补贴大于等于1000的员工
<QuerySet [<Emp: 张三丰>]>
```
```Python
>>> from backend.models import Emp, Dept
>>> emps = Emp.objects.filter(dept__no=20)
>>> from django.db.models import F
>>> emps.update(sal=F('sal') + 100)
```
6. 原生SQL查询
```Python
from django.db import connections
with connections['...'].cursor() as cursor:
cursor.execute("UPDATE TbEmp SET sal=sal+10 WHERE dno=30")
cursor.execute("SELECT ename, job FROM TbEmp WHERE dno=10")
row = cursor.fetchall()
```
7. 模型管理器
```Python
class BookManager(models.Manager):
def title_count(self, keyword):
return self.filter(title__icontains=keyword).count()
class Book(models.Model):
objects = BookManager()
```
### 视图函数(Controller)
#### 如何设计视图函数
1. 用户的每个操作(用户故事)对应一个视图函数。
2. [每个视图函数可以构成一个事务边界](https://docs.djangoproject.com/en/2.1/ref/settings/)。
- 事务的ACID特性。
- 原子性(Atomicity):事务中各项的操作要么全做要么全不做;
- 一致性(Consistentcy):事务前后系统的状态是一致的;
- 隔离性(Isolation):并发执行的事务无法看到彼此的中间状态;
- 持久性(Duration):事务完成后所做的改动都会被持久化。
- 事务隔离级别 - 设置事务隔离级别是为了数据库底层依据事务隔离级别为数据加上适当的锁。如果需要保证数据的强一致性,那么关系型数据库仍然是唯一的也是最好的选择,因为关系型数据库可以通过锁机制来保护数据。事务隔离级别从低到高依次是:Read Uncommitted(读未提交)、Read Committed(读提交)、Repeatable Read(可重复读)、Serializable(串行化)。事务隔离级别越高,数据并发访问的问题越少,但是性能越差;事务隔离级别越低,数据并发访问的问题越多,但是性能越好。
- 数据并发访问会产生5种问题(请参考我的[《Java面试题全集(上)》](https://blog.csdn.net/jackfrued/article/details/44921941)第80题对该问题的讲解):
- 第1类丢失更新(A事务撤销覆盖B事务更新的数据)和第2类丢失更新(A事务提交覆盖B事务更新的数据)。
- 脏读(读脏数据):一个事务读取到其他尚未提交的事务的数据。
- 不可重复读: 一个事务在读取它的查询结果时,被另一个事务更新了它的查询记录导致无法读到数据。
- 幻读:一个事务在读取它的查询结果时,发现读到了被另一个事务提交的新数据。
```SQL
-- 设置全局默认的事务隔离级别
set global transaction isolation level repeatable read;
-- 设置当前会话的事务隔离级别
set session transaction isolation level read committed;
-- 查询当前会话的事务隔离级别
select @@tx_isolation;
```
- Django中的事务控制。
- 给每个请求绑定事务环境(反模式)。
```Python
ATOMIC_REQUESTS = True
```
- 使用事务装饰器(简单易用) - 粗粒度(控制不够精细)。
```Python
@transaction.non_atomic_requests
@transaction.atomic
```
- 使用上下文语法(细粒度 - 事务控制的范围更加精准)。
```Python
with transaction.atomic():
pass
```
- 关闭自动提交使用手动提交。
```Python
AUTOCOMMIT = False
```
```Python
transaction.commit()
transaction.rollback()
```
#### URL配置
1. 可以让部分URL只在调试模式下生效。
```Python
from django.conf import settings
urlpatterns = [
...
]
if settings.DEBUG:
urlpatterns += [ ... ]
```
2. 可以使用命名捕获组捕获路径参数。
```Python
url(r'api/code/(?P<mobile>1[3-9]\d{9})'),
path('api/code/<str:mobile>'),
```
3. URL配置不关心请求使用的方法(一个视图函数可以处理不同的请求方式)。
4. 如果使用`url`函数捕获的路径参数都是字符串,`path`函数可以指定路径参数类型。
5. 可以使用`include`函数引入其他URL配置,捕获的参数会向下传递。
6. 在`url`和`path`函数甚至是`include`函数中都可以用字典向视图传入额外的参数,如果参数与捕获的参数同名,则使用字典中的参数。
7. 可以用`reverse`函数实现URL的逆向解析(从名字解析出URL),在模板中也可以用`{% url %}`实现同样的操作。
```Python
path('', views.index, name='index')
return redirect(reverse('index'))
return redirect('index')
```
### 模板(View)
#### 后端渲染
1. 模板的配置和渲染函数。
```Python
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates'), ],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
```
```Python
resp = render(request, 'index.html', {'foo': ...})
```
2. 模板遇到变量名的查找顺序。
- 字典查找(如:`foo['bar']`)
- 属性查找(如:`foo.bar`)
- 方法调用(如:`foo.bar()`)
- 方法不能有必须传值的参数
- 在模板中不能够给方法传参
- 如果方法的`alters_data`被设置为`True`则不能调用该方法(避免误操作的风险),模型对象动态生成的`delete()`和`save()`方法都设定了`alters_data = True`。
- 列表索引查找(如:`foo[0]`)
3. 模板标签的使用。
- `{% if %}` / `{% else %}` / `{% endif %}`
- `{% for %}` / `{% endfor %}`
- `{% ifequal %}` / `{% endifequal %}` / `{% ifnotequal %}` / `{% endifnotequal %}`
- `{# comment #}` / `{% comment %}` / `{% endcomment %}`
4. 过滤器的使用。
- `lower` / `upper` / `first` / `last` / `truncatewords` / `date `/ `time` / `length` / `pluralize` / `center` / `ljust` / `rjust` / `cut` / `urlencode` / `default_if_none` / `filesizeformat` / `join` / `slice` / `slugify`
5. 模板的包含和继承。
- `{% include %}` / `{% block %}`
- `{% extends %}`
6. 模板加载器(后面优化部分会讲到)。
- 文件系统加载器
```Python
TEMPLATES = [{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates')],
}]
```
- 应用目录加载器
```Python
TEMPLATES = [{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'APP_DIRS': True,
}]
```
#### 前端渲染
1. 前端模板引擎:Handlebars / Mustache。
2. 前端MV\*框架。
- MVC - AngularJS
- MVVM(Model-View-ViewModel) - Vue.js
#### 其他视图
1. MIME(多用途Internet邮件扩展)类型 - 告知浏览器传输的数据类型。
| Content-Type | 说明 |
| ---------------- | ------------------------------------------------------------ |
| application/json | [JSON](https://zh.wikipedia.org/wiki/JSON)(JavaScript Object Notation) |
| application/pdf | [PDF](https://zh.wikipedia.org/wiki/PDF)(Portable Document Format) |
| audio/mpeg | [MP3](https://zh.wikipedia.org/wiki/MP3)或其他[MPEG](https://zh.wikipedia.org/wiki/MPEG)音频文件 |
| audio/vnd.wave | [WAV](https://zh.wikipedia.org/wiki/WAV)音频文件 |
| image/gif | [GIF](https://zh.wikipedia.org/wiki/GIF)图像文件 |
| image/jpeg | [JPEG](https://zh.wikipedia.org/wiki/JPEG)图像文件 |
| image/png | [PNG](https://zh.wikipedia.org/wiki/PNG)图像文件 |
| text/html | [HTML](https://zh.wikipedia.org/wiki/HTML)文件 |
| text/xml | [XML](https://zh.wikipedia.org/wiki/XML) |
| video/mp4 | [MP4](https://zh.wikipedia.org/wiki/MP4)视频文件 |
| video/quicktime | [QuickTime](https://zh.wikipedia.org/wiki/QuickTime)视频文件 |
2. 如何处置生成的内容(inline / attachment)。
```Python
>>> from urllib.parse import quote
>>>
>>> response['content-type'] = 'application/pdf'
>>> filename = quote('Python语言规范.pdf')
>>> filename
'Python%E8%AF%AD%E8%A8%80%E8%A7%84%E8%8C%83.pdf'
>>> response['content-disposition'] = f'attachment; filename="{filename}"'
```
> 提醒:URL以及请求和响应头中的中文都应该处理成[百分号编码](https://zh.wikipedia.org/zh-hans/%E7%99%BE%E5%88%86%E5%8F%B7%E7%BC%96%E7%A0%81)。
3. 生成CSV / Excel / PDF / 统计报表。
- 向浏览器传输二进制数据。
```Python
buffer = ByteIO()
resp = HttpResponse(content_type='...')
resp['Content-Disposition'] = 'attachment; filename="..."'
resp.write(buffer.getvalue())
```
```Python
def get_style(name, color=0, bold=False, italic=False):
style = xlwt.XFStyle()
font = xlwt.Font()
font.name = name
font.colour_index = color
font.bold = bold
font.italic = italic
style.font = font
return style
def export_emp_excel(request):
# 创建Excel工作簿(使用三方库xlwt)
workbook = xlwt.Workbook()
# 向工作簿中添加工作表
sheet = workbook.add_sheet('员工详细信息')
# 设置表头
titles = ['编号', '姓名', '主管', '职位', '工资', '部门名称']
for col, title in enumerate(titles):
sheet.write(0, col, title, get_style('HanziPenSC-W3', 2, True))
# 使用Django的ORM框架查询员工数据
emps = Emp.objects.all().select_related('dept').select_related('mgr')
cols = ['no', 'name', 'mgr', 'job', 'sal', 'dept']
# 通过嵌套的循环将员工表的数据写入Excel工作表的单元格
for row, emp in enumerate(emps):
for col, prop in enumerate(cols):
val = getattr(emp, prop, '')
if isinstance(val, (Dept, Emp)):
val = val.name
sheet.write(row + 1, col, val)
# 将Excel文件的二进制数据写入内存
buffer = BytesIO()
workbook.save(buffer)
# 通过HttpResponse对象向浏览器输出Excel文件
resp = HttpResponse(buffer.getvalue())
resp['content-type'] = 'application/msexcel'
# 如果文件名有中文需要处理成百分号编码
resp['content-disposition'] = 'attachment; filename="detail.xls"'
return resp
```
- 大文件的流式处理:`StreamingHttpResponse`。
```Python
def download_file(request):
file_stream = open('...', 'rb')
# 如果文件的二进制数据较大则最好用迭代器进行处理避免过多的占用服务器内存
file_iter = iter(lambda: file_stream.read(4096), b'')
resp = StreamingHttpResponse(file_iter)
# 中文文件名要处理成百分号编码
filename = quote('...', 'utf-8')
resp['Content-Type'] = '...'
resp['Content-Disposition'] = f'attachment; filename="{filename}"'
return resp
```
> 说明:如果需要生成PDF文件,可以需要安装`reportlab`。另外,使用StreamingHttpResponse只能减少内存的开销,但是如果下载一个大文件会导致一个请求长时间占用服务器资源,比较好的做法还是把报表提前生成好(可以考虑使用定时任务),放在静态资源服务器或者是云存储服务器上以访问静态资源的方式访问。
- [ECharts](http://echarts.baidu.com/)或[Chart.js](https://www.chartjs.org/)。
- 思路:后端只提供JSON格式的数据,前端JavaScript渲染生成图表。
```Python
def get_charts_data(request):
"""获取统计图表JSON数据"""
names = []
totals = []
# 通过connections获取指定数据库连接并创建游标对象
with connections['backend'].cursor() as cursor:
# 在使用ORM框架时可以使用对象管理器的aggregate()和annotate()方法实现分组和聚合函数查询
# 执行原生SQL查询(如果ORM框架不能满足业务或性能需求)
cursor.execute('select dname, total from vw_dept_emp')
for row in cursor.fetchall():
names.append(row[0])
totals.append(row[1])
return JsonResponse({'names': names, 'totals': totals})
```
```HTML
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>统计图表</title>
<style>
#main {
width: 600px;
height: 400px;
}
</style>
</head>
<body>
<div id="main"></div>
<script src="https://cdn.bootcss.com/echarts/4.2.0-rc.2/echarts.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script>
var myChart = echarts.init($('#main')[0]);
$.ajax({
'url': 'charts_data',
'type': 'get',
'data': {},
'dataType': 'json',
'success': function(json) {
var option = {
title: {
text: '员工分布统计图'
},
tooltip: {},
legend: {
data:['人数']
},
xAxis: {
data: json.names
},
yAxis: {},
series: [{
name: '人数',
type: 'bar',
data: json.totals
}]
};
myChart.setOption(option);
},
'error': function() {}
});
</script>
</body>
</html>
```
### 中间件
问题1:中间件背后的设计理念是什么?(分离横切关注功能/拦截过滤器模式)
问题2:中间件有哪些不同的实现方式?(参考下面的代码)
问题3:描述Django内置的中间件及其执行顺序。(推荐阅读:[Django官方文档 - 中间件 - 中间件顺序](https://docs.djangoproject.com/zh-hans/2.0/ref/middleware/#middleware-ordering))
#### 激活中间件
```Python
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'common.middlewares.block_sms_middleware',
]
```
#### 自定义中间件
```Python
def simple_middleware(get_response):
def middleware(request, *args, **kwargs):
response = get_response(request, *args, **kwargs)
return response
return middleware
```
```Python
class MyMiddleware(object):
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
return response
```
```Python
class MyMiddleware(object):
def __init__(self):
pass
def process_request(request):
pass
def process_view(request, view_func, view_args, view_kwargs):
pass
def process_template_response(request, response):
pass
def process_response(request, response):
pass
def process_exception(request, exception):
pass
```
#### 内置中间件
1. CommonMiddleware - 基础设置中间件
- DISALLOWED_USER_AGENTS - 不被允许的用户代理(浏览器)
- APPEND_SLASH - 是否追加`/`
- USE_ETAG - 浏览器缓存相关
2. SecurityMiddleware - 安全相关中间件
- SECURE_HSTS_SECONDS - 强制使用HTTPS的时间
- SECURE_HSTS_INCLUDE_SUBDOMAINS - HTTPS是否覆盖子域名
- SECURE_CONTENT_TYPE_NOSNIFF - 是否允许浏览器推断内容类型
- SECURE_BROWSER_XSS_FILTER - 是否启用跨站脚本攻击过滤器
- SECURE_SSL_REDIRECT - 是否重定向到HTTPS连接
- SECURE_REDIRECT_EXEMPT - 免除重定向到HTTPS
3. SessionMiddleware - 会话中间件
4. CsrfViewMiddleware - 防范跨站身份伪造中间件
5. XFrameOptionsMiddleware - 防范点击劫持攻击中间件
![](./res/click-jacking.png)
![](./res/builtin-middlewares.png)
### 表单
1. 用法:通常不要用来生成页面上的表单控件(耦合度太高不容易定制),主要用来验证数据。
2. Form的属性和方法:
- `is_valid()` / `is_multipart()`
- `errors` / `fields` / `is_bound` / `changed_data` / `cleaned_data`
- `add_error()` / `has_errors()` / `non_field_errors()`
- `clean()`
3. Form.errors的方法:
- `as_data()` / `as_json()` / `get_json_data()`
问题1:Django中的`Form`和`ModelForm`有什么作用?(通常不用来生成表单主要用来验证数据)
问题2:表单上传文件时应该注意哪些问题?(表单的设置、多文件上传、图片预览(FileReader)、Ajax上传文件、上传后的文件如何存储、调用云存储(如[阿里云OSS](https://www.aliyun.com/product/oss)、[七牛云](https://www.qiniu.com/)、[LeanCloud](https://leancloud.cn/storage/)等))
```HTML
<form action="" method="post" enctype="multipart/form-data">
<input type="file" name="..." multiple>
<input type="file" name="foo">
<input type="file" name="foo">
<input type="file" name="foo">
</form>
```
> 说明:上传图片文件的预览效果可以通过HTML5的FileReader来实现。
> 说明:使用云存储通常是比自己配置分布式文件系统这种方式更靠谱的做法,而且云存储通常成本并不太高,不仅如此大多数云存储还提供了如图片剪裁、生成水印、视频转码、CDN等服务。如果要自己做上传的视频文件转码,需要安装三方库ffmpeg,在程序中调用该三方库可以实现转码。
### Cookie和Session
问题1:使用Cookie能解决什么问题?(用户跟踪,解决HTTP协议无状态问题)
1. URL重写
```
http://www.abc.com/path/resource?foo=bar
```
2. 隐藏域(隐式表单域)- 埋点
```HTML
<form action="" method="post">
<input type="hidden" name="foo" value="bar">
</form>
```
3. Cookie - 浏览器中的临时文件(文本文件)- BASE64
问题2:Cookie和Session之间关系是什么?(Session的标识通过Cookie保存和传输)
#### Session的配置
1. Session对应的中间件:`django.contrib.sessions.middleware.SessionMiddleware`。
2. Session引擎。
- 基于数据库(默认方式)
```Python
INSTALLED_APPS = [
'django.contrib.sessions',
]
```
- 基于缓存(推荐使用)
```Python
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'session'
```
- 基于文件(基本不考虑)
- 基于Cookie(不靠谱)
```Python
SESSION_ENGINE = 'django.contrib.sessions.backends.signed_cookies'
```
3. Cookie相关的配置。
```Python
SESSION_COOKIE_NAME = 'djang_session_id'
SESSION_COOKIE_AGE = 1209600
# 如果设置为True,Cookie就是基于浏览器窗口的Cookie,不会持久化
SESSION_EXPIRE_AT_BROWSER_CLOSE = False
SESSION_SAVE_EVERY_REQUEST = False
SESSION_COOKIE_HTTPONLY = True
```
4. session的属性和方法。
- `session_key` / `session_data` / `expire_date`
- `__getitem__` / `__setitem__` / `__delitem__` / `__contains__`
- `set_expiry()` / `get_expiry_age()` / `get_expiry_date()` - 设置/获取会话超期时间
- `flush()` - 销毁会话
- `set_test_cookie()` / `test_cookie_worked()` / `delete_test_cookie()` - 测试浏览器是否支持Cookie(提示用户如果浏览器禁用Cookie可能会影响网站的使用)
5. session的序列化。
```Python
SESSION_SERIALIZER = 'django.contrib.sessions.serializers.JSONSerializer'
```
- JSONSerializer(1.6及以后默认)- 如果想将自定义的对象放到session中,会遇到“Object of type 'XXX' is not JSON serializable”的问题(如果配置使用Redis保存Session,django-redis使用了Pickle序列化,这个问题就不存在了)。
- PickleSerializer(1.6以前的默认)- 因为安全问题不推荐使用,但是只要不去反序列化用户构造的恶意的Payload其实也没有什么风险。关于这种方式的安全漏洞,可以参考《[Python Pickle的任意代码执行漏洞实践和Payload构造》](http://www.polaris-lab.com/index.php/archives/178/)一文或《软件架构-Python语言实现》上关于这个问题的讲解。
> 说明:如果使用了django_redis整合Redis作为session的存储引擎,那么由于django_redis又封装了一个PickleSerializer来提供序列化,所以不会存在上述的问题,且Redis中保存的value是pickle序列化之后的结果。
### 缓存
#### 配置缓存
```Python
CACHES = {
# 默认缓存
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
'redis://1.2.3.4:6379/0',
],
'KEY_PREFIX': 'fangtx',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 1000,
},
'PASSWORD': '1qaz2wsx',
}
},
# 页面缓存
'page': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
'redis://1.2.3.4:6379/1',
],
'KEY_PREFIX': 'fangtx:page',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 500,
},
'PASSWORD': '1qaz2wsx',
}
},
# 会话缓存
'session': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
'redis://1.2.3.4:6379/2',
],
'KEY_PREFIX': 'fangtx:session',
'TIMEOUT': 1209600,
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 2000,
},
'PASSWORD': '1qaz2wsx',
}
},
# 接口数据缓存
'api': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
'redis://1.2.3.4:6379/3',
],
'KEY_PREFIX': 'fangtx:api',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 500,
},
'PASSWORD': '1qaz2wsx',
}
},
}
```
> 说明:通过Redis底层提供的多个数据库来隔离缓存数据有助于缓存数据的管理。如果配置了Redis的主从复制(读写分离),LOCATION列表中可以配置多个Redis连接,第一个被视为master用来进行写操作,后面的被视为slave用来进行读操作。
#### 全站缓存
```Python
MIDDLEWARE_CLASSES = [
'django.middleware.cache.UpdateCacheMiddleware',
...
'django.middleware.common.CommonMiddleware',
...
'django.middleware.cache.FetchFromCacheMiddleware',
]
CACHE_MIDDLEWARE_ALIAS = 'default'
CACHE_MIDDLEWARE_SECONDS = 300
CACHE_MIDDLEWARE_KEY_PREFIX = 'djang:cache'
```
#### 视图层缓存
```Python
from django.views.decorators.cache import cache_page
from django.views.decorators.vary import vary_on_cookie
@cache_page(timeout=60 * 15, cache='page')
@vary_on_cookie
def my_view(request):
pass
```
```Python
from django.views.decorators.cache import cache_page
urlpatterns = [
url(r'^foo/([0-9]{1,2})/$', cache_page(60 * 15)(my_view)),
]
```
#### 其他内容
1. 模板片段缓存。
- `{% load cache %}`
- `{% cache %}` / `{% endcache %}`
2. 使用底层API访问缓存。
```Python
>>> from django.core.cache import cache
>>>
>>> cache.set('my_key', 'hello, world!', 30)
>>> cache.get('my_key')
>>> cache.clear()
```
```Python
>>> from django.core.cache import caches
>>> cache1 = caches['page']
>>> cache2 = caches['page']
>>> cache1 is cache2
True
>>> cache3 = caches['session']
>>> cache2 is cache3
False
```
```Python
>>> from django_redis import get_redis_connection
>>>
>>> redis_client = get_redis_connection()
>>> redis_client.hgetall()
```
### 日志
#### 日志级别
NOTSET < DEBUG < INFO < WARNING < ERROR < FATAL
#### 日志配置
```Python
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
# 配置日志格式化器
'formatters': {
'simple': {
'format': '%(asctime)s %(module)s.%(funcName)s: %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
'verbose': {
'format': '%(asctime)s %(levelname)s [%(process)d-%(threadName)s] '
'%(module)s.%(funcName)s line %(lineno)d: %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S',
}
},
# 配置日志过滤器
'filters': {
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
# 配置日志处理器
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'filters': ['require_debug_true'],
'formatter': 'simple',
},
'file1': {
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': 'access.log',
'when': 'W0',
'backupCount': 12,
'formatter': 'simple',
'level': 'INFO',
},
'file2': {
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': 'error.log',
'when': 'D',
'backupCount': 31,
'formatter': 'verbose',
'level': 'WARNING',
},
},
# 配置日志器
'loggers': {
'django': {
'handlers': ['console', 'file1', 'file2'],
'propagate': True,
'level': 'DEBUG',
},
}
}
```
[日志配置官方示例](https://docs.djangoproject.com/zh-hans/2.0/topics/logging/#s-examples)。
#### 日志分析
1. Linux相关命令:head、tail、grep、awk、uniq、sort
```Shell
tail -10000 access.log | awk '{print $1}' | uniq -c | sort -r
```
2. 实时日志文件分析:Python + 正则表达式 + Crontab
3. [《Python日志分析工具》](https://github.com/jkklee/web_log_analyse)。
4. [《集中式日志系统ELK》](https://www.ibm.com/developerworks/cn/opensource/os-cn-elk/index.html)。
- ElasticSearch:搜索引擎,实现全文检索。
- Logstash:负责从指定节点收集日志。
- Kibana:日志可视化工具。
5. 大数据日志处理:Flume+Kafka日志采集、Storm / Spark实时数据处理、Impala实时查询。
### RESTful
问题1:RESTful架构到底解决了什么问题?(URL具有自描述性、资源表述与视图的解耦和、互操作性利用构建微服务以及集成第三方系统、无状态性提高水平扩展能力)
问题2:项目在使用RESTful架构时有没有遇到一些问题或隐患?(对资源访问的限制、资源从属关系检查、避免泄露业务信息、防范可能的攻击)
> 补充:下面的几个和安全性相关的响应头在前面讲中间件的时候提到过的。
>
> - X-Frame-Options: DENY
> - X-Content-Type-Options: nosniff
> - X-XSS-Protection: 1; mode=block;
> - Strict­-Transport-­Security: max-age=31536000;
问题3:如何保护API中的敏感信息以及防范重放攻击?(摘要和令牌)
推荐阅读:[《如何有效防止API的重放攻击》](https://help.aliyun.com/knowledge_detail/50041.html)。
#### 使用djangorestframework
安装djangorestfrmework(为了描述方便,以下统一简称为drf)。
```Shell
pip install djangorestframework
```
配置drf。
```Python
INSTALLED_APPS = [
'rest_framework',
]
REST_FRAMEWORK = {
# 配置默认页面大小
'PAGE_SIZE': 10,
# 配置默认的分页类
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
# 配置异常处理器
# 'EXCEPTION_HANDLER': 'api.exceptions.exception_handler',
# 配置默认解析器
# 'DEFAULT_PARSER_CLASSES': (
# 'rest_framework.parsers.JSONParser',
# 'rest_framework.parsers.FormParser',
# 'rest_framework.parsers.MultiPartParser',
# ),
# 配置默认限流类
# 'DEFAULT_THROTTLE_CLASSES': (),
# 配置默认授权类
# 'DEFAULT_PERMISSION_CLASSES': (
# 'rest_framework.permissions.IsAuthenticated',
# ),
# 配置默认认证类
# 'DEFAULT_AUTHENTICATION_CLASSES': (
# 'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
# ),
}
```
#### 编写序列化器
```Python
from rest_framework import serializers
from rest_framework.serializers import ModelSerializer
from common.models import District, HouseType, Estate, Agent
class DistrictSerializer(ModelSerializer):
class Meta:
model = District
fields = ('distid', 'name')
class HouseTypeSerializer(ModelSerializer):
class Meta:
model = HouseType
fields = '__all__'
class AgentSerializer(ModelSerializer):
class Meta:
model = Agent
fields = ('agentid', 'name', 'tel', 'servstar', 'certificated')
class EstateSerializer(ModelSerializer):
district = serializers.SerializerMethodField()
agents = serializers.SerializerMethodField()
@staticmethod
def get_agents(estate):
return AgentSerializer(estate.agents, many=True).data
@staticmethod
def get_district(estate):
return DistrictSerializer(estate.district).data
class Meta:
model = Estate
fields = '__all__'
```
#### 方法1:使用装饰器
```Python
@api_view(['GET'])
@cache_page(timeout=None, cache='api')
def provinces(request):
queryset = District.objects.filter(parent__isnull=True)
serializer = DistrictSerializer(queryset, many=True)
return Response(serializer.data)
@api_view(['GET'])
@cache_page(timeout=300, cache='api')
def cities(request, provid):
queryset = District.objects.filter(parent__distid=provid)
serializer = DistrictSerializer(queryset, many=True)
return Response(serializer.data)
```
```Python
urlpatterns = [
path('districts/', views.provinces, name='districts'),
path('districts/<int:provid>/', views.cities, name='cities'),
]
```
> 说明:上面使用了Django自带的视图装饰器(@cache_page)来实现对API接口返回数据的缓存。
#### 方法2:使用APIView及其子类
更好的复用代码,不要重“复发明轮子”。
```Python
class HouseTypeApiView(CacheResponseMixin, ListAPIView):
queryset = HouseType.objects.all()
serializer_class = HouseTypeSerializer
```
```Python
urlpatterns = [
path('housetypes/', views.HouseTypeApiView.as_view(), name='housetypes'),
]
```
> 说明:上面使用了drf_extensions提供的CacheResponseMixin混入类实现了对接口数据的缓存。如果重写了获取数据的方法,可以使用drf_extensions提供的@cache_response来实现对接口数据的缓存,也可以用自定义的函数来生成缓存中的key。当然还有一个选择就是通过Django提供的@method_decorator装饰器,将@cache_page装饰器处理为装饰方法的装饰器,这样也能提供使用缓存服务。
`drf-extensions`配置如下所示。
```Python
# 配置DRF扩展来支持缓存API接口调用结果
REST_FRAMEWORK_EXTENSIONS = {
'DEFAULT_CACHE_RESPONSE_TIMEOUT': 300,
'DEFAULT_USE_CACHE': 'default',
# 配置默认缓存单个对象的key函数
'DEFAULT_OBJECT_CACHE_KEY_FUNC': 'rest_framework_extensions.utils.default_object_cache_key_func',
# 配置默认缓存对象列表的key函数
'DEFAULT_LIST_CACHE_KEY_FUNC': 'rest_framework_extensions.utils.default_list_cache_key_func',
}
```
#### 方法3:使用ViewSet及其子类
```Python
class HouseTypeViewSet(CacheResponseMixin, viewsets.ModelViewSet):
queryset = HouseType.objects.all()
serializer_class = HouseTypeSerializer
pagination_class = None
```
```Python
router = DefaultRouter()
router.register('housetypes', views.HouseTypeViewSet)
urlpatterns += router.urls
```
djangorestframework提供了基于Bootstrap定制的页面来显示接口返回的JSON数据,当然也可以使用[POSTMAN](https://www.getpostman.com/)这样的工具对API接口进行测试。
#### 补充说明
在这里顺便提一下跟前端相关的几个问题。
问题1:如何让浏览器能够发起DELETE/PUT/PATCH?
```HTML
<form method="post">
<input type="hidden" name="_method" value="delete">
</form>
```
```Python
if request.method == 'POST' and '_method' in request.POST:
request.method = request.POST['_method'].upper()
```
```HTML
<script>
$.ajax({
'url': '/api/provinces',
'type': 'put',
'data': {},
'dataType': 'json',
'success': function(json) {
// Web = 标签(内容) + CSS(显示) + JS(行为)
// JavaScript = ES + BOM + DOM
// DOM操作实现页面的局部刷新
},
'error': function() {}
});
$.getJSON('/api/provinces', function(json) {
// DOM操作实现页面的局部刷新
});
</script>
```
问题2:如何解决多个JavaScript库之间某个定义(如$函数)冲突的问题?
```HTML
<script src="js/jquery.min.js"></script>
<script src="js/abc.min.js"></script>
<script>
// $已经被后加载的JavaScript库占用了
// 但是可以直接用绑定在window对象上的jQuery去代替$
jQuery(function() {
jQuery('#okBtn').on('click', function() {});
});
</script>
```
```HTML
<script src="js/abc.min.js"></script>
<script src="js/jquery.min.js"></script>
<script>
// 将$让出给其他的JavaScript库使用
jQuery.noConflict();
jQuery(function() {
jQuery('#okBtn').on('click', function() {});
});
</script>
```
问题3:jQuery对象与原生DOM对象之间如何转换?
```HTML
<button id="okBtn">点我</button>
<script src="js/jquery.min.js"></script>
<script>
var btn = document.getElementById('okBtn'); // 原生JavaScript对象(使用相对麻烦)
var $btn = $('#okBtn'); // jQuery对象(拥有更多的属性和方法而且没有浏览器兼容性问题)
$btn.on('click', function() {});
// $(btn)可以将原生JavaScript对象转成jQuery对象
// $btn.get(0)或$btn[0]可以获得原生的JavaScript对象
</script>
```
#### 过滤数据
如果需要过滤数据(对数据接口设置筛选条件、排序条件等),可以使用`django-filter`三方库来实现。
```Shell
pip install django-filter
```
```Python
INSTALLED_APPS = [
'django_filters',
]
REST_FRAMEWORK = {
'DEFAULT_FILTER_BACKENDS': (
'django_filters.rest_framework.DjangoFilterBackend',
'rest_framework.filters.OrderingFilter',
),
}
```
```Python
class EstateViewSet(CacheResponseMixin, ModelViewSet):
# 通过queryset指定如何获取数据(资源)
queryset = Estate.objects.all().select_related('district').prefetch_related('agents')
# 通过serializer_class指定如何序列化数据
serializer_class = EstateSerializer
# 通过filter_backends指定如何提供筛选(覆盖默认的设置)
# filter_backends = (DjangoFilterBackend, OrderingFilter)
# 指定根据哪些字段进行数据筛选
filter_fields = ('district', )
# 指定根据哪些字段对数据进行排序
ordering_fields = ('hot', )
```
#### 身份认证
查看drf中APIView类的代码可以看出,drf默认的认证方案是 `DEFAULT_AUTHENTICATION_CLASSES`,如果修改authentication_classes就可以自行定制身份认证的方案。
```Python
class APIView(View):
# The following policies may be set at either globally, or per-view.
renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
parser_classes = api_settings.DEFAULT_PARSER_CLASSES
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
content_negotiation_class = api_settings.DEFAULT_CONTENT_NEGOTIATION_CLASS
metadata_class = api_settings.DEFAULT_METADATA_CLASS
versioning_class = api_settings.DEFAULT_VERSIONING_CLASS
# 此处省略下面的代码
```
```Python
DEFAULTS = {
# Base API policies
'DEFAULT_RENDERER_CLASSES': (
'rest_framework.renderers.JSONRenderer',
'rest_framework.renderers.BrowsableAPIRenderer',
),
'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser',
'rest_framework.parsers.FormParser',
'rest_framework.parsers.MultiPartParser'
),
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
),
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.AllowAny',
),
'DEFAULT_THROTTLE_CLASSES': (),
'DEFAULT_CONTENT_NEGOTIATION_CLASS': 'rest_framework.negotiation.DefaultContentNegotiation',
'DEFAULT_METADATA_CLASS': 'rest_framework.metadata.SimpleMetadata',
'DEFAULT_VERSIONING_CLASS': None,
# 此处省略下面的代码
}
```
自定义认证类,继承`BaseAuthentication`并重写`authenticate(self, request)`方法,通过请求中的userid和token来确定用户身份。如果认证成功,该方法应返回一个二元组(用户和令牌的信息),否则产生异常。也可以重写 `authenticate_header(self, request)`方法来返回一个字符串,该字符串将用于`HTTP 401 Unauthorized`响应中的WWW-Authenticate响应头的值。如果未重写该方法,那么当未经身份验证的请求被拒绝访问时,身份验证方案将返回`HTTP 403 Forbidden`响应。
```Python
class MyAuthentication(BaseAuthentication):
"""自定义用户身份认证类"""
def authenticate(self, request):
try:
token = request.GET['token'] or request.POST['token']
user_token = UserToken.objects.filter(token=token).first()
if user_token:
return user_token.user, user_token
else:
raise AuthenticationFailed('请提供有效的用户身份标识')
except KeyError:
raise AuthenticationFailed('请提供有效的用户身份标识')
def authenticate_header(self, request):
pass
```
使用自定义的认证类。
```Python
class EstateViewSet(CacheResponseMixin, ModelViewSet):
# 通过queryset指定如何获取数据(资源)
queryset = Estate.objects.all().select_related('district').prefetch_related('agents')
# 通过serializer_class指定如何序列化数据
serializer_class = EstateSerializer
# 指定根据哪些字段进行数据筛选
filter_fields = ('district', 'name')
# 指定根据哪些字段对数据进行排序
ordering_fields = ('hot', )
# 指定用于进行用户身份验证的类
authentication_classes = (MyAuthentication, )
```
> 说明:也可以在Django配置文件中将自定义的认证类设置为默认认证方式。
#### 授予权限
权限检查总是在视图的最开始处运行,在任何其他代码被允许进行之前。最简单的权限是允许通过身份验证的用户访问,并拒绝未经身份验证的用户访问,这对应于dfr中的`IsAuthenticated`类,可以用它来取代默认的`AllowAny`类。权限策略可以在Django的drf配置中用`DEFAULT_PERMISSION_CLASSES`全局设置。
```Python
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
)
}
```
也可以在基于`APIView`类的视图上设置身份验证策略。
```Python
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
class ExampleView(APIView):
permission_classes = (IsAuthenticated, )
# 此处省略其他代码
```
或者在基于`@api_view`装饰器的视图函数上设置。
```Python
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
@api_view(['GET'])
@permission_classes((IsAuthenticated, ))
def example_view(request, format=None):
# 此处省略其他代码
```
自定义权限需要继承`BasePermission`并实现以下方法中的一个或两个,下面是BasePermission的代码。
```Python
@six.add_metaclass(BasePermissionMetaclass)
class BasePermission(object):
"""
A base class from which all permission classes should inherit.
"""
def has_permission(self, request, view):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return True
def has_object_permission(self, request, view, obj):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return True
```
如果请求被授予访问权限,则方法应该返回True,否则返False。下面的例子演示了阻止黑名单中的IP地址访问接口数据(这个在反爬虫的时候很有用哟)。
```Python
from rest_framework import permissions
class BlacklistPermission(permissions.BasePermission):
"""
Global permission check for blacklisted IPs.
"""
def has_permission(self, request, view):
ip_addr = request.META['REMOTE_ADDR']
blacklisted = Blacklist.objects.filter(ip_addr=ip_addr).exists()
return not blacklisted
```
如果要实现更为完整的权限验证,可以考虑RBAC或ACL。
1. RBAC - 基于角色的访问控制,如下图所示。
![](./res/rbac-basic.png)
![](./res/rbac-full.png)
2. ACL - 访问控制列表(每个用户绑定自己的访问白名单)。
#### 访问限流
可以修改dfr配置的`DEFAULT_THROTTLE_CLASSES` 和 `DEFAULT_THROTTLE_RATES`两个值来设置全局默认限流策略。例如:
```Python
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': (
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle'
),
'DEFAULT_THROTTLE_RATES': {
'anon': '3/min',
'user': '10000/day'
}
}
```
`DEFAULT_THROTTLE_RATES`中使用的频率描述可能包括`second`、`minute`、`hour`或`day`。
如果要为接口单独设置限流,可以在每个视图或视图集上设置限流策略,如下所示:
```Python
from rest_framework.throttling import UserRateThrottle
from rest_framework.views import APIView
class ExampleView(APIView):
throttle_classes = (UserRateThrottle, )
# 此处省略下面的代码
```
```Python
@api_view(['GET'])
@throttle_classes([UserRateThrottle, ])
def example_view(request, format=None):
# 此处省略下面的代码
```
当然也可以通过继承`BaseThrottle`来自定义限流策略,通常需要重写`allow_request`和`wait`方法。
### 异步任务和计划任务
#### Celery的应用
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
推荐阅读:[《Celery官方文档中文版》](http://docs.jinkan.org/docs/celery/),上面有极为详细的配置和使用指南。
![](./res/celery_architecture.png)
Celery是一个本身不提供队列服务,官方推荐使用RabbitMQ或Redis来实现消息队列服务,前者是更好的选择,它对AMQP(高级消息队列协议)做出了非常好的实现。
1. 安装RabbitMQ。
```Shell
docker pull rabbitmq
docker run -d -p 5672:5672 --name myrabbit rabbitmq
docker container exec -it myrabbit /bin/bash
```
2. 创建用户、资源以及分配操作权限。
```Shell
rabbitmqctl add_user luohao 123456
rabbitmqctl set_user_tags luohao administrator
rabbitmqctl add_vhost vhost1
rabbitmqctl set_permissions -p vhost1 luohao ".*" ".*" ".*"
```
3. 创建Celery实例。
```Python
# 注册环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fangtx.settings')
# 创建Celery实例
app = celery.Celery(
'fangtx',
broker='amqp://luohao:123456@120.77.222.217:5672/vhost1'
)
# 从项目的配置文件读取Celery配置信息
app.config_from_object('django.conf:settings')
# 从指定的文件(例如celery_config.py)中读取Celery配置信息
# app.config_from_object('celery_config')
# 让Celery自动从参数指定的应用中发现异步任务/定时任务
app.autodiscover_tasks(['common', ])
# 让Celery自动从所有注册的应用中发现异步任务/定时任务
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
```
4. 启动Celery创建woker(消息的消费者)。
```Shell
celery -A <name> worker -l debug &
```
5. 执行异步任务。
```Python
@app.task
def send_email(from, to, cc, subject, content):
pass
send_email.delay('', [], [], '', '')
```
6. 创建定时任务。
```Python
# 配置定时任务(计划任务)
app.conf.update(
timezone=settings.TIME_ZONE,
enable_utc=True,
# 定时任务(计划任务)相当于是消息的生产者
# 如果只有生产者没有消费者那么消息就会在消息队列中积压
# 将来实际部署项目的时候生产者、消费者、消息队列可能都是不同节点
beat_schedule={
'task1': {
'task': 'common.tasks.show_msg',
'schedule': crontab(),
'args': ('刘强东,奶茶妹妹喊你回家喝奶啦', )
},
},
)
```
```Python
@app.task
def show_msg(content):
print(content)
```
7. 启动Celery创建执行定时任务的beat(消息的生产者)。
```Shell
celery -A <name> beat -l info
```
8. 检查消息队列状况。
```Shell
rabbitmqctl list_queues -p vhost1
```
9. 监控Celery - 可以通过flower来对Celery进行监控。
```Shell
pip install flower
celery flower --broker=amqp://luohao:123456@120.77.222.217:5672/vhost1
```
### 其他问题
问题1:如何解决JavaScript跨域获取数据的问题?(django-cors-headers)
```Python
INSTALLED_APPS = [
'corsheaders',
]
MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware',
]
# 配置跨域白名单
# CORS_ORIGIN_WHITELIST = ('www.abc.com', 'www.baidu.com')
# CORS_ORIGIN_REGEX_WHITELIST = ('...', )
# CORS_ALLOW_CREDENTIALS = True
# CORS_ALLOW_METHODS = ('GET', 'POST', 'PUT', 'DELETE')
```
问题2:网站图片(水印、剪裁)和视频(截图、水印、转码)是如何处理的?(云存储、FFmpeg)
问题3:网站如何架设(静态资源)文件系统?(FastDFS、云存储、CDN)
### 安全保护
问题1:什么是跨站脚本攻击,如何防范?(对提交的内容进行消毒)
问题2:什么是跨站身份伪造,如何防范?(使用随机令牌)
问题3:什么是SQL注射攻击,如何防范?(不拼接SQL语句,避免使用单引号)
问题4:什么是点击劫持攻击,如何防范?(不允许`<iframe>`加载非同源站点内容)
#### Django提供的安全措施
签名数据的API
```Python
>>> from django.core.signing import Signer
>>> signer = Signer()
>>> value = signer.sign('hello, world!')
>>> value
'hello, world!:BYMlgvWMTSPLxC-DqxByleiMVXU'
>>> signer.unsign(value)
'hello, world!'
>>>
>>> signer = Signer(salt='1qaz2wsx')
>>> signer.sign('hello, world!')
'hello, world!:9vEvG6EA05hjMDB5MtUr33nRA_M'
>>>
>>> from django.core.signing import TimestampSigner
>>> signer = TimestampSigner()
>>> value = signer.sign('hello, world!')
>>> value
'hello, world!:1fpmcQ:STwj464IFE6eUB-_-hyUVF3d2So'
>>> signer.unsign(value, max_age=5)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/Users/Hao/Desktop/fang.com/venv/lib/python3.6/site-packages/django/core/signing.py", line 198, in unsign
'Signature age %s > %s seconds' % (age, max_age))
django.core.signing.SignatureExpired: Signature age 21.020604848861694 > 5 seconds
>>> signer.unsign(value, max_age=120)
'hello, world!'
```
CSRF令牌和小工具
```HTML
{% csrf_token %}
```
- @csrf_exempt:免除令牌
- @csrf_protect:提供令牌保护
- @require_csrf_token:提供令牌保护
- @ensure_csrf_cookie:强制视图发送带令牌的cookie
> 说明:可以在Chrome浏览器中安装EditThisCookie插件来方便的查看Cookie。
#### 用户敏感信息的保护
1. 哈希摘要(签名)
```Python
>>> import hashlib
>>>
>>> md5_hasher = hashlib.md5()
>>> md5_hasher.update('hello, world!'.encode())
>>> md5_hasher.hexdigest()
'3adbbad1791fbae3ec908894c4963870'
>>>
>>> sha1_hasher = hashlib.sha1()
>>> sha1_hasher.update('hello, world!'.encode())
>>> sha1_hasher.update('goodbye, world!'.encode())
>>> sha1_hasher.hexdigest()
'1f09d30c707d53f3d16c530dd73d70a6ce7596a9'
```
2. 加密和解密(对称加密和非对称加密)
```Shell
pip install rsa
```
```Python
>>> pub_key, pri_key = rsa.newkeys(1024)
>>> message = 'hello, world!'
>>> crypto = rsa.encrypt(message.encode(), pub_key)
>>> crypto
b'Ou{gH\xa9\xa8}O\xe3\x1d\x052|M\x9d9?\xdc\xd8\xecF\xd3v\x9b\xde\x8e\x12\xe6M\xebvx\x08\x08\x8b\xe8\x86~\xe4^)w\xf2\xef\x9e\x9fOg\x15Q\xb7\x7f\x1d\xcfV\xf1\r\xbe^+\x8a\xbf }\x10\x01\xa4U9b\x97\xf5\xe0\x90T\'\xd4(\x9b\x00\xa5\x92\x17\xad4\xb0\xb0"\xd4\x16\x94*s\xe1r\xb7L\xe2\x98\xb7\x7f\x03\xd9\xf2\t\xee*\xe6\x93\xe6\xe1o\xfd\x18\x83L\x0cfL\xff\xe4\xdd%\xf2\xc0/\xfb'
>>> origin = rsa.decrypt(crypto, pri_key).decode()
>>> origin
'hello, world!'
```
```Shell
pip install pycrypto
```
AES对称加密:
```Python
>>> from hashlib import md5
>>>
>>> from Crypto.Cipher import AES
>>> from Crypto import Random
>>>
>>> key = md5(b'mysecret').hexdigest()
>>> iv = Random.new().read(AES.block_size)
>>> str1 = '我爱你们!'
>>> str2 = AES.new(key, AES.MODE_CFB, iv).encrypt(str1)
b'p\x96o\x85\x0bq\xc4-Y\xc4\xbcp\n)&'
>>> str3 = AES.new(key, AES.MODE_CFB, iv).decrypt(str2).decode
'我爱你们!'
```
RSA非对称加密:
```Python
>>> from Crypto.PublicKey import RSA
>>> # 生成密钥对
>>> key_pair = RSA.generate(2048)
>>> # 导入公钥
>>> pub_key = RSA.importKey(key_pair.publickey().exportKey())
>>> # 导入私钥
>>> pri_key = RSA.importKey(key_pair.exportKey())
>>> # 明文
>>> message1 = 'hello, world!'.encode()
>>> # 加密数据
>>> message2 = pub_key.encrypt(message1, None)
(b'\x03\x86t\xa0\x00\xc4\xea\xd2\x80\xed\xa7YN7\x07\xff\x88\xaa\x1eW\x0cmH0\x06\xa7\'\xbc<w@q\x8b\xaf\xf7:g\x92{=\xe2E\xa5@\x1as2\xdd\xcb\x8e[\x98\x85\xdf,X\xecj.U\xd6\xa7W&u\'Uz"\x0f\x0e\\<\xa4\xfavC\x93\xa7\xbcO"\xb9a\x06]<.\xc1\r1}*\xdf\xccdqXML\x93\x1b\xe9\xda\xdf\xab|\xf8\x18\xe4\x99\xbb\x7f\x18}\xd9\x9a\x1e*J\\\xca\x1a\xd1\x85\xf7t\x81\xd95{\x19\xc9\x81\xb6^}\x9c5\xca\xfe\xcf\xc8\xd8M\x9a\x8c-\xf1t\xee\xf9\x12\x90\x01\xca\x92~\x00c5qg5g\x95&\x10\xb1\x0b\x1fo\x95\xf2\xbc\x8d\xf3f"@\xc5\x188\x0bX\x9cfo\xea\x97\x05@\xe5\xb2\xda\xb8\x97a\xa5w\xa8\x01\x9a\xa5N\xc4\x81\x8d\x0f<\x96iU\xd3\x95\xacJZs\xab_ #\xee\xf9\x0f\xf2\x12\xdb\xfc\xf8g\x18v\x02k+\xda\x16Si\xbf\xbb\xec\xf7w\x90\xde\xae\x97\t\xed{}5\xd0',)
>>> # 解密数据
>>> message3 = pri_key.decrypt(message2)
'hello, world!'
```
#### 安全相关建议
1. 虽然 Django 自带了稳固的安全保护措施,但是依然要采用正确的方式部署应用程序,利用 Web 服务器、操作系统和其他组件提供的安全保护措施。
2. 记得把 Python 代码放在 Web 服务器的文档根目录之外,避免代码意外泄露。
3. 谨慎处理用户上传的文件。
4. Django本身没有对请求次数加以限制(包括验证用户身份的请求),为了防止暴力攻击和破解,可以考虑使用具有一次消费性的验证码或对这类请求的次数进行限制。
5. 将缓存系统、数据库服务器以及重要的资源服务器都放在第二级防火墙之后(不要放在DMZ)。
### 测试相关
测试是发现和标记缺陷的过程。所谓的缺陷是指实际结果和期望结果之间的任何差别。有的地方,测试也被认为是执行以找出错误为目的的程序的过程。 测试是为了让产品达到以下目标:
1. 满足需求用户满意
2. 改善产品的市场占有率
3. 树立对产品的信任
4. 减少开发和维护的成本
#### 功能测试
如果一个软件单元的行为方式与它的开发规范完全一样,那么该软件单元就通过了它的功能测试。
- 白盒测试:开发人员自己实现,最基本的形式是单元测试,还有集成测试和系统测试。
- 黑盒测试:由开发团队之外的人执行,对测试代码没有可见性,将被测系统视为黑盒子。通常由测试人员或QA工程师来执行,Web应用可以通过Selenium这样的测试框架自动化实施。
#### 性能测试
软件在高工作负载下对其响应性和健壮性展开的测试。
- 负载测试:在特定负载下执行的测试。
- 压力测试:突发条件或极限条件下的性能测试。
#### 安全性测试
系统的敏感数据都是经过认证和授权之后才能访问。
#### 其他测试
易用性测试 / 安装测试 / 可访问性测试
#### 单元测试
测试函数和对象的方法(程序中最小最基本的单元)。通过对实际输出和预期输出的比对以及各种的断言条件来判定被测单元是否满足设计需求。
- 测试用例
- 测试固件 - 每次测试时都要使用的东西。
- 测试套件(测试集)- 组合了多个测试用例而构成的集合。
```Python
class UtilTest(TestCase):
def setUp(self):
self.pattern = re.compile(r'\d{6}')
def test_gen_mobile_code(self):
for _ in range(100):
self.assertIsNotNone(self.pattern.match(gen_mobile_code()))
def test_to_md5_hex(self):
md5_dict = {
'123456': 'e10adc3949ba59abbe56e057f20f883e',
'123123123': 'f5bb0c8de146c67b44babbf4e6584cc0',
'1qaz2wsx': '1c63129ae9db9c60c3e8aa94d3e00495',
}
for key, value in md5_dict.items():
self.assertEqual(value, to_md5_hex(key))
```
`TestCase`的断言方法:
- assertEqual / assertNotEqual
- assertTrue / assertFalse / assertIsNot
- assertRaise / assertRaiseRegexp
- assertAlmostEqual / assertNotAlmostEqual
- assertGreater / assertGreaterEqual / assertLess / assertLessEqual
- assertRegexpMatches / assertNotRegexpMatches
- assertListEqual / assertSetEqual / assertTupleEqual / assertDictEqual
可以使用nose2或pytest来辅助执行单元测试,同时通过cov-core或pytest-cov可以对测试覆度进行评估。覆盖率由百分比表示。比如测试代码执行过了程序的每一行,那么覆盖率就是100%。这种时候,几乎不会出现新程序上线后突然无法运行的尴尬情况。覆盖率不关心代码内容究竟是什么,覆盖率是用来检查“测试代码不足、测试存在疏漏”的一个指标,“测试内容是否妥当”并不归它管。
```Shell
pip install nose2 pytest cov-core pytest-cov
```
可以使用Selenium来实现Web应用的自动化测试,它还可以用于屏幕抓取与浏览器行为模拟,通过爬虫抓取页面上的动态数据也可以使用它。Selenium其实包括三个部分:
- Selenium IDE:嵌入到浏览器的插件,可以录制和回放脚本。
![](./res/selenium_ide.png)
- Selenium WebDriver:支持多种语言可以操控浏览器的API。
- Selenium Standalone Server:Selenium Grid、远程控制、分布式部署。
```Shell
pip install selenium
```
```Python
from selenium import webdriver
import pytest
import contextlib
@pytest.fixture(scope='session')
def chrome():
# 设置使用无头浏览器(不会打开浏览器窗口)
options = webdriver.ChromeOptions()
options.add_argument('--headless')
driver = webdriver.Chrome(options=options)
yield driver
driver.quit()
def test_baidu_index(chrome):
chrome.get('https://www.baidu.com')
assert chrome.title == '百度一下,你就知道'
```
除了Selenium之外,还有一个Web自动化测试工具名叫Robot Framework。
```Shell
nose2 -v -C
pytest --cov
```
```Shell
Ran 7 tests in 0.002s
OK
Name Stmts Miss Cover
----------------------------------------------
example01.py 15 0 100%
example02.py 49 49 0%
example03.py 22 22 0%
example04.py 61 61 0%
example05.py 29 29 0%
example06.py 39 39 0%
example07.py 19 19 0%
example08.py 27 27 0%
example09.py 18 18 0%
example10.py 19 19 0%
example11.py 22 22 0%
example12.py 28 28 0%
example13.py 28 28 0%
test_ddt_example.py 18 0 100%
test_pytest_example.py 11 6 45%
test_unittest_example.py 22 0 100%
----------------------------------------------
TOTAL 427 367 14%
```
在测试过程中需要孤立各种外部依赖(数据库、外部接口调用、时间依赖),具体又包括两个方面:
1. 数据源:数据本地化 / 置于内存中 / 测试之后回滚
2. 资源虚拟化:存根/桩(stub)、仿制/模拟(mock)、伪造(fake)
- stub:测试期间为提供响应的函数生成的替代品
- mock:代替实际对象(以及该对象的API)的对象
- fake:没有达到生产级别的轻量级对象
#### 集成测试
集成多个函数或方法的输入输出的测试,测试时需要将多个测试对象组合在一起。
- 测试组件互操作性 / 需求变更测试 / 外部依赖和API / 调试硬件问题 / 在代码路径中发现异常
#### 系统测试
对需求的测试,测试成品是否最终满足了所有需求,在客户验收项目时进行。
#### 数据驱动测试
使用外部数据源实现对输入值与期望值的参数化,避免在测试中使用硬编码的数据。
被测函数:
```Python
def add(x, y):
return x + y
```
data.csv文件:
```
3,1,2
0,1,-1
100,50,50
100,1,99
15,7,8
```
测试代码:
```Python
import csv
from unittest import TestCase
from ddt import ddt, data, unpack
@ddt
class TestAdd(TestCase):
def load_data_from_csv(filename):
data_items = []
with open(filename, 'r', newline='') as fs:
reader = csv.reader(fs)
for row in reader:
data_items.append(list(map(int, row)))
return data_items
@data(*load_data_from_csv('data.csv'))
@unpack
def test_add(self, result, param1, param2):
self.assertEqual(result, add(param1, param2))
```
#### Django中的测试
1. 测试Django视图 - Django中提供的`TestCase`扩展了`unittest`中的`TestCase`,绑定了一个名为`client`的属性,可以用来模拟浏览器发出的GET、POST、DELETE、PUT等请求。
```Python
class SomeViewTest(TestCase):
def test_example_view(self):
resp = self.client.get(reverse('index'))
self.assertEqual(200, resp.status_code)
self.assertEqual(5, resp.context['num'])
```
2. 运行测试 - 配置测试数据库。
```Python
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'HOST': 'localhost',
'PORT': 3306,
'NAME': 'DbName',
'USER': os.environ['DB_USER'],
'PASSWORD': os.environ['DB_PASS'],
'TEST': {
'NAME': 'DbName_for_testing',
'CHARSET': 'utf8',
},
}
}
```
```Shell
python manage.py test
python manage.py test common
python manage.py test common.tests.UtilsTest
python manage.py test common.tests.UtilsTest.test_to_md5_hex
```
3. 评估测试覆盖度
```Shell
pip install coverage
coverage run --source=<path1> --omit=<path2> manage.py test common
coverage report
Name Stmts Miss Cover
---------------------------------------------------
common/__init__.py 0 0 100%
common/admin.py 1 0 100%
common/apps.py 3 3 0%
common/forms.py 16 16 0%
common/helper.py 32 32 0%
common/middlewares.py 19 19 0%
common/migrations/__init__.py 0 0 100%
common/models.py 71 2 97%
common/serializers.py 14 14 0%
common/tests.py 14 8 43%
common/urls_api.py 3 3 0%
common/urls_user.py 3 3 0%
common/utils.py 22 7 68%
common/views.py 69 69 0%
---------------------------------------------------
TOTAL 267 176 34%
```
#### 性能测试
问题1:性能测试的指标有哪些?
1. ab( Apache Benchmark) / webbench / httpperf
```Shell
yum -y install httpd
ab -c 10 -n 1000 http://www.baidu.com/
...
Benchmarking www.baidu.com (be patient).....done
Server Software: BWS/1.1
Server Hostname: www.baidu.com
Server Port: 80
Document Path: /
Document Length: 118005 bytes
Concurrency Level: 10
Time taken for tests: 0.397 seconds
Complete requests: 100
Failed requests: 98
(Connect: 0, Receive: 0, Length: 98, Exceptions: 0)
Write errors: 0
Total transferred: 11918306 bytes
HTML transferred: 11823480 bytes
Requests per second: 252.05 [#/sec] (mean)
Time per request: 39.675 [ms] (mean)
Time per request: 3.967 [ms] (mean, across all concurrent requests)
Transfer rate: 29335.93 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 6 7 0.6 7 9
Processing: 20 27 22.7 24 250
Waiting: 8 11 21.7 9 226
Total: 26 34 22.8 32 258
Percentage of the requests served within a certain time (ms)
50% 32
66% 34
75% 34
80% 34
90% 36
95% 39
98% 51
99% 258
100% 258 (longest request)
```
2. mysqlslap
```Shell
mysqlslap -a -c 100 -h 1.2.3.4 -u root -p
mysqlslap -a -c 100 --number-of-queries=1000 --auto-generate-sql-load-type=read -h <负载均衡服务器IP地址> -u root -p
mysqlslap -a --concurrency=50,100 --number-of-queries=1000 --debug-info --auto-generate-sql-load-type=mixed -h 1.2.3.4 -u root -p
```
3. sysbench
```Shell
sysbench --test=threads --num-threads=64 --thread-yields=100 --thread-locks=2 run
sysbench --test=memory --num-threads=512 --memory-block-size=256M --memory-total-size=32G run
```
4. jmeter
请查看[《使用JMeter进行性能测试》](https://www.ibm.com/developerworks/cn/java/l-jmeter/index.html)。
5. LoadRunner / QTP
### 项目调试
可以使用django-debug-toolbar来辅助项目调试。
1. 安装
```Shell
pip install django-debug-toolbar
```
2. 配置 - 修改settings.py。
```Python
INSTALLED_APPS = [
'debug_toolbar',
]
MIDDLEWARE = [
'debug_toolbar.middleware.DebugToolbarMiddleware',
]
DEBUG_TOOLBAR_CONFIG = {
# 引入jQuery库
'JQUERY_URL': 'https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js',
# 工具栏是否折叠
'SHOW_COLLAPSED': True,
# 是否显示工具栏
'SHOW_TOOLBAR_CALLBACK': lambda x: True,
}
```
3. 配置 - 修改urls.py。
```Python
if settings.DEBUG:
import debug_toolbar
urlpatterns.insert(0, path('__debug__/', include(debug_toolbar.urls)))
```
4. 使用 - 在页面右侧可以看到一个调试工具栏,上面包括了执行时间、项目设置、请求头、SQL、静态资源、模板、缓存、信号等调试信息,查看起来非常的方便。
### 部署相关
请参考《Django项目上线指南》。
### 性能相关
#### 网站优化两大定律:
1. 尽可能的使用缓存 - 牺牲空间换取时间(普适策略)。
2. 能推迟的都推迟 - 使用消息队列将并行任务串行来缓解服务器压力。
- 服务器CPU利用率出现瞬时峰值 - 削峰(CPU利用率平缓的增长)
- 上下游节点解耦合(下订单和受理订单的系统通常是分离的)
#### Django框架
1. 配置缓存来缓解数据库的压力,并有合理的机制应对[缓存穿透和缓存雪崩](https://www.cnblogs.com/zhangweizhong/p/6258797.html)。
2. 开启[模板缓存](https://docs.djangoproject.com/en/2.0/ref/templates/api/#django.template.loaders.cached.Loader)来加速模板的渲染。
```Python
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates'), ],
# 'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
'loaders': [(
'django.template.loaders.cached.Loader', [
'django.template.loaders.filesystem.Loader',
'django.template.loaders.app_directories.Loader',
], ),
],
},
},
]
```
3. 用惰性求值、迭代器、`defer()`、`only()`等缓解内存压力。
4. 用`select_related()`和`prefetch_related()`执行预加载避免“1+N查询问题”。
#### 数据库
1. 用ID生成器代替自增主键(性能更好、适用于分布式环境)。
- 自定义ID生成器
- UUID
```Python
>>> my_uuid = uuid.uuid1()
>>> my_uuid
UUID('63f859d0-a03a-11e8-b0ad-60f81da8d840')
>>> my_uuid.hex
'63f859d0a03a11e8b0ad60f81da8d840'
```
2. 避免不必要的外键列上的约束(除非必须保证参照完整性),更不要使用触发器之类的机制。
3. 使用索引来优化查询性能(索引放在要用于查询的字段上)。InnoDB用的是BTREE索引,使用>、<、>=、<=、BETWEEN或者LIKE 'pattern'(pattern不以通配符开头)时都可以用到索引。因为建立索引需要额外的磁盘空间,而主键上是有默认的索引,所以主键要尽可能选择较短的数据类型来减少磁盘占用,提高索引的缓存效果。
```SQL
create index idx_goods_name on tb_goods (gname(10));
```
```SQL
-- 无法使用索引
select * from tb_goods where gname like '%iPhone%';
-- 可以使用索引
select * from tb_goods where gname like 'iPhone%';
```
```Python
# 无法使用索引
Goods.objects.filter(name_icontains='iPhone')
# 可以使用索引
Goods.objects.filter(name__istartswith='iPhone');
```
4. 使用存储过程(存储在服务器端编译过的一组SQL语句)。
```SQL
drop procedure if exists sp_avg_sal_by_dept;
create procedure sp_avg_sal_by_dept(deptno integer, out avg_sal float)
begin
select avg(sal) into avg_sal from TbEmp where dno=deptno;
end;
call sp_avg_sal_by_dept(10, @a);
select @a;
```
```Python
>>> from django.db import connection
>>> cursor = connection.cursor()
>>> cursor.callproc('sp_avg_sal_by_dept', (10, 0))
>>> cursor.execute('select @_sp_avg_sal_by_dept_1')
>>> cursor.fetchone()
(2675.0,)
```
5. 使用数据分区。通过分区可以存储更多的数据、优化查询更大的吞吐量、可以快速删除过期的数据。关于这个知识点可以看看MySQL的[官方文档](https://dev.mysql.com/doc/refman/5.7/en/partitioning-overview.html)。
- RANGE分区:基于连续区间范围,把数据分配到不同的分区。
- LIST分区:基于枚举值的范围,把数据分配到不同的分区。
- HASH分区 / KEY分区:基于分区个数,把数据分配到不同的分区。
```SQL
CREATE TABLE tb_emp (
eno INT NOT NULL,
ename VARCHAR(20) NOT NULL,
job VARCHAR(10) NOT NULL,
hiredate DATE NOT NULL,
dno INT NOT NULL
)
PARTITION BY HASH(dno)
PARTITIONS 4;
```
```SQL
CREATE TABLE tb_emp (
eno INT NOT NULL,
ename VARCHAR(20) NOT NULL,
job VARCHAR(10) NOT NULL,
hiredate DATE NOT NULL,
dno INT NOT NULL
)
PARTITION BY RANGE( YEAR(hiredate) ) (
PARTITION p0 VALUES LESS THAN (1960),
PARTITION p1 VALUES LESS THAN (1970),
PARTITION p2 VALUES LESS THAN (1980),
PARTITION p3 VALUES LESS THAN (1990),
PARTITION p4 VALUES LESS THAN MAXVALUE
);
```
6. 使用`explain`来分析查询性能 - 执行计划。
```SQL
explain select * from ...;
```
`explain`结果解析:
- select_type:表示select操作的类型,常见的值有SIMPLE(简单查询,没有使用子查询或者表连接查询)、PRIMARY(主查询,外层的查询)、UNION(并集操作中的第二个或者后面的查询)、SUBQUERY(子查询中的第一个SELECT)等。
- table:输出结果的表。
- type:MySQL在表中找到所需行的方式,也称为访问类型,常见的值有:
- ALL:全表扫描(遍历全表找到匹配的行)
- index:索引全扫描(遍历整个索引)
- range:索引范围扫描
- ref:非唯一索引扫描或唯一索引的前缀扫描
- eq_ref:唯一索引扫描
- const / system:表中最多有一行匹配
- NULL:不用访问表或者索引
- possible_keys:查询时可能用到的索引。
- key:实际使用的索引。
- key_len:使用到索引字段的长度。
- rows:扫描行的数量。
- Extra:额外的信息(执行情况的说明或描述)。
> 说明:关于MySQL更多的知识尤其是性能调优和运维方面的内容,推荐大家阅读网易出品的《深入浅出MySQL(第2版)》,网易出品必属精品。
7. 使用慢查询日志来发现性能低下的查询。
```SQL
mysql> show variables like 'slow_query%';
+---------------------------+----------------------------------+
| Variable_name | Value |
+---------------------------+----------------------------------+
| slow_query_log | OFF |
| slow_query_log_file | /mysql/data/localhost-slow.log |
+---------------------------+----------------------------------+
mysql> show variables like 'long_query_time';
+-----------------+-----------+
| Variable_name | Value |
+-----------------+-----------+
| long_query_time | 10.000000 |
+-----------------+-----------+
```
```SQL
mysql> set global slow_query_log='ON';
mysql> set global long_query_time=1;
```
```INI
[mysqld]
slow_query_log=ON
slow_query_log_file=/usr/local/mysql/data/slow.log
long_query_time=1
```
#### 其他
请参考《Python性能调优》。
\ No newline at end of file
## 开启团队项目
### 创建项目
我们的项目使用Git作为版本控制工具,首先可以在代码托管平台上创建一个新项目。这里我们使用了国内的[“码云”](https://gitee.com)来创建项目,并通过该平台实现版本控制和缺陷管理,当然如果愿意也可以使用[github](https://github.com/)或者国内的[CODING](https://coding.net/))来做同样的事情,当然也可以自己用诸如[Gitlab](https://gitlab.com)这样的工具来搭建自己的代码仓库。创建好项目之后,我们先为项目添加一个名为`.gitignore`文件,该文件用来忽略掉那些不需要纳入版本控制系统的文件,如果不知道怎么编写该文件,可以使用gitignore.io](https://www.gitignore.io/)网站提供的在线生成工具,如下所示。
![](./res/gitignore_io.png)
### 初始版本
接下来,我们将项目克隆到本地,并为项目创建真正意义上的初始版本。
```Shell
git clone https://gitee.com/jackfrued/fang.com.git
cd fang.com
python3 -m venv venv
source venv/bin/activate
pip install -U pip
pip install django django-celery django-redis djangorestframework pymysql redis pillow
pip freeze > requirements.txt
```
**提示**:在使用pip安装依赖项以后,可以通过`pip check`来检查依赖项是否兼容,确保万无一失。如果希望使用国内的PyPI源,可以按照如下所示的方式进行配置。在用户主目录下找到或创建名为`.pip`的文件夹,并在该文件夹中添加一个`pip.conf`文件,其内容如下所示。
```INI
[global]
trusted-host=mirrors.aliyun.com
index-url=http://mirrors.aliyun.com/pypi/simple/
# index-url=https://pypi.tuna.tsinghua.edu.cn/simple/
# index-url=https://mirrors.ustc.edu.cn/pypi/web/
# index-url=https://pypi.douban.com/simple
```
上面的配置文件中使用了阿里云的PyPI镜像,由于该镜像没有使用HTTPS,所以要先将其设置为受信任的站点,当然也可以直接使用下面提供的其他HTTPS镜像。使用Windows的用户可以在用户主目录下创建`pip`文件夹(注意前面有没有点)并添加该配置文件,文件名为`pip.ini`
接下来创建项目和应用,通常我们会对项目中的功能进行垂直拆分,因此需要创建多个应用来对应不同的功能模块。
```Shell
django-admin startproject fang .
python manage.py startapp common
python manage.py startapp forum
python manage.py startapp rent
```
对项目的公共配置文件进行必要修改后,就可以通过反向工程(根据关系型数据库中的表来生成模型从而减少不必要的重复劳动)完成对模型类的创建,反向工程生成的模型可能需要适当的修改和调整才能应用于项目,尤其是存在多对多关系的时候。
```Shell
python manage.py inspectdb > common/models.py
```
如果需要使用Django的admin项目或者第三方xadmin,还需要执行迁移操作来创建额外的数据库表。
```Shell
python manage.py migrate
```
至此,我们就可以将当前的工作纳入版本控制并同步到服务器。
```Shell
git add .
git commit -m '项目初始版本'
git push
```
### 日常开发
不管是使用“git-flow”还是“github-flow”,都不能够在master上面直接进行开发,对于后者要创建自己的分支并在上面进行开发。
```Shell
git checkout -b jackfrued
git add .
git commit -m '提交的原因'
git push origin jackfrued
git branch -d jackfrued
git checkout master
git pull
git checkout -b jackfrued
```
...@@ -306,7 +306,7 @@ ...@@ -306,7 +306,7 @@
### Day91~100 - [团队项目开发](./Day91-100) ### Day91~100 - [团队项目开发](./Day91-100)
#### 过程模型 #### 第91天:团队开发和项目选题
1. 软件过程模型 1. 软件过程模型
- 经典过程模型(瀑布模型) - 经典过程模型(瀑布模型)
...@@ -322,7 +322,18 @@ ...@@ -322,7 +322,18 @@
- 修复bug(问题描述、重现步骤、测试人员、被指派人)。 - 修复bug(问题描述、重现步骤、测试人员、被指派人)。
- 评审会议(Showcase)。 - 评审会议(Showcase)。
- 回顾会议(当前周期做得好和不好的地方)。 - 回顾会议(当前周期做得好和不好的地方)。
> 补充:敏捷软件开发宣言
>
> - **个体和互动** 高于 流程和工具
> - **工作的软件** 高于 详尽的文档
> - **客户合作** 高于 合同谈判
> - **响应变化** 高于 遵循计划
![](./res/the-daily-scrum-in-the-sprint-cycle.png)
2. 项目团队组建 2. 项目团队组建
- 团队的构成和角色 - 团队的构成和角色
![company_architecture](./res/company_architecture.png) ![company_architecture](./res/company_architecture.png)
...@@ -332,21 +343,29 @@ ...@@ -332,21 +343,29 @@
![](./res/pylint.png) ![](./res/pylint.png)
- Python中的一些“惯例”(请参考[《Python惯例-如何编写Pythonic的代码》](Python惯例-如何编写Pythonic的代码.md) - Python中的一些“惯例”(请参考[《Python惯例-如何编写Pythonic的代码》](Python惯例-如何编写Pythonic的代码.md)
- 影响代码可读性的原因:
- 代码注释太少或者没有注释
- 代码破坏了语言的最佳实践
- 反模式编程(意大利面代码、复制-黏贴编程、自负编程、……)
3. 团队开发工具介绍 3. 团队开发工具介绍
- 版本控制:Git、Mercury - 版本控制:Git、Mercury
- 缺陷管理:Github/Gitee、Redmine、禅道 - 缺陷管理:[Gitlab](https://about.gitlab.com/)[Redmine](http://www.redmine.org.cn/)
- 持续集成:Jenkins、Travis-CI - 敏捷闭环工具:[禅道](https://www.zentao.net/)[JIRA](https://www.atlassian.com/software/jira/features)
- 持续集成:[Jenkins](https://jenkins.io/)[Travis-CI](https://travis-ci.org/)
请参考[《团队项目开发》](团队项目开发.md) 请参考[《团队项目开发》](团队项目开发.md)
#### 项目选题和理解业务 ##### 项目选题和理解业务
1. 选题范围设定 1. 选题范围设定
- CMS(用户端):新闻聚合网站、问答/分享社区、影评/书评网站等。 - CMS(用户端):新闻聚合网站、问答/分享社区、影评/书评网站等。
- MIS(用户端+管理端):KMS、KPI考核系统、HRS、仓储管理系统等。 - MIS(用户端+管理端):KMS、KPI考核系统、HRS、CRM系统、供应链系统、仓储管理系统等。
- App后台(管理端+数据接口):二手交易类App、报刊杂志类App、健康健美类App、旅游类App、社交类App、阅读类App等。 - App后台(管理端+数据接口):二手交易类、报刊杂志类、小众电商类、新闻资讯类、旅游类、社交类、阅读类等。
- 其他类型:自身行业背景和工作经验、业务容易理解和把控。 - 其他类型:自身行业背景和工作经验、业务容易理解和把控。
2. 需求理解、模块划分和任务分配 2. 需求理解、模块划分和任务分配
...@@ -366,11 +385,13 @@ ...@@ -366,11 +385,13 @@
| | 查看评论 | 白元芳 | 正在进行 | 20% | 4 | 2018/8/7 | | 2018/8/7 | | 需要进行代码审查 | | | 查看评论 | 白元芳 | 正在进行 | 20% | 4 | 2018/8/7 | | 2018/8/7 | | 需要进行代码审查 |
| | 评论投票 | 白元芳 | 等待 | 0% | 4 | 2018/8/8 | | 2018/8/8 | | | | | 评论投票 | 白元芳 | 等待 | 0% | 4 | 2018/8/8 | | 2018/8/8 | | |
#### 概念模型和正向工程 #### 第92天:数据库设计和OOAD
##### 概念模型和正向工程
1. UML和E-R 1. UML(统一建模语言)的类
![uml](./res/uml-graph.png) ![uml](./res/uml-class-diagram.png)
2. 通过模型创建表(正向工程) 2. 通过模型创建表(正向工程)
...@@ -379,44 +400,53 @@ ...@@ -379,44 +400,53 @@
python manage.py migrate python manage.py migrate
``` ```
#### 物理模型和反向工程 ##### 物理模型和反向工程
1. PowerDesigner 1. PowerDesigner
![](./res/power-designer-pdm.png) ![](./res/power-designer-pdm.png)
2. 通过数据表创建模型 2. 通过数据表创建模型(反向工程)
```Shell ```Shell
python manage.py inspectdb > app/models.py python manage.py inspectdb > app/models.py
``` ```
#### 项目开发中的公共问题 #### 第93-98天:使用Django开发项目
##### 项目开发中的公共问题
1. 数据的配置 1. 数据库的配置(多库、主从、路由)
2. 缓存的配置 2. 缓存的配置(分区缓存、键设置、超时设置、主从复制、故障恢复)
3. 日志的配置 3. 日志的配置
4. Django的使用技巧 4. Django的使用技巧(Django-Debug-ToolBar)
5. 好用的Python模块(图像处理、数据加密、三方API) 5. 好用的Python模块(日期计算、图像处理、数据加密、三方API)
### REST API设计 ##### REST API设计
1. RESTful架构 1. RESTful架构
- [理解RESTful架构](http://www.ruanyifeng.com/blog/2011/09/restful.html)
- [RESTful API设计指南](http://www.ruanyifeng.com/blog/2014/05/restful_api.html)
- [RESTful API最佳实践](http://www.ruanyifeng.com/blog/2018/10/restful-api-best-practices.html)
2. API接口文档的撰写([《网络API接口设计》](网络API接口设计.md) 2. API接口文档的撰写([《网络API接口设计》](网络API接口设计.md)
3. Django-REST-Framework的应用 - [RAP2](http://rap2.taobao.org/)
- [Apizza](https://apizza.net/)
3. [django-REST-framework](https://www.django-rest-framework.org/)的应用(具体请参考[《Django知识点概述》](Django知识点概述.md)
#### 项目中的重点难点剖析 ##### 项目中的重点难点剖析
1. 使用缓存缓解数据库压力(Redis 1. 使用缓存缓解数据库压力 - Redis(具体请参考[《Django知识点概述》](Django知识点概述.md)
2. 使用消息队列缓解服务器压力(Celery + RabbitMQ 2. 使用消息队列缓解服务器压力 - Celery + RabbitMQ(具体请参考[《Django知识点概述》](Django知识点概述.md)
#### 单元测试 #### 第99-100天:测试和部署
##### 单元测试
1. 测试的种类 1. 测试的种类
2. 编写单元测试(unitest、TestCase 2. 编写单元测试(unittest、pytest、nose2、tox、ddt、……
3. 测试覆盖率(Coverage) 3. 测试覆盖率(coverage)
#### 项目部署 ##### 项目部署
1. 部署前的准备工作 1. 部署前的准备工作
- 关键设置(SECRET_KEY / DEBUG / ALLOWED_HOSTS / 缓存 / 数据库) - 关键设置(SECRET_KEY / DEBUG / ALLOWED_HOSTS / 缓存 / 数据库)
...@@ -424,25 +454,34 @@ ...@@ -424,25 +454,34 @@
- 日志相关配置 - 日志相关配置
2. Linux常用命令回顾 2. Linux常用命令回顾
3. Linux常用服务的安装和配置 3. Linux常用服务的安装和配置
4. uWSGI和Nginx的使用 4. uWSGI/Gunicorn和Nginx的使用
5. 虚拟化容器(Docker) - Gunicorn和uWSGI的比较
- 对于不需要大量定制化的简单应用程序,Gunicorn是一个不错的选择,uWSGI的学习曲线比Gunicorn要陡峭得多,Gunicorn的默认参数就已经能够适应大多数应用程序。
- uWSGI支持异构部署。
- 由于Nginx本身支持uWSGI,在线上一般都将Nginx和uWSGI捆绑在一起部署,而且uWSGI属于功能齐全且高度定制的WSGI中间件。
- 在性能上,Gunicorn和uWSGI其实表现相当。
5. 虚拟化技术(Docker)
#### 性能测试 ##### 性能测试
1. AB的使用 1. AB的使用
2. SQLslap的使用 2. SQLslap的使用
3. sysbench的使用 3. sysbench的使用
#### 自动化测试 ##### 自动化测试
1. 使用Shell和Python进行自动化测试 1. 使用Shell和Python进行自动化测试
2. 使用Selenium实现自动化测试 2. 使用Selenium实现自动化测试
- Selenium IDE
- Selenium WebDriver
- Selenium Remote Control
3. 测试工具Robot Framework介绍 3. 测试工具Robot Framework介绍
#### 项目性能调优 ##### 项目性能调优
1. 数据库性能调优 1. 数据库性能调优
- 软硬件优化
- SQL优化
- 架构优化
2. 代码性能调优 2. 代码性能调优
3. 静态文件服务器和CDN加速 3. 云存储和CDN加速
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment