以下是将数据迁移和路由逻辑转换为ThinkPHP6实现的完整方案,包含命令行工具、模型操作和自动化分表:
一、目录结构调整
project/├─ app/│ ├─ command/ # 新增命令行目录│ │ └─ MigrateData.php # 数据迁移命令│ ├─ controller/│ ├─ model/│ │ └─ DataModel.php # 数据模型│ ├─ service/│ │ └─ ShardingService.php # 分表服务│ └─ config/│ └─ database.php # 数据库配置
二、数据库配置(config/database.php)
return ['connections' => ['mysql' => ['type' => 'mysql','hostname' => '127.0.0.1','database' => 'your_db','username' => 'root','password' => 'password','hostport' => '3306','charset' => 'utf8mb4','table_prefix' => 'tp_', // 表前缀]]];
三、数据模型(app/model/DataModel.php)
<?phpnamespace app\model;use think\Model;class DataModel extends Model{protected $tablePrefix = 'tp_'; // 匹配数据库配置protected $autoWriteTimestamp = true;/*** 动态切换数据表* @param int $year* @return $this*/public function switchTable(int $year): self{$tableName = $this->getTablePrefix() . 'data_' . $year;$this->setTable($tableName);return $this;}/*** 获取所有历史表名*/public static function getHistoryTables(): array{$db = self::getDb();$prefix = self::getTablePrefix();$result = $db->query("SHOW TABLES LIKE '{$prefix}data_%'");return array_map(function($table) use ($prefix) {return substr($table, strlen($prefix));}, $result);}}
四、分表服务类(app/service/ShardingService.php)
<?phpnamespace app\service;use app\model\DataModel;use think\facade\Log;class ShardingService{public function migrateData(int $year): bool{try {// 1. 创建目标表$this->createTableIfNotExist($year);// 2. 分块迁移数据$sourceModel = (new DataModel)->switchTable('current');$count = $sourceModel->whereYear('create_time', $year)->count();if ($count > 0) {$sourceModel->chunkById(5000, function ($rows) use ($year) {(new DataModel)->switchTable($year)->insertAll($rows->toArray());}, 'id');Log::info("Migrated {$count} records for {$year}");}// 3. 清理源数据(可选)// $sourceModel->whereYear('create_time', $year)->delete();return true;} catch (\Exception $e) {Log::error("Migration failed: " . $e->getMessage());return false;}}private function createTableIfNotExist(int $year): void{$tableName = 'data_' . $year;if (!DataModel::getHistoryTables()) {DataModel::createTable($tableName);}}}
五、命令行工具(app/command/MigrateData.php)
<?phpnamespace app\command;use think\console\Command;use think\console\Input;use think\console\Output;use app\service\ShardingService;class MigrateData extends Command{protected function configure(){$this->setName('migration:data')->setDescription('Data migration by year');}protected function execute(Input $input, Output $output){$service = new ShardingService();// 获取最近3年数据$years = range(date('Y')-3, date('Y'));foreach ($years as $year) {$output->writeln("Processing year: {$year}");$service->migrateData($year);}$output->success("Migration completed");}}
六、路由动态解析(app/controller/BaseController.php)
<?phpnamespace app\controller;use app\model\DataModel;use think\Request;class BaseController{/*** 动态路由解析*/protected function resolveTable(Request $request): string{$time = $request->param('create_time') ?: now();$year = $time instanceof \DateTime ? $time->format('Y') : date('Y');// 自动创建不存在的表if (!in_array($year, DataModel::getHistoryTables())) {\think\facade\Db::execute("CREATE TABLE `tp_data_{$year}` LIKE `tp_data_current`");}return "tp_data_{$year}";}/*** 范围查询处理*/protected function buildRangeQuery($start, $end){$years = range((int)date('Y', strtotime($start)), (int)date('Y', strtotime($end)));$where = [];foreach ($years as $year) {$table = "tp_data_{$year}";$conditions = [];if ($year == min($years) && $year == max($years)) {$conditions[] = "`create_time` BETWEEN '{$start}' AND '{$end}'";} elseif ($year == min($years)) {$conditions[] = "`create_time` >= '{$start}'";} elseif ($year == max($years)) {$conditions[] = "`create_time` <= '{$end}'";} else {$conditions[] = "`create_time` IS NOT NULL";}$where[] = "`{$table}`.id IS NOT NULL AND (" . implode(' OR ', $conditions) . ")";}return ['tables' => implode(',', $years),'where' => implode(' OR ', $where)];}}
七、使用示例(控制器)
<?phpnamespace app\controller;class DataController extends BaseController{public function query(Request $request){// 动态路由解析$table = $this->resolveTable($request);// 构建范围查询[$tables, $where] = $this->buildRangeQuery($request->param('start_time'),$request->param('end_time'));// 执行查询$result = \think\facade\Db::table($tables)->whereRaw($where)->select();return json($result);}}
八、定时任务配置
注册命令行指令:
# 在 app/console.php 中添加return [\app\command\MigrationsData::class,];
设置Cron任务(每天凌晨执行): ```bash
编辑crontab
crontab -e
添加以下内容
0 2 * /usr/bin/php /path/to/think migration:data >> /var/log/migration.log 2>&1
---### 九、关键优化措施1. **分块处理机制**:```php// 在ShardingService中使用分块查询$sourceModel->chunkById(5000, function ($rows) use ($year) {// 批量插入}, 'id');
事务控制:
Db::startTrans();try {// 执行数据迁移...Db::commit();} catch (\Exception $e) {Db::rollback();}
连接池配置:
// config/database.php 增加连接池配置'pool' => ['min_connections' => 1,'max_connections' => 10,'connect_timeout' => 3.0,]
十、生产环境部署检查清单
监控表大小
SELECT
TABLE_NAME AS Table,
ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) AS Size (MB)
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ‘your_db’;
3. 回滚方案:```php// 数据回滚示例public function rollback(int $year){$oldTable = "data_".($year-1);$newTable = "data_{$year}";\think\facade\Db::execute("RENAME TABLE {$oldTable} TO tmp, {$newTable} TO {$oldTable}, tmp TO {$newTable}");}
通过以上改造,完整实现了:
- 基于ThinkPHP6的命令行数据迁移工具
- 动态表路由解析
- 自动化分表管理
- 符合框架规范的模型操作
- 完善的错误处理和日志记录
建议通过以下命令测试:
# 本地调试php think migration:data --year=2022# 查看执行日志tail -f runtime/log/migration.log
