1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include <inttypes.h>
29
30#include "client/PeerCache.h"
31
32namespace Hdfs {
33namespace Internal {
34
35LruMap<std::string, PeerCache::value_type> PeerCache::Map;
36
37PeerCache::PeerCache(const SessionConfig& conf)
38 : cacheSize(conf.getSocketCacheCapacity()),
39 expireTimeInterval(conf.getSocketCacheExpiry()) {
40 Map.setMaxSize(cacheSize);
41}
42
43std::string PeerCache::buildKey(const DatanodeInfo& datanode) {
44 std::stringstream ss;
45 ss.imbue(std::locale::classic());
46 ss << datanode.getIpAddr() << datanode.getXferPort()
47 << datanode.getDatanodeId();
48 return ss.str();
49}
50
51shared_ptr<Socket> PeerCache::getConnection(const DatanodeInfo& datanode) {
52 std::string key = buildKey(datanode);
53 value_type value;
54 int64_t elipsed;
55
56 if (!Map.findAndErase(key, &value)) {
57 LOG(DEBUG1, "PeerCache miss for datanode %s uuid(%s).",
58 datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str());
59 return shared_ptr<Socket>();
60 } else if ((elipsed = ToMilliSeconds(value.second, steady_clock::now())) >
61 expireTimeInterval) {
62 LOG(DEBUG1, "PeerCache expire for datanode %s uuid(%s).",
63 datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str());
64 return shared_ptr<Socket>();
65 }
66
67 LOG(DEBUG1, "PeerCache hit for datanode %s uuid(%s), elipsed %" PRId64,
68 datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str(),
69 elipsed);
70 return value.first;
71}
72
73void PeerCache::addConnection(shared_ptr<Socket> peer,
74 const DatanodeInfo& datanode) {
75 std::string key = buildKey(datanode);
76 value_type value(peer, steady_clock::now());
77 Map.insert(key, value);
78 LOG(DEBUG1, "PeerCache add for datanode %s uuid(%s).",
79 datanode.formatAddress().c_str(), datanode.getDatanodeId().c_str());
80}
81}
82}
83