php 企业网站管理系统,团智慧团建登录入口,开发网站找什么公司,五种网站类型Parquet是Apache基金会支持的项目#xff0c;是面向列存储二进制文件格式。支持不同类型的压缩方式#xff0c;广泛用于数据科学和大数据环境#xff0c;如Hadoop生态。
本文主要介绍Go如何生成和处理parquet文件。
创建结构体
首先创建struct#xff0c;用于表示要处理…Parquet是Apache基金会支持的项目是面向列存储二进制文件格式。支持不同类型的压缩方式广泛用于数据科学和大数据环境如Hadoop生态。
本文主要介绍Go如何生成和处理parquet文件。
创建结构体
首先创建struct用于表示要处理的数据
type user struct {ID string parquet:nameid, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYFirstName string parquet:namefirstname, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYLastName string parquet:namelastname, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYEmail string parquet:nameemail, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYPhone string parquet:namephone, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYBlog string parquet:nameblog, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYUsername string parquet:nameusername, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYScore float64 parquet:namescore, typeDOUBLECreatedAt time.Time //wont be saved in the parquet file
}这里要提醒的是tag,用于说明struct中每个字段在生成parquet过程中如何被处理。
parquet-go包可以处理parquet数据更多的tag可以参考其官网。
生成parquet文件
下面现给出生成parquet文件的代码然后分别进行说明
package mainimport (fmtlogtimegithub.com/bxcodec/faker/v3github.com/xitongsys/parquet-go-source/localgithub.com/xitongsys/parquet-go/parquetgithub.com/xitongsys/parquet-go/readergithub.com/xitongsys/parquet-go/writer
)type user struct {ID string parquet:nameid, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYFirstName string parquet:namefirstname, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYLastName string parquet:namelastname, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYEmail string parquet:nameemail, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYPhone string parquet:namephone, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYBlog string parquet:nameblog, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYUsername string parquet:nameusername, typeBYTE_ARRAY, encodingPLAIN_DICTIONARYScore float64 parquet:namescore, typeDOUBLECreatedAt time.Time //wont be saved in the parquet file
}const recordNumber 10000func main() {var data []*user//create fake datafor i : 0; i recordNumber; i {u : user{ID: faker.UUIDDigit(),FirstName: faker.FirstName(),LastName: faker.LastName(),Email: faker.Email(),Phone: faker.Phonenumber(),Blog: faker.URL(),Username: faker.Username(),Score: float64(i),CreatedAt: time.Now(),}data append(data, u)}err : generateParquet(data)if err ! nil {log.Fatal(err)}}func generateParquet(data []*user) error {log.Println(generating parquet file)fw, err : local.NewLocalFileWriter(output.parquet)if err ! nil {return err}//parameters: writer, type of struct, sizepw, err : writer.NewParquetWriter(fw, new(user), int64(len(data)))if err ! nil {return err}//compression typepw.CompressionType parquet.CompressionCodec_GZIPdefer fw.Close()for _, d : range data {if err pw.Write(d); err ! nil {return err}}if err pw.WriteStop(); err ! nil {return err}return nil
}定义结构体上面已经说明但需要提醒的是类型与文档保持一致
Primitive TypeGo TypeBOOLEANboolINT32int32INT64int64INT96(deprecated)stringFLOATfloat32DOUBLEfloat64BYTE_ARRAYstringFIXED_LEN_BYTE_ARRAYstring
接着就是使用faker包生成模拟数据。然后调用err : generateParquet(data)方法。该方法大概逻辑为
首先准备输出文件然后基于本地输出文件构造pw,用于写parquet数据 fw, err : local.NewLocalFileWriter(output.parquet)if err ! nil {return err}//parameters: writer, type of struct, sizepw, err : writer.NewParquetWriter(fw, new(user), int64(len(data)))if err ! nil {return err}//compression typepw.CompressionType parquet.CompressionCodec_GZIPdefer fw.Close()然后设置压缩类型并通过defer操作确保关闭文件。下面开始写数据 for _, d : range data {if err pw.Write(d); err ! nil {return err}}if err pw.WriteStop(); err ! nil {return err}return nil循环写数据最后调用pw.WriteStop()停止写。 成功写文件后下面介绍如何读取parquet文件。
读取parquet文件
首先介绍如何一次性读取文件主要用于读取较小的文件
func readParquet() ([]*user, error) {fr, err : local.NewLocalFileReader(output.parquet)if err ! nil {return nil, err}pr, err : reader.NewParquetReader(fr, new(user), recordNumber)if err ! nil {return nil, err}u : make([]*user, recordNumber)if err pr.Read(u); err ! nil {return nil, err}pr.ReadStop()fr.Close()return u, nil
}大概流程如下首先定义本地文件然后构造pr用于读取parquet文件 fr, err : local.NewLocalFileReader(output.parquet)if err ! nil {return nil, err}pr, err : reader.NewParquetReader(fr, new(user), recordNumber)if err ! nil {return nil, err}然后定义目标内容容器u一次性读取数据 u : make([]*user, recordNumber)if err pr.Read(u); err ! nil {return nil, err}pr.ReadStop()fr.Close()但一次性大量记录加载至内存可能有问题。这是官方文档提示 If the parquet file is very big (even the size of parquet file is small, the uncompressed size may be very large), please don’t read all rows at one time, which may induce the OOM. You can read a small portion of the data at a time like a stream-oriented file. 大意是不要一次读取文件至内存可能造成OOM。实际应用中应该分页读取下面通过代码进行说明 func readPartialParquet(pageSize, page int) ([]*user, error) {fr, err : local.NewLocalFileReader(output.parquet)if err ! nil {return nil, err}defer func() {_ fr.Close()}()pr, err : reader.NewParquetReader(fr, new(user), int64(pageSize))if err ! nil {return nil, err}defer pr.ReadStop()//num : pr.GetNumRows()pr.SkipRows(int64(pageSize * page))u : make([]*user, pageSize)if err pr.Read(u); err ! nil {return nil, err}return u, nil
}与上面函数差异不大首先函数包括两个参数用于指定页大小和页数关键代码是跳过一定记录 pr.SkipRows(int64(pageSize * page))根据这个方法可以获得总行数pr.GetNumRows()然后结合页大小计算总页数最后循环可以实现分页查询。
计算列平均值
既然使用了Parquet列存储格式下面演示下如何计算Score列的平均值。
func calcScoreAVG() (float64, error) {fr, err : local.NewLocalFileReader(output.parquet)if err ! nil {return 0.0, err}pr, err : reader.NewParquetColumnReader(fr, recordNumber)if err ! nil {return 0.0, err}num : int(pr.GetNumRows())data, _, _, err : pr.ReadColumnByPath(parquet_go_root\u0001score, num)if err ! nil {return 0.0, err}var result float64for _, i : range data {result i.(float64)}return (result / float64(num)), nil
}首先打开文件然后调用pr.GetNumRows()方法获取总行数。然后基于路径指定列其中parquet_go_root为根路径因为前面使用字节数组这里分割符变为\u0001完整路径为parquet_go_root\u0001score。