构建简单的数据库

使用 C 语言从头实现类 sqlite 数据库

概览

View on GitHub (pull requests welcome)

06 - 数据库游标



这部分内容比上一部分内容要短小一些。我们将对其进行一些细小的重构以便下一节的 B-Tree 实现更为容易。

我们将在这部分内容中引入游标对象,游标对象表示表中的位置。您可以通过游标做以下的事情:

这些行为都是我们现在需要实现的。后续我们还需要实现:

闲话少说,让我们来看看游标 Cursor 类型的定义:

+typedef struct {
+    Table    *table;
+    uint32_t  row_num;
+    bool      end_of_table; /* Indicates a position one past the last element */
+} Cursor;

对于我们现在的表数据结果而言,您所需要的仅仅是一个行编号就可以定义表中的数据。

一个游标也有一个对它所属的表的引用(所以我们的游标函数可以只把游标作为参数)。

最后,它有一个布尔值 end_of_table。这样我们就可以在表的末尾表示一个位置(也就是我们可能想要插入一行的地方)。

函数 table_start()table_end() 用于创建新的游标:

+Cursor *
+table_start(Table *table)
+{
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+
+    cursor->table = table;
+    cursor->row_num = 0;
+    cursor->end_of_table = (table->num_rows == 0);
+
+    return cursor;
+}
+
+Cursor *
+table_end(Table *table)
+{
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+
+    cursor->table = table;
+    cursor->row_num = table->num_rows;
+    cursor->end_of_table = true;
+
+    return cursor;
+}

我们的 row_slot() 函数将变成 cursor_value(),它返回一个有游标描述位置的指针:

void *
-row_slot(Table *table, uint32_t row_num)
+cursor_value(Cursor *cursor)
 {
+    uint32_t   row_num = cursor->row_num;
     uint32_t   page_num = row_num / ROWS_PER_PAGE;
-    void      *page = get_page(table->pager, page_num);
+    void      *page = get_page(cursor->table->pager, page_num);
     uint32_t    row_offset = row_num % ROWS_PER_PAGE;
     uint32_t    byte_offset = row_offset * ROW_SIZE;
     return (char *) page + byte_offset;

在我们当前的表结构中推进光标就像递增行号一样简单。在 B-Tree 中,这将会更复杂一些。

+void
+cursor_advance(Cursor *cursor)
+{
+    cursor->row_num += 1;
+    if (cursor->row_num >= cursor->table->num_rows) {
+        cursor->end_of_table = true;
+    }
+}

最后,我们使用抽象的游标来重写后端虚拟机的代码。当插入一行数据时,我们在表的末尾创建一个游标,写入数据并关闭游标。

     Row *row_to_insert = &statement->row_to_insert;
-    serialize_row(row_to_insert, row_slot(table, table->num_rows));
+    Cursor *cursor = table_end(table);
+
+    serialize_row(row_to_insert, cursor_value(cursor));
     table->num_rows += 1;

+    free(cursor);
+
     return EXECUTE_SUCCESS;
 }

当查询表中的所有数据时,我们在表的开始处创建游标,打印行记录并移动到下一行。重复这个过程直到到达表末尾。

 ExecuteResult
 execute_select(Statement *statement, Table *table)
 {
+    Cursor *cursor = table_start(table);
+
     Row row;
-    for (uint32_t i = 0; i < table->num_rows; i++) {
-        deserialize_row(row_slot(table, i), &row);
+    while (!(cursor->end_of_table)) {
+        deserialize_row(cursor_value(cursor), &row);
         print_row(&row);
+        cursor_advance(cursor);
 }

+    free(cursor);
+
     return EXECUTE_SUCCESS;
 }

好了,就这样吧!我说过,这是一个较短的重构,应该能帮助我们重写。就像我说的,这是一个较短的重构,当我们把表的数据结构重写成 B-Tree 时,它应该能帮助我们。

函数 execute_select()execute_insert() 可以完全通过游标与表交互,而不需要假设表是如何存储的。

下面是完整的 diff 文件:

diff --git a/db.c b/db.c
index c13021c..14ab8d1 100644
--- a/db.c
+++ b/db.c
@@ -94,6 +94,12 @@ struct Table_t
 };
 typedef struct Table_t Table;

+typedef struct {
+    Table    *table;
+    uint32_t  row_num;
+    bool      end_of_table; /* Indicates a position one past the last element */
+} Cursor;
+

 void print_row(Row *row);
 InputBuffer *new_input_buffer();
@@ -115,6 +121,11 @@ Pager *pager_open(const char *filename);
 void *get_page(Pager *pager, uint32_t page_num);
 void pager_flush(Pager* pager, uint32_t page_num, uint32_t size);

+Cursor *table_start(Table *table);
+Cursor *table_end(Table *table);
+void cursor_advance(Cursor *cursor);
+void *cursor_value(Cursor *cursor);
+
 void
 print_row(Row *row)
 {
@@ -190,21 +201,30 @@ execute_insert(Statement *statement, Table *table)
     }

     Row *row_to_insert = &statement->row_to_insert;
-    serialize_row(row_to_insert, row_slot(table, table->num_rows));
+    Cursor *cursor = table_end(table);
+
+    serialize_row(row_to_insert, cursor_value(cursor));
     table->num_rows += 1;

+    free(cursor);
+
     return EXECUTE_SUCCESS;
 }

 ExecuteResult
 execute_select(Statement *statement, Table *table)
 {
+    Cursor *cursor = table_start(table);
+
     Row row;
-    for (uint32_t i = 0; i < table->num_rows; i++) {
-        deserialize_row(row_slot(table, i), &row);
+    while (!(cursor->end_of_table)) {
+        deserialize_row(cursor_value(cursor), &row);
         print_row(&row);
+        cursor_advance(cursor);
     }

+    free(cursor);
+
     return EXECUTE_SUCCESS;
 }

@@ -368,10 +388,11 @@ deserialize_row(void *source, Row *destination)
 }

 void *
-row_slot(Table *table, uint32_t row_num)
+cursor_value(Cursor *cursor)
 {
+    uint32_t   row_num = cursor->row_num;
     uint32_t   page_num = row_num / ROWS_PER_PAGE;
-    void      *page = get_page(table->pager, page_num);
+    void      *page = get_page(cursor->table->pager, page_num);
     uint32_t    row_offset = row_num % ROWS_PER_PAGE;
     uint32_t    byte_offset = row_offset * ROW_SIZE;
     return (char *) page + byte_offset;
@@ -501,3 +522,36 @@ pager_flush(Pager* pager, uint32_t page_num, uint32_t size)
         exit(EXIT_FAILURE);
     }
 }
+
+Cursor *
+table_start(Table *table)
+{
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+
+    cursor->table = table;
+    cursor->row_num = 0;
+    cursor->end_of_table = (table->num_rows == 0);
+
+    return cursor;
+}
+
+Cursor *
+table_end(Table *table)
+{
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+
+    cursor->table = table;
+    cursor->row_num = table->num_rows;
+    cursor->end_of_table = true;
+
+    return cursor;
+}
+
+void
+cursor_advance(Cursor *cursor)
+{
+    cursor->row_num += 1;
+    if (cursor->row_num >= cursor->table->num_rows) {
+        cursor->end_of_table = true;
+    }
+}