上节的代码无法区分同一个对象的不同版本,为了记录对象版本以及其他一些元数据,本节中会加入一个新组件:元数据服务
元数据服务就是提供对元数据的存取功能的服务。元数据指的是对象的描述信息,比如对象的名字、版本、大小以及散列值.
新增加服务后的结构图如下
需要新安装一个ElasticSearch搜索引擎。安装过程可自行百度
ElasticSearch搜索引擎中,我们需要定义这样一个映射。
'{"mappings":{"objects":{"properties":{"name":{"type":"string","index":"not_analyzed"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"string"}}}}}'
ES的索引相当于数据库而类型相当于数据库的表,映射则相当于定义表结构。该索引只有一个类型就是objects,包括4个属性分别是name,version,size和hash
接口服务代码添加了versions访问处理函数
1 func main() {2 go heartbeat.ListenHeartbeat()3 http.HandleFunc("/objects/", objects.Handler)4 http.HandleFunc("/locate/", locate.Handler)5 http.HandleFunc("/versions/", versions.Handler)6 log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))7 }
version处理函数仅处理Get请求
调用ES的API进行version查询最后返回
1 func Handler(w http.ResponseWriter, r *http.Request) { 2 m := r.Method 3 if m != http.MethodGet { 4 w.WriteHeader(http.StatusMethodNotAllowed) 5 return 6 } 7 from := 0 8 size := 1000 9 name := strings.Split(r.URL.EscapedPath(), "/")[2]10 for {11 metas, e := es.SearchAllVersions(name, from, size)12 if e != nil {13 log.Println(e)14 w.WriteHeader(http.StatusInternalServerError)15 return16 }17 for i := range metas {18 b, _ := json.Marshal(metas[i])19 w.Write(b)20 w.Write([]byte("\n"))21 }22 if len(metas) != size {23 return24 }25 from += size26 }27 }
OBJECT处理函数中 添加了DELETE处理
在ES中添加新版本,数据为0 ,哈希为"",表示删除操作
OBJECT处理函数中 PUT操作没有较大改变 但是添加了在ES中记录元数据的代码
OBJECT处理函数中 GET操作会根据version查询ES 获取元数据 才开始获取对象数据
1 func get(w http.ResponseWriter, r *http.Request) { 2 name := strings.Split(r.URL.EscapedPath(), "/")[2] 3 versionId := r.URL.Query()["version"] 4 version := 0 5 var e error 6 if len(versionId) != 0 { 7 version, e = strconv.Atoi(versionId[0]) 8 if e != nil { 9 log.Println(e)10 w.WriteHeader(http.StatusBadRequest)11 return12 }13 }14 meta, e := es.GetMetadata(name, version)15 if e != nil {16 log.Println(e)17 w.WriteHeader(http.StatusInternalServerError)18 return19 }20 if meta.Hash == "" {21 w.WriteHeader(http.StatusNotFound)22 return23 }24 object := url.PathEscape(meta.Hash)25 stream, e := getStream(object)26 if e != nil {27 log.Println(e)28 w.WriteHeader(http.StatusNotFound)29 return30 }31 io.Copy(w, stream)32 }33 34 func put(w http.ResponseWriter, r *http.Request) {35 hash := utils.GetHashFromHeader(r.Header)36 if hash == "" {37 log.Println("missing object hash in digest header")38 w.WriteHeader(http.StatusBadRequest)39 return40 }41 42 c, e := storeObject(r.Body, url.PathEscape(hash))43 if e != nil {44 log.Println(e)45 w.WriteHeader(c)46 return47 }48 if c != http.StatusOK {49 w.WriteHeader(c)50 return51 }52 53 name := strings.Split(r.URL.EscapedPath(), "/")[2]54 size := utils.GetSizeFromHeader(r.Header)55 e = es.AddVersion(name, hash, size)56 if e != nil {57 log.Println(e)58 w.WriteHeader(http.StatusInternalServerError)59 }60 }
实操验证
开启 rabbitmq ES 设置ES映射
开启 apiserver dataserver
使用curl 上传一个test3对象 这时候会显示400 bad request。 因为没提供test3 对象的散列值
先计算对象的哈希值的 然后再添加哈希PUT一次
OK!!