以下是将数据迁移和路由逻辑转换为ThinkPHP6实现的完整方案,包含命令行工具、模型操作和自动化分表:


一、目录结构调整

  1. project/
  2. ├─ app/
  3. ├─ command/ # 新增命令行目录
  4. └─ MigrateData.php # 数据迁移命令
  5. ├─ controller/
  6. ├─ model/
  7. └─ DataModel.php # 数据模型
  8. ├─ service/
  9. └─ ShardingService.php # 分表服务
  10. └─ config/
  11. └─ database.php # 数据库配置

二、数据库配置(config/database.php)

  1. return [
  2. 'connections' => [
  3. 'mysql' => [
  4. 'type' => 'mysql',
  5. 'hostname' => '127.0.0.1',
  6. 'database' => 'your_db',
  7. 'username' => 'root',
  8. 'password' => 'password',
  9. 'hostport' => '3306',
  10. 'charset' => 'utf8mb4',
  11. 'table_prefix' => 'tp_', // 表前缀
  12. ]
  13. ]
  14. ];

三、数据模型(app/model/DataModel.php)

  1. <?php
  2. namespace app\model;
  3. use think\Model;
  4. class DataModel extends Model
  5. {
  6. protected $tablePrefix = 'tp_'; // 匹配数据库配置
  7. protected $autoWriteTimestamp = true;
  8. /**
  9. * 动态切换数据表
  10. * @param int $year
  11. * @return $this
  12. */
  13. public function switchTable(int $year): self
  14. {
  15. $tableName = $this->getTablePrefix() . 'data_' . $year;
  16. $this->setTable($tableName);
  17. return $this;
  18. }
  19. /**
  20. * 获取所有历史表名
  21. */
  22. public static function getHistoryTables(): array
  23. {
  24. $db = self::getDb();
  25. $prefix = self::getTablePrefix();
  26. $result = $db->query("SHOW TABLES LIKE '{$prefix}data_%'");
  27. return array_map(function($table) use ($prefix) {
  28. return substr($table, strlen($prefix));
  29. }, $result);
  30. }
  31. }

四、分表服务类(app/service/ShardingService.php)

  1. <?php
  2. namespace app\service;
  3. use app\model\DataModel;
  4. use think\facade\Log;
  5. class ShardingService
  6. {
  7. public function migrateData(int $year): bool
  8. {
  9. try {
  10. // 1. 创建目标表
  11. $this->createTableIfNotExist($year);
  12. // 2. 分块迁移数据
  13. $sourceModel = (new DataModel)->switchTable('current');
  14. $count = $sourceModel->whereYear('create_time', $year)->count();
  15. if ($count > 0) {
  16. $sourceModel->chunkById(5000, function ($rows) use ($year) {
  17. (new DataModel)->switchTable($year)->insertAll($rows->toArray());
  18. }, 'id');
  19. Log::info("Migrated {$count} records for {$year}");
  20. }
  21. // 3. 清理源数据(可选)
  22. // $sourceModel->whereYear('create_time', $year)->delete();
  23. return true;
  24. } catch (\Exception $e) {
  25. Log::error("Migration failed: " . $e->getMessage());
  26. return false;
  27. }
  28. }
  29. private function createTableIfNotExist(int $year): void
  30. {
  31. $tableName = 'data_' . $year;
  32. if (!DataModel::getHistoryTables()) {
  33. DataModel::createTable($tableName);
  34. }
  35. }
  36. }

五、命令行工具(app/command/MigrateData.php)

  1. <?php
  2. namespace app\command;
  3. use think\console\Command;
  4. use think\console\Input;
  5. use think\console\Output;
  6. use app\service\ShardingService;
  7. class MigrateData extends Command
  8. {
  9. protected function configure()
  10. {
  11. $this->setName('migration:data')->setDescription('Data migration by year');
  12. }
  13. protected function execute(Input $input, Output $output)
  14. {
  15. $service = new ShardingService();
  16. // 获取最近3年数据
  17. $years = range(date('Y')-3, date('Y'));
  18. foreach ($years as $year) {
  19. $output->writeln("Processing year: {$year}");
  20. $service->migrateData($year);
  21. }
  22. $output->success("Migration completed");
  23. }
  24. }

