api.go 6.8 KB


  1. package gorqlite
  2. /*
  3. this file has low level stuff:
  4. rqliteApiGet()
  5. rqliteApiPost()
  6. There is some code duplication between those and they should
  7. probably be combined into one function.
  8. nothing public here.
  9. */
  10. import "bytes"
  11. import "encoding/json"
  12. import "errors"
  13. import "fmt"
  14. import "io/ioutil"
  15. import "net/http"
  16. import "time"
  17. /* *****************************************************************
  18. method: rqliteApiGet() - for api_STATUS
  19. - lowest level interface - does not do any JSON unmarshaling
  20. - handles retries
  21. - handles timeouts
  22. * *****************************************************************/
  23. func (conn *Connection) rqliteApiGet(apiOp apiOperation) ([]byte, error) {
  24. var responseBody []byte
  25. trace("%s: rqliteApiGet() called", conn.ID)
  26. // only api_STATUS now - maybe someday BACKUP
  27. if apiOp != api_STATUS {
  28. return responseBody, errors.New("rqliteApiGet() called for invalid api operation")
  29. }
  30. // just to be safe, check this
  31. peersToTry := conn.cluster.makePeerList()
  32. if len(peersToTry) < 1 {
  33. return responseBody, errors.New("I don't have any cluster info")
  34. }
  35. trace("%s: I have a peer list %d peers long", conn.ID, len(peersToTry))
  36. // failure log is used so that if all peers fail, we can say something
  37. // about why each failed
  38. failureLog := make([]string, 0)
  39. PeerLoop:
  40. for peerNum, peerToTry := range peersToTry {
  41. trace("%s: attemping to contact peer %d", conn.ID, peerNum)
  42. // docs say default GET policy is up to 10 follows automatically
  43. url := conn.assembleURL(api_STATUS, peerToTry)
  44. req, err := http.NewRequest("GET", url, nil)
  45. if err != nil {
  46. trace("%s: got error '%s' doing http.NewRequest", conn.ID, err.Error())
  47. failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", url, err.Error()))
  48. continue PeerLoop
  49. }
  50. trace("%s: http.NewRequest() OK")
  51. req.Header.Set("Content-Type", "application/json")
  52. client := &http.Client{}
  53. client.Timeout = time.Duration(conn.timeout) * time.Second
  54. response, err := client.Do(req)
  55. if err != nil {
  56. trace("%s: got error '%s' doing client.Do", conn.ID, err.Error())
  57. failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", url, err.Error()))
  58. continue PeerLoop
  59. }
  60. defer response.Body.Close()
  61. trace("%s: client.Do() OK")
  62. responseBody, err := ioutil.ReadAll(response.Body)
  63. if err != nil {
  64. trace("%s: got error '%s' doing ioutil.ReadAll", conn.ID, err.Error())
  65. failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", url, err.Error()))
  66. continue PeerLoop
  67. }
  68. trace("%s: ioutil.ReadAll() OK")
  69. if response.Status != "200 OK" {
  70. trace("%s: got code %s", conn.ID, response.Status)
  71. failureLog = append(failureLog, fmt.Sprintf("%s failed, got: %s", url, response.Status))
  72. continue PeerLoop
  73. }
  74. // if we got here, we succeeded
  75. trace("%s: api call OK, returning", conn.ID)
  76. return responseBody, nil
  77. }
  78. // if we got here, all peers failed. Let's build a verbose error message
  79. var stringBuffer bytes.Buffer
  80. stringBuffer.WriteString("tried all peers unsuccessfully. here are the results:\n")
  81. for n, v := range failureLog {
  82. stringBuffer.WriteString(fmt.Sprintf(" peer #%d: %s\n", n, v))
  83. }
  84. return responseBody, errors.New(stringBuffer.String())
  85. }
  86. /* *****************************************************************
  87. method: rqliteApiPost() - for api_QUERY and api_WRITE
  88. - lowest level interface - does not do any JSON unmarshaling
  89. - handles 301s, etc.
  90. - handles retries
  91. - handles timeouts
  92. it is called with an apiOperation type because the URL it will use varies
  93. depending on the API operation type (api_QUERY vs. api_WRITE)
  94. * *****************************************************************/
  95. func (conn *Connection) rqliteApiPost(apiOp apiOperation, sqlStatements []string) ([]byte, error) {
  96. var responseBody []byte
  97. switch apiOp {
  98. case api_QUERY:
  99. trace("%s: rqliteApiGet() post called for a QUERY of %d statements", conn.ID, len(sqlStatements))
  100. case api_WRITE:
  101. trace("%s: rqliteApiGet() post called for a QUERY of %d statements", conn.ID, len(sqlStatements))
  102. default:
  103. return responseBody, errors.New("weird! called for an invalid apiOperation in rqliteApiPost()")
  104. }
  105. // jsonify the statements. not really needed in the
  106. // case of api_STATUS but doesn't hurt
  107. jStatements, err := json.Marshal(sqlStatements)
  108. if err != nil {
  109. return nil, err
  110. }
  111. // just to be safe, check this
  112. peersToTry := conn.cluster.makePeerList()
  113. if len(peersToTry) < 1 {
  114. return responseBody, errors.New("I don't have any cluster info")
  115. }
  116. // failure log is used so that if all peers fail, we can say something
  117. // about why each failed
  118. failureLog := make([]string, 0)
  119. PeerLoop:
  120. for peerNum, peer := range peersToTry {
  121. trace("%s: trying peer #%d", conn.ID, peerNum)
  122. // we're doing a post, and the RFCs say that if you get a 301, it's not
  123. // automatically followed, so we have to do that ourselves
  124. responseStatus := "Haven't Tried Yet"
  125. var url string
  126. for responseStatus == "Haven't Tried Yet" || responseStatus == "301 Moved Permanently" {
  127. url = conn.assembleURL(apiOp, peer)
  128. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jStatements))
  129. if err != nil {
  130. trace("%s: got error '%s' doing http.NewRequest", conn.ID, err.Error())
  131. failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", url, err.Error()))
  132. continue PeerLoop
  133. }
  134. req.Header.Set("Content-Type", "application/json")
  135. client := &http.Client{}
  136. response, err := client.Do(req)
  137. if err != nil {
  138. trace("%s: got error '%s' doing client.Do", conn.ID, err.Error())
  139. failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", url, err.Error()))
  140. continue PeerLoop
  141. }
  142. defer response.Body.Close()
  143. responseBody, err = ioutil.ReadAll(response.Body)
  144. if err != nil {
  145. trace("%s: got error '%s' doing ioutil.ReadAll", conn.ID, err.Error())
  146. failureLog = append(failureLog, fmt.Sprintf("%s failed due to %s", url, err.Error()))
  147. continue PeerLoop
  148. }
  149. responseStatus = response.Status
  150. if responseStatus == "301 Moved Permanently" {
  151. v := response.Header["Location"]
  152. failureLog = append(failureLog, fmt.Sprintf("%s redirected me to %s", url, v[0]))
  153. url = v[0]
  154. continue PeerLoop
  155. } else if responseStatus == "200 OK" {
  156. trace("%s: api call OK, returning", conn.ID)
  157. return responseBody, nil
  158. } else {
  159. trace("%s: got error in responseStatus: %s", conn.ID, responseStatus)
  160. failureLog = append(failureLog, fmt.Sprintf("%s failed, got: %s", url, response.Status))
  161. continue PeerLoop
  162. }
  163. }
  164. }
  165. // if we got here, all peers failed. Let's build a verbose error message
  166. var stringBuffer bytes.Buffer
  167. stringBuffer.WriteString("tried all peers unsuccessfully. here are the results:\n")
  168. for n, v := range failureLog {
  169. stringBuffer.WriteString(fmt.Sprintf(" peer #%d: %s\n", n, v))
  170. }
  171. return responseBody, errors.New(stringBuffer.String())
  172. }