python模拟hdfs分block写文本文件

hdfs在保存文件时,会将一个文件拆分成多个block,block的大小默认是128M。一个block既不会多出一个字节,也不会少一个字节,而是刚好128*1024
*1024个字节,这就引出了一个平常不会注意的细节,对于文本文件,数据是以行的形式存在的,如果一个block的大小固定为128M,那么hdfs是怎么做到的呢?

一定存在,而且几乎每一个block都存在这种情况,文件的最后一行并不完整,最后一行数据被切分成了两部分,前面的部分在当前的block里,后面的部分在下一个block里,只有这样,才能保证一个block的大小刚好是128M。

运行在hadoop上的spark程序在读取数据时,可以并行读取block的数据,当前block读取结束后,必须读取下一个block的第一行数据,这样才能读取当前block最后一行的完整数据。

处于兴趣,我打算写一个可以模型hdfs保存文件的程序,为了确保文件的大小是固定的,必须向文件里写入字节串,这样才能精确的计算写入数据的大小。当一个block写满时,则创建一个新的block文件继续写入,最后一行数据如果不能都写入一个block,则将其拆分到两个block中。

python 文件对象提供的write方法只接收字符串,但文件对象还有一个buffer对象,它可以写入字节串,使用字节串,才能精确地计算写入数据的大小,控制一个block的大小。

class SplitFile():
    def __init__(self, filename, block_szie):
        self.filename = filename                # 文件名称
        self.file_index = 0                     # 文件索引
        self.block_size = block_szie            # 文件块大小
        self.curr_file_handler = None           # 文件句柄
        self._set_new_handler()

    def _get_filename(self):
        file_name = f"{self.filename}.{self.file_index}"
        self.file_index += 1
        return file_name

    def _set_new_handler(self):
        if self.curr_file_handler is None:
            self.curr_file_handler =  BlockFile(self._get_filename(), self.block_size)
        else:
            if not self.curr_file_handler.is_writeable():
                self.curr_file_handler.close()
                self.curr_file_handler = BlockFile(self._get_filename(), self.block_size)

    def write(self, data):
        """
        写文件
        :param data:
        :return:
        """
        self._set_new_handler()
        if isinstance(data, str):
            if not data.endswith('\n'):
                data += '\n'
            data = data.encode(encoding='utf-8')
        elif isinstance(data, bytes):
            if not data.endswith(b'\n'):
                data += b'\n'
        else:
            raise

        surplus = self.curr_file_handler.write(data)
        if surplus:
            self.write(surplus)

    def close(self):
        self.curr_file_handler.close()


class BlockFile():
    def __init__(self, filename, block_size):
        self.filename = filename            # 块文件名称
        self.block_size = block_size        # 块大小
        self.writen_size = 0                # 已经写的字节数
        self._create_file()

    def is_writeable(self):     # 是否可写
        return self.block_size > self.writen_size

    def _create_file(self):
        self.file = open(self.filename, 'w', encoding='utf-8')  # 文本模式打开

    def write(self, data):
        """
        写文件
        :param data:
        :return:
        """
        data_length = len(data)
        writeable_length = self.block_size - self.writen_size
        surplus = b''
        if writeable_length >= data_length:
            self.file.buffer.write(data)
        else:
            surplus = data[writeable_length:]
            self.file.buffer.write(data[0:writeable_length])

        self.writen_size += data_length
        return surplus

    def close(self):
        self.file.close()


def test_write():
    sf = SplitFile('1.txt', 100)
    for i in range(30):
        sf.write('我是一行数据')

    sf.close()


def test_read():
    with open('1.txt.0', 'r', encoding='utf-8')as f:
        lines = f.buffer.readlines()

    if not lines[-1].endswith(b'\n'):
        with open('1.txt.1', 'r', encoding='utf-8')as f:
            cut_line = f.buffer.readline()
            lines[-1] += cut_line

    for line in lines:
        print(line.decode(encoding='utf-8'), end='')


if __name__ == '__main__':
    test_write()
    test_read()

1.txt.0文件内容

我是一行数据
我是一行数据
我是一行数据
我是一行数据
我是一行数据
我�

1.txt.1文件内容

�一行数据
我是一行数据
我是一行数据
我是一行数据
我是一行数据
我是一�

乱码是因为一个汉字由3个字节构成,拆开以后不能正常显示,test_read函数读取1.txt.1 的第一行补齐1.txt.0最后一行,实测可行。

扫描关注, 与我技术互动

QQ交流群: 211426309

加入知识星球, 每天收获更多精彩内容

分享日常研究的python技术和遇到的问题及解决方案