
使用 C 语言从头实现类 sqlite 数据库


View on GitHub (pull requests welcome)

06 - 数据库游标

这部分内容比上一部分内容要短小一些。我们将对其进行一些细小的重构以便下一节的 B-Tree 实现更为容易。



闲话少说,让我们来看看游标 Cursor 类型的定义:

+typedef struct {
+    Table    *table;
+    uint32_t  row_num;
+    bool      end_of_table; /* Indicates a position one past the last element */
+} Cursor;



最后,它有一个布尔值 end_of_table。这样我们就可以在表的末尾表示一个位置(也就是我们可能想要插入一行的地方)。

函数 table_start()table_end() 用于创建新的游标:

+Cursor *
+table_start(Table *table)
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+    cursor->table = table;
+    cursor->row_num = 0;
+    cursor->end_of_table = (table->num_rows == 0);
+    return cursor;
+Cursor *
+table_end(Table *table)
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+    cursor->table = table;
+    cursor->row_num = table->num_rows;
+    cursor->end_of_table = true;
+    return cursor;

我们的 row_slot() 函数将变成 cursor_value(),它返回一个有游标描述位置的指针:

void *
-row_slot(Table *table, uint32_t row_num)
+cursor_value(Cursor *cursor)
+    uint32_t   row_num = cursor->row_num;
     uint32_t   page_num = row_num / ROWS_PER_PAGE;
-    void      *page = get_page(table->pager, page_num);
+    void      *page = get_page(cursor->table->pager, page_num);
     uint32_t    row_offset = row_num % ROWS_PER_PAGE;
     uint32_t    byte_offset = row_offset * ROW_SIZE;
     return (char *) page + byte_offset;

在我们当前的表结构中推进光标就像递增行号一样简单。在 B-Tree 中,这将会更复杂一些。

+cursor_advance(Cursor *cursor)
+    cursor->row_num += 1;
+    if (cursor->row_num >= cursor->table->num_rows) {
+        cursor->end_of_table = true;
+    }


     Row *row_to_insert = &statement->row_to_insert;
-    serialize_row(row_to_insert, row_slot(table, table->num_rows));
+    Cursor *cursor = table_end(table);
+    serialize_row(row_to_insert, cursor_value(cursor));
     table->num_rows += 1;

+    free(cursor);
     return EXECUTE_SUCCESS;


 execute_select(Statement *statement, Table *table)
+    Cursor *cursor = table_start(table);
     Row row;
-    for (uint32_t i = 0; i < table->num_rows; i++) {
-        deserialize_row(row_slot(table, i), &row);
+    while (!(cursor->end_of_table)) {
+        deserialize_row(cursor_value(cursor), &row);
+        cursor_advance(cursor);

+    free(cursor);
     return EXECUTE_SUCCESS;

好了,就这样吧!我说过,这是一个较短的重构,应该能帮助我们重写。就像我说的,这是一个较短的重构,当我们把表的数据结构重写成 B-Tree 时,它应该能帮助我们。

函数 execute_select()execute_insert() 可以完全通过游标与表交互,而不需要假设表是如何存储的。

下面是完整的 diff 文件:

diff --git a/db.c b/db.c
index c13021c..14ab8d1 100644
--- a/db.c
+++ b/db.c
@@ -94,6 +94,12 @@ struct Table_t
 typedef struct Table_t Table;

+typedef struct {
+    Table    *table;
+    uint32_t  row_num;
+    bool      end_of_table; /* Indicates a position one past the last element */
+} Cursor;

 void print_row(Row *row);
 InputBuffer *new_input_buffer();
@@ -115,6 +121,11 @@ Pager *pager_open(const char *filename);
 void *get_page(Pager *pager, uint32_t page_num);
 void pager_flush(Pager* pager, uint32_t page_num, uint32_t size);

+Cursor *table_start(Table *table);
+Cursor *table_end(Table *table);
+void cursor_advance(Cursor *cursor);
+void *cursor_value(Cursor *cursor);
 print_row(Row *row)
@@ -190,21 +201,30 @@ execute_insert(Statement *statement, Table *table)

     Row *row_to_insert = &statement->row_to_insert;
-    serialize_row(row_to_insert, row_slot(table, table->num_rows));
+    Cursor *cursor = table_end(table);
+    serialize_row(row_to_insert, cursor_value(cursor));
     table->num_rows += 1;

+    free(cursor);
     return EXECUTE_SUCCESS;

 execute_select(Statement *statement, Table *table)
+    Cursor *cursor = table_start(table);
     Row row;
-    for (uint32_t i = 0; i < table->num_rows; i++) {
-        deserialize_row(row_slot(table, i), &row);
+    while (!(cursor->end_of_table)) {
+        deserialize_row(cursor_value(cursor), &row);
+        cursor_advance(cursor);

+    free(cursor);
     return EXECUTE_SUCCESS;

@@ -368,10 +388,11 @@ deserialize_row(void *source, Row *destination)

 void *
-row_slot(Table *table, uint32_t row_num)
+cursor_value(Cursor *cursor)
+    uint32_t   row_num = cursor->row_num;
     uint32_t   page_num = row_num / ROWS_PER_PAGE;
-    void      *page = get_page(table->pager, page_num);
+    void      *page = get_page(cursor->table->pager, page_num);
     uint32_t    row_offset = row_num % ROWS_PER_PAGE;
     uint32_t    byte_offset = row_offset * ROW_SIZE;
     return (char *) page + byte_offset;
@@ -501,3 +522,36 @@ pager_flush(Pager* pager, uint32_t page_num, uint32_t size)
+Cursor *
+table_start(Table *table)
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+    cursor->table = table;
+    cursor->row_num = 0;
+    cursor->end_of_table = (table->num_rows == 0);
+    return cursor;
+Cursor *
+table_end(Table *table)
+    Cursor *cursor = (Cursor *) malloc(sizeof(Cursor));
+    cursor->table = table;
+    cursor->row_num = table->num_rows;
+    cursor->end_of_table = true;
+    return cursor;
+cursor_advance(Cursor *cursor)
+    cursor->row_num += 1;
+    if (cursor->row_num >= cursor->table->num_rows) {
+        cursor->end_of_table = true;
+    }