鞍山网站制作的网站,搬家网站怎么做,新手做网站视频教程,企业策划案怎么写thinkphp 做分布式服务读写分离分库分表#xff08;分区#xff09; 引言 thinkphp* 大道至简一、分库分表分表php 分库分表hash算法0、分表的方法#xff08;thinkphp#xff09;1、ThinkPHP6 业务分表之一#xff1a;UID 发号器2、ThinkPHP6 业务分表之二#xff1a;用… thinkphp 做分布式服务读写分离分库分表分区 引言 thinkphp* 大道至简一、分库分表分表php 分库分表hash算法0、分表的方法thinkphp1、ThinkPHP6 业务分表之一UID 发号器2、ThinkPHP6 业务分表之二用户 其他杂项1亿条数据在PHP中实现Mysql数据库分表100张 引言 thinkphp* 大道至简
一、分库分表
分表
分表分库一般分垂直和水平垂直是指感觉业务来进行库的拆分比如专门的用户库或者订单库这样子但垂直还是无法解决单表数据量过大导致的性能会差的问题这里可能会和上面矛盾网上多数指的是 2000W 就会影响但好像没有多少人谈过他们的表结构情况。水平分指的是某一个表里面的数据量非常大我们按照一定的规则来进行一个拆分分流比如把用户的数据由一直存放在用户表变为可能这条数据是在用户1号表或者2号表这样子。规则可能是 范围range或者哈希hash也不知道我喜欢的取模算不算哈希。分了其实会带来一些问题的复杂性比如分库那如何确保多库事务性的一致性、分布式锁等等很多然后还有之前我们的 join 查询现在可能就无法使用呢。
php 分库分表hash算法
app/common.php 公共方法定义 $userid 也可以用下面的uid 发号器定义通过哈希来就算出表名称
//哈希分表function get_hash_table($table, $userid) {$str crc32($userid);if ($str 0) {$hash 0 . substr(abs($str), 0, 1);} else {$hash substr($str, 0, 2);}return $table . _ . $hash;}控制器调用计算表名
public function index() {echo $tableget_hash_table(message, 18991).br;echo $tableget_hash_table(message, 18993).br;echo $tableget_hash_table(message, 18994).br;
}0、分表的方法thinkphp
public function getPartitionTableName($dataarray()) {// 对数据表进行分区if(isset($data[$this-partition[field]])) {$field $data[$this-partition[field]];switch($this-partition[type]) {case id:// 按照id范围分表$step $this-partition[expr];$seq floor($field / $step)1;break;case year:// 按照年份分表if(!is_numeric($field)) {$field strtotime($field);}$seq date(Y,$field)-$this-partition[expr]1;break;case mod:// 按照id的模数分表$seq ($field % $this-partition[num])1;break;case md5:// 按照md5的序列分表$seq (ord(substr(md5($field),0,1)) % $this-partition[num])1;break;default :if(function_exists($this-partition[type])) {// 支持指定函数哈希$fun $this-partition[type];$seq (ord(substr($fun($field),0,1)) % $this-partition[num])1;}else{// 按照字段的首字母的值分表$seq (ord($field{0}) % $this-partition[num])1;}}return $this-getTableName()._.$seq;}else{// 当设置的分表字段不在查询条件或者数据中// 进行联合查询必须设定 partition[num]$tableName array();for($i0;$i$this-partition[num];$i)$tableName[] SELECT * FROM .$this-getTableName()._.($i1);$tableName ( .implode( UNION ,$tableName).) AS .$this-name;return $tableName;}
}
1、ThinkPHP6 业务分表之一UID 发号器
我们现在假设项目是新成立的暂时没有一个技术债。目前我们要先规划一个用户表打算划分 16 个表按照取模的方式来查询。最先可能我们要考虑如何定义 UID 的问题不过对我们 PHPer 来说不是什么难事毕竟我们基本都不用 UUID 来做 UID 的占用的空间会比较多不利于索引。
但是不用 UUID 选择了 INT 来做 UID那我们要如何确保它的连续性和唯一性呢业内常用的可能是雪花算法但我选择自己写一个简易的发号器。
发号器需要加东西然后取东西我们可以利用 Redis List 来很好的实现我们需要的先进先出功能。通过一个命令或者说脚本我们定时往列表中填上自增的 ID然后在注册流程中来取最前面的。 代码层面 app/common.php 公共方法定义获取redis 的值
if(!function_exists(get_redis)) {function get_redis() {return new \Predis\Client(tcp://IP:端口, [parameters [password 密码,],]);}
}定义一个发号器函数 app/command/GenerateUID.php
?php
declare (strict_types 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class GenerateUID extends Command
{protected $redis;/** 发号器列表 键** var string*/protected $cache_key generate:uid;/** 锁 键** var string*/protected $lock_key generate:uid_lock;/** 发号器列表最大容量默认为 500000** var int*/protected $max_capacity 5E5;public function __construct(){$this-redis get_redis();parent::__construct();}protected function configure(){// 指令配置$this-setName(generate:uid)-setDescription(生成 UID);}protected function execute(Input $input, Output $output){$current_capacity $this-get_list_length();// 计算发号器需要增加的数量$append_capacity $this-max_capacity - $current_capacity;$uid_max $this-get_uid_max();$output-writeln(当前最大UID . $uid_max);$output-writeln(当前剩余容量 . $current_capacity);$output-writeln(最大存储容量 . $this-max_capacity);$output-writeln(需要追加容量 . $append_capacity);// 如果不需要增加则结束if($append_capacity 0) {return;}$data [];for($i 1; $i $append_capacity; $i) {$data[] $uid_max $i;}// 把需要加的数据进行分块方便快速追加$data array_chunk($data, 1000);// 加锁$lock $this-redis-executeRaw([SET,$this-lock_key,1,EX,10 * 60,NX,]);if($lock ! OK) {$output-writeln(获取锁失败);return;}try {foreach ($data as $item) {$this-redis-rpush($this-cache_key, $item);}} catch (\Exception $e) {$output-error($e-getMessage());} finally {// 释放锁$this-redis-del($this-lock_key);}}/** 获取发号器列表的长度** return int*/protected function get_list_length() :int{return $this-redis-llen($this-cache_key);}/** 获取当前最大的 UID** return int*/protected function get_uid_max() :int{$value (int) $this-redis-lindex($this-cache_key, -1);if($value) {return $value;}return 0;}
}config/console.php 定义一个打印函数类
?php
return [// 指令定义commands [...generate:uid app\command\GenerateUID,],
];执行命令 $ php think generate:uid 当前最大UID0 当前剩余容量0 最大存储容量500000 需要追加容量500000 2、ThinkPHP6 业务分表之二用户
上一篇我们将了 UID 的发号器那这一篇我们将要去实现用户的注册入库和简单的查询。
首先我们要实现数据表的识别。获取表名称 app/common.php
if(!function_exists(table_name)) {/** 获取表名称** param string $name 表名* param int|null $uid UID* return string*/function table_name(string $name, ?int $uid null) {$support_table [users 16, // 表名 分表数];if(!isset($support_table[$name])) {return $name;}// 如果没有传递 UID那则需要调用其他方法来获取 UIDif(is_null($uid)) {$uid (int) 1;}if((int) $uid 0) {throw new \Exception(UID 值异常);}// 取 UID 和 分表数的模值转化为小写十六进制return sprintf(%s_%x, $name, $uid % $support_table[$name]);}
}执行写入操作 app/controller/Auth.php
?php
namespace app\controller;use app\BaseController;
use think\facade\Db;
use think\helper\Str;class Auth extends BaseController
{/** 注册接口*/public function register(){$redis get_redis();// 获取最左的值$uid $redis-lpop(generate:uid);// 为空说明列表已经没内容了if(is_null($uid)) {return 无法获取 UID;}// 获取表名$table_name table_name(users, $uid);// 插入数据Db::table($table_name)-insert([id $uid,nickname 随机生成 . Str::random(),]);return 注册成功;}
}
其他杂项
1亿条数据在PHP中实现Mysql数据库分表100张
当数据量猛增的时候大家都会选择库表散列等等方式去优化数据读写速度。笔者做了一个简单的尝试1亿条数据分100张表。具体实现过程如下 首先创建100张表
$i0;
while($i99){
echo $newNumber \r\n;
$sqlCREATE TABLE code_.$i. (full_code char(10) NOT NULL,create_time int(10) unsigned NOT NULL,PRIMARY KEY (full_code),
) ENGINEMyISAM DEFAULT CHARSETutf8;
mysql_query($sql);
$i;下面说一下我的分表规则full_code作为主键我们对full_code做hash 函数如下
$table_nameget_hash_table(code,$full_code);
function get_hash_table($table,$code,$s100){
$hash sprintf(%u, crc32($code));
echo $hash;
$hash1 intval(fmod($hash, $s));return $table._.$hash1;
}这样插入数据前通过get_hash_table获取数据存放的表名。 最后我们使用merge存储引擎来实现一张完整的code表 1 CREATE TABLE IF NOT EXISTS code ( 2 full_code char(10) NOT NULL, 3 create_time int(10) unsigned NOT NULL, 4 INDEX(full_code) 5 ) TYPEMERGE UNION(code_0,code_1,code_2…) INSERT_METHODLAST ; 这样我们通过select * from code就可以得到所有的full_code数据了。