六、路由动态解析(app/controller/BaseController.php)

  1. <?php
  2. namespace app\controller;
  3. use app\model\DataModel;
  4. use think\Request;
  5. class BaseController
  6. {
  7. /**
  8. * 动态路由解析
  9. */
  10. protected function resolveTable(Request $request): string
  11. {
  12. $time = $request->param('create_time') ?: now();
  13. $year = $time instanceof \DateTime ? $time->format('Y') : date('Y');
  14. // 自动创建不存在的表
  15. if (!in_array($year, DataModel::getHistoryTables())) {
  16. \think\facade\Db::execute("CREATE TABLE `tp_data_{$year}` LIKE `tp_data_current`");
  17. }
  18. return "tp_data_{$year}";
  19. }
  20. /**
  21. * 范围查询处理
  22. */
  23. protected function buildRangeQuery($start, $end)
  24. {
  25. $years = range((int)date('Y', strtotime($start)), (int)date('Y', strtotime($end)));
  26. $where = [];
  27. foreach ($years as $year) {
  28. $table = "tp_data_{$year}";
  29. $conditions = [];
  30. if ($year == min($years) && $year == max($years)) {
  31. $conditions[] = "`create_time` BETWEEN '{$start}' AND '{$end}'";
  32. } elseif ($year == min($years)) {
  33. $conditions[] = "`create_time` >= '{$start}'";
  34. } elseif ($year == max($years)) {
  35. $conditions[] = "`create_time` <= '{$end}'";
  36. } else {
  37. $conditions[] = "`create_time` IS NOT NULL";
  38. }
  39. $where[] = "`{$table}`.id IS NOT NULL AND (" . implode(' OR ', $conditions) . ")";
  40. }
  41. return [
  42. 'tables' => implode(',', $years),
  43. 'where' => implode(' OR ', $where)
  44. ];
  45. }
  46. }

七、使用示例(控制器)

  1. <?php
  2. namespace app\controller;
  3. class DataController extends BaseController
  4. {
  5. public function query(Request $request)
  6. {
  7. // 动态路由解析
  8. $table = $this->resolveTable($request);
  9. // 构建范围查询
  10. [$tables, $where] = $this->buildRangeQuery(
  11. $request->param('start_time'),
  12. $request->param('end_time')
  13. );
  14. // 执行查询
  15. $result = \think\facade\Db::table($tables)
  16. ->whereRaw($where)
  17. ->select();
  18. return json($result);
  19. }
  20. }

八、定时任务配置

  1. 注册命令行指令:

    1. # 在 app/console.php 中添加
    2. return [
    3. \app\command\MigrationsData::class,
    4. ];
  2. 设置Cron任务(每天凌晨执行): ```bash

    编辑crontab

    crontab -e

添加以下内容

0 2 * /usr/bin/php /path/to/think migration:data >> /var/log/migration.log 2>&1

  1. ---
  2. ### 九、关键优化措施
  3. 1. **分块处理机制**:
  4. ```php
  5. // 在ShardingService中使用分块查询
  6. $sourceModel->chunkById(5000, function ($rows) use ($year) {
  7. // 批量插入
  8. }, 'id');
  1. 事务控制

    1. Db::startTrans();
    2. try {
    3. // 执行数据迁移...
    4. Db::commit();
    5. } catch (\Exception $e) {
    6. Db::rollback();
    7. }
  2. 连接池配置

    1. // config/database.php 增加连接池配置
    2. 'pool' => [
    3. 'min_connections' => 1,
    4. 'max_connections' => 10,
    5. 'connect_timeout' => 3.0,
    6. ]

十、生产环境部署检查清单

  1. 确认表结构同步:

    1. php think migrate:data --dry-run
  2. 监控指标: ```bash

    查看队列表状态

    SHOW PROCESSLIST;

监控表大小

SELECT TABLE_NAME AS Table, ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) AS Size (MB) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ‘your_db’;

  1. 3. 回滚方案:
  2. ```php
  3. // 数据回滚示例
  4. public function rollback(int $year)
  5. {
  6. $oldTable = "data_".($year-1);
  7. $newTable = "data_{$year}";
  8. \think\facade\Db::execute("RENAME TABLE {$oldTable} TO tmp, {$newTable} TO {$oldTable}, tmp TO {$newTable}");
  9. }

通过以上改造,完整实现了:

  1. 基于ThinkPHP6的命令行数据迁移工具
  2. 动态表路由解析
  3. 自动化分表管理
  4. 符合框架规范的模型操作
  5. 完善的错误处理和日志记录

建议通过以下命令测试:

  1. # 本地调试
  2. php think migration:data --year=2022
  3. # 查看执行日志
  4. tail -f runtime/log/migration.